diff --git a/Cargo.lock b/Cargo.lock index c6f36ea..f3394d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -148,7 +148,9 @@ dependencies = [ "axum", "chrono", "clap", + "env_logger", "fully_pub", + "log", "minijinja", "serde", "serde_json", @@ -499,6 +501,29 @@ dependencies = [ "serde", ] +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -825,6 +850,12 @@ dependencies = [ "libm", ] +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "1.3.1" @@ -993,9 +1024,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "matchit" diff --git a/Cargo.toml b/Cargo.toml index be30172..21096a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,4 +17,6 @@ axum = { version = "0.7.5", features = ["json"] } minijinja = { version = "1.0.20", features = ["builtins"] } uuid = { version = "1.8.0", features = ["serde", "v4"] } fully_pub = "0.1.4" +log = "0.4.22" +env_logger = "0.11.3" diff --git a/config.example.yaml b/config.example.yaml new file mode 100644 index 0000000..aa9ad7d --- /dev/null +++ b/config.example.yaml @@ -0,0 +1,17 @@ +tasks: + - id: do_magic_stuff + name: Do magic incantation + env: + PYTHONUNBUFFERED: "1" + SIMULATION_SPEED: 0.2 + command: + - /usr/bin/python3 + - /home/mbess/workspace/autotasker/examples/do_something_1.py + store_logs: true + - id: reindex_db + name: Reindex the whole database + env: {} + command: + - ls + - /etc/fstab + diff --git a/migrations/01_create_task_runs_table.sql b/migrations/all.sql similarity index 60% rename from migrations/01_create_task_runs_table.sql rename to migrations/all.sql index ed6d468..77434c1 100644 --- a/migrations/01_create_task_runs_table.sql +++ b/migrations/all.sql @@ -1,11 +1,12 @@ -CREATE TABLE IF NOT EXISTS task_runs ( +DROP TABLE IF EXISTS task_runs; +CREATE TABLE task_runs ( id TEXT PRIMARY KEY, task_id TEXT NOT NULL, status TEXT CHECK(status IN ('pending','running','failed','success')) NOT NULL DEFAULT 'pending', - trigger_mode TEXT CHECK(status IN ('manual','webhook','schedule')) NOT NULL, + trigger_mode TEXT CHECK(trigger_mode IN ('manual','webhook','schedule')) NOT NULL DEFAULT 'manual', exit_code INT, logs TEXT, submitted_at DATETIME, started_at DATETIME, - end_at DATETIME -) + ended_at DATETIME +); diff --git a/src/controllers.rs b/src/controllers.rs index 98a3349..97e0d83 100644 --- a/src/controllers.rs +++ b/src/controllers.rs @@ -1,21 +1,9 @@ -use crate::models::{Config, Task}; -use anyhow::{anyhow, Context, Result}; +use crate::models::{Task, TaskRun, TaskRunSummary}; use axum::extract::{Path as ExtractPath, State}; -use axum::http::{Response, StatusCode}; +use axum::http::StatusCode; use axum::Json; -use axum::{response::Html, routing::get, Router}; +use axum::response::Html; use minijinja::render; -use serde::{Deserialize, Serialize}; -use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use sqlx::{ConnectOptions, Connection, Pool, Sqlite}; -use std::collections::HashMap; -use std::fs; -use std::process::Stdio; -use std::str::FromStr; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::{Child, Command}; -use tokio::sync::mpsc::{self, Receiver, Sender}; use uuid::Uuid; use crate::models::ExecutorOrder; @@ -24,7 +12,6 @@ use crate::AppState; pub async fn home() -> Html { Html(render!( include_str!("./templates/home.html"), - first_name => "John Doe" )) } @@ -40,7 +27,6 @@ pub async fn trigger_task( State(app_state): State, ExtractPath(task_id): ExtractPath, ) -> (StatusCode, Html) { - println!("Run task {}", task_id); let tasks: Vec = app_state.config.tasks; let task = match tasks.iter().find(|t| t.id == task_id) { Some(t) => t, @@ -70,7 +56,7 @@ pub async fn trigger_task( } pub async fn handle_webhook( - State(app_state): State, + State(_app_state): State, ExtractPath(token): ExtractPath, ) -> (StatusCode, Json) { println!("Webhook token {}", token); @@ -82,16 +68,41 @@ pub async fn list_task_runs( State(app_state): State, ExtractPath(task_id): ExtractPath, ) -> Html { - Html(render!(include_str!("./templates/list_task_runs.html"))) + let runs = sqlx::query_as::<_, TaskRunSummary>("SELECT id,status,trigger_mode,submitted_at,started_at,ended_at FROM task_runs WHERE task_id = $1 LIMIT 100") + .bind(&task_id) + .fetch_all(&app_state.db) + .await + .unwrap(); + let task = match app_state.config.tasks.iter().find(|t| t.id == task_id) { + Some(v) => v, + None => { + return Html("

Task not found

".to_string()); + } + }; + Html(render!( + include_str!("./templates/list_task_runs.html"), + task => task, + runs => runs, + )) } pub async fn get_task_run( State(app_state): State, - ExtractPath(task_id): ExtractPath, - ExtractPath(run_id): ExtractPath, + ExtractPath((_task_id, run_id)): ExtractPath<(String, String)>, ) -> Html { + let run_details = match sqlx::query_as::<_, TaskRun>("SELECT * FROM task_runs WHERE id = $1") + .bind(run_id) + .fetch_one(&app_state.db) + .await { + Ok(v) => v, + Err(e) => { + dbg!(e); + return Html("

Task run not found

".to_string()); + } + }; + Html(render!( include_str!("./templates/task_run_details.html"), - run => false + run => run_details )) } diff --git a/src/executor.rs b/src/executor.rs new file mode 100644 index 0000000..fbefe4d --- /dev/null +++ b/src/executor.rs @@ -0,0 +1,85 @@ +use anyhow::{anyhow, Result}; +use chrono::{SecondsFormat, Utc}; +use log::{debug, info, trace, error}; +use std::process::Stdio; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::process::Command; +use tokio::sync::mpsc::Receiver; + +use crate::models::ExecutorOrder; +use crate::AppState; + +async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { + let executable = match order.task.command.first() { + Some(v) => v, + None => return Err(anyhow!("Could not find command to execute")), + }; + debug!("Start processing of order {}", order.id); + // save in DB + let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)") + .bind(order.id.to_string()) + .bind(order.task.id.clone()) + .bind("manual") + .bind("running") + .bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)) + .execute(&state.db) + .await.unwrap(); + + let mut cmd = Command::new(executable); + + cmd.args(order.task.command.iter().skip(1).collect::>()) + .stdout(Stdio::piped()); + for (key, val) in order.task.env.iter() { + cmd.env(key, val); + } + + let mut child = cmd.spawn().expect("Failed to execute process"); + + let stdout = child.stdout.take().unwrap(); + + let process_handle = tokio::spawn(async move { + let status = child + .wait() + .await + .expect("child process encountered an error"); + + debug!("Child process done, exit status is {}", status); + status + }); + + let mut collected_logs = String::new(); + let mut lines = BufReader::with_capacity(16, stdout).lines(); + while let Some(line) = lines.next_line().await.unwrap() { + trace!("{}: {}", order.id, line); + collected_logs += &format!("{}\n", line); + } + let status = process_handle.await?; + if !status.success() { + error!("Non successful exit code found {}", status); + } + let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4, logs = $5 WHERE id = $1") + .bind(order.id.to_string()) + .bind(match status.success() { + true => "success", + false => "failed" + }) + .bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)) + .bind(status.code()) + .bind(collected_logs) + .execute(&state.db) + .await.unwrap(); + debug!("End of task, details saved"); + Ok(()) +} + +pub async fn run_executor(state: AppState, mut rx: Receiver) { + debug!("Executor started"); + while let Some(order) = rx.recv().await { + println!("Got Order: {:?}", order); + let local_state = state.clone(); + tokio::spawn(async { + run_task(local_state, order).await + }); + } + info!("Executor stopped"); +} diff --git a/src/main.rs b/src/main.rs index 5e31c04..18b0fc1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,19 @@ -use anyhow::{anyhow, Context, Result}; -use axum::extract::{Path as ExtractPath, State}; -use axum::http::{Response, StatusCode}; -use axum::routing::get; -use axum::{Json, Router}; -use minijinja::render; -use models::{Config, Task}; -use serde::{Deserialize, Serialize}; -use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; -use sqlx::{ConnectOptions, Connection, Pool, Sqlite}; -use std::collections::HashMap; -use std::fs; -use std::process::Stdio; -use std::str::FromStr; -use std::sync::Arc; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::{Child, Command}; -use tokio::sync::mpsc::{self, Receiver, Sender}; -use uuid::Uuid; - mod controllers; mod models; +mod executor; + +use log::info; +use anyhow::{anyhow, Context, Result}; +use axum::routing::get; +use axum::Router; +use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; +use sqlx::{ConnectOptions, Pool, Sqlite}; +use std::fs; +use std::str::FromStr; +use std::sync::Arc; +use tokio::sync::mpsc::{self, Sender}; +use executor::run_executor; +use models::{Config, ExecutorOrder}; #[derive(Debug, Clone)] pub struct AppState { @@ -32,64 +26,21 @@ fn get_config() -> Result { let inp_def_yaml = fs::read_to_string("./config.yaml") .expect("Should have been able to read the the config file"); - return serde_yaml::from_str(&inp_def_yaml) - .map_err(|e| anyhow!("Failed to parse config, {:?}", e)); + serde_yaml::from_str(&inp_def_yaml) + .map_err(|e| anyhow!("Failed to parse config, {:?}", e)) } -use crate::models::ExecutorOrder; - -async fn run_task(order: ExecutorOrder) { - let executable = match order.task.command.iter().next() { - Some(v) => v, - None => return, - }; - println!("Start processing of order {}", order.id); - let mut cmd = Command::new(executable); - - cmd.args(order.task.command.iter().skip(1).collect::>()) - .stdout(Stdio::piped()); - for (key, val) in order.task.env.iter() { - cmd.env(key, val); - } - - let mut child = cmd.spawn().expect("failed to execute process"); - - let stdout = child.stdout.take().unwrap(); - - tokio::spawn(async move { - let status = child - .wait() - .await - .expect("child process encountered an error"); - - println!("child status was: {}", status); - }); - - let mut lines = BufReader::with_capacity(16, stdout).lines(); - while let Some(line) = lines.next_line().await.unwrap() { - println!("{}: {}", order.id, line); - } - println!("End of task") -} - -async fn run_executor(mut rx: Receiver) { - println!("Executor started"); - while let Some(order) = rx.recv().await { - println!("Got Order: {:?}", order); - tokio::spawn(async { run_task(order).await }); - } - println!("Executor stopped"); -} #[tokio::main] async fn main() -> Result<()> { + env_logger::init(); + + info!("Starting autotasker"); let pool = prepare_database().await.context("Prepare db")?; - // start executor daemon + // start channel to talk to executor daemon let (tx, rx) = mpsc::channel::(32); - let executor_handle = tokio::spawn(async { run_executor(rx).await }); - let config: Config = get_config().expect("Cannot get config"); let state = AppState { config, @@ -97,6 +48,12 @@ async fn main() -> Result<()> { executor_tx: Arc::new(tx), }; + // start executor daemon + let executor_app_state = state.clone(); + let executor_handle = tokio::spawn(async { + run_executor(executor_app_state, rx).await + }); + // build our application with a single route let app = Router::new() .route("/", get(controllers::home)) @@ -110,27 +67,33 @@ async fn main() -> Result<()> { .route("/webhooks/:token", get(controllers::handle_webhook)) .with_state(state); - let listener = tokio::net::TcpListener::bind("0.0.0.0:8085").await.unwrap(); + let listen_addr = "0.0.0.0:8085"; + info!("Starting web server on http://{}", &listen_addr); + let listener = tokio::net::TcpListener::bind(listen_addr).await.unwrap(); axum::serve(listener, app).await?; + executor_handle.await?; Ok(()) } async fn prepare_database() -> Result> { - // create database if it does not exist - let conn = SqliteConnectOptions::from_str("sqlite:autotasker.db")? - .create_if_missing(true) - .connect() - .await?; - let _ = conn.close().await; + let conn_str = "sqlite:./tmp/dbs/autotasker.db"; + + // // create database if it does not exist + // let conn = SqliteConnectOptions::from_str(&conn_str)? + // .create_if_missing(true) + // .connect() + // .await?; + // let _ = conn.close().await; let pool = SqlitePoolOptions::new() .max_connections(50) - .connect("sqlite:autotasker.db") + .connect_with( + SqliteConnectOptions::from_str(conn_str)? + .log_statements(log::LevelFilter::Trace) + ) .await .context("could not connect to database_url")?; - sqlx::migrate!().run(&pool).await?; - Ok(pool) } diff --git a/src/models.rs b/src/models.rs index 9f8fc25..10dd5a0 100644 --- a/src/models.rs +++ b/src/models.rs @@ -24,6 +24,17 @@ enum TaskStatus { Success, } +#[derive(sqlx::FromRow, Deserialize, Serialize)] +#[fully_pub] +struct TaskRunSummary { + id: String, + status: TaskStatus, + trigger_mode: TriggerMode, + submitted_at: DateTime, + started_at: Option>, + ended_at: Option>, +} + #[derive(sqlx::FromRow, Deserialize, Serialize)] #[fully_pub] struct TaskRun { @@ -31,11 +42,11 @@ struct TaskRun { task_id: String, status: TaskStatus, trigger_mode: TriggerMode, - exit_code: u32, + exit_code: Option, logs: String, submitted_at: DateTime, - started_at: DateTime, - end_at: DateTime, + started_at: Option>, + ended_at: Option>, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/src/templates/home.html b/src/templates/home.html index bbcb8be..918e026 100644 --- a/src/templates/home.html +++ b/src/templates/home.html @@ -1 +1,12 @@ -

Hello, {{ first_name }}

+

Hello welcome on autotasker

+ +You main want to start in here. + + +

+Autotasker is free software under GPLv3 licence. + +You can find source code on a self-hosted forge repository. +

diff --git a/src/templates/list_task_runs.html b/src/templates/list_task_runs.html index e69de29..a9336d5 100644 --- a/src/templates/list_task_runs.html +++ b/src/templates/list_task_runs.html @@ -0,0 +1,6 @@ +

List of task runs for {{ task.name }}

+
    +{% for task_run in runs %} +
  • {{ task_run.id }} {{ task_run.status }}
  • +{% endfor %} +
diff --git a/src/templates/list_tasks.html b/src/templates/list_tasks.html index 51d4b18..dcaa0bd 100644 --- a/src/templates/list_tasks.html +++ b/src/templates/list_tasks.html @@ -1,5 +1,13 @@ +

Tasks

+{% if tasks | length == 0 %} +No tasks were configured. +{% endif %} diff --git a/src/templates/task_run_details.html b/src/templates/task_run_details.html index e69de29..410d3cd 100644 --- a/src/templates/task_run_details.html +++ b/src/templates/task_run_details.html @@ -0,0 +1,16 @@ +

Task run {{ run.id }}

+ +
    +
  • Id: {{ run.id }}
  • +
  • State: {{ run.status }}
  • +
  • Submitted at: {{ run.submitted_at }}
  • +
  • Started at: {{ run.started_at }}
  • +
  • Ended at: {{ run.ended_at }}
  • +
  • Trigger Mode: {{ run.trigger_mode }}
  • +
+ +

Logs

+

Exit code is {{ run.exit_code }}

+
+{{ run.logs }}
+