save log offeset so that the master can also log without dropping worker log lines
All checks were successful
/ release (push) Successful in 1m4s
All checks were successful
/ release (push) Successful in 1m4s
This commit is contained in:
parent
78ab8c9daf
commit
24e11d9103
3 changed files with 6 additions and 2 deletions
2
migrations/000015_alter_tasks_table_log_offset.down.sql
Normal file
2
migrations/000015_alter_tasks_table_log_offset.down.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE tasks
|
||||||
|
DROP IF EXISTS log_offset;
|
2
migrations/000015_alter_tasks_table_log_offset.up.sql
Normal file
2
migrations/000015_alter_tasks_table_log_offset.up.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE tasks
|
||||||
|
ADD log_offset integer NOT NULL DEFAULT COALESCE(CARDINALITY(log),0);
|
|
@ -182,7 +182,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if Workers[uuid].Connected {
|
if Workers[uuid].Connected {
|
||||||
w := Workers[uuid]
|
w := Workers[uuid]
|
||||||
|
|
||||||
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
||||||
return
|
return
|
||||||
|
@ -236,7 +236,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
|
Loading…
Add table
Reference in a new issue