Handle Unkown Tasks and RPC Error way better

This commit is contained in:
Samuel Lorch 2024-07-06 15:54:49 +02:00
parent 7574628a1d
commit 78ab8c9daf
4 changed files with 33 additions and 4 deletions

View file

@ -4,3 +4,4 @@ import "fmt"
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")

View file

@ -6,6 +6,7 @@ import (
"fmt"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/google/uuid"
"nhooyr.io/websocket"
)
@ -89,7 +90,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
// remove request from map
delete(s.requests, id)
return nil, fmt.Errorf("Request timed out")
return nil, constants.ErrRPCRequestTimeout
}
}

View file

@ -4,11 +4,13 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/http"
"slices"
"strconv"
"strings"
"time"
"git.lastassault.de/speatzle/morffix/constants"
@ -387,7 +389,23 @@ func assignQueuedTasks(ctx context.Context) error {
}
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
if err != nil {
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
// Task was started previously but something went wrong and we are out of sync
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
if err != nil {
return fmt.Errorf("Updating Task status during already running error: %w", err)
}
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
if err != nil {
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
}
} else if err != nil {
return fmt.Errorf("Error Starting Task: %w", err)
}

View file

@ -2,6 +2,8 @@ package server
import (
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"strings"
@ -207,7 +209,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
return
@ -215,10 +217,17 @@ func updateWorkerTaskStatus(ctx context.Context) {
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
return
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
return
}
} else {
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
return