This commit is contained in:
parent
de06175b8d
commit
7ef82b1827
3 changed files with 43 additions and 43 deletions
|
@ -436,39 +436,39 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
// Task was started previously but something went wrong and we are out of sync
|
||||
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||
|
||||
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Task status during already running error: %w", err)
|
||||
_, err2 := tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Updating Task status during already running error: %w", err2)
|
||||
}
|
||||
|
||||
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Task log during already running error: %w", err)
|
||||
_, err2 = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now()))
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Updating Task log during already running error: %w", err2)
|
||||
}
|
||||
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||
|
||||
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
|
||||
_, err2 := tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err2)
|
||||
}
|
||||
|
||||
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Unknown Task log due to Timeout while starting Task: %w", err)
|
||||
_, err2 = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now()))
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Updating Unknown Task log due to Timeout while starting Task: %w", err2)
|
||||
}
|
||||
} else {
|
||||
slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||
|
||||
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
|
||||
_, err2 := tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err2)
|
||||
}
|
||||
|
||||
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error()))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Log due to Error while starting Task: %w", err)
|
||||
_, err2 = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskStart.ID, fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error()))
|
||||
if err2 != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Log due to Error while starting Task: %w", err2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -214,7 +214,7 @@ func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string,
|
|||
http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError)
|
||||
if taskID != 0 {
|
||||
_, err2 := db.Exec(context.TODO(), "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now()))
|
||||
if err != nil {
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -205,9 +205,9 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
|
||||
|
||||
if err != nil {
|
||||
tx, err := db.Begin(ctx)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Task Status Error Begin Transaction", "err", err)
|
||||
tx, err2 := db.Begin(ctx)
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Task Status Error Begin Transaction", "err", err2)
|
||||
return
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
@ -216,47 +216,47 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
||||
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
||||
|
||||
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
||||
_, err2 = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED)
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err2)
|
||||
return
|
||||
}
|
||||
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now()))
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Failed Task Log", "err", err)
|
||||
_, err2 = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now()))
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Updating Failed Task Log", "err", err2)
|
||||
return
|
||||
}
|
||||
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
||||
|
||||
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
|
||||
_, err2 = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err2)
|
||||
return
|
||||
}
|
||||
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now()))
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Log due to Timeout", "err", err)
|
||||
_, err2 = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now()))
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Log due to Timeout", "err", err2)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
||||
_, err2 = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err2)
|
||||
return
|
||||
}
|
||||
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error()))
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Log", "err", err)
|
||||
_, err2 = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error()))
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Log", "err", err2)
|
||||
return
|
||||
}
|
||||
}
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Task Status Error Commit Transaction", "err", err)
|
||||
err2 = tx.Commit(ctx)
|
||||
if err2 != nil {
|
||||
slog.ErrorContext(ctx, "Task Status Error Commit Transaction", "err", err2)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue