diff --git a/migrations/000016_alter_workers_table_queue_enable.down.sql b/migrations/000016_alter_workers_table_queue_enable.down.sql deleted file mode 100644 index 16b4676..0000000 --- a/migrations/000016_alter_workers_table_queue_enable.down.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE workers -DROP IF EXISTS queue_enable; diff --git a/migrations/000016_alter_workers_table_queue_enable.up.sql b/migrations/000016_alter_workers_table_queue_enable.up.sql deleted file mode 100644 index 0bfece4..0000000 --- a/migrations/000016_alter_workers_table_queue_enable.up.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE workers -ADD queue_enable boolean NOT NULL DEFAULT false; diff --git a/server/index.go b/server/index.go index bdb5efc..d755dd8 100644 --- a/server/index.go +++ b/server/index.go @@ -16,10 +16,8 @@ type IndexData struct { } type IndexWorker struct { - ID string Worker - Status *types.WorkerStatus - QueueEnable bool + Status *types.WorkerStatus } func handleIndex(w http.ResponseWriter, r *http.Request) { @@ -41,13 +39,11 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) data.Workers = append(data.Workers, IndexWorker{ - ID: i, Worker: *Workers[i], Status: &status, }) } else { data.Workers = append(data.Workers, IndexWorker{ - ID: i, Worker: *Workers[i], Status: nil, }) diff --git a/server/queue.go b/server/queue.go deleted file mode 100644 index 97d5ed5..0000000 --- a/server/queue.go +++ /dev/null @@ -1,40 +0,0 @@ -package server - -import ( - "fmt" - "log/slog" - "net/http" -) - -func HandleSetQueueEnable(w http.ResponseWriter, r *http.Request) { - err := r.ParseForm() - if err != nil { - http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest) - return - } - enable := r.FormValue("enable") - if enable != "true" && enable != "false" { - http.Error(w, "Enable must be true or false", http.StatusBadRequest) - return - } - - worker := r.FormValue("worker") - - slog.Info("Got set Queue Enable", "enable", enable, "worker", worker) - - if worker == "all" { - _, err = db.Exec(r.Context(), "UPDATE workers SET queue_enable = $1", enable) - if err != nil { - http.Error(w, fmt.Sprintf("Setting Worker Queue Enable: %v", err), http.StatusInternalServerError) - return - } - } else { - _, err = db.Exec(r.Context(), "UPDATE workers SET queue_enable = $1 where id = $2", enable, worker) - if err != nil { - http.Error(w, fmt.Sprintf("Setting Worker Queue Enable: %v", err), http.StatusInternalServerError) - return - } - } - - http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound) -} diff --git a/server/server.go b/server/server.go index 812f163..cb79411 100644 --- a/server/server.go +++ b/server/server.go @@ -101,7 +101,6 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) - mux.HandleFunc("/queue_enable", HandleSetQueueEnable) mux.HandleFunc("/", handleIndex) diff --git a/server/task.go b/server/task.go index c9d739a..f048fa3 100644 --- a/server/task.go +++ b/server/task.go @@ -332,18 +332,7 @@ type QueuedTask struct { FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"` } -var assignRunning = false - func assignQueuedTasks(ctx context.Context) error { - if assignRunning { - return nil - } - - assignRunning = true - defer func() { - assignRunning = false - }() - rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) if err != nil { return fmt.Errorf("Query Queued Tasks: %w", err) @@ -371,23 +360,13 @@ func assignQueuedTasks(ctx context.Context) error { return nil } if Workers[i].Connected { - var queueEnable bool - err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable) - if err != nil { - return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) - } - if !queueEnable { - slog.DebugContext(ctx, "Skipping Worker since Queueing is disabled", "worker_id", i) - continue - } - var count int - err = db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } - slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count) + slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count) // Allow for Multiple Tasks at once in the future if count < 1 { diff --git a/server/worker.go b/server/worker.go index caffd57..ed8322e 100644 --- a/server/worker.go +++ b/server/worker.go @@ -129,7 +129,7 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { if err != nil { return err } - slog.DebugContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) rpcServer.HandleMessage(ctx, c, data) return nil } diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index e55ff03..8471c0d 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -15,22 +15,6 @@ {{end}} {{end}} -

Set Queue Enable

-
- - - - - -

Workers

{{range $w := .Workers}} diff --git a/worker/worker.go b/worker/worker.go index 75bd1bd..411c872 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -116,7 +116,7 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { if err != nil { return err } - slog.DebugContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) rpcServer.HandleMessage(ctx, c, data) return nil }