diff --git a/.forgejo/workflows/build.yaml b/.forgejo/workflows/build.yaml index 2f63773..da58767 100644 --- a/.forgejo/workflows/build.yaml +++ b/.forgejo/workflows/build.yaml @@ -1,13 +1,26 @@ on: [push] jobs: - release: + test: runs-on: docker steps: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: go-version: '1.22' - - run: GOOS=linux GOARCH=amd64 go build + - run: go build + - uses: forgejo/upload-artifact@v3 + with: + name: morffix + path: morffix + release: + runs-on: docker + if: github.event_name == 'push' && contains(github.ref, 'refs/tags/') + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: '1.22' + - run: go build - run: mkdir out - run: cp morffix out/morffix - uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017 diff --git a/README.md b/README.md index dea61ce..f67a607 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # Morffix -Morph and fix your Media distributedly with ffmpeg +Morph and fix you Media distributedly with ffmpeg diff --git a/constants/error.go b/constants/error.go index bb3e278..c375e01 100644 --- a/constants/error.go +++ b/constants/error.go @@ -3,5 +3,3 @@ package constants import "fmt" var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist") -var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running") -var ErrRPCRequestTimeout = fmt.Errorf("Request timed out") diff --git a/migrations/000015_alter_tasks_table_log_offset.down.sql b/migrations/000015_alter_tasks_table_log_offset.down.sql deleted file mode 100644 index bfbd448..0000000 --- a/migrations/000015_alter_tasks_table_log_offset.down.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE tasks -DROP IF EXISTS log_offset; diff --git a/migrations/000015_alter_tasks_table_log_offset.up.sql b/migrations/000015_alter_tasks_table_log_offset.up.sql deleted file mode 100644 index 7543ea9..0000000 --- a/migrations/000015_alter_tasks_table_log_offset.up.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE tasks -ADD log_offset integer NOT NULL DEFAULT COALESCE(CARDINALITY(log),0); diff --git a/rpc/call.go b/rpc/call.go index 037f384..71696de 100644 --- a/rpc/call.go +++ b/rpc/call.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "git.lastassault.de/speatzle/morffix/constants" "github.com/google/uuid" "nhooyr.io/websocket" ) @@ -90,7 +89,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par // remove request from map delete(s.requests, id) - return nil, constants.ErrRPCRequestTimeout + return nil, fmt.Errorf("Request timed out") } } diff --git a/server/task.go b/server/task.go index d86d899..8161ca1 100644 --- a/server/task.go +++ b/server/task.go @@ -4,13 +4,11 @@ import ( "bytes" "context" "encoding/json" - "errors" "fmt" "log/slog" "net/http" "slices" "strconv" - "strings" "time" "git.lastassault.de/speatzle/morffix/constants" @@ -360,7 +358,7 @@ func assignQueuedTasks(ctx context.Context) error { } if Workers[i].Connected { var count int - err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) + err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) if err != nil { return fmt.Errorf("Error Querying Worker Task Count: %w", err) } @@ -389,23 +387,7 @@ func assignQueuedTasks(ctx context.Context) error { } _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) - if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) { - // Task was started previously but something went wrong and we are out of sync - slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name) - - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())}) - if err != nil { - return fmt.Errorf("Updating Task status during already running error: %w", err) - } - } else if errors.Is(err, constants.ErrRPCRequestTimeout) { - // We really don't know whats going on, might be slow response, oom, disk full or a bug - slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name) - - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())}) - if err != nil { - return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err) - } - } else if err != nil { + if err != nil { return fmt.Errorf("Error Starting Task: %w", err) } diff --git a/server/worker.go b/server/worker.go index ed8322e..573afea 100644 --- a/server/worker.go +++ b/server/worker.go @@ -2,8 +2,6 @@ package server import ( "context" - "errors" - "fmt" "log/slog" "net/http" "strings" @@ -182,7 +180,7 @@ func updateWorkerTaskStatus(ctx context.Context) { if Workers[uuid].Connected { w := Workers[uuid] - rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) + rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) if err != nil { slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid) return @@ -209,7 +207,7 @@ func updateWorkerTaskStatus(ctx context.Context) { if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) { // Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())}) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"}) if err != nil { slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err) return @@ -217,17 +215,10 @@ func updateWorkerTaskStatus(ctx context.Context) { slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED) return - } else if errors.Is(err, constants.ErrRPCRequestTimeout) { - // We really don't know whats going on, might be slow response, oom, disk full or a bug - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())}) - if err != nil { - slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err) - return - } } else { slog.ErrorContext(ctx, "Getting Task Status", "err", err) - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())}) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN) if err != nil { slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err) return @@ -236,7 +227,7 @@ func updateWorkerTaskStatus(ctx context.Context) { } } - _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log)) + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log) if err != nil { slog.ErrorContext(ctx, "Updating Task Status", "err", err) return diff --git a/systemd/morffix-worker.service b/systemd/morffix-worker.service deleted file mode 100644 index be423a2..0000000 --- a/systemd/morffix-worker.service +++ /dev/null @@ -1,18 +0,0 @@ -[Unit] -Description=morffix worker -ConditionPathExists=/opt/morffix/morffix -After=network.target -Wants=morffix.service - -[Service] -Type=simple -User=root -LimitNOFILE=1024 -Restart=on-failure -RestartSec=10 -StartLimitIntervalSec=60 -WorkingDirectory=/opt/morffix -ExecStart=/opt/morffix/morffix - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/systemd/morffix.service b/systemd/morffix.service deleted file mode 100644 index 72c7d43..0000000 --- a/systemd/morffix.service +++ /dev/null @@ -1,19 +0,0 @@ -[Unit] -Description=morffix -ConditionPathExists=/opt/morffix/morffix -After=network.target -Wants=postgresql@15-main.service - -[Service] -Type=simple -User=root -Group=media_library -LimitNOFILE=1024 -Restart=on-failure -RestartSec=10 -StartLimitIntervalSec=60 -WorkingDirectory=/opt/morffix -ExecStart=/opt/morffix/morffix -server - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/task/healthcheck.go b/task/healthcheck.go index 9039634..a8ab6a2 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat l := log.GetTaskLogger(t) // TODO Figure out how to get correct file ending - path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID)) + path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID)) // Set ffmpeg input path if len(t.FfmpegCommand.InputFiles) == 0 { diff --git a/task/task.go b/task/task.go index f6c687c..6e2ad61 100644 --- a/task/task.go +++ b/task/task.go @@ -17,11 +17,6 @@ func StartTask(conf config.Config, data types.TaskStart) error { taskMutex.Lock() defer taskMutex.Unlock() - _, ok := tasks[data.ID] - if ok { - return constants.ErrTaskIsAlreadyRunning - } - tasks[data.ID] = &types.Task{ ID: data.ID, Type: data.Type, diff --git a/task/transcode.go b/task/transcode.go index 9d6a143..3a1e863 100644 --- a/task/transcode.go +++ b/task/transcode.go @@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) { l := log.GetTaskLogger(t) // TODO Figure out how to get correct file ending - src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID)) - dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID)) + src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID)) + dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID)) // Set ffmpeg input path if len(t.FfmpegCommand.InputFiles) == 0 { diff --git a/worker/worker.go b/worker/worker.go index 411c872..07dbad6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,7 +5,6 @@ import ( "io" "log/slog" "net/http" - "path/filepath" "git.lastassault.de/speatzle/morffix/rpc" @@ -37,20 +36,6 @@ func Start(_conf config.Config) { return } - slog.InfoContext(ctx, "Cleaning tmp Files...") - files, err := filepath.Glob("/tmp/morffix-*") - if err != nil { - slog.Error("Get tmp Files", "err", err) - return - } - for _, f := range files { - slog.InfoContext(ctx, "Deleting File", "path", f) - if err := os.Remove(f); err != nil { - slog.Error("Deleting tmp File", "err", err, "path", f) - return - } - } - sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) exit := false