diff --git a/migrations/000021_alter_workers_table_parallel_tasks.down.sql b/migrations/000021_alter_workers_table_parallel_tasks.down.sql new file mode 100644 index 0000000..c515699 --- /dev/null +++ b/migrations/000021_alter_workers_table_parallel_tasks.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +DROP IF EXISTS parallel_tasks; diff --git a/migrations/000021_alter_workers_table_parallel_tasks.up.sql b/migrations/000021_alter_workers_table_parallel_tasks.up.sql new file mode 100644 index 0000000..ec18538 --- /dev/null +++ b/migrations/000021_alter_workers_table_parallel_tasks.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE workers +ADD parallel_tasks smallint NOT NULL DEFAULT 1; diff --git a/server/index.go b/server/index.go index bdb5efc..a23c9c5 100644 --- a/server/index.go +++ b/server/index.go @@ -18,8 +18,9 @@ type IndexData struct { type IndexWorker struct { ID string Worker - Status *types.WorkerStatus - QueueEnable bool + Status *types.WorkerStatus + QueueEnable bool + ParallelTasks int } func handleIndex(w http.ResponseWriter, r *http.Request) { @@ -31,6 +32,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { WorkersMutex.Lock() defer WorkersMutex.Unlock() for i := range Workers { + + var queueEnable bool + var parallelTasks int + err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks) + if err != nil { + w.Write([]byte(err.Error())) + slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err) + } + if Workers[i].Connected { var status types.WorkerStatus _, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status) @@ -41,15 +51,19 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) data.Workers = append(data.Workers, IndexWorker{ - ID: i, - Worker: *Workers[i], - Status: &status, + ID: i, + Worker: *Workers[i], + Status: &status, + QueueEnable: queueEnable, + ParallelTasks: parallelTasks, }) } else { data.Workers = append(data.Workers, IndexWorker{ - ID: i, - Worker: *Workers[i], - Status: nil, + ID: i, + Worker: *Workers[i], + Status: nil, + QueueEnable: queueEnable, + ParallelTasks: parallelTasks, }) } } diff --git a/server/parallel_tasks.go b/server/parallel_tasks.go new file mode 100644 index 0000000..d76b3b7 --- /dev/null +++ b/server/parallel_tasks.go @@ -0,0 +1,33 @@ +package server + +import ( + "fmt" + "log/slog" + "net/http" + "strconv" +) + +func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest) + return + } + parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks")) + if err != nil { + http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest) + return + } + + worker := r.FormValue("worker") + + slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker) + + _, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks) + if err != nil { + http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError) + return + } + + http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound) +} diff --git a/server/server.go b/server/server.go index 73871a9..a1ec131 100644 --- a/server/server.go +++ b/server/server.go @@ -103,6 +103,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) mux.HandleFunc("/queue_enable", HandleSetQueueEnable) + mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks) mux.HandleFunc("/stats", handleStats) mux.HandleFunc("/stats/{id}", handleStats) diff --git a/server/task.go b/server/task.go index e19d624..c9ec63c 100644 --- a/server/task.go +++ b/server/task.go @@ -375,7 +375,8 @@ func assignQueuedTasks(ctx context.Context) error { } if Workers[i].Connected { var queueEnable bool - err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable) + var parallelTasks int + err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks) if err != nil { return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) } @@ -393,7 +394,7 @@ func assignQueuedTasks(ctx context.Context) error { slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count) // Allow for Multiple Tasks at once in the future - if count < 1 { + if count < parallelTasks { tx, err := db.Begin(ctx) defer tx.Rollback(ctx) if err != nil { diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index e55ff03..a572a8b 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -31,6 +31,24 @@ +