diff --git a/config/config.go b/config/config.go index 9194e79..44d8a30 100644 --- a/config/config.go +++ b/config/config.go @@ -12,6 +12,9 @@ type Server struct { } type Worker struct { - Address string - Name string + ID string + TempDir string + Address string + Name string + FFmpegPath string } diff --git a/constants/constants.go b/constants/constants.go index 6521c44..f50fc3a 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,5 +1,10 @@ package constants +import "fmt" + +const WORKER_VERSION = "v1" + +const WORKER_VERSION_HEADER = "morffix-version" const SHARED_SECRET_HEADER = "morffix-secret" const NAME_HEADER = "morffix-name" const UUID_HEADER = "morffix-uuid" @@ -8,8 +13,47 @@ const INDEX_TEMPLATE_NAME = "index.tmpl" const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl" const LIBRARY_TEMPLATE_NAME = "library.tmpl" const MESSAGE_TEMPLATE_NAME = "message.tmpl" -const TASK_TEMPLATE_NAME = "tasks.tmpl" +const TASKS_TEMPLATE_NAME = "tasks.tmpl" +const TASK_TEMPLATE_NAME = "task.tmpl" const ( - HEALTHCHECK_TASK_TYPE = iota + TASK_TYPE_HEALTHCHECK = iota + TASK_TYPE_TRANSCODE ) + +type TaskStatus int + +// Non Append Changes Need Worker Version Bump +const ( + TASK_STATUS_UNKNOWN TaskStatus = iota + TASK_STATUS_FAILED + TASK_STATUS_SUCCESS + TASK_STATUS_RUNNING + TASK_STATUS_QUEUED + TASK_STATUS_ASSIGNED + TASK_STATUS_PAUSED + TASK_STATUS_WAITING +) + +func (s TaskStatus) String() string { + switch s { + case TASK_STATUS_UNKNOWN: + return "Unknown" + case TASK_STATUS_FAILED: + return "Failed" + case TASK_STATUS_SUCCESS: + return "Success" + case TASK_STATUS_RUNNING: + return "Running" + case TASK_STATUS_QUEUED: + return "Queued" + case TASK_STATUS_ASSIGNED: + return "Assigned" + case TASK_STATUS_PAUSED: + return "Paused" + case TASK_STATUS_WAITING: + return "Waiting" + default: + return fmt.Sprintf("%d", int(s)) + } +} diff --git a/go.mod b/go.mod index cac59c0..1cf601b 100644 --- a/go.mod +++ b/go.mod @@ -18,9 +18,12 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/samber/lo v1.38.1 // indirect + github.com/samber/slog-multi v1.0.2 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/crypto v0.22.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect ) diff --git a/go.sum b/go.sum index 2bd6758..576771c 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,10 @@ github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFr github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/samber/slog-multi v1.0.2 h1:6BVH9uHGAsiGkbbtQgAOQJMpKgV8unMrHhhJaw+X1EQ= +github.com/samber/slog-multi v1.0.2/go.mod h1:uLAvHpGqbYgX4FSL0p1ZwoLuveIAJvBECtE07XmYvFo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -41,6 +45,10 @@ golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 h1:pVgRXcIictcr+lBQIFeiwuwtDIs4eL21OuM9nyAADmo= +golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= @@ -53,8 +61,12 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= diff --git a/main.go b/main.go index 13c67af..5eb7672 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,14 @@ func main() { return } + if conf.Worker.FFmpegPath == "" { + conf.Worker.FFmpegPath = "ffmpeg" + } + + if conf.Worker.TempDir == "" { + conf.Worker.TempDir = "/tmp" + } + if *isserver { slog.Info("Starting Server...") server.Start(conf, templates, static, migrations) diff --git a/migrations/000004_create_task_table.up.sql b/migrations/000004_create_task_table.up.sql index 25e4e69..54f1a77 100644 --- a/migrations/000004_create_task_table.up.sql +++ b/migrations/000004_create_task_table.up.sql @@ -3,6 +3,8 @@ CREATE TABLE IF NOT EXISTS tasks( id serial PRIMARY KEY, worker_id uuid REFERENCES workers(id), file_id integer REFERENCES files(id) NOT NULL, + type smallint NOT NULL, status smallint NOT NULL, - type smallint NOT NULL + data JSONB, + log text[] ); \ No newline at end of file diff --git a/server/file.go b/server/file.go new file mode 100644 index 0000000..3fb7c84 --- /dev/null +++ b/server/file.go @@ -0,0 +1,26 @@ +package server + +import ( + "log/slog" + "net/http" +) + +func handleFile(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + http.Error(w, "No id", http.StatusBadRequest) + return + } + + // TODO check if worker is working on a task involving this file + + var path string + err := db.QueryRow(r.Context(), "SELECT path FROM files WHERE id = $1", id).Scan(&path) + if err != nil { + http.Error(w, "Error Getting Path: "+err.Error(), http.StatusBadRequest) + slog.ErrorContext(r.Context(), "Getting Path", "err", err) + return + } + + http.ServeFile(w, r, path) +} diff --git a/server/index.go b/server/index.go index b6b703f..b736a00 100644 --- a/server/index.go +++ b/server/index.go @@ -7,14 +7,12 @@ import ( "git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/types" - "github.com/jackc/pgx/v5" ) type IndexData struct { Counter []int Workers []IndexWorker - Tasks []Task } type IndexWorker struct { @@ -22,24 +20,11 @@ type IndexWorker struct { Status *types.WorkerStatus } -var count = 123456789 - -type Task struct { - ID int `db:"id"` - Library int `db:"library"` - Worker *string `db:"worker"` - Type int `db:"type"` - Status int `db:"status"` - File string `db:"file"` -} - func handleIndex(w http.ResponseWriter, r *http.Request) { data := IndexData{ - Counter: splitInt(count), Workers: []IndexWorker{}, } - count++ func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() @@ -66,19 +51,12 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } }() - rows, err := db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id") + var size int + err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE missing = $1", false).Scan(&size) if err != nil { - slog.ErrorContext(r.Context(), "Query Tasks", "err", err) - http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) - return + size = 0 } - tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) - if err != nil { - slog.ErrorContext(r.Context(), "Executing index Template", "err", err) - http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) - return - } - data.Tasks = tasks + data.Counter = splitInt(size) buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.INDEX_TEMPLATE_NAME, data) diff --git a/server/server.go b/server/server.go index 9119dbd..7992fba 100644 --- a/server/server.go +++ b/server/server.go @@ -74,7 +74,9 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux := http.NewServeMux() mux.HandleFunc("/worker", handleWorkerWebsocket) mux.Handle("/static/", fs) - mux.HandleFunc("/tasks", handleTask) + mux.HandleFunc("/tasks", handleTasks) + mux.HandleFunc("/files/{id}", handleFile) + mux.HandleFunc("/tasks/{id}", handleTask) mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) @@ -102,7 +104,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS }() stopCleanup := make(chan bool, 1) - go cleanupDeadWorkers(stopCleanup) + go manageWorkers(stopCleanup) sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) diff --git a/server/task.go b/server/task.go index a3f8000..aafc7ee 100644 --- a/server/task.go +++ b/server/task.go @@ -3,20 +3,40 @@ package server import ( "bytes" "context" + "encoding/json" "fmt" "log/slog" "net/http" "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/types" "github.com/jackc/pgx/v5" ) -type TaskData struct { +type TasksData struct { Libraries []Library - Tasks []Task + Tasks []TaskDisplay } -func handleTask(w http.ResponseWriter, r *http.Request) { +type TaskDisplay struct { + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + Status string `db:"status"` + File string `db:"file"` +} + +type TaskDB struct { + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + Status constants.TaskStatus `db:"status"` + File string `db:"file"` +} + +func handleTasks(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { err := createTask(r.Context(), r) if err != nil { @@ -26,7 +46,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) { } } - data := TaskData{} + data := TasksData{} rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true) if err != nil { @@ -48,16 +68,58 @@ func handleTask(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) + tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB]) if err != nil { slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - data.Tasks = tasks + for i := range tasks { + data.Tasks = append(data.Tasks, TaskDisplay{ + ID: tasks[i].ID, + Library: tasks[i].Library, + Worker: tasks[i].Worker, + Type: tasks[i].Type, + File: tasks[i].File, + Status: tasks[i].Status.String(), + }) + } buf := bytes.Buffer{} - err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, data) + err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data) + if err != nil { + slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) + http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) + return + } + + _, err = w.Write(buf.Bytes()) + if err != nil { + slog.ErrorContext(r.Context(), "Writing http Response", "err", err) + } +} + +func handleTask(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + handleTasks(w, r) + return + } + + var log []string + err := db.QueryRow(r.Context(), "SELECT log FROM tasks WHERE id = $1", id).Scan(&log) + if err != nil { + slog.ErrorContext(r.Context(), "Query Tasks", "err", err) + http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) + return + } + + t := types.Task{ + Log: log, + } + + buf := bytes.Buffer{} + err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t) if err != nil { slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) @@ -94,8 +156,19 @@ func createTask(ctx context.Context, r *http.Request) error { } defer tx.Rollback(ctx) + var data any + if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { + + // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" + data = types.HealthCheckData{Command: types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, + InputFiles: []types.File{{Path: "input.mkv"}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + }} + } + for _, file := range files { - _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status) VALUES ($1,$2,$3)", file.ID, typ, 0) + _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data) if err != nil { return fmt.Errorf("Inserting Task: %w", err) } @@ -108,3 +181,83 @@ func createTask(ctx context.Context, r *http.Request) error { return nil } + +type QueuedTask struct { + ID int + Type int + FileID int `json:"file_id"` + Data json.RawMessage +} + +func assignQueuedTasks(ctx context.Context) error { + rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED) + if err != nil { + return fmt.Errorf("Query Queued Tasks: %w", err) + } + queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask]) + if err != nil { + return fmt.Errorf("Collect Queued Tasks: %w", err) + } + + //slog.Info("Assigning Queued Tasks", "count", len(queuedTasks)) + + if len(queuedTasks) == 0 { + return nil + } + + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + lastAssigned := 0 + + for i := range Workers { + if lastAssigned == len(queuedTasks) { + slog.Info("All Tasks assigned") + // All Tasks have been Assigned + return nil + } + if Workers[i].Connected { + var count int + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) + if err != nil { + return fmt.Errorf("Error Querying Worker Task Count: %w", err) + } + + slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count) + + // Allow for Multiple Tasks at once in the future + if count < 1 { + tx, err := db.Begin(ctx) + defer tx.Rollback(ctx) + if err != nil { + return fmt.Errorf("Starting Transaction: %w", err) + } + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i) + if err != nil { + return fmt.Errorf("Setting tasks Assignment: %w", err) + } + + taskStart := types.TaskStart{ + ID: queuedTasks[lastAssigned].ID, + Type: queuedTasks[lastAssigned].Type, + FileID: queuedTasks[lastAssigned].FileID, + Data: queuedTasks[lastAssigned].Data, + } + + _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) + if err != nil { + return fmt.Errorf("Error Starting Task: %w", err) + } + + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("Error Committing Transaction: %w", err) + } + + slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i) + lastAssigned++ + } + } + } + return nil +} diff --git a/server/worker.go b/server/worker.go index 0390462..17a44e4 100644 --- a/server/worker.go +++ b/server/worker.go @@ -8,6 +8,8 @@ import ( "time" "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/types" + "github.com/jackc/pgx/v5" "git.lastassault.de/speatzle/morffix/constants" @@ -32,6 +34,25 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusUnauthorized) return } + if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION { + w.WriteHeader(http.StatusNotImplemented) + return + } + // Connection ID + uuid := r.Header.Get(constants.UUID_HEADER) + if uuid == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + name := r.Header.Get(constants.NAME_HEADER) + + _, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err) + return + } + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) @@ -41,9 +62,6 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { } defer c.CloseNow() - // Connection ID - uuid := r.Header.Get(constants.UUID_HEADER) - // Track Connection func() { WorkersMutex.Lock() @@ -66,7 +84,7 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { } Workers[uuid] = &Worker{ - Name: r.Header.Get(constants.NAME_HEADER), + Name: name, Address: r.RemoteAddr, Conn: c, Connected: true, @@ -110,26 +128,102 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { return nil } -func cleanupDeadWorkers(stop chan bool) { - ticker := time.NewTicker(time.Second) +func manageWorkers(stop chan bool) { + deadTicker := time.NewTicker(time.Second) + statusTicker := time.NewTicker(time.Second) + assignTicker := time.NewTicker(time.Second) + for { select { - case <-ticker.C: - func() { - WorkersMutex.Lock() - defer WorkersMutex.Unlock() - - for uuid, w := range Workers { - // TODO Not dead Workers are sometimes also killed? - if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { - slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) - delete(Workers, uuid) - // TODO Free any Jobs that the Worker had - } - } - }() + case <-deadTicker.C: + killDeadWorkers() + case <-statusTicker.C: + updateWorkerTaskStatus(context.TODO()) + case <-assignTicker.C: + err := assignQueuedTasks(context.TODO()) + if err != nil { + slog.Error("Assigning Queued Tasks", "err", err) + } case <-stop: return } } } + +func killDeadWorkers() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid, w := range Workers { + // TODO Not dead Workers are sometimes also killed? + if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { + slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) + delete(Workers, uuid) + // TODO Free any Jobs that the Worker had + } + } +} + +func updateWorkerTaskStatus(ctx context.Context) { + var wg sync.WaitGroup + + func() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid := range Workers { + if Workers[uuid].Connected { + w := Workers[uuid] + + rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid) + return + } + + taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest]) + if err != nil { + slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err) + return + } + + for _, tsr := range taskStatusRequests { + wg.Add(1) + + go func() { + defer wg.Done() + var ts types.TaskStatus + _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) + // TODO Mark Task as Unknown? + return + } + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log) + if err != nil { + slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) + return + } + + // Tell Worker to Delete Finished Tasks + if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS { + _, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil) + if err != nil { + slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) + return + } + } + + }() + } + // TODO Handle tasks with status unkown assigned to this worker + + } else { + // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck + } + } + }() + + wg.Wait() +} diff --git a/task/healthcheck.go b/task/healthcheck.go new file mode 100644 index 0000000..5d2fa3e --- /dev/null +++ b/task/healthcheck.go @@ -0,0 +1,157 @@ +package task + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "sync" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/task/log" + "git.lastassault.de/speatzle/morffix/types" +) + +// dropCR drops a terminal \r from the data. +func dropCR(data []byte) []byte { + if len(data) > 0 && data[len(data)-1] == '\r' { + return data[0 : len(data)-1] + } + return data +} + +func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.IndexByte(data, '\n'); i >= 0 { + // We have a full newline-terminated line. + return i + 1, dropCR(data[0:i]), nil + } + if i := bytes.IndexByte(data, '\r'); i >= 0 { + // We have a return line. + return i + 1, dropCR(data[0:i]), nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), dropCR(data), nil + } + // Request more data. + return 0, nil, nil +} + +func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckData) { + ctx := context.TODO() + l := log.GetTaskLogger(t) + + // TODO Figure out how to get correct file ending + path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID)) + + // Set ffmpeg input path + if len(data.Command.InputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no input files") + return + } + + data.Command.InputFiles[0].Path = path + + // TODO cleanup file when done + defer func() { + err := os.Remove(path) + if err != nil { + l.ErrorContext(ctx, "Removing File", "err", err, "path", path) + } else { + l.ErrorContext(ctx, "File Removed Succesfully", "path", path) + } + }() + + l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path) + + err := func() error { + out, err := os.Create(path) + if err != nil { + return fmt.Errorf("Creating File: %w", err) + } + defer out.Close() + resp, err := http.Get(fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID)) + if err != nil { + return fmt.Errorf("Getting File: %w", err) + } + + // TODO Log at interval logs read + + defer resp.Body.Close() + n, err := io.Copy(out, resp.Body) + if err != nil { + return fmt.Errorf("Reading File: %w", err) + } + + l.InfoContext(ctx, "Downloaded File", "bytes", n) + + return nil + }() + + if err != nil { + l.ErrorContext(ctx, "File Download Failed", "err", err) + return + } + + l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs()) + cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...) + + var wg sync.WaitGroup + + stdout, err := cmd.StdoutPipe() + if err != nil { + l.ErrorContext(ctx, "Error getting StdoutPipe", "err", err) + return + } + + wg.Add(1) + outScanner := bufio.NewScanner(stdout) + outScanner.Split(scanLines) + go func() { + for outScanner.Scan() { + l.InfoContext(ctx, outScanner.Text()) + } + wg.Done() + }() + + stderr, err := cmd.StderrPipe() + if err != nil { + l.ErrorContext(ctx, "Error getting StderrPipe", "err", err) + return + } + wg.Add(1) + errScanner := bufio.NewScanner(stderr) + errScanner.Split(scanLines) + go func() { + for errScanner.Scan() { + l.InfoContext(ctx, errScanner.Text()) + } + wg.Done() + }() + + err = cmd.Start() + if err != nil { + l.ErrorContext(ctx, "Error Starting ffmpeg", "err", err) + return + } + + wg.Wait() + + err = cmd.Wait() + if err != nil { + l.ErrorContext(ctx, "Error Running ffmpeg", "err", err) + return + } + + l.InfoContext(ctx, "Task Success") + t.Status = constants.TASK_STATUS_SUCCESS +} diff --git a/task/log/log.go b/task/log/log.go new file mode 100644 index 0000000..6ef30dc --- /dev/null +++ b/task/log/log.go @@ -0,0 +1,145 @@ +package log + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" + + "git.lastassault.de/speatzle/morffix/types" + slogmulti "github.com/samber/slog-multi" +) + +func GetTaskLogger(t *types.Task) *slog.Logger { + return slog.New( + slogmulti.Fanout( + slog.Default().Handler(), + New(t, &Options{slog.LevelDebug}), + ), + ) +} + +// groupOrAttrs holds either a group name or a list of slog.Attrs. +type groupOrAttrs struct { + group string // group name if non-empty + attrs []slog.Attr // attrs if non-empty +} + +type TaskHandler struct { + opts Options + // TODO: state for WithGroup and WithAttrs + mu *sync.Mutex + t *types.Task + goas []groupOrAttrs +} + +type Options struct { + // Level reports the minimum level to log. + // Levels with lower levels are discarded. + // If nil, the Handler uses [slog.LevelInfo]. + Level slog.Leveler +} + +func New(t *types.Task, opts *Options) *TaskHandler { + h := &TaskHandler{t: t, mu: &sync.Mutex{}} + if opts != nil { + h.opts = *opts + } + if h.opts.Level == nil { + h.opts.Level = slog.LevelInfo + } + return h +} + +func (h *TaskHandler) Enabled(ctx context.Context, level slog.Level) bool { + return level >= h.opts.Level.Level() +} + +func (h *TaskHandler) Handle(ctx context.Context, r slog.Record) error { + buf := make([]byte, 0, 1024) + if !r.Time.IsZero() { + buf = h.appendAttr(buf, slog.Time(slog.TimeKey, r.Time)) + } + buf = h.appendAttr(buf, slog.String(slog.MessageKey, r.Message)) + // Handle state from WithGroup and WithAttrs. + goas := h.goas + if r.NumAttrs() == 0 { + // If the record has no Attrs, remove groups at the end of the list; they are empty. + for len(goas) > 0 && goas[len(goas)-1].group != "" { + goas = goas[:len(goas)-1] + } + } + for _, goa := range goas { + if goa.group == "" { + for _, a := range goa.attrs { + buf = h.appendAttr(buf, a) + } + } + } + r.Attrs(func(a slog.Attr) bool { + buf = h.appendAttr(buf, a) + return true + }) + h.mu.Lock() + defer h.mu.Unlock() + h.t.Log = append(h.t.Log, string(buf)) + return nil +} + +func (h *TaskHandler) appendAttr(buf []byte, a slog.Attr) []byte { + // Resolve the Attr's value before doing anything else. + a.Value = a.Value.Resolve() + // Ignore empty Attrs. + if a.Equal(slog.Attr{}) { + return buf + } + + switch a.Value.Kind() { + case slog.KindString: + // Quote string values, to make them easy to parse. + buf = fmt.Appendf(buf, "%s: %q\n", a.Key, a.Value.String()) + case slog.KindTime: + // Write times in a standard way, without the monotonic time. + buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value.Time().Format(time.DateTime)) + case slog.KindGroup: + attrs := a.Value.Group() + // Ignore empty groups. + if len(attrs) == 0 { + return buf + } + // If the key is non-empty, write it out and indent the rest of the attrs. + // Otherwise, inline the attrs. + if a.Key != "" { + buf = fmt.Appendf(buf, "%s:\n", a.Key) + } + for _, ga := range attrs { + buf = h.appendAttr(buf, ga) + } + default: + buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value) + } + return buf +} + +func (h *TaskHandler) withGroupOrAttrs(goa groupOrAttrs) *TaskHandler { + h2 := *h + h2.goas = make([]groupOrAttrs, len(h.goas)+1) + copy(h2.goas, h.goas) + h2.goas[len(h2.goas)-1] = goa + return &h2 +} + +func (h *TaskHandler) WithGroup(name string) slog.Handler { + if name == "" { + return h + } + return h.withGroupOrAttrs(groupOrAttrs{group: name}) +} + +func (h *TaskHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + if len(attrs) == 0 { + return h + } + return h.withGroupOrAttrs(groupOrAttrs{attrs: attrs}) +} diff --git a/task/task.go b/task/task.go new file mode 100644 index 0000000..df7c51a --- /dev/null +++ b/task/task.go @@ -0,0 +1,88 @@ +package task + +import ( + "encoding/json" + "fmt" + "sync" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/types" +) + +var tasks = map[int]*types.Task{} +var taskMutex sync.Mutex + +func StartTask(conf config.Config, data types.TaskStart) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + tasks[data.ID] = &types.Task{ + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + } + + switch data.Type { + case constants.TASK_TYPE_HEALTHCHECK: + var hData types.HealthCheckData + err := json.Unmarshal(data.Data, &hData) + if err != nil { + return fmt.Errorf("Unmarshal Healthcheck Data: %w", err) + } + + tasks[data.ID].Status = constants.TASK_STATUS_RUNNING + go func() { + defer func() { + if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING { + tasks[data.ID].Status = constants.TASK_STATUS_FAILED + tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer") + } + }() + RunHealthCheck(conf, tasks[data.ID], hData) + }() + return nil + case constants.TASK_TYPE_TRANSCODE: + return fmt.Errorf("Transcode Task Not Implemented") + default: + return fmt.Errorf("Unknown Task Type %v", data.Type) + } +} + +func Get(r types.TaskStatusRequest) (*types.Task, error) { + taskMutex.Lock() + defer taskMutex.Unlock() + + t, ok := tasks[r.ID] + if !ok { + return nil, fmt.Errorf("Task does not Exist") + } + + res := *t + + // Send only new logs if there are any + if len(res.Log) >= r.LogOffset { + res.Log = res.Log[r.LogOffset:] + } else { + res.Log = []string{} + } + + return &res, nil +} + +func DeleteTask(id int) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + _, ok := tasks[id] + if !ok { + return fmt.Errorf("Task does not Exist") + } + + if tasks[id].Status == constants.TASK_STATUS_RUNNING { + return fmt.Errorf("Task is Currently Running") + } + + delete(tasks, id) + return nil +} diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index 46049b7..644beba 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -72,40 +72,4 @@ {{end}} -
-
ID | -Library | -Worker | -Type | -Status | -File | -
---|---|---|---|---|---|
- {{ $t.ID }} - | -- {{ $t.Library }} - | -- {{ $t.Worker }} - | -- {{ $t.Type }} - | -- {{ $t.Status }} - | -- {{ $t.File }} - | -
ID | +Library | +Worker | Type | +Status | File | {{ $t.ID }} | ++ {{ $t.Library }} + | ++ {{ $t.Worker }} + | {{ $t.Type }} | ++ {{ $t.Status }} + | {{ $t.File }} | diff --git a/types/ffmpeg.go b/types/ffmpeg.go new file mode 100644 index 0000000..4f9bd99 --- /dev/null +++ b/types/ffmpeg.go @@ -0,0 +1,53 @@ +package types + +type FFmpegCommand struct { + Args []Arg `json:"args"` + InputFiles []File `json:"input_files"` + OutputFiles []File `json:"output_files"` +} + +type File struct { + Path string `json:"path"` + Arguments []Arg `json:"args"` +} + +type Arg struct { + Flag string `json:"flag"` + Value string `json:"value"` +} + +func (c *FFmpegCommand) GetArgs() []string { + args := []string{} + + for _, a := range c.Args { + args = append(args, a.Flag) + if a.Value != "" { + args = append(args, a.Value) + } + } + + for _, i := range c.InputFiles { + for _, a := range i.Arguments { + args = append(args, a.Flag) + if a.Value != "" { + args = append(args, a.Value) + } + } + args = append(args, "-i", i.Path) + } + + for _, i := range c.OutputFiles { + for _, a := range i.Arguments { + args = append(args, a.Flag) + if a.Value != "" { + args = append(args, a.Value) + } + } + args = append(args, i.Path) + } + + return args +} + +// ffmpeg -loglevel error -stats +// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" diff --git a/types/healtcheck.go b/types/healtcheck.go new file mode 100644 index 0000000..1299ccf --- /dev/null +++ b/types/healtcheck.go @@ -0,0 +1,5 @@ +package types + +type HealthCheckData struct { + Command FFmpegCommand `json:"command"` +} diff --git a/types/task.go b/types/task.go new file mode 100644 index 0000000..e47fce2 --- /dev/null +++ b/types/task.go @@ -0,0 +1,31 @@ +package types + +import ( + "encoding/json" + + "git.lastassault.de/speatzle/morffix/constants" +) + +type TaskStart struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Data json.RawMessage +} + +type Task struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Status constants.TaskStatus `json:"status"` + Log []string `json:"log"` +} + +type TaskStatus struct { + Task Task `json:"task"` +} + +type TaskStatusRequest struct { + ID int `json:"id" db:"id"` + LogOffset int `json:"log_offset" db:"log_offset"` +} diff --git a/worker/task.go b/worker/task.go new file mode 100644 index 0000000..b909a9b --- /dev/null +++ b/worker/task.go @@ -0,0 +1,64 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + + "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/task" + "git.lastassault.de/speatzle/morffix/types" +) + +func init() { + rpcServer.RegisterMethod("task-start", taskStart) + rpcServer.RegisterMethod("task-status", taskStatus) + rpcServer.RegisterMethod("task-delete", taskDelete) +} + +func taskStart(ctx context.Context, req rpc.Request) (any, error) { + var data types.TaskStart + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err) + } + + err = task.StartTask(conf, data) + if err != nil { + return nil, fmt.Errorf("Starting Task: %w", err) + } + + return nil, nil +} + +func taskStatus(ctx context.Context, req rpc.Request) (any, error) { + var data types.TaskStatusRequest + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Status Request: %w", err) + } + + t, err := task.Get(data) + if err != nil { + return nil, fmt.Errorf("Get Task Status: %w", err) + } + + return types.TaskStatus{ + Task: *t, + }, nil +} + +func taskDelete(ctx context.Context, req rpc.Request) (any, error) { + var data int + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Delete ID: %w", err) + } + + err = task.DeleteTask(data) + if err != nil { + return nil, fmt.Errorf("Deleting Task: %w", err) + } + + return nil, nil +} diff --git a/worker/worker.go b/worker/worker.go index 3f42d47..07dbad6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -26,8 +26,15 @@ var rpcServer = rpc.NewServer() func Start(_conf config.Config) { conf = _conf ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - uuid := uuid.New() + // TODO Generate and Save a ID if the Config has none + //uuid := uuid.New() + uuid, err := uuid.Parse(conf.Worker.ID) + if err != nil { + slog.Error("Cannot Parse ID", "err", err) + return + } sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) @@ -59,8 +66,9 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) { headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret) headers.Add(constants.NAME_HEADER, conf.Worker.Name) headers.Add(constants.UUID_HEADER, uuid.String()) + headers.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) - c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ + c, res, err := websocket.Dial(ctx, conf.Worker.Address+"/worker", &websocket.DialOptions{ HTTPHeader: headers, }) if err != nil {
---|