From 78ab8c9daf18e9d5a962b8c718683dbe2828533f Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 6 Jul 2024 15:54:49 +0200 Subject: [PATCH] Handle Unknown Tasks and RPC Error way better --- constants/error.go | 1 + rpc/call.go | 3 ++- server/task.go | 20 +++++++++++++++++++- server/worker.go | 13 +++++++++++-- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/constants/error.go b/constants/error.go index 341148c..bb3e278 100644 --- a/constants/error.go +++ b/constants/error.go @@ -4,3 +4,4 @@ import "fmt" var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist") var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running") +var ErrRPCRequestTimeout = fmt.Errorf("Request timed out") diff --git a/rpc/call.go b/rpc/call.go index 71696de..037f384 100644 --- a/rpc/call.go +++ b/rpc/call.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "git.lastassault.de/speatzle/morffix/constants" "github.com/google/uuid" "nhooyr.io/websocket" ) @@ -89,7 +90,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par // remove request from map delete(s.requests, id) - return nil, fmt.Errorf("Request timed out") + return nil, constants.ErrRPCRequestTimeout } } diff --git a/server/task.go b/server/task.go index 4d430a1..d86d899 100644 --- a/server/task.go +++ b/server/task.go @@ -4,11 +4,13 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "log/slog" "net/http" "slices" "strconv" + "strings" "time" "git.lastassault.de/speatzle/morffix/constants" @@ -387,7 +389,23 @@ func assignQueuedTasks(ctx context.Context) error { } _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) - if err != nil { + if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) { + // Task was started previously but something went wrong and we are out of sync + slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name) + + _, err = 
db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())}) + if err != nil { + return fmt.Errorf("Updating Task status during already running error: %w", err) + } + } else if errors.Is(err, constants.ErrRPCRequestTimeout) { + // We really don't know whats going on, might be slow response, oom, disk full or a bug + slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name) + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())}) + if err != nil { + return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err) + } + } else if err != nil { return fmt.Errorf("Error Starting Task: %w", err) } diff --git a/server/worker.go b/server/worker.go index 2305cff..b501165 100644 --- a/server/worker.go +++ b/server/worker.go @@ -2,6 +2,8 @@ package server import ( "context" + "errors" + "fmt" "log/slog" "net/http" "strings" @@ -207,7 +209,7 @@ func updateWorkerTaskStatus(ctx context.Context) { if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) { // Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"}) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())}) if err != nil { 
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err) return @@ -215,10 +217,17 @@ func updateWorkerTaskStatus(ctx context.Context) { slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED) return + } else if errors.Is(err, constants.ErrRPCRequestTimeout) { + // We really don't know whats going on, might be slow response, oom, disk full or a bug + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())}) + if err != nil { + slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err) + return + } } else { slog.ErrorContext(ctx, "Getting Task Status", "err", err) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())}) if err != nil { slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err) return