diff --git a/.forgejo/workflows/build.yaml b/.forgejo/workflows/build.yaml
new file mode 100644
index 0000000..2f63773
--- /dev/null
+++ b/.forgejo/workflows/build.yaml
@@ -0,0 +1,23 @@
+on: [push]
+jobs:
+ release:
+ runs-on: docker
+ steps:
+ - uses: actions/checkout@v4
+ - uses: actions/setup-go@v5
+ with:
+ go-version: '1.22'
+ - run: GOOS=linux GOARCH=amd64 go build
+ - run: mkdir out
+ - run: cp morffix out/morffix
+ - uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017
+ with:
+ direction: upload
+ url: https://git.soontm.de
+ release-dir: out
+ release-notes: "New Release"
+ token: ${{ secrets.GITHUB_TOKEN }}
+ override: true
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index f67a607..dea61ce 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,2 @@
# Morffix
-Morph and fix you Media distributedly with ffmpeg
+Morph and fix your Media distributedly with ffmpeg
diff --git a/constants/constants.go b/constants/constants.go
index 99736a3..dfcb9ab 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -87,6 +87,7 @@ const (
FILE_STATUS_MISSING
FILE_STATUS_EXISTS
FILE_STATUS_CHANGED
+ FILE_STATUS_NEW
)
func (s FileStatus) String() string {
@@ -99,6 +100,8 @@ func (s FileStatus) String() string {
return "Exists"
case FILE_STATUS_CHANGED:
return "Changed"
+ case FILE_STATUS_NEW:
+ return "New"
default:
return fmt.Sprintf("%d", int(s))
}
diff --git a/constants/error.go b/constants/error.go
index c375e01..bb3e278 100644
--- a/constants/error.go
+++ b/constants/error.go
@@ -3,3 +3,5 @@ package constants
import "fmt"
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
+var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
+var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")
diff --git a/migrations/000015_alter_files_table_hash.down.sql b/migrations/000015_alter_files_table_hash.down.sql
new file mode 100644
index 0000000..ee61bdc
--- /dev/null
+++ b/migrations/000015_alter_files_table_hash.down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE files
+ALTER COLUMN hash bigint SET NOT NULL;
\ No newline at end of file
diff --git a/migrations/000015_alter_files_table_hash.up.sql b/migrations/000015_alter_files_table_hash.up.sql
new file mode 100644
index 0000000..c282260
--- /dev/null
+++ b/migrations/000015_alter_files_table_hash.up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE files
+ALTER COLUMN hash bigint DROP NOT NULL;
\ No newline at end of file
diff --git a/migrations/000015_alter_tasks_table_log_offset.down.sql b/migrations/000015_alter_tasks_table_log_offset.down.sql
new file mode 100644
index 0000000..bfbd448
--- /dev/null
+++ b/migrations/000015_alter_tasks_table_log_offset.down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE tasks
+DROP IF EXISTS log_offset;
diff --git a/migrations/000015_alter_tasks_table_log_offset.up.sql b/migrations/000015_alter_tasks_table_log_offset.up.sql
new file mode 100644
index 0000000..1e6a5d5
--- /dev/null
+++ b/migrations/000015_alter_tasks_table_log_offset.up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE tasks
+ADD log_offset integer NOT NULL DEFAULT 0;
diff --git a/rpc/call.go b/rpc/call.go
index 71696de..037f384 100644
--- a/rpc/call.go
+++ b/rpc/call.go
@@ -6,6 +6,7 @@ import (
"fmt"
"time"
+ "git.lastassault.de/speatzle/morffix/constants"
"github.com/google/uuid"
"nhooyr.io/websocket"
)
@@ -89,7 +90,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
// remove request from map
delete(s.requests, id)
- return nil, fmt.Errorf("Request timed out")
+ return nil, constants.ErrRPCRequestTimeout
}
}
diff --git a/server/scan.go b/server/scan.go
index 4e9d49e..5d3bb1a 100644
--- a/server/scan.go
+++ b/server/scan.go
@@ -3,10 +3,8 @@ package server
import (
"bytes"
"context"
- "crypto/md5"
"errors"
"fmt"
- "io"
"log/slog"
"net/http"
"os"
@@ -31,8 +29,6 @@ func handleScan(w http.ResponseWriter, r *http.Request) {
return
}
- full := r.FormValue("full") == "on"
-
var name string
var path string
var enabled bool
@@ -44,7 +40,7 @@ func handleScan(w http.ResponseWriter, r *http.Request) {
}
scanCtx := context.Background()
- go scan(scanCtx, id, full)
+ go scan(scanCtx, id)
message := "Scan Started"
@@ -79,7 +75,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
}
}
-func scan(ctx context.Context, id string, full bool) {
+func scan(ctx context.Context, id string) {
slog.InfoContext(ctx, "Starting Scan", "id", id)
// TODO Scan settings:
@@ -150,35 +146,36 @@ func scan(ctx context.Context, id string, full bool) {
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
return nil
}
- slog.InfoContext(ctx, "Hashing File", "path", fullPath, "size", info.Size())
+ /*
+ slog.InfoContext(ctx, "Hashing File", "path", fullPath, "size", info.Size())
- file, err := os.Open(fullPath)
- if err != nil {
- return fmt.Errorf("Opening File: %w", err)
- }
+ file, err := os.Open(fullPath)
+ if err != nil {
+ return fmt.Errorf("Opening File: %w", err)
+ }
- hash := md5.New()
- if _, err := io.Copy(hash, file); err != nil {
- return fmt.Errorf("Reading File: %w", err)
- }
- newMD5 := hash.Sum(nil)
-
- slog.InfoContext(ctx, "File MD5", "path", fullPath, "size", info.Size(), "md5", newMD5)
+ hash := md5.New()
+ if _, err := io.Copy(hash, file); err != nil {
+ return fmt.Errorf("Reading File: %w", err)
+ }
+ newMD5 := hash.Sum(nil)
+ slog.InfoContext(ctx, "File MD5", "path", fullPath, "size", info.Size(), "md5", newMD5)
+ */
fPath, err := filepath.Rel(lpath, fullPath)
if err != nil {
return fmt.Errorf("Getting Relative Path: %w", err)
}
var fileID int
- var oldMD5 []byte
+ var size uint
var health constants.FileHealth
- err = tx.QueryRow(ctx, "SELECT id, md5, health FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldMD5, &health)
+ err = tx.QueryRow(ctx, "SELECT id, size, health FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &size, &health)
if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet
slog.InfoContext(ctx, "File is New", "path", fullPath)
- _, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, md5) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_EXISTS, constants.FILE_HEALTH_UNKNOWN, newMD5)
+ _, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health) VALUES ($1, $2, $3, $4, $5)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN)
if err != nil {
return fmt.Errorf("Add New File to DB: %w", err)
}
@@ -187,12 +184,14 @@ func scan(ctx context.Context, id string, full bool) {
return fmt.Errorf("Getting File: %w", err)
}
- if slices.Compare[[]byte](newMD5, oldMD5) != 0 {
- // File has changed on disk so reset health
- health = constants.FILE_HEALTH_UNKNOWN
- }
+ /*
+ if slices.Compare[[]byte](newMD5, oldMD5) != 0 {
+ // File has changed on disk so reset health
+ health = constants.FILE_HEALTH_UNKNOWN
+ }
+ */
// File Exists so update Size, status and hash
- _, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, md5 = $5 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_EXISTS, health, newMD5)
+ _, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_EXISTS, health)
if err != nil {
return fmt.Errorf("Updating File in DB: %w", err)
}
diff --git a/server/task.go b/server/task.go
index 8161ca1..4eb2aeb 100644
--- a/server/task.go
+++ b/server/task.go
@@ -4,11 +4,13 @@ import (
"bytes"
"context"
"encoding/json"
+ "errors"
"fmt"
"log/slog"
"net/http"
"slices"
"strconv"
+ "strings"
"time"
"git.lastassault.de/speatzle/morffix/constants"
@@ -258,6 +260,7 @@ func createTask(ctx context.Context, r *http.Request) error {
}
library := r.FormValue("library")
health := r.FormValue("health")
+ transcode := r.FormValue("transcode")
typ, err := strconv.Atoi(r.FormValue("type"))
if err != nil {
return fmt.Errorf("Parsing Task Type: %w", err)
@@ -269,7 +272,7 @@ func createTask(ctx context.Context, r *http.Request) error {
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
- rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3)", library, constants.FILE_STATUS_EXISTS, health)
+ rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3) AND (-1 = $4 OR transcode = $4)", library, constants.FILE_STATUS_EXISTS, health, transcode)
if err != nil {
return fmt.Errorf("Query Files: %w", err)
}
@@ -358,7 +361,7 @@ func assignQueuedTasks(ctx context.Context) error {
}
if Workers[i].Connected {
var count int
- err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
+ err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
if err != nil {
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
}
@@ -388,7 +391,30 @@ func assignQueuedTasks(ctx context.Context) error {
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
if err != nil {
- return fmt.Errorf("Error Starting Task: %w", err)
+ if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
+ // Task was started previously but something went wrong and we are out of sync
+ slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
+
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
+ if err != nil {
+ return fmt.Errorf("Updating Task status during already running error: %w", err)
+ }
+ } else if errors.Is(err, constants.ErrRPCRequestTimeout) {
+ // We really don't know whats going on, might be slow response, oom, disk full or a bug
+ slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
+
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
+ if err != nil {
+ return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
+ }
+ } else {
+ slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
+
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())})
+ if err != nil {
+ return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
+ }
+ }
}
err = tx.Commit(ctx)
diff --git a/server/worker.go b/server/worker.go
index 573afea..ed8322e 100644
--- a/server/worker.go
+++ b/server/worker.go
@@ -2,6 +2,8 @@ package server
import (
"context"
+ "errors"
+ "fmt"
"log/slog"
"net/http"
"strings"
@@ -180,7 +182,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
if Workers[uuid].Connected {
w := Workers[uuid]
- rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
+ rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
return
@@ -207,7 +209,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
- _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
return
@@ -215,10 +217,17 @@ func updateWorkerTaskStatus(ctx context.Context) {
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
return
+ } else if errors.Is(err, constants.ErrRPCRequestTimeout) {
+ // We really don't know whats going on, might be slow response, oom, disk full or a bug
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
+ if err != nil {
+ slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
+ return
+ }
} else {
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
- _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
return
@@ -227,7 +236,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
}
}
- _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
+ _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
if err != nil {
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
return
diff --git a/systemd/morffix-worker.service b/systemd/morffix-worker.service
new file mode 100644
index 0000000..be423a2
--- /dev/null
+++ b/systemd/morffix-worker.service
@@ -0,0 +1,18 @@
+[Unit]
+Description=morffix worker
+ConditionPathExists=/opt/morffix/morffix
+After=network.target
+Wants=morffix.service
+
+[Service]
+Type=simple
+User=root
+LimitNOFILE=1024
+Restart=on-failure
+RestartSec=10
+StartLimitIntervalSec=60
+WorkingDirectory=/opt/morffix
+ExecStart=/opt/morffix/morffix
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/systemd/morffix.service b/systemd/morffix.service
new file mode 100644
index 0000000..72c7d43
--- /dev/null
+++ b/systemd/morffix.service
@@ -0,0 +1,19 @@
+[Unit]
+Description=morffix
+ConditionPathExists=/opt/morffix/morffix
+After=network.target
+Wants=postgresql@15-main.service
+
+[Service]
+Type=simple
+User=root
+Group=media_library
+LimitNOFILE=1024
+Restart=on-failure
+RestartSec=10
+StartLimitIntervalSec=60
+WorkingDirectory=/opt/morffix
+ExecStart=/opt/morffix/morffix -server
+
+[Install]
+WantedBy=multi-user.target
\ No newline at end of file
diff --git a/task/healthcheck.go b/task/healthcheck.go
index a8ab6a2..9039634 100644
--- a/task/healthcheck.go
+++ b/task/healthcheck.go
@@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
- path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
+ path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 {
diff --git a/task/task.go b/task/task.go
index 6e2ad61..f6c687c 100644
--- a/task/task.go
+++ b/task/task.go
@@ -17,6 +17,11 @@ func StartTask(conf config.Config, data types.TaskStart) error {
taskMutex.Lock()
defer taskMutex.Unlock()
+ _, ok := tasks[data.ID]
+ if ok {
+ return constants.ErrTaskIsAlreadyRunning
+ }
+
tasks[data.ID] = &types.Task{
ID: data.ID,
Type: data.Type,
diff --git a/task/transcode.go b/task/transcode.go
index 3a1e863..9d6a143 100644
--- a/task/transcode.go
+++ b/task/transcode.go
@@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
- src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
- dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
+ src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
+ dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 {
diff --git a/tmpl/tasks.tmpl b/tmpl/tasks.tmpl
index 647bca2..6220bcd 100644
--- a/tmpl/tasks.tmpl
+++ b/tmpl/tasks.tmpl
@@ -15,6 +15,14 @@
+
+