diff --git a/constants/constants.go b/constants/constants.go index bfdffa6..f50fc3a 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,5 +1,7 @@ package constants +import "fmt" + const WORKER_VERSION = "v1" const WORKER_VERSION_HEADER = "morffix-version" @@ -19,9 +21,11 @@ const ( TASK_TYPE_TRANSCODE ) +type TaskStatus int + // Non Append Changes Need Worker Version Bump const ( - TASK_STATUS_UNKNOWN = iota + TASK_STATUS_UNKNOWN TaskStatus = iota TASK_STATUS_FAILED TASK_STATUS_SUCCESS TASK_STATUS_RUNNING @@ -30,3 +34,26 @@ const ( TASK_STATUS_PAUSED TASK_STATUS_WAITING ) + +func (s TaskStatus) String() string { + switch s { + case TASK_STATUS_UNKNOWN: + return "Unknown" + case TASK_STATUS_FAILED: + return "Failed" + case TASK_STATUS_SUCCESS: + return "Success" + case TASK_STATUS_RUNNING: + return "Running" + case TASK_STATUS_QUEUED: + return "Queued" + case TASK_STATUS_ASSIGNED: + return "Assigned" + case TASK_STATUS_PAUSED: + return "Paused" + case TASK_STATUS_WAITING: + return "Waiting" + default: + return fmt.Sprintf("%d", int(s)) + } +} diff --git a/server/task.go b/server/task.go index 682e7fd..aafc7ee 100644 --- a/server/task.go +++ b/server/task.go @@ -18,6 +18,24 @@ type TasksData struct { Tasks []TaskDisplay } +type TaskDisplay struct { + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + Status string `db:"status"` + File string `db:"file"` +} + +type TaskDB struct { + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + Status constants.TaskStatus `db:"status"` + File string `db:"file"` +} + func handleTasks(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { err := createTask(r.Context(), r) @@ -50,13 +68,22 @@ func handleTasks(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) + tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB]) if err != nil { slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - data.Tasks = tasks + for i := range tasks { + data.Tasks = append(data.Tasks, TaskDisplay{ + ID: tasks[i].ID, + Library: tasks[i].Library, + Worker: tasks[i].Worker, + Type: tasks[i].Type, + File: tasks[i].File, + Status: tasks[i].Status.String(), + }) + } buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data) @@ -133,11 +160,11 @@ func createTask(ctx context.Context, r *http.Request) error { if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" - data = types.FFmpegCommand{ - Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}}, + data = types.HealthCheckData{Command: types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, InputFiles: []types.File{{Path: "input.mkv"}}, OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, - } + }} } for _, file := range files { @@ -172,6 +199,8 @@ func assignQueuedTasks(ctx context.Context) error { return fmt.Errorf("Collect Queued Tasks: %w", err) } + //slog.Info("Assigning Queued Tasks", "count", len(queuedTasks)) + if len(queuedTasks) == 0 { return nil } @@ -182,17 +211,20 @@ func assignQueuedTasks(ctx context.Context) error { lastAssigned := 0 for i := range Workers { - if lastAssigned == len(queuedTasks)-1 { + if lastAssigned == len(queuedTasks) { + slog.Info("All Tasks assigned") // All Tasks have been Assigned return nil } if Workers[i].Connected { var count int - err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING, constants.TASK_STATUS_UNKNOWN).Scan(&count) + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } + slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count) + // Allow for Multiple Tasks at once in the future if count < 1 { tx, err := db.Begin(ctx) diff --git a/tmpl/tasks.tmpl b/tmpl/tasks.tmpl index b270ee3..6baee53 100644 --- a/tmpl/tasks.tmpl +++ b/tmpl/tasks.tmpl @@ -19,7 +19,10 @@
ID | +Library | +Worker | Type | +Status | File | {{ $t.ID }} | ++ {{ $t.Library }} + | ++ {{ $t.Worker }} + | {{ $t.Type }} | ++ {{ $t.Status }} + | {{ $t.File }} |
---|