Move logs from array to dedicated table
All checks were successful
/ release (push) Successful in 1m23s

This avoids the append problem which bloats the database to 200+ gb in a
month and shrinks again to 5g with a full vacuum
This commit is contained in:
Samuel Lorch 2025-05-18 19:18:52 +02:00
parent 15a960da19
commit f52f517dc1
9 changed files with 99 additions and 11 deletions

View file

@ -0,0 +1 @@
DROP TABLE IF EXISTS logs;

View file

@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS logs(
id bigserial PRIMARY KEY,
task_id bigint REFERENCES tasks(id) NOT NULL,
message text
);

View file

@ -0,0 +1 @@
DROP INDEX logs_task_id;

View file

@ -0,0 +1 @@
CREATE INDEX logs_task_id on logs(task_id);

View file

@ -0,0 +1 @@
/* migration is irreversable */

View file

@ -0,0 +1,3 @@
INSERT INTO logs (task_id, message)
SELECT id as task_id, l as message
FROM tasks, unnest(log) l ORDER BY task_id ASC;

View file

@ -230,13 +230,28 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
var typ constants.TaskType var typ constants.TaskType
var status constants.TaskStatus var status constants.TaskStatus
t := TaskDisplay{} t := TaskDisplay{}
err := db.QueryRow(r.Context(), "SELECT t.id, l.name, w.name, t.type, t.status, f.path, t.log, t.updated_at FROM tasks t LEFT JOIN workers w ON w.id = t.worker_id INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id WHERE t.id = $1", id).Scan(&t.ID, &t.Library, &t.Worker, &typ, &status, &t.Filename, &t.Log, &t.UpdatedAt) err := db.QueryRow(r.Context(), "SELECT t.id, l.name, w.name, t.type, t.status, f.path, t.updated_at FROM tasks t LEFT JOIN workers w ON w.id = t.worker_id INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id WHERE t.id = $1", id).Scan(&t.ID, &t.Library, &t.Worker, &typ, &status, &t.Filename, &t.UpdatedAt)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Tasks", "err", err) slog.ErrorContext(r.Context(), "Query Tasks", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return return
} }
rows, err := db.Query(r.Context(), "SELECT message FROM logs WHERE task_id = $1 ORDER BY task_ID ASC", t.ID)
if err != nil {
slog.ErrorContext(r.Context(), "Query Task Logs", "err", err)
http.Error(w, "Error Query Task Logs: "+err.Error(), http.StatusInternalServerError)
return
}
messages, err := pgx.CollectRows[string](rows, pgx.RowTo[string])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Task Logs", "err", err)
http.Error(w, "Error Collect Task Logs: "+err.Error(), http.StatusInternalServerError)
return
}
t.Log = messages
t.Type = typ.String() t.Type = typ.String()
t.Status = status.String() t.Status = status.String()
buf := bytes.Buffer{} buf := bytes.Buffer{}
@ -421,25 +436,40 @@ func assignQueuedTasks(ctx context.Context) error {
// Task was started previously but something went wrong and we are out of sync // Task was started previously but something went wrong and we are out of sync
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name) slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())}) _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED)
if err != nil { if err != nil {
return fmt.Errorf("Updating Task status during already running error: %w", err) return fmt.Errorf("Updating Task status during already running error: %w", err)
} }
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now()))
if err != nil {
return fmt.Errorf("Updating Task log during already running error: %w", err)
}
} else if errors.Is(err, constants.ErrRPCRequestTimeout) { } else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug // We really don't know whats going on, might be slow response, oom, disk full or a bug
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name) slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())}) _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN)
if err != nil { if err != nil {
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err) return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
} }
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now()))
if err != nil {
return fmt.Errorf("Updating Unknown Task log due to Timeout while starting Task: %w", err)
}
} else { } else {
slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name) slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())}) _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN)
if err != nil { if err != nil {
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err) return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
} }
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error()))
if err != nil {
return fmt.Errorf("Updating Unknown Task Log due to Error while starting Task: %w", err)
}
} }
} }

View file

@ -213,7 +213,7 @@ func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string,
slog.ErrorContext(r.Context(), msg, "err", err) slog.ErrorContext(r.Context(), msg, "err", err)
http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError) http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError)
if taskID != 0 { if taskID != 0 {
_, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())}) _, err2 := db.Exec(context.TODO(), "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now()))
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2) slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
} }

View file

@ -205,40 +205,86 @@ func updateWorkerTaskStatus(ctx context.Context) {
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
if err != nil { if err != nil {
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Error Begin Transaction", "err", err)
return
}
defer tx.Rollback(ctx)
// Find better way to compare errors which where send via websocket // Find better way to compare errors which where send via websocket
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) { if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again // Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID) slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err) slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
return return
} }
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now()))
if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Log", "err", err)
return
}
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED) slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
return
} else if errors.Is(err, constants.ErrRPCRequestTimeout) { } else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug // We really don't know whats going on, might be slow response, oom, disk full or a bug
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())}) _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err) slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
return return
} }
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now()))
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Log due to Timeout", "err", err)
return
}
} else { } else {
slog.ErrorContext(ctx, "Getting Task Status", "err", err) slog.ErrorContext(ctx, "Getting Task Status", "err", err)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())}) _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err) slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
return return
} }
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error()))
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Log", "err", err)
return
}
}
err = tx.Commit(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Error Commit Transaction", "err", err)
}
return
}
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Success Begin Transaction", "err", err)
return
}
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log_offset = log_offset + $3 WHERE id = $1", taskID, ts.Task.Status, len(ts.Task.Log))
if err != nil {
slog.ErrorContext(ctx, "Updating Task Status and offset", "err", err)
return
}
// TODO batch these or use copy protocol
for _, l := range ts.Task.Log {
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, l)
if err != nil {
slog.ErrorContext(ctx, "Updating Task Log", "err", err)
return return
} }
} }
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log)) err = tx.Commit(ctx)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Task Status", "err", err) slog.ErrorContext(ctx, "Task Status Commit Transaction", "err", err)
return return
} }