Task Log display

This commit is contained in:
Samuel Lorch 2024-05-09 04:49:46 +02:00
parent f31fdfd3b5
commit 7a0564a210
4 changed files with 143 additions and 12 deletions

View file

@ -14,7 +14,7 @@ type IndexData struct {
Counter []int
Workers []IndexWorker
Tasks []Task
Tasks []TaskDisplay
}
type IndexWorker struct {
@ -22,7 +22,7 @@ type IndexWorker struct {
Status *types.WorkerStatus
}
type Task struct {
type TaskDisplay struct {
ID int `db:"id"`
Library int `db:"library"`
Worker *string `db:"worker"`
@ -75,7 +75,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return
}
tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task])
tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay])
if err != nil {
slog.ErrorContext(r.Context(), "Executing index Template", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)

View file

@ -74,7 +74,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
mux := http.NewServeMux()
mux.HandleFunc("/worker", handleWorkerWebsocket)
mux.Handle("/static/", fs)
mux.HandleFunc("/tasks", handleTask)
mux.HandleFunc("/tasks", handleTasks)
mux.HandleFunc("/tasks/{id}", handleTask)
mux.HandleFunc("/scan/{id}", handleScan)
mux.HandleFunc("/libraries/{id}", handleLibrary)
mux.HandleFunc("/libraries", handleLibraries)
@ -102,7 +103,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
}()
stopCleanup := make(chan bool, 1)
go cleanupDeadWorkers(stopCleanup)
go manageWorkers(stopCleanup)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)

View file

@ -3,20 +3,22 @@ package server
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/types"
"github.com/jackc/pgx/v5"
)
type TaskData struct {
type TasksData struct {
Libraries []Library
Tasks []Task
Tasks []TaskDisplay
}
func handleTask(w http.ResponseWriter, r *http.Request) {
func handleTasks(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
err := createTask(r.Context(), r)
if err != nil {
@ -26,7 +28,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
}
}
data := TaskData{}
data := TasksData{}
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true)
if err != nil {
@ -48,7 +50,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return
}
tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task])
tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Tasks", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
@ -57,7 +59,40 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
data.Tasks = tasks
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, data)
err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Task Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
}
func handleTask(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
handleTasks(w, r)
return
}
var log []string
err := db.QueryRow(r.Context(), "SELECT log FROM tasks WHERE id = $1", id).Scan(&log)
if err != nil {
slog.ErrorContext(r.Context(), "Query Tasks", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return
}
t := types.Task{
Log: log,
}
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Task Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
@ -94,8 +129,19 @@ func createTask(ctx context.Context, r *http.Request) error {
}
defer tx.Rollback(ctx)
var data any
if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK {
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
data = types.FFmpegCommand{
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}},
InputFiles: []types.File{{Path: "input.mkv"}},
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
}
}
for _, file := range files {
_, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status) VALUES ($1,$2,$3)", file.ID, typ, 0)
_, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data)
if err != nil {
return fmt.Errorf("Inserting Task: %w", err)
}
@ -108,3 +154,78 @@ func createTask(ctx context.Context, r *http.Request) error {
return nil
}
type QueuedTask struct {
ID int
Type int
FileID int `json:"file_id"`
Data json.RawMessage
}
func assignQueuedTasks(ctx context.Context) error {
rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED)
if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err)
}
queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask])
if err != nil {
return fmt.Errorf("Collect Queued Tasks: %w", err)
}
if len(queuedTasks) == 0 {
return nil
}
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
lastAssigned := 0
for i := range Workers {
if lastAssigned == len(queuedTasks)-1 {
// All Tasks have been Assigned
return nil
}
if Workers[i].Connected {
var count int
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING, constants.TASK_STATUS_UNKNOWN).Scan(&count)
if err != nil {
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
}
// Allow for Multiple Tasks at once in the future
if count < 1 {
tx, err := db.Begin(ctx)
defer tx.Rollback(ctx)
if err != nil {
return fmt.Errorf("Starting Transaction: %w", err)
}
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i)
if err != nil {
return fmt.Errorf("Setting tasks Assignment: %w", err)
}
taskStart := types.TaskStart{
ID: queuedTasks[lastAssigned].ID,
Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID,
Data: queuedTasks[lastAssigned].Data,
}
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
if err != nil {
return fmt.Errorf("Error Starting Task: %w", err)
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("Error Committing Transaction: %w", err)
}
slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i)
lastAssigned++
}
}
}
return nil
}

9
tmpl/task.tmpl Normal file
View file

@ -0,0 +1,9 @@
{{template "head"}}
<h2>Task {{.ID}}</h2>
<div class="task-log">
{{range $t := .Log}}
{{ $t }}<br/>
{{end}}
</div>
{{template "tail"}}