This commit is contained in:
parent
8c3ba7e883
commit
57969cc07d
7 changed files with 99 additions and 12 deletions
|
@ -0,0 +1,2 @@
|
|||
ALTER TABLE workers
|
||||
DROP IF EXISTS parallel_tasks;
|
|
@ -0,0 +1,2 @@
|
|||
ALTER TABLE workers
|
||||
ADD parallel_tasks smallint NOT NULL DEFAULT 1;
|
|
@ -18,8 +18,9 @@ type IndexData struct {
|
|||
type IndexWorker struct {
|
||||
ID string
|
||||
Worker
|
||||
Status *types.WorkerStatus
|
||||
QueueEnable bool
|
||||
Status *types.WorkerStatus
|
||||
QueueEnable bool
|
||||
ParallelTasks int
|
||||
}
|
||||
|
||||
func handleIndex(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -31,6 +32,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
|
|||
WorkersMutex.Lock()
|
||||
defer WorkersMutex.Unlock()
|
||||
for i := range Workers {
|
||||
|
||||
var queueEnable bool
|
||||
var parallelTasks int
|
||||
err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks)
|
||||
if err != nil {
|
||||
w.Write([]byte(err.Error()))
|
||||
slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err)
|
||||
}
|
||||
|
||||
if Workers[i].Connected {
|
||||
var status types.WorkerStatus
|
||||
_, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status)
|
||||
|
@ -41,15 +51,19 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status)
|
||||
data.Workers = append(data.Workers, IndexWorker{
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: &status,
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: &status,
|
||||
QueueEnable: queueEnable,
|
||||
ParallelTasks: parallelTasks,
|
||||
})
|
||||
} else {
|
||||
data.Workers = append(data.Workers, IndexWorker{
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: nil,
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: nil,
|
||||
QueueEnable: queueEnable,
|
||||
ParallelTasks: parallelTasks,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
33
server/parallel_tasks.go
Normal file
33
server/parallel_tasks.go
Normal file
|
@ -0,0 +1,33 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) {
|
||||
err := r.ParseForm()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks"))
|
||||
if err != nil {
|
||||
http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
worker := r.FormValue("worker")
|
||||
|
||||
slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker)
|
||||
|
||||
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound)
|
||||
}
|
|
@ -103,6 +103,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
|
|||
mux.HandleFunc("/libraries", handleLibraries)
|
||||
mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands)
|
||||
mux.HandleFunc("/queue_enable", HandleSetQueueEnable)
|
||||
mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks)
|
||||
mux.HandleFunc("/stats", handleStats)
|
||||
mux.HandleFunc("/stats/{id}", handleStats)
|
||||
|
||||
|
|
|
@ -375,7 +375,8 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
}
|
||||
if Workers[i].Connected {
|
||||
var queueEnable bool
|
||||
err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable)
|
||||
var parallelTasks int
|
||||
err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
|
||||
}
|
||||
|
@ -393,7 +394,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
|
||||
|
||||
// Allow for Multiple Tasks at once in the future
|
||||
if count < 1 {
|
||||
if count < parallelTasks {
|
||||
tx, err := db.Begin(ctx)
|
||||
defer tx.Rollback(ctx)
|
||||
if err != nil {
|
||||
|
|
|
@ -31,6 +31,24 @@
|
|||
</select>
|
||||
<input type="submit" value="Submit">
|
||||
</form>
|
||||
<h2>Set Queue Enable</h2>
|
||||
<form method="POST" action= "/parallel_tasks">
|
||||
<label for="worker">Worker:</label>
|
||||
<select id="worker" name="worker">
|
||||
<option value="all">All</option>
|
||||
{{range $w := .Workers}}
|
||||
<option value="{{$w.ID}}">{{$w.Name}}</option>
|
||||
{{end}}
|
||||
</select>
|
||||
<label for="parallel_tasks">Parallel Tasks</label>
|
||||
<select id="parallel_tasks" name="enable">
|
||||
<option value="1">1</option>
|
||||
<option value="2">2</option>
|
||||
<option value="3">3</option>
|
||||
<option value="4">4</option>
|
||||
</select>
|
||||
<input type="submit" value="Submit">
|
||||
</form>
|
||||
<h2>Workers</h2>
|
||||
<div class="workers">
|
||||
{{range $w := .Workers}}
|
||||
|
@ -60,6 +78,22 @@
|
|||
<td>
|
||||
{{ $w.ConnectionChanged }}
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
QueueEnabled
|
||||
</td>
|
||||
<td>
|
||||
{{ $w.QueueEnabled }}
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
ParallelTasks
|
||||
</td>
|
||||
<td>
|
||||
{{ $w.ParallelTasks }}
|
||||
</td>
|
||||
</tr>
|
||||
{{if $w.Connected}}
|
||||
<tr>
|
||||
|
@ -95,8 +129,8 @@
|
|||
</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</table>
|
||||
</table>
|
||||
</div>
|
||||
{{end}}
|
||||
</div>
|
||||
{{template "tail"}}
|
||||
{{template "tail"}}
|
||||
|
|
Loading…
Add table
Reference in a new issue