Compare commits

..

No commits in common. "main" and "scan_rework" have entirely different histories.

38 changed files with 194 additions and 1112 deletions

View file

@ -2,14 +2,13 @@ package constants
import "fmt"
const WORKER_VERSION = "v2"
const WORKER_VERSION = "v1"
const WORKER_VERSION_HEADER = "morffix-version"
const SHARED_SECRET_HEADER = "morffix-secret"
const NAME_HEADER = "morffix-name"
const UUID_HEADER = "morffix-uuid"
const HASH_HEADER = "morffix-hash"
const TASK_ID_HEADER = "morffix-task-id"
const INDEX_TEMPLATE_NAME = "index.tmpl"
const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl"
@ -18,7 +17,6 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl"
const TASKS_TEMPLATE_NAME = "tasks.tmpl"
const TASK_TEMPLATE_NAME = "task.tmpl"
const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl"
const STATS_TEMPLATE_NAME = "stats.tmpl"
const FORM_FILE_KEY = "file"
@ -27,8 +25,6 @@ const SORT_ORDER_ASC_PARAM = "order_asc"
const QUERY_LIMIT_PARAM = "limit"
const QUERY_PAGE_PARAM = "page"
const TEMP_FILE_EXTENSION = ".morffix"
type TaskType int
const (

1
go.mod
View file

@ -12,7 +12,6 @@ require (
)
require (
github.com/go-echarts/go-echarts/v2 v2.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect

2
go.sum
View file

@ -2,8 +2,6 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-echarts/go-echarts/v2 v2.4.0 h1:efD46dmAvaZEWrBHAGjE8cfDK48vvFTHz5N9VqW5rYc=
github.com/go-echarts/go-echarts/v2 v2.4.0/go.mod h1:56YlvzhW/a+du15f3S2qUGNDfKnFOeJSThBIrVFHDtI=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

View file

@ -1,2 +1,2 @@
ALTER TABLE files
DROP IF EXISTS ffprobe_data;
DROP IF EXISTS ffprobe;

View file

@ -1,3 +0,0 @@
-- Down migration: remove the per-library ffmpeg command references added by the
-- matching "up" step. Postgres allows omitting COLUMN ("DROP [COLUMN] IF EXISTS"),
-- and IF EXISTS makes the migration idempotent.
ALTER TABLE libraries
DROP IF EXISTS transcode_command_id,
DROP IF EXISTS health_command_id;

View file

@ -1,3 +0,0 @@
-- Up migration: link each library to optional ffmpeg commands used for
-- transcoding and health checks. Columns are nullable (no NOT NULL), so a
-- library may have no command configured.
ALTER TABLE libraries
ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id),
ADD health_command_id bigint REFERENCES ffmpeg_commands(id);

View file

@ -1,4 +0,0 @@
-- Down migration: drop the pre-transcode snapshot columns (old_md5,
-- old_ffprobe_data, old_size) from files. IF EXISTS keeps this safe to re-run.
ALTER TABLE files
DROP IF EXISTS old_md5,
DROP IF EXISTS old_ffprobe_data,
DROP IF EXISTS old_size;

View file

@ -1,3 +0,0 @@
-- Up migration: keep a snapshot of the file's state before transcoding so
-- space savings and changes can be reported later.
-- NOTE(review): old_md5 is typed uuid (16 bytes) — presumably it stores the raw
-- MD5 digest, matching files.md5; confirm against the application code.
ALTER TABLE files ADD old_md5 uuid,
ADD old_ffprobe_data JSONB,
ADD old_size bigint;

View file

@ -1,2 +0,0 @@
-- Down migration: remove the per-worker concurrency setting.
ALTER TABLE workers
DROP IF EXISTS parallel_tasks;

View file

@ -1,2 +0,0 @@
-- Up migration: how many tasks a worker may run concurrently.
-- NOT NULL DEFAULT 1 backfills existing rows to single-task behavior.
ALTER TABLE workers
ADD parallel_tasks smallint NOT NULL DEFAULT 1;

View file

@ -18,9 +18,8 @@ type IndexData struct {
type IndexWorker struct {
ID string
Worker
Status *types.WorkerStatus
QueueEnable bool
ParallelTasks int
Status *types.WorkerStatus
QueueEnable bool
}
func handleIndex(w http.ResponseWriter, r *http.Request) {
@ -32,15 +31,6 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for i := range Workers {
var queueEnable bool
var parallelTasks int
err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil {
w.Write([]byte(err.Error()))
slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err)
}
if Workers[i].Connected {
var status types.WorkerStatus
_, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status)
@ -51,19 +41,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
}
slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status)
data.Workers = append(data.Workers, IndexWorker{
ID: i,
Worker: *Workers[i],
Status: &status,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
ID: i,
Worker: *Workers[i],
Status: &status,
})
} else {
data.Workers = append(data.Workers, IndexWorker{
ID: i,
Worker: *Workers[i],
Status: nil,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
ID: i,
Worker: *Workers[i],
Status: nil,
})
}
}
@ -91,10 +77,6 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
}
func splitInt(n int) []int {
if n == 0 {
return []int{0}
}
slc := []int{}
for n > 0 {
slc = append(slc, n%10)

View file

@ -5,21 +5,17 @@ import (
"fmt"
"log/slog"
"net/http"
"strconv"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/jackc/pgx/v5"
)
type LibrariesData struct {
Libraries []Library
FfmpegCommands []FfmpegCommand
Libraries []Library
}
type Library struct {
ID string `db:"id"`
HealthCommandID *int `db:"health_command_id"`
TranscodeCommandID *int `db:"transcode_command_id"`
Name string `db:"name"`
Path string `db:"path"`
Enable bool `db:"enable"`
@ -44,7 +40,7 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
}
}
rows, err := db.Query(r.Context(), "SELECT * FROM libraries")
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries")
if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -58,20 +54,6 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
}
data.Libraries = libraries
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
if err != nil {
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
data.FfmpegCommands = ffmpegCommands
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data)
if err != nil {
@ -102,21 +84,9 @@ func createLibrary(r *http.Request) error {
scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on"
health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on"
var health_command_id *int = nil
var transcode_command_id *int = nil
h, err := strconv.Atoi(r.FormValue("health_command_id"))
if err == nil {
health_command_id = &h
}
t, err := strconv.Atoi(r.FormValue("transcode_command_id"))
if err == nil {
transcode_command_id = &t
}
slog.Info("Got Library Create", "name", name, "path", path, "enable", enable)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path)
if err != nil {
return fmt.Errorf("Inserting Library: %w", err)
}

View file

@ -20,12 +20,10 @@ type File struct {
ID int `db:"id"`
Path string `db:"path"`
Size int64 `db:"size"`
OldSize *int64 `db:"old_size"`
Status constants.FileStatus `db:"status"`
Health constants.FileHealth `db:"health"`
Transcode constants.FileTranscode `db:"transcode"`
MD5 []byte `db:"md5"`
OldMD5 *[]byte `db:"old_md5"`
UpdatedAt time.Time `db:"updated_at"`
}
@ -65,7 +63,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
Enable: enabled,
}
rows, err := db.Query(r.Context(), "SELECT id, path, size, old_size, status, health, transcode, md5, old_md5, updated_at FROM files where library_id = $1", id)
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1", id)
if err != nil {
slog.ErrorContext(r.Context(), "Query Files", "err", err)
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)

View file

@ -1,41 +0,0 @@
package server
import (
"fmt"
"log/slog"
"net/http"
"strconv"
)
// HandleSetParallelTasks updates the number of concurrently allowed tasks for
// one worker or for all workers (the "parallel_tasks" column in the workers
// table). Expected form fields:
//   - parallel_tasks: integer, required
//   - worker: a worker id, or the literal "all" to update every worker
// On success the client is redirected back to the page it came from (Referer).
// (Indentation was stripped by the diff viewer this block was captured from.)
func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
// NOTE(review): "Parseing" is a typo in the user-visible message; left
// unchanged here because this is a documentation-only pass.
http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest)
return
}
// Reject non-numeric input before touching the database.
parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks"))
if err != nil {
http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest)
return
}
worker := r.FormValue("worker")
slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker)
if worker == "all" {
// Broadcast: update every worker row.
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks)
if err != nil {
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
return
}
} else {
// Single worker, keyed by id. An unknown id updates zero rows and is
// not treated as an error here.
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1 where id = $2", parallelTasks, worker)
if err != nil {
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
return
}
}
// Bounce back to the submitting page.
http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound)
}

View file

@ -10,11 +10,11 @@ import (
"os"
"path/filepath"
"slices"
"strconv"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/workerpool"
"github.com/jackc/pgx/v5"
"gopkg.in/vansante/go-ffprobe.v2"
)
@ -22,47 +22,44 @@ import (
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
func handleScan(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Implemented", http.StatusNotImplemented)
/*
if r.Method == "GET" {
scanStatus(w, r)
return
}
if r.Method == "GET" {
scanStatus(w, r)
return
}
id, err := strconv.Atoi(r.FormValue("id"))
if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest)
return
}
id, err := strconv.Atoi(r.FormValue("id"))
if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest)
return
}
var name string
var path string
var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return
}
var name string
var path string
var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return
}
scanCtx := context.Background()
go scanLibrary(scanCtx, id)
scanCtx := context.Background()
go scanLibrary(scanCtx, id)
message := "Scan Started"
message := "Scan Started"
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
*/
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
}
func scanStatus(w http.ResponseWriter, r *http.Request) {
@ -82,7 +79,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
}
}
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
func manageScan(stop chan bool) {
scanTicker := time.NewTicker(time.Minute)
hashTicker := time.NewTicker(time.Second)
scanRunning := false
@ -114,7 +111,7 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
}
for _, l := range libraries {
scanLibrary(ctx, l, scanPool)
scanLibrary(ctx, l)
}
}()
@ -129,32 +126,19 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
hashRunning = false
}()
rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
var fileID uint
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
}
// Nothing to do
return
}
files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
err = hashFile(ctx, fileID)
if err != nil {
slog.ErrorContext(ctx, "Collect File Rows", "err", err)
return
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
}
for _, f := range files {
hashPool.QueueJob(func(ctx context.Context) {
err = hashFile(ctx, f)
if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
}
})
}
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
hashPool.WaitForEmptyQueue()
}()
case <-stop:
cancel()
@ -165,7 +149,7 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
var scanLock sync.Mutex
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
func scanLibrary(ctx context.Context, id int) {
slog.InfoContext(ctx, "Acquiring Scan Lock...")
scanLock.Lock()
defer scanLock.Unlock()
@ -174,9 +158,7 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
var name string
var lpath string
var enabled bool
var health_command_id *int
var scan_new_queue_health bool
err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health)
err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
if err != nil {
slog.ErrorContext(ctx, "Get Library", "err", err)
return
@ -198,17 +180,22 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
return
}
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Begin Transaction", "err", err)
return
}
defer tx.Rollback(ctx)
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
// TODO Work on getting missing status again
/*
// Mark all Files as Missing
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
if err != nil {
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
return
}
*/
// Mark all Files as Missing
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
if err != nil {
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
return
}
err = filepath.Walk(lpath,
func(fullPath string, info os.FileInfo, err error) error {
if errors.Is(err, os.ErrPermission) {
@ -224,7 +211,7 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
}
if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
return nil
}
@ -236,63 +223,52 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
var fileID int
var oldSize uint
var oldModTime time.Time
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
err = tx.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet
scanPool.QueueJob(
func(ctx context.Context) {
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
slog.ErrorContext(ctx, "ffprobe New File", "err", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
return fmt.Errorf("ffprobe New File: %w", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
var file_id int
err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id)
if err != nil {
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
}
if scan_new_queue_health && health_command_id != nil {
slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id)
}
}
})
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData)
if err != nil {
return fmt.Errorf("Add New File to DB: %w", err)
}
return nil
} else if err != nil {
return fmt.Errorf("Getting File: %w", err)
}
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
// File Already Exists, Check if it has been changed
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
slog.Debug("File stayed the same", "id", fileID)
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 WHERE id = $1", fileID, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Updating Non Changed File in DB: %w", err)
}
} else {
scanPool.QueueJob(func(ctx context.Context) {
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
return fmt.Errorf("ffprobe Changed File: %w", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil {
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
}
})
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil {
return fmt.Errorf("Updating Changed File in DB: %w", err)
}
}
return nil
})
if err != nil {
@ -300,7 +276,10 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
return
}
scanPool.WaitForEmptyQueue()
err = tx.Commit(ctx)
if err != nil {
slog.ErrorContext(ctx, "Error Committing Changes", "err", err)
return
}
slog.InfoContext(ctx, "Scan Done", "id", id)
}

View file

@ -17,7 +17,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/workerpool"
)
var conf config.Config
@ -103,9 +102,6 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
mux.HandleFunc("/libraries", handleLibraries)
mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands)
mux.HandleFunc("/queue_enable", HandleSetQueueEnable)
mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks)
mux.HandleFunc("/stats", handleStats)
mux.HandleFunc("/stats/{id}", handleStats)
mux.HandleFunc("/", handleIndex)
@ -133,11 +129,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement := make(chan bool, 1)
go manageWorkers(stopWorkerManagement)
scanPool := workerpool.NewWorkerPool(10, 100000)
hashPool := workerpool.NewWorkerPool(5, 100000)
stopScanning := make(chan bool, 1)
go manageScan(stopScanning, scanPool, hashPool)
go manageScan(stopScanning)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
@ -153,10 +146,6 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement <- true
slog.Info("Stopping Scanning...")
stopScanning <- true
slog.Info("Stopping Scanning Pool...")
scanPool.Stop()
slog.Info("Stopping Hashing Pool...")
hashPool.Stop()
slog.Info("Done")
}
}

View file

@ -1,369 +0,0 @@
package server
import (
"bytes"
"context"
"fmt"
"html/template"
"log/slog"
"net/http"
"slices"
"strings"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/jackc/pgx/v5"
)
// StatsDisplay is the template payload for the stats page: the rendered
// charts, the library selector, and pre-split digit slices (see splitInt
// elsewhere in this package) for the headline counters.
type StatsDisplay struct {
Charts []ChartData
Libraries []Library
SelectedLibrary string
Size []int
Duration []int
Count []int
Saved []int
}
// ChartData holds the two HTML snippets go-echarts produces for one chart:
// the container element and the accompanying script.
type ChartData struct {
Element template.HTML
Script template.HTML
}
// PieValue is the scan target for (name, count) aggregate rows.
type PieValue struct {
Name string
Value int
}
// PieIntValue is the scan target for (integer id, count) aggregate rows,
// where the id is later mapped to a human-readable constant name.
// NOTE(review): "Id" deviates from Go's "ID" initialism convention; kept
// as-is since the field name must match the query column via RowToStructByName.
type PieIntValue struct {
Id int
Value int
}
// Fixed colors so success/failure/unknown slices look consistent across charts.
const CHART_COLOR_SUCCESS = "#7cffb2"
const CHART_COLOR_FAILED = "#ff6e76"
const CHART_COLOR_UNKNOWN = "#fddd60"
// generatePie renders one dark-themed pie chart titled name from the given
// slices, returning the HTML element and script snippets ready for embedding.
// Slices whose Name matches a known good/bad/unknown label get a fixed color
// so the meaning of each color is consistent across all charts on the page.
// (Indentation was stripped by the diff viewer this block was captured from.)
func generatePie(name string, data []opts.PieData) ChartData {
pie := charts.NewPie()
pie.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{
Theme: "dark",
BackgroundColor: "#111",
}),
charts.WithTitleOpts(opts.Title{
Title: name,
}))
// Colorize well-known category names; anything else keeps the theme default.
for i := range data {
if data[i].Name == "Success" || data[i].Name == "Healthy" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_SUCCESS,
}
} else if data[i].Name == "Failed" || data[i].Name == "Damaged" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_FAILED,
}
} else if data[i].Name == "Unknown" || data[i].Name == "None" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_UNKNOWN,
}
}
}
// "{b}: {c}" labels each slice with its name and raw count.
pie.AddSeries(name, data).SetSeriesOptions(
charts.WithLabelOpts(
opts.Label{
Show: opts.Bool(true),
Formatter: "{b}: {c}",
}),
)
snippet := pie.RenderSnippet()
return ChartData{
Element: template.HTML(snippet.Element),
Script: template.HTML(snippet.Script),
}
}
// generateStats builds all charts for the stats page, optionally filtered to
// one library. library_id is the library id as a string, or "-1" for "all
// libraries" — every query repeats the ($1 = -1 OR library_id = $1) filter.
// Returned charts, in order: codec, resolution, container, health, transcode
// status, task status pies, then a 7-day stacked success/failed bar chart.
// (Indentation was stripped by the diff viewer this block was captured from.)
func generateStats(ctx context.Context, library_id string) ([]ChartData, error) {
data := []ChartData{}
// Codec distribution: first non-attached-picture video stream's codec_name,
// extracted from the stored ffprobe JSON.
rows, err := db.Query(ctx,
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).codec_name')::text, 'Unknown') AS name, COUNT(*) AS value
FROM files
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Codecs: %w", err)
}
codecCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
if err != nil {
return nil, fmt.Errorf("Collect Codec Data: %w", err)
}
res := []opts.PieData{}
for _, v := range codecCounts {
// jsonb ::text keeps the JSON string quotes; strip them for display.
res = append(res, opts.PieData{
Name: strings.ReplaceAll(v.Name, "\"", ""),
Value: v.Value,
})
}
data = append(data, generatePie("Codecs", res))
// Resolution distribution: width of the same video stream.
rows, err = db.Query(ctx,
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).width')::text, 'Unknown') AS name, COUNT(*) AS value
FROM files
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Resolution: %w", err)
}
resolutionCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
if err != nil {
// NOTE(review): message says "Codec" but this is the resolution query;
// left unchanged in this documentation-only pass.
return nil, fmt.Errorf("Collect Codec Data: %w", err)
}
res = []opts.PieData{}
for _, v := range resolutionCounts {
res = append(res, opts.PieData{
Name: strings.ReplaceAll(v.Name, "\"", ""),
Value: v.Value,
})
}
data = append(data, generatePie("Resolution", res))
// Container distribution: ffprobe's format_name.
rows, err = db.Query(ctx,
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.format.format_name')::text, 'Unknown') AS name, COUNT(*) AS value
FROM files
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Container: %w", err)
}
containerCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
if err != nil {
return nil, fmt.Errorf("Collect Container Data: %w", err)
}
res = []opts.PieData{}
for _, v := range containerCounts {
res = append(res, opts.PieData{
Name: strings.ReplaceAll(v.Name, "\"", ""),
Value: v.Value,
})
}
data = append(data, generatePie("Container", res))
// Health distribution: integer enum mapped to a name via constants.FileHealth.
rows, err = db.Query(ctx,
`SELECT health AS id, COUNT(*) AS value
FROM files WHERE ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Health: %w", err)
}
healthCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
if err != nil {
return nil, fmt.Errorf("Collect Health Data: %w", err)
}
res = []opts.PieData{}
for _, v := range healthCounts {
res = append(res, opts.PieData{
Name: constants.FileHealth(v.Id).String(),
Value: v.Value,
})
}
data = append(data, generatePie("Health", res))
// Transcode status distribution, same enum-to-name pattern.
rows, err = db.Query(ctx,
`SELECT transcode AS id, COUNT(*) AS value
FROM files WHERE ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Transcode: %w", err)
}
transcodeCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
if err != nil {
return nil, fmt.Errorf("Collect Transcode Data: %w", err)
}
res = []opts.PieData{}
for _, v := range transcodeCounts {
res = append(res, opts.PieData{
Name: constants.FileTranscode(v.Id).String(),
Value: v.Value,
})
}
data = append(data, generatePie("Transcode Status", res))
// Task status distribution; joins files to apply the library filter.
rows, err = db.Query(ctx,
`SELECT tasks.status AS id, COUNT(*) AS value
FROM tasks INNER JOIN files ON files.id = tasks.file_id
WHERE ($1 = -1 OR files.library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Task Status: %w", err)
}
taskStatusCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
if err != nil {
return nil, fmt.Errorf("Collect Task Status Data: %w", err)
}
res = []opts.PieData{}
for _, v := range taskStatusCounts {
res = append(res, opts.PieData{
Name: constants.TaskStatus(v.Id).String(),
Value: v.Value,
})
}
data = append(data, generatePie("Task Status", res))
// Per-day success/failed task counts over the last 7 days for the bar chart.
type BarTaskRowValue struct {
Date time.Time
Status constants.TaskStatus
Count int
}
rows, err = db.Query(ctx,
`SELECT date_trunc('day', tasks.updated_at) date, tasks.status, COUNT(*) AS count
FROM tasks INNER JOIN files ON files.id = tasks.file_id
WHERE ($1 = -1 OR files.library_id = $1) AND tasks.updated_at > CURRENT_DATE - 7 AND (tasks.status = $2 OR tasks.status = $3)
GROUP BY 1,2
ORDER BY date;`, library_id, constants.TASK_STATUS_SUCCESS, constants.TASK_STATUS_FAILED)
if err != nil {
return nil, fmt.Errorf("Query Task Status Day: %w", err)
}
taskStatusDayCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[BarTaskRowValue])
if err != nil {
return nil, fmt.Errorf("Collect Task Status Day Data: %w", err)
}
// Split rows into x-axis labels (unique days) and one series per status.
// NOTE(review): this assumes every day present has both a success and a
// failed row; a day with only one status would misalign the two series
// against the shared day axis — verify upstream data.
days := []string{}
successBarData := []opts.BarData{}
failedBarData := []opts.BarData{}
for _, v := range taskStatusDayCounts {
if !slices.Contains(days, v.Date.Format(time.DateOnly)) {
days = append(days, v.Date.Format(time.DateOnly))
}
if v.Status == constants.TASK_STATUS_SUCCESS {
successBarData = append(successBarData, opts.BarData{
Value: v.Count,
ItemStyle: &opts.ItemStyle{
Color: CHART_COLOR_SUCCESS,
},
})
} else if v.Status == constants.TASK_STATUS_FAILED {
failedBarData = append(failedBarData, opts.BarData{
Value: v.Count,
ItemStyle: &opts.ItemStyle{
Color: CHART_COLOR_FAILED,
},
})
}
}
bar := charts.NewBar()
bar.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{
Theme: "dark",
BackgroundColor: "#111",
}),
charts.WithTitleOpts(opts.Title{
Title: "Task Success/Failed Last 7 Days",
}),
)
// Stack both series into a single bar per day.
bar.SetXAxis(days).
AddSeries("Success", successBarData).
AddSeries("Failed", failedBarData).
SetSeriesOptions(charts.WithBarChartOpts(opts.BarChart{
Stack: "stackA",
}))
snippet := bar.RenderSnippet()
data = append(data, ChartData{
Element: template.HTML(snippet.Element),
Script: template.HTML(snippet.Script),
})
return data, nil
}
// handleStats serves the stats page. The optional {id} path value selects a
// single library; empty means all libraries (encoded as "-1" for the SQL
// filters in generateStats). It renders charts plus headline totals (size,
// count, duration, space saved) through the stats template.
// (Indentation was stripped by the diff viewer this block was captured from.)
func handleStats(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
id = "-1"
}
data, err := generateStats(r.Context(), id)
if err != nil {
slog.ErrorContext(r.Context(), "Generate Stats:", "err", err)
http.Error(w, "Generate Stats: "+err.Error(), http.StatusInternalServerError)
return
}
// All libraries are listed so the template can build the selector dropdown.
rows, err := db.Query(r.Context(),
`SELECT *
FROM libraries;`)
if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Query Libraries: "+err.Error(), http.StatusInternalServerError)
return
}
libraries, err := pgx.CollectRows(rows, pgx.RowToStructByName[Library])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Libraries", "err", err)
http.Error(w, "Collect Libraries: "+err.Error(), http.StatusInternalServerError)
return
}
// Headline totals for files that currently exist; duration is a placeholder
// (selected as the constant 0 in the query below).
var size int
var count int
var duration int
err = db.QueryRow(r.Context(),
`SELECT SUM(size) AS size,
COUNT(id) as count,
0 as duration
FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2`, id, constants.FILE_STATUS_EXISTS).Scan(&size, &count, &duration)
if err != nil {
// Best-effort: on any query/scan error (e.g. SUM is NULL with no rows)
// fall back to zeros rather than failing the page.
size = 0
count = 0
duration = 0
}
// Space saved by transcoding: old size minus current size, only for files
// that have a recorded pre-transcode size.
var saved int
err = db.QueryRow(r.Context(),
`SELECT (SUM(old_size) - COALESCE(SUM(size), 0)) AS saved
FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2 AND old_size IS NOT NULL`, id, constants.FILE_STATUS_EXISTS).Scan(&saved)
if err != nil {
saved = 0
}
// Render into a buffer first so template errors can still produce a clean
// HTTP error response instead of a half-written page.
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.STATS_TEMPLATE_NAME, StatsDisplay{
Libraries: libraries,
SelectedLibrary: id,
Size: splitInt(size),
Count: splitInt(count),
Duration: splitInt(duration),
Saved: splitInt(saved),
Charts: data,
})
if err != nil {
slog.ErrorContext(r.Context(), "Executing Stats Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
}

View file

@ -107,7 +107,7 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
return
}
rows, err := db.Query(r.Context(), "SELECT * FROM libraries WHERE enable = $1", true)
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries WHERE enable = $1", true)
if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -328,7 +328,6 @@ type QueuedTask struct {
Type constants.TaskType
FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5" db:"md5"`
FileExtension string `json:"file_extension"`
Data json.RawMessage
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
}
@ -345,7 +344,7 @@ func assignQueuedTasks(ctx context.Context) error {
assignRunning = false
}()
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, substring(f.path FROM '\\.([^\\.]*)$') AS file_extension, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status = $2 ORDER BY type DESC", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS)
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err)
}
@ -354,8 +353,6 @@ func assignQueuedTasks(ctx context.Context) error {
return fmt.Errorf("Collect Queued Tasks: %w", err)
}
// TODO, allow overwriting of extension
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
if len(queuedTasks) == 0 {
@ -375,8 +372,7 @@ func assignQueuedTasks(ctx context.Context) error {
}
if Workers[i].Connected {
var queueEnable bool
var parallelTasks int
err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable)
if err != nil {
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
}
@ -394,7 +390,7 @@ func assignQueuedTasks(ctx context.Context) error {
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
// Allow for Multiple Tasks at once in the future
if count < parallelTasks {
if count < 1 {
tx, err := db.Begin(ctx)
defer tx.Rollback(ctx)
if err != nil {
@ -410,7 +406,6 @@ func assignQueuedTasks(ctx context.Context) error {
Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID,
FileMD5: queuedTasks[lastAssigned].FileMD5,
FileExtension: queuedTasks[lastAssigned].FileExtension,
Data: queuedTasks[lastAssigned].Data,
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
}

View file

@ -1,20 +1,15 @@
package server
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strconv"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"gopkg.in/vansante/go-ffprobe.v2"
)
func handleUpload(w http.ResponseWriter, r *http.Request) {
@ -23,56 +18,30 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
return
}
strid := r.PathValue("id")
if strid == "" {
id := r.PathValue("id")
if id == "" {
http.Error(w, "No id", http.StatusBadRequest)
return
}
id, err := strconv.Atoi(strid)
var count int
err := db.QueryRow(r.Context(), "SELECT COUNT(*) FROM tasks WHERE file_id = $1 AND worker_id = $2", id, r.Header.Get(constants.UUID_HEADER)).Scan(&count)
if err != nil {
errorUpload(r, w, 0, "Convert File ID to int", err)
slog.ErrorContext(r.Context(), "Query Task Count", "err", err)
http.Error(w, "Error Query Task Count: "+err.Error(), http.StatusInternalServerError)
return
}
if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
http.Error(w, "Wrong Worker Version", http.StatusBadRequest)
return
}
taskid, err := strconv.Atoi(r.Header.Get(constants.TASK_ID_HEADER))
if err != nil {
errorUpload(r, w, 0, "Convert Task ID to int", err)
return
}
var fileid int
var workerid string
var status constants.TaskStatus
err = db.QueryRow(r.Context(), "SELECT file_id, worker_id, status FROM tasks WHERE id = $1", taskid).Scan(&fileid, &workerid, &status)
if err != nil {
errorUpload(r, w, 0, "Getting Task for Upload", err)
return
}
if workerid != r.Header.Get(constants.UUID_HEADER) {
errorUpload(r, w, taskid, "Worker is not Assigned to Task", err)
return
}
if fileid != id {
errorUpload(r, w, taskid, "File is not for this Task", err)
return
}
if !(status == constants.TASK_STATUS_RUNNING || status == constants.TASK_STATUS_UNKNOWN) {
errorUpload(r, w, taskid, "File can only be uploaded when task status is running or unknown", err)
if count < 1 {
slog.ErrorContext(r.Context(), "No Running Task for file", "id", id)
http.Error(w, "No Running Task for file: "+id, http.StatusBadRequest)
return
}
hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER))
if err != nil {
errorUpload(r, w, taskid, "Decode Hash", err)
slog.ErrorContext(r.Context(), "Decode Hash", "err", err)
http.Error(w, "Decode Hash: "+err.Error(), http.StatusBadRequest)
return
}
@ -80,7 +49,8 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var libraryID int
err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID)
if err != nil {
errorUpload(r, w, taskid, "Query File Path", err)
slog.ErrorContext(r.Context(), "Query File Path", "err", err)
http.Error(w, "Error Query File Path: "+err.Error(), http.StatusInternalServerError)
return
}
@ -89,18 +59,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var tReplace bool
err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace)
if err != nil {
errorUpload(r, w, taskid, "Query Library Path", err)
slog.ErrorContext(r.Context(), "Query Library Path", "err", err)
http.Error(w, "Error Query Library Path: "+err.Error(), http.StatusInternalServerError)
return
}
// When replacing, send to temp, otherwise write directly
var path string
if tReplace {
path = filepath.Join(lPath, fPath+constants.TEMP_FILE_EXTENSION)
// if replace then this is a temp file and should be cleaned up on error
defer os.Remove(path)
//path = filepath.Join(lPath, fPath)
slog.ErrorContext(r.Context(), "Replace mode is not implemented")
http.Error(w, "Replace mode is not implemented", http.StatusNotImplemented)
return
} else {
// todo write this to a temp file first also to be able to run cleanup on error and unify the rename logic
path = filepath.Join(tPath, fPath)
}
@ -108,13 +78,15 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
err = r.ParseMultipartForm(100 << 20)
if err != nil {
errorUpload(r, w, taskid, "Parse Multipart Form", err)
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
return
}
srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY)
if err != nil {
errorUpload(r, w, taskid, "Form File", err)
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
return
}
@ -122,18 +94,17 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
// MaxBytesReader closes the underlying io.Reader on its Close() is called
defer src.Close()
// if we are not replacing then we dont know if the destination folder exists
if !tReplace {
err = os.MkdirAll(filepath.Dir(path), 0775)
if err != nil {
errorUpload(r, w, taskid, "Creating Folder", err)
return
}
err = os.MkdirAll(filepath.Dir(path), 0775)
if err != nil {
slog.ErrorContext(r.Context(), "Creating Folder", "err", err)
http.Error(w, "Creating Folder: "+err.Error(), http.StatusInternalServerError)
return
}
out, err := os.Create(path)
if err != nil {
errorUpload(r, w, taskid, "Creating File", err)
slog.ErrorContext(r.Context(), "Creating File", "err", err)
http.Error(w, "Creating File: "+err.Error(), http.StatusInternalServerError)
return
}
defer out.Close()
@ -142,80 +113,17 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
if err != nil {
var maxBytesError *http.MaxBytesError
if errors.As(err, &maxBytesError) {
slog.ErrorContext(r.Context(), "File to Large", "err", err)
http.Error(w, "File to Large: "+err.Error(), http.StatusRequestEntityTooLarge)
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusRequestEntityTooLarge)
return
}
errorUpload(r, w, taskid, "Failed to write the uploaded content", err)
slog.ErrorContext(r.Context(), "Failed to write the uploaded content", "err", err)
http.Error(w, "Failed to write the uploaded content: "+err.Error(), http.StatusInternalServerError)
return
}
// TODO check file integrity
_ = hash
slog.InfoContext(r.Context(), "upload done", "written", written)
out.Close()
// if we are replacing, then we need to change the original file to the uploaded file and update the hash in the database
if tReplace {
tx, err := db.Begin(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Begin Transaction", err)
return
}
dstPath := filepath.Join(lPath, fPath)
slog.InfoContext(r.Context(), "Replacing Original file With Uploaded File", "fileid", id, "path", path, "dstPath", dstPath)
err = os.Rename(path, dstPath)
if err != nil {
errorUpload(r, w, taskid, "Replace File", err)
return
}
info, err := os.Stat(dstPath)
if err != nil {
errorUpload(r, w, taskid, "Stat File", err)
return
}
modTime := info.ModTime().UTC().Round(time.Second)
size := info.Size()
ffprobeData, err := ffprobe.ProbeURL(r.Context(), dstPath)
if err != nil {
errorUpload(r, w, taskid, "ffProbe File", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET old_md5 = md5, old_size = size, old_ffprobe_data = ffprobe_data WHERE id = $1", fileid)
if err != nil {
errorUpload(r, w, taskid, "Copy Filed Current Values to Old Values", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET md5 = $2, size = $3, mod_time = $4, ffprobe_data = $5 WHERE id = $1", fileid, hash, size, modTime, ffprobeData)
if err != nil {
errorUpload(r, w, taskid, "Set New File Values", err)
return
}
err = tx.Commit(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Commit File Hash", err)
return
}
slog.InfoContext(r.Context(), "Original file Replaced with Uploaded File", "fileid", id, "dstPath", dstPath)
} else {
// TODO implement "old" fields for non replace libraries, scan also needs to use old field if it is is non replace and the file has been transcoded
}
}
func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string, err error) {
slog.ErrorContext(r.Context(), msg, "err", err)
http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError)
if taskID != 0 {
_, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())})
if err != nil {
slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
}
}
}

View file

@ -247,8 +247,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY
// Auto Queue Transcode for Successfull Transcodes if library setting say so
go queueOnHealth(context.TODO(), ts.Task.ID)
// TODO Auto Queue Transcode for Successfull Transcodes if library setting
} else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED
@ -295,26 +294,3 @@ func updateWorkerTaskStatus(ctx context.Context) {
wg.Wait()
}
// queueOnHealth queues a transcode task for the file belonging to a
// successfully completed health-check task, if that file's library has
// health_ok_queue_transcode enabled and a transcode command configured.
// Intended to be run asynchronously (see updateWorkerTaskStatus); errors
// are logged rather than returned.
func queueOnHealth(ctx context.Context, taskID int) {
	var transcodeCommandID *int
	var fileID *int
	err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
	FROM tasks
	INNER JOIN files ON files.id = tasks.file_id
	INNER JOIN libraries ON files.library_id = libraries.id
	WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
		Scan(&transcodeCommandID, &fileID)
	if err == pgx.ErrNoRows {
		slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
		// BUG FIX: must return here. Previously execution fell through and
		// inserted a task with nil file_id and nil ffmpeg_command_id.
		return
	} else if err != nil {
		slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
		return
	}
	slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcodeCommandID)
	_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", fileID, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcodeCommandID)
	if err != nil {
		slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcodeCommandID)
	}
}

File diff suppressed because one or more lines are too long

View file

@ -1,7 +0,0 @@
// Navigate to the stats page for the chosen library.
// "-1" is the sentinel for "all libraries" and maps to the bare /stats route.
function setLibrary(library_id) {
    const target = library_id == "-1" ? "/stats" : "/stats/" + library_id;
    window.location.href = target;
}

View file

@ -3,65 +3,58 @@
@import url("component.css");
:root {
font-family: monospace;
font-size: 12px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
overflow-y: auto;
}
body {
padding: 0.5rem;
gap: 0.5rem;
padding: 0.5rem;
gap: 0.5rem;
}
.counter-image {
image-rendering: pixelated;
image-rendering: -moz-crisp-edges;
image-rendering: crisp-edges;
flex-grow: 1;
max-height: 50vh;
max-width: 22.5vh;
image-rendering: pixelated;
image-rendering: -moz-crisp-edges;
image-rendering: crisp-edges;
flex-grow: 1;
}
.counter {
flex-flow: row nowrap;
align-items: end;
font-size: 1.5rem;
font-weight: bold;
flex-flow: row nowrap;
align-items: end;
font-size: 1.5rem;
font-weight: bold;
}
.workers {
flex-flow: row wrap;
gap: 2rem;
flex-flow: row wrap;
gap: 2rem;
}
.workers > * {
gap: 0.5rem;
border: 1px solid var(--fg);
.workers>* {
gap: 0.5rem;
border: 1px solid var(--fg);
}
.log > p {
padding: 0.25rem 0;
user-select: text;
.log>p {
padding: 0.25rem 0;
user-select: text;
}
.log > p:hover {
background: var(--cl-hl);
.log>p:hover {
background: var(--cl-hl);
}
nav > :first-child {
font-size: 2rem;
font-weight: bold;
nav> :first-child {
font-size: 2rem;
font-weight: bold;
}
.short-button {
align-self: start;
align-self: start;
}
.button-list {
flex-direction: row;
}
.stats {
flex-flow: row wrap;
gap: 2rem;
}
flex-direction: row;
}

View file

@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v."+t.FileExtension, t.ID, t.FileID))
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -27,7 +27,6 @@ func StartTask(conf config.Config, data types.TaskStart) error {
Type: data.Type,
FileID: data.FileID,
FileMD5: data.FileMD5,
FileExtension: data.FileExtension,
FfmpegCommand: data.FfmpegCommand,
}

View file

@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v."+t.FileExtension, t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v."+t.FileExtension, t.ID, t.FileID))
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -9,7 +9,6 @@ import (
"mime/multipart"
"net/http"
"os"
"strconv"
"time"
"git.lastassault.de/speatzle/morffix/config"
@ -83,7 +82,6 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
req.Header.Add(constants.UUID_HEADER, uuid.String())
req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION)
req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash))
req.Header.Add(constants.TASK_ID_HEADER, strconv.Itoa(t.ID))
req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"")
var client = &http.Client{
@ -99,8 +97,7 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
l.InfoContext(ctx, "Upload Done")
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("Got HTTP Status Code: %v Body: %v", resp.StatusCode, string(body))
return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
}
return nil

View file

@ -3,8 +3,7 @@
<html>
<head>
<link rel="stylesheet" href="/static/style/style.css">
<script src="/static/js/echarts.min.js"></script>
</head>
<body>
{{template "navbar"}}
{{end}}
{{end}}

View file

@ -31,24 +31,6 @@
</select>
<input type="submit" value="Submit">
</form>
<h2>Set Parallel Tasks</h2>
<form method="POST" action= "/parallel_tasks">
<label for="worker">Worker:</label>
<select id="worker" name="worker">
<option value="all">All</option>
{{range $w := .Workers}}
<option value="{{$w.ID}}">{{$w.Name}}</option>
{{end}}
</select>
<label for="parallel_tasks">Parallel Tasks</label>
<select id="parallel_tasks" name="parallel_tasks">
<option value="1">1</option>
<option value="2">2</option>
<option value="3">3</option>
<option value="4">4</option>
</select>
<input type="submit" value="Submit">
</form>
<h2>Workers</h2>
<div class="workers">
{{range $w := .Workers}}
@ -78,22 +60,6 @@
<td>
{{ $w.ConnectionChanged }}
</td>
</tr>
<tr>
<td>
QueueEnable
</td>
<td>
{{ $w.QueueEnable }}
</td>
</tr>
<tr>
<td>
ParallelTasks
</td>
<td>
{{ $w.ParallelTasks }}
</td>
</tr>
{{if $w.Connected}}
<tr>
@ -129,8 +95,8 @@
</td>
</tr>
{{end}}
</table>
</table>
</div>
{{end}}
</div>
{{template "tail"}}
{{template "tail"}}

View file

@ -13,8 +13,6 @@
<th>ScanNewQueueTranscode</th>
<th>ScanChangedQueueTranscode</th>
<th>HealthOkQueueTranscode</th>
<th>HealthCommand</th>
<th>TranscodeCommand</th>
</tr>
{{range $l := .Libraries}}
<tr onclick="window.location='/libraries/{{ $l.ID }}';">
@ -51,12 +49,6 @@
<td>
{{ $l.HealthOkQueueTranscode }}
</td>
<td>
{{ $l.HealthCommandID }}
</td>
<td>
{{ $l.TranscodeCommandID }}
</td>
</tr>
{{end}}
</table>
@ -75,28 +67,14 @@
<input type="text" name="transcode_path">
<label>Queue Healthcheck for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_health">
<label>Queue Healthcheck for changed files on Scan (Not Implemented):</label>
<label>Queue Healthcheck for changed files on Scan:</label>
<input type="checkbox" name="scan_changed_queue_health">
<label>Queue Transcode for new files on Scan (Not Implemented):</label>
<label>Queue Transcode for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_transcode">
<label>Queue Transcode for changed files on Scan (Not Implemented):</label>
<label>Queue Transcode for changed files on Scan:</label>
<input type="checkbox" name="scan_changed_queue_transcode">
<label>Queue Transcode on Healthcheck Success:</label>
<input type="checkbox" name="health_ok_queue_transcode">
<label for="health_command_id">Health Command:</label>
<select id="health_command_id" name="health_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<label for="transcode_command_id">Transcode Command:</label>
<select id="transcode_command_id" name="transcode_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<input type="submit" value="Submit">
</form>
{{template "tail"}}
{{template "tail"}}

View file

@ -49,4 +49,4 @@
{{end}}
</table>
</div>
{{template "tail"}}
{{template "tail"}}

View file

@ -4,6 +4,5 @@
<a class="button" href="/libraries">Libraries</a>
<a class="button" href="/tasks">Tasks</a>
<a class="button" href="/ffmpeg_commands">FFmpeg Commands</a>
<a class="button" href="/stats">Stats</a>
</nav>
{{end}}
{{end}}

View file

@ -1,69 +0,0 @@
{{template "head"}}{{/* Stats page: animated digit counters for size/files/saved plus charts. Comments are appended inline so rendered output is unchanged. */}}
<script src="/static/js/library_filter.js"></script>
<h2>Stats</h2>
<label for="library">Library:</label>
<select id="library" name="library" onchange="setLibrary(this.value)" class="short-button">{{/* "-1" is the all-libraries sentinel */}}
<option {{if eq .SelectedLibrary "-1" }} selected {{end}}value="-1">ALL</option>
{{range $l := $.Libraries}}
<option {{if eq $.SelectedLibrary $l.ID }} selected {{end}}value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<h4>Size</h4>
<div class="counter">
{{ $l := len .Size }}{{/* total digit count, used to position unit labels below */}}
{{range $i, $c := .Size}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}{{/* digits remaining after this one; thousands boundaries get KB/MB/GB/TB labels */}}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
<h4>Files</h4>
<div class="counter">
{{ $l := len .Count }}{{/* same digit layout; "." acts as a thousands separator */}}
{{range $i, $c := .Count}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
.
{{ else if eq $n 7 }}
.
{{ else if eq $n 10 }}
.
{{ else if eq $n 13 }}
.
{{end}}
{{end}}
</div>
<h4>Saved</h4>
<div class="counter">
{{ $l := len .Saved }}{{/* bytes saved by transcodes, same unit labelling as Size */}}
{{range $i, $c := .Saved}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
<div class="stats">{{/* each chart contributes an element plus its init script */}}
{{range $c := .Charts}}
{{$c.Element}} {{$c.Script}}
{{end}}
</div>
{{template "tail"}}

View file

@ -10,7 +10,6 @@ type TaskStart struct {
ID int `json:"id"`
FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"`
Data json.RawMessage
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`
@ -20,7 +19,6 @@ type Task struct {
ID int `json:"id"`
FileID int `json:"file_id"`
FileMD5 []byte `json:"md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"`
Status constants.TaskStatus `json:"status"`
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`

View file

@ -2,7 +2,6 @@ package worker
import (
"context"
"time"
"github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/memory"
@ -15,31 +14,13 @@ func init() {
rpcServer.RegisterMethod("status", status)
}
var cpuBefore *cpu.Stats
var cpuUsage uint64
var cpuCount uint64
func calcUsage() {
for {
cStats, _ := cpu.Get()
if cStats != nil {
cpuUsage = cStats.Total
cpuCount = uint64(cStats.CPUCount)
if cpuBefore != nil {
total := float64(cStats.Total - cpuBefore.Total)
cpuUsage = uint64(float64((cStats.User-cpuBefore.User)+(cStats.System-cpuBefore.System)) / total * 100)
}
cpuBefore = cStats
}
time.Sleep(time.Second)
}
}
func status(ctx context.Context, req rpc.Request) (any, error) {
s := types.WorkerStatus{}
s.CPUUsage = cpuUsage
s.CPUCount = cpuCount
cStats, _ := cpu.Get()
if cStats != nil {
s.CPUUsage = cStats.Total
s.CPUCount = uint64(cStats.CPUCount)
}
mStats, _ := memory.Get()
if mStats != nil {
s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100)

View file

@ -60,9 +60,6 @@ func Start(_conf config.Config) {
exit = true
cancel()
}()
go calcUsage()
for {
if exit {
slog.InfoContext(ctx, "Done")

View file

@ -1,67 +0,0 @@
package workerpool
import (
"context"
"sync"
)
type WorkerPool struct {
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
queue chan func(context.Context)
qg sync.WaitGroup
}
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
queue := make(chan func(context.Context), queueSize)
workerPool := WorkerPool{
workers: workers,
ctx: ctx,
cancel: cancel,
queue: queue,
}
for i := 0; i < workerPool.workers; i++ {
workerPool.wg.Add(1)
go workerPool.work(workerPool.ctx)
}
return &workerPool
}
func (wp *WorkerPool) Stop() {
wp.cancel()
wp.wg.Wait()
}
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
wp.qg.Add(1)
wp.queue <- job
}
func (wp *WorkerPool) WaitForEmptyQueue() {
wp.qg.Wait()
}
func (wp *WorkerPool) QueueLength() int {
return len(wp.queue)
}
func (wp *WorkerPool) work(ctx context.Context) {
for {
select {
case <-ctx.Done():
wp.wg.Done()
return
case job := <-wp.queue:
func() {
defer wp.qg.Done()
job(ctx)
}()
}
}
}