Compare commits
1 commit
b7f0ccc8a6
...
3ec0e28272
Author | SHA1 | Date | |
---|---|---|---|
![]() |
3ec0e28272 |
15 changed files with 12 additions and 142 deletions
|
@ -1,23 +0,0 @@
|
|||
on: [push]
|
||||
jobs:
|
||||
release:
|
||||
runs-on: docker
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-go@v5
|
||||
with:
|
||||
go-version: '1.22'
|
||||
- run: GOOS=linux GOARCH=amd64 go build
|
||||
- run: mkdir out
|
||||
- run: cp morffix out/morffix
|
||||
- uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017
|
||||
with:
|
||||
direction: upload
|
||||
url: https://git.soontm.de
|
||||
release-dir: out
|
||||
release-notes: "New Release"
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
override: true
|
||||
|
||||
|
||||
|
|
@ -1,2 +1,2 @@
|
|||
# Morffix
|
||||
Morph and fix your Media distributedly with ffmpeg
|
||||
Morph and fix you Media distributedly with ffmpeg
|
||||
|
|
|
@ -3,5 +3,3 @@ package constants
|
|||
import "fmt"
|
||||
|
||||
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
||||
var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
|
||||
var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
ALTER TABLE tasks
|
||||
DROP IF EXISTS log_offset;
|
|
@ -1,2 +0,0 @@
|
|||
ALTER TABLE tasks
|
||||
ADD log_offset integer NOT NULL DEFAULT 0;
|
|
@ -6,7 +6,6 @@ import (
|
|||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"github.com/google/uuid"
|
||||
"nhooyr.io/websocket"
|
||||
)
|
||||
|
@ -90,7 +89,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
|
|||
|
||||
// remove request from map
|
||||
delete(s.requests, id)
|
||||
return nil, constants.ErrRPCRequestTimeout
|
||||
return nil, fmt.Errorf("Request timed out")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,13 +4,11 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
|
@ -260,7 +258,6 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
}
|
||||
library := r.FormValue("library")
|
||||
health := r.FormValue("health")
|
||||
transcode := r.FormValue("transcode")
|
||||
typ, err := strconv.Atoi(r.FormValue("type"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Parsing Task Type: %w", err)
|
||||
|
@ -272,7 +269,7 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
|
||||
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
||||
|
||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3) AND (-1 = $4 OR transcode = $4)", library, constants.FILE_STATUS_EXISTS, health, transcode)
|
||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3)", library, constants.FILE_STATUS_EXISTS, health)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Query Files: %w", err)
|
||||
}
|
||||
|
@ -361,7 +358,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
}
|
||||
if Workers[i].Connected {
|
||||
var count int
|
||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
||||
}
|
||||
|
@ -391,30 +388,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
|
||||
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
||||
if err != nil {
|
||||
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
|
||||
// Task was started previously but something went wrong and we are out of sync
|
||||
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Task status during already running error: %w", err)
|
||||
}
|
||||
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
|
||||
}
|
||||
} else {
|
||||
slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("Error Starting Task: %w", err)
|
||||
}
|
||||
|
||||
err = tx.Commit(ctx)
|
||||
|
|
|
@ -2,8 +2,6 @@ package server
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
@ -182,7 +180,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
if Workers[uuid].Connected {
|
||||
w := Workers[uuid]
|
||||
|
||||
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
||||
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
||||
return
|
||||
|
@ -209,7 +207,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
||||
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
||||
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
||||
return
|
||||
|
@ -217,17 +215,10 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
||||
|
||||
return
|
||||
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
||||
return
|
||||
|
@ -236,7 +227,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
}
|
||||
}
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
|
||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
||||
return
|
||||
|
|
|
@ -1,18 +0,0 @@
|
|||
[Unit]
|
||||
Description=morffix worker
|
||||
ConditionPathExists=/opt/morffix/morffix
|
||||
After=network.target
|
||||
Wants=morffix.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=root
|
||||
LimitNOFILE=1024
|
||||
Restart=on-failure
|
||||
RestartSec=10
|
||||
StartLimitIntervalSec=60
|
||||
WorkingDirectory=/opt/morffix
|
||||
ExecStart=/opt/morffix/morffix
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
|
@ -1,19 +0,0 @@
|
|||
[Unit]
|
||||
Description=morffix
|
||||
ConditionPathExists=/opt/morffix/morffix
|
||||
After=network.target
|
||||
Wants=postgresql@15-main.service
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
User=root
|
||||
Group=media_library
|
||||
LimitNOFILE=1024
|
||||
Restart=on-failure
|
||||
RestartSec=10
|
||||
StartLimitIntervalSec=60
|
||||
WorkingDirectory=/opt/morffix
|
||||
ExecStart=/opt/morffix/morffix -server
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
|
@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
|||
l := log.GetTaskLogger(t)
|
||||
|
||||
// TODO Figure out how to get correct file ending
|
||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
|
||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
|
||||
|
||||
// Set ffmpeg input path
|
||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||
|
|
|
@ -17,11 +17,6 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
|||
taskMutex.Lock()
|
||||
defer taskMutex.Unlock()
|
||||
|
||||
_, ok := tasks[data.ID]
|
||||
if ok {
|
||||
return constants.ErrTaskIsAlreadyRunning
|
||||
}
|
||||
|
||||
tasks[data.ID] = &types.Task{
|
||||
ID: data.ID,
|
||||
Type: data.Type,
|
||||
|
|
|
@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
|
|||
l := log.GetTaskLogger(t)
|
||||
|
||||
// TODO Figure out how to get correct file ending
|
||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
|
||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
|
||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
|
||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
|
||||
|
||||
// Set ffmpeg input path
|
||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||
|
|
|
@ -15,14 +15,6 @@
|
|||
<option value="1">Damaged</option>
|
||||
<option value="2">Healthy</option>
|
||||
</select>
|
||||
<label for="transcode">Transcode Status</label>
|
||||
<select id="transcode" name="transcode">
|
||||
<option value="-1">All</option>
|
||||
<option value="0">Unknown</option>
|
||||
<option value="1">None</option>
|
||||
<option value="2">Failed</option>
|
||||
<option value="2">Success</option>
|
||||
</select>
|
||||
<label for="type">Type</label>
|
||||
<select id="type" name="type">
|
||||
<option value="0">Health Check</option>
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/rpc"
|
||||
|
||||
|
@ -37,20 +36,6 @@ func Start(_conf config.Config) {
|
|||
return
|
||||
}
|
||||
|
||||
slog.InfoContext(ctx, "Cleaning tmp Files...")
|
||||
files, err := filepath.Glob("/tmp/morffix-*")
|
||||
if err != nil {
|
||||
slog.Error("Get tmp Files", "err", err)
|
||||
return
|
||||
}
|
||||
for _, f := range files {
|
||||
slog.InfoContext(ctx, "Deleting File", "path", f)
|
||||
if err := os.Remove(f); err != nil {
|
||||
slog.Error("Deleting tmp File", "err", err, "path", f)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, os.Interrupt)
|
||||
exit := false
|
||||
|
|
Loading…
Add table
Reference in a new issue