diff --git a/server/task.go b/server/task.go index bfc6d16..4eb2aeb 100644 --- a/server/task.go +++ b/server/task.go @@ -390,24 +390,31 @@ func assignQueuedTasks(ctx context.Context) error { } _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) - if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) { - // Task was started previously but something went wrong and we are out of sync - slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name) + if err != nil { + if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) { + // Task was started previously but something went wrong and we are out of sync + slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())}) - if err != nil { - return fmt.Errorf("Updating Task status during already running error: %w", err) - } - } else if errors.Is(err, constants.ErrRPCRequestTimeout) { - // We really don't know whats going on, might be slow response, oom, disk full or a bug - slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())}) + if err != nil { + return fmt.Errorf("Updating Task status during already running error: %w", err) + } + } else if errors.Is(err, constants.ErrRPCRequestTimeout) { + // We really don't know whats going on, might be slow response, oom, disk full or a bug + slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())}) - if err != nil { - return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())}) + if err != nil { + return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err) + } + } else { + slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name) + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())}) + if err != nil { + return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err) + } } - } else if err != nil { - return fmt.Errorf("Error Starting Task: %w", err) } err = tx.Commit(ctx)