Merge branch 'task' into 'main'

Get Task stuff Working

See merge request speatzle/morffix!1
This commit is contained in:
Samuel Lorch 2024-05-10 08:50:44 +00:00
commit d9f2da2519
22 changed files with 963 additions and 102 deletions

View file

@ -12,6 +12,9 @@ type Server struct {
} }
type Worker struct { type Worker struct {
Address string ID string
Name string TempDir string
Address string
Name string
FFmpegPath string
} }

View file

@ -1,5 +1,10 @@
package constants package constants
import "fmt"
const WORKER_VERSION = "v1"
const WORKER_VERSION_HEADER = "morffix-version"
const SHARED_SECRET_HEADER = "morffix-secret" const SHARED_SECRET_HEADER = "morffix-secret"
const NAME_HEADER = "morffix-name" const NAME_HEADER = "morffix-name"
const UUID_HEADER = "morffix-uuid" const UUID_HEADER = "morffix-uuid"
@ -8,8 +13,47 @@ const INDEX_TEMPLATE_NAME = "index.tmpl"
const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl" const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl"
const LIBRARY_TEMPLATE_NAME = "library.tmpl" const LIBRARY_TEMPLATE_NAME = "library.tmpl"
const MESSAGE_TEMPLATE_NAME = "message.tmpl" const MESSAGE_TEMPLATE_NAME = "message.tmpl"
const TASK_TEMPLATE_NAME = "tasks.tmpl" const TASKS_TEMPLATE_NAME = "tasks.tmpl"
const TASK_TEMPLATE_NAME = "task.tmpl"
const ( const (
HEALTHCHECK_TASK_TYPE = iota TASK_TYPE_HEALTHCHECK = iota
TASK_TYPE_TRANSCODE
) )
type TaskStatus int
// Non Append Changes Need Worker Version Bump
const (
TASK_STATUS_UNKNOWN TaskStatus = iota
TASK_STATUS_FAILED
TASK_STATUS_SUCCESS
TASK_STATUS_RUNNING
TASK_STATUS_QUEUED
TASK_STATUS_ASSIGNED
TASK_STATUS_PAUSED
TASK_STATUS_WAITING
)
func (s TaskStatus) String() string {
switch s {
case TASK_STATUS_UNKNOWN:
return "Unknown"
case TASK_STATUS_FAILED:
return "Failed"
case TASK_STATUS_SUCCESS:
return "Success"
case TASK_STATUS_RUNNING:
return "Running"
case TASK_STATUS_QUEUED:
return "Queued"
case TASK_STATUS_ASSIGNED:
return "Assigned"
case TASK_STATUS_PAUSED:
return "Paused"
case TASK_STATUS_WAITING:
return "Waiting"
default:
return fmt.Sprintf("%d", int(s))
}
}

9
go.mod
View file

@ -18,9 +18,12 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/samber/lo v1.38.1 // indirect
github.com/samber/slog-multi v1.0.2 // indirect
go.uber.org/atomic v1.11.0 // indirect go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.22.0 // indirect golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 // indirect
golang.org/x/sync v0.7.0 // indirect golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect golang.org/x/text v0.15.0 // indirect
) )

12
go.sum
View file

@ -28,6 +28,10 @@ github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFr
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/samber/slog-multi v1.0.2 h1:6BVH9uHGAsiGkbbtQgAOQJMpKgV8unMrHhhJaw+X1EQ=
github.com/samber/slog-multi v1.0.2/go.mod h1:uLAvHpGqbYgX4FSL0p1ZwoLuveIAJvBECtE07XmYvFo=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -41,6 +45,10 @@ golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg=
golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 h1:pVgRXcIictcr+lBQIFeiwuwtDIs4eL21OuM9nyAADmo=
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
@ -53,8 +61,12 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=

View file

@ -40,6 +40,14 @@ func main() {
return return
} }
if conf.Worker.FFmpegPath == "" {
conf.Worker.FFmpegPath = "ffmpeg"
}
if conf.Worker.TempDir == "" {
conf.Worker.TempDir = "/tmp"
}
if *isserver { if *isserver {
slog.Info("Starting Server...") slog.Info("Starting Server...")
server.Start(conf, templates, static, migrations) server.Start(conf, templates, static, migrations)

View file

@ -3,6 +3,8 @@ CREATE TABLE IF NOT EXISTS tasks(
id serial PRIMARY KEY, id serial PRIMARY KEY,
worker_id uuid REFERENCES workers(id), worker_id uuid REFERENCES workers(id),
file_id integer REFERENCES files(id) NOT NULL, file_id integer REFERENCES files(id) NOT NULL,
type smallint NOT NULL,
status smallint NOT NULL, status smallint NOT NULL,
type smallint NOT NULL data JSONB,
log text[]
); );

26
server/file.go Normal file
View file

@ -0,0 +1,26 @@
package server
import (
"log/slog"
"net/http"
)
func handleFile(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
http.Error(w, "No id", http.StatusBadRequest)
return
}
// TODO check if worker is working on a task involving this file
var path string
err := db.QueryRow(r.Context(), "SELECT path FROM files WHERE id = $1", id).Scan(&path)
if err != nil {
http.Error(w, "Error Getting Path: "+err.Error(), http.StatusBadRequest)
slog.ErrorContext(r.Context(), "Getting Path", "err", err)
return
}
http.ServeFile(w, r, path)
}

View file

@ -7,14 +7,12 @@ import (
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/types" "git.lastassault.de/speatzle/morffix/types"
"github.com/jackc/pgx/v5"
) )
type IndexData struct { type IndexData struct {
Counter []int Counter []int
Workers []IndexWorker Workers []IndexWorker
Tasks []Task
} }
type IndexWorker struct { type IndexWorker struct {
@ -22,24 +20,11 @@ type IndexWorker struct {
Status *types.WorkerStatus Status *types.WorkerStatus
} }
var count = 123456789
type Task struct {
ID int `db:"id"`
Library int `db:"library"`
Worker *string `db:"worker"`
Type int `db:"type"`
Status int `db:"status"`
File string `db:"file"`
}
func handleIndex(w http.ResponseWriter, r *http.Request) { func handleIndex(w http.ResponseWriter, r *http.Request) {
data := IndexData{ data := IndexData{
Counter: splitInt(count),
Workers: []IndexWorker{}, Workers: []IndexWorker{},
} }
count++
func() { func() {
WorkersMutex.Lock() WorkersMutex.Lock()
defer WorkersMutex.Unlock() defer WorkersMutex.Unlock()
@ -66,19 +51,12 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
} }
}() }()
rows, err := db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id") var size int
err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE missing = $1", false).Scan(&size)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Tasks", "err", err) size = 0
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return
} }
tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) data.Counter = splitInt(size)
if err != nil {
slog.ErrorContext(r.Context(), "Executing index Template", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return
}
data.Tasks = tasks
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.INDEX_TEMPLATE_NAME, data) err = templates.ExecuteTemplate(&buf, constants.INDEX_TEMPLATE_NAME, data)

View file

@ -74,7 +74,9 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/worker", handleWorkerWebsocket) mux.HandleFunc("/worker", handleWorkerWebsocket)
mux.Handle("/static/", fs) mux.Handle("/static/", fs)
mux.HandleFunc("/tasks", handleTask) mux.HandleFunc("/tasks", handleTasks)
mux.HandleFunc("/files/{id}", handleFile)
mux.HandleFunc("/tasks/{id}", handleTask)
mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/scan/{id}", handleScan)
mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries/{id}", handleLibrary)
mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/libraries", handleLibraries)
@ -102,7 +104,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
}() }()
stopCleanup := make(chan bool, 1) stopCleanup := make(chan bool, 1)
go cleanupDeadWorkers(stopCleanup) go manageWorkers(stopCleanup)
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)

View file

@ -3,20 +3,40 @@ package server
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/types"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
type TaskData struct { type TasksData struct {
Libraries []Library Libraries []Library
Tasks []Task Tasks []TaskDisplay
} }
func handleTask(w http.ResponseWriter, r *http.Request) { type TaskDisplay struct {
ID int `db:"id"`
Library int `db:"library"`
Worker *string `db:"worker"`
Type int `db:"type"`
Status string `db:"status"`
File string `db:"file"`
}
type TaskDB struct {
ID int `db:"id"`
Library int `db:"library"`
Worker *string `db:"worker"`
Type int `db:"type"`
Status constants.TaskStatus `db:"status"`
File string `db:"file"`
}
func handleTasks(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" { if r.Method == "POST" {
err := createTask(r.Context(), r) err := createTask(r.Context(), r)
if err != nil { if err != nil {
@ -26,7 +46,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
} }
} }
data := TaskData{} data := TasksData{}
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true) rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true)
if err != nil { if err != nil {
@ -48,16 +68,58 @@ func handleTask(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return return
} }
tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB])
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) slog.ErrorContext(r.Context(), "Collect Tasks", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return return
} }
data.Tasks = tasks for i := range tasks {
data.Tasks = append(data.Tasks, TaskDisplay{
ID: tasks[i].ID,
Library: tasks[i].Library,
Worker: tasks[i].Worker,
Type: tasks[i].Type,
File: tasks[i].File,
Status: tasks[i].Status.String(),
})
}
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, data) err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Task Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
}
func handleTask(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
handleTasks(w, r)
return
}
var log []string
err := db.QueryRow(r.Context(), "SELECT log FROM tasks WHERE id = $1", id).Scan(&log)
if err != nil {
slog.ErrorContext(r.Context(), "Query Tasks", "err", err)
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
return
}
t := types.Task{
Log: log,
}
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) slog.ErrorContext(r.Context(), "Executing Task Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
@ -94,8 +156,19 @@ func createTask(ctx context.Context, r *http.Request) error {
} }
defer tx.Rollback(ctx) defer tx.Rollback(ctx)
var data any
if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK {
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
data = types.HealthCheckData{Command: types.FFmpegCommand{
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
InputFiles: []types.File{{Path: "input.mkv"}},
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
}}
}
for _, file := range files { for _, file := range files {
_, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status) VALUES ($1,$2,$3)", file.ID, typ, 0) _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data)
if err != nil { if err != nil {
return fmt.Errorf("Inserting Task: %w", err) return fmt.Errorf("Inserting Task: %w", err)
} }
@ -108,3 +181,83 @@ func createTask(ctx context.Context, r *http.Request) error {
return nil return nil
} }
type QueuedTask struct {
ID int
Type int
FileID int `json:"file_id"`
Data json.RawMessage
}
func assignQueuedTasks(ctx context.Context) error {
rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED)
if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err)
}
queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask])
if err != nil {
return fmt.Errorf("Collect Queued Tasks: %w", err)
}
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
if len(queuedTasks) == 0 {
return nil
}
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
lastAssigned := 0
for i := range Workers {
if lastAssigned == len(queuedTasks) {
slog.Info("All Tasks assigned")
// All Tasks have been Assigned
return nil
}
if Workers[i].Connected {
var count int
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
if err != nil {
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
}
slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count)
// Allow for Multiple Tasks at once in the future
if count < 1 {
tx, err := db.Begin(ctx)
defer tx.Rollback(ctx)
if err != nil {
return fmt.Errorf("Starting Transaction: %w", err)
}
_, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i)
if err != nil {
return fmt.Errorf("Setting tasks Assignment: %w", err)
}
taskStart := types.TaskStart{
ID: queuedTasks[lastAssigned].ID,
Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID,
Data: queuedTasks[lastAssigned].Data,
}
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
if err != nil {
return fmt.Errorf("Error Starting Task: %w", err)
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("Error Committing Transaction: %w", err)
}
slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i)
lastAssigned++
}
}
}
return nil
}

View file

@ -8,6 +8,8 @@ import (
"time" "time"
"git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/types"
"github.com/jackc/pgx/v5"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
@ -32,6 +34,25 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)
return return
} }
if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
w.WriteHeader(http.StatusNotImplemented)
return
}
// Connection ID
uuid := r.Header.Get(constants.UUID_HEADER)
if uuid == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
name := r.Header.Get(constants.NAME_HEADER)
_, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err)
return
}
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true, InsecureSkipVerify: true,
}) })
@ -41,9 +62,6 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
} }
defer c.CloseNow() defer c.CloseNow()
// Connection ID
uuid := r.Header.Get(constants.UUID_HEADER)
// Track Connection // Track Connection
func() { func() {
WorkersMutex.Lock() WorkersMutex.Lock()
@ -66,7 +84,7 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
} }
Workers[uuid] = &Worker{ Workers[uuid] = &Worker{
Name: r.Header.Get(constants.NAME_HEADER), Name: name,
Address: r.RemoteAddr, Address: r.RemoteAddr,
Conn: c, Conn: c,
Connected: true, Connected: true,
@ -110,26 +128,102 @@ func readMessage(ctx context.Context, c *websocket.Conn) error {
return nil return nil
} }
func cleanupDeadWorkers(stop chan bool) { func manageWorkers(stop chan bool) {
ticker := time.NewTicker(time.Second) deadTicker := time.NewTicker(time.Second)
statusTicker := time.NewTicker(time.Second)
assignTicker := time.NewTicker(time.Second)
for { for {
select { select {
case <-ticker.C: case <-deadTicker.C:
func() { killDeadWorkers()
WorkersMutex.Lock() case <-statusTicker.C:
defer WorkersMutex.Unlock() updateWorkerTaskStatus(context.TODO())
case <-assignTicker.C:
for uuid, w := range Workers { err := assignQueuedTasks(context.TODO())
// TODO Not dead Workers are sometimes also killed? if err != nil {
if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { slog.Error("Assigning Queued Tasks", "err", err)
slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) }
delete(Workers, uuid)
// TODO Free any Jobs that the Worker had
}
}
}()
case <-stop: case <-stop:
return return
} }
} }
} }
func killDeadWorkers() {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for uuid, w := range Workers {
// TODO Not dead Workers are sometimes also killed?
if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) {
slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name)
delete(Workers, uuid)
// TODO Free any Jobs that the Worker had
}
}
}
func updateWorkerTaskStatus(ctx context.Context) {
var wg sync.WaitGroup
func() {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for uuid := range Workers {
if Workers[uuid].Connected {
w := Workers[uuid]
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
return
}
taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest])
if err != nil {
slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err)
return
}
for _, tsr := range taskStatusRequests {
wg.Add(1)
go func() {
defer wg.Done()
var ts types.TaskStatus
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Task Status", "err", err)
// TODO Mark Task as Unknown?
return
}
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log)
if err != nil {
slog.ErrorContext(ctx, "Error Updating Task Status", "err", err)
return
}
// Tell Worker to Delete Finished Tasks
if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
_, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil)
if err != nil {
slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err)
return
}
}
}()
}
// TODO Handle tasks with status unkown assigned to this worker
} else {
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck
}
}
}()
wg.Wait()
}

157
task/healthcheck.go Normal file
View file

@ -0,0 +1,157 @@
package task
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"sync"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/task/log"
"git.lastassault.de/speatzle/morffix/types"
)
// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
if len(data) > 0 && data[len(data)-1] == '\r' {
return data[0 : len(data)-1]
}
return data
}
func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i := bytes.IndexByte(data, '\n'); i >= 0 {
// We have a full newline-terminated line.
return i + 1, dropCR(data[0:i]), nil
}
if i := bytes.IndexByte(data, '\r'); i >= 0 {
// We have a return line.
return i + 1, dropCR(data[0:i]), nil
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), dropCR(data), nil
}
// Request more data.
return 0, nil, nil
}
func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckData) {
ctx := context.TODO()
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path
if len(data.Command.InputFiles) == 0 {
l.ErrorContext(ctx, "FFmpeg Command has no input files")
return
}
data.Command.InputFiles[0].Path = path
// TODO cleanup file when done
defer func() {
err := os.Remove(path)
if err != nil {
l.ErrorContext(ctx, "Removing File", "err", err, "path", path)
} else {
l.ErrorContext(ctx, "File Removed Succesfully", "path", path)
}
}()
l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path)
err := func() error {
out, err := os.Create(path)
if err != nil {
return fmt.Errorf("Creating File: %w", err)
}
defer out.Close()
resp, err := http.Get(fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID))
if err != nil {
return fmt.Errorf("Getting File: %w", err)
}
// TODO Log at interval logs read
defer resp.Body.Close()
n, err := io.Copy(out, resp.Body)
if err != nil {
return fmt.Errorf("Reading File: %w", err)
}
l.InfoContext(ctx, "Downloaded File", "bytes", n)
return nil
}()
if err != nil {
l.ErrorContext(ctx, "File Download Failed", "err", err)
return
}
l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs())
cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...)
var wg sync.WaitGroup
stdout, err := cmd.StdoutPipe()
if err != nil {
l.ErrorContext(ctx, "Error getting StdoutPipe", "err", err)
return
}
wg.Add(1)
outScanner := bufio.NewScanner(stdout)
outScanner.Split(scanLines)
go func() {
for outScanner.Scan() {
l.InfoContext(ctx, outScanner.Text())
}
wg.Done()
}()
stderr, err := cmd.StderrPipe()
if err != nil {
l.ErrorContext(ctx, "Error getting StderrPipe", "err", err)
return
}
wg.Add(1)
errScanner := bufio.NewScanner(stderr)
errScanner.Split(scanLines)
go func() {
for errScanner.Scan() {
l.InfoContext(ctx, errScanner.Text())
}
wg.Done()
}()
err = cmd.Start()
if err != nil {
l.ErrorContext(ctx, "Error Starting ffmpeg", "err", err)
return
}
wg.Wait()
err = cmd.Wait()
if err != nil {
l.ErrorContext(ctx, "Error Running ffmpeg", "err", err)
return
}
l.InfoContext(ctx, "Task Success")
t.Status = constants.TASK_STATUS_SUCCESS
}

145
task/log/log.go Normal file
View file

@ -0,0 +1,145 @@
package log
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/types"
slogmulti "github.com/samber/slog-multi"
)
func GetTaskLogger(t *types.Task) *slog.Logger {
return slog.New(
slogmulti.Fanout(
slog.Default().Handler(),
New(t, &Options{slog.LevelDebug}),
),
)
}
// groupOrAttrs holds either a group name or a list of slog.Attrs.
type groupOrAttrs struct {
group string // group name if non-empty
attrs []slog.Attr // attrs if non-empty
}
type TaskHandler struct {
opts Options
// TODO: state for WithGroup and WithAttrs
mu *sync.Mutex
t *types.Task
goas []groupOrAttrs
}
type Options struct {
// Level reports the minimum level to log.
// Levels with lower levels are discarded.
// If nil, the Handler uses [slog.LevelInfo].
Level slog.Leveler
}
func New(t *types.Task, opts *Options) *TaskHandler {
h := &TaskHandler{t: t, mu: &sync.Mutex{}}
if opts != nil {
h.opts = *opts
}
if h.opts.Level == nil {
h.opts.Level = slog.LevelInfo
}
return h
}
func (h *TaskHandler) Enabled(ctx context.Context, level slog.Level) bool {
return level >= h.opts.Level.Level()
}
func (h *TaskHandler) Handle(ctx context.Context, r slog.Record) error {
buf := make([]byte, 0, 1024)
if !r.Time.IsZero() {
buf = h.appendAttr(buf, slog.Time(slog.TimeKey, r.Time))
}
buf = h.appendAttr(buf, slog.String(slog.MessageKey, r.Message))
// Handle state from WithGroup and WithAttrs.
goas := h.goas
if r.NumAttrs() == 0 {
// If the record has no Attrs, remove groups at the end of the list; they are empty.
for len(goas) > 0 && goas[len(goas)-1].group != "" {
goas = goas[:len(goas)-1]
}
}
for _, goa := range goas {
if goa.group == "" {
for _, a := range goa.attrs {
buf = h.appendAttr(buf, a)
}
}
}
r.Attrs(func(a slog.Attr) bool {
buf = h.appendAttr(buf, a)
return true
})
h.mu.Lock()
defer h.mu.Unlock()
h.t.Log = append(h.t.Log, string(buf))
return nil
}
func (h *TaskHandler) appendAttr(buf []byte, a slog.Attr) []byte {
// Resolve the Attr's value before doing anything else.
a.Value = a.Value.Resolve()
// Ignore empty Attrs.
if a.Equal(slog.Attr{}) {
return buf
}
switch a.Value.Kind() {
case slog.KindString:
// Quote string values, to make them easy to parse.
buf = fmt.Appendf(buf, "%s: %q\n", a.Key, a.Value.String())
case slog.KindTime:
// Write times in a standard way, without the monotonic time.
buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value.Time().Format(time.DateTime))
case slog.KindGroup:
attrs := a.Value.Group()
// Ignore empty groups.
if len(attrs) == 0 {
return buf
}
// If the key is non-empty, write it out and indent the rest of the attrs.
// Otherwise, inline the attrs.
if a.Key != "" {
buf = fmt.Appendf(buf, "%s:\n", a.Key)
}
for _, ga := range attrs {
buf = h.appendAttr(buf, ga)
}
default:
buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value)
}
return buf
}
func (h *TaskHandler) withGroupOrAttrs(goa groupOrAttrs) *TaskHandler {
h2 := *h
h2.goas = make([]groupOrAttrs, len(h.goas)+1)
copy(h2.goas, h.goas)
h2.goas[len(h2.goas)-1] = goa
return &h2
}
func (h *TaskHandler) WithGroup(name string) slog.Handler {
if name == "" {
return h
}
return h.withGroupOrAttrs(groupOrAttrs{group: name})
}
func (h *TaskHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
if len(attrs) == 0 {
return h
}
return h.withGroupOrAttrs(groupOrAttrs{attrs: attrs})
}

88
task/task.go Normal file
View file

@ -0,0 +1,88 @@
package task
import (
"encoding/json"
"fmt"
"sync"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/types"
)
var tasks = map[int]*types.Task{}
var taskMutex sync.Mutex
func StartTask(conf config.Config, data types.TaskStart) error {
taskMutex.Lock()
defer taskMutex.Unlock()
tasks[data.ID] = &types.Task{
ID: data.ID,
Type: data.Type,
FileID: data.FileID,
}
switch data.Type {
case constants.TASK_TYPE_HEALTHCHECK:
var hData types.HealthCheckData
err := json.Unmarshal(data.Data, &hData)
if err != nil {
return fmt.Errorf("Unmarshal Healthcheck Data: %w", err)
}
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
go func() {
defer func() {
if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING {
tasks[data.ID].Status = constants.TASK_STATUS_FAILED
tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer")
}
}()
RunHealthCheck(conf, tasks[data.ID], hData)
}()
return nil
case constants.TASK_TYPE_TRANSCODE:
return fmt.Errorf("Transcode Task Not Implemented")
default:
return fmt.Errorf("Unknown Task Type %v", data.Type)
}
}
func Get(r types.TaskStatusRequest) (*types.Task, error) {
taskMutex.Lock()
defer taskMutex.Unlock()
t, ok := tasks[r.ID]
if !ok {
return nil, fmt.Errorf("Task does not Exist")
}
res := *t
// Send only new logs if there are any
if len(res.Log) >= r.LogOffset {
res.Log = res.Log[r.LogOffset:]
} else {
res.Log = []string{}
}
return &res, nil
}
func DeleteTask(id int) error {
taskMutex.Lock()
defer taskMutex.Unlock()
_, ok := tasks[id]
if !ok {
return fmt.Errorf("Task does not Exist")
}
if tasks[id].Status == constants.TASK_STATUS_RUNNING {
return fmt.Errorf("Task is Currently Running")
}
delete(tasks, id)
return nil
}

View file

@ -72,40 +72,4 @@
</div> </div>
{{end}} {{end}}
</div> </div>
<p>
<h2>Tasks</h2>
<div class="task-list">
<table>
<tr>
<th>ID</th>
<th>Library</th>
<th>Worker</th>
<th>Type</th>
<th>Status</th>
<th>File</th>
</tr>
{{range $t := .Tasks}}
<tr>
<td>
{{ $t.ID }}
</td>
<td>
{{ $t.Library }}
</td>
<td>
{{ $t.Worker }}
</td>
<td>
{{ $t.Type }}
</td>
<td>
{{ $t.Status }}
</td>
<td>
{{ $t.File }}
</td>
</tr>
{{end}}
</table>
</div>
{{template "tail"}} {{template "tail"}}

9
tmpl/task.tmpl Normal file
View file

@ -0,0 +1,9 @@
{{template "head"}}
<h2>Task {{.ID}}</h2>
<div class="task-log">
{{range $t := .Log}}
{{ $t }}<br/>
{{end}}
</div>
{{template "tail"}}

View file

@ -19,7 +19,10 @@
<table> <table>
<tr> <tr>
<th>ID</th> <th>ID</th>
<th>Library</th>
<th>Worker</th>
<th>Type</th> <th>Type</th>
<th>Status</th>
<th>File</th> <th>File</th>
</tr> </tr>
{{range $t := .Tasks}} {{range $t := .Tasks}}
@ -27,9 +30,18 @@
<td> <td>
{{ $t.ID }} {{ $t.ID }}
</td> </td>
<td>
{{ $t.Library }}
</td>
<td>
{{ $t.Worker }}
</td>
<td> <td>
{{ $t.Type }} {{ $t.Type }}
</td> </td>
<td>
{{ $t.Status }}
</td>
<td> <td>
{{ $t.File }} {{ $t.File }}
</td> </td>

53
types/ffmpeg.go Normal file
View file

@ -0,0 +1,53 @@
package types
type FFmpegCommand struct {
Args []Arg `json:"args"`
InputFiles []File `json:"input_files"`
OutputFiles []File `json:"output_files"`
}
type File struct {
Path string `json:"path"`
Arguments []Arg `json:"args"`
}
type Arg struct {
Flag string `json:"flag"`
Value string `json:"value"`
}
func (c *FFmpegCommand) GetArgs() []string {
args := []string{}
for _, a := range c.Args {
args = append(args, a.Flag)
if a.Value != "" {
args = append(args, a.Value)
}
}
for _, i := range c.InputFiles {
for _, a := range i.Arguments {
args = append(args, a.Flag)
if a.Value != "" {
args = append(args, a.Value)
}
}
args = append(args, "-i", i.Path)
}
for _, i := range c.OutputFiles {
for _, a := range i.Arguments {
args = append(args, a.Flag)
if a.Value != "" {
args = append(args, a.Value)
}
}
args = append(args, i.Path)
}
return args
}
// ffmpeg -loglevel error -stats
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"

5
types/healtcheck.go Normal file
View file

@ -0,0 +1,5 @@
package types
type HealthCheckData struct {
Command FFmpegCommand `json:"command"`
}

31
types/task.go Normal file
View file

@ -0,0 +1,31 @@
package types
import (
"encoding/json"
"git.lastassault.de/speatzle/morffix/constants"
)
type TaskStart struct {
ID int `json:"id"`
FileID int `json:"file_id"`
Type int `json:"type"`
Data json.RawMessage
}
type Task struct {
ID int `json:"id"`
FileID int `json:"file_id"`
Type int `json:"type"`
Status constants.TaskStatus `json:"status"`
Log []string `json:"log"`
}
type TaskStatus struct {
Task Task `json:"task"`
}
type TaskStatusRequest struct {
ID int `json:"id" db:"id"`
LogOffset int `json:"log_offset" db:"log_offset"`
}

64
worker/task.go Normal file
View file

@ -0,0 +1,64 @@
package worker
import (
"context"
"encoding/json"
"fmt"
"git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/task"
"git.lastassault.de/speatzle/morffix/types"
)
func init() {
rpcServer.RegisterMethod("task-start", taskStart)
rpcServer.RegisterMethod("task-status", taskStatus)
rpcServer.RegisterMethod("task-delete", taskDelete)
}
func taskStart(ctx context.Context, req rpc.Request) (any, error) {
var data types.TaskStart
err := json.Unmarshal(*req.Params, &data)
if err != nil {
return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err)
}
err = task.StartTask(conf, data)
if err != nil {
return nil, fmt.Errorf("Starting Task: %w", err)
}
return nil, nil
}
func taskStatus(ctx context.Context, req rpc.Request) (any, error) {
var data types.TaskStatusRequest
err := json.Unmarshal(*req.Params, &data)
if err != nil {
return nil, fmt.Errorf("Unmarshal Task Status Request: %w", err)
}
t, err := task.Get(data)
if err != nil {
return nil, fmt.Errorf("Get Task Status: %w", err)
}
return types.TaskStatus{
Task: *t,
}, nil
}
func taskDelete(ctx context.Context, req rpc.Request) (any, error) {
var data int
err := json.Unmarshal(*req.Params, &data)
if err != nil {
return nil, fmt.Errorf("Unmarshal Task Delete ID: %w", err)
}
err = task.DeleteTask(data)
if err != nil {
return nil, fmt.Errorf("Deleting Task: %w", err)
}
return nil, nil
}

View file

@ -26,8 +26,15 @@ var rpcServer = rpc.NewServer()
func Start(_conf config.Config) { func Start(_conf config.Config) {
conf = _conf conf = _conf
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
uuid := uuid.New() // TODO Generate and Save a ID if the Config has none
//uuid := uuid.New()
uuid, err := uuid.Parse(conf.Worker.ID)
if err != nil {
slog.Error("Cannot Parse ID", "err", err)
return
}
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)
@ -59,8 +66,9 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) {
headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret) headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret)
headers.Add(constants.NAME_HEADER, conf.Worker.Name) headers.Add(constants.NAME_HEADER, conf.Worker.Name)
headers.Add(constants.UUID_HEADER, uuid.String()) headers.Add(constants.UUID_HEADER, uuid.String())
headers.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION)
c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ c, res, err := websocket.Dial(ctx, conf.Worker.Address+"/worker", &websocket.DialOptions{
HTTPHeader: headers, HTTPHeader: headers,
}) })
if err != nil { if err != nil {