From 595efd0c66dbc166dd9f15ea49dda54667d894c6 Mon Sep 17 00:00:00 2001 From: speatzle Date: Sun, 5 May 2024 17:14:39 +0200 Subject: [PATCH 01/27] Add Worker Version Check --- constants/constants.go | 3 +++ server/worker.go | 4 ++++ worker/worker.go | 1 + 3 files changed, 8 insertions(+) diff --git a/constants/constants.go b/constants/constants.go index 6521c44..a1f5cf3 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,5 +1,8 @@ package constants +const WORKER_VERSION = "v1" + +const WORKER_VERSION_HEADER = "morffix-version" const SHARED_SECRET_HEADER = "morffix-secret" const NAME_HEADER = "morffix-name" const UUID_HEADER = "morffix-uuid" diff --git a/server/worker.go b/server/worker.go index 0390462..e79c2ea 100644 --- a/server/worker.go +++ b/server/worker.go @@ -32,6 +32,10 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusUnauthorized) return } + if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION { + w.WriteHeader(http.StatusNotImplemented) + return + } c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) diff --git a/worker/worker.go b/worker/worker.go index 3f42d47..670ef2a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -59,6 +59,7 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) { headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret) headers.Add(constants.NAME_HEADER, conf.Worker.Name) headers.Add(constants.UUID_HEADER, uuid.String()) + headers.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ HTTPHeader: headers, From 4153e48eb33e6049a32eafe9edf94ad9651f9a08 Mon Sep 17 00:00:00 2001 From: speatzle Date: Sun, 5 May 2024 17:15:05 +0200 Subject: [PATCH 02/27] Use Library Size for Counter --- server/index.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/server/index.go b/server/index.go index b6b703f..98ac8b0 100644 --- a/server/index.go +++ b/server/index.go @@ -22,8 +22,6 @@ type IndexWorker struct { Status *types.WorkerStatus } -var count = 123456789 - type Task struct { ID int `db:"id"` Library int `db:"library"` @@ -36,10 +34,8 @@ type Task struct { func handleIndex(w http.ResponseWriter, r *http.Request) { data := IndexData{ - Counter: splitInt(count), Workers: []IndexWorker{}, } - count++ func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() @@ -66,6 +62,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } }() + var size int + err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE missing = $1", false).Scan(&size) + if err != nil { + slog.ErrorContext(r.Context(), "Query File Size Sum", "err", err) + http.Error(w, "Error Query File Size Sum: "+err.Error(), http.StatusInternalServerError) + return + } + data.Counter = splitInt(size) + rows, err := db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id") if err != nil { slog.ErrorContext(r.Context(), "Query Tasks", "err", err) From 172bf636d62e9261bfcf864aec2c0a24a9d15b9a Mon Sep 17 00:00:00 2001 From: speatzle Date: Sun, 5 May 2024 17:15:17 +0200 Subject: [PATCH 03/27] Update deps --- go.mod | 4 ++-- go.sum | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index cac59c0..f4b9a9b 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,6 @@ require ( go.uber.org/atomic v1.11.0 // indirect golang.org/x/crypto v0.22.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/sys v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect ) diff --git a/go.sum b/go.sum index 2bd6758..0a7c418 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,12 @@ golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= From a040d5d56ce3df570978b8b9d60b6ea7b2e7f49a Mon Sep 17 00:00:00 2001 From: speatzle Date: Tue, 7 May 2024 18:52:06 +0200 Subject: [PATCH 04/27] task wip --- config/config.go | 6 ++-- constants/constants.go | 10 ++++++- task/healthcheck.go | 17 +++++++++++ task/task.go | 9 ++++++ types/task.go | 18 ++++++++++++ worker/task.go | 66 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 task/healthcheck.go create mode 100644 task/task.go create mode 100644 types/task.go create mode 100644 worker/task.go diff --git a/config/config.go b/config/config.go index 9194e79..cf61b60 100644 --- a/config/config.go +++ b/config/config.go @@ -7,8 +7,10 @@ type Config struct { } type Server struct { - Address string - Database string + Address string + Database string + HealthCheckCommand string + TranscodeCommand string } type Worker struct { diff --git a/constants/constants.go b/constants/constants.go index a1f5cf3..2d8d7b6 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -14,5 +14,13 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl" const TASK_TEMPLATE_NAME = "tasks.tmpl" const ( - HEALTHCHECK_TASK_TYPE = iota + TASK_TYPE_HEALTHCHECK = iota + TASK_TYPE_TRANSCODE +) + +const ( + TASK_STATUS_UNKNOWN = iota + TASK_STATUS_RUNNING + TASK_STATUS_FAILED + TASK_STATUS_SUCCESS ) diff --git a/task/healthcheck.go b/task/healthcheck.go new file mode 100644 index 0000000..19e0266 --- /dev/null +++ b/task/healthcheck.go @@ -0,0 +1,17 @@ +package task + +import "git.lastassault.de/speatzle/morffix/constants" + +type HealthCheckData struct { + Command string `json:"command"` +} + +func (t *Task) RunHealthCheck(data HealthCheckData) { + defer func() { + if t.Status == constants.TASK_STATUS_RUNNING { + t.Status = constants.TASK_STATUS_FAILED + t.Log = append(t.Log, "Task Status Failed by Defer") + } + }() + +} diff --git a/task/task.go b/task/task.go new file mode 100644 index 0000000..2b632c1 --- /dev/null +++ b/task/task.go @@ -0,0 +1,9 @@ +package task + +type Task struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Status int `json:"status"` + Log []string `json:"log"` +} diff --git a/types/task.go b/types/task.go new file mode 100644 index 0000000..d88958b --- /dev/null +++ b/types/task.go @@ -0,0 +1,18 @@ +package types + +import ( + "encoding/json" + + "git.lastassault.de/speatzle/morffix/task" +) + +type TaskStart struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Data json.RawMessage +} + +type TaskStatus struct { + task.Task +} diff --git a/worker/task.go b/worker/task.go new file mode 100644 index 0000000..887aeb6 --- /dev/null +++ b/worker/task.go @@ -0,0 +1,66 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/task" + "git.lastassault.de/speatzle/morffix/types" +) + +var tasks map[int]*task.Task +var taskMutex sync.Mutex + +func init() { + rpcServer.RegisterMethod("task-start", taskStart) + rpcServer.RegisterMethod("task-status", taskStatus) +} + +func taskStart(ctx context.Context, req rpc.Request) (any, error) { + var data types.TaskStart + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err) + } + + // TODO move to task module + taskMutex.Lock() + defer taskMutex.Unlock() + + switch data.Type { + case constants.TASK_TYPE_HEALTHCHECK: + tasks[data.ID] = &task.Task{ + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + } + + var hData task.HealthCheckData + err := json.Unmarshal(data.Data, &hData) + if err != nil { + return nil, fmt.Errorf("Unmarshal Healthcheck Data: %w", err) + } + + tasks[data.ID].Status = constants.TASK_STATUS_RUNNING + go tasks[data.ID].RunHealthCheck(hData) + case constants.TASK_TYPE_TRANSCODE: + return nil, fmt.Errorf("Transcode Task Not Implemented") + default: + return nil, fmt.Errorf("Unknown Task Type %v", data.Type) + } + + return nil, nil +} + +func taskStatus(ctx context.Context, req rpc.Request) (any, error) { + var id int + err := json.Unmarshal(*req.Params, &id) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Status Params: %w", err) + } + return nil, nil +} From 0155c36af1d08aa4b56ed1f86213ce1c91a1cc87 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:41:57 +0200 Subject: [PATCH 05/27] Worker Use UUID from Config --- worker/worker.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/worker/worker.go b/worker/worker.go index 670ef2a..54ef359 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -26,8 +26,15 @@ var rpcServer = rpc.NewServer() func Start(_conf config.Config) { conf = _conf ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - uuid := uuid.New() + // TODO Generate and Save a ID if the Config has none + //uuid := uuid.New() + uuid, err := uuid.Parse(conf.Worker.ID) + if err != nil { + slog.Error("Cannot Parse ID", "err", err) + return + } sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) From 4e0df234f1de1c910e3931079284ebf4809de66f Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:42:24 +0200 Subject: [PATCH 06/27] Auto Create Workers in Database --- server/worker.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/server/worker.go b/server/worker.go index e79c2ea..fa5fd18 100644 --- a/server/worker.go +++ b/server/worker.go @@ -36,6 +36,21 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) return } + // Connection ID + uuid := r.Header.Get(constants.UUID_HEADER) + if uuid == "" { + w.WriteHeader(http.StatusBadRequest) + return + } + name := r.Header.Get(constants.NAME_HEADER) + + _, err := db.Exec(r.Context(), "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + slog.ErrorContext(r.Context(), "Error Upserting Worker", "err", err) + return + } + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) @@ -45,9 +60,6 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { } defer c.CloseNow() - // Connection ID - uuid := r.Header.Get(constants.UUID_HEADER) - // Track Connection func() { WorkersMutex.Lock() @@ -70,7 +82,7 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { } Workers[uuid] = &Worker{ - Name: r.Header.Get(constants.NAME_HEADER), + Name: name, Address: r.RemoteAddr, Conn: c, Connected: true, From fa0f543feb8b98f7f8027035666c0077d7577920 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:42:54 +0200 Subject: [PATCH 07/27] Fix Empty Librarys --- server/index.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/index.go b/server/index.go index 98ac8b0..ce0d850 100644 --- a/server/index.go +++ b/server/index.go @@ -65,9 +65,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { var size int err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE missing = $1", false).Scan(&size) if err != nil { - slog.ErrorContext(r.Context(), "Query File Size Sum", "err", err) - http.Error(w, "Error Query File Size Sum: "+err.Error(), http.StatusInternalServerError) - return + size = 0 } data.Counter = splitInt(size) From bbe103c4467a1a6f4f316482e3289b14f6459f6a Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:43:20 +0200 Subject: [PATCH 08/27] Add slog-multi --- go.mod | 5 ++++- go.sum | 8 ++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f4b9a9b..1cf601b 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,11 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/samber/lo v1.38.1 // indirect + github.com/samber/slog-multi v1.0.2 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/crypto v0.22.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/text v0.15.0 // indirect diff --git a/go.sum b/go.sum index 0a7c418..576771c 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,10 @@ github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFr github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= +github.com/samber/slog-multi v1.0.2 h1:6BVH9uHGAsiGkbbtQgAOQJMpKgV8unMrHhhJaw+X1EQ= +github.com/samber/slog-multi v1.0.2/go.mod h1:uLAvHpGqbYgX4FSL0p1ZwoLuveIAJvBECtE07XmYvFo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -41,6 +45,10 @@ golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 h1:pVgRXcIictcr+lBQIFeiwuwtDIs4eL21OuM9nyAADmo= +golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= From bce27eddf3b594e1b1fc6b1d50112999ede7fe82 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:43:58 +0200 Subject: [PATCH 09/27] Implement Task Logger --- task/log/log.go | 152 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 task/log/log.go diff --git a/task/log/log.go b/task/log/log.go new file mode 100644 index 0000000..5372f60 --- /dev/null +++ b/task/log/log.go @@ -0,0 +1,152 @@ +package log + +import ( + "context" + "fmt" + "log/slog" + "runtime" + "sync" + "time" + + "git.lastassault.de/speatzle/morffix/types" + slogmulti "github.com/samber/slog-multi" +) + +func GetTaskLogger(t *types.Task) *slog.Logger { + return slog.New( + slogmulti.Fanout( + slog.Default().Handler(), + New(t, &Options{slog.LevelDebug}), + ), + ) +} + +// groupOrAttrs holds either a group name or a list of slog.Attrs. +type groupOrAttrs struct { + group string // group name if non-empty + attrs []slog.Attr // attrs if non-empty +} + +type TaskHandler struct { + opts Options + // TODO: state for WithGroup and WithAttrs + mu *sync.Mutex + t *types.Task + goas []groupOrAttrs +} + +type Options struct { + // Level reports the minimum level to log. + // Levels with lower levels are discarded. + // If nil, the Handler uses [slog.LevelInfo]. + Level slog.Leveler +} + +func New(t *types.Task, opts *Options) *TaskHandler { + h := &TaskHandler{t: t, mu: &sync.Mutex{}} + if opts != nil { + h.opts = *opts + } + if h.opts.Level == nil { + h.opts.Level = slog.LevelInfo + } + return h +} + +func (h *TaskHandler) Enabled(ctx context.Context, level slog.Level) bool { + return level >= h.opts.Level.Level() +} + +func (h *TaskHandler) Handle(ctx context.Context, r slog.Record) error { + buf := make([]byte, 0, 1024) + if !r.Time.IsZero() { + buf = h.appendAttr(buf, slog.Time(slog.TimeKey, r.Time)) + } + buf = h.appendAttr(buf, slog.Any(slog.LevelKey, r.Level)) + if r.PC != 0 { + fs := runtime.CallersFrames([]uintptr{r.PC}) + f, _ := fs.Next() + buf = h.appendAttr(buf, slog.String(slog.SourceKey, fmt.Sprintf("%s:%d", f.File, f.Line))) + } + buf = h.appendAttr(buf, slog.String(slog.MessageKey, r.Message)) + // Handle state from WithGroup and WithAttrs. + goas := h.goas + if r.NumAttrs() == 0 { + // If the record has no Attrs, remove groups at the end of the list; they are empty. + for len(goas) > 0 && goas[len(goas)-1].group != "" { + goas = goas[:len(goas)-1] + } + } + for _, goa := range goas { + if goa.group == "" { + for _, a := range goa.attrs { + buf = h.appendAttr(buf, a) + } + } + } + r.Attrs(func(a slog.Attr) bool { + buf = h.appendAttr(buf, a) + return true + }) + h.mu.Lock() + defer h.mu.Unlock() + h.t.Log = append(h.t.Log, string(buf)) + return nil +} + +func (h *TaskHandler) appendAttr(buf []byte, a slog.Attr) []byte { + // Resolve the Attr's value before doing anything else. + a.Value = a.Value.Resolve() + // Ignore empty Attrs. + if a.Equal(slog.Attr{}) { + return buf + } + + switch a.Value.Kind() { + case slog.KindString: + // Quote string values, to make them easy to parse. + buf = fmt.Appendf(buf, "%s: %q\n", a.Key, a.Value.String()) + case slog.KindTime: + // Write times in a standard way, without the monotonic time. + buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value.Time().Format(time.RFC3339Nano)) + case slog.KindGroup: + attrs := a.Value.Group() + // Ignore empty groups. + if len(attrs) == 0 { + return buf + } + // If the key is non-empty, write it out and indent the rest of the attrs. + // Otherwise, inline the attrs. + if a.Key != "" { + buf = fmt.Appendf(buf, "%s:\n", a.Key) + } + for _, ga := range attrs { + buf = h.appendAttr(buf, ga) + } + default: + buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value) + } + return buf +} + +func (h *TaskHandler) withGroupOrAttrs(goa groupOrAttrs) *TaskHandler { + h2 := *h + h2.goas = make([]groupOrAttrs, len(h.goas)+1) + copy(h2.goas, h.goas) + h2.goas[len(h2.goas)-1] = goa + return &h2 +} + +func (h *TaskHandler) WithGroup(name string) slog.Handler { + if name == "" { + return h + } + return h.withGroupOrAttrs(groupOrAttrs{group: name}) +} + +func (h *TaskHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + if len(attrs) == 0 { + return h + } + return h.withGroupOrAttrs(groupOrAttrs{attrs: attrs}) +} From b17e55de7da5271554b6594fd8dda097fcf39eff Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:45:54 +0200 Subject: [PATCH 10/27] Move Task Starting into task --- task/task.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++---- worker/task.go | 51 ++++++++++++------------------- 2 files changed, 94 insertions(+), 38 deletions(-) diff --git a/task/task.go b/task/task.go index 2b632c1..c0212d1 100644 --- a/task/task.go +++ b/task/task.go @@ -1,9 +1,78 @@ package task -type Task struct { - ID int `json:"id"` - FileID int `json:"file_id"` - Type int `json:"type"` - Status int `json:"status"` - Log []string `json:"log"` +import ( + "encoding/json" + "fmt" + "sync" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/types" +) + +var tasks = map[int]*types.Task{} +var taskMutex sync.Mutex + +func StartTask(conf config.Config, data types.TaskStart) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + tasks[data.ID] = &types.Task{ + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + } + + switch data.Type { + case constants.TASK_TYPE_HEALTHCHECK: + var hData HealthCheckData + err := json.Unmarshal(data.Data, &hData) + if err != nil { + return fmt.Errorf("Unmarshal Healthcheck Data: %w", err) + } + + tasks[data.ID].Status = constants.TASK_STATUS_RUNNING + go func() { + defer func() { + if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING { + tasks[data.ID].Status = constants.TASK_STATUS_FAILED + tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer") + } + }() + RunHealthCheck(conf, tasks[data.ID], hData) + }() + return nil + case constants.TASK_TYPE_TRANSCODE: + return fmt.Errorf("Transcode Task Not Implemented") + default: + return fmt.Errorf("Unknown Task Type %v", data.Type) + } +} + +func Get() []types.Task { + taskMutex.Lock() + defer taskMutex.Unlock() + + t := []types.Task{} + for i := range tasks { + t = append(t, *tasks[i]) + } + return t +} + +func DeleteTask(id int) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + _, ok := tasks[id] + if !ok { + return fmt.Errorf("Task does not Exist") + } + + if tasks[id].Status == constants.TASK_STATUS_RUNNING { + return fmt.Errorf("Task is Currently Running") + } + + delete(tasks, id) + return nil } diff --git a/worker/task.go b/worker/task.go index 887aeb6..80be85c 100644 --- a/worker/task.go +++ b/worker/task.go @@ -4,20 +4,16 @@ import ( "context" "encoding/json" "fmt" - "sync" - "git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/task" "git.lastassault.de/speatzle/morffix/types" ) -var tasks map[int]*task.Task -var taskMutex sync.Mutex - func init() { rpcServer.RegisterMethod("task-start", taskStart) rpcServer.RegisterMethod("task-status", taskStatus) + rpcServer.RegisterMethod("task-delete", taskDelete) } func taskStart(ctx context.Context, req rpc.Request) (any, error) { @@ -27,40 +23,31 @@ func taskStart(ctx context.Context, req rpc.Request) (any, error) { return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err) } - // TODO move to task module - taskMutex.Lock() - defer taskMutex.Unlock() - - switch data.Type { - case constants.TASK_TYPE_HEALTHCHECK: - tasks[data.ID] = &task.Task{ - ID: data.ID, - Type: data.Type, - FileID: data.FileID, - } - - var hData task.HealthCheckData - err := json.Unmarshal(data.Data, &hData) - if err != nil { - return nil, fmt.Errorf("Unmarshal Healthcheck Data: %w", err) - } - - tasks[data.ID].Status = constants.TASK_STATUS_RUNNING - go tasks[data.ID].RunHealthCheck(hData) - case constants.TASK_TYPE_TRANSCODE: - return nil, fmt.Errorf("Transcode Task Not Implemented") - default: - return nil, fmt.Errorf("Unknown Task Type %v", data.Type) + err = task.StartTask(conf, data) + if err != nil { + return nil, fmt.Errorf("Starting Task: %w", err) } return nil, nil } func taskStatus(ctx context.Context, req rpc.Request) (any, error) { - var id int - err := json.Unmarshal(*req.Params, &id) + return types.TaskStatus{ + Tasks: task.Get(), + }, nil +} + +func taskDelete(ctx context.Context, req rpc.Request) (any, error) { + var data int + err := json.Unmarshal(*req.Params, &data) if err != nil { - return nil, fmt.Errorf("Unmarshal Task Status Params: %w", err) + return nil, fmt.Errorf("Unmarshal Task Delete ID: %w", err) } + + err = task.DeleteTask(data) + if err != nil { + return nil, fmt.Errorf("Deleting Task: %w", err) + } + return nil, nil } From a31bd84eaae8f9b79922593a85921260e67ed427 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:46:17 +0200 Subject: [PATCH 11/27] Add data and log task columns --- migrations/000004_create_task_table.up.sql | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/migrations/000004_create_task_table.up.sql b/migrations/000004_create_task_table.up.sql index 25e4e69..54f1a77 100644 --- a/migrations/000004_create_task_table.up.sql +++ b/migrations/000004_create_task_table.up.sql @@ -3,6 +3,8 @@ CREATE TABLE IF NOT EXISTS tasks( id serial PRIMARY KEY, worker_id uuid REFERENCES workers(id), file_id integer REFERENCES files(id) NOT NULL, + type smallint NOT NULL, status smallint NOT NULL, - type smallint NOT NULL + data JSONB, + log text[] ); \ No newline at end of file From 956c8f3a201f13da822b2298fb4c8da13d79897d Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:46:49 +0200 Subject: [PATCH 12/27] Implement ffmpeg building --- types/ffmpeg.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 types/ffmpeg.go diff --git a/types/ffmpeg.go b/types/ffmpeg.go new file mode 100644 index 0000000..4f9bd99 --- /dev/null +++ b/types/ffmpeg.go @@ -0,0 +1,53 @@ +package types + +type FFmpegCommand struct { + Args []Arg `json:"args"` + InputFiles []File `json:"input_files"` + OutputFiles []File `json:"output_files"` +} + +type File struct { + Path string `json:"path"` + Arguments []Arg `json:"args"` +} + +type Arg struct { + Flag string `json:"flag"` + Value string `json:"value"` +} + +func (c *FFmpegCommand) GetArgs() []string { + args := []string{} + + for _, a := range c.Args { + args = append(args, a.Flag) + if a.Value != "" { + args = append(args, a.Value) + } + } + + for _, i := range c.InputFiles { + for _, a := range i.Arguments { + args = append(args, a.Flag) + if a.Value != "" { + args = append(args, a.Value) + } + } + args = append(args, "-i", i.Path) + } + + for _, i := range c.OutputFiles { + for _, a := range i.Arguments { + args = append(args, a.Flag) + if a.Value != "" { + args = append(args, a.Value) + } + } + args = append(args, i.Path) + } + + return args +} + +// ffmpeg -loglevel error -stats +// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" From 04a1bb4ccbb183e4f7f5ec6a22ebac3954cc6416 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:47:19 +0200 Subject: [PATCH 13/27] Remove old Options --- config/config.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/config/config.go b/config/config.go index cf61b60..a09884c 100644 --- a/config/config.go +++ b/config/config.go @@ -7,13 +7,13 @@ type Config struct { } type Server struct { - Address string - Database string - HealthCheckCommand string - TranscodeCommand string + Address string + Database string } type Worker struct { - Address string - Name string + ID string + Address string + Name string + FFmpegPath string } From 86ba486f4789ffe5942ec1c97f4f13c15d83f10b Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:47:39 +0200 Subject: [PATCH 14/27] Implement Healthcheck Command Execution --- task/healthcheck.go | 68 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/task/healthcheck.go b/task/healthcheck.go index 19e0266..0cb2431 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -1,17 +1,71 @@ package task -import "git.lastassault.de/speatzle/morffix/constants" +import ( + "bufio" + "context" + "log/slog" + "os/exec" + "sync" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/task/log" + "git.lastassault.de/speatzle/morffix/types" +) type HealthCheckData struct { - Command string `json:"command"` + Command types.FFmpegCommand `json:"command"` } -func (t *Task) RunHealthCheck(data HealthCheckData) { - defer func() { - if t.Status == constants.TASK_STATUS_RUNNING { - t.Status = constants.TASK_STATUS_FAILED - t.Log = append(t.Log, "Task Status Failed by Defer") +func RunHealthCheck(conf config.Config, t *types.Task, data HealthCheckData) { + ctx := context.TODO() + l := log.GetTaskLogger(t) + l.InfoContext(ctx, "Running ffmpeg", "command", data.Command) + cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...) + + var wg sync.WaitGroup + + stdout, err := cmd.StdoutPipe() + if err != nil { + l.ErrorContext(ctx, "Error getting StdoutPipe", "err", err) + return + } + + wg.Add(1) + outScanner := bufio.NewScanner(stdout) + go func() { + for outScanner.Scan() { + slog.InfoContext(ctx, outScanner.Text(), "pipe", "out") } + wg.Done() }() + stderr, err := cmd.StderrPipe() + if err != nil { + l.ErrorContext(ctx, "Error getting StderrPipe", "err", err) + return + } + wg.Add(1) + errScanner := bufio.NewScanner(stderr) + go func() { + for errScanner.Scan() { + slog.InfoContext(ctx, errScanner.Text(), "pipe", "err") + } + wg.Done() + }() + + err = cmd.Start() + if err != nil { + l.ErrorContext(ctx, "Error Starting ffmpeg", "err", err) + return + } + + wg.Wait() + + err = cmd.Wait() + if err != nil { + l.ErrorContext(ctx, "Error Running ffmpeg", "err", err) + return + } + t.Status = constants.TASK_STATUS_SUCCESS } From 799082bb896ef9dae07989941f65e3afb40600e3 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:48:18 +0200 Subject: [PATCH 15/27] Add More Task States --- constants/constants.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/constants/constants.go b/constants/constants.go index 2d8d7b6..bfdffa6 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -11,16 +11,22 @@ const INDEX_TEMPLATE_NAME = "index.tmpl" const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl" const LIBRARY_TEMPLATE_NAME = "library.tmpl" const MESSAGE_TEMPLATE_NAME = "message.tmpl" -const TASK_TEMPLATE_NAME = "tasks.tmpl" +const TASKS_TEMPLATE_NAME = "tasks.tmpl" +const TASK_TEMPLATE_NAME = "task.tmpl" const ( TASK_TYPE_HEALTHCHECK = iota TASK_TYPE_TRANSCODE ) +// Non Append Changes Need Worker Version Bump const ( TASK_STATUS_UNKNOWN = iota - TASK_STATUS_RUNNING TASK_STATUS_FAILED TASK_STATUS_SUCCESS + TASK_STATUS_RUNNING + TASK_STATUS_QUEUED + TASK_STATUS_ASSIGNED + TASK_STATUS_PAUSED + TASK_STATUS_WAITING ) From 45a2ef52f6706564e8d8d9b7b967db7ba33e16c9 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:48:34 +0200 Subject: [PATCH 16/27] Remove redundant struct --- types/task.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/types/task.go b/types/task.go index d88958b..7e94a49 100644 --- a/types/task.go +++ b/types/task.go @@ -2,8 +2,6 @@ package types import ( "encoding/json" - - "git.lastassault.de/speatzle/morffix/task" ) type TaskStart struct { @@ -13,6 +11,14 @@ type TaskStart struct { Data json.RawMessage } -type TaskStatus struct { - task.Task +type Task struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Status int `json:"status"` + Log []string `json:"log"` +} + +type TaskStatus struct { + Tasks []Task `json:"tasks"` } From f31fdfd3b55092512e2ea691b4cbac12561487c8 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:49:24 +0200 Subject: [PATCH 17/27] Implement worker management --- server/worker.go | 97 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 16 deletions(-) diff --git a/server/worker.go b/server/worker.go index fa5fd18..bcbf4a0 100644 --- a/server/worker.go +++ b/server/worker.go @@ -8,6 +8,7 @@ import ( "time" "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/types" "git.lastassault.de/speatzle/morffix/constants" @@ -126,26 +127,90 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { return nil } -func cleanupDeadWorkers(stop chan bool) { - ticker := time.NewTicker(time.Second) +func manageWorkers(stop chan bool) { + deadTicker := time.NewTicker(time.Second) + statusTicker := time.NewTicker(time.Second) + assignTicker := time.NewTicker(time.Second) + for { select { - case <-ticker.C: - func() { - WorkersMutex.Lock() - defer WorkersMutex.Unlock() - - for uuid, w := range Workers { - // TODO Not dead Workers are sometimes also killed? - if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { - slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) - delete(Workers, uuid) - // TODO Free any Jobs that the Worker had - } - } - }() + case <-deadTicker.C: + killDeadWorkers() + case <-statusTicker.C: + updateWorkerTaskStatus(context.TODO()) + case <-assignTicker.C: + err := assignQueuedTasks(context.TODO()) + if err != nil { + slog.Error("Assigning Queued Tasks", "err", err) + } case <-stop: return } } } + +func killDeadWorkers() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid, w := range Workers { + // TODO Not dead Workers are sometimes also killed? + if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { + slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) + delete(Workers, uuid) + // TODO Free any Jobs that the Worker had + } + } +} + +func updateWorkerTaskStatus(ctx context.Context) { + var wg sync.WaitGroup + + func() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid := range Workers { + if Workers[uuid].Connected { + w := Workers[uuid] + wg.Add(1) + + go func() { + defer wg.Done() + var status types.TaskStatus + _, err := rpcServer.Call(ctx, w.Conn, "task-status", nil, &status) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) + // TODO Mark Worker / Tasks as Unknown? + return + } + + for _, t := range status.Tasks { + // TODO check if this Task was even assigned to this Worker + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = $3 WHERE id = $1", t.ID, t.Status, t.Log) + if err != nil { + slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) + continue + } + + // Tell Worker to Delete Finished Tasks + if t.Status == constants.TASK_STATUS_FAILED || t.Status == constants.TASK_STATUS_SUCCESS { + _, err := rpcServer.Call(ctx, w.Conn, "task-delete", t.ID, nil) + if err != nil { + slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) + continue + } + } + } + + // TODO Set Task Status to QUEUED For Unfinished Tasks which where assigned to this worker but where not in the status response + }() + } else { + // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker + } + } + }() + + wg.Wait() +} From 7a0564a2107b1e8c8f6bab7f1dcafa221211c698 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Thu, 9 May 2024 04:49:46 +0200 Subject: [PATCH 18/27] Task Log display --- server/index.go | 6 +-- server/server.go | 5 +- server/task.go | 135 ++++++++++++++++++++++++++++++++++++++++++++--- tmpl/task.tmpl | 9 ++++ 4 files changed, 143 insertions(+), 12 deletions(-) create mode 100644 tmpl/task.tmpl diff --git a/server/index.go b/server/index.go index ce0d850..f9a7907 100644 --- a/server/index.go +++ b/server/index.go @@ -14,7 +14,7 @@ type IndexData struct { Counter []int Workers []IndexWorker - Tasks []Task + Tasks []TaskDisplay } type IndexWorker struct { @@ -22,7 +22,7 @@ type IndexWorker struct { Status *types.WorkerStatus } -type Task struct { +type TaskDisplay struct { ID int `db:"id"` Library int `db:"library"` Worker *string `db:"worker"` @@ -75,7 +75,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) + tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) if err != nil { slog.ErrorContext(r.Context(), "Executing index Template", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) diff --git a/server/server.go b/server/server.go index 9119dbd..0a5985d 100644 --- a/server/server.go +++ b/server/server.go @@ -74,7 +74,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux := http.NewServeMux() mux.HandleFunc("/worker", handleWorkerWebsocket) mux.Handle("/static/", fs) - mux.HandleFunc("/tasks", handleTask) + mux.HandleFunc("/tasks", handleTasks) + mux.HandleFunc("/tasks/{id}", handleTask) mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) @@ -102,7 +103,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS }() stopCleanup := make(chan bool, 1) - go cleanupDeadWorkers(stopCleanup) + go manageWorkers(stopCleanup) sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) diff --git a/server/task.go b/server/task.go index a3f8000..682e7fd 100644 --- a/server/task.go +++ b/server/task.go @@ -3,20 +3,22 @@ package server import ( "bytes" "context" + "encoding/json" "fmt" "log/slog" "net/http" "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/types" "github.com/jackc/pgx/v5" ) -type TaskData struct { +type TasksData struct { Libraries []Library - Tasks []Task + Tasks []TaskDisplay } -func handleTask(w http.ResponseWriter, r *http.Request) { +func handleTasks(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { err := createTask(r.Context(), r) if err != nil { @@ -26,7 +28,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) { } } - data := TaskData{} + data := TasksData{} rows, err := db.Query(r.Context(), "SELECT id, name, path, enable FROM libraries WHERE enable = $1", true) if err != nil { @@ -48,7 +50,7 @@ func handleTask(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[Task](rows, pgx.RowToStructByName[Task]) + tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) if err != nil { slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) @@ -57,7 +59,40 @@ func handleTask(w http.ResponseWriter, r *http.Request) { data.Tasks = tasks buf := bytes.Buffer{} - err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, data) + err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data) + if err != nil { + slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) + http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) + return + } + + _, err = w.Write(buf.Bytes()) + if err != nil { + slog.ErrorContext(r.Context(), "Writing http Response", "err", err) + } +} + +func handleTask(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + handleTasks(w, r) + return + } + + var log []string + err := db.QueryRow(r.Context(), "SELECT log FROM tasks WHERE id = $1", id).Scan(&log) + if err != nil { + slog.ErrorContext(r.Context(), "Query Tasks", "err", err) + http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) + return + } + + t := types.Task{ + Log: log, + } + + buf := bytes.Buffer{} + err = templates.ExecuteTemplate(&buf, constants.TASK_TEMPLATE_NAME, t) if err != nil { slog.ErrorContext(r.Context(), "Executing Task Template", "err", err) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) @@ -94,8 +129,19 @@ func createTask(ctx context.Context, r *http.Request) error { } defer tx.Rollback(ctx) + var data any + if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { + + // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" + data = types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}}, + InputFiles: []types.File{{Path: "input.mkv"}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + } + } + for _, file := range files { - _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status) VALUES ($1,$2,$3)", file.ID, typ, 0) + _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data) if err != nil { return fmt.Errorf("Inserting Task: %w", err) } @@ -108,3 +154,78 @@ func createTask(ctx context.Context, r *http.Request) error { return nil } + +type QueuedTask struct { + ID int + Type int + FileID int `json:"file_id"` + Data json.RawMessage +} + +func assignQueuedTasks(ctx context.Context) error { + rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED) + if err != nil { + return fmt.Errorf("Query Queued Tasks: %w", err) + } + queuedTasks, err := pgx.CollectRows[QueuedTask](rows, pgx.RowToStructByName[QueuedTask]) + if err != nil { + return fmt.Errorf("Collect Queued Tasks: %w", err) + } + + if len(queuedTasks) == 0 { + return nil + } + + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + lastAssigned := 0 + + for i := range Workers { + if lastAssigned == len(queuedTasks)-1 { + // All Tasks have been Assigned + return nil + } + if Workers[i].Connected { + var count int + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING, constants.TASK_STATUS_UNKNOWN).Scan(&count) + if err != nil { + return fmt.Errorf("Error Querying Worker Task Count: %w", err) + } + + // Allow for Multiple Tasks at once in the future + if count < 1 { + tx, err := db.Begin(ctx) + defer tx.Rollback(ctx) + if err != nil { + return fmt.Errorf("Starting Transaction: %w", err) + } + _, err = tx.Exec(ctx, "UPDATE tasks SET status = $2, worker_id = $3 WHERE id = $1", queuedTasks[lastAssigned].ID, constants.TASK_STATUS_ASSIGNED, i) + if err != nil { + return fmt.Errorf("Setting tasks Assignment: %w", err) + } + + taskStart := types.TaskStart{ + ID: queuedTasks[lastAssigned].ID, + Type: queuedTasks[lastAssigned].Type, + FileID: queuedTasks[lastAssigned].FileID, + Data: queuedTasks[lastAssigned].Data, + } + + _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) + if err != nil { + return fmt.Errorf("Error Starting Task: %w", err) + } + + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("Error Committing Transaction: %w", err) + } + + slog.InfoContext(ctx, "Assigned Task to Worker", "task", queuedTasks[lastAssigned].ID, "worker", i) + lastAssigned++ + } + } + } + return nil +} diff --git a/tmpl/task.tmpl b/tmpl/task.tmpl new file mode 100644 index 0000000..118bc2d --- /dev/null +++ b/tmpl/task.tmpl @@ -0,0 +1,9 @@ +{{template "head"}} +

Task {{.ID}}

+ +
+ {{range $t := .Log}} + {{ $t }}
+ {{end}} +
+{{template "tail"}} \ No newline at end of file From 9979a1eba8489eb20f9398f8f7461f8ff48a15eb Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:06:51 +0200 Subject: [PATCH 19/27] fix logs --- task/healthcheck.go | 48 +++++++++++++++++++++++++++++++++++++-------- task/log/log.go | 9 +-------- 2 files changed, 41 insertions(+), 16 deletions(-) diff --git a/task/healthcheck.go b/task/healthcheck.go index 0cb2431..824d609 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -2,8 +2,8 @@ package task import ( "bufio" + "bytes" "context" - "log/slog" "os/exec" "sync" @@ -13,15 +13,43 @@ import ( "git.lastassault.de/speatzle/morffix/types" ) -type HealthCheckData struct { - Command types.FFmpegCommand `json:"command"` +// dropCR drops a terminal \r from the data. +func dropCR(data []byte) []byte { + if len(data) > 0 && data[len(data)-1] == '\r' { + return data[0 : len(data)-1] + } + return data } -func RunHealthCheck(conf config.Config, t *types.Task, data HealthCheckData) { +func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + if i := bytes.IndexByte(data, '\n'); i >= 0 { + // We have a full newline-terminated line. + return i + 1, dropCR(data[0:i]), nil + } + if i := bytes.IndexByte(data, '\r'); i >= 0 { + // We have a return line. + return i + 1, dropCR(data[0:i]), nil + } + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), dropCR(data), nil + } + // Request more data. + return 0, nil, nil +} + +func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckData) { ctx := context.TODO() l := log.GetTaskLogger(t) - l.InfoContext(ctx, "Running ffmpeg", "command", data.Command) - cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...) + l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs()) + path := conf.Worker.FFmpegPath + if path == "" { + path = "ffmpeg" + } + cmd := exec.CommandContext(ctx, path, data.Command.GetArgs()...) var wg sync.WaitGroup @@ -33,9 +61,10 @@ func RunHealthCheck(conf config.Config, t *types.Task, data HealthCheckData) { wg.Add(1) outScanner := bufio.NewScanner(stdout) + outScanner.Split(scanLines) go func() { for outScanner.Scan() { - slog.InfoContext(ctx, outScanner.Text(), "pipe", "out") + l.InfoContext(ctx, outScanner.Text()) } wg.Done() }() @@ -47,9 +76,10 @@ func RunHealthCheck(conf config.Config, t *types.Task, data HealthCheckData) { } wg.Add(1) errScanner := bufio.NewScanner(stderr) + errScanner.Split(scanLines) go func() { for errScanner.Scan() { - slog.InfoContext(ctx, errScanner.Text(), "pipe", "err") + l.InfoContext(ctx, errScanner.Text()) } wg.Done() }() @@ -67,5 +97,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data HealthCheckData) { l.ErrorContext(ctx, "Error Running ffmpeg", "err", err) return } + + l.InfoContext(ctx, "Task Success") t.Status = constants.TASK_STATUS_SUCCESS } diff --git a/task/log/log.go b/task/log/log.go index 5372f60..6ef30dc 100644 --- a/task/log/log.go +++ b/task/log/log.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "log/slog" - "runtime" "sync" "time" @@ -62,12 +61,6 @@ func (h *TaskHandler) Handle(ctx context.Context, r slog.Record) error { if !r.Time.IsZero() { buf = h.appendAttr(buf, slog.Time(slog.TimeKey, r.Time)) } - buf = h.appendAttr(buf, slog.Any(slog.LevelKey, r.Level)) - if r.PC != 0 { - fs := runtime.CallersFrames([]uintptr{r.PC}) - f, _ := fs.Next() - buf = h.appendAttr(buf, slog.String(slog.SourceKey, fmt.Sprintf("%s:%d", f.File, f.Line))) - } buf = h.appendAttr(buf, slog.String(slog.MessageKey, r.Message)) // Handle state from WithGroup and WithAttrs. goas := h.goas @@ -108,7 +101,7 @@ func (h *TaskHandler) appendAttr(buf []byte, a slog.Attr) []byte { buf = fmt.Appendf(buf, "%s: %q\n", a.Key, a.Value.String()) case slog.KindTime: // Write times in a standard way, without the monotonic time. - buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value.Time().Format(time.RFC3339Nano)) + buf = fmt.Appendf(buf, "%s: %s\n", a.Key, a.Value.Time().Format(time.DateTime)) case slog.KindGroup: attrs := a.Value.Group() // Ignore empty groups. From 755b650df56101daf9053c0ed280eabd482ea04c Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:07:17 +0200 Subject: [PATCH 20/27] Remove Index Task display --- server/index.go | 25 ------------------------- tmpl/index.tmpl | 36 ------------------------------------ 2 files changed, 61 deletions(-) diff --git a/server/index.go b/server/index.go index f9a7907..b736a00 100644 --- a/server/index.go +++ b/server/index.go @@ -7,14 +7,12 @@ import ( "git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/types" - "github.com/jackc/pgx/v5" ) type IndexData struct { Counter []int Workers []IndexWorker - Tasks []TaskDisplay } type IndexWorker struct { @@ -22,15 +20,6 @@ type IndexWorker struct { Status *types.WorkerStatus } -type TaskDisplay struct { - ID int `db:"id"` - Library int `db:"library"` - Worker *string `db:"worker"` - Type int `db:"type"` - Status int `db:"status"` - File string `db:"file"` -} - func handleIndex(w http.ResponseWriter, r *http.Request) { data := IndexData{ @@ -69,20 +58,6 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { } data.Counter = splitInt(size) - rows, err := db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id") - if err != nil { - slog.ErrorContext(r.Context(), "Query Tasks", "err", err) - http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) - return - } - tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) - if err != nil { - slog.ErrorContext(r.Context(), "Executing index Template", "err", err) - http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) - return - } - data.Tasks = tasks - buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.INDEX_TEMPLATE_NAME, data) if err != nil { diff --git a/tmpl/index.tmpl b/tmpl/index.tmpl index 46049b7..644beba 100644 --- a/tmpl/index.tmpl +++ b/tmpl/index.tmpl @@ -72,40 +72,4 @@ {{end}} -

-

Tasks

-
- - - - - - - - - - {{range $t := .Tasks}} - - - - - - - - - {{end}} -
IDLibraryWorkerTypeStatusFile
- {{ $t.ID }} - - {{ $t.Library }} - - {{ $t.Worker }} - - {{ $t.Type }} - - {{ $t.Status }} - - {{ $t.File }} -
-
{{template "tail"}} \ No newline at end of file From a9adb5ebdde0d09ad4244be507015b940aae7100 Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:08:10 +0200 Subject: [PATCH 21/27] Show Task Status better --- constants/constants.go | 29 +++++++++++++++++++++++++- server/task.go | 46 +++++++++++++++++++++++++++++++++++------- tmpl/tasks.tmpl | 12 +++++++++++ 3 files changed, 79 insertions(+), 8 deletions(-) diff --git a/constants/constants.go b/constants/constants.go index bfdffa6..f50fc3a 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,5 +1,7 @@ package constants +import "fmt" + const WORKER_VERSION = "v1" const WORKER_VERSION_HEADER = "morffix-version" @@ -19,9 +21,11 @@ const ( TASK_TYPE_TRANSCODE ) +type TaskStatus int + // Non Append Changes Need Worker Version Bump const ( - TASK_STATUS_UNKNOWN = iota + TASK_STATUS_UNKNOWN TaskStatus = iota TASK_STATUS_FAILED TASK_STATUS_SUCCESS TASK_STATUS_RUNNING @@ -30,3 +34,26 @@ const ( TASK_STATUS_PAUSED TASK_STATUS_WAITING ) + +func (s TaskStatus) String() string { + switch s { + case TASK_STATUS_UNKNOWN: + return "Unknown" + case TASK_STATUS_FAILED: + return "Failed" + case TASK_STATUS_SUCCESS: + return "Success" + case TASK_STATUS_RUNNING: + return "Running" + case TASK_STATUS_QUEUED: + return "Queued" + case TASK_STATUS_ASSIGNED: + return "Assigned" + case TASK_STATUS_PAUSED: + return "Paused" + case TASK_STATUS_WAITING: + return "Waiting" + default: + return fmt.Sprintf("%d", int(s)) + } +} diff --git a/server/task.go b/server/task.go index 682e7fd..aafc7ee 100644 --- a/server/task.go +++ b/server/task.go @@ -18,6 +18,24 @@ type TasksData struct { Tasks []TaskDisplay } +type TaskDisplay struct { + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + Status string `db:"status"` + File string `db:"file"` +} + +type TaskDB struct { + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + Status constants.TaskStatus `db:"status"` + File string `db:"file"` +} + func handleTasks(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" { err := createTask(r.Context(), r) @@ -50,13 +68,22 @@ func handleTasks(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay]) + tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB]) if err != nil { slog.ErrorContext(r.Context(), "Collect Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) return } - data.Tasks = tasks + for i := range tasks { + data.Tasks = append(data.Tasks, TaskDisplay{ + ID: tasks[i].ID, + Library: tasks[i].Library, + Worker: tasks[i].Worker, + Type: tasks[i].Type, + File: tasks[i].File, + Status: tasks[i].Status.String(), + }) + } buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data) @@ -133,11 +160,11 @@ func createTask(ctx context.Context, r *http.Request) error { if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" - data = types.FFmpegCommand{ - Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}}, + data = types.HealthCheckData{Command: types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, InputFiles: []types.File{{Path: "input.mkv"}}, OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, - } + }} } for _, file := range files { @@ -172,6 +199,8 @@ func assignQueuedTasks(ctx context.Context) error { return fmt.Errorf("Collect Queued Tasks: %w", err) } + //slog.Info("Assigning Queued Tasks", "count", len(queuedTasks)) + if len(queuedTasks) == 0 { return nil } @@ -182,17 +211,20 @@ func assignQueuedTasks(ctx context.Context) error { lastAssigned := 0 for i := range Workers { - if lastAssigned == len(queuedTasks)-1 { + if lastAssigned == len(queuedTasks) { + slog.Info("All Tasks assigned") // All Tasks have been Assigned return nil } if Workers[i].Connected { var count int - err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING, constants.TASK_STATUS_UNKNOWN).Scan(&count) + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } + slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count) + // Allow for Multiple Tasks at once in the future if count < 1 { tx, err := db.Begin(ctx) diff --git a/tmpl/tasks.tmpl b/tmpl/tasks.tmpl index b270ee3..6baee53 100644 --- a/tmpl/tasks.tmpl +++ b/tmpl/tasks.tmpl @@ -19,7 +19,10 @@ + + + {{range $t := .Tasks}} @@ -27,9 +30,18 @@ + + + From d185784cd142d63c63b654f89a192e04fa9d26aa Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:08:57 +0200 Subject: [PATCH 22/27] Move to types --- types/healtcheck.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 types/healtcheck.go diff --git a/types/healtcheck.go b/types/healtcheck.go new file mode 100644 index 0000000..1299ccf --- /dev/null +++ b/types/healtcheck.go @@ -0,0 +1,5 @@ +package types + +type HealthCheckData struct { + Command FFmpegCommand `json:"command"` +} From 12e83aa8d22184af7a8fac7f8d9589af9bd8752b Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:09:48 +0200 Subject: [PATCH 23/27] Ask For Task Status Seperatly and logs incrementally --- server/worker.go | 55 ++++++++++++++++++++++++++++++------------------ task/task.go | 20 ++++++++++++------ types/task.go | 19 +++++++++++------ worker/task.go | 13 +++++++++++- 4 files changed, 73 insertions(+), 34 deletions(-) diff --git a/server/worker.go b/server/worker.go index bcbf4a0..17a44e4 100644 --- a/server/worker.go +++ b/server/worker.go @@ -9,6 +9,7 @@ import ( "git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/types" + "github.com/jackc/pgx/v5" "git.lastassault.de/speatzle/morffix/constants" @@ -173,41 +174,53 @@ func updateWorkerTaskStatus(ctx context.Context) { for uuid := range Workers { if Workers[uuid].Connected { w := Workers[uuid] - wg.Add(1) - go func() { - defer wg.Done() - var status types.TaskStatus - _, err := rpcServer.Call(ctx, w.Conn, "task-status", nil, &status) - if err != nil { - slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) - // TODO Mark Worker / Tasks as Unknown? - return - } + rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid) + return + } - for _, t := range status.Tasks { - // TODO check if this Task was even assigned to this Worker + taskStatusRequests, err := pgx.CollectRows[types.TaskStatusRequest](rows, pgx.RowToStructByName[types.TaskStatusRequest]) + if err != nil { + slog.ErrorContext(ctx, "Collect Task Status Rows", "err", err) + return + } - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = $3 WHERE id = $1", t.ID, t.Status, t.Log) + for _, tsr := range taskStatusRequests { + wg.Add(1) + + go func() { + defer wg.Done() + var ts types.TaskStatus + _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) + if err != nil { + slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) + // TODO Mark Task as Unknown? + return + } + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", ts.Task.ID, ts.Task.Status, ts.Task.Log) if err != nil { slog.ErrorContext(ctx, "Error Updating Task Status", "err", err) - continue + return } // Tell Worker to Delete Finished Tasks - if t.Status == constants.TASK_STATUS_FAILED || t.Status == constants.TASK_STATUS_SUCCESS { - _, err := rpcServer.Call(ctx, w.Conn, "task-delete", t.ID, nil) + if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS { + _, err := rpcServer.Call(ctx, w.Conn, "task-delete", ts.Task.ID, nil) if err != nil { slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) - continue + return } } - } - // TODO Set Task Status to QUEUED For Unfinished Tasks which where assigned to this worker but where not in the status response - }() + }() + } + // TODO Handle tasks with status unkown assigned to this worker + } else { - // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker + // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck } } }() diff --git a/task/task.go b/task/task.go index c0212d1..56416d0 100644 --- a/task/task.go +++ b/task/task.go @@ -25,7 +25,7 @@ func StartTask(conf config.Config, data types.TaskStart) error { switch data.Type { case constants.TASK_TYPE_HEALTHCHECK: - var hData HealthCheckData + var hData types.HealthCheckData err := json.Unmarshal(data.Data, &hData) if err != nil { return fmt.Errorf("Unmarshal Healthcheck Data: %w", err) @@ -49,15 +49,23 @@ func StartTask(conf config.Config, data types.TaskStart) error { } } -func Get() []types.Task { +func Get(r types.TaskStatusRequest) (*types.Task, error) { taskMutex.Lock() defer taskMutex.Unlock() - t := []types.Task{} - for i := range tasks { - t = append(t, *tasks[i]) + t, ok := tasks[r.ID] + if !ok { + return nil, fmt.Errorf("Task does not Exist") } - return t + + res := *t + + // Send only new logs if there are any + if len(res.Log) >= r.LogOffset { + res.Log = res.Log[r.LogOffset:] + } + + return &res, nil } func DeleteTask(id int) error { diff --git a/types/task.go b/types/task.go index 7e94a49..e47fce2 100644 --- a/types/task.go +++ b/types/task.go @@ -2,6 +2,8 @@ package types import ( "encoding/json" + + "git.lastassault.de/speatzle/morffix/constants" ) type TaskStart struct { @@ -12,13 +14,18 @@ type TaskStart struct { } type Task struct { - ID int `json:"id"` - FileID int `json:"file_id"` - Type int `json:"type"` - Status int `json:"status"` - Log []string `json:"log"` + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Status constants.TaskStatus `json:"status"` + Log []string `json:"log"` } type TaskStatus struct { - Tasks []Task `json:"tasks"` + Task Task `json:"task"` +} + +type TaskStatusRequest struct { + ID int `json:"id" db:"id"` + LogOffset int `json:"log_offset" db:"log_offset"` } diff --git a/worker/task.go b/worker/task.go index 80be85c..b909a9b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -32,8 +32,19 @@ func taskStart(ctx context.Context, req rpc.Request) (any, error) { } func taskStatus(ctx context.Context, req rpc.Request) (any, error) { + var data types.TaskStatusRequest + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Status Request: %w", err) + } + + t, err := task.Get(data) + if err != nil { + return nil, fmt.Errorf("Get Task Status: %w", err) + } + return types.TaskStatus{ - Tasks: task.Get(), + Task: *t, }, nil } From 69b2d96c92d679fedad3f91dbe7980b225e7be5f Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 19:12:59 +0200 Subject: [PATCH 24/27] fix duplicate log --- task/task.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/task/task.go b/task/task.go index 56416d0..df7c51a 100644 --- a/task/task.go +++ b/task/task.go @@ -63,6 +63,8 @@ func Get(r types.TaskStatusRequest) (*types.Task, error) { // Send only new logs if there are any if len(res.Log) >= r.LogOffset { res.Log = res.Log[r.LogOffset:] + } else { + res.Log = []string{} } return &res, nil From da66c2fdf93c2dd7cf0b08c358031902125da459 Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 20:05:56 +0200 Subject: [PATCH 25/27] add config tempdir and default values --- config/config.go | 1 + main.go | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/config/config.go b/config/config.go index a09884c..44d8a30 100644 --- a/config/config.go +++ b/config/config.go @@ -13,6 +13,7 @@ type Server struct { type Worker struct { ID string + TempDir string Address string Name string FFmpegPath string diff --git a/main.go b/main.go index 13c67af..5eb7672 100644 --- a/main.go +++ b/main.go @@ -40,6 +40,14 @@ func main() { return } + if conf.Worker.FFmpegPath == "" { + conf.Worker.FFmpegPath = "ffmpeg" + } + + if conf.Worker.TempDir == "" { + conf.Worker.TempDir = "/tmp" + } + if *isserver { slog.Info("Starting Server...") server.Start(conf, templates, static, migrations) From 445c1321ba2c48255d5313ea1e99b5d97fe512aa Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 20:06:30 +0200 Subject: [PATCH 26/27] serve library files for download --- server/file.go | 26 ++++++++++++++++++++++++++ server/server.go | 1 + 2 files changed, 27 insertions(+) create mode 100644 server/file.go diff --git a/server/file.go b/server/file.go new file mode 100644 index 0000000..3fb7c84 --- /dev/null +++ b/server/file.go @@ -0,0 +1,26 @@ +package server + +import ( + "log/slog" + "net/http" +) + +func handleFile(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + if id == "" { + http.Error(w, "No id", http.StatusBadRequest) + return + } + + // TODO check if worker is working on a task involving this file + + var path string + err := db.QueryRow(r.Context(), "SELECT path FROM files WHERE id = $1", id).Scan(&path) + if err != nil { + http.Error(w, "Error Getting Path: "+err.Error(), http.StatusBadRequest) + slog.ErrorContext(r.Context(), "Getting Path", "err", err) + return + } + + http.ServeFile(w, r, path) +} diff --git a/server/server.go b/server/server.go index 0a5985d..7992fba 100644 --- a/server/server.go +++ b/server/server.go @@ -75,6 +75,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/worker", handleWorkerWebsocket) mux.Handle("/static/", fs) mux.HandleFunc("/tasks", handleTasks) + mux.HandleFunc("/files/{id}", handleFile) mux.HandleFunc("/tasks/{id}", handleTask) mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/libraries/{id}", handleLibrary) From 8dc7d907a8b7f346ff841589e33d75776eb8ec8c Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 20:07:38 +0200 Subject: [PATCH 27/27] Download file for healthcheck --- task/healthcheck.go | 64 +++++++++++++++++++++++++++++++++++++++++---- worker/worker.go | 2 +- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/task/healthcheck.go b/task/healthcheck.go index 824d609..5d2fa3e 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -4,7 +4,12 @@ import ( "bufio" "bytes" "context" + "fmt" + "io" + "net/http" + "os" "os/exec" + "path/filepath" "sync" "git.lastassault.de/speatzle/morffix/config" @@ -44,12 +49,61 @@ func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckData) { ctx := context.TODO() l := log.GetTaskLogger(t) - l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs()) - path := conf.Worker.FFmpegPath - if path == "" { - path = "ffmpeg" + + // TODO Figure out how to get correct file ending + path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID)) + + // Set ffmpeg input path + if len(data.Command.InputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no input files") + return } - cmd := exec.CommandContext(ctx, path, data.Command.GetArgs()...) + + data.Command.InputFiles[0].Path = path + + // TODO cleanup file when done + defer func() { + err := os.Remove(path) + if err != nil { + l.ErrorContext(ctx, "Removing File", "err", err, "path", path) + } else { + l.ErrorContext(ctx, "File Removed Succesfully", "path", path) + } + }() + + l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path) + + err := func() error { + out, err := os.Create(path) + if err != nil { + return fmt.Errorf("Creating File: %w", err) + } + defer out.Close() + resp, err := http.Get(fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID)) + if err != nil { + return fmt.Errorf("Getting File: %w", err) + } + + // TODO Log at interval logs read + + defer resp.Body.Close() + n, err := io.Copy(out, resp.Body) + if err != nil { + return fmt.Errorf("Reading File: %w", err) + } + + l.InfoContext(ctx, "Downloaded File", "bytes", n) + + return nil + }() + + if err != nil { + l.ErrorContext(ctx, "File Download Failed", "err", err) + return + } + + l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs()) + cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...) var wg sync.WaitGroup diff --git a/worker/worker.go b/worker/worker.go index 54ef359..07dbad6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -68,7 +68,7 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) { headers.Add(constants.UUID_HEADER, uuid.String()) headers.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) - c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ + c, res, err := websocket.Dial(ctx, conf.Worker.Address+"/worker", &websocket.DialOptions{ HTTPHeader: headers, }) if err != nil {
IDLibraryWorker TypeStatus File
{{ $t.ID }} + {{ $t.Library }} + + {{ $t.Worker }} + {{ $t.Type }} + {{ $t.Status }} + {{ $t.File }}