Compare commits
30 commits
3ec0e28272
...
b7f0ccc8a6
Author | SHA1 | Date | |
---|---|---|---|
![]() |
b7f0ccc8a6 | ||
2dec6a6271 | |||
8046093e9e | |||
eb3cfbfdba | |||
24e11d9103 | |||
78ab8c9daf | |||
7574628a1d | |||
a9d901da71 | |||
c37c26908d | |||
b99a818040 | |||
e8f7521fa4 | |||
265b2c1633 | |||
a45becb2ab | |||
9e8c5545cc | |||
4a2d3f0357 | |||
927d4b695a | |||
659b9ddaca | |||
471a545ba3 | |||
a29d7ec013 | |||
a98a172b79 | |||
0fe9e7943b | |||
3895e17090 | |||
e9c5e4f088 | |||
e4fc76e6b9 | |||
6a07db76ca | |||
dfc9e0a1bc | |||
017ff526ed | |||
08ecab2e56 | |||
1acf6183dd | |||
182f3819b3 |
19 changed files with 174 additions and 38 deletions
23
.forgejo/workflows/build.yaml
Normal file
23
.forgejo/workflows/build.yaml
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
on: [push]
|
||||||
|
jobs:
|
||||||
|
release:
|
||||||
|
runs-on: docker
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
- uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version: '1.22'
|
||||||
|
- run: GOOS=linux GOARCH=amd64 go build
|
||||||
|
- run: mkdir out
|
||||||
|
- run: cp morffix out/morffix
|
||||||
|
- uses: actions/forgejo-release@eb0fcc44a150c0de82e6fdb36752dd56bf27d017
|
||||||
|
with:
|
||||||
|
direction: upload
|
||||||
|
url: https://git.soontm.de
|
||||||
|
release-dir: out
|
||||||
|
release-notes: "New Release"
|
||||||
|
token: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
override: true
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,2 +1,2 @@
|
||||||
# Morffix
|
# Morffix
|
||||||
Morph and fix you Media distributedly with ffmpeg
|
Morph and fix your Media distributedly with ffmpeg
|
||||||
|
|
|
@ -87,6 +87,7 @@ const (
|
||||||
FILE_STATUS_MISSING
|
FILE_STATUS_MISSING
|
||||||
FILE_STATUS_EXISTS
|
FILE_STATUS_EXISTS
|
||||||
FILE_STATUS_CHANGED
|
FILE_STATUS_CHANGED
|
||||||
|
FILE_STATUS_NEW
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s FileStatus) String() string {
|
func (s FileStatus) String() string {
|
||||||
|
@ -99,6 +100,8 @@ func (s FileStatus) String() string {
|
||||||
return "Exists"
|
return "Exists"
|
||||||
case FILE_STATUS_CHANGED:
|
case FILE_STATUS_CHANGED:
|
||||||
return "Changed"
|
return "Changed"
|
||||||
|
case FILE_STATUS_NEW:
|
||||||
|
return "New"
|
||||||
default:
|
default:
|
||||||
return fmt.Sprintf("%d", int(s))
|
return fmt.Sprintf("%d", int(s))
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,3 +3,5 @@ package constants
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
var ErrTaskDoesNotExist = fmt.Errorf("Task does not Exist")
|
||||||
|
var ErrTaskIsAlreadyRunning = fmt.Errorf("Task is Already Running")
|
||||||
|
var ErrRPCRequestTimeout = fmt.Errorf("Request timed out")
|
||||||
|
|
2
migrations/000015_alter_files_table_hash.down.sql
Normal file
2
migrations/000015_alter_files_table_hash.down.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE files
|
||||||
|
ALTER COLUMN hash bigint SET NOT NULL;
|
2
migrations/000015_alter_files_table_hash.up.sql
Normal file
2
migrations/000015_alter_files_table_hash.up.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE files
|
||||||
|
ALTER COLUMN hash bigint DROP NOT NULL;
|
2
migrations/000015_alter_tasks_table_log_offset.down.sql
Normal file
2
migrations/000015_alter_tasks_table_log_offset.down.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE tasks
|
||||||
|
DROP IF EXISTS log_offset;
|
2
migrations/000015_alter_tasks_table_log_offset.up.sql
Normal file
2
migrations/000015_alter_tasks_table_log_offset.up.sql
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
ALTER TABLE tasks
|
||||||
|
ADD log_offset integer NOT NULL DEFAULT 0;
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
)
|
)
|
||||||
|
@ -89,7 +90,7 @@ func (s *server) Call(ctx context.Context, c *websocket.Conn, method string, par
|
||||||
|
|
||||||
// remove request from map
|
// remove request from map
|
||||||
delete(s.requests, id)
|
delete(s.requests, id)
|
||||||
return nil, fmt.Errorf("Request timed out")
|
return nil, constants.ErrRPCRequestTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,8 @@ package server
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/md5"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -31,8 +29,6 @@ func handleScan(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
full := r.FormValue("full") == "on"
|
|
||||||
|
|
||||||
var name string
|
var name string
|
||||||
var path string
|
var path string
|
||||||
var enabled bool
|
var enabled bool
|
||||||
|
@ -44,7 +40,7 @@ func handleScan(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
scanCtx := context.Background()
|
scanCtx := context.Background()
|
||||||
go scan(scanCtx, id, full)
|
go scan(scanCtx, id)
|
||||||
|
|
||||||
message := "Scan Started"
|
message := "Scan Started"
|
||||||
|
|
||||||
|
@ -79,7 +75,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func scan(ctx context.Context, id string, full bool) {
|
func scan(ctx context.Context, id string) {
|
||||||
slog.InfoContext(ctx, "Starting Scan", "id", id)
|
slog.InfoContext(ctx, "Starting Scan", "id", id)
|
||||||
|
|
||||||
// TODO Scan settings:
|
// TODO Scan settings:
|
||||||
|
@ -150,35 +146,36 @@ func scan(ctx context.Context, id string, full bool) {
|
||||||
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
|
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
slog.InfoContext(ctx, "Hashing File", "path", fullPath, "size", info.Size())
|
/*
|
||||||
|
slog.InfoContext(ctx, "Hashing File", "path", fullPath, "size", info.Size())
|
||||||
|
|
||||||
file, err := os.Open(fullPath)
|
file, err := os.Open(fullPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Opening File: %w", err)
|
return fmt.Errorf("Opening File: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
hash := md5.New()
|
hash := md5.New()
|
||||||
if _, err := io.Copy(hash, file); err != nil {
|
if _, err := io.Copy(hash, file); err != nil {
|
||||||
return fmt.Errorf("Reading File: %w", err)
|
return fmt.Errorf("Reading File: %w", err)
|
||||||
}
|
}
|
||||||
newMD5 := hash.Sum(nil)
|
newMD5 := hash.Sum(nil)
|
||||||
|
|
||||||
slog.InfoContext(ctx, "File MD5", "path", fullPath, "size", info.Size(), "md5", newMD5)
|
|
||||||
|
|
||||||
|
slog.InfoContext(ctx, "File MD5", "path", fullPath, "size", info.Size(), "md5", newMD5)
|
||||||
|
*/
|
||||||
fPath, err := filepath.Rel(lpath, fullPath)
|
fPath, err := filepath.Rel(lpath, fullPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Getting Relative Path: %w", err)
|
return fmt.Errorf("Getting Relative Path: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var fileID int
|
var fileID int
|
||||||
var oldMD5 []byte
|
var size uint
|
||||||
var health constants.FileHealth
|
var health constants.FileHealth
|
||||||
err = tx.QueryRow(ctx, "SELECT id, md5, health FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldMD5, &health)
|
err = tx.QueryRow(ctx, "SELECT id, size, health FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &size, &health)
|
||||||
if errors.Is(err, pgx.ErrNoRows) {
|
if errors.Is(err, pgx.ErrNoRows) {
|
||||||
// File Does not Exist Yet
|
// File Does not Exist Yet
|
||||||
|
|
||||||
slog.InfoContext(ctx, "File is New", "path", fullPath)
|
slog.InfoContext(ctx, "File is New", "path", fullPath)
|
||||||
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, md5) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_EXISTS, constants.FILE_HEALTH_UNKNOWN, newMD5)
|
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health) VALUES ($1, $2, $3, $4, $5)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Add New File to DB: %w", err)
|
return fmt.Errorf("Add New File to DB: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -187,12 +184,14 @@ func scan(ctx context.Context, id string, full bool) {
|
||||||
return fmt.Errorf("Getting File: %w", err)
|
return fmt.Errorf("Getting File: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if slices.Compare[[]byte](newMD5, oldMD5) != 0 {
|
/*
|
||||||
// File has changed on disk so reset health
|
if slices.Compare[[]byte](newMD5, oldMD5) != 0 {
|
||||||
health = constants.FILE_HEALTH_UNKNOWN
|
// File has changed on disk so reset health
|
||||||
}
|
health = constants.FILE_HEALTH_UNKNOWN
|
||||||
|
}
|
||||||
|
*/
|
||||||
// File Exists so update Size, status and hash
|
// File Exists so update Size, status and hash
|
||||||
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, md5 = $5 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_EXISTS, health, newMD5)
|
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_EXISTS, health)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Updating File in DB: %w", err)
|
return fmt.Errorf("Updating File in DB: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,11 +4,13 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
"slices"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/constants"
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
|
@ -258,6 +260,7 @@ func createTask(ctx context.Context, r *http.Request) error {
|
||||||
}
|
}
|
||||||
library := r.FormValue("library")
|
library := r.FormValue("library")
|
||||||
health := r.FormValue("health")
|
health := r.FormValue("health")
|
||||||
|
transcode := r.FormValue("transcode")
|
||||||
typ, err := strconv.Atoi(r.FormValue("type"))
|
typ, err := strconv.Atoi(r.FormValue("type"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Parsing Task Type: %w", err)
|
return fmt.Errorf("Parsing Task Type: %w", err)
|
||||||
|
@ -269,7 +272,7 @@ func createTask(ctx context.Context, r *http.Request) error {
|
||||||
|
|
||||||
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
||||||
|
|
||||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3)", library, constants.FILE_STATUS_EXISTS, health)
|
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3) AND (-1 = $4 OR transcode = $4)", library, constants.FILE_STATUS_EXISTS, health, transcode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Query Files: %w", err)
|
return fmt.Errorf("Query Files: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -358,7 +361,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
if Workers[i].Connected {
|
if Workers[i].Connected {
|
||||||
var count int
|
var count int
|
||||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -388,7 +391,30 @@ func assignQueuedTasks(ctx context.Context) error {
|
||||||
|
|
||||||
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Error Starting Task: %w", err)
|
if strings.HasSuffix(err.Error(), constants.ErrTaskIsAlreadyRunning.Error()) {
|
||||||
|
// Task was started previously but something went wrong and we are out of sync
|
||||||
|
slog.WarnContext(ctx, "Task is apparently already Running on this Worker, thats bad", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||||
|
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_ASSIGNED, []string{fmt.Sprintf("%v MASTER: Task Start, Task Already Running!", time.Now())})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Updating Task status during already running error: %w", err)
|
||||||
|
}
|
||||||
|
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||||
|
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||||
|
slog.WarnContext(ctx, "Task start Timed Out", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||||
|
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start RPC Call Timed Out!", time.Now())})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Updating Unknown Task Status due to Timeout while starting Task: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
slog.ErrorContext(ctx, "Task start Error", "task_id", taskStart.ID, "worker", Workers[i].Name)
|
||||||
|
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskStart.ID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Task Start Error: %v", time.Now(), err.Error())})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Updating Unknown Task Status due to Error while starting Task: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = tx.Commit(ctx)
|
err = tx.Commit(ctx)
|
||||||
|
|
|
@ -2,6 +2,8 @@ package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -180,7 +182,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if Workers[uuid].Connected {
|
if Workers[uuid].Connected {
|
||||||
w := Workers[uuid]
|
w := Workers[uuid]
|
||||||
|
|
||||||
rows, err := db.Query(ctx, "SELECT id, COALESCE(CARDINALITY(log),0) as log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", uuid, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
rows, err := db.Query(ctx, "SELECT id, log_offset FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", uuid, constants.TASK_STATUS_UNKNOWN, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
slog.ErrorContext(ctx, "Error Getting Tasks for Worker", "err", err, "worker_id", uuid)
|
||||||
return
|
return
|
||||||
|
@ -207,7 +209,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
if strings.HasSuffix(err.Error(), constants.ErrTaskDoesNotExist.Error()) {
|
||||||
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
// Worker says it does not know of this task, mark it failed so that we don't asks the worker about it again and again
|
||||||
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
slog.ErrorContext(ctx, "Task is unknown by worker, Failing...", "err", err, "id", taskID)
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{"Task Failed because it is unknown to Assigned Worker"})
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_FAILED, []string{fmt.Sprintf("%v MASTER: Task Failed because it is unknown to Assigned Worker", time.Now())})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Failed Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
@ -215,10 +217,17 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
slog.Info("Updating task done", "id", taskID, "status", constants.TASK_STATUS_FAILED)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
} else if errors.Is(err, constants.ErrRPCRequestTimeout) {
|
||||||
|
// We really don't know whats going on, might be slow response, oom, disk full or a bug
|
||||||
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Timed Out!", time.Now())})
|
||||||
|
if err != nil {
|
||||||
|
slog.ErrorContext(ctx, "Updating Unknown Task Status due to Timeout", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
slog.ErrorContext(ctx, "Getting Task Status", "err", err)
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN)
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, constants.TASK_STATUS_UNKNOWN, []string{fmt.Sprintf("%v MASTER: Status RPC Call Error: %v", time.Now(), err.Error())})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Unknown Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
@ -227,7 +236,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log)
|
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = log || $3, log_offset = log_offset + $4 WHERE id = $1", taskID, ts.Task.Status, ts.Task.Log, len(ts.Task.Log))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
slog.ErrorContext(ctx, "Updating Task Status", "err", err)
|
||||||
return
|
return
|
||||||
|
|
18
systemd/morffix-worker.service
Normal file
18
systemd/morffix-worker.service
Normal file
|
@ -0,0 +1,18 @@
|
||||||
|
[Unit]
|
||||||
|
Description=morffix worker
|
||||||
|
ConditionPathExists=/opt/morffix/morffix
|
||||||
|
After=network.target
|
||||||
|
Wants=morffix.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=root
|
||||||
|
LimitNOFILE=1024
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=10
|
||||||
|
StartLimitIntervalSec=60
|
||||||
|
WorkingDirectory=/opt/morffix
|
||||||
|
ExecStart=/opt/morffix/morffix
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
19
systemd/morffix.service
Normal file
19
systemd/morffix.service
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
[Unit]
|
||||||
|
Description=morffix
|
||||||
|
ConditionPathExists=/opt/morffix/morffix
|
||||||
|
After=network.target
|
||||||
|
Wants=postgresql@15-main.service
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
Type=simple
|
||||||
|
User=root
|
||||||
|
Group=media_library
|
||||||
|
LimitNOFILE=1024
|
||||||
|
Restart=on-failure
|
||||||
|
RestartSec=10
|
||||||
|
StartLimitIntervalSec=60
|
||||||
|
WorkingDirectory=/opt/morffix
|
||||||
|
ExecStart=/opt/morffix/morffix -server
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
|
@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
||||||
l := log.GetTaskLogger(t)
|
l := log.GetTaskLogger(t)
|
||||||
|
|
||||||
// TODO Figure out how to get correct file ending
|
// TODO Figure out how to get correct file ending
|
||||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID))
|
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
|
||||||
|
|
||||||
// Set ffmpeg input path
|
// Set ffmpeg input path
|
||||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||||
|
|
|
@ -17,6 +17,11 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
||||||
taskMutex.Lock()
|
taskMutex.Lock()
|
||||||
defer taskMutex.Unlock()
|
defer taskMutex.Unlock()
|
||||||
|
|
||||||
|
_, ok := tasks[data.ID]
|
||||||
|
if ok {
|
||||||
|
return constants.ErrTaskIsAlreadyRunning
|
||||||
|
}
|
||||||
|
|
||||||
tasks[data.ID] = &types.Task{
|
tasks[data.ID] = &types.Task{
|
||||||
ID: data.ID,
|
ID: data.ID,
|
||||||
Type: data.Type,
|
Type: data.Type,
|
||||||
|
|
|
@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
|
||||||
l := log.GetTaskLogger(t)
|
l := log.GetTaskLogger(t)
|
||||||
|
|
||||||
// TODO Figure out how to get correct file ending
|
// TODO Figure out how to get correct file ending
|
||||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
|
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
|
||||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
|
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
|
||||||
|
|
||||||
// Set ffmpeg input path
|
// Set ffmpeg input path
|
||||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||||
|
|
|
@ -15,6 +15,14 @@
|
||||||
<option value="1">Damaged</option>
|
<option value="1">Damaged</option>
|
||||||
<option value="2">Healthy</option>
|
<option value="2">Healthy</option>
|
||||||
</select>
|
</select>
|
||||||
|
<label for="transcode">Transcode Status</label>
|
||||||
|
<select id="transcode" name="transcode">
|
||||||
|
<option value="-1">All</option>
|
||||||
|
<option value="0">Unknown</option>
|
||||||
|
<option value="1">None</option>
|
||||||
|
<option value="2">Failed</option>
|
||||||
|
<option value="2">Success</option>
|
||||||
|
</select>
|
||||||
<label for="type">Type</label>
|
<label for="type">Type</label>
|
||||||
<select id="type" name="type">
|
<select id="type" name="type">
|
||||||
<option value="0">Health Check</option>
|
<option value="0">Health Check</option>
|
||||||
|
|
|
@ -5,6 +5,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/rpc"
|
"git.lastassault.de/speatzle/morffix/rpc"
|
||||||
|
|
||||||
|
@ -36,6 +37,20 @@ func Start(_conf config.Config) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slog.InfoContext(ctx, "Cleaning tmp Files...")
|
||||||
|
files, err := filepath.Glob("/tmp/morffix-*")
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("Get tmp Files", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for _, f := range files {
|
||||||
|
slog.InfoContext(ctx, "Deleting File", "path", f)
|
||||||
|
if err := os.Remove(f); err != nil {
|
||||||
|
slog.Error("Deleting tmp File", "err", err, "path", f)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, os.Interrupt)
|
signal.Notify(sigs, os.Interrupt)
|
||||||
exit := false
|
exit := false
|
||||||
|
|
Loading…
Add table
Reference in a new issue