452 lines
16 KiB
Go
452 lines
16 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.lastassault.de/speatzle/morffix/constants"
|
|
"git.lastassault.de/speatzle/morffix/types"
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
type TasksData struct {
|
|
Libraries []Library
|
|
FfmpegCommands []FfmpegCommand
|
|
Tasks []TasksDisplay
|
|
Stats TaskStats
|
|
OrderBy string
|
|
OrderAsc bool
|
|
Limit uint
|
|
Page uint
|
|
Count uint
|
|
}
|
|
|
|
type TaskStats struct {
|
|
RunningCount int
|
|
QueuedCount int
|
|
FailedCount int
|
|
SuccessCount int
|
|
UnknownCount int
|
|
AssignedCount int
|
|
PausedCount int
|
|
WaitingCount int
|
|
TotalCount int
|
|
}
|
|
|
|
type TasksDisplay struct {
|
|
ID int `db:"id"`
|
|
Library int `db:"library"`
|
|
Worker *string `db:"worker"`
|
|
Type int `db:"type"`
|
|
FfmpegCommand *string `db:"ffmpeg_command"`
|
|
Status string `db:"status"`
|
|
File string `db:"file"`
|
|
UpdatedAt string `db:"updated_at"`
|
|
}
|
|
|
|
type TaskDisplay struct {
|
|
ID int
|
|
Library string
|
|
Worker *string
|
|
Type string
|
|
Status string
|
|
Filename string
|
|
Log []string
|
|
UpdatedAt time.Time
|
|
}
|
|
|
|
type TaskDB struct {
|
|
ID int `db:"id"`
|
|
Library int `db:"library"`
|
|
Worker *string `db:"worker"`
|
|
Type int `db:"type"`
|
|
FfmpegCommand *string `db:"ffmpeg_command"`
|
|
Status constants.TaskStatus `db:"status"`
|
|
File string `db:"file"`
|
|
UpdatedAt time.Time `db:"updated_at"`
|
|
}
|
|
|
|
var taskColumns = []string{"id", "library", "worker", "type", "ffmpeg_command", "status", "file", "updated_at"}
|
|
|
|
func handleTasks(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method == "POST" {
|
|
err := createTask(r.Context(), r)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Create Task", "err", err)
|
|
http.Error(w, "Error Create Tasks: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
data := TasksData{}
|
|
|
|
err := db.QueryRow(r.Context(), `
|
|
SELECT COUNT(*),
|
|
COALESCE(SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 4 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 5 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 6 THEN 1 ELSE 0 END),0),
|
|
COALESCE(SUM(CASE WHEN status = 7 THEN 1 ELSE 0 END),0)
|
|
FROM tasks
|
|
`).Scan(&data.Stats.TotalCount, &data.Stats.UnknownCount, &data.Stats.FailedCount, &data.Stats.SuccessCount, &data.Stats.RunningCount, &data.Stats.QueuedCount, &data.Stats.AssignedCount, &data.Stats.PausedCount, &data.Stats.WaitingCount)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Query Task Counts", "err", err)
|
|
http.Error(w, "Error Query Task Counts: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
rows, err := db.Query(r.Context(), "SELECT * FROM libraries WHERE enable = $1", true)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
|
|
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
libraries, err := pgx.CollectRows[Library](rows, pgx.RowToStructByName[Library])
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Collect Rows", "err", err)
|
|
http.Error(w, "Error Collect Libraries: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
data.Libraries = libraries
|
|
|
|
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
|
|
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
|
|
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
data.FfmpegCommands = ffmpegCommands
|
|
|
|
query := r.URL.Query()
|
|
data.OrderBy = "updated_at"
|
|
if query.Has(constants.SORT_ORDER_BY_PARAM) {
|
|
if slices.Contains(taskColumns, query.Get(constants.SORT_ORDER_BY_PARAM)) {
|
|
data.OrderBy = query.Get(constants.SORT_ORDER_BY_PARAM)
|
|
} else {
|
|
http.Error(w, "Error Unknown Column in order_by: "+query.Get(constants.SORT_ORDER_BY_PARAM), http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
orderDir := "DESC"
|
|
if query.Has(constants.SORT_ORDER_ASC_PARAM) {
|
|
orderDir = "ASC"
|
|
data.OrderAsc = true
|
|
}
|
|
|
|
data.Limit = 100
|
|
if query.Has(constants.QUERY_LIMIT_PARAM) {
|
|
limit, err := strconv.Atoi(query.Get(constants.QUERY_LIMIT_PARAM))
|
|
if err != nil {
|
|
http.Error(w, "Error Parsing query limit: "+query.Get(constants.QUERY_LIMIT_PARAM), http.StatusBadRequest)
|
|
return
|
|
}
|
|
if limit < 0 {
|
|
http.Error(w, "Error query limit smaller than 0: "+query.Get(constants.QUERY_LIMIT_PARAM), http.StatusBadRequest)
|
|
return
|
|
}
|
|
data.Limit = uint(limit)
|
|
}
|
|
|
|
data.Page = 0
|
|
if query.Has(constants.QUERY_PAGE_PARAM) {
|
|
page, err := strconv.Atoi(query.Get(constants.QUERY_PAGE_PARAM))
|
|
if err != nil {
|
|
http.Error(w, "Error Parsing query page: "+query.Get(constants.QUERY_PAGE_PARAM), http.StatusBadRequest)
|
|
return
|
|
}
|
|
if page < 0 {
|
|
http.Error(w, "Error query limit smaller than 0: "+query.Get(constants.QUERY_PAGE_PARAM), http.StatusBadRequest)
|
|
return
|
|
}
|
|
data.Page = uint(page)
|
|
}
|
|
|
|
rows, err = db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, w.name AS worker, t.type AS type, fc.name AS ffmpeg_command, t.status AS status, f.path AS file, t.updated_at AS updated_at FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id LEFT JOIN workers w ON t.worker_id = w.id "+fmt.Sprintf("ORDER BY %v %v LIMIT %d OFFSET %d", data.OrderBy, orderDir, data.Limit, data.Page*data.Limit))
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Query Tasks", "err", err)
|
|
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB])
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Collect Tasks", "err", err)
|
|
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
for i := range tasks {
|
|
data.Tasks = append(data.Tasks, TasksDisplay{
|
|
ID: tasks[i].ID,
|
|
Library: tasks[i].Library,
|
|
Worker: tasks[i].Worker,
|
|
Type: tasks[i].Type,
|
|
FfmpegCommand: tasks[i].FfmpegCommand,
|
|
File: tasks[i].File,
|
|
Status: tasks[i].Status.String(),
|
|
UpdatedAt: tasks[i].UpdatedAt.Format(time.DateTime),
|
|
})
|
|
}
|
|
data.Count = uint(len(tasks))
|
|
|
|
buf := bytes.Buffer{}
|
|
err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Executing Task Template", "err", err)
|
|
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
_, err = w.Write(buf.Bytes())
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
|
}
|
|
}
|
|
|
|
func handleTask(w http.ResponseWriter, r *http.Request) {
|
|
id := r.PathValue("id")
|
|
if id == "" {
|
|
handleTasks(w, r)
|
|
return
|
|
}
|
|
|
|
var typ constants.TaskType
|
|
var status constants.TaskStatus
|
|
t := TaskDisplay{}
|
|
err := db.QueryRow(r.Context(), "SELECT t.id, l.name, w.name, t.type, t.status, f.path, t.log, t.updated_at FROM tasks t LEFT JOIN workers w ON w.id = t.worker_id INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id WHERE t.id = $1", id).Scan(&t.ID, &t.Library, &t.Worker, &typ, &status, &t.Filename, &t.Log, &t.UpdatedAt)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Query Tasks", "err", err)
|
|
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
t.Type = typ.String()
|
|
t.Status = status.String()
|
|
buf := bytes.Buffer{}
|
|
err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Executing Task Template", "err", err)
|
|
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
_, err = w.Write(buf.Bytes())
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
|
}
|
|
}
|
|
|
|
func createTask(ctx context.Context, r *http.Request) error {
|
|
err := r.ParseForm()
|
|
if err != nil {
|
|
return fmt.Errorf("Parseing Form: %w", err)
|
|
}
|
|
library := r.FormValue("library")
|
|
health := r.FormValue("health")
|
|
transcode := r.FormValue("transcode")
|
|
typ, err := strconv.Atoi(r.FormValue("type"))
|
|
if err != nil {
|
|
return fmt.Errorf("Parsing Task Type: %w", err)
|
|
}
|
|
ffmpeg_command, err := strconv.Atoi(r.FormValue("ffmpeg_command"))
|
|
if err != nil {
|
|
return fmt.Errorf("Parsing FFmpeg Command: %w", err)
|
|
}
|
|
|
|
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
|
|
|
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3) AND (-1 = $4 OR transcode = $4)", library, constants.FILE_STATUS_EXISTS, health, transcode)
|
|
if err != nil {
|
|
return fmt.Errorf("Query Files: %w", err)
|
|
}
|
|
files, err := pgx.CollectRows[File](rows, pgx.RowToStructByName[File])
|
|
if err != nil {
|
|
return fmt.Errorf("Collect Files: %w", err)
|
|
}
|
|
|
|
tx, err := db.Begin(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Begin Transaction: %w", err)
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
var data any
|
|
/*
|
|
if typ == constants.TASK_TYPE_HEALTHCHECK {
|
|
|
|
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
|
|
data = types.HealthCheckData{Command: types.FFmpegCommand{
|
|
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
|
InputFiles: []types.File{{Path: "input.mkv"}},
|
|
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
|
}}
|
|
} else if typ == constants.TASK_TYPE_TRANSCODE {
|
|
data = types.TranscodeData{Command: types.FFmpegCommand{
|
|
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
|
InputFiles: []types.File{{Path: "input.mkv"}},
|
|
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-map", Value: "0"}, {Flag: "-c", Value: "copy"}, {Flag: "-c:a", Value: "aac"}, {Flag: "-c:v", Value: "libsvtav1"}, {Flag: "-crf", Value: "35"}, {Flag: "-preset", Value: "8"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
|
}}
|
|
} else {
|
|
return fmt.Errorf("Unkown Task Type: %v", typ)
|
|
}
|
|
*/
|
|
|
|
for _, file := range files {
|
|
_, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file.ID, typ, constants.TASK_STATUS_QUEUED, data, ffmpeg_command)
|
|
if err != nil {
|
|
return fmt.Errorf("Inserting Task: %w", err)
|
|
}
|
|
}
|
|
|
|
err = tx.Commit(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Committing Transcation: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type QueuedTask struct {
|
|
ID int
|
|
Type constants.TaskType
|
|
FileID int `json:"file_id"`
|
|
FileMD5 []byte `json:"file_md5" db:"md5"`
|
|
Data json.RawMessage
|
|
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
|
|
}
|
|
|
|
var assignRunning = false
|
|
|
|
func assignQueuedTasks(ctx context.Context) error {
|
|
if assignRunning {
|
|
return nil
|
|
}
|
|
|
|
assignRunning = true
|
|
defer func() {
|
|
assignRunning = false
|
|
}()
|
|
|
|
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status = $2", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS)
|
|
if err != nil {
|
|
return fmt.Errorf("Query Queued Tasks: %w", err)
|
|
}
|
|
queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask])
|
|
if err != nil {
|
|
return fmt.Errorf("Collect Queued Tasks: %w", err)
|
|
}
|
|
|
|
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
|
|
|
|
if len(queuedTasks) == 0 {
|
|
return nil
|
|
}
|
|
|
|
WorkersMutex.Lock()
|
|
defer WorkersMutex.Unlock()
|
|
|
|
lastAssigned := 0
|
|
|
|
for i := range Workers {
|
|
if lastAssigned == len(queuedTasks) {
|
|
slog.Info("All Tasks assigned")
|
|
// All Tasks have been Assigned
|
|
return nil
|
|
}
|
|
if Workers[i].Connected {
|
|
var queueEnable bool
|
|
err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable)
|
|
if err != nil {
|
|
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
|
|
}
|
|
if !queueEnable {
|
|
slog.DebugContext(ctx, "Skipping Worker since Queueing is disabled", "worker_id", i)
|
|
continue
|
|
}
|
|
|
|
var count int
|
|
err = db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
|
if err != nil {
|
|
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
|
}
|
|
|
|
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
|
|
|
|
// Allow for Multiple Tasks at once in the future
|
|
if count < 1 {
|
|
tx, err := db.Begin(ctx)
|
|
defer tx.Rollback(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Starting Transaction: %w", err)
|
|
}
|
|
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i)
|
|
if err != nil {
|
|
return fmt.Errorf("Setting tasks Assignment: %w", err)
|
|
}
|
|
|
|
taskStart := types.TaskStart{
|
|
ID: queuedTasks[lastAssigned].ID,
|
|
Type: queuedTasks[lastAssigned].Type,
|
|
FileID: queuedTasks[lastAssigned].FileID,
|
|
FileMD5: queuedTasks[lastAssigned].FileMD5,
|
|
Data: queuedTasks[lastAssigned].Data,
|
|
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
|
|
}
|
|
|
|
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
|
if err != nil {
|
|
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
|
|
// Task was started previously but something went wrong and we are out of sync
|
|
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
|
|
|
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
|
|
if err != nil {
|
|
return fmt.Errorf("Updating Task status during already running error: %w", err)
|
|
}
|
|
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
|
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
|
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
|
|
|
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
|
|
if err != nil {
|
|
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
|
|
}
|
|
} else {
|
|
slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
|
|
|
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())})
|
|
if err != nil {
|
|
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
err = tx.Commit(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Error Committing Transaction: %w", err)
|
|
}
|
|
|
|
slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i)
|
|
lastAssigned++
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|