package server

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	"git.lastassault.de/speatzle/morffix/constants"
	"git.lastassault.de/speatzle/morffix/rpc"
	"git.lastassault.de/speatzle/morffix/types"
	"github.com/jackc/pgx/v5"
	"nhooyr.io/websocket"
)

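// Worker tracks a single worker's websocket connection. ConnectionChanged
// records when Connected last flipped, which killDeadWorkers uses to expire
// entries that have been disconnected for too long.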
type Worker struct {
	Name              string
	Address           string
	Conn              *websocket.Conn
	Connected         bool
	ConnectionChanged time.Time
}

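// Workers holds all known workers keyed by their connection UUID.
// WorkersMutex must be held for any access to Workers or its entries.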
var Workers = map[string]*Worker{}
var WorkersMutex sync.Mutex

var rpcServer = rpc.NewServer()

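// handleWorkerWebsocket authenticates a worker via the shared secret and
// version headers, upserts it into the database, upgrades the request to a
// websocket connection and then reads RPC messages until the connection
// closes. It is presumably registered on the HTTP mux elsewhere; the route
// below is only an illustrative assumption, not visible in this file:
//
//	mux.HandleFunc("/worker", handleWorkerWebsocket)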
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
		w.WriteHeader(http.StatusNotImplemented)
		return
	}
	// Connection ID
	uuid := r.Header.Get(constants.UUID_HEADER)
	if uuid == "" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	name := r.Header.Get(constants.NAME_HEADER)

	_, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err)
		return
	}

	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		InsecureSkipVerify: true,
	})
	if err != nil {
		slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err)
		return
	}
	defer c.CloseNow()

	// Increase the read limit to 10 MB since the task status might be very long due to ffmpeg log spam.
	c.SetReadLimit(10000000)

	// Track the connection.
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		w, ok := Workers[uuid]
		if ok && w.Connected {
			slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...")
			w.Conn.CloseNow()

			// Since we closed the connection, we need to wait for the old connection's deferred
			// cleanup to finish before writing down the new connection, or it will overwrite ours.
			for {
				WorkersMutex.Unlock()
				time.Sleep(time.Millisecond * 10)
				WorkersMutex.Lock()
				if !Workers[uuid].Connected {
					break
				}
			}
		}

		Workers[uuid] = &Worker{
			Name:              name,
			Address:           r.RemoteAddr,
			Conn:              c,
			Connected:         true,
			ConnectionChanged: time.Now(),
		}

		// TODO: call a function for initial worker connect, or a function to handle worker reconnection (get task status, continue processing).
	}()

	// Set status on disconnect.
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		Workers[uuid].Connected = false
		Workers[uuid].ConnectionChanged = time.Now()
	}()

	slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)

	for {
		err = readMessage(r.Context(), c)
		if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway {
			slog.InfoContext(r.Context(), "Websocket Closed")
			return
		}
		if err != nil {
			slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err)
			return
		}
	}
}

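// readMessage reads a single websocket message and hands it to the RPC
// server for dispatch. Read errors are returned unchanged so the caller
// can inspect the websocket close status.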
func readMessage(ctx context.Context, c *websocket.Conn) error {
	t, data, err := c.Read(ctx)
	if err != nil {
		return err
	}
	slog.DebugContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
	rpcServer.HandleMessage(ctx, c, data)
	return nil
}

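// manageWorkers runs the periodic maintenance loop: reaping dead workers,
// polling task status and assigning queued tasks, each on a one-second
// ticker, until stop fires. A minimal usage sketch (how it is actually
// started is not visible in this file):
//
//	stop := make(chan bool)
//	go manageWorkers(stop)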
func manageWorkers(stop chan bool) {
	deadTicker := time.NewTicker(time.Second)
	statusTicker := time.NewTicker(time.Second)
	assignTicker := time.NewTicker(time.Second)

	for {
		select {
		case <-deadTicker.C:
			killDeadWorkers()
		case <-statusTicker.C:
			updateWorkerTaskStatus(context.TODO())
		case <-assignTicker.C:
			err := assignQueuedTasks(context.TODO())
			if err != nil {
				slog.Error("Assigning Queued Tasks", "err", err)
			}
		case <-stop:
			return
		}
	}
}

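// killDeadWorkers removes workers that have been disconnected for more than
// five minutes from the Workers map.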
func killDeadWorkers() {
	WorkersMutex.Lock()
	defer WorkersMutex.Unlock()

	for uuid, w := range Workers {
		// TODO: workers that are not dead are sometimes also killed?
		if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) {
			slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name)
			delete(Workers, uuid)
			// TODO: free any jobs that the worker had.
		}
	}
}

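// updateWorkerTaskStatus polls every connected worker for the status of its
// unfinished tasks, appends any new log output, records terminal results on
// the file and tells the worker to delete finished tasks.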
func updateWorkerTaskStatus(ctx context.Context) {
	var wg sync.WaitGroup

	// TODO: figure out why this locks up on worker status error.
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		for uuid := range Workers {
			if Workers[uuid].Connected {
				w := Workers[uuid]

				rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
				if err != nil {
					slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
					return
				}

				taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest])
				if err != nil {
					slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err)
					return
				}

				for _, taskStatusRequest := range taskStatusRequests {
					tsr := taskStatusRequest
					taskID := tsr.ID
					wg.Add(1)
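					// Poll this task's status from the worker concurrently;
					// wg.Wait below blocks until all polls have finished.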
					go func() {
						defer wg.Done()
						var ts types.TaskStatus
						_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)

						if err != nil {
							// Find a better way to compare errors which were sent via websocket.
							if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
								// The worker says it does not know of this task; mark it failed so that we don't ask the worker about it again and again.
								slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
								_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
								if err != nil {
									slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
									return
								}
								slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)

								return
							} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
								// We really don't know what's going on; it might be a slow response, OOM, a full disk or a bug.
								_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
								if err != nil {
									slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
									return
								}
							} else {
								slog.ErrorContext(ctx, "Getting Task Status", "err", err)

								_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
								if err != nil {
									slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
									return
								}
								return
							}
						}
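
						// Append the newly received log lines and advance log_offset
						// so the next status request only asks the worker for log
						// output that has not been stored yet.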
						_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
						if err != nil {
							slog.ErrorContext(ctx, "Updating Task Status", "err", err)
							return
						}
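
						// The task reached a terminal state: record the result on the
						// file and clean the task up on the worker.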
						if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
							if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK {
								var health constants.FileHealth
								if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
									health = constants.FILE_HEALTH_HEALTHY
									// Auto-queue a transcode on successful health checks if the library settings say so.
									go queueOnHealth(context.TODO(), ts.Task.ID)
								} else {
									// TODO: not all failures mean the file is damaged; only update on success and track ffmpeg errors in the task result data. Also remove -xerror and scan for errors manually to see all problems in the logs.
									health = constants.FILE_HEALTH_DAMAGED
								}
								_, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health)
								if err != nil {
									slog.ErrorContext(ctx, "Updating File health", "err", err)
									return
								}

							} else if ts.Task.Type == constants.TASK_TYPE_TRANSCODE {
								var transcode constants.FileTranscode
								if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
									transcode = constants.FILE_TRANSCODE_SUCCESS
								} else {
									transcode = constants.FILE_TRANSCODE_FAILED
								}
								_, err = db.Exec(ctx, "UPDATE files SET transcode = $2 WHERE id = $1", ts.Task.FileID, transcode)
								if err != nil {
									slog.ErrorContext(ctx, "Updating File transcode", "err", err)
									return
								}
							}
							// Tell the worker to delete finished tasks.
							_, err := rpcServer.Call(ctx, w.Conn, "task-delete", taskID, nil)
							if err != nil {
								slog.ErrorContext(ctx, "Deleting Finished Task From Worker", "err", err)
								return
							}
						}

					}()
				}
				// TODO: handle tasks with status unknown assigned to this worker.
				// Maybe requeue after 5 minutes?

			} else {
				// TODO: wait 5 minutes for the worker to reconnect.
				// Set task status to unknown for unfinished tasks which were assigned to this
				// disconnected worker; right now they just get stuck if the worker does not reconnect.
			}
		}
	}()

	wg.Wait()
}

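// queueOnHealth queues a transcode task for the file behind a successful
// health-check task, if the file's library has health_ok_queue_transcode
// set and a transcode command configured.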
func queueOnHealth(ctx context.Context, taskID int) {
	var transcode_command_id *int
	var file_id *int
	err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
	FROM tasks
	INNER JOIN files ON files.id = tasks.file_id
	INNER JOIN libraries ON files.library_id = libraries.id
	WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
		Scan(&transcode_command_id, &file_id)
	if errors.Is(err, pgx.ErrNoRows) {
		// Nothing to queue for this task.
		slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
		return
	} else if err != nil {
		slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
		return
	}

	slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
	if err != nil {
		slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	}
}