From f4219fe0b2adeab45aae7f735cd37ce8518ccd4f Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 6 Jul 2024 17:06:26 +0200 Subject: [PATCH 1/3] Downgrade logs to debug to prevent spam --- server/task.go | 2 +- server/worker.go | 2 +- worker/worker.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/task.go b/server/task.go index f048fa3..6691b1f 100644 --- a/server/task.go +++ b/server/task.go @@ -366,7 +366,7 @@ func assignQueuedTasks(ctx context.Context) error { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } - slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count) + slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count) // Allow for Multiple Tasks at once in the future if count < 1 { diff --git a/server/worker.go b/server/worker.go index ed8322e..caffd57 100644 --- a/server/worker.go +++ b/server/worker.go @@ -129,7 +129,7 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { if err != nil { return err } - slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + slog.DebugContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) rpcServer.HandleMessage(ctx, c, data) return nil } diff --git a/worker/worker.go b/worker/worker.go index 411c872..75bd1bd 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -116,7 +116,7 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { if err != nil { return err } - slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + slog.DebugContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) rpcServer.HandleMessage(ctx, c, data) return nil } From 77d7a8624c22d2aefced244316d5b5af7a920aae Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 6 Jul 2024 17:08:28 +0200 Subject: [PATCH 2/3] Don't run assigning multiple times --- server/task.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/server/task.go b/server/task.go index 6691b1f..76935d0 100644 --- a/server/task.go +++ b/server/task.go @@ -332,7 +332,18 @@ type QueuedTask struct { FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"` } +var assignRunning = false + func assignQueuedTasks(ctx context.Context) error { + if assignRunning { + return nil + } + + assignRunning = true + defer func() { + assignRunning = false + }() + rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) if err != nil { return fmt.Errorf("Query Queued Tasks: %w", err) From b03e85db0b8d846cd89fd41e0f1b96a0715fae15 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 6 Jul 2024 18:01:51 +0200 Subject: [PATCH 3/3] Queue Enable --- ..._alter_workers_table_queue_enable.down.sql | 2 + ...16_alter_workers_table_queue_enable.up.sql | 2 + server/index.go | 6 ++- server/queue.go | 40 +++++++++++++++++++ server/server.go | 1 + server/task.go | 12 +++++- tmpl/index.tmpl | 16 ++++++++ 7 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 migrations/000016_alter_workers_table_queue_enable.down.sql create mode 100644 migrations/000016_alter_workers_table_queue_enable.up.sql create mode 100644 server/queue.go diff --git a/migrations/000016_alter_workers_table_queue_enable.down.sql b/migrations/000016_alter_workers_table_queue_enable.down.sql new file mode 100644 index 0000000..16b4676 --- /dev/null +++ b/migrations/000016_alter_workers_table_queue_enable.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +DROP IF EXISTS queue_enable; diff --git a/migrations/000016_alter_workers_table_queue_enable.up.sql b/migrations/000016_alter_workers_table_queue_enable.up.sql new file mode 100644 index 0000000..0bfece4 --- /dev/null +++ b/migrations/000016_alter_workers_table_queue_enable.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +ADD queue_enable boolean NOT NULL DEFAULT false; diff --git a/server/index.go b/server/index.go index d755dd8..bdb5efc 100644 --- a/server/index.go +++ b/server/index.go @@ -16,8 +16,10 @@ type IndexData struct { } type IndexWorker struct { + ID string Worker - Status *types.WorkerStatus + Status *types.WorkerStatus + QueueEnable bool } func handleIndex(w http.ResponseWriter, r *http.Request) { @@ -39,11 +41,13 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) data.Workers = append(data.Workers, IndexWorker{ + ID: i, Worker: *Workers[i], Status: &status, }) } else { data.Workers = append(data.Workers, IndexWorker{ + ID: i, Worker: *Workers[i], Status: nil, }) diff --git a/server/queue.go b/server/queue.go new file mode 100644 index 0000000..97d5ed5 --- /dev/null +++ b/server/queue.go @@ -0,0 +1,40 @@ +package server + +import ( + "fmt" + "log/slog" + "net/http" +) + +func HandleSetQueueEnable(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest) + return + } + enable := r.FormValue("enable") + if enable != "true" && enable != "false" { + http.Error(w, "Enable must be true or false", http.StatusBadRequest) + return + } + + worker := r.FormValue("worker") + + slog.Info("Got set Queue Enable", "enable", enable, "worker", worker) + + if worker == "all" { + _, err = db.Exec(r.Context(), "UPDATE workers SET queue_enable = $1", enable) + if err != nil { + http.Error(w, fmt.Sprintf("Setting Worker Queue Enable: %v", err), http.StatusInternalServerError) + return + } + } else { + _, err = db.Exec(r.Context(), "UPDATE workers SET queue_enable = $1 where id = $2", enable, worker) + if err != nil { + http.Error(w, fmt.Sprintf("Setting Worker Queue Enable: %v", err), http.StatusInternalServerError) + return + } + } + + http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound) +} diff --git a/server/server.go b/server/server.go index cb79411..812f163 100644 --- a/server/server.go +++ b/server/server.go @@ -101,6 +101,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) + mux.HandleFunc("/queue_enable", HandleSetQueueEnable) mux.HandleFunc("/", handleIndex) diff --git a/server/task.go b/server/task.go index 76935d0..c9d739a 100644 --- a/server/task.go +++ b/server/task.go @@ -371,8 +371,18 @@ func assignQueuedTasks(ctx context.Context) error { return nil } if Workers[i].Connected { + var queueEnable bool + err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable) + if err != nil { + return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) + } + if !queueEnable { + slog.DebugContext(ctx, "Skipping Worker since Queueing is disabled", "worker_id", i) + continue + } + var count int - err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) + err = db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index 8471c0d..e55ff03 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -15,6 +15,22 @@ {{end}} {{end}} +

Set Queue Enable

+
+ + + + + +

Workers

{{range $w := .Workers}}