feat: add scheduler

This commit is contained in:
Matthieu Bessat 2024-07-22 20:25:30 +02:00
parent c2a2721ec6
commit 02639175f4
12 changed files with 430 additions and 288 deletions

506
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,5 +19,5 @@ fully_pub = "0.1.4"
log = "0.4.22" log = "0.4.22"
env_logger = "0.11.3" env_logger = "0.11.3"
tower-http = { version = "0.5.2", features = ["fs"] } tower-http = { version = "0.5.2", features = ["fs"] }
axum-template = { version = "2.3.0", features = ["minijinja"] } tokio-cron-scheduler = "0.10.2"

View file

@ -5,6 +5,7 @@
- [ ] Implement basic scheduler - [ ] Implement basic scheduler
- [ ] Implement basic auth with OAuth2 - [ ] Implement basic auth with OAuth2
- [ ] Validating config file - [ ] Validating config file
- validate schedule CRON syntax
- [ ] Load config file from `/etc/` - [ ] Load config file from `/etc/`
- [ ] Add CSS style with bootstrap - [ ] Add CSS style with bootstrap
- [ ] Add `Dockerfile` and docker-compose example - [ ] Add `Dockerfile` and docker-compose example

View file

@ -1,5 +1,9 @@
instance:
name: Example company
logo_uri: https://src.lefuturiste.fr/images/lefuturiste-300-300.png
tasks: tasks:
- id: do_magic_stuff do_magic_stuff:
name: Do magic incantation name: Do magic incantation
env: env:
PYTHONUNBUFFERED: "1" PYTHONUNBUFFERED: "1"
@ -7,11 +11,22 @@ tasks:
command: command:
- /usr/bin/python3 - /usr/bin/python3
- /home/mbess/workspace/autotasker/examples/do_something_1.py - /home/mbess/workspace/autotasker/examples/do_something_1.py
store_logs: true
- id: reindex_db reindex_db:
name: Reindex the whole database name: Reindex the whole database
env: {} env: {}
command: command:
- ls - ls
- /etc/fstab - /etc/fstab
schedule:
seconds: 15
clean_up:
name: Clean up things
env: {}
command:
- cat
- /etc/environment
schedule:
"0 * * * * *"

View file

@ -1,30 +1,28 @@
use crate::models::{Task, TaskRun, TaskRunSummary}; use crate::models::{TaskRun, TaskRunSummary};
use axum::extract::{Path as ExtractPath, State}; use axum::extract::{Path as ExtractPath, State};
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::Json; use axum::Json;
use axum::response::{Html, IntoResponse, Response}; use axum::response::{Html, IntoResponse};
use axum_template::RenderHtml;
use minijinja::{context, render}; use minijinja::{context, render};
use uuid::Uuid; use uuid::Uuid;
use crate::models::ExecutorOrder; use crate::models::ExecutorOrder;
use crate::{AppState}; use crate::AppState;
pub async fn home( pub async fn home(
State(app_state): State<AppState> State(app_state): State<AppState>
) -> impl IntoResponse { ) -> impl IntoResponse {
Html( Html(
app_state.template_engine.get_template("pages/home.html").unwrap() app_state.templating_env.get_template("pages/home.html").unwrap()
.render(context!()) .render(context!())
.unwrap() .unwrap()
) )
} }
pub async fn list_tasks(State(app_state): State<AppState>) -> Html<String> { pub async fn list_tasks(State(app_state): State<AppState>) -> Html<String> {
let tasks: Vec<Task> = app_state.config.tasks;
Html(render!( Html(render!(
include_str!("./templates/pages/list_tasks.html"), include_str!("./templates/pages/list_tasks.html"),
tasks => tasks tasks => Vec::from_iter(app_state.config.tasks.iter())
)) ))
} }
@ -32,8 +30,7 @@ pub async fn trigger_task(
State(app_state): State<AppState>, State(app_state): State<AppState>,
ExtractPath(task_id): ExtractPath<String>, ExtractPath(task_id): ExtractPath<String>,
) -> (StatusCode, Html<String>) { ) -> (StatusCode, Html<String>) {
let tasks: Vec<Task> = app_state.config.tasks; let task = match app_state.config.tasks.get(&task_id) {
let task = match tasks.iter().find(|t| t.id == task_id) {
Some(t) => t, Some(t) => t,
None => { None => {
return ( return (
@ -46,7 +43,7 @@ pub async fn trigger_task(
.executor_tx .executor_tx
.send(ExecutorOrder { .send(ExecutorOrder {
id: Uuid::new_v4(), id: Uuid::new_v4(),
task: task.clone(), task_id
}) })
.await .await
.unwrap(); .unwrap();
@ -55,7 +52,7 @@ pub async fn trigger_task(
StatusCode::OK, StatusCode::OK,
Html(render!( Html(render!(
include_str!("./templates/pages/run_task.html"), include_str!("./templates/pages/run_task.html"),
tasks => tasks task => task
)), )),
) )
} }
@ -78,7 +75,7 @@ pub async fn list_task_runs(
.fetch_all(&app_state.db) .fetch_all(&app_state.db)
.await .await
.unwrap(); .unwrap();
let task = match app_state.config.tasks.iter().find(|t| t.id == task_id) { let task = match app_state.config.tasks.get(&task_id) {
Some(v) => v, Some(v) => v,
None => { None => {
return Html("<h1>Task not found</h1>".to_string()); return Html("<h1>Task not found</h1>".to_string());
@ -100,8 +97,7 @@ pub async fn get_task_run(
.fetch_one(&app_state.db) .fetch_one(&app_state.db)
.await { .await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(_e) => {
dbg!(e);
return Html("<h1>Task run not found</h1>".to_string()); return Html("<h1>Task run not found</h1>".to_string());
} }
}; };

View file

@ -10,26 +10,27 @@ use crate::models::ExecutorOrder;
use crate::AppState; use crate::AppState;
async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> { async fn run_task(state: AppState, order: ExecutorOrder) -> Result<()> {
let executable = match order.task.command.first() {
Some(v) => v,
None => return Err(anyhow!("Could not find command to execute")),
};
debug!("Start processing of order {}", order.id); debug!("Start processing of order {}", order.id);
// save in DB // save in DB
let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)") let _result = sqlx::query("INSERT INTO task_runs (id, task_id, trigger_mode, status, submitted_at, started_at) VALUES ($1, $2, $3, $4, $5, $5)")
.bind(order.id.to_string()) .bind(order.id.to_string())
.bind(order.task.id.clone()) .bind(order.task_id.clone())
.bind("manual") .bind("manual")
.bind("running") .bind("running")
.bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)) .bind(Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true))
.execute(&state.db) .execute(&state.db)
.await.unwrap(); .await.unwrap();
let task = state.config.tasks.get(&order.task_id).expect("Task id to be valid");
let executable = match task.command.first() {
Some(v) => v,
None => return Err(anyhow!("Could not find command to execute")),
};
let mut cmd = Command::new(executable); let mut cmd = Command::new(executable);
cmd.args(order.task.command.iter().skip(1).collect::<Vec<&String>>()) cmd.args(task.command.iter().skip(1).collect::<Vec<&String>>())
.stdout(Stdio::piped()); .stdout(Stdio::piped());
for (key, val) in order.task.env.iter() { for (key, val) in task.env.iter() {
cmd.env(key, val); cmd.env(key, val);
} }

View file

@ -1,13 +1,14 @@
mod controllers; mod controllers;
mod models; mod models;
mod executor; mod executor;
mod scheduler;
use axum_template::engine::Engine;
use log::info; use log::info;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use axum::routing::get; use axum::routing::get;
use axum::Router; use axum::Router;
use minijinja::Environment; use minijinja::Environment;
use scheduler::run_scheduler;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use sqlx::{ConnectOptions, Pool, Sqlite}; use sqlx::{ConnectOptions, Pool, Sqlite};
use tower_http::services::ServeDir; use tower_http::services::ServeDir;
@ -23,7 +24,7 @@ pub struct AppState {
config: Config, config: Config,
db: Pool<Sqlite>, db: Pool<Sqlite>,
executor_tx: Arc<Sender<ExecutorOrder>>, executor_tx: Arc<Sender<ExecutorOrder>>,
template_engine: Environment<'static> templating_env: Environment<'static>
} }
fn get_config() -> Result<Config> { fn get_config() -> Result<Config> {
@ -46,17 +47,19 @@ async fn main() -> Result<()> {
let (tx, rx) = mpsc::channel::<ExecutorOrder>(32); let (tx, rx) = mpsc::channel::<ExecutorOrder>(32);
let config: Config = get_config().expect("Cannot get config"); let config: Config = get_config().expect("Cannot get config");
let mut jinja = Environment::new(); let mut templating_env = Environment::new();
jinja templating_env
.add_template("layouts/base.html", include_str!("./templates/layouts/base.html")) .add_template("layouts/base.html", include_str!("./templates/layouts/base.html"))
.unwrap(); .unwrap();
jinja.add_template("pages/home.html", include_str!("./templates/pages/home.html")).unwrap(); templating_env
.add_template("pages/home.html", include_str!("./templates/pages/home.html"))
.unwrap();
let state = AppState { let state = AppState {
config, config,
db: pool, db: pool,
executor_tx: Arc::new(tx), executor_tx: Arc::new(tx),
template_engine: jinja templating_env
}; };
// start executor daemon // start executor daemon
@ -64,6 +67,10 @@ async fn main() -> Result<()> {
let executor_handle = tokio::spawn(async { let executor_handle = tokio::spawn(async {
run_executor(executor_app_state, rx).await run_executor(executor_app_state, rx).await
}); });
let scheduler_app_state = state.clone();
let scheduler_handle = tokio::spawn(async {
run_scheduler(scheduler_app_state).await
});
// build our application with a single route // build our application with a single route
let app = Router::new() let app = Router::new()
@ -84,6 +91,7 @@ async fn main() -> Result<()> {
let listener = tokio::net::TcpListener::bind(listen_addr).await.unwrap(); let listener = tokio::net::TcpListener::bind(listen_addr).await.unwrap();
axum::serve(listener, app).await?; axum::serve(listener, app).await?;
executor_handle.await?; executor_handle.await?;
scheduler_handle.await??;
Ok(()) Ok(())
} }
@ -91,13 +99,6 @@ async fn main() -> Result<()> {
async fn prepare_database() -> Result<Pool<Sqlite>> { async fn prepare_database() -> Result<Pool<Sqlite>> {
let conn_str = "sqlite:./tmp/dbs/autotasker.db"; let conn_str = "sqlite:./tmp/dbs/autotasker.db";
// // create database if it does not exist
// let conn = SqliteConnectOptions::from_str(&conn_str)?
// .create_if_missing(true)
// .connect()
// .await?;
// let _ = conn.close().await;
let pool = SqlitePoolOptions::new() let pool = SqlitePoolOptions::new()
.max_connections(50) .max_connections(50)
.connect_with( .connect_with(

View file

@ -49,24 +49,44 @@ struct TaskRun {
ended_at: Option<DateTime<Utc>>, ended_at: Option<DateTime<Utc>>,
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
#[fully_pub]
enum ScheduleConfig {
DurationSeconds { seconds: u32 },
DurationMinutes { minutes: u32 },
DurationHours { hours: u32 },
// cron syntax expression https://en.wikipedia.org/wiki/Cron#Overview
Cron(String)
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[fully_pub] #[fully_pub]
struct Task { struct Task {
id: String,
name: String, name: String,
description: Option<String>,
env: HashMap<String, String>, env: HashMap<String, String>,
command: Vec<String>, command: Vec<String>,
schedule: Option<ScheduleConfig>
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[fully_pub] #[fully_pub]
struct ExecutorOrder { struct ExecutorOrder {
id: Uuid, id: Uuid,
task: Task, task_id: String,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[fully_pub]
struct InstanceConfig {
name: String,
logo_uri: String
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[fully_pub] #[fully_pub]
struct Config { struct Config {
tasks: Vec<Task>, instance: InstanceConfig,
tasks: HashMap<String, Task>,
} }

86
src/scheduler.rs Normal file
View file

@ -0,0 +1,86 @@
use std::{sync::Arc, time::Duration};
use log::debug;
use tokio_cron_scheduler::{Job, JobScheduler, JobSchedulerError};
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
use crate::{models::{ExecutorOrder, ScheduleConfig}, AppState};
fn get_repeated_job(task_id: String, executor_tx: Arc<Sender<ExecutorOrder>>, seconds: u64) -> Result<Job, JobSchedulerError> {
Job::new_repeated_async(
Duration::from_secs(seconds),
move |_uuid, _l| {
Box::pin({
let executor_tx = executor_tx.clone();
let order = ExecutorOrder {
id: Uuid::new_v4(),
task_id: task_id.clone()
};
async move {
executor_tx
.send(order)
.await.unwrap();
}
})
}
)
}
pub async fn run_scheduler(app_state: AppState) -> Result<(), JobSchedulerError> {
let mut sched = JobScheduler::new().await?;
let executor_tx = app_state.executor_tx.clone();
// register schedule for each job that need schedule
for (task_id, task) in app_state.config.tasks.clone() {
let schedule = match &task.schedule {
Some(schedule) => schedule,
None => { continue; }
};
let executor_tx = executor_tx.clone();
let task_id = task_id.clone();
debug!("Registering task {:?} schedule", &task_id);
let job = match schedule {
ScheduleConfig::Cron(expr) => {
Job::new_async(
expr.as_str(),
move |_uuid, _l| {
Box::pin({
let executor_tx = executor_tx.clone();
let task_id = task_id.clone();
async move {
executor_tx
.send(ExecutorOrder {
id: Uuid::new_v4(),
task_id
})
.await.unwrap();
}
})
}
)?
},
ScheduleConfig::DurationSeconds { seconds } =>
get_repeated_job(task_id, executor_tx, *seconds as u64)?,
ScheduleConfig::DurationMinutes { minutes } =>
get_repeated_job(task_id, executor_tx, 60*(*minutes as u64))?,
ScheduleConfig::DurationHours { hours } =>
get_repeated_job(task_id, executor_tx, 60*60*(*hours as u64))?
};
sched.add(job).await?;
}
// Add code to be run during/after shutdown
sched.set_shutdown_handler(Box::new(|| {
Box::pin(async move {
println!("Shut down done");
})
}));
// Start the scheduler
sched.start().await?;
Ok(())
}

View file

@ -1,4 +1,4 @@
<h1>List of task runs for {{ task.name }}</h1> <h1>List of task runs for "{{ task.name }}"</h1>
<ul> <ul>
{% for task_run in runs %} {% for task_run in runs %}
<li><a href="/tasks/{{ task.id }}/runs/{{ task_run.id }}">{{ task_run.id }}</a> {{ task_run.status }}</li> <li><a href="/tasks/{{ task.id }}/runs/{{ task_run.id }}">{{ task_run.id }}</a> {{ task_run.status }}</li>

View file

@ -3,11 +3,11 @@
No tasks were configured. No tasks were configured.
{% endif %} {% endif %}
<ul> <ul>
{% for task in tasks %} {% for (id, task) in tasks %}
<li> <li>
{{ task.name }} {{ task.name }}
<a href="/tasks/{{ task.id }}/trigger">Trigger task</a> <a href="/tasks/{{ id }}/trigger">Trigger task</a>
<a href="/tasks/{{ task.id }}/runs">See runs</a> <a href="/tasks/{{ id }}/runs">See runs</a>
</li> </li>
{% endfor %} {% endfor %}
</ul> </ul>

View file

@ -1 +1 @@
Task triggered! Task "{{ task.name }}" triggered!