diff --git a/src/executor.rs b/src/executor.rs index 136260d..be154c2 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -4,7 +4,7 @@ use log::{debug, info, error}; use sqlx::{Pool, QueryBuilder, Sqlite}; use tokio::task::JoinHandle; use uuid::Uuid; -use std::process::Stdio; +use std::process::{ExitStatus, Stdio}; use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::process::Command; use tokio::sync::mpsc::Receiver; @@ -16,7 +16,6 @@ async fn insert_logs(db: &Pool, collected_logs: &mut Vec) -> Re if collected_logs.is_empty() { return Ok(()); } - // insert into sqlite dB let mut query_builder = QueryBuilder::new("INSERT INTO logs_lines (id, task_run_id, captured_at, kind, content)"); query_builder.push_values(collected_logs.iter(), |mut b, log_line| { b @@ -35,8 +34,12 @@ fn collect_logs(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Po where Stream: AsyncRead + Unpin + std::marker::Send + 'static { tokio::spawn(async move { - let mut lines = BufReader::with_capacity(2048, stream).lines(); + let mut lines = BufReader::with_capacity(1048, stream).lines(); let mut collected_logs: Vec = vec![]; + let max_lines = match &kind { + LogKind::Stdout => 20, + LogKind::Stderr => 3 + }; while let Some(line) = lines.next_line().await.unwrap() { collected_logs.push(LogLine { id: Uuid::new_v4().to_string(), @@ -45,40 +48,15 @@ fn collect_logs(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Po task_run_id: task_run_id.to_string(), captured_at: Utc::now().timestamp_millis(), }); - if collected_logs.len() > 20 { - insert_logs(&db, &mut collected_logs).await.expect("Error inserting log"); + if collected_logs.len() > max_lines { + insert_logs(&db, &mut collected_logs).await.expect("Error inserting logs."); } } insert_logs(&db, &mut collected_logs).await.expect("Error inserting log"); }) } -async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { - debug!("Start processing of order {:?}", &order.id); - // save in DB - let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)") - .bind(order.id.to_string()) - .bind(order.task_id.clone()) - .bind(order.trigger_mode.to_string()) - .bind(TaskStatus::Running.to_string()) - .bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)) - .execute(&state.db) - .await.unwrap(); - - let task = state.config.tasks.get(&order.task_id).expect("Task id to be valid"); - let executable = match task.command.first() { - Some(v) => v, - None => return Err(anyhow!("Could not find command to execute")), - }; - let mut cmd = Command::new(executable); - - cmd.args(task.command.iter().skip(1).collect::>()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()); - for (key, val) in task.env.iter() { - cmd.env(key, val); - } - +async fn execute_process(state: &AppState, order: &ExecutorOrder, cmd: &mut Command) -> Result { let mut child = cmd.spawn().expect("Failed to execute process"); let child_stdout = child.stdout.take().unwrap(); // take is like clone let child_stderr = child.stderr.take().unwrap(); @@ -93,22 +71,52 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { }); // parallel stdout and stderr capture - let stdout_handle = collect_logs( - order.id, - LogKind::Stderr, - child_stderr, - state.db.clone() - ); let stderr_handle = collect_logs( order.id, LogKind::Stdout, child_stdout, state.db.clone() ); + let stdout_handle = collect_logs( + order.id, + LogKind::Stderr, + child_stderr, + state.db.clone() + ); - stdout_handle.await?; stderr_handle.await?; - let status = process_handle.await?; + stdout_handle.await?; + return Ok(process_handle.await?); +} + +async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { + debug!("Start processing of order {:?}", &order.id); + // save in DB + let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)") + .bind(order.id.to_string()) + .bind(order.task_id.clone()) + .bind(order.trigger_mode.to_string()) + .bind(TaskStatus::Running.to_string()) + .bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)) + .execute(&state.db) + .await.unwrap(); + + let task = state.config.tasks.get(&order.task_id) + .expect("Task id in executor order is not valid."); + let executable = match task.command.first() { + Some(v) => v, + None => return Err(anyhow!("Could not find command to execute")), + }; + let mut cmd = Command::new(executable); + + cmd.args(task.command.iter().skip(1).collect::>()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + for (key, val) in task.env.iter() { + cmd.env(key, val); + } + let status = execute_process(&state, &order, &mut cmd).await?; + if !status.success() { error!("Non successful exit code found: {}", status); }