Compare commits
10 commits
4a2d3f0357
...
24e11d9103
Author | SHA1 | Date | |
---|---|---|---|
24e11d9103 | |||
78ab8c9daf | |||
7574628a1d | |||
a9d901da71 | |||
c37c26908d | |||
b99a818040 | |||
e8f7521fa4 | |||
265b2c1633 | |||
a45becb2ab | |||
9e8c5545cc |
14 changed files with 103 additions and 25 deletions
|
@ -1,26 +1,13 @@
|
||||||
on: [push]
|
on: [push]
|
||||||
jobs:
|
jobs:
|
||||||
test:
|
|
||||||
runs-on: docker
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- uses: actions/setup-go@v5
|
|
||||||
with:
|
|
||||||
go-version: '1.22'
|
|
||||||
- run: go build
|
|
||||||
- uses: forgejo/upload-artifact@v3
|
|
||||||
with:
|
|
||||||
name: morffix
|
|
||||||
path: morffix
|
|
||||||
release:
|
release:
|
||||||
runs-on: docker
|
runs-on: docker
|
||||||
if: github.event_name == 'push' && contains(github.ref, 'refs/tags/')
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v4
|
- uses: actions/checkout@v4
|
||||||
- uses: actions/setup-go@v5
|
- uses: actions/setup-go@v5
|
||||||
with:
|
with:
|
||||||
go-version: '1.22'
|
go-version: '1.22'
|
||||||
- run: go build
|
- run: GOOS=linux GOARCH=amd64 go build
|
||||||
- run: mkdir out
|
- run: mkdir out
|
||||||
- run: cp morffix out/morffix
|
- run: cp morffix out/morffix
|
||||||
- uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017
|
- uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
# Morffix
|
# Morffix
|
||||||
Morph and fix you Media distributedly with ffmpeg
|
Morph and fix your Media distributedly with ffmpeg
|
||||||
|
|
|
@ -3,3 +3,5 @@ package constants
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
||||||
|
var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
|
||||||
|
var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")
|
||||||
|
|
2
migrations/000015_alter_tasks_table_log_offset.down.sql
Normal file
2
migrations/000015_alter_tasks_table_log_offset.down.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE tasks
|
||||||
|
DROP IF EXISTS log_offset;
|
2
migrations/000015_alter_tasks_table_log_offset.up.sql
Normal file
2
migrations/000015_alter_tasks_table_log_offset.up.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE tasks
|
||||||
|
ADD log_offset integer NOT NULL DEFAULT COALESCE(CARDINALITY(log),0);
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
)
|
)
|
||||||
|
@ -89,7 +90,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
|
||||||
|
|
||||||
// remove request from map
|
// remove request from map
|
||||||
delete(s.requests, id)
|
delete(s.requests, id)
|
||||||
return nil, fmt.Errorf("Request timed out")
|
return nil, constants.ErrRPCRequestTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,11 +4,13 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/constants"
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
|
@ -358,7 +360,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
if Workers[i].Connected {
|
if Workers[i].Connected {
|
||||||
var count int
|
var count int
|
||||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -387,7 +389,23 @@ func assignQueuedTasks(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
||||||
if err != nil {
|
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
|
||||||
|
// Task was started previously but something went wrong and we are out of sync
|
||||||
|
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||||
|
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Updating Task status during already running error: %w", err)
|
||||||
|
}
|
||||||
|
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||||
|
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||||
|
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||||
|
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
|
||||||
|
}
|
||||||
|
} else if err != nil {
|
||||||
return fmt.Errorf("Error Starting Task: %w", err)
|
return fmt.Errorf("Error Starting Task: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,8 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -180,7 +182,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if Workers[uuid].Connected {
|
if Workers[uuid].Connected {
|
||||||
w := Workers[uuid]
|
w := Workers[uuid]
|
||||||
|
|
||||||
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
||||||
return
|
return
|
||||||
|
@ -207,7 +209,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
||||||
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
||||||
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
@ -215,10 +217,17 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||||
|
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
|
||||||
|
if err != nil {
|
||||||
|
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
@ -227,7 +236,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
|
18
systemd/morffix-worker.service
Normal file
18
systemd/morffix-worker.service
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
[Unit]
|
||||||
|
Description=morffix worker
|
||||||
|
ConditionPathExists=/opt/morffix/morffix
|
||||||
|
After=network.target
|
||||||
|
Wants=morffix.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=root
|
||||||
|
LimitNOFILE=1024
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=10
|
||||||
|
StartLimitIntervalSec=60
|
||||||
|
WorkingDirectory=/opt/morffix
|
||||||
|
ExecStart=/opt/morffix/morffix
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
19
systemd/morffix.service
Normal file
19
systemd/morffix.service
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
[Unit]
|
||||||
|
Description=morffix
|
||||||
|
ConditionPathExists=/opt/morffix/morffix
|
||||||
|
After=network.target
|
||||||
|
Wants=postgresql@15-main.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=root
|
||||||
|
Group=media_library
|
||||||
|
LimitNOFILE=1024
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=10
|
||||||
|
StartLimitIntervalSec=60
|
||||||
|
WorkingDirectory=/opt/morffix
|
||||||
|
ExecStart=/opt/morffix/morffix -server
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
|
@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
||||||
l := log.GetTaskLogger(t)
|
l := log.GetTaskLogger(t)
|
||||||
|
|
||||||
// TODO Figure out how to get correct file ending
|
// TODO Figure out how to get correct file ending
|
||||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
|
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
|
||||||
|
|
||||||
// Set ffmpeg input path
|
// Set ffmpeg input path
|
||||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||||
|
|
|
@ -17,6 +17,11 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
||||||
taskMutex.Lock()
|
taskMutex.Lock()
|
||||||
defer taskMutex.Unlock()
|
defer taskMutex.Unlock()
|
||||||
|
|
||||||
|
_, ok := tasks[data.ID]
|
||||||
|
if ok {
|
||||||
|
return constants.ErrTaskIsAlreadyRunning
|
||||||
|
}
|
||||||
|
|
||||||
tasks[data.ID] = &types.Task{
|
tasks[data.ID] = &types.Task{
|
||||||
ID: data.ID,
|
ID: data.ID,
|
||||||
Type: data.Type,
|
Type: data.Type,
|
||||||
|
|
|
@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
|
||||||
l := log.GetTaskLogger(t)
|
l := log.GetTaskLogger(t)
|
||||||
|
|
||||||
// TODO Figure out how to get correct file ending
|
// TODO Figure out how to get correct file ending
|
||||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
|
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
|
||||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
|
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
|
||||||
|
|
||||||
// Set ffmpeg input path
|
// Set ffmpeg input path
|
||||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/rpc"
|
"git.lastassault.de/speatzle/morffix/rpc"
|
||||||
|
|
||||||
|
@ -36,6 +37,20 @@ func Start(_conf config.Config) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slog.InfoContext(ctx, "Cleaning tmp Files...")
|
||||||
|
files, err := filepath.Glob("/tmp/morffix-*")
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Get tmp Files", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, f := range files {
|
||||||
|
slog.InfoContext(ctx, "Deleting File", "path", f)
|
||||||
|
if err := os.Remove(f); err != nil {
|
||||||
|
slog.Error("Deleting tmp File", "err", err, "path", f)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, os.Interrupt)
|
signal.Notify(sigs, os.Interrupt)
|
||||||
exit := false
|
exit := false
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue