From 57969cc07d7987c470e04879819f2dc5b513d189 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Wed, 19 Mar 2025 21:39:46 +0100 Subject: [PATCH] Add Parallel Tasks Settings --- ...lter_workers_table_parallel_tasks.down.sql | 2 + ..._alter_workers_table_parallel_tasks.up.sql | 2 + server/index.go | 30 +++++++++++---- server/parallel_tasks.go | 33 ++++++++++++++++ server/server.go | 1 + server/task.go | 5 ++- tmpl/index.tmpl | 38 ++++++++++++++++++- 7 files changed, 99 insertions(+), 12 deletions(-) create mode 100644 migrations/000021_alter_workers_table_parallel_tasks.down.sql create mode 100644 migrations/000021_alter_workers_table_parallel_tasks.up.sql create mode 100644 server/parallel_tasks.go diff --git a/migrations/000021_alter_workers_table_parallel_tasks.down.sql b/migrations/000021_alter_workers_table_parallel_tasks.down.sql new file mode 100644 index 0000000..c515699 --- /dev/null +++ b/migrations/000021_alter_workers_table_parallel_tasks.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +DROP IF EXISTS parallel_tasks; diff --git a/migrations/000021_alter_workers_table_parallel_tasks.up.sql b/migrations/000021_alter_workers_table_parallel_tasks.up.sql new file mode 100644 index 0000000..ec18538 --- /dev/null +++ b/migrations/000021_alter_workers_table_parallel_tasks.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +ADD parallel_tasks smallint NOT NULL DEFAULT 1; diff --git a/server/index.go b/server/index.go index bdb5efc..a23c9c5 100644 --- a/server/index.go +++ b/server/index.go @@ -18,8 +18,9 @@ type IndexData struct { type IndexWorker struct { ID string Worker - Status *types.WorkerStatus - QueueEnable bool + Status *types.WorkerStatus + QueueEnable bool + ParallelTasks int } func handleIndex(w http.ResponseWriter, r *http.Request) { @@ -31,6 +32,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { WorkersMutex.Lock() defer WorkersMutex.Unlock() for i := range Workers { + + var queueEnable bool + var parallelTasks int + err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks) + if err != nil { + w.Write([]byte(err.Error())) + slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err) + } + if Workers[i].Connected { var status types.WorkerStatus _, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status) @@ -41,15 +51,19 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) data.Workers = append(data.Workers, IndexWorker{ - ID: i, - Worker: *Workers[i], - Status: &status, + ID: i, + Worker: *Workers[i], + Status: &status, + QueueEnable: queueEnable, + ParallelTasks: parallelTasks, }) } else { data.Workers = append(data.Workers, IndexWorker{ - ID: i, - Worker: *Workers[i], - Status: nil, + ID: i, + Worker: *Workers[i], + Status: nil, + QueueEnable: queueEnable, + ParallelTasks: parallelTasks, }) } } diff --git a/server/parallel_tasks.go b/server/parallel_tasks.go new file mode 100644 index 0000000..d76b3b7 --- /dev/null +++ b/server/parallel_tasks.go @@ -0,0 +1,33 @@ +package server + +import ( + "fmt" + "log/slog" + "net/http" + "strconv" +) + +func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest) + return + } + parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks")) + if err != nil { + http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest) + return + } + + worker := r.FormValue("worker") + + slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker) + + _, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks) + if err != nil { + http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError) + return + } + + http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound) +} diff --git a/server/server.go b/server/server.go index 73871a9..a1ec131 100644 --- a/server/server.go +++ b/server/server.go @@ -103,6 +103,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) mux.HandleFunc("/queue_enable", HandleSetQueueEnable) + mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks) mux.HandleFunc("/stats", handleStats) mux.HandleFunc("/stats/{id}", handleStats) diff --git a/server/task.go b/server/task.go index e19d624..c9ec63c 100644 --- a/server/task.go +++ b/server/task.go @@ -375,7 +375,8 @@ func assignQueuedTasks(ctx context.Context) error { } if Workers[i].Connected { var queueEnable bool - err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable) + var parallelTasks int + err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks) if err != nil { return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) } @@ -393,7 +394,7 @@ func assignQueuedTasks(ctx context.Context) error { slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count) // Allow for Multiple Tasks at once in the future - if count < 1 { + if count < parallelTasks { tx, err := db.Begin(ctx) defer tx.Rollback(ctx) if err != nil { diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index e55ff03..a572a8b 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -31,6 +31,24 @@ +

Set Queue Enable

+
+ + + + + +

Workers

{{range $w := .Workers}} @@ -60,6 +78,22 @@ {{ $w.ConnectionChanged }} + + + + QueueEnabled + + + {{ $w.QueueEnabled }} + + + + + ParallelTasks + + + {{ $w.ParallelTasks }} + {{if $w.Connected}} @@ -95,8 +129,8 @@ {{end}} - +
{{end}} -{{template "tail"}} \ No newline at end of file +{{template "tail"}}