feat(logs): store stderr and stdout with details in dedicated log messages table

This commit is contained in:
Matthieu Bessat 2024-07-25 23:23:58 +02:00
parent c40c6aedef
commit 55438fbd83
12 changed files with 155 additions and 27 deletions

View file

@ -1,4 +1,4 @@
use crate::models::{TaskRun, TaskRunSummary};
use crate::models::{LogLine, TaskRun, TaskRunSummary};
use axum::extract::{Path as ExtractPath, State};
use axum::http::StatusCode;
use axum::Json;
@ -119,7 +119,7 @@ pub async fn get_task_run(
ExtractPath((_task_id, run_id)): ExtractPath<(String, String)>,
) -> Html<String> {
let run_details = match sqlx::query_as::<_, TaskRun>("SELECT * FROM task_runs WHERE id = $1")
.bind(run_id)
.bind(&run_id)
.fetch_one(&app_state.db)
.await {
Ok(v) => v,
@ -127,11 +127,17 @@ pub async fn get_task_run(
return Html("<h1>Task run not found</h1>".to_string());
}
};
let logs_lines = sqlx::query_as::<_, LogLine>("SELECT * FROM logs_lines WHERE task_run_id = $1 ORDER BY captured_at ASC")
.bind(&run_id)
.fetch_all(&app_state.db)
.await
.unwrap();
Html(
app_state.templating_env.get_template("pages/task_run_details.html").unwrap()
.render(context!(
run => run_details
run => run_details,
logs_lines
))
.unwrap()
)

View file

@ -1,14 +1,58 @@
use anyhow::{anyhow, Result};
use chrono::{SecondsFormat, Utc};
use log::{debug, info, error};
use sqlx::{Pool, QueryBuilder, Sqlite};
use tokio::task::JoinHandle;
use uuid::Uuid;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc::Receiver;
use crate::models::ExecutorOrder;
use crate::models::{ExecutorOrder, LogKind, LogLine};
use crate::AppState;
async fn insert_logs(db: &Pool<Sqlite>, collected_logs: &mut Vec<LogLine>) -> Result<()> {
if collected_logs.is_empty() {
return Ok(());
}
// insert into sqlite dB
let mut query_builder = QueryBuilder::new("INSERT INTO logs_lines (id, task_run_id, captured_at, kind, content)");
query_builder.push_values(collected_logs.iter(), |mut b, log_line| {
b
.push_bind(log_line.id.to_string())
.push_bind(log_line.task_run_id.to_string())
.push_bind(log_line.captured_at)
.push_bind(log_line.kind.to_string())
.push_bind(&log_line.content);
});
let _ = query_builder.build().execute(db).await?;
*collected_logs = vec![];
Ok(())
}
fn collect_logs<Stream>(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Pool<Sqlite>) -> JoinHandle<()>
where Stream: AsyncRead + Unpin + std::marker::Send + 'static
{
tokio::spawn(async move {
let mut lines = BufReader::with_capacity(2048, stream).lines();
let mut collected_logs: Vec<LogLine> = vec![];
while let Some(line) = lines.next_line().await.unwrap() {
collected_logs.push(LogLine {
id: Uuid::new_v4().to_string(),
kind: kind.clone(),
content: line,
task_run_id: task_run_id.to_string(),
captured_at: Utc::now().timestamp_millis(),
});
if collected_logs.len() > 20 {
insert_logs(&db, &mut collected_logs).await.expect("Error inserting log");
}
}
insert_logs(&db, &mut collected_logs).await.expect("Error inserting log");
})
}
async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
debug!("Start processing of order {:?}", &order.id);
// save in DB
@ -29,15 +73,15 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
let mut cmd = Command::new(executable);
cmd.args(task.command.iter().skip(1).collect::<Vec<&String>>())
.stdout(Stdio::piped());
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, val) in task.env.iter() {
cmd.env(key, val);
}
let mut child = cmd.spawn().expect("Failed to execute process");
let stdout = child.stdout.take().unwrap();
let child_stdout = child.stdout.take().unwrap(); // take is like clone
let child_stderr = child.stderr.take().unwrap();
let process_handle = tokio::spawn(async move {
let status = child
.wait()
@ -48,16 +92,27 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
status
});
let mut collected_logs = String::new();
let mut lines = BufReader::with_capacity(16, stdout).lines();
while let Some(line) = lines.next_line().await.unwrap() {
collected_logs += &format!("{}\n", line);
}
// parallel stdout and stderr capture
let stdout_handle = collect_logs(
order.id,
LogKind::Stderr,
child_stderr,
state.db.clone()
);
let stderr_handle = collect_logs(
order.id,
LogKind::Stdout,
child_stdout,
state.db.clone()
);
stdout_handle.await?;
stderr_handle.await?;
let status = process_handle.await?;
if !status.success() {
error!("Non successful exit code found: {}", status);
}
let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4, logs = $5 WHERE id = $1")
let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4 WHERE id = $1")
.bind(order.id.to_string())
.bind(match status.success() {
true => "success",
@ -65,7 +120,6 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
})
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
.bind(status.code())
.bind(collected_logs)
.execute(&state.db)
.await.unwrap();
debug!("End of executor order {:?}, details saved", &order.id);

View file

@ -15,8 +15,8 @@ enum TriggerMode {
}
#[derive(sqlx::Type, Debug, Serialize, Deserialize)]
#[fully_pub]
#[sqlx(rename_all = "lowercase")]
#[fully_pub]
enum TaskStatus {
Pending,
Running,
@ -24,6 +24,25 @@ enum TaskStatus {
Success,
}
#[derive(sqlx::Type, Clone, Debug, Serialize, Deserialize)]
#[derive(strum_macros::Display)]
#[fully_pub]
enum LogKind {
Stdout,
Stderr
}
#[derive(sqlx::FromRow, Debug, Serialize, Deserialize)]
#[fully_pub]
struct LogLine {
id: String,
task_run_id: String,
/// unix timestamp millis
captured_at: i64,
kind: LogKind,
content: String,
}
#[derive(sqlx::FromRow, Deserialize, Serialize)]
#[fully_pub]
struct TaskRunSummary {
@ -43,7 +62,7 @@ struct TaskRun {
status: TaskStatus,
trigger_mode: TriggerMode,
exit_code: Option<u32>,
logs: String,
// logs: sqlx::types::Json<Vec<LogMessage>>,
submitted_at: DateTime<Utc>,
started_at: Option<DateTime<Utc>>,
ended_at: Option<DateTime<Utc>>,

View file

@ -12,5 +12,9 @@
<h3>Logs</h3>
<p>Exit code is {{ run.exit_code }}</p>
<pre>{{ run.logs }}</pre>
<pre class="logs-lines">
{% for line in logs_lines %}
<div class="line-{{ line.kind|lower }}">{{- line.content -}}</div>
{% endfor %}
</pre>
{% endblock %}