Compare commits
1 commit
b7f0ccc8a6
...
3ec0e28272
Author | SHA1 | Date | |
---|---|---|---|
![]() |
3ec0e28272 |
15 changed files with 12 additions and 142 deletions
|
@ -1,23 +0,0 @@
|
||||||
on: [push]
|
|
||||||
jobs:
|
|
||||||
release:
|
|
||||||
runs-on: docker
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: actions/setup-go@v5
|
|
||||||
with:
|
|
||||||
go-version: '1.22'
|
|
||||||
- run: GOOS=linux GOARCH=amd64 go build
|
|
||||||
- run: mkdir out
|
|
||||||
- run: cp morffix out/morffix
|
|
||||||
- uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017
|
|
||||||
with:
|
|
||||||
direction: upload
|
|
||||||
url: https://git.soontm.de
|
|
||||||
release-dir: out
|
|
||||||
release-notes: "New Release"
|
|
||||||
token: ${{ secrets.GITHUB_TOKEN }}
|
|
||||||
override: true
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
# Morffix
|
# Morffix
|
||||||
Morph and fix your Media distributedly with ffmpeg
|
Morph and fix you Media distributedly with ffmpeg
|
||||||
|
|
|
@ -3,5 +3,3 @@ package constants
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
||||||
var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
|
|
||||||
var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")
|
|
||||||
|
|
|
@ -1,2 +0,0 @@
|
||||||
ALTER TABLE tasks
|
|
||||||
DROP IF EXISTS log_offset;
|
|
|
@ -1,2 +0,0 @@
|
||||||
ALTER TABLE tasks
|
|
||||||
ADD log_offset integer NOT NULL DEFAULT 0;
|
|
|
@ -6,7 +6,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/constants"
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
)
|
)
|
||||||
|
@ -90,7 +89,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
|
||||||
|
|
||||||
// remove request from map
|
// remove request from map
|
||||||
delete(s.requests, id)
|
delete(s.requests, id)
|
||||||
return nil, constants.ErrRPCRequestTimeout
|
return nil, fmt.Errorf("Request timed out")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,13 +4,11 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/constants"
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
|
@ -260,7 +258,6 @@ func createTask(ctx context.Context, r *http.Request) error {
|
||||||
}
|
}
|
||||||
library := r.FormValue("library")
|
library := r.FormValue("library")
|
||||||
health := r.FormValue("health")
|
health := r.FormValue("health")
|
||||||
transcode := r.FormValue("transcode")
|
|
||||||
typ, err := strconv.Atoi(r.FormValue("type"))
|
typ, err := strconv.Atoi(r.FormValue("type"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Parsing Task Type: %w", err)
|
return fmt.Errorf("Parsing Task Type: %w", err)
|
||||||
|
@ -272,7 +269,7 @@ func createTask(ctx context.Context, r *http.Request) error {
|
||||||
|
|
||||||
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
||||||
|
|
||||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3) AND (-1 = $4 OR transcode = $4)", library, constants.FILE_STATUS_EXISTS, health, transcode)
|
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3)", library, constants.FILE_STATUS_EXISTS, health)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Query Files: %w", err)
|
return fmt.Errorf("Query Files: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -361,7 +358,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
if Workers[i].Connected {
|
if Workers[i].Connected {
|
||||||
var count int
|
var count int
|
||||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -391,30 +388,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
||||||
|
|
||||||
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
|
return fmt.Errorf("Error Starting Task: %w", err)
|
||||||
// Task was started previously but something went wrong and we are out of sync
|
|
||||||
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Updating Task status during already running error: %w", err)
|
|
||||||
}
|
|
||||||
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
|
||||||
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
|
||||||
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())})
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tx.Commit(ctx)
|
err = tx.Commit(ctx)
|
||||||
|
|
|
@ -2,8 +2,6 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -182,7 +180,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if Workers[uuid].Connected {
|
if Workers[uuid].Connected {
|
||||||
w := Workers[uuid]
|
w := Workers[uuid]
|
||||||
|
|
||||||
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
||||||
return
|
return
|
||||||
|
@ -209,7 +207,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
||||||
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
||||||
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
@ -217,17 +215,10 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
||||||
|
|
||||||
return
|
return
|
||||||
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
|
||||||
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
|
|
||||||
if err != nil {
|
|
||||||
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
@ -236,7 +227,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
[Unit]
|
|
||||||
Description=morffix worker
|
|
||||||
ConditionPathExists=/opt/morffix/morffix
|
|
||||||
After=network.target
|
|
||||||
Wants=morffix.service
|
|
||||||
|
|
||||||
[Service]
|
|
||||||
Type=simple
|
|
||||||
User=root
|
|
||||||
LimitNOFILE=1024
|
|
||||||
Restart=on-failure
|
|
||||||
RestartSec=10
|
|
||||||
StartLimitIntervalSec=60
|
|
||||||
WorkingDirectory=/opt/morffix
|
|
||||||
ExecStart=/opt/morffix/morffix
|
|
||||||
|
|
||||||
[Install]
|
|
||||||
WantedBy=multi-user.target
|
|
|
@ -1,19 +0,0 @@
|
||||||
[Unit]
|
|
||||||
Description=morffix
|
|
||||||
ConditionPathExists=/opt/morffix/morffix
|
|
||||||
After=network.target
|
|
||||||
Wants=postgresql@15-main.service
|
|
||||||
|
|
||||||
[Service]
|
|
||||||
Type=simple
|
|
||||||
User=root
|
|
||||||
Group=media_library
|
|
||||||
LimitNOFILE=1024
|
|
||||||
Restart=on-failure
|
|
||||||
RestartSec=10
|
|
||||||
StartLimitIntervalSec=60
|
|
||||||
WorkingDirectory=/opt/morffix
|
|
||||||
ExecStart=/opt/morffix/morffix -server
|
|
||||||
|
|
||||||
[Install]
|
|
||||||
WantedBy=multi-user.target
|
|
|
@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
||||||
l := log.GetTaskLogger(t)
|
l := log.GetTaskLogger(t)
|
||||||
|
|
||||||
// TODO Figure out how to get correct file ending
|
// TODO Figure out how to get correct file ending
|
||||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
|
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
|
||||||
|
|
||||||
// Set ffmpeg input path
|
// Set ffmpeg input path
|
||||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||||
|
|
|
@ -17,11 +17,6 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
||||||
taskMutex.Lock()
|
taskMutex.Lock()
|
||||||
defer taskMutex.Unlock()
|
defer taskMutex.Unlock()
|
||||||
|
|
||||||
_, ok := tasks[data.ID]
|
|
||||||
if ok {
|
|
||||||
return constants.ErrTaskIsAlreadyRunning
|
|
||||||
}
|
|
||||||
|
|
||||||
tasks[data.ID] = &types.Task{
|
tasks[data.ID] = &types.Task{
|
||||||
ID: data.ID,
|
ID: data.ID,
|
||||||
Type: data.Type,
|
Type: data.Type,
|
||||||
|
|
|
@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
|
||||||
l := log.GetTaskLogger(t)
|
l := log.GetTaskLogger(t)
|
||||||
|
|
||||||
// TODO Figure out how to get correct file ending
|
// TODO Figure out how to get correct file ending
|
||||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
|
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
|
||||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
|
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
|
||||||
|
|
||||||
// Set ffmpeg input path
|
// Set ffmpeg input path
|
||||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||||
|
|
|
@ -15,14 +15,6 @@
|
||||||
<option value="1">Damaged</option>
|
<option value="1">Damaged</option>
|
||||||
<option value="2">Healthy</option>
|
<option value="2">Healthy</option>
|
||||||
</select>
|
</select>
|
||||||
<label for="transcode">Transcode Status</label>
|
|
||||||
<select id="transcode" name="transcode">
|
|
||||||
<option value="-1">All</option>
|
|
||||||
<option value="0">Unknown</option>
|
|
||||||
<option value="1">None</option>
|
|
||||||
<option value="2">Failed</option>
|
|
||||||
<option value="2">Success</option>
|
|
||||||
</select>
|
|
||||||
<label for="type">Type</label>
|
<label for="type">Type</label>
|
||||||
<select id="type" name="type">
|
<select id="type" name="type">
|
||||||
<option value="0">Health Check</option>
|
<option value="0">Health Check</option>
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path/filepath"
|
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/rpc"
|
"git.lastassault.de/speatzle/morffix/rpc"
|
||||||
|
|
||||||
|
@ -37,20 +36,6 @@ func Start(_conf config.Config) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
slog.InfoContext(ctx, "Cleaning tmp Files...")
|
|
||||||
files, err := filepath.Glob("/tmp/morffix-*")
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("Get tmp Files", "err", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, f := range files {
|
|
||||||
slog.InfoContext(ctx, "Deleting File", "path", f)
|
|
||||||
if err := os.Remove(f); err != nil {
|
|
||||||
slog.Error("Deleting tmp File", "err", err, "path", f)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, os.Interrupt)
|
signal.Notify(sigs, os.Interrupt)
|
||||||
exit := false
|
exit := false
|
||||||
|
|
Loading…
Add table
Reference in a new issue