Skip to content

Commit

Permalink
wait_for_task_exit
Browse files Browse the repository at this point in the history
  • Loading branch information
SoloJacobs committed Nov 9, 2023
1 parent 13212f2 commit 28ab3c4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 50 deletions.
2 changes: 1 addition & 1 deletion v2/robotmk/src/bin/scheduler/child_process_supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::command_spec::CommandSpec;
use super::termination::kill_process_tree;
use robotmk::termination::{TerminationFlag, waited, Outcome};
use robotmk::termination::{waited, Outcome, TerminationFlag};

use anyhow::{Context, Result};
use camino::Utf8PathBuf;
Expand Down
83 changes: 37 additions & 46 deletions v2/robotmk/src/bin/scheduler/sessions/schtasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,44 @@ use crate::termination::kill_process_tree;
use robotmk::termination::TerminationFlag;

use anyhow::{bail, Context, Result};
use async_std::{
future::{timeout, TimeoutError},
task::sleep as async_sleep,
};
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{Duration as ChronoDuration, Local};
use futures::executor;
use log::{debug, error};
use robotmk::termination::{waited, Outcome};
use std::fs::{read_to_string, write};
use std::process::Command;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use sysinfo::Pid;

fn wait_for_task_exit(task: &TaskSpec) -> Result<RunOutcome> {
let duration = Duration::from_secs(task.timeout);
let paths = &Paths::from(task.base_path);
let outcome = waited(
duration,
task.termination_flag,
query(task.task_name, &paths.exit_code),
);
match outcome {
Outcome::Cancel => {
kill_and_delete_task(task.task_name, &paths.pid);
Ok(RunOutcome::TimedOut)
}
Outcome::Timeout => {
error!("Timeout");
kill_and_delete_task(task.task_name, &paths.pid);
Ok(RunOutcome::Terminated)
}
Outcome::Completed(Err(e)) => Err(e.into()),
Outcome::Completed(Ok(code)) => {
debug!("Task {} completed", task.task_name);
delete_task(task.task_name);
Ok(RunOutcome::Exited(Some(code)))
}
}
}

pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
debug!(
"Running the following command as task {} for user {}:\n{}\n\nBase path: {}",
Expand All @@ -34,36 +57,7 @@ pub fn run_task(task_spec: &TaskSpec) -> Result<RunOutcome> {
run_schtasks(["/run", "/tn", task_spec.task_name])
.context(format!("Failed to start task {}", task_spec.task_name))?;

let run = executor::block_on(timeout(
Duration::from_secs(task_spec.timeout),
wait_for_task_exit(task_spec.task_name, task_spec.termination_flag, &paths.pid),
));
let run_outcome = match run {
Ok(Ok(outcome)) => outcome,
Ok(Err(e)) => {
kill_and_delete_task(task_spec.task_name, &paths.pid);
return Err(e);
}
Err(TimeoutError { .. }) => {
error!("Timed out");
kill_and_delete_task(task_spec.task_name, &paths.pid);
return Ok(RunOutcome::TimedOut);
}
};

if let Some(run_outcome) = run_outcome {
return Ok(run_outcome);
};
debug!("Task {} completed", task_spec.task_name);

delete_task(task_spec.task_name);

let raw_exit_code = read_until_first_whitespace(&paths.exit_code)?;
Ok(RunOutcome::Exited(Some(
raw_exit_code
.parse::<i32>()
.context(format!("Failed to parse {} as i32", raw_exit_code))?,
)))
wait_for_task_exit(task_spec)
}

pub struct TaskSpec<'a> {
Expand Down Expand Up @@ -169,22 +163,19 @@ where
))
}

async fn wait_for_task_exit(
task_name: &str,
termination_flag: &TerminationFlag,
path_pid: &Utf8Path,
) -> Result<Option<RunOutcome>> {
async fn query(task_name: &str, exit_path: &Utf8Path) -> Result<i32> {
debug!("Waiting for task {} to complete", task_name);
while query_if_task_is_running(task_name)
.context(format!("Failed to query if task {task_name} is running"))?
{
if termination_flag.should_terminate() {
kill_and_delete_task(task_name, path_pid);
return Ok(Some(RunOutcome::Terminated));
}
async_sleep(Duration::from_millis(250)).await
std::hint::spin_loop();
}
Ok(None)

let raw_exit_code = read_until_first_whitespace(exit_path)?;
let exit_code: i32 = raw_exit_code
.parse()
.context(format!("Failed to parse {} as i32", raw_exit_code))?;
Ok(exit_code)
}

fn query_if_task_is_running(task_name: &str) -> Result<bool> {
Expand Down
4 changes: 1 addition & 3 deletions v2/robotmk/src/termination.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::future::Future;
use std::time::Duration;
use tokio::time::sleep;

Expand All @@ -17,7 +17,6 @@ impl TerminationFlag {
}
}


pub enum Outcome<T> {
Cancel,
Timeout,
Expand All @@ -41,4 +40,3 @@ where
_ = sleep(duration) => { Outcome::Timeout },
}
}

0 comments on commit 28ab3c4

Please sign in to comment.