diff --git a/server/index.go b/server/index.go index ce0d850..f9a7907 100644 --- a/server/index.go +++ b/server/index.go @@ -14,7 +14,7 @@ type IndexData struct { Counter []int Workers []IndexWorker - Tasks []Task + Tasks []TaskDisplay } type IndexWorker struct { @@ -22,7 +22,7 @@ type IndexWorker struct { Status *types.WorkerStatus } -type Task struct { +type TaskDisplay struct { ID int `db:"id"` Library int `db:"library"` Worker *string `db:"worker"` @@ -75,7 +75,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) + tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) if err != nil { slog.ErrorContext(r.Context(), "Executing index Template", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) diff --git a/server/server.go b/server/server.go index 9119dbd..0a5985d 100644 --- a/server/server.go +++ b/server/server.go @@ -74,7 +74,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux := http.NewServeMux() mux.HandleFunc("/worker", handleWorkerWebsocket) mux.Handle("/static/", fs) - mux.HandleFunc("/tasks", handleTask) + mux.HandleFunc("/tasks", handleTasks) + mux.HandleFunc("/tasks/{id}", handleTask) mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) @@ -102,7 +103,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS }() stopCleanup := make(chan bool, 1) - go cleanupDeadWorkers(stopCleanup) + go manageWorkers(stopCleanup) sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) diff --git a/server/task.go b/server/task.go index a3f8000..682e7fd 100644 --- a/server/task.go +++ b/server/task.go @@ -3,20 +3,22 @@ package server import ( "bytes" "context" + "encoding/json" "fmt" "log/slog" "net/http" "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/types" "github.com/jackc/pgx/v5" ) -type TaskData struct { +type TasksData struct { Libraries []Library - Tasks []Task + Tasks []TaskDisplay } -func handleTask(w http.ResponseWriter, r *http.Request) { +func handleTasks(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { err := createTask(r.Context(), r) if err != nil { @@ -26,7 +28,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) { } } - data := TaskData{} + data := TasksData{} rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true) if err != nil { @@ -48,7 +50,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) + tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) if err != nil { slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) @@ -57,7 +59,40 @@ func handleTask(w http.ResponseWriter, r *http.Request) { data.Tasks = tasks buf := bytes.Buffer{} - err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, data) + err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data) + if err != nil { + slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) + http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) + return + } + + _, err = w.Write(buf.Bytes()) + if err != nil { + slog.ErrorContext(r.Context(), "Writing http Response", "err", err) + } +} + +func handleTask(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + handleTasks(w, r) + return + } + + var log []string + err := db.QueryRow(r.Context(), "SELECT log FROM tasks WHERE id = $1", id).Scan(&log) + if err != nil { + slog.ErrorContext(r.Context(), "Query Tasks", "err", err) + http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) + return + } + + t := types.Task{ + Log: log, + } + + buf := bytes.Buffer{} + err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t) if err != nil { slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) @@ -94,8 +129,19 @@ func createTask(ctx context.Context, r *http.Request) error { } defer tx.Rollback(ctx) + var data any + if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { + + // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" + data = types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}}, + InputFiles: []types.File{{Path: "input.mkv"}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + } + } + for _, file := range files { - _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status) VALUES ($1,$2,$3)", file.ID, typ, 0) + _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data) if err != nil { return fmt.Errorf("Inserting Task: %w", err) } @@ -108,3 +154,78 @@ func createTask(ctx context.Context, r *http.Request) error { return nil } + +type QueuedTask struct { + ID int + Type int + FileID int `json:"file_id"` + Data json.RawMessage +} + +func assignQueuedTasks(ctx context.Context) error { + rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED) + if err != nil { + return fmt.Errorf("Query Queued Tasks: %w", err) + } + queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask]) + if err != nil { + return fmt.Errorf("Collect Queued Tasks: %w", err) + } + + if len(queuedTasks) == 0 { + return nil + } + + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + lastAssigned := 0 + + for i := range Workers { + if lastAssigned == len(queuedTasks)-1 { + // All Tasks have been Assigned + return nil + } + if Workers[i].Connected { + var count int + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING, constants.TASK_STATUS_UNKNOWN).Scan(&count) + if err != nil { + return fmt.Errorf("Error Querying Worker Task Count: %w", err) + } + + // Allow for Multiple Tasks at once in the future + if count < 1 { + tx, err := db.Begin(ctx) + defer tx.Rollback(ctx) + if err != nil { + return fmt.Errorf("Starting Transaction: %w", err) + } + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i) + if err != nil { + return fmt.Errorf("Setting tasks Assignment: %w", err) + } + + taskStart := types.TaskStart{ + ID: queuedTasks[lastAssigned].ID, + Type: queuedTasks[lastAssigned].Type, + FileID: queuedTasks[lastAssigned].FileID, + Data: queuedTasks[lastAssigned].Data, + } + + _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) + if err != nil { + return fmt.Errorf("Error Starting Task: %w", err) + } + + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("Error Committing Transaction: %w", err) + } + + slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i) + lastAssigned++ + } + } + } + return nil +} diff --git a/tmpl/task.tmpl b/tmpl/task.tmpl new file mode 100644 index 0000000..118bc2d --- /dev/null +++ b/tmpl/task.tmpl @@ -0,0 +1,9 @@ +{{template "head"}} +