diff --git a/server/worker.go b/server/worker.go index fa5fd18..bcbf4a0 100644 --- a/server/worker.go +++ b/server/worker.go @@ -8,6 +8,7 @@ import ( "time" "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/types" "git.lastassault.de/speatzle/morffix/constants" @@ -126,26 +127,90 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { return nil } -func cleanupDeadWorkers(stop chan bool) { - ticker := time.NewTicker(time.Second) +func manageWorkers(stop chan bool) { + deadTicker := time.NewTicker(time.Second) + statusTicker := time.NewTicker(time.Second) + assignTicker := time.NewTicker(time.Second) + for { select { - case <-ticker.C: - func() { - WorkersMutex.Lock() - defer WorkersMutex.Unlock() - - for uuid, w := range Workers { - // TODO Not dead Workers are sometimes also killed? - if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { - slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) - delete(Workers, uuid) - // TODO Free any Jobs that the Worker had - } - } - }() + case <-deadTicker.C: + killDeadWorkers() + case <-statusTicker.C: + updateWorkerTaskStatus(context.TODO()) + case <-assignTicker.C: + err := assignQueuedTasks(context.TODO()) + if err != nil { + slog.Error("Assigning Queued Tasks", "err", err) + } case <-stop: return } } } + +func killDeadWorkers() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid, w := range Workers { + // TODO Not dead Workers are sometimes also killed? + if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { + slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) + delete(Workers, uuid) + // TODO Free any Jobs that the Worker had + } + } +} + +func updateWorkerTaskStatus(ctx context.Context) { + var wg sync.WaitGroup + + func() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid := range Workers { + if Workers[uuid].Connected { + w := Workers[uuid] + wg.Add(1) + + go func() { + defer wg.Done() + var status types.TaskStatus + _, err := rpcServer.Call(ctx, w.Conn, "task-status", nil, &status) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) + // TODO Mark Worker / Tasks as Unknown? + return + } + + for _, t := range status.Tasks { + // TODO check if this Task was even assigned to this Worker + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = $3 WHERE id = $1", t.ID, t.Status, t.Log) + if err != nil { + slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) + continue + } + + // Tell Worker to Delete Finished Tasks + if t.Status == constants.TASK_STATUS_FAILED || t.Status == constants.TASK_STATUS_SUCCESS { + _, err := rpcServer.Call(ctx, w.Conn, "task-delete", t.ID, nil) + if err != nil { + slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) + continue + } + } + } + + // TODO Set Task Status to QUEUED For Unfinished Tasks which where assigned to this worker but where not in the status response + }() + } else { + // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker + } + } + }() + + wg.Wait() +}