Compare commits

..

No commits in common. "24e11d9103a7c31bf6c70df84ad43dff55c2cd36" and "4a2d3f03576d1b7df5f3ad736aeb624a2b9574ac" have entirely different histories.

14 changed files with 26 additions and 104 deletions

View file

@ -1,13 +1,26 @@
on: [push] on: [push]
jobs: jobs:
release: test:
runs-on: docker runs-on: docker
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- uses: actions/setup-go@v5 - uses: actions/setup-go@v5
with: with:
go-version: '1.22' go-version: '1.22'
- run: GOOS=linux GOARCH=amd64 go build - run: go build
- uses: forgejo/upload-artifact@v3
with:
name: morffix
path: morffix
release:
runs-on: docker
if: github.event_name == 'push' && contains(github.ref, 'refs/tags/')
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.22'
- run: go build
- run: mkdir out - run: mkdir out
- run: cp morffix out/morffix - run: cp morffix out/morffix
- uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017 - uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017

View file

@ -1,2 +1,2 @@
# Morffix # Morffix
Morph and fix your Media distributedly with ffmpeg Morph and fix you Media distributedly with ffmpeg

View file

@ -3,5 +3,3 @@ package constants
import "fmt" import "fmt"
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist") var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")

View file

@ -1,2 +0,0 @@
ALTER TABLE tasks
DROP IF EXISTS log_offset;

View file

@ -1,2 +0,0 @@
ALTER TABLE tasks
ADD log_offset integer NOT NULL DEFAULT COALESCE(CARDINALITY(log),0);

View file

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"time" "time"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/google/uuid" "github.com/google/uuid"
"nhooyr.io/websocket" "nhooyr.io/websocket"
) )
@ -90,7 +89,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
// remove request from map // remove request from map
delete(s.requests, id) delete(s.requests, id)
return nil, constants.ErrRPCRequestTimeout return nil, fmt.Errorf("Request timed out")
} }
} }

View file

@ -4,13 +4,11 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"slices" "slices"
"strconv" "strconv"
"strings"
"time" "time"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
@ -360,7 +358,7 @@ func assignQueuedTasks(ctx context.Context) error {
} }
if Workers[i].Connected { if Workers[i].Connected {
var count int var count int
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count) err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
if err != nil { if err != nil {
return fmt.Errorf("Error Querying Worker Task Count: %w", err) return fmt.Errorf("Error Querying Worker Task Count: %w", err)
} }
@ -389,23 +387,7 @@ func assignQueuedTasks(ctx context.Context) error {
} }
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) { if err != nil {
// Task was started previously but something went wrong and we are out of sync
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
if err != nil {
return fmt.Errorf("Updating Task status during already running error: %w", err)
}
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
if err != nil {
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
}
} else if err != nil {
return fmt.Errorf("Error Starting Task: %w", err) return fmt.Errorf("Error Starting Task: %w", err)
} }

View file

@ -2,8 +2,6 @@ package server
import ( import (
"context" "context"
"errors"
"fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strings" "strings"
@ -182,7 +180,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
if Workers[uuid].Connected { if Workers[uuid].Connected {
w := Workers[uuid] w := Workers[uuid]
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING) rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid) slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
return return
@ -209,7 +207,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) { if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again // Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID) slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())}) _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err) slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
return return
@ -217,17 +215,10 @@ func updateWorkerTaskStatus(ctx context.Context) {
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED) slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
return return
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
// We really don't know whats going on, might be slow response, oom, disk full or a bug
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
return
}
} else { } else {
slog.ErrorContext(ctx, "Getting Task Status", "err", err) slog.ErrorContext(ctx, "Getting Task Status", "err", err)
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())}) _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err) slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
return return
@ -236,7 +227,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
} }
} }
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log)) _, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Task Status", "err", err) slog.ErrorContext(ctx, "Updating Task Status", "err", err)
return return

View file

@ -1,18 +0,0 @@
[Unit]
Description=morffix worker
ConditionPathExists=/opt/morffix/morffix
After=network.target
Wants=morffix.service
[Service]
Type=simple
User=root
LimitNOFILE=1024
Restart=on-failure
RestartSec=10
StartLimitIntervalSec=60
WorkingDirectory=/opt/morffix
ExecStart=/opt/morffix/morffix
[Install]
WantedBy=multi-user.target

View file

@ -1,19 +0,0 @@
[Unit]
Description=morffix
ConditionPathExists=/opt/morffix/morffix
After=network.target
Wants=postgresql@15-main.service
[Service]
Type=simple
User=root
Group=media_library
LimitNOFILE=1024
Restart=on-failure
RestartSec=10
StartLimitIntervalSec=60
WorkingDirectory=/opt/morffix
ExecStart=/opt/morffix/morffix -server
[Install]
WantedBy=multi-user.target

View file

@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
l := log.GetTaskLogger(t) l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending // TODO Figure out how to get correct file ending
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID)) path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path // Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 { if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -17,11 +17,6 @@ func StartTask(conf config.Config, data types.TaskStart) error {
taskMutex.Lock() taskMutex.Lock()
defer taskMutex.Unlock() defer taskMutex.Unlock()
_, ok := tasks[data.ID]
if ok {
return constants.ErrTaskIsAlreadyRunning
}
tasks[data.ID] = &types.Task{ tasks[data.ID] = &types.Task{
ID: data.ID, ID: data.ID,
Type: data.Type, Type: data.Type,

View file

@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
l := log.GetTaskLogger(t) l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending // TODO Figure out how to get correct file ending
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID)) src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID)) dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path // Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 { if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -5,7 +5,6 @@ import (
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
"path/filepath"
"git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/rpc"
@ -37,20 +36,6 @@ func Start(_conf config.Config) {
return return
} }
slog.InfoContext(ctx, "Cleaning tmp Files...")
files, err := filepath.Glob("/tmp/morffix-*")
if err != nil {
slog.Error("Get tmp Files", "err", err)
return
}
for _, f := range files {
slog.InfoContext(ctx, "Deleting File", "path", f)
if err := os.Remove(f); err != nil {
slog.Error("Deleting tmp File", "err", err, "path", f)
return
}
}
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)
exit := false exit := false