feat: webhook with debounce support

This commit is contained in:
Matthieu Bessat 2024-07-24 16:03:44 +02:00
parent 481d59e963
commit c40c6aedef
10 changed files with 142 additions and 20 deletions

1
Cargo.lock generated
View file

@ -191,6 +191,7 @@ dependencies = [
"sqlx",
"tokio",
"tokio-cron-scheduler",
"tokio-util",
"tower-http",
"uuid",
]

View file

@ -21,4 +21,5 @@ env_logger = "0.11.3"
tower-http = { version = "0.5.2", features = ["fs"] }
tokio-cron-scheduler = "0.10.2"
argh = "0.1.12"
tokio-util = "0.7.11"

View file

@ -26,7 +26,7 @@ Unix-like
- [x] Schedule tasks (CRON-like).
- [ ] OpenMetrics exporter to alert when a task failed.
- [ ] External alerting when a task failed.
- [ ] Run task via webhook, with a webhook token.
- [ ] Trigger task via webhook, with a webhook token and debouncing.
- [ ] Content negociation with JSON or HTML.
- [ ] OAuth2 support for admin.

View file

@ -10,7 +10,7 @@ tasks:
SIMULATION_SPEED: 0.2
command:
- /usr/bin/python3
- /home/mbess/workspace/autotasker/examples/do_something_1.py
- /home/mbess/workspace/perso/autotasker/examples/do_something_1.py
reindex_db:
name: Reindex the whole database
@ -30,3 +30,10 @@ tasks:
# schedule:
# "0 * * * * *"
webhooks:
- id: 1
name: "Trigger magic stuff"
token: 988c19fe-fd5d-4887-8210-60e0dc50ba9e
target_tasks:
- do_magic_stuff
debounce_secs: 10

View file

@ -3,7 +3,7 @@ use axum::extract::{Path as ExtractPath, State};
use axum::http::StatusCode;
use axum::Json;
use axum::response::{Html, IntoResponse};
use minijinja::{context, render};
use minijinja::context;
use uuid::Uuid;
use crate::models::ExecutorOrder;
@ -64,12 +64,28 @@ pub async fn trigger_task(
}
pub async fn handle_webhook(
State(_app_state): State<AppState>,
ExtractPath(token): ExtractPath<String>,
State(app_state): State<AppState>,
ExtractPath((webhook_id, submitted_token)): ExtractPath<(u64, String)>,
) -> (StatusCode, Json<String>) {
println!("Webhook token {}", token);
(StatusCode::OK, axum::Json("WebHook handle".to_string()))
let webhooks = app_state.config.webhooks.unwrap_or_default();
let webhook = match webhooks.iter().find(|x| x.id == webhook_id) {
Some(x) => x,
None => {
return (StatusCode::NOT_FOUND, axum::Json("Webhook not found".to_string()))
}
};
if webhook.token != submitted_token {
return (StatusCode::UNAUTHORIZED, axum::Json("Webhook token is invalid".to_string()))
}
// start debounce
app_state
.debouncer_tx
.send(webhook_id)
.await
.unwrap();
(StatusCode::OK, axum::Json("WebHook received".to_string()))
}
pub async fn list_task_runs(

68
src/debouncer.rs Normal file
View file

@ -0,0 +1,68 @@
use log::{debug, trace};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::models::{ExecutorOrder, Webhook};
use crate::AppState;
async fn run_debouncer(webhook: Webhook, duration: Duration, executor_tx: Arc<Sender<ExecutorOrder>>, cancellation_token: CancellationToken) {
tokio::select! {
// Step 3: Using cloned token to listen to cancellation requests
_ = cancellation_token.cancelled() => {
// The token was cancelled, task can shut down
trace!("Debouncer cancelled");
},
_ = tokio::time::sleep(duration) => {
// Long work has completed
trace!("Debouncer finished for webhook {:?}", webhook);
for task_id in &webhook.target_tasks {
trace!("Debouncer is now triggering {}", &task_id);
let _ = executor_tx.send(ExecutorOrder {
id: Uuid::new_v4(),
task_id: task_id.to_string()
}).await;
}
}
}
}
#[derive(Debug)]
struct Debouncer {
webhook_id: u64,
cancellation_token: CancellationToken
}
pub async fn run_debouncer_supervisor(state: AppState, mut rx: Receiver<u64>) {
debug!("Debouncer started");
let mut actives_debouncers: Vec<Debouncer> = vec![];
while let Some(webhook_id) = rx.recv().await {
// get webhook
let state = state.clone();
let webhook = state.config.webhooks.unwrap().iter().find(|w| w.id == webhook_id).expect("Webhook id is not correct").clone();
match actives_debouncers.iter().find(|ad| ad.webhook_id == webhook_id) {
None => {},
Some(current_active_debouncer) => {
trace!("Cancelling active debouncer…");
current_active_debouncer.cancellation_token.cancel();
actives_debouncers.remove(actives_debouncers.iter().position(|ad| ad.webhook_id == webhook_id).unwrap());
}
}
let debouncer = Debouncer {
webhook_id,
cancellation_token: CancellationToken::new()
};
let duration = Duration::from_secs(webhook.debounce_secs as u64);
trace!("Starting debouncer for webhook {:?} with duration {:?}", &webhook_id, &duration);
let cloned_ct = debouncer.cancellation_token.clone();
tokio::spawn(async move {
run_debouncer(webhook, duration, state.executor_tx.clone(), cloned_ct).await;
});
actives_debouncers.push(debouncer);
}
debug!("Debouncer stopped");
}

View file

@ -1,6 +1,6 @@
use anyhow::{anyhow, Result};
use chrono::{SecondsFormat, Utc};
use log::{debug, info, trace, error};
use log::{debug, info, error};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
@ -10,7 +10,7 @@ use crate::models::ExecutorOrder;
use crate::AppState;
async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
debug!("Start processing of order {}", order.id);
debug!("Start processing of order {:?}", &order.id);
// save in DB
let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)")
.bind(order.id.to_string())
@ -51,12 +51,11 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
let mut collected_logs = String::new();
let mut lines = BufReader::with_capacity(16, stdout).lines();
while let Some(line) = lines.next_line().await.unwrap() {
trace!("{}: {}", order.id, line);
collected_logs += &format!("{}\n", line);
}
let status = process_handle.await?;
if !status.success() {
error!("Non successful exit code found {}", status);
error!("Non successful exit code found: {}", status);
}
let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4, logs = $5 WHERE id = $1")
.bind(order.id.to_string())
@ -69,14 +68,14 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
.bind(collected_logs)
.execute(&state.db)
.await.unwrap();
debug!("End of task, details saved");
debug!("End of executor order {:?}, details saved", &order.id);
Ok(())
}
pub async fn run_executor(state: AppState, mut rx: Receiver<ExecutorOrder>) {
debug!("Executor started");
while let Some(order) = rx.recv().await {
println!("Got Order: {:?}", order);
debug!("Executor got Order: {:?}", order);
let local_state = state.clone();
tokio::spawn(async {
run_task(local_state, order).await

View file

@ -2,8 +2,10 @@ mod controllers;
mod models;
mod executor;
mod scheduler;
mod debouncer;
use argh::FromArgs;
use debouncer::run_debouncer_supervisor;
use log::info;
use anyhow::{anyhow, Context, Result};
use axum::routing::get;
@ -25,6 +27,7 @@ pub struct AppState {
config: Config,
db: Pool<Sqlite>,
executor_tx: Arc<Sender<ExecutorOrder>>,
debouncer_tx: Arc<Sender<u64>>, // u64 represent the id of the webhook
templating_env: Environment<'static>
}
@ -56,7 +59,7 @@ fn build_templating_env() -> Environment<'static> {
let content: &'static str = Box::leak(fs::read_to_string(&path).unwrap().into_boxed_str());
let path: &'static str = Box::leak(format!("pages/{}", file_name).into_boxed_str());
templating_env
.add_template(&path, &content)
.add_template(path, content)
.unwrap();
}
templating_env.add_global("gl", context! {
@ -76,7 +79,9 @@ async fn main() -> Result<()> {
let pool = prepare_database().await.context("Prepare db")?;
// start channel to talk to executor daemon
let (tx, rx) = mpsc::channel::<ExecutorOrder>(32);
let (executor_tx, executor_rx) = mpsc::channel::<ExecutorOrder>(32);
// channel to talk to debouncer, by using webook id
let (debouncer_tx, debouncer_rx) = mpsc::channel::<u64>(32);
let config_path = match flags.config {
Some(v) => v,
@ -88,20 +93,27 @@ async fn main() -> Result<()> {
let state = AppState {
config,
db: pool,
executor_tx: Arc::new(tx),
templating_env: build_templating_env()
executor_tx: Arc::new(executor_tx),
templating_env: build_templating_env(),
debouncer_tx: Arc::new(debouncer_tx)
};
// start executor daemon
let executor_app_state = state.clone();
let executor_handle = tokio::spawn(async {
run_executor(executor_app_state, rx).await
run_executor(executor_app_state, executor_rx).await
});
let scheduler_app_state = state.clone();
let scheduler_handle = tokio::spawn(async {
run_scheduler(scheduler_app_state).await
});
// run debouncer
let debouncer_supervisor_app_state = state.clone();
tokio::spawn(async {
run_debouncer_supervisor(debouncer_supervisor_app_state, debouncer_rx).await
});
// build our application with a single route
let app = Router::new()
.route("/", get(controllers::home))
@ -112,7 +124,7 @@ async fn main() -> Result<()> {
"/tasks/:task_id/runs/:run_id",
get(controllers::get_task_run),
)
.route("/webhooks/:token", get(controllers::handle_webhook))
.route("/webhooks/:id/:token", get(controllers::handle_webhook))
.nest_service("/assets", ServeDir::new("./assets"))
.with_state(state);

View file

@ -84,9 +84,27 @@ struct InstanceConfig {
logo_uri: String
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[fully_pub]
struct Webhook {
/// a arbitrary id to identify the webhook
id: u64,
/// a secret token used to secure the call
token: String,
/// a descriptive name
name: String,
/// list of tasks id
target_tasks: Vec<String>,
/// number of seconds we should wait before actually triggering target tasks
debounce_secs: u32
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[fully_pub]
struct Config {
instance: InstanceConfig,
tasks: HashMap<String, Task>,
webhooks: Option<Vec<Webhook>>
}

View file

@ -73,7 +73,7 @@ pub async fn run_scheduler(app_state: AppState) -> Result<(), JobSchedulerError>
// Add code to be run during/after shutdown
sched.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
println!("Shut down done");
debug!("Shut down done");
})
}));