refactor: split run_process in executor

This commit is contained in:
Matthieu Bessat 2024-07-28 18:31:35 +02:00
parent 71c2b9dc63
commit 9f7c81644d

View file

@ -4,7 +4,7 @@ use log::{debug, info, error};
use sqlx::{Pool, QueryBuilder, Sqlite};
use tokio::task::JoinHandle;
use uuid::Uuid;
use std::process::Stdio;
use std::process::{ExitStatus, Stdio};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc::Receiver;
@ -16,7 +16,6 @@ async fn insert_logs(db: &Pool<Sqlite>, collected_logs: &mut Vec<LogLine>) -> Re
if collected_logs.is_empty() {
return Ok(());
}
// insert into sqlite dB
let mut query_builder = QueryBuilder::new("INSERT INTO logs_lines (id, task_run_id, captured_at, kind, content)");
query_builder.push_values(collected_logs.iter(), |mut b, log_line| {
b
@ -35,8 +34,12 @@ fn collect_logs<Stream>(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Po
where Stream: AsyncRead + Unpin + std::marker::Send + 'static
{
tokio::spawn(async move {
let mut lines = BufReader::with_capacity(2048, stream).lines();
let mut lines = BufReader::with_capacity(1048, stream).lines();
let mut collected_logs: Vec<LogLine> = vec![];
let max_lines = match &kind {
LogKind::Stdout => 20,
LogKind::Stderr => 3
};
while let Some(line) = lines.next_line().await.unwrap() {
collected_logs.push(LogLine {
id: Uuid::new_v4().to_string(),
@ -45,40 +48,15 @@ fn collect_logs<Stream>(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Po
task_run_id: task_run_id.to_string(),
captured_at: Utc::now().timestamp_millis(),
});
if collected_logs.len() > 20 {
insert_logs(&db, &mut collected_logs).await.expect("Error inserting log");
if collected_logs.len() > max_lines {
insert_logs(&db, &mut collected_logs).await.expect("Error inserting logs.");
}
}
insert_logs(&db, &mut collected_logs).await.expect("Error inserting log");
})
}
async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
debug!("Start processing of order {:?}", &order.id);
// save in DB
let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)")
.bind(order.id.to_string())
.bind(order.task_id.clone())
.bind(order.trigger_mode.to_string())
.bind(TaskStatus::Running.to_string())
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
.execute(&state.db)
.await.unwrap();
let task = state.config.tasks.get(&order.task_id).expect("Task id to be valid");
let executable = match task.command.first() {
Some(v) => v,
None => return Err(anyhow!("Could not find command to execute")),
};
let mut cmd = Command::new(executable);
cmd.args(task.command.iter().skip(1).collect::<Vec<&String>>())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, val) in task.env.iter() {
cmd.env(key, val);
}
async fn execute_process(state: &AppState, order: &ExecutorOrder, cmd: &mut Command) -> Result<ExitStatus> {
let mut child = cmd.spawn().expect("Failed to execute process");
let child_stdout = child.stdout.take().unwrap(); // take is like clone
let child_stderr = child.stderr.take().unwrap();
@ -93,22 +71,52 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
});
// parallel stdout and stderr capture
let stdout_handle = collect_logs(
order.id,
LogKind::Stderr,
child_stderr,
state.db.clone()
);
let stderr_handle = collect_logs(
order.id,
LogKind::Stdout,
child_stdout,
state.db.clone()
);
let stdout_handle = collect_logs(
order.id,
LogKind::Stderr,
child_stderr,
state.db.clone()
);
stdout_handle.await?;
stderr_handle.await?;
let status = process_handle.await?;
stdout_handle.await?;
return Ok(process_handle.await?);
}
async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
debug!("Start processing of order {:?}", &order.id);
// save in DB
let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)")
.bind(order.id.to_string())
.bind(order.task_id.clone())
.bind(order.trigger_mode.to_string())
.bind(TaskStatus::Running.to_string())
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
.execute(&state.db)
.await.unwrap();
let task = state.config.tasks.get(&order.task_id)
.expect("Task id in executor order is not valid.");
let executable = match task.command.first() {
Some(v) => v,
None => return Err(anyhow!("Could not find command to execute")),
};
let mut cmd = Command::new(executable);
cmd.args(task.command.iter().skip(1).collect::<Vec<&String>>())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, val) in task.env.iter() {
cmd.env(key, val);
}
let status = execute_process(&state, &order, &mut cmd).await?;
if !status.success() {
error!("Non successful exit code found: {}", status);
}