From b03e85db0b8d846cd89fd41e0f1b96a0715fae15 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 6 Jul 2024 18:01:51 +0200 Subject: [PATCH] Queue Enable --- ..._alter_workers_table_queue_enable.down.sql | 2 + ...16_alter_workers_table_queue_enable.up.sql | 2 + server/index.go | 6 ++- server/queue.go | 40 +++++++++++++++++++ server/server.go | 1 + server/task.go | 12 +++++- tmpl/index.tmpl | 16 ++++++++ 7 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 migrations/000016_alter_workers_table_queue_enable.down.sql create mode 100644 migrations/000016_alter_workers_table_queue_enable.up.sql create mode 100644 server/queue.go diff --git a/migrations/000016_alter_workers_table_queue_enable.down.sql b/migrations/000016_alter_workers_table_queue_enable.down.sql new file mode 100644 index 0000000..16b4676 --- /dev/null +++ b/migrations/000016_alter_workers_table_queue_enable.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +DROP IF EXISTS queue_enable; diff --git a/migrations/000016_alter_workers_table_queue_enable.up.sql b/migrations/000016_alter_workers_table_queue_enable.up.sql new file mode 100644 index 0000000..0bfece4 --- /dev/null +++ b/migrations/000016_alter_workers_table_queue_enable.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +ADD queue_enable boolean NOT NULL DEFAULT false; diff --git a/server/index.go b/server/index.go index d755dd8..bdb5efc 100644 --- a/server/index.go +++ b/server/index.go @@ -16,8 +16,10 @@ type IndexData struct { } type IndexWorker struct { + ID string Worker - Status *types.WorkerStatus + Status *types.WorkerStatus + QueueEnable bool } func handleIndex(w http.ResponseWriter, r *http.Request) { @@ -39,11 +41,13 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) data.Workers = append(data.Workers, IndexWorker{ + ID: i, Worker: *Workers[i], Status: &status, }) } else { data.Workers = append(data.Workers, IndexWorker{ + ID: i, Worker: *Workers[i], Status: nil, }) diff --git a/server/queue.go b/server/queue.go new file mode 100644 index 0000000..97d5ed5 --- /dev/null +++ b/server/queue.go @@ -0,0 +1,40 @@ +package server + +import ( + "fmt" + "log/slog" + "net/http" +) + +func HandleSetQueueEnable(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest) + return + } + enable := r.FormValue("enable") + if enable != "true" && enable != "false" { + http.Error(w, "Enable must be true or false", http.StatusBadRequest) + return + } + + worker := r.FormValue("worker") + + slog.Info("Got set Queue Enable", "enable", enable, "worker", worker) + + if worker == "all" { + _, err = db.Exec(r.Context(), "UPDATE workers SET queue_enable = $1", enable) + if err != nil { + http.Error(w, fmt.Sprintf("Setting Worker Queue Enable: %v", err), http.StatusInternalServerError) + return + } + } else { + _, err = db.Exec(r.Context(), "UPDATE workers SET queue_enable = $1 where id = $2", enable, worker) + if err != nil { + http.Error(w, fmt.Sprintf("Setting Worker Queue Enable: %v", err), http.StatusInternalServerError) + return + } + } + + http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound) +} diff --git a/server/server.go b/server/server.go index cb79411..812f163 100644 --- a/server/server.go +++ b/server/server.go @@ -101,6 +101,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) + mux.HandleFunc("/queue_enable", HandleSetQueueEnable) mux.HandleFunc("/", handleIndex) diff --git a/server/task.go b/server/task.go index 76935d0..c9d739a 100644 --- a/server/task.go +++ b/server/task.go @@ -371,8 +371,18 @@ func assignQueuedTasks(ctx context.Context) error { return nil } if Workers[i].Connected { + var queueEnable bool + err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable) + if err != nil { + return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) + } + if !queueEnable { + slog.DebugContext(ctx, "Skipping Worker since Queueing is disabled", "worker_id", i) + continue + } + var count int - err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) + err = db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index 8471c0d..e55ff03 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -15,6 +15,22 @@ {{end}} {{end}} +

Set Queue Enable

+
+ + + + + +

Workers

{{range $w := .Workers}}