fix: trigger mode and status enums
This commit is contained in:
parent
55438fbd83
commit
b7df4f9ae2
8 changed files with 38 additions and 29 deletions
30
TODO.md
30
TODO.md
|
@ -2,31 +2,30 @@
|
||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
- [x] Implement support for webhook and debouncing of webhook
|
- [ ] Support connecting to remote server by SSH to execute task remotely
|
||||||
- [ ] Add stderr capture
|
- [ ] Implement basic auth with OAuth2
|
||||||
- [ ] Implement basic auth with HTTP basic auth (to trigger and see logs only)
|
- [ ] Add a way to categorize tasks, regroup tasks
|
||||||
- [ ] add CSS badge and color code on job status
|
- [ ] Don't use long UUID, but only ids
|
||||||
|
- [ ] Add CSS badge and color code on job status
|
||||||
- [ ] Validating config file
|
- [ ] Validating config file
|
||||||
- validate schedule CRON syntax
|
- validate schedule CRON syntax
|
||||||
- [ ] Implement basic auth with OAuth2
|
|
||||||
- [ ] Add `Dockerfile` and docker-compose example
|
- [ ] Add `Dockerfile` and docker-compose example
|
||||||
- [ ] Add CI/CD to build docker image
|
- [ ] Add CI/CD to build docker image
|
||||||
- [ ] Add configuration to limit the logs head and tail
|
- [ ] Add configuration options to limit the logs capture head and tail
|
||||||
- In a limited mode, we would filter out logs and only keep the head and tail, as well as any errors or stack traces
|
- In a limited mode, we would filter out logs and only keep the head and tail, as well as any errors or stack traces
|
||||||
- [ ] Save logs incrementally, at regular intervals
|
|
||||||
- [ ] Add HTML table instead of UL for tasks run history
|
|
||||||
- [ ] Sort tasks run history by date
|
- [ ] Sort tasks run history by date
|
||||||
- [ ] Pagination of tasks run history
|
- [ ] Add pagination of tasks run history
|
||||||
- [ ] JSON support, content-negotiation
|
- [ ] JSON support, content-negotiation, can be used with a CLI API client
|
||||||
- [ ] Websocket return of logs
|
- [ ] Websocket return of logs
|
||||||
- [ ] Add openmetrics exporter endpoint to alert when jobs fail
|
- [ ] Add openmetrics exporter endpoint to alert when jobs fail
|
||||||
- [ ] Add jobs groups?
|
|
||||||
- [ ] Add a way to trigger one or multiple tasks from a webhook endpoint, defined in a config file
|
|
||||||
- [ ] Support connecting to remote server by SSH to execute task
|
|
||||||
- [ ] Support role authorization RBAC, reader and writer.
|
- [ ] Support role authorization RBAC, reader and writer.
|
||||||
- Reader can only read logs and tasks results
|
- Reader can only read logs and tasks results
|
||||||
- Writer can trigger task run.
|
- Writer can trigger task run.
|
||||||
|
|
||||||
|
- [ ] To see: Adapt the rate at which we update logs into the DB depending on the rate the program is outputting logs (or duration of program)?
|
||||||
|
|
||||||
|
- [ ] Add stable ordering of tasks, sorted by an "order" field or by name and category?
|
||||||
|
|
||||||
## Archive
|
## Archive
|
||||||
|
|
||||||
- [x] setup sqlite
|
- [x] setup sqlite
|
||||||
|
@ -34,3 +33,8 @@
|
||||||
- [x] Implement basic scheduler
|
- [x] Implement basic scheduler
|
||||||
- [x] Add basic CSS
|
- [x] Add basic CSS
|
||||||
- [x] Load config file from `/etc/`
|
- [x] Load config file from `/etc/`
|
||||||
|
- [x] Implement support for webhook and debouncing of webhook
|
||||||
|
- [x] Add stderr capture
|
||||||
|
- [x] Save logs incrementally, at regular intervals
|
||||||
|
- [x] Add HTML table instead of UL for tasks run history
|
||||||
|
- [x] Add a way to trigger one or multiple tasks from a webhook endpoint, defined in a config file
|
||||||
|
|
|
@ -7,7 +7,7 @@ tasks:
|
||||||
name: Do magic incantation
|
name: Do magic incantation
|
||||||
env:
|
env:
|
||||||
PYTHONUNBUFFERED: "1"
|
PYTHONUNBUFFERED: "1"
|
||||||
SIMULATION_SPEED: 0.2
|
SIMULATION_SPEED: 11
|
||||||
command:
|
command:
|
||||||
- /usr/bin/python3
|
- /usr/bin/python3
|
||||||
- /home/mbess/workspace/autotasker/examples/do_something_1.py
|
- /home/mbess/workspace/autotasker/examples/do_something_1.py
|
||||||
|
|
|
@ -2,8 +2,8 @@ DROP TABLE IF EXISTS task_runs;
|
||||||
CREATE TABLE task_runs (
|
CREATE TABLE task_runs (
|
||||||
id TEXT PRIMARY KEY,
|
id TEXT PRIMARY KEY,
|
||||||
task_id TEXT NOT NULL,
|
task_id TEXT NOT NULL,
|
||||||
status TEXT CHECK(status IN ('pending','running','failed','success')) NOT NULL DEFAULT 'pending',
|
status TEXT CHECK(status IN ('Pending','Running','Failed','Success')) NOT NULL DEFAULT 'pending',
|
||||||
trigger_mode TEXT CHECK(trigger_mode IN ('manual','webhook','schedule')) NOT NULL DEFAULT 'manual',
|
trigger_mode TEXT CHECK(trigger_mode IN ('Manual','Webhook','Schedule')) NOT NULL DEFAULT 'manual',
|
||||||
exit_code INT,
|
exit_code INT,
|
||||||
runtime_details JSON,
|
runtime_details JSON,
|
||||||
submitted_at DATETIME,
|
submitted_at DATETIME,
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use crate::models::{LogLine, TaskRun, TaskRunSummary};
|
use crate::models::{LogLine, TaskRun, TaskRunSummary, TriggerMode};
|
||||||
use axum::extract::{Path as ExtractPath, State};
|
use axum::extract::{Path as ExtractPath, State};
|
||||||
use axum::http::StatusCode;
|
use axum::http::StatusCode;
|
||||||
use axum::Json;
|
use axum::Json;
|
||||||
|
@ -46,6 +46,7 @@ pub async fn trigger_task(
|
||||||
.executor_tx
|
.executor_tx
|
||||||
.send(ExecutorOrder {
|
.send(ExecutorOrder {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
|
trigger_mode: TriggerMode::Manual,
|
||||||
task_id
|
task_id
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
@ -92,7 +93,7 @@ pub async fn list_task_runs(
|
||||||
State(app_state): State<AppState>,
|
State(app_state): State<AppState>,
|
||||||
ExtractPath(task_id): ExtractPath<String>,
|
ExtractPath(task_id): ExtractPath<String>,
|
||||||
) -> Html<String> {
|
) -> Html<String> {
|
||||||
let runs = sqlx::query_as::<_, TaskRunSummary>("SELECT id,status,trigger_mode,submitted_at,started_at,ended_at FROM task_runs WHERE task_id = $1")
|
let runs = sqlx::query_as::<_, TaskRunSummary>("SELECT id,status,trigger_mode,submitted_at,started_at,ended_at FROM task_runs WHERE task_id = $1 ORDER BY submitted_at ASC")
|
||||||
.bind(&task_id)
|
.bind(&task_id)
|
||||||
.fetch_all(&app_state.db)
|
.fetch_all(&app_state.db)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -5,7 +5,7 @@ use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc::{Receiver, Sender};
|
use tokio::sync::mpsc::{Receiver, Sender};
|
||||||
|
|
||||||
use crate::models::{ExecutorOrder, Webhook};
|
use crate::models::{ExecutorOrder, TriggerMode, Webhook};
|
||||||
use crate::AppState;
|
use crate::AppState;
|
||||||
|
|
||||||
async fn run_debouncer(webhook: Webhook, duration: Duration, executor_tx: Arc<Sender<ExecutorOrder>>, cancellation_token: CancellationToken) {
|
async fn run_debouncer(webhook: Webhook, duration: Duration, executor_tx: Arc<Sender<ExecutorOrder>>, cancellation_token: CancellationToken) {
|
||||||
|
@ -22,6 +22,7 @@ async fn run_debouncer(webhook: Webhook, duration: Duration, executor_tx: Arc<Se
|
||||||
trace!("Debouncer is now triggering {}", &task_id);
|
trace!("Debouncer is now triggering {}", &task_id);
|
||||||
let _ = executor_tx.send(ExecutorOrder {
|
let _ = executor_tx.send(ExecutorOrder {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
|
trigger_mode: TriggerMode::Webhook,
|
||||||
task_id: task_id.to_string()
|
task_id: task_id.to_string()
|
||||||
}).await;
|
}).await;
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
|
|
||||||
use crate::models::{ExecutorOrder, LogKind, LogLine};
|
use crate::models::{ExecutorOrder, LogKind, LogLine, TaskStatus};
|
||||||
use crate::AppState;
|
use crate::AppState;
|
||||||
|
|
||||||
async fn insert_logs(db: &Pool<Sqlite>, collected_logs: &mut Vec<LogLine>) -> Result<()> {
|
async fn insert_logs(db: &Pool<Sqlite>, collected_logs: &mut Vec<LogLine>) -> Result<()> {
|
||||||
|
@ -59,8 +59,8 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
|
||||||
let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)")
|
let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)")
|
||||||
.bind(order.id.to_string())
|
.bind(order.id.to_string())
|
||||||
.bind(order.task_id.clone())
|
.bind(order.task_id.clone())
|
||||||
.bind("manual")
|
.bind(order.trigger_mode.to_string())
|
||||||
.bind("running")
|
.bind(TaskStatus::Running.to_string())
|
||||||
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
|
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
|
||||||
.execute(&state.db)
|
.execute(&state.db)
|
||||||
.await.unwrap();
|
.await.unwrap();
|
||||||
|
@ -115,8 +115,8 @@ async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
|
||||||
let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4 WHERE id = $1")
|
let _result = sqlx::query("UPDATE task_runs SET status = $2, ended_at = $3, exit_code = $4 WHERE id = $1")
|
||||||
.bind(order.id.to_string())
|
.bind(order.id.to_string())
|
||||||
.bind(match status.success() {
|
.bind(match status.success() {
|
||||||
true => "success",
|
true => TaskStatus::Success,
|
||||||
false => "failed"
|
false => TaskStatus::Failed,
|
||||||
})
|
})
|
||||||
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
|
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
|
||||||
.bind(status.code())
|
.bind(status.code())
|
||||||
|
|
|
@ -5,9 +5,9 @@ use fully_pub::fully_pub;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(sqlx::Type, Debug, Serialize, Deserialize)]
|
#[derive(sqlx::Type, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
#[derive(strum_macros::Display)]
|
||||||
#[fully_pub]
|
#[fully_pub]
|
||||||
#[sqlx(rename_all = "lowercase")]
|
|
||||||
enum TriggerMode {
|
enum TriggerMode {
|
||||||
Manual,
|
Manual,
|
||||||
Webhook,
|
Webhook,
|
||||||
|
@ -15,7 +15,7 @@ enum TriggerMode {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(sqlx::Type, Debug, Serialize, Deserialize)]
|
#[derive(sqlx::Type, Debug, Serialize, Deserialize)]
|
||||||
#[sqlx(rename_all = "lowercase")]
|
#[derive(strum_macros::Display)]
|
||||||
#[fully_pub]
|
#[fully_pub]
|
||||||
enum TaskStatus {
|
enum TaskStatus {
|
||||||
Pending,
|
Pending,
|
||||||
|
@ -94,6 +94,7 @@ struct Task {
|
||||||
struct ExecutorOrder {
|
struct ExecutorOrder {
|
||||||
id: Uuid,
|
id: Uuid,
|
||||||
task_id: String,
|
task_id: String,
|
||||||
|
trigger_mode: TriggerMode
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
|
|
@ -4,16 +4,17 @@ use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{models::{ExecutorOrder, ScheduleConfig}, AppState};
|
use crate::{models::{ExecutorOrder, ScheduleConfig, TriggerMode}, AppState};
|
||||||
|
|
||||||
fn get_repeated_job(task_id: String, executor_tx: Arc<Sender<ExecutorOrder>>, seconds: u64) -> Result<Job, JobSchedulerError> {
|
fn get_repeated_job(task_id: String, executor_tx: Arc<Sender<ExecutorOrder>>, seconds: u64) -> Result<Job, JobSchedulerError> {
|
||||||
Job::new_repeated_async(
|
Job::new_repeated_async(
|
||||||
Duration::from_secs(seconds),
|
Duration::from_secs(seconds),
|
||||||
move |_uuid, _l| {
|
move |_uuid, _l| {
|
||||||
|
let executor_tx = executor_tx.clone();
|
||||||
Box::pin({
|
Box::pin({
|
||||||
let executor_tx = executor_tx.clone();
|
|
||||||
let order = ExecutorOrder {
|
let order = ExecutorOrder {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
|
trigger_mode: TriggerMode::Schedule,
|
||||||
task_id: task_id.clone()
|
task_id: task_id.clone()
|
||||||
};
|
};
|
||||||
async move {
|
async move {
|
||||||
|
@ -52,6 +53,7 @@ pub async fn run_scheduler(app_state: AppState) -> Result<(), JobSchedulerError>
|
||||||
executor_tx
|
executor_tx
|
||||||
.send(ExecutorOrder {
|
.send(ExecutorOrder {
|
||||||
id: Uuid::new_v4(),
|
id: Uuid::new_v4(),
|
||||||
|
trigger_mode: TriggerMode::Schedule,
|
||||||
task_id
|
task_id
|
||||||
})
|
})
|
||||||
.await.unwrap();
|
.await.unwrap();
|
||||||
|
|
Loading…
Reference in a new issue