morffix/server/worker.go

254 lines
7.4 KiB
Go

package server
import (
"context"
"log/slog"
"net/http"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/types"
"github.com/jackc/pgx/v5"
"git.lastassault.de/speatzle/morffix/constants"
"nhooyr.io/websocket"
)
// Worker is the server-side record of one worker's websocket connection.
type Worker struct {
	// Name and Address are taken from the worker's name header and the
	// request's remote address when the connection is registered.
	Name, Address string
	// Conn is the websocket connection used to talk to the worker.
	Conn *websocket.Conn
	// Connected is true while the handler serving Conn is still running.
	Connected bool
	// ConnectionChanged is the time Connected was last set.
	ConnectionChanged time.Time
}
var (
	// Workers holds the known workers keyed by the UUID they sent when
	// connecting. Access must be guarded by WorkersMutex.
	Workers = make(map[string]*Worker)

	// WorkersMutex protects Workers and the fields of its entries.
	WorkersMutex sync.Mutex

	// rpcServer handles incoming worker messages and issues calls to workers.
	rpcServer = rpc.NewServer()
)
// handleWorkerWebsocket upgrades a worker's HTTP request to a websocket and
// serves it until the connection ends.
//
// The request is rejected with 401 when the shared secret header does not
// match, 501 when the worker version header does not match, and 400 when no
// UUID header is present. Otherwise the worker is upserted into the workers
// table, the websocket is accepted, and the connection is recorded in Workers.
// When the handler returns, the worker's entry is marked as disconnected.
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	hdr := r.Header

	// Connection ID
	uuid := hdr.Get(constants.UUID_HEADER)

	switch {
	case hdr.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret:
		w.WriteHeader(http.StatusUnauthorized)
		return
	case hdr.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION:
		w.WriteHeader(http.StatusNotImplemented)
		return
	case uuid == "":
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	name := hdr.Get(constants.NAME_HEADER)

	if _, upsertErr := db.Exec(ctx, "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name); upsertErr != nil {
		w.WriteHeader(http.StatusInternalServerError)
		slog.ErrorContext(ctx, "Error Upserting Worker", "err", upsertErr)
		return
	}

	conn, acceptErr := websocket.Accept(w, r, &websocket.AcceptOptions{InsecureSkipVerify: true})
	if acceptErr != nil {
		slog.ErrorContext(ctx, "Error Accepting Worker Websocket Connection", "err", acceptErr)
		return
	}
	defer conn.CloseNow()

	// Track Connection
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		if prev, known := Workers[uuid]; known && prev.Connected {
			slog.WarnContext(ctx, "Worker Reconnected before the old connection died, killing...")
			prev.Conn.CloseNow()

			// The old handler's deferred cleanup flips Connected to false.
			// Release the lock so it can run and poll until it has, otherwise
			// it would overwrite the state of the entry stored below.
			for stillConnected := true; stillConnected; stillConnected = Workers[uuid].Connected {
				WorkersMutex.Unlock()
				time.Sleep(10 * time.Millisecond)
				WorkersMutex.Lock()
			}
		}

		Workers[uuid] = &Worker{
			Name:              name,
			Address:           r.RemoteAddr,
			Conn:              conn,
			Connected:         true,
			ConnectionChanged: time.Now(),
		}

		// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
	}()

	// Set Status on Disconnect
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		entry := Workers[uuid]
		entry.Connected = false
		entry.ConnectionChanged = time.Now()
	}()

	slog.InfoContext(ctx, "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)

	for {
		readErr := readMessage(ctx, conn)

		// Closures with these statuses are logged at info level rather than as errors.
		switch websocket.CloseStatus(readErr) {
		case websocket.StatusNormalClosure, websocket.StatusAbnormalClosure, websocket.StatusGoingAway:
			slog.InfoContext(ctx, "Websocket Closed")
			return
		}

		if readErr != nil {
			slog.ErrorContext(ctx, "Error Reading Websocket Message", "err", readErr)
			return
		}
	}
}
// readMessage reads one message from c, logs it and passes its payload to the
// RPC server. It returns the read error, or nil once the message has been
// handed over.
func readMessage(ctx context.Context, c *websocket.Conn) error {
	msgType, payload, readErr := c.Read(ctx)
	if readErr == nil {
		slog.InfoContext(ctx, "Got Websocket Message", "type", msgType.String(), "data", payload)
		rpcServer.HandleMessage(ctx, c, payload)
	}
	return readErr
}
// manageWorkers runs the periodic worker housekeeping loop until stop yields a
// value or is closed.
//
// Three one-second tickers drive the loop: one removes dead workers, one polls
// workers for task status and one assigns queued tasks. Each job runs on the
// calling goroutine, so a job that blocks delays the others and the reaction
// to stop.
func manageWorkers(stop chan bool) {
	// Stop every ticker on return so their timers are released once the
	// manager shuts down.
	deadTicker := time.NewTicker(time.Second)
	defer deadTicker.Stop()
	statusTicker := time.NewTicker(time.Second)
	defer statusTicker.Stop()
	assignTicker := time.NewTicker(time.Second)
	defer assignTicker.Stop()

	for {
		select {
		case <-deadTicker.C:
			killDeadWorkers()
		case <-statusTicker.C:
			updateWorkerTaskStatus(context.TODO())
		case <-assignTicker.C:
			if err := assignQueuedTasks(context.TODO()); err != nil {
				slog.Error("Assigning Queued Tasks", "err", err)
			}
		case <-stop:
			return
		}
	}
}
// killDeadWorkers removes every worker from Workers that is not connected and
// whose connection state last changed more than five minutes ago.
func killDeadWorkers() {
	// How long a disconnected worker is kept before it is dropped.
	const reconnectGrace = 5 * time.Minute

	WorkersMutex.Lock()
	defer WorkersMutex.Unlock()

	for id, worker := range Workers {
		// TODO Not dead Workers are sometimes also killed?
		if worker.Connected || time.Since(worker.ConnectionChanged) <= reconnectGrace {
			continue
		}

		slog.Warn("Removing Dead Worker", "uuid", id, "name", worker.Name)
		delete(Workers, id)
		// TODO Free any Jobs that the Worker had
	}
}
func updateWorkerTaskStatus(ctx context.Context) {
var wg sync.WaitGroup
// TODO figure out why this locks up on worker status error
func() {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for uuid := range Workers {
if Workers[uuid].Connected {
w := Workers[uuid]
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
return
}
taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest])
if err != nil {
slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err)
return
}
for _, tsr := range taskStatusRequests {
wg.Add(1)
go func() {
defer wg.Done()
var ts types.TaskStatus
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Task Status", "err", err)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", ts.Task.ID, constants.TASK_STATUS_UNKNOWN)
if err != nil {
slog.ErrorContext(ctx, "Error Updating Unknown Task Status", "err", err)
return
}
return
}
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log)
if err != nil {
slog.ErrorContext(ctx, "Error Updating Task Status", "err", err)
return
}
if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK && (ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS) {
var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY
} else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED
}
_, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health)
if err != nil {
slog.ErrorContext(ctx, "Error Updating File health", "err", err)
return
}
}
// Tell Worker to Delete Finished Tasks
if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
_, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil)
if err != nil {
slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err)
return
}
}
}()
}
// TODO Handle tasks with status unkown assigned to this worker
// maybe requeue after 5 minutes?
} else {
// TODO wait for 5 minutes for worker to reconnect
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck
}
}
}()
wg.Wait()
}