package server import ( "bytes" "context" "encoding/json" "fmt" "log/slog" "net/http" "git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/types" "github.com/jackc/pgx/v5" ) type TasksData struct { Libraries []Library Tasks []TaskDisplay } type TaskDisplay struct { ID int `db:"id"` Library int `db:"library"` Worker *string `db:"worker"` Type int `db:"type"` Status string `db:"status"` File string `db:"file"` } type TaskDB struct { ID int `db:"id"` Library int `db:"library"` Worker *string `db:"worker"` Type int `db:"type"` Status constants.TaskStatus `db:"status"` File string `db:"file"` } func handleTasks(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { err := createTask(r.Context(), r) if err != nil { slog.ErrorContext(r.Context(), "Create Task", "err", err) http.Error(w, "Error Create Library: "+err.Error(), http.StatusInternalServerError) return } } data := TasksData{} rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true) if err != nil { slog.ErrorContext(r.Context(), "Query Libraries", "err", err) http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) return } libraries, err := pgx.CollectRows[Library](rows, pgx.RowToStructByName[Library]) if err != nil { slog.ErrorContext(r.Context(), "Collect Rows", "err", err) http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) return } data.Libraries = libraries rows, err = db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id ORDER BY CASE t.type WHEN 3 THEN -1 ELSE t.type END, t.id") if err != nil { slog.ErrorContext(r.Context(), "Query Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB]) if err != nil { slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } for i := range tasks { data.Tasks = append(data.Tasks, TaskDisplay{ ID: tasks[i].ID, Library: tasks[i].Library, Worker: tasks[i].Worker, Type: tasks[i].Type, File: tasks[i].File, Status: tasks[i].Status.String(), }) } buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data) if err != nil { slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) return } _, err = w.Write(buf.Bytes()) if err != nil { slog.ErrorContext(r.Context(), "Writing http Response", "err", err) } } func handleTask(w http.ResponseWriter, r *http.Request) { id := r.PathValue("id") if id == "" { handleTasks(w, r) return } var log []string err := db.QueryRow(r.Context(), "SELECT log FROM tasks WHERE id = $1", id).Scan(&log) if err != nil { slog.ErrorContext(r.Context(), "Query Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } t := types.Task{ Log: log, } buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t) if err != nil { slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) return } _, err = w.Write(buf.Bytes()) if err != nil { slog.ErrorContext(r.Context(), "Writing http Response", "err", err) } } func createTask(ctx context.Context, r *http.Request) error { err := r.ParseForm() if err != nil { return fmt.Errorf("Parseing Form: %w", err) } library := r.FormValue("library") typ := r.FormValue("type") slog.Info("Got Task Create", "library", library, "type", typ) rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5 FROM files where library_id = $1 AND status = $2", library, constants.FILE_STATUS_EXISTS) if err != nil { return fmt.Errorf("Query Files: %w", err) } files, err := pgx.CollectRows[File](rows, pgx.RowToStructByName[File]) if err != nil { return fmt.Errorf("Collect Files: %w", err) } tx, err := db.Begin(ctx) if err != nil { return fmt.Errorf("Begin Transaction: %w", err) } defer tx.Rollback(ctx) var data any if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" data = types.HealthCheckData{Command: types.FFmpegCommand{ Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, InputFiles: []types.File{{Path: "input.mkv"}}, OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, }} } for _, file := range files { _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data) if err != nil { return fmt.Errorf("Inserting Task: %w", err) } } err = tx.Commit(ctx) if err != nil { return fmt.Errorf("Committing Transcation: %w", err) } return nil } type QueuedTask struct { ID int Type int FileID int `json:"file_id"` FileMD5 []byte `json:"file_md5" db:"md5"` Data json.RawMessage } func assignQueuedTasks(ctx context.Context) error { rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data FROM tasks t INNER JOIN files f ON f.id = t.file_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) if err != nil { return fmt.Errorf("Query Queued Tasks: %w", err) } queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask]) if err != nil { return fmt.Errorf("Collect Queued Tasks: %w", err) } //slog.Info("Assigning Queued Tasks", "count", len(queuedTasks)) if len(queuedTasks) == 0 { return nil } WorkersMutex.Lock() defer WorkersMutex.Unlock() lastAssigned := 0 for i := range Workers { if lastAssigned == len(queuedTasks) { slog.Info("All Tasks assigned") // All Tasks have been Assigned return nil } if Workers[i].Connected { var count int err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count) // Allow for Multiple Tasks at once in the future if count < 1 { tx, err := db.Begin(ctx) defer tx.Rollback(ctx) if err != nil { return fmt.Errorf("Starting Transaction: %w", err) } _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i) if err != nil { return fmt.Errorf("Setting tasks Assignment: %w", err) } taskStart := types.TaskStart{ ID: queuedTasks[lastAssigned].ID, Type: queuedTasks[lastAssigned].Type, FileID: queuedTasks[lastAssigned].FileID, FileMD5: queuedTasks[lastAssigned].FileMD5, Data: queuedTasks[lastAssigned].Data, } _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) if err != nil { return fmt.Errorf("Error Starting Task: %w", err) } err = tx.Commit(ctx) if err != nil { return fmt.Errorf("Error Committing Transaction: %w", err) } slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i) lastAssigned++ } } } return nil }