morffix/server/worker.go
package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx/v5"
	"nhooyr.io/websocket"

	"git.lastassault.de/speatzle/morffix/constants"
	"git.lastassault.de/speatzle/morffix/rpc"
	"git.lastassault.de/speatzle/morffix/types"
)
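
// Worker represents a single worker connection as tracked by the server.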
type Worker struct {
	Name              string
	Address           string
	Conn              *websocket.Conn
	Connected         bool
	ConnectionChanged time.Time
}
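
// Workers holds all known workers keyed by their connection UUID.
// WorkersMutex must be held when accessing the map or its entries.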
var Workers = map[string]*Worker{}
var WorkersMutex sync.Mutex
var rpcServer = rpc.NewServer()
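
// handleWorkerWebsocket authenticates a worker, upserts it in the database,
// upgrades the connection to a websocket and then reads RPC messages until
// the connection dies.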
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}

	if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
		w.WriteHeader(http.StatusNotImplemented)
		return
	}

	// Connection ID
	uuid := r.Header.Get(constants.UUID_HEADER)
	if uuid == "" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	name := r.Header.Get(constants.NAME_HEADER)

	_, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err)
		return
	}

	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		InsecureSkipVerify: true,
	})
	if err != nil {
		slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err)
		return
	}
	defer c.CloseNow()

	// Increase the read limit to 10 MB since a task status might be very long due to ffmpeg log spam
	c.SetReadLimit(10_000_000)

	// Track Connection
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		existing, ok := Workers[uuid]
		if ok && existing.Connected {
			slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...")
			existing.Conn.CloseNow()
			// Since we closed the old connection, we need to wait for its deferred cleanup to finish
			// before writing down the new connection, or we would be overwritten by it.
			for {
				WorkersMutex.Unlock()
				time.Sleep(time.Millisecond * 10)
				WorkersMutex.Lock()
				if old, ok := Workers[uuid]; !ok || !old.Connected {
					break
				}
			}
		}
		Workers[uuid] = &Worker{
			Name:              name,
			Address:           r.RemoteAddr,
			Conn:              c,
			Connected:         true,
			ConnectionChanged: time.Now(),
		}
		// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
	}()

	// Set Status on Disconnect
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		// Only mark the worker as disconnected if the map still holds this connection,
		// since a reconnect may already have replaced the entry.
		if worker, ok := Workers[uuid]; ok && worker.Conn == c {
			worker.Connected = false
			worker.ConnectionChanged = time.Now()
		}
	}()

	slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)

	// Read loop: dispatch incoming messages until the connection closes.
	for {
		err = readMessage(r.Context(), c)
		if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
			websocket.CloseStatus(err) == websocket.StatusAbnormalClosure ||
			websocket.CloseStatus(err) == websocket.StatusGoingAway {
			slog.InfoContext(r.Context(), "Websocket Closed")
			return
		}
		if err != nil {
			slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err)
			return
		}
	}
}
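
// readMessage reads a single message from the websocket connection and hands
// it off to the RPC server for dispatch.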
func readMessage(ctx context.Context, c *websocket.Conn) error {
	t, data, err := c.Read(ctx)
	if err != nil {
		return err
	}
	slog.DebugContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
	rpcServer.HandleMessage(ctx, c, data)
	return nil
}
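
// manageWorkers runs the periodic maintenance loop: it removes dead workers,
// polls connected workers for task status and assigns queued tasks, once per
// second each, until stop fires.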
func manageWorkers(stop chan bool) {
	deadTicker := time.NewTicker(time.Second)
	defer deadTicker.Stop()
	statusTicker := time.NewTicker(time.Second)
	defer statusTicker.Stop()
	assignTicker := time.NewTicker(time.Second)
	defer assignTicker.Stop()

	for {
		select {
		case <-deadTicker.C:
			killDeadWorkers()
		case <-statusTicker.C:
			updateWorkerTaskStatus(context.TODO())
		case <-assignTicker.C:
			err := assignQueuedTasks(context.TODO())
			if err != nil {
				slog.Error("Assigning Queued Tasks", "err", err)
			}
		case <-stop:
			return
		}
	}
}
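
// killDeadWorkers removes workers that have been disconnected for more than
// five minutes from the Workers map.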
func killDeadWorkers() {
	WorkersMutex.Lock()
	defer WorkersMutex.Unlock()
	for uuid, w := range Workers {
		// TODO workers that are not dead are sometimes also killed?
		if !w.Connected && time.Since(w.ConnectionChanged) > time.Minute*5 {
			slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name)
			delete(Workers, uuid)
			// TODO free any jobs that the worker had
		}
	}
}
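
// updateWorkerTaskStatus asks every connected worker for the status of its
// unfinished tasks, persists the results and tells workers to delete tasks
// that are done.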
func updateWorkerTaskStatus(ctx context.Context) {
	var wg sync.WaitGroup
	// TODO figure out why this locks up on worker status error
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		for uuid := range Workers {
			if Workers[uuid].Connected {
				w := Workers[uuid]
				rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
				if err != nil {
					slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
					return
				}
				taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest])
				if err != nil {
					slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err)
					return
				}
				for _, taskStatusRequest := range taskStatusRequests {
					// Copy the loop variables so the goroutine below captures stable values.
					tsr := taskStatusRequest
					taskID := tsr.ID
					wg.Add(1)
					go func() {
						defer wg.Done()
						var ts types.TaskStatus
						_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
						if err != nil {
							// TODO find a better way to compare errors which were sent via the websocket
							if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
								// The worker says it does not know of this task, so mark it failed so that we don't ask the worker about it again and again
								slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
								_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
								if err != nil {
									slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
									return
								}
								slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
								return
							} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
								// We really don't know what's going on; it might be a slow response, oom, a full disk or a bug
								_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
								if err != nil {
									slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
									return
								}
							} else {
								slog.ErrorContext(ctx, "Getting Task Status", "err", err)
								_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
								if err != nil {
									slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
									return
								}
								return
							}
						}
						_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
						if err != nil {
							slog.ErrorContext(ctx, "Updating Task Status", "err", err)
							return
						}
						if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
							if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK {
								var health constants.FileHealth
								if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
									health = constants.FILE_HEALTH_HEALTHY
									// Auto queue a transcode for successful health checks if the library settings say so
									go queueOnHealth(context.TODO(), ts.Task.ID)
								} else {
									// TODO not all failures mean the file is damaged: only update on success, track ffmpeg errors in the task result data, and remove -xerror and scan for errors manually to see all problems in the logs
									health = constants.FILE_HEALTH_DAMAGED
								}
								_, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health)
								if err != nil {
									slog.ErrorContext(ctx, "Updating File health", "err", err)
									return
								}
							} else if ts.Task.Type == constants.TASK_TYPE_TRANSCODE {
								var transcode constants.FileTranscode
								if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
									transcode = constants.FILE_TRANSCODE_SUCCESS
								} else {
									transcode = constants.FILE_TRANSCODE_FAILED
								}
								_, err = db.Exec(ctx, "UPDATE files SET transcode = $2 WHERE id = $1", ts.Task.FileID, transcode)
								if err != nil {
									slog.ErrorContext(ctx, "Updating File transcode", "err", err)
									return
								}
							}
							// Tell the worker to delete finished tasks
							_, err := rpcServer.Call(ctx, w.Conn, "task-delete", taskID, nil)
							if err != nil {
								slog.ErrorContext(ctx, "Deleting Finished Task From Worker", "err", err)
								return
							}
						}
					}()
				}
				// TODO handle tasks with status unknown assigned to this worker, maybe requeue them after 5 minutes?
			} else {
				// TODO wait 5 minutes for the worker to reconnect, then set unfinished tasks which were assigned to
				// this disconnected worker to status unknown; right now they just get stuck if the worker does not reconnect.
			}
		}
	}()
	wg.Wait()
}
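
// queueOnHealth queues a transcode task for the file behind a successful
// health check, if the file's library has health_ok_queue_transcode set and
// a transcode command configured.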
func queueOnHealth(ctx context.Context, taskID int) {
	var transcode_command_id *int
	var file_id *int
	err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
		FROM tasks
		INNER JOIN files ON files.id = tasks.file_id
		INNER JOIN libraries ON files.library_id = libraries.id
		WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
		Scan(&transcode_command_id, &file_id)
	if errors.Is(err, pgx.ErrNoRows) {
		slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
		// Return here; falling through would insert a task with nil file_id and command id.
		return
	} else if err != nil {
		slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
		return
	}

	slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
	if err != nil {
		slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	}
}