diff --git a/Cargo.lock b/Cargo.lock index e2df5c8..6c6600b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -189,6 +189,7 @@ dependencies = [ "serde_json", "serde_yaml", "sqlx", + "strum_macros", "tokio", "tokio-cron-scheduler", "tokio-util", @@ -715,6 +716,12 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1603,6 +1610,7 @@ dependencies = [ "tokio-stream", "tracing", "url", + "uuid", ] [[package]] @@ -1626,7 +1634,7 @@ checksum = "5833ef53aaa16d860e92123292f1f6a3d53c34ba8b1969f152ef1a7bb803f3c8" dependencies = [ "dotenvy", "either", - "heck", + "heck 0.4.1", "hex", "once_cell", "proc-macro2", @@ -1684,6 +1692,7 @@ dependencies = [ "stringprep", "thiserror", "tracing", + "uuid", "whoami", ] @@ -1723,6 +1732,7 @@ dependencies = [ "stringprep", "thiserror", "tracing", + "uuid", "whoami", ] @@ -1748,6 +1758,7 @@ dependencies = [ "tracing", "url", "urlencoding", + "uuid", ] [[package]] @@ -1767,6 +1778,19 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.72", +] + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index ebd1af8..7cb4a8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" serde_yaml = "0.9" chrono = { version = "0.4.26", features = ["serde"] } -sqlx = { version = "0.7.4", features = ["sqlite", "runtime-tokio", "chrono"] } +sqlx = { version = "0.7.4", features = ["sqlite", "runtime-tokio", "chrono", "uuid"] } anyhow = "1.0.75" clap = "4.5.4" tokio = { version = "1.37.0", features = ["full"] } @@ -22,4 +22,5 @@ tower-http = { version = "0.5.2", features = ["fs"] } tokio-cron-scheduler = "0.10.2" argh = "0.1.12" tokio-util = "0.7.11" +strum_macros = "0.26.4" diff --git a/TODO.md b/TODO.md index 623bd73..f6f526e 100644 --- a/TODO.md +++ b/TODO.md @@ -2,7 +2,8 @@ ## TODO -- [ ] Implement support for webhook and debouncing of webhook +- [x] Implement support for webhook and debouncing of webhook +- [ ] Add stderr capture - [ ] Implement basic auth with HTTP basic auth (to trigger and see logs only) - [ ] add CSS badge and color code on job status - [ ] Validating config file diff --git a/assets/styles/app.css b/assets/styles/app.css index 0aef0fe..bc8a4a7 100644 --- a/assets/styles/app.css +++ b/assets/styles/app.css @@ -4,3 +4,11 @@ table { table tbody tr td:first-child { padding-right: 3rem !important; } + +.line-stderr { + background-color: rgba(255, 0, 0, 0.5); +} + +.logs-lines { + white-space: nowrap; +} diff --git a/config.yaml b/config.yaml index dbdb436..8189ed3 100644 --- a/config.yaml +++ b/config.yaml @@ -10,7 +10,7 @@ tasks: SIMULATION_SPEED: 0.2 command: - /usr/bin/python3 - - /home/mbess/workspace/perso/autotasker/examples/do_something_1.py + - /home/mbess/workspace/autotasker/examples/do_something_1.py reindex_db: name: Reindex the whole database diff --git a/examples/do_something_1.py b/examples/do_something_1.py index bc29a5b..c5a31b1 100644 --- a/examples/do_something_1.py +++ b/examples/do_something_1.py @@ -1,14 +1,15 @@ +import os, sys import random -import os import string from time import sleep - def main(): iterations = random.randint(10, 150) speed = float(os.getenv("SIMULATION_SPEED") or 0.8) - print(f"Going for {iterations=}") + print(f"Going for {iterations=} with {speed=}") for i in range(iterations): + if random.uniform(0, 1) > 0.8: + print("Some error we will see", file=sys.stderr) print( str(i) + " " + ''.join( @@ -24,6 +25,7 @@ def main(): (random.uniform(0, 5) if random.uniform(0, 1) > 0.99 else 0)) ) print("Done, script is finished") + if __name__ == "__main__": main() diff --git a/migrations/all.sql b/migrations/all.sql index 77434c1..7d00174 100644 --- a/migrations/all.sql +++ b/migrations/all.sql @@ -5,8 +5,17 @@ CREATE TABLE task_runs ( status TEXT CHECK(status IN ('pending','running','failed','success')) NOT NULL DEFAULT 'pending', trigger_mode TEXT CHECK(trigger_mode IN ('manual','webhook','schedule')) NOT NULL DEFAULT 'manual', exit_code INT, - logs TEXT, + runtime_details JSON, submitted_at DATETIME, started_at DATETIME, ended_at DATETIME ); + +DROP TABLE IF EXISTS logs_lines; +CREATE TABLE logs_lines ( + id TEXT PRIMARY KEY, + task_run_id TEXT NOT NULL, + kind TEXT CHECK(kind IN ('Stdout', 'Stderr')), + captured_at INT, -- unix timestamp + content TEXT +) diff --git a/src/controllers.rs b/src/controllers.rs index badb50f..5d0bad1 100644 --- a/src/controllers.rs +++ b/src/controllers.rs @@ -1,4 +1,4 @@ -use crate::models::{TaskRun, TaskRunSummary}; +use crate::models::{LogLine, TaskRun, TaskRunSummary}; use axum::extract::{Path as ExtractPath, State}; use axum::http::StatusCode; use axum::Json; @@ -119,7 +119,7 @@ pub async fn get_task_run( ExtractPath((_task_id, run_id)): ExtractPath<(String, String)>, ) -> Html { let run_details = match sqlx::query_as::<_, TaskRun>("SELECT * FROM task_runs WHERE id = $1") - .bind(run_id) + .bind(&run_id) .fetch_one(&app_state.db) .await { Ok(v) => v, @@ -127,11 +127,17 @@ pub async fn get_task_run( return Html("

Task run not found

".to_string()); } }; + let logs_lines = sqlx::query_as::<_, LogLine>("SELECT * FROM logs_lines WHERE task_run_id = $1 ORDER BY captured_at ASC") + .bind(&run_id) + .fetch_all(&app_state.db) + .await + .unwrap(); Html( app_state.templating_env.get_template("pages/task_run_details.html").unwrap() .render(context!( - run => run_details + run => run_details, + logs_lines )) .unwrap() ) diff --git a/src/executor.rs b/src/executor.rs index bf24cc9..dfd7948 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,14 +1,58 @@ use anyhow::{anyhow, Result}; use chrono::{SecondsFormat, Utc}; use log::{debug, info, error}; +use sqlx::{Pool, QueryBuilder, Sqlite}; +use tokio::task::JoinHandle; +use uuid::Uuid; use std::process::Stdio; -use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader}; use tokio::process::Command; use tokio::sync::mpsc::Receiver; -use crate::models::ExecutorOrder; +use crate::models::{ExecutorOrder, LogKind, LogLine}; use crate::AppState; +async fn insert_logs(db: &Pool, collected_logs: &mut Vec) -> Result<()> { + if collected_logs.is_empty() { + return Ok(()); + } + // insert into sqlite dB + let mut query_builder = QueryBuilder::new("INSERT INTO logs_lines (id, task_run_id, captured_at, kind, content)"); + query_builder.push_values(collected_logs.iter(), |mut b, log_line| { + b + .push_bind(log_line.id.to_string()) + .push_bind(log_line.task_run_id.to_string()) + .push_bind(log_line.captured_at) + .push_bind(log_line.kind.to_string()) + .push_bind(&log_line.content); + }); + let _ = query_builder.build().execute(db).await?; + *collected_logs = vec![]; + Ok(()) +} + +fn collect_logs(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Pool) -> JoinHandle<()> + where Stream: AsyncRead + Unpin + std::marker::Send + 'static +{ + tokio::spawn(async move { + let mut lines = BufReader::with_capacity(2048, stream).lines(); + let mut collected_logs: Vec = vec![]; + while let Some(line) = lines.next_line().await.unwrap() { + collected_logs.push(LogLine { + id: Uuid::new_v4().to_string(), + kind: kind.clone(), + content: line, + task_run_id: task_run_id.to_string(), + captured_at: Utc::now().timestamp_millis(), + }); + if collected_logs.len() > 20 { + insert_logs(&db, &mut collected_logs).await.expect("Error inserting log"); + } + } + insert_logs(&db, &mut collected_logs).await.expect("Error inserting log"); + }) +} + async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { debug!("Start processing of order {:?}", &order.id); // save in DB @@ -29,15 +73,15 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { let mut cmd = Command::new(executable); cmd.args(task.command.iter().skip(1).collect::>()) - .stdout(Stdio::piped()); + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); for (key, val) in task.env.iter() { cmd.env(key, val); } let mut child = cmd.spawn().expect("Failed to execute process"); - - let stdout = child.stdout.take().unwrap(); - + let child_stdout = child.stdout.take().unwrap(); // take is like clone + let child_stderr = child.stderr.take().unwrap(); let process_handle = tokio::spawn(async move { let status = child .wait() @@ -48,16 +92,27 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { status }); - let mut collected_logs = String::new(); - let mut lines = BufReader::with_capacity(16, stdout).lines(); - while let Some(line) = lines.next_line().await.unwrap() { - collected_logs += &format!("{}\n", line); - } + // parallel stdout and stderr capture + let stdout_handle = collect_logs( + order.id, + LogKind::Stderr, + child_stderr, + state.db.clone() + ); + let stderr_handle = collect_logs( + order.id, + LogKind::Stdout, + child_stdout, + state.db.clone() + ); + + stdout_handle.await?; + stderr_handle.await?; let status = process_handle.await?; if !status.success() { error!("Non successful exit code found: {}", status); } - let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4, logs = $5 WHERE id = $1") + let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4 WHERE id = $1") .bind(order.id.to_string()) .bind(match status.success() { true => "success", @@ -65,7 +120,6 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { }) .bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)) .bind(status.code()) - .bind(collected_logs) .execute(&state.db) .await.unwrap(); debug!("End of executor order {:?}, details saved", &order.id); diff --git a/src/models.rs b/src/models.rs index f4c32ae..9bbf8c6 100644 --- a/src/models.rs +++ b/src/models.rs @@ -15,8 +15,8 @@ enum TriggerMode { } #[derive(sqlx::Type, Debug, Serialize, Deserialize)] -#[fully_pub] #[sqlx(rename_all = "lowercase")] +#[fully_pub] enum TaskStatus { Pending, Running, @@ -24,6 +24,25 @@ enum TaskStatus { Success, } +#[derive(sqlx::Type, Clone, Debug, Serialize, Deserialize)] +#[derive(strum_macros::Display)] +#[fully_pub] +enum LogKind { + Stdout, + Stderr +} + +#[derive(sqlx::FromRow, Debug, Serialize, Deserialize)] +#[fully_pub] +struct LogLine { + id: String, + task_run_id: String, + /// unix timestamp millis + captured_at: i64, + kind: LogKind, + content: String, +} + #[derive(sqlx::FromRow, Deserialize, Serialize)] #[fully_pub] struct TaskRunSummary { @@ -43,7 +62,7 @@ struct TaskRun { status: TaskStatus, trigger_mode: TriggerMode, exit_code: Option, - logs: String, + // logs: sqlx::types::Json>, submitted_at: DateTime, started_at: Option>, ended_at: Option>, diff --git a/src/templates/pages/task_run_details.html b/src/templates/pages/task_run_details.html index a33c381..f0d1eef 100644 --- a/src/templates/pages/task_run_details.html +++ b/src/templates/pages/task_run_details.html @@ -12,5 +12,9 @@

Logs

Exit code is {{ run.exit_code }}

-
{{ run.logs }}
+
+    {% for line in logs_lines %}
+    
{{- line.content -}}
+ {% endfor %} +
{% endblock %} diff --git a/tmp/dbs/current.db b/tmp/dbs/current.db deleted file mode 100644 index 0b13db9..0000000 Binary files a/tmp/dbs/current.db and /dev/null differ