morffix/server/worker.go
Samuel Lorch f52f517dc1
All checks were successful
/ release (push) Successful in 1m23s
Move logs from array to dedicated table
This avoids the append problem, which bloats the database to 200+ GB within a
month and only shrinks it back to 5 GB after a full VACUUM.
2025-05-18 19:18:52 +02:00

366 lines
12 KiB
Go

package server
import (
	"context"
	"crypto/subtle"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"git.lastassault.de/speatzle/morffix/constants"
	"git.lastassault.de/speatzle/morffix/rpc"
	"git.lastassault.de/speatzle/morffix/types"
	"github.com/jackc/pgx/v5"
	"nhooyr.io/websocket"
)
// Worker is the server's in-memory record of one worker connection. Records
// live in Workers, keyed by the UUID the worker sends when connecting.
type Worker struct {
	// Name is the worker-supplied name header; Address is the remote address
	// of the websocket request.
	Name, Address string
	// Conn is the websocket used to issue RPC calls to the worker.
	Conn *websocket.Conn
	// Connected becomes false once the handler serving Conn has returned.
	Connected bool
	// ConnectionChanged is when Connected was last set; killDeadWorkers uses it
	// to drop workers that stay disconnected for too long.
	ConnectionChanged time.Time
}

var (
	// Workers maps worker UUID to its connection record. Access it, and the
	// Connected/ConnectionChanged fields of its entries, with WorkersMutex held.
	Workers      = map[string]*Worker{}
	WorkersMutex sync.Mutex

	// rpcServer dispatches incoming worker messages and issues outgoing calls.
	rpcServer = rpc.NewServer()
)
// handleWorkerWebsocket accepts a websocket connection from a worker, registers
// it in Workers and serves RPC messages on it until the connection ends.
//
// The request is rejected with 401 if the shared secret is wrong, 501 if the
// worker version does not match exactly, 400 if no connection UUID is given and
// 500 if the worker cannot be upserted into the workers table. If an entry for
// the same UUID is still marked connected, its connection is closed and the
// handler waits for that connection's cleanup before registering itself.
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	// Constant-time comparison so response timing does not reveal how much of
	// the secret a caller guessed correctly.
	if subtle.ConstantTimeCompare([]byte(r.Header.Get(constants.SHARED_SECRET_HEADER)), []byte(conf.SharedSecret)) != 1 {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
		w.WriteHeader(http.StatusNotImplemented)
		return
	}

	// Connection ID
	uuid := r.Header.Get(constants.UUID_HEADER)
	if uuid == "" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	name := r.Header.Get(constants.NAME_HEADER)

	_, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err)
		return
	}

	// InsecureSkipVerify disables the websocket Origin check.
	// NOTE(review): presumably acceptable because workers are not browsers and are
	// authenticated by the shared secret above — confirm.
	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		InsecureSkipVerify: true,
	})
	if err != nil {
		slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err)
		return
	}
	defer c.CloseNow()

	// Increase Read limit to 10 MB since the task status might be very long due to ffmpeg log spam
	c.SetReadLimit(10000000)

	// Track Connection
	var worker *Worker
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		// Close any still-live connection for this UUID and wait until its
		// handler has run its disconnect cleanup, so at most one connection per
		// worker is served at a time. Each live connection is closed only once,
		// and a missing entry simply ends the wait.
		var killed *Worker
		for {
			old, ok := Workers[uuid]
			if !ok || !old.Connected {
				break
			}
			if old != killed {
				slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...")
				old.Conn.CloseNow()
				killed = old
			}
			WorkersMutex.Unlock()
			time.Sleep(10 * time.Millisecond)
			WorkersMutex.Lock()
		}

		worker = &Worker{
			Name:              name,
			Address:           r.RemoteAddr,
			Conn:              c,
			Connected:         true,
			ConnectionChanged: time.Now(),
		}
		Workers[uuid] = worker
		// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
	}()

	// Set Status on Disconnect. Update this connection's own record rather than
	// whatever currently sits under uuid, so a newer connection is never marked
	// disconnected and a removed entry cannot cause a nil dereference.
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		worker.Connected = false
		worker.ConnectionChanged = time.Now()
	}()

	slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)
	for {
		err = readMessage(r.Context(), c)
		switch websocket.CloseStatus(err) {
		case websocket.StatusNormalClosure, websocket.StatusAbnormalClosure, websocket.StatusGoingAway:
			slog.InfoContext(r.Context(), "Websocket Closed")
			return
		}
		if err != nil {
			slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err)
			return
		}
	}
}
// readMessage reads one message from c and hands it to rpcServer.
//
// Any read error, including the error produced by a close frame, is returned
// unchanged so the caller can classify it with websocket.CloseStatus. A nil
// return means a message was read and dispatched.
func readMessage(ctx context.Context, c *websocket.Conn) error {
	msgType, payload, err := c.Read(ctx)
	if err == nil {
		slog.DebugContext(ctx, "Got Websocket Message", "type", msgType.String(), "data", payload)
		rpcServer.HandleMessage(ctx, c, payload)
	}
	return err
}
// manageWorkers runs the worker maintenance loop until stop receives a value
// (or is closed). Once per second each, it:
//   - removes workers that have been disconnected too long (killDeadWorkers),
//   - polls connected workers for task status (updateWorkerTaskStatus),
//   - assigns queued tasks (assignQueuedTasks).
//
// The jobs run one at a time in this goroutine, so a slow job delays the
// others. A time.Ticker drops ticks for a slow receiver instead of queueing
// them.
func manageWorkers(stop chan bool) {
	// Stop the tickers on return so their resources are released.
	deadTicker := time.NewTicker(time.Second)
	defer deadTicker.Stop()
	statusTicker := time.NewTicker(time.Second)
	defer statusTicker.Stop()
	assignTicker := time.NewTicker(time.Second)
	defer assignTicker.Stop()

	for {
		select {
		case <-deadTicker.C:
			killDeadWorkers()
		case <-statusTicker.C:
			updateWorkerTaskStatus(context.TODO())
		case <-assignTicker.C:
			if err := assignQueuedTasks(context.TODO()); err != nil {
				slog.Error("Assigning Queued Tasks", "err", err)
			}
		case <-stop:
			return
		}
	}
}
// killDeadWorkers removes every worker from Workers that has been disconnected
// for more than five minutes. WorkersMutex is held for the whole sweep.
func killDeadWorkers() {
	const deadAfter = time.Minute * 5

	WorkersMutex.Lock()
	defer WorkersMutex.Unlock()

	for id, worker := range Workers {
		// TODO Not dead Workers are sometimes also killed?
		if worker.Connected || time.Since(worker.ConnectionChanged) <= deadAfter {
			continue
		}
		slog.Warn("Removing Dead Worker", "uuid", id, "name", worker.Name)
		// Deleting the current key while ranging over a map is allowed in Go.
		delete(Workers, id)
		// TODO Free any Jobs that the Worker had
	}
}
func updateWorkerTaskStatus(ctx context.Context) {
var wg sync.WaitGroup
// TODO figure out why this locks up on worker status error
func() {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for uuid := range Workers {
if Workers[uuid].Connected {
w := Workers[uuid]
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
return
}
taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest])
if err != nil {
slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err)
return
}
for _, taskStatusRequest := range taskStatusRequests {
tsr := taskStatusRequest
taskID := tsr.ID
wg.Add(1)
go func() {
defer wg.Done()
var ts types.TaskStatus
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
if err != nil {
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Error Begin Transaction", "err", err)
return
}
defer tx.Rollback(ctx)
// Find better way to compare errors which where send via websocket
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED)
if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
return
}
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now()))
if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Log", "err", err)
return
}
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
return
}
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now()))
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Log due to Timeout", "err", err)
return
}
} else {
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
return
}
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error()))
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Log", "err", err)
return
}
}
err = tx.Commit(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Error Commit Transaction", "err", err)
}
return
}
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Success Begin Transaction", "err", err)
return
}
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log_offset = log_offset + $3 WHERE id = $1", taskID, ts.Task.Status, len(ts.Task.Log))
if err != nil {
slog.ErrorContext(ctx, "Updating Task Status and offset", "err", err)
return
}
// TODO batch these or use copy protocol
for _, l := range ts.Task.Log {
_, err = tx.Exec(ctx, "INSERT INTO logs(task_id, message) VALUES($1,$2)", taskID, l)
if err != nil {
slog.ErrorContext(ctx, "Updating Task Log", "err", err)
return
}
}
err = tx.Commit(ctx)
if err != nil {
slog.ErrorContext(ctx, "Task Status Commit Transaction", "err", err)
return
}
if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK {
var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY
// Auto Queue Transcode for Successfull Transcodes if library setting say so
go queueOnHealth(context.TODO(), ts.Task.ID)
} else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED
}
_, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health)
if err != nil {
slog.ErrorContext(ctx, "Updating File health", "err", err)
return
}
} else if ts.Task.Type == constants.TASK_TYPE_TRANSCODE {
var transcode constants.FileTranscode
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
transcode = constants.FILE_TRANSCODE_SUCCESS
} else {
transcode = constants.FILE_TRANSCODE_FAILED
}
_, err = db.Exec(ctx, "UPDATE files SET transcode = $2 WHERE id = $1", ts.Task.FileID, transcode)
if err != nil {
slog.ErrorContext(ctx, "Updating File transcode", "err", err)
return
}
}
// Tell Worker to Delete Finished Tasks
_, err := rpcServer.Call(ctx, w.Conn, "task-delete", taskID, nil)
if err != nil {
slog.ErrorContext(ctx, "Deleting Finished Task From Worker", "err", err)
return
}
}
}()
}
// TODO Handle tasks with status unkown assigned to this worker
// maybe requeue after 5 minutes?
} else {
// TODO wait for 5 minutes for worker to reconnect
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck if worker does not reconnect
}
}
}()
wg.Wait()
}
// queueOnHealth queues a transcode task for the file examined by health-check
// task taskID. It does so only when that file's library has
// health_ok_queue_transcode enabled and a transcode_command_id configured. The
// new task uses the library's transcode command and starts in the queued state.
// Errors are logged, not returned.
func queueOnHealth(ctx context.Context, taskID int) {
	var transcodeCommandID, fileID *int
	err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
FROM tasks
INNER JOIN files ON files.id = tasks.file_id
INNER JOIN libraries ON files.library_id = libraries.id
WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
		Scan(&transcodeCommandID, &fileID)
	if errors.Is(err, pgx.ErrNoRows) {
		// No row means the library is not set up for automatic transcodes (or the
		// task/file/library no longer exists), so there is nothing to queue.
		// Returning here prevents inserting a task with nil ids.
		slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
		return
	}
	if err != nil {
		slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
		return
	}

	slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcodeCommandID)
	_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", fileID, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcodeCommandID)
	if err != nil {
		slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcodeCommandID)
	}
}