From 12e83aa8d22184af7a8fac7f8d9589af9bd8752b Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:09:48 +0200 Subject: [PATCH] Ask For Task Status Seperatly and logs incrementally --- server/worker.go | 55 ++++++++++++++++++++++++++++++------------------ task/task.go | 20 ++++++++++++------ types/task.go | 19 +++++++++++------ worker/task.go | 13 +++++++++++- 4 files changed, 73 insertions(+), 34 deletions(-) diff --git a/server/worker.go b/server/worker.go index bcbf4a0..17a44e4 100644 --- a/server/worker.go +++ b/server/worker.go @@ -9,6 +9,7 @@ import ( "git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/types" + "github.com/jackc/pgx/v5" "git.lastassault.de/speatzle/morffix/constants" @@ -173,41 +174,53 @@ func updateWorkerTaskStatus(ctx context.Context) { for uuid := range Workers { if Workers[uuid].Connected { w := Workers[uuid] - wg.Add(1) - go func() { - defer wg.Done() - var status types.TaskStatus - _, err := rpcServer.Call(ctx, w.Conn, "task-status", nil, &status) - if err != nil { - slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) - // TODO Mark Worker / Tasks as Unknown? - return - } + rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid) + return + } - for _, t := range status.Tasks { - // TODO check if this Task was even assigned to this Worker + taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest]) + if err != nil { + slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err) + return + } - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = $3 WHERE id = $1", t.ID, t.Status, t.Log) + for _, tsr := range taskStatusRequests { + wg.Add(1) + + go func() { + defer wg.Done() + var ts types.TaskStatus + _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) + // TODO Mark Task as Unknown? + return + } + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log) if err != nil { slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) - continue + return } // Tell Worker to Delete Finished Tasks - if t.Status == constants.TASK_STATUS_FAILED || t.Status == constants.TASK_STATUS_SUCCESS { - _, err := rpcServer.Call(ctx, w.Conn, "task-delete", t.ID, nil) + if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS { + _, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil) if err != nil { slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) - continue + return } } - } - // TODO Set Task Status to QUEUED For Unfinished Tasks which where assigned to this worker but where not in the status response - }() + }() + } + // TODO Handle tasks with status unkown assigned to this worker + } else { - // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker + // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck } } }() diff --git a/task/task.go b/task/task.go index c0212d1..56416d0 100644 --- a/task/task.go +++ b/task/task.go @@ -25,7 +25,7 @@ func StartTask(conf config.Config, data types.TaskStart) error { switch data.Type { case constants.TASK_TYPE_HEALTHCHECK: - var hData HealthCheckData + var hData types.HealthCheckData err := json.Unmarshal(data.Data, &hData) if err != nil { return fmt.Errorf("Unmarshal Healthcheck Data: %w", err) @@ -49,15 +49,23 @@ func StartTask(conf config.Config, data types.TaskStart) error { } } -func Get() []types.Task { +func Get(r types.TaskStatusRequest) (*types.Task, error) { taskMutex.Lock() defer taskMutex.Unlock() - t := []types.Task{} - for i := range tasks { - t = append(t, *tasks[i]) + t, ok := tasks[r.ID] + if !ok { + return nil, fmt.Errorf("Task does not Exist") } - return t + + res := *t + + // Send only new logs if there are any + if len(res.Log) >= r.LogOffset { + res.Log = res.Log[r.LogOffset:] + } + + return &res, nil } func DeleteTask(id int) error { diff --git a/types/task.go b/types/task.go index 7e94a49..e47fce2 100644 --- a/types/task.go +++ b/types/task.go @@ -2,6 +2,8 @@ package types import ( "encoding/json" + + "git.lastassault.de/speatzle/morffix/constants" ) type TaskStart struct { @@ -12,13 +14,18 @@ type TaskStart struct { } type Task struct { - ID int `json:"id"` - FileID int `json:"file_id"` - Type int `json:"type"` - Status int `json:"status"` - Log []string `json:"log"` + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Status constants.TaskStatus `json:"status"` + Log []string `json:"log"` } type TaskStatus struct { - Tasks []Task `json:"tasks"` + Task Task `json:"task"` +} + +type TaskStatusRequest struct { + ID int `json:"id" db:"id"` + LogOffset int `json:"log_offset" db:"log_offset"` } diff --git a/worker/task.go b/worker/task.go index 80be85c..b909a9b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -32,8 +32,19 @@ func taskStart(ctx context.Context, req rpc.Request) (any, error) { } func taskStatus(ctx context.Context, req rpc.Request) (any, error) { + var data types.TaskStatusRequest + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Status Request: %w", err) + } + + t, err := task.Get(data) + if err != nil { + return nil, fmt.Errorf("Get Task Status: %w", err) + } + return types.TaskStatus{ - Tasks: task.Get(), + Task: *t, }, nil }