diff --git a/migrations/000022_create_logs_table.down.sql b/migrations/000022_create_logs_table.down.sql new file mode 100644 index 0000000..7bd492b --- /dev/null +++ b/migrations/000022_create_logs_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS logs; diff --git a/migrations/000022_create_logs_table.up.sql b/migrations/000022_create_logs_table.up.sql new file mode 100644 index 0000000..3c595a5 --- /dev/null +++ b/migrations/000022_create_logs_table.up.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS logs( + id bigserial PRIMARY KEY, + task_id bigint REFERENCES tasks(id) NOT NULL, + message text +); diff --git a/migrations/000023_create_index_logs_table_task_id.down.sql b/migrations/000023_create_index_logs_table_task_id.down.sql new file mode 100644 index 0000000..9bff470 --- /dev/null +++ b/migrations/000023_create_index_logs_table_task_id.down.sql @@ -0,0 +1 @@ +DROP INDEX logs_task_id; diff --git a/migrations/000023_create_index_logs_table_task_id.up.sql b/migrations/000023_create_index_logs_table_task_id.up.sql new file mode 100644 index 0000000..cbdd60f --- /dev/null +++ b/migrations/000023_create_index_logs_table_task_id.up.sql @@ -0,0 +1 @@ +CREATE INDEX logs_task_id on logs(task_id); diff --git a/migrations/000024_migrate_logs.down.sql b/migrations/000024_migrate_logs.down.sql new file mode 100644 index 0000000..1314245 --- /dev/null +++ b/migrations/000024_migrate_logs.down.sql @@ -0,0 +1 @@ +/* migration is irreversable */ diff --git a/migrations/000024_migrate_logs.up.sql b/migrations/000024_migrate_logs.up.sql new file mode 100644 index 0000000..59bd750 --- /dev/null +++ b/migrations/000024_migrate_logs.up.sql @@ -0,0 +1,3 @@ +INSERT INTO logs (task_id, message) +SELECT id as task_id, l as message +FROM tasks, unnest(log) l ORDER BY task_id ASC; diff --git a/server/task.go b/server/task.go index c9ec63c..9b1a54f 100644 --- a/server/task.go +++ b/server/task.go @@ -230,13 +230,28 @@ func handleTask(w http.ResponseWriter, r *http.Request) { var typ constants.TaskType var status constants.TaskStatus t := TaskDisplay{} - err := db.QueryRow(r.Context(), "SELECT t.id, l.name, w.name, t.type, t.status, f.path, t.log, t.updated_at FROM tasks t LEFT JOIN workers w ON w.id = t.worker_id INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id WHERE t.id = $1", id).Scan(&t.ID, &t.Library, &t.Worker, &typ, &status, &t.Filename, &t.Log, &t.UpdatedAt) + err := db.QueryRow(r.Context(), "SELECT t.id, l.name, w.name, t.type, t.status, f.path, t.updated_at FROM tasks t LEFT JOIN workers w ON w.id = t.worker_id INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id WHERE t.id = $1", id).Scan(&t.ID, &t.Library, &t.Worker, &typ, &status, &t.Filename, &t.UpdatedAt) if err != nil { slog.ErrorContext(r.Context(), "Query Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } + rows, err := db.Query(r.Context(), "SELECT message FROM logs WHERE task_id = $1 ORDER BY task_ID ASC", t.ID) + if err != nil { + slog.ErrorContext(r.Context(), "Query Task Logs", "err", err) + http.Error(w, "Error Query Task Logs: "+err.Error(), http.StatusInternalServerError) + return + } + + messages, err := pgx.CollectRows[string](rows, pgx.RowTo[string]) + if err != nil { + slog.ErrorContext(r.Context(), "Collect Task Logs", "err", err) + http.Error(w, "Error Collect Task Logs: "+err.Error(), http.StatusInternalServerError) + return + } + t.Log = messages + t.Type = typ.String() t.Status = status.String() buf := bytes.Buffer{} @@ -421,25 +436,40 @@ func assignQueuedTasks(ctx context.Context) error { // Task was started previously but something went wrong and we are out of sync slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name) - _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())}) + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED) if err != nil { return fmt.Errorf("Updating Task status during already running error: %w", err) } + + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())) + if err != nil { + return fmt.Errorf("Updating Task log during already running error: %w", err) + } } else if errors.Is(err, constants.ErrRPCRequestTimeout) { // We really don't know whats going on, might be slow response, oom, disk full or a bug slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name) - _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())}) + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN) if err != nil { return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err) } + + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())) + if err != nil { + return fmt.Errorf("Updating Unknown Task log due to Timeout while starting Task: %w", err) + } } else { slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name) - _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())}) + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN) if err != nil { return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err) } + + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())) + if err != nil { + return fmt.Errorf("Updating Unknown Task Log due to Error while starting Task: %w", err) + } } } diff --git a/server/upload.go b/server/upload.go index f811edc..727f5a1 100644 --- a/server/upload.go +++ b/server/upload.go @@ -213,7 +213,7 @@ func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string, slog.ErrorContext(r.Context(), msg, "err", err) http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError) if taskID != 0 { - _, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())}) + _, err2 := db.Exec(context.TODO(), "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())) if err != nil { slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2) } diff --git a/server/worker.go b/server/worker.go index 293594e..171d9cc 100644 --- a/server/worker.go +++ b/server/worker.go @@ -205,40 +205,86 @@ func updateWorkerTaskStatus(ctx context.Context) { _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) if err != nil { + tx, err := db.Begin(ctx) + if err != nil { + slog.ErrorContext(ctx, "Task Status Error Begin Transaction", "err", err) + return + } + defer tx.Rollback(ctx) // Find better way to compare errors which where send via websocket if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) { // Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())}) + + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED) if err != nil { slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err) return } + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())) + if err != nil { + slog.ErrorContext(ctx, "Updating Failed Task Log", "err", err) + return + } slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED) - return } else if errors.Is(err, constants.ErrRPCRequestTimeout) { // We really don't know whats going on, might be slow response, oom, disk full or a bug - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())}) + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN) if err != nil { slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err) return } + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())) + if err != nil { + slog.ErrorContext(ctx, "Updating Unknown Task Log due to Timeout", "err", err) + return + } } else { slog.ErrorContext(ctx, "Getting Task Status", "err", err) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())}) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN) if err != nil { slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err) return } + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())) + if err != nil { + slog.ErrorContext(ctx, "Updating Unknown Task Log", "err", err) + return + } + } + err = tx.Commit(ctx) + if err != nil { + slog.ErrorContext(ctx, "Task Status Error Commit Transaction", "err", err) + } + return + } + + tx, err := db.Begin(ctx) + if err != nil { + slog.ErrorContext(ctx, "Task Status Success Begin Transaction", "err", err) + return + } + + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log_offset = log_offset + $3 WHERE id = $1", taskID, ts.Task.Status, len(ts.Task.Log)) + if err != nil { + slog.ErrorContext(ctx, "Updating Task Status and offset", "err", err) + return + } + + // TODO batch these or use copy protocol + for _, l := range ts.Task.Log { + _, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, l) + if err != nil { + slog.ErrorContext(ctx, "Updating Task Log", "err", err) return } } - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log)) + err = tx.Commit(ctx) if err != nil { - slog.ErrorContext(ctx, "Updating Task Status", "err", err) + slog.ErrorContext(ctx, "Task Status Commit Transaction", "err", err) return }