diff --git a/Cargo.lock b/Cargo.lock index 2f45a3e..e2df5c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -191,6 +191,7 @@ dependencies = [ "sqlx", "tokio", "tokio-cron-scheduler", + "tokio-util", "tower-http", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 101dbdf..ebd1af8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,4 +21,5 @@ env_logger = "0.11.3" tower-http = { version = "0.5.2", features = ["fs"] } tokio-cron-scheduler = "0.10.2" argh = "0.1.12" +tokio-util = "0.7.11" diff --git a/README.md b/README.md index 3679681..15f77bf 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Unix-like - [x] Schedule tasks (CRON-like). - [ ] OpenMetrics exporter to alert when a task failed. - [ ] External alerting when a task failed. -- [ ] Run task via webhook, with a webhook token. +- [ ] Trigger task via webhook, with a webhook token and debouncing. - [ ] Content negociation with JSON or HTML. - [ ] OAuth2 support for admin. diff --git a/config.yaml b/config.yaml index efac905..dbdb436 100644 --- a/config.yaml +++ b/config.yaml @@ -10,7 +10,7 @@ tasks: SIMULATION_SPEED: 0.2 command: - /usr/bin/python3 - - /home/mbess/workspace/autotasker/examples/do_something_1.py + - /home/mbess/workspace/perso/autotasker/examples/do_something_1.py reindex_db: name: Reindex the whole database @@ -30,3 +30,10 @@ tasks: # schedule: # "0 * * * * *" +webhooks: + - id: 1 + name: "Trigger magic stuff" + token: 988c19fe-fd5d-4887-8210-60e0dc50ba9e + target_tasks: + - do_magic_stuff + debounce_secs: 10 diff --git a/src/controllers.rs b/src/controllers.rs index 3796ccb..badb50f 100644 --- a/src/controllers.rs +++ b/src/controllers.rs @@ -3,7 +3,7 @@ use axum::extract::{Path as ExtractPath, State}; use axum::http::StatusCode; use axum::Json; use axum::response::{Html, IntoResponse}; -use minijinja::{context, render}; +use minijinja::context; use uuid::Uuid; use crate::models::ExecutorOrder; @@ -64,12 +64,28 @@ pub async fn trigger_task( } pub async fn handle_webhook( - State(_app_state): State, - ExtractPath(token): ExtractPath, + State(app_state): State, + ExtractPath((webhook_id, submitted_token)): ExtractPath<(u64, String)>, ) -> (StatusCode, Json) { - println!("Webhook token {}", token); - (StatusCode::OK, axum::Json("WebHook handle".to_string())) + let webhooks = app_state.config.webhooks.unwrap_or_default(); + let webhook = match webhooks.iter().find(|x| x.id == webhook_id) { + Some(x) => x, + None => { + return (StatusCode::NOT_FOUND, axum::Json("Webhook not found".to_string())) + } + }; + if webhook.token != submitted_token { + return (StatusCode::UNAUTHORIZED, axum::Json("Webhook token is invalid".to_string())) + } + // start debounce + app_state + .debouncer_tx + .send(webhook_id) + .await + .unwrap(); + + (StatusCode::OK, axum::Json("WebHook received".to_string())) } pub async fn list_task_runs( diff --git a/src/debouncer.rs b/src/debouncer.rs new file mode 100644 index 0000000..44cfe08 --- /dev/null +++ b/src/debouncer.rs @@ -0,0 +1,68 @@ +use log::{debug, trace}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::{Receiver, Sender}; + +use crate::models::{ExecutorOrder, Webhook}; +use crate::AppState; + +async fn run_debouncer(webhook: Webhook, duration: Duration, executor_tx: Arc>, cancellation_token: CancellationToken) { + tokio::select! { + // Step 3: Using cloned token to listen to cancellation requests + _ = cancellation_token.cancelled() => { + // The token was cancelled, task can shut down + trace!("Debouncer cancelled"); + }, + _ = tokio::time::sleep(duration) => { + // Long work has completed + trace!("Debouncer finished for webhook {:?}", webhook); + for task_id in &webhook.target_tasks { + trace!("Debouncer is now triggering {}", &task_id); + let _ = executor_tx.send(ExecutorOrder { + id: Uuid::new_v4(), + task_id: task_id.to_string() + }).await; + } + } + } + +} + +#[derive(Debug)] +struct Debouncer { + webhook_id: u64, + cancellation_token: CancellationToken +} + +pub async fn run_debouncer_supervisor(state: AppState, mut rx: Receiver) { + debug!("Debouncer started"); + let mut actives_debouncers: Vec = vec![]; + while let Some(webhook_id) = rx.recv().await { + // get webhook + let state = state.clone(); + let webhook = state.config.webhooks.unwrap().iter().find(|w| w.id == webhook_id).expect("Webhook id is not correct").clone(); + match actives_debouncers.iter().find(|ad| ad.webhook_id == webhook_id) { + None => {}, + Some(current_active_debouncer) => { + trace!("Cancelling active debouncer…"); + current_active_debouncer.cancellation_token.cancel(); + actives_debouncers.remove(actives_debouncers.iter().position(|ad| ad.webhook_id == webhook_id).unwrap()); + } + } + let debouncer = Debouncer { + webhook_id, + cancellation_token: CancellationToken::new() + }; + let duration = Duration::from_secs(webhook.debounce_secs as u64); + trace!("Starting debouncer for webhook {:?} with duration {:?}", &webhook_id, &duration); + let cloned_ct = debouncer.cancellation_token.clone(); + tokio::spawn(async move { + run_debouncer(webhook, duration, state.executor_tx.clone(), cloned_ct).await; + }); + actives_debouncers.push(debouncer); + } + debug!("Debouncer stopped"); +} + diff --git a/src/executor.rs b/src/executor.rs index 1bbb8d9..bf24cc9 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Result}; use chrono::{SecondsFormat, Utc}; -use log::{debug, info, trace, error}; +use log::{debug, info, error}; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; @@ -10,7 +10,7 @@ use crate::models::ExecutorOrder; use crate::AppState; async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { - debug!("Start processing of order {}", order.id); + debug!("Start processing of order {:?}", &order.id); // save in DB let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)") .bind(order.id.to_string()) @@ -51,12 +51,11 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { let mut collected_logs = String::new(); let mut lines = BufReader::with_capacity(16, stdout).lines(); while let Some(line) = lines.next_line().await.unwrap() { - trace!("{}: {}", order.id, line); collected_logs += &format!("{}\n", line); } let status = process_handle.await?; if !status.success() { - error!("Non successful exit code found {}", status); + error!("Non successful exit code found: {}", status); } let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4, logs = $5 WHERE id = $1") .bind(order.id.to_string()) @@ -69,14 +68,14 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { .bind(collected_logs) .execute(&state.db) .await.unwrap(); - debug!("End of task, details saved"); + debug!("End of executor order {:?}, details saved", &order.id); Ok(()) } pub async fn run_executor(state: AppState, mut rx: Receiver) { debug!("Executor started"); while let Some(order) = rx.recv().await { - println!("Got Order: {:?}", order); + debug!("Executor got Order: {:?}", order); let local_state = state.clone(); tokio::spawn(async { run_task(local_state, order).await diff --git a/src/main.rs b/src/main.rs index 12212e2..268f55c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,10 @@ mod controllers; mod models; mod executor; mod scheduler; +mod debouncer; use argh::FromArgs; +use debouncer::run_debouncer_supervisor; use log::info; use anyhow::{anyhow, Context, Result}; use axum::routing::get; @@ -25,6 +27,7 @@ pub struct AppState { config: Config, db: Pool, executor_tx: Arc>, + debouncer_tx: Arc>, // u64 represent the id of the webhook templating_env: Environment<'static> } @@ -56,7 +59,7 @@ fn build_templating_env() -> Environment<'static> { let content: &'static str = Box::leak(fs::read_to_string(&path).unwrap().into_boxed_str()); let path: &'static str = Box::leak(format!("pages/{}", file_name).into_boxed_str()); templating_env - .add_template(&path, &content) + .add_template(path, content) .unwrap(); } templating_env.add_global("gl", context! { @@ -76,7 +79,9 @@ async fn main() -> Result<()> { let pool = prepare_database().await.context("Prepare db")?; // start channel to talk to executor daemon - let (tx, rx) = mpsc::channel::(32); + let (executor_tx, executor_rx) = mpsc::channel::(32); + // channel to talk to debouncer, by using webook id + let (debouncer_tx, debouncer_rx) = mpsc::channel::(32); let config_path = match flags.config { Some(v) => v, @@ -88,20 +93,27 @@ async fn main() -> Result<()> { let state = AppState { config, db: pool, - executor_tx: Arc::new(tx), - templating_env: build_templating_env() + executor_tx: Arc::new(executor_tx), + templating_env: build_templating_env(), + debouncer_tx: Arc::new(debouncer_tx) }; // start executor daemon let executor_app_state = state.clone(); let executor_handle = tokio::spawn(async { - run_executor(executor_app_state, rx).await + run_executor(executor_app_state, executor_rx).await }); let scheduler_app_state = state.clone(); let scheduler_handle = tokio::spawn(async { run_scheduler(scheduler_app_state).await }); + // run debouncer + let debouncer_supervisor_app_state = state.clone(); + tokio::spawn(async { + run_debouncer_supervisor(debouncer_supervisor_app_state, debouncer_rx).await + }); + // build our application with a single route let app = Router::new() .route("/", get(controllers::home)) @@ -112,7 +124,7 @@ async fn main() -> Result<()> { "/tasks/:task_id/runs/:run_id", get(controllers::get_task_run), ) - .route("/webhooks/:token", get(controllers::handle_webhook)) + .route("/webhooks/:id/:token", get(controllers::handle_webhook)) .nest_service("/assets", ServeDir::new("./assets")) .with_state(state); diff --git a/src/models.rs b/src/models.rs index 5f0935f..f4c32ae 100644 --- a/src/models.rs +++ b/src/models.rs @@ -84,9 +84,27 @@ struct InstanceConfig { logo_uri: String } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[fully_pub] +struct Webhook { + /// a arbitrary id to identify the webhook + id: u64, + /// a secret token used to secure the call + token: String, + /// a descriptive name + name: String, + /// list of tasks id + target_tasks: Vec, + /// number of seconds we should wait before actually triggering target tasks + debounce_secs: u32 +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[fully_pub] struct Config { instance: InstanceConfig, tasks: HashMap, + webhooks: Option> } + + diff --git a/src/scheduler.rs b/src/scheduler.rs index 87f18d9..9f4105b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -73,7 +73,7 @@ pub async fn run_scheduler(app_state: AppState) -> Result<(), JobSchedulerError> // Add code to be run during/after shutdown sched.set_shutdown_handler(Box::new(|| { Box::pin(async move { - println!("Shut down done"); + debug!("Shut down done"); }) }));