package server import ( "context" "log/slog" "net/http" "sync" "time" "git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/types" "github.com/jackc/pgx/v5" "git.lastassault.de/speatzle/morffix/constants" "nhooyr.io/websocket" ) type Worker struct { Name string Address string Conn *websocket.Conn Connected bool ConnectionChanged time.Time } var Workers = map[string]*Worker{} var WorkersMutex sync.Mutex var rpcServer = rpc.NewServer() func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret { w.WriteHeader(http.StatusUnauthorized) return } if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION { w.WriteHeader(http.StatusNotImplemented) return } // Connection ID uuid := r.Header.Get(constants.UUID_HEADER) if uuid == "" { w.WriteHeader(http.StatusBadRequest) return } name := r.Header.Get(constants.NAME_HEADER) _, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name) if err != nil { w.WriteHeader(http.StatusInternalServerError) slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err) return } c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) if err != nil { slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err) return } defer c.CloseNow() // Increase Read limit to 10 MB Since the task status might be very long due to ffmpeg log spam c.SetReadLimit(10000000) // Track Connection func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() w, ok := Workers[uuid] if ok && w.Connected { slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...") w.Conn.CloseNow() // Since we Closed the Connection, we need to wait for the old connections deferred cleanup to finish before Writing down the new connection or we will be overwritten. for { WorkersMutex.Unlock() time.Sleep(time.Millisecond * 10) WorkersMutex.Lock() if !Workers[uuid].Connected { break } } } Workers[uuid] = &Worker{ Name: name, Address: r.RemoteAddr, Conn: c, Connected: true, ConnectionChanged: time.Now(), } // TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing) }() // Set Status on Disconnect defer func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() Workers[uuid].Connected = false Workers[uuid].ConnectionChanged = time.Now() }() slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr) for { err = readMessage(r.Context(), c) if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway { slog.InfoContext(r.Context(), "Websocket Closed") return } if err != nil { slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err) return } } } func readMessage(ctx context.Context, c *websocket.Conn) error { t, data, err := c.Read(ctx) if err != nil { return err } slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) rpcServer.HandleMessage(ctx, c, data) return nil } func manageWorkers(stop chan bool) { deadTicker := time.NewTicker(time.Second) statusTicker := time.NewTicker(time.Second) assignTicker := time.NewTicker(time.Second) for { select { case <-deadTicker.C: killDeadWorkers() case <-statusTicker.C: updateWorkerTaskStatus(context.TODO()) case <-assignTicker.C: err := assignQueuedTasks(context.TODO()) if err != nil { slog.Error("Assigning Queued Tasks", "err", err) } case <-stop: return } } } func killDeadWorkers() { WorkersMutex.Lock() defer WorkersMutex.Unlock() for uuid, w := range Workers { // TODO Not dead Workers are sometimes also killed? if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) delete(Workers, uuid) // TODO Free any Jobs that the Worker had } } } func updateWorkerTaskStatus(ctx context.Context) { var wg sync.WaitGroup // TODO figure out why this locks up on worker status error func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() for uuid := range Workers { if Workers[uuid].Connected { w := Workers[uuid] rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) if err != nil { slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid) return } taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest]) if err != nil { slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err) return } for _, tsr := range taskStatusRequests { wg.Add(1) go func() { defer wg.Done() var ts types.TaskStatus _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) if err != nil { slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", ts.Task.ID, constants.TASK_STATUS_UNKNOWN) if err != nil { slog.ErrorContext(ctx, "Error Updating Unknown Task Status", "err", err) return } return } _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log) if err != nil { slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) return } if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK && (ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS) { var health constants.FileHealth if ts.Task.Status == constants.TASK_STATUS_SUCCESS { health = constants.FILE_HEALTH_HEALTHY } else { // TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs health = constants.FILE_HEALTH_DAMAGED } _, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health) if err != nil { slog.ErrorContext(ctx, "Error Updating File health", "err", err) return } } // Tell Worker to Delete Finished Tasks if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS { _, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil) if err != nil { slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) return } } }() } // TODO Handle tasks with status unkown assigned to this worker // maybe requeue after 5 minutes? } else { // TODO wait for 5 minutes for worker to reconnect // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck } } }() wg.Wait() }