Ask For Task Status Seperatly and logs incrementally

This commit is contained in:
speatzle 2024-05-09 19:09:48 +02:00
parent d185784cd1
commit 12e83aa8d2
4 changed files with 73 additions and 34 deletions

View file

@ -9,6 +9,7 @@ import (
"git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/types" "git.lastassault.de/speatzle/morffix/types"
"github.com/jackc/pgx/v5"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
@ -173,41 +174,53 @@ func updateWorkerTaskStatus(ctx context.Context) {
for uuid := range Workers { for uuid := range Workers {
if Workers[uuid].Connected { if Workers[uuid].Connected {
w := Workers[uuid] w := Workers[uuid]
wg.Add(1)
go func() { rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
defer wg.Done() if err != nil {
var status types.TaskStatus slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
_, err := rpcServer.Call(ctx, w.Conn, "task-status", nil, &status) return
if err != nil { }
slog.ErrorContext(ctx, "Error Getting Task Status", "err", err)
// TODO Mark Worker / Tasks as Unknown?
return
}
for _, t := range status.Tasks { taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest])
// TODO check if this Task was even assigned to this Worker if err != nil {
slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err)
return
}
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = $3 WHERE id = $1", t.ID, t.Status, t.Log) for _, tsr := range taskStatusRequests {
wg.Add(1)
go func() {
defer wg.Done()
var ts types.TaskStatus
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Task Status", "err", err)
// TODO Mark Task as Unknown?
return
}
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) slog.ErrorContext(ctx, "Error Updating Task Status", "err", err)
continue return
} }
// Tell Worker to Delete Finished Tasks // Tell Worker to Delete Finished Tasks
if t.Status == constants.TASK_STATUS_FAILED || t.Status == constants.TASK_STATUS_SUCCESS { if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
_, err := rpcServer.Call(ctx, w.Conn, "task-delete", t.ID, nil) _, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err)
continue return
} }
} }
}
// TODO Set Task Status to QUEUED For Unfinished Tasks which where assigned to this worker but where not in the status response }()
}() }
// TODO Handle tasks with status unkown assigned to this worker
} else { } else {
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck
} }
} }
}() }()

View file

@ -25,7 +25,7 @@ func StartTask(conf config.Config, data types.TaskStart) error {
switch data.Type { switch data.Type {
case constants.TASK_TYPE_HEALTHCHECK: case constants.TASK_TYPE_HEALTHCHECK:
var hData HealthCheckData var hData types.HealthCheckData
err := json.Unmarshal(data.Data, &hData) err := json.Unmarshal(data.Data, &hData)
if err != nil { if err != nil {
return fmt.Errorf("Unmarshal Healthcheck Data: %w", err) return fmt.Errorf("Unmarshal Healthcheck Data: %w", err)
@ -49,15 +49,23 @@ func StartTask(conf config.Config, data types.TaskStart) error {
} }
} }
func Get() []types.Task { func Get(r types.TaskStatusRequest) (*types.Task, error) {
taskMutex.Lock() taskMutex.Lock()
defer taskMutex.Unlock() defer taskMutex.Unlock()
t := []types.Task{} t, ok := tasks[r.ID]
for i := range tasks { if !ok {
t = append(t, *tasks[i]) return nil, fmt.Errorf("Task does not Exist")
} }
return t
res := *t
// Send only new logs if there are any
if len(res.Log) >= r.LogOffset {
res.Log = res.Log[r.LogOffset:]
}
return &res, nil
} }
func DeleteTask(id int) error { func DeleteTask(id int) error {

View file

@ -2,6 +2,8 @@ package types
import ( import (
"encoding/json" "encoding/json"
"git.lastassault.de/speatzle/morffix/constants"
) )
type TaskStart struct { type TaskStart struct {
@ -12,13 +14,18 @@ type TaskStart struct {
} }
type Task struct { type Task struct {
ID int `json:"id"` ID int `json:"id"`
FileID int `json:"file_id"` FileID int `json:"file_id"`
Type int `json:"type"` Type int `json:"type"`
Status int `json:"status"` Status constants.TaskStatus `json:"status"`
Log []string `json:"log"` Log []string `json:"log"`
} }
type TaskStatus struct { type TaskStatus struct {
Tasks []Task `json:"tasks"` Task Task `json:"task"`
}
type TaskStatusRequest struct {
ID int `json:"id" db:"id"`
LogOffset int `json:"log_offset" db:"log_offset"`
} }

View file

@ -32,8 +32,19 @@ func taskStart(ctx context.Context, req rpc.Request) (any, error) {
} }
func taskStatus(ctx context.Context, req rpc.Request) (any, error) { func taskStatus(ctx context.Context, req rpc.Request) (any, error) {
var data types.TaskStatusRequest
err := json.Unmarshal(*req.Params, &data)
if err != nil {
return nil, fmt.Errorf("Unmarshal Task Status Request: %w", err)
}
t, err := task.Get(data)
if err != nil {
return nil, fmt.Errorf("Get Task Status: %w", err)
}
return types.TaskStatus{ return types.TaskStatus{
Tasks: task.Get(), Task: *t,
}, nil }, nil
} }