feat: use micro seconds for logs capture (BREAKING)

This commit is contained in:
Matthieu Bessat 2024-07-28 19:34:04 +02:00
parent 3e2ec661a1
commit 3384228e61
5 changed files with 12 additions and 8 deletions

View file

@ -4,8 +4,9 @@
- [x] Add CSS badge and color code on job status
- [ ] Add tasks timeout
- [ ] Add details on runtime
- [ ] Implement basic auth with OAuth2, find a minimal oauth2
- [ ] Support connecting to remote server by SSH to execute task remotely
- [ ] Implement basic auth with OAuth2
- [ ] Add a way to categorize tasks, regroup tasks
- [ ] Don't use long UUID, but only ids
- [ ] Validating config file

View file

@ -11,7 +11,8 @@ tasks:
name: Do magic incantation
environment:
PYTHONUNBUFFERED: "1"
SIMULATION_SPEED: 11
SIMULATION_SPEED: 0.4
SIMULATION_MAX_ITERATIONS: 50
command:
- /usr/bin/python3
- /path/to/autotasker/examples/do_something_1.py

View file

@ -7,7 +7,8 @@ tasks:
name: Do magic incantation
environment:
PYTHONUNBUFFERED: "1"
SIMULATION_SPEED: 11
SIMULATION_SPEED: 0.4
SIMULATION_MAX_ITERATIONS: 50
command:
- /usr/bin/python3
- /home/mbess/workspace/autotasker/examples/do_something_1.py

View file

@ -4,7 +4,7 @@ import string
from time import sleep
def main():
iterations = random.randint(10, 150)
iterations = random.randint(15, int(os.getenv("SIMULATION_MAX_ITERATIONS") or 100))
speed = float(os.getenv("SIMULATION_SPEED") or 0.8)
print(f"Going for {iterations=} with {speed=}")
for i in range(iterations):
@ -20,7 +20,7 @@ def main():
)
)
sleep(
speed * (0.02 * random.expovariate(0.5) +
(1/speed) * (0.02 * random.expovariate(0.5) +
(random.uniform(0, 1) if random.uniform(0, 1) > 0.8 else 0) +
(random.uniform(0, 5) if random.uniform(0, 1) > 0.99 else 0))
)

View file

@ -36,9 +36,10 @@ fn collect_logs<Stream>(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Po
tokio::spawn(async move {
let mut lines = BufReader::with_capacity(1048, stream).lines();
let mut collected_logs: Vec<LogLine> = vec![];
// TODO: adaptive flush size
let max_lines = match &kind {
LogKind::Stdout => 20,
LogKind::Stderr => 3
LogKind::Stdout => 10,
LogKind::Stderr => 2
};
while let Some(line) = lines.next_line().await.unwrap() {
collected_logs.push(LogLine {
@ -46,7 +47,7 @@ fn collect_logs<Stream>(task_run_id: Uuid, kind: LogKind, stream: Stream, db: Po
kind: kind.clone(),
content: line,
task_run_id: task_run_id.to_string(),
captured_at: Utc::now().timestamp_millis(),
captured_at: Utc::now().timestamp_micros(),
});
if collected_logs.len() > max_lines {
insert_logs(&db, &mut collected_logs).await.expect("Error inserting logs.");