Compare commits

..

No commits in common. "main" and "scan_rework" have entirely different histories.

38 changed files with 194 additions and 1112 deletions

View file

@ -2,14 +2,13 @@ package constants
import "fmt" import "fmt"
const WORKER_VERSION = "v2" const WORKER_VERSION = "v1"
const WORKER_VERSION_HEADER = "morffix-version" const WORKER_VERSION_HEADER = "morffix-version"
const SHARED_SECRET_HEADER = "morffix-secret" const SHARED_SECRET_HEADER = "morffix-secret"
const NAME_HEADER = "morffix-name" const NAME_HEADER = "morffix-name"
const UUID_HEADER = "morffix-uuid" const UUID_HEADER = "morffix-uuid"
const HASH_HEADER = "morffix-hash" const HASH_HEADER = "morffix-hash"
const TASK_ID_HEADER = "morffix-task-id"
const INDEX_TEMPLATE_NAME = "index.tmpl" const INDEX_TEMPLATE_NAME = "index.tmpl"
const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl" const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl"
@ -18,7 +17,6 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl"
const TASKS_TEMPLATE_NAME = "tasks.tmpl" const TASKS_TEMPLATE_NAME = "tasks.tmpl"
const TASK_TEMPLATE_NAME = "task.tmpl" const TASK_TEMPLATE_NAME = "task.tmpl"
const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl" const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl"
const STATS_TEMPLATE_NAME = "stats.tmpl"
const FORM_FILE_KEY = "file" const FORM_FILE_KEY = "file"
@ -27,8 +25,6 @@ const SORT_ORDER_ASC_PARAM = "order_asc"
const QUERY_LIMIT_PARAM = "limit" const QUERY_LIMIT_PARAM = "limit"
const QUERY_PAGE_PARAM = "page" const QUERY_PAGE_PARAM = "page"
const TEMP_FILE_EXTENSION = ".morffix"
type TaskType int type TaskType int
const ( const (

1
go.mod
View file

@ -12,7 +12,6 @@ require (
) )
require ( require (
github.com/go-echarts/go-echarts/v2 v2.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect

2
go.sum
View file

@ -2,8 +2,6 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-echarts/go-echarts/v2 v2.4.0 h1:efD46dmAvaZEWrBHAGjE8cfDK48vvFTHz5N9VqW5rYc=
github.com/go-echarts/go-echarts/v2 v2.4.0/go.mod h1:56YlvzhW/a+du15f3S2qUGNDfKnFOeJSThBIrVFHDtI=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4= github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM= github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

View file

@ -1,2 +1,2 @@
ALTER TABLE files ALTER TABLE files
DROP IF EXISTS ffprobe_data; DROP IF EXISTS ffprobe;

View file

@ -1,3 +0,0 @@
ALTER TABLE libraries
DROP IF EXISTS transcode_command_id,
DROP IF EXISTS health_command_id;

View file

@ -1,3 +0,0 @@
ALTER TABLE libraries
ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id),
ADD health_command_id bigint REFERENCES ffmpeg_commands(id);

View file

@ -1,4 +0,0 @@
ALTER TABLE files
DROP IF EXISTS old_md5,
DROP IF EXISTS old_ffprobe_data,
DROP IF EXISTS old_size;

View file

@ -1,3 +0,0 @@
ALTER TABLE files ADD old_md5 uuid,
ADD old_ffprobe_data JSONB,
ADD old_size bigint;

View file

@ -1,2 +0,0 @@
ALTER TABLE workers
DROP IF EXISTS parallel_tasks;

View file

@ -1,2 +0,0 @@
ALTER TABLE workers
ADD parallel_tasks smallint NOT NULL DEFAULT 1;

View file

@ -18,9 +18,8 @@ type IndexData struct {
type IndexWorker struct { type IndexWorker struct {
ID string ID string
Worker Worker
Status *types.WorkerStatus Status *types.WorkerStatus
QueueEnable bool QueueEnable bool
ParallelTasks int
} }
func handleIndex(w http.ResponseWriter, r *http.Request) { func handleIndex(w http.ResponseWriter, r *http.Request) {
@ -32,15 +31,6 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
WorkersMutex.Lock() WorkersMutex.Lock()
defer WorkersMutex.Unlock() defer WorkersMutex.Unlock()
for i := range Workers { for i := range Workers {
var queueEnable bool
var parallelTasks int
err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil {
w.Write([]byte(err.Error()))
slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err)
}
if Workers[i].Connected { if Workers[i].Connected {
var status types.WorkerStatus var status types.WorkerStatus
_, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status) _, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status)
@ -51,19 +41,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
} }
slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status)
data.Workers = append(data.Workers, IndexWorker{ data.Workers = append(data.Workers, IndexWorker{
ID: i, ID: i,
Worker: *Workers[i], Worker: *Workers[i],
Status: &status, Status: &status,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
}) })
} else { } else {
data.Workers = append(data.Workers, IndexWorker{ data.Workers = append(data.Workers, IndexWorker{
ID: i, ID: i,
Worker: *Workers[i], Worker: *Workers[i],
Status: nil, Status: nil,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
}) })
} }
} }
@ -91,10 +77,6 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
} }
func splitInt(n int) []int { func splitInt(n int) []int {
if n == 0 {
return []int{0}
}
slc := []int{} slc := []int{}
for n > 0 { for n > 0 {
slc = append(slc, n%10) slc = append(slc, n%10)

View file

@ -5,21 +5,17 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strconv"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
type LibrariesData struct { type LibrariesData struct {
Libraries []Library Libraries []Library
FfmpegCommands []FfmpegCommand
} }
type Library struct { type Library struct {
ID string `db:"id"` ID string `db:"id"`
HealthCommandID *int `db:"health_command_id"`
TranscodeCommandID *int `db:"transcode_command_id"`
Name string `db:"name"` Name string `db:"name"`
Path string `db:"path"` Path string `db:"path"`
Enable bool `db:"enable"` Enable bool `db:"enable"`
@ -44,7 +40,7 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
} }
} }
rows, err := db.Query(r.Context(), "SELECT * FROM libraries") rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries")
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err) slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -58,20 +54,6 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
} }
data.Libraries = libraries data.Libraries = libraries
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
if err != nil {
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
data.FfmpegCommands = ffmpegCommands
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data) err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data)
if err != nil { if err != nil {
@ -102,21 +84,9 @@ func createLibrary(r *http.Request) error {
scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on" scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on"
health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on" health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on"
var health_command_id *int = nil
var transcode_command_id *int = nil
h, err := strconv.Atoi(r.FormValue("health_command_id"))
if err == nil {
health_command_id = &h
}
t, err := strconv.Atoi(r.FormValue("transcode_command_id"))
if err == nil {
transcode_command_id = &t
}
slog.Info("Got Library Create", "name", name, "path", path, "enable", enable) slog.Info("Got Library Create", "name", name, "path", path, "enable", enable)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) _, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path)
if err != nil { if err != nil {
return fmt.Errorf("Inserting Library: %w", err) return fmt.Errorf("Inserting Library: %w", err)
} }

View file

@ -20,12 +20,10 @@ type File struct {
ID int `db:"id"` ID int `db:"id"`
Path string `db:"path"` Path string `db:"path"`
Size int64 `db:"size"` Size int64 `db:"size"`
OldSize *int64 `db:"old_size"`
Status constants.FileStatus `db:"status"` Status constants.FileStatus `db:"status"`
Health constants.FileHealth `db:"health"` Health constants.FileHealth `db:"health"`
Transcode constants.FileTranscode `db:"transcode"` Transcode constants.FileTranscode `db:"transcode"`
MD5 []byte `db:"md5"` MD5 []byte `db:"md5"`
OldMD5 *[]byte `db:"old_md5"`
UpdatedAt time.Time `db:"updated_at"` UpdatedAt time.Time `db:"updated_at"`
} }
@ -65,7 +63,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
Enable: enabled, Enable: enabled,
} }
rows, err := db.Query(r.Context(), "SELECT id, path, size, old_size, status, health, transcode, md5, old_md5, updated_at FROM files where library_id = $1", id) rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1", id)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Files", "err", err) slog.ErrorContext(r.Context(), "Query Files", "err", err)
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)

View file

@ -1,41 +0,0 @@
package server
import (
"fmt"
"log/slog"
"net/http"
"strconv"
)
func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest)
return
}
parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks"))
if err != nil {
http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest)
return
}
worker := r.FormValue("worker")
slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker)
if worker == "all" {
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks)
if err != nil {
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
return
}
} else {
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1 where id = $2", parallelTasks, worker)
if err != nil {
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
return
}
}
http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound)
}

View file

@ -10,11 +10,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"slices" "slices"
"strconv"
"sync" "sync"
"time" "time"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/workerpool"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"gopkg.in/vansante/go-ffprobe.v2" "gopkg.in/vansante/go-ffprobe.v2"
) )
@ -22,47 +22,44 @@ import (
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"} var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
func handleScan(w http.ResponseWriter, r *http.Request) { func handleScan(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Implemented", http.StatusNotImplemented) if r.Method == "GET" {
/* scanStatus(w, r)
if r.Method == "GET" { return
scanStatus(w, r) }
return
}
id, err := strconv.Atoi(r.FormValue("id")) id, err := strconv.Atoi(r.FormValue("id"))
if err != nil { if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest) http.Error(w, "Parsing ID", http.StatusBadRequest)
return return
} }
var name string var name string
var path string var path string
var enabled bool var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled) err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err) slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return return
} }
scanCtx := context.Background() scanCtx := context.Background()
go scanLibrary(scanCtx, id) go scanLibrary(scanCtx, id)
message := "Scan Started" message := "Scan Started"
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message) err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err) slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return return
} }
_, err = w.Write(buf.Bytes()) _, err = w.Write(buf.Bytes())
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err) slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
} }
*/
} }
func scanStatus(w http.ResponseWriter, r *http.Request) { func scanStatus(w http.ResponseWriter, r *http.Request) {
@ -82,7 +79,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
} }
} }
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) { func manageScan(stop chan bool) {
scanTicker := time.NewTicker(time.Minute) scanTicker := time.NewTicker(time.Minute)
hashTicker := time.NewTicker(time.Second) hashTicker := time.NewTicker(time.Second)
scanRunning := false scanRunning := false
@ -114,7 +111,7 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
} }
for _, l := range libraries { for _, l := range libraries {
scanLibrary(ctx, l, scanPool) scanLibrary(ctx, l)
} }
}() }()
@ -129,32 +126,19 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
hashRunning = false hashRunning = false
}() }()
rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW) var fileID uint
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
if err != nil { if err != nil {
if !errors.Is(err, pgx.ErrNoRows) { if !errors.Is(err, pgx.ErrNoRows) {
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err) slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
} }
// Nothing to do
return return
} }
files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint]) err = hashFile(ctx, fileID)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Collect File Rows", "err", err) slog.ErrorContext(ctx, "Error Hashing File", "err", err)
return
} }
for _, f := range files {
hashPool.QueueJob(func(ctx context.Context) {
err = hashFile(ctx, f)
if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
}
})
}
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
hashPool.WaitForEmptyQueue()
}() }()
case <-stop: case <-stop:
cancel() cancel()
@ -165,7 +149,7 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
var scanLock sync.Mutex var scanLock sync.Mutex
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) { func scanLibrary(ctx context.Context, id int) {
slog.InfoContext(ctx, "Acquiring Scan Lock...") slog.InfoContext(ctx, "Acquiring Scan Lock...")
scanLock.Lock() scanLock.Lock()
defer scanLock.Unlock() defer scanLock.Unlock()
@ -174,9 +158,7 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
var name string var name string
var lpath string var lpath string
var enabled bool var enabled bool
var health_command_id *int err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
var scan_new_queue_health bool
err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Get Library", "err", err) slog.ErrorContext(ctx, "Get Library", "err", err)
return return
@ -198,17 +180,22 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
return return
} }
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Begin Transaction", "err", err)
return
}
defer tx.Rollback(ctx)
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath) slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
// TODO Work on getting missing status again // Mark all Files as Missing
/* _, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
// Mark all Files as Missing if err != nil {
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING) slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
if err != nil { return
slog.ErrorContext(ctx, "Setting Missing Status", "err", err) }
return
}
*/
err = filepath.Walk(lpath, err = filepath.Walk(lpath,
func(fullPath string, info os.FileInfo, err error) error { func(fullPath string, info os.FileInfo, err error) error {
if errors.Is(err, os.ErrPermission) { if errors.Is(err, os.ErrPermission) {
@ -224,7 +211,7 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
} }
if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) { if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
slog.DebugContext(ctx, "Skipping non video file", "path", fullPath) slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
return nil return nil
} }
@ -236,63 +223,52 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
var fileID int var fileID int
var oldSize uint var oldSize uint
var oldModTime time.Time var oldModTime time.Time
err = tx.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
if errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet // File Does not Exist Yet
scanPool.QueueJob( slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
func(ctx context.Context) {
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath) ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "ffprobe New File", "err", err) return fmt.Errorf("ffprobe New File: %w", err)
} }
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
var file_id int _, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData)
err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) if err != nil {
VALUES ($1, $2, $3, $4, $5, $6, $7) return fmt.Errorf("Add New File to DB: %w", err)
RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id) }
if err != nil {
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
}
if scan_new_queue_health && health_command_id != nil {
slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id)
}
}
})
return nil return nil
} else if err != nil { } else if err != nil {
return fmt.Errorf("Getting File: %w", err) return fmt.Errorf("Getting File: %w", err)
} }
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
// File Already Exists, Check if it has been changed // File Already Exists, Check if it has been changed
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize { if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
slog.Debug("File stayed the same", "id", fileID) slog.Debug("File stayed the same", "id", fileID)
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 WHERE id = $1", fileID, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Updating Non Changed File in DB: %w", err)
}
} else { } else {
scanPool.QueueJob(func(ctx context.Context) { slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath) ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err) return fmt.Errorf("ffprobe Changed File: %w", err)
} }
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData) _, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err) return fmt.Errorf("Updating Changed File in DB: %w", err)
} }
})
} }
return nil return nil
}) })
if err != nil { if err != nil {
@ -300,7 +276,10 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
return return
} }
scanPool.WaitForEmptyQueue() err = tx.Commit(ctx)
if err != nil {
slog.ErrorContext(ctx, "Error Committing Changes", "err", err)
return
}
slog.InfoContext(ctx, "Scan Done", "id", id) slog.InfoContext(ctx, "Scan Done", "id", id)
} }

View file

@ -17,7 +17,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"git.lastassault.de/speatzle/morffix/config" "git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/workerpool"
) )
var conf config.Config var conf config.Config
@ -103,9 +102,6 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/libraries", handleLibraries)
mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands)
mux.HandleFunc("/queue_enable", HandleSetQueueEnable) mux.HandleFunc("/queue_enable", HandleSetQueueEnable)
mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks)
mux.HandleFunc("/stats", handleStats)
mux.HandleFunc("/stats/{id}", handleStats)
mux.HandleFunc("/", handleIndex) mux.HandleFunc("/", handleIndex)
@ -133,11 +129,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement := make(chan bool, 1) stopWorkerManagement := make(chan bool, 1)
go manageWorkers(stopWorkerManagement) go manageWorkers(stopWorkerManagement)
scanPool := workerpool.NewWorkerPool(10, 100000)
hashPool := workerpool.NewWorkerPool(5, 100000)
stopScanning := make(chan bool, 1) stopScanning := make(chan bool, 1)
go manageScan(stopScanning, scanPool, hashPool) go manageScan(stopScanning)
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)
@ -153,10 +146,6 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement <- true stopWorkerManagement <- true
slog.Info("Stopping Scanning...") slog.Info("Stopping Scanning...")
stopScanning <- true stopScanning <- true
slog.Info("Stopping Scanning Pool...")
scanPool.Stop()
slog.Info("Stopping Hashing Pool...")
hashPool.Stop()
slog.Info("Done") slog.Info("Done")
} }
} }

View file

@ -1,369 +0,0 @@
package server
import (
"bytes"
"context"
"fmt"
"html/template"
"log/slog"
"net/http"
"slices"
"strings"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/jackc/pgx/v5"
)
type StatsDisplay struct {
Charts []ChartData
Libraries []Library
SelectedLibrary string
Size []int
Duration []int
Count []int
Saved []int
}
type ChartData struct {
Element template.HTML
Script template.HTML
}
type PieValue struct {
Name string
Value int
}
type PieIntValue struct {
Id int
Value int
}
const CHART_COLOR_SUCCESS = "#7cffb2"
const CHART_COLOR_FAILED = "#ff6e76"
const CHART_COLOR_UNKNOWN = "#fddd60"
func generatePie(name string, data []opts.PieData) ChartData {
pie := charts.NewPie()
pie.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{
Theme: "dark",
BackgroundColor: "#111",
}),
charts.WithTitleOpts(opts.Title{
Title: name,
}))
for i := range data {
if data[i].Name == "Success" || data[i].Name == "Healthy" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_SUCCESS,
}
} else if data[i].Name == "Failed" || data[i].Name == "Damaged" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_FAILED,
}
} else if data[i].Name == "Unknown" || data[i].Name == "None" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_UNKNOWN,
}
}
}
pie.AddSeries(name, data).SetSeriesOptions(
charts.WithLabelOpts(
opts.Label{
Show: opts.Bool(true),
Formatter: "{b}: {c}",
}),
)
snippet := pie.RenderSnippet()
return ChartData{
Element: template.HTML(snippet.Element),
Script: template.HTML(snippet.Script),
}
}
// namedCountPie runs a stats query producing (name text, value) rows —
// taking the library id as $1 — and renders the result as a pie chart
// titled title. Double quotes left over from jsonb ::text casts are
// stripped from the names.
func namedCountPie(ctx context.Context, library_id string, title string, query string) (ChartData, error) {
	rows, err := db.Query(ctx, query, library_id)
	if err != nil {
		return ChartData{}, fmt.Errorf("Query %v: %w", title, err)
	}
	counts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
	if err != nil {
		return ChartData{}, fmt.Errorf("Collect %v Data: %w", title, err)
	}
	res := make([]opts.PieData, 0, len(counts))
	for _, v := range counts {
		res = append(res, opts.PieData{
			Name:  strings.ReplaceAll(v.Name, "\"", ""),
			Value: v.Value,
		})
	}
	return generatePie(title, res), nil
}

// enumCountPie runs a stats query producing (id int, value) rows — taking
// the library id as $1 — and renders a pie chart, mapping each numeric id
// to its display name with label.
func enumCountPie(ctx context.Context, library_id string, title string, query string, label func(id int) string) (ChartData, error) {
	rows, err := db.Query(ctx, query, library_id)
	if err != nil {
		return ChartData{}, fmt.Errorf("Query %v: %w", title, err)
	}
	counts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
	if err != nil {
		return ChartData{}, fmt.Errorf("Collect %v Data: %w", title, err)
	}
	res := make([]opts.PieData, 0, len(counts))
	for _, v := range counts {
		res = append(res, opts.PieData{
			Name:  label(v.Id),
			Value: v.Value,
		})
	}
	return generatePie(title, res), nil
}

// generateStats builds every chart shown on the stats page: codec,
// resolution, container, health, transcode-status and task-status pies,
// plus a stacked bar chart of task successes/failures over the last 7
// days. library_id is passed as $1 to every query; the value "-1" selects
// files from all libraries.
func generateStats(ctx context.Context, library_id string) ([]ChartData, error) {
	data := []ChartData{}

	// String-keyed pies derived from the stored ffprobe JSON.
	// (Previously the resolution collect error was mislabeled
	// "Collect Codec Data"; the shared helper fixes that.)
	namedCharts := []struct {
		title string
		query string
	}{
		{"Codecs", `SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).codec_name')::text, 'Unknown') AS name, COUNT(*) AS value
	FROM files
	WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
	GROUP BY 1;`},
		{"Resolution", `SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).width')::text, 'Unknown') AS name, COUNT(*) AS value
	FROM files
	WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
	GROUP BY 1;`},
		{"Container", `SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.format.format_name')::text, 'Unknown') AS name, COUNT(*) AS value
	FROM files
	WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
	GROUP BY 1;`},
	}
	for _, c := range namedCharts {
		chart, err := namedCountPie(ctx, library_id, c.title, c.query)
		if err != nil {
			return nil, err
		}
		data = append(data, chart)
	}

	// Enum-keyed pies over files and tasks.
	enumCharts := []struct {
		title string
		query string
		label func(id int) string
	}{
		{"Health", `SELECT health AS id, COUNT(*) AS value
	FROM files WHERE ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, func(id int) string { return constants.FileHealth(id).String() }},
		{"Transcode Status", `SELECT transcode AS id, COUNT(*) AS value
	FROM files WHERE ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, func(id int) string { return constants.FileTranscode(id).String() }},
		{"Task Status", `SELECT tasks.status AS id, COUNT(*) AS value
	FROM tasks INNER JOIN files ON files.id = tasks.file_id
	WHERE ($1 = -1 OR files.library_id = $1)
	GROUP BY 1;`, func(id int) string { return constants.TaskStatus(id).String() }},
	}
	for _, c := range enumCharts {
		chart, err := enumCountPie(ctx, library_id, c.title, c.query, c.label)
		if err != nil {
			return nil, err
		}
		data = append(data, chart)
	}

	// Stacked bar chart: per-day success/failure counts for the last week.
	type BarTaskRowValue struct {
		Date   time.Time
		Status constants.TaskStatus
		Count  int
	}
	rows, err := db.Query(ctx,
		`SELECT date_trunc('day', tasks.updated_at) date, tasks.status, COUNT(*) AS count
	FROM tasks INNER JOIN files ON files.id = tasks.file_id
	WHERE ($1 = -1 OR files.library_id = $1) AND tasks.updated_at > CURRENT_DATE - 7 AND (tasks.status = $2 OR tasks.status = $3)
	GROUP BY 1,2
	ORDER BY date;`, library_id, constants.TASK_STATUS_SUCCESS, constants.TASK_STATUS_FAILED)
	if err != nil {
		return nil, fmt.Errorf("Query Task Status Day: %w", err)
	}
	taskStatusDayCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[BarTaskRowValue])
	if err != nil {
		return nil, fmt.Errorf("Collect Task Status Day Data: %w", err)
	}

	days := []string{}
	successBarData := []opts.BarData{}
	failedBarData := []opts.BarData{}
	for _, v := range taskStatusDayCounts {
		// Rows arrive ordered by date, so days stays chronological.
		if !slices.Contains(days, v.Date.Format(time.DateOnly)) {
			days = append(days, v.Date.Format(time.DateOnly))
		}
		// NOTE(review): a day that has only failures (or only successes)
		// leaves a gap in the other series, shifting later bars relative to
		// the X axis — consider filling missing days with zero values.
		if v.Status == constants.TASK_STATUS_SUCCESS {
			successBarData = append(successBarData, opts.BarData{
				Value: v.Count,
				ItemStyle: &opts.ItemStyle{
					Color: CHART_COLOR_SUCCESS,
				},
			})
		} else if v.Status == constants.TASK_STATUS_FAILED {
			failedBarData = append(failedBarData, opts.BarData{
				Value: v.Count,
				ItemStyle: &opts.ItemStyle{
					Color: CHART_COLOR_FAILED,
				},
			})
		}
	}

	bar := charts.NewBar()
	bar.SetGlobalOptions(
		charts.WithInitializationOpts(opts.Initialization{
			Theme:           "dark",
			BackgroundColor: "#111",
		}),
		charts.WithTitleOpts(opts.Title{
			Title: "Task Success/Failed Last 7 Days",
		}),
	)
	bar.SetXAxis(days).
		AddSeries("Success", successBarData).
		AddSeries("Failed", failedBarData).
		SetSeriesOptions(charts.WithBarChartOpts(opts.BarChart{
			Stack: "stackA",
		}))
	snippet := bar.RenderSnippet()
	data = append(data, ChartData{
		Element: template.HTML(snippet.Element),
		Script:  template.HTML(snippet.Script),
	})

	return data, nil
}
// handleStats serves the statistics page. The optional "id" path value
// selects a single library; when absent it defaults to "-1", which the
// stats queries treat as "all libraries".
func handleStats(w http.ResponseWriter, r *http.Request) {
	libraryID := r.PathValue("id")
	if libraryID == "" {
		libraryID = "-1"
	}

	chartData, err := generateStats(r.Context(), libraryID)
	if err != nil {
		slog.ErrorContext(r.Context(), "Generate Stats:", "err", err)
		http.Error(w, "Generate Stats: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// Libraries for the selector dropdown.
	rows, err := db.Query(r.Context(),
		`SELECT *
	FROM libraries;`)
	if err != nil {
		slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
		http.Error(w, "Query Libraries: "+err.Error(), http.StatusInternalServerError)
		return
	}
	libs, err := pgx.CollectRows(rows, pgx.RowToStructByName[Library])
	if err != nil {
		slog.ErrorContext(r.Context(), "Collect Libraries", "err", err)
		http.Error(w, "Collect Libraries: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// Headline counters; on query failure fall back to zeros rather than
	// failing the whole page (deliberate best-effort).
	var size, count, duration int
	if err := db.QueryRow(r.Context(),
		`SELECT SUM(size) AS size,
	COUNT(id) as count,
	0 as duration
	FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2`, libraryID, constants.FILE_STATUS_EXISTS).Scan(&size, &count, &duration); err != nil {
		size, count, duration = 0, 0, 0
	}

	var saved int
	if err := db.QueryRow(r.Context(),
		`SELECT (SUM(old_size) - COALESCE(SUM(size), 0)) AS saved
	FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2 AND old_size IS NOT NULL`, libraryID, constants.FILE_STATUS_EXISTS).Scan(&saved); err != nil {
		saved = 0
	}

	// Render to a buffer first so template errors never produce a
	// half-written response.
	var buf bytes.Buffer
	err = templates.ExecuteTemplate(&buf, constants.STATS_TEMPLATE_NAME, StatsDisplay{
		Libraries:       libs,
		SelectedLibrary: libraryID,
		Size:            splitInt(size),
		Count:           splitInt(count),
		Duration:        splitInt(duration),
		Saved:           splitInt(saved),
		Charts:          chartData,
	})
	if err != nil {
		slog.ErrorContext(r.Context(), "Executing Stats Template", "err", err)
		http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
		return
	}

	if _, err := w.Write(buf.Bytes()); err != nil {
		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
	}
}

View file

@ -107,7 +107,7 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
return return
} }
rows, err := db.Query(r.Context(), "SELECT * FROM libraries WHERE enable = $1", true) rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries WHERE enable = $1", true)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err) slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -328,7 +328,6 @@ type QueuedTask struct {
Type constants.TaskType Type constants.TaskType
FileID int `json:"file_id"` FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5" db:"md5"` FileMD5 []byte `json:"file_md5" db:"md5"`
FileExtension string `json:"file_extension"`
Data json.RawMessage Data json.RawMessage
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"` FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
} }
@ -345,7 +344,7 @@ func assignQueuedTasks(ctx context.Context) error {
assignRunning = false assignRunning = false
}() }()
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, substring(f.path FROM '\\.([^\\.]*)$') AS file_extension, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status = $2 ORDER BY type DESC", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS) rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
if err != nil { if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err) return fmt.Errorf("Query Queued Tasks: %w", err)
} }
@ -354,8 +353,6 @@ func assignQueuedTasks(ctx context.Context) error {
return fmt.Errorf("Collect Queued Tasks: %w", err) return fmt.Errorf("Collect Queued Tasks: %w", err)
} }
// TODO, allow overwriting of extension
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks)) //slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
if len(queuedTasks) == 0 { if len(queuedTasks) == 0 {
@ -375,8 +372,7 @@ func assignQueuedTasks(ctx context.Context) error {
} }
if Workers[i].Connected { if Workers[i].Connected {
var queueEnable bool var queueEnable bool
var parallelTasks int err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable)
err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil { if err != nil {
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
} }
@ -394,7 +390,7 @@ func assignQueuedTasks(ctx context.Context) error {
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count) slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
// Allow for Multiple Tasks at once in the future // Allow for Multiple Tasks at once in the future
if count < parallelTasks { if count < 1 {
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
defer tx.Rollback(ctx) defer tx.Rollback(ctx)
if err != nil { if err != nil {
@ -410,7 +406,6 @@ func assignQueuedTasks(ctx context.Context) error {
Type: queuedTasks[lastAssigned].Type, Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID, FileID: queuedTasks[lastAssigned].FileID,
FileMD5: queuedTasks[lastAssigned].FileMD5, FileMD5: queuedTasks[lastAssigned].FileMD5,
FileExtension: queuedTasks[lastAssigned].FileExtension,
Data: queuedTasks[lastAssigned].Data, Data: queuedTasks[lastAssigned].Data,
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand, FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
} }

View file

@ -1,20 +1,15 @@
package server package server
import ( import (
"context"
"encoding/base64" "encoding/base64"
"errors" "errors"
"fmt"
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"time"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"gopkg.in/vansante/go-ffprobe.v2"
) )
func handleUpload(w http.ResponseWriter, r *http.Request) { func handleUpload(w http.ResponseWriter, r *http.Request) {
@ -23,56 +18,30 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
return return
} }
strid := r.PathValue("id") id := r.PathValue("id")
if strid == "" { if id == "" {
http.Error(w, "No id", http.StatusBadRequest) http.Error(w, "No id", http.StatusBadRequest)
return return
} }
id, err := strconv.Atoi(strid) var count int
err := db.QueryRow(r.Context(), "SELECT COUNT(*) FROM tasks WHERE file_id = $1 AND worker_id = $2", id, r.Header.Get(constants.UUID_HEADER)).Scan(&count)
if err != nil { if err != nil {
errorUpload(r, w, 0, "Convert File ID to int", err) slog.ErrorContext(r.Context(), "Query Task Count", "err", err)
http.Error(w, "Error Query Task Count: "+err.Error(), http.StatusInternalServerError)
return return
} }
if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION { if count < 1 {
http.Error(w, "Wrong Worker Version", http.StatusBadRequest) slog.ErrorContext(r.Context(), "No Running Task for file", "id", id)
return http.Error(w, "No Running Task for file: "+id, http.StatusBadRequest)
}
taskid, err := strconv.Atoi(r.Header.Get(constants.TASK_ID_HEADER))
if err != nil {
errorUpload(r, w, 0, "Convert Task ID to int", err)
return
}
var fileid int
var workerid string
var status constants.TaskStatus
err = db.QueryRow(r.Context(), "SELECT file_id, worker_id, status FROM tasks WHERE id = $1", taskid).Scan(&fileid, &workerid, &status)
if err != nil {
errorUpload(r, w, 0, "Getting Task for Upload", err)
return
}
if workerid != r.Header.Get(constants.UUID_HEADER) {
errorUpload(r, w, taskid, "Worker is not Assigned to Task", err)
return
}
if fileid != id {
errorUpload(r, w, taskid, "File is not for this Task", err)
return
}
if !(status == constants.TASK_STATUS_RUNNING || status == constants.TASK_STATUS_UNKNOWN) {
errorUpload(r, w, taskid, "File can only be uploaded when task status is running or unknown", err)
return return
} }
hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER)) hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER))
if err != nil { if err != nil {
errorUpload(r, w, taskid, "Decode Hash", err) slog.ErrorContext(r.Context(), "Decode Hash", "err", err)
http.Error(w, "Decode Hash: "+err.Error(), http.StatusBadRequest)
return return
} }
@ -80,7 +49,8 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var libraryID int var libraryID int
err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID) err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID)
if err != nil { if err != nil {
errorUpload(r, w, taskid, "Query File Path", err) slog.ErrorContext(r.Context(), "Query File Path", "err", err)
http.Error(w, "Error Query File Path: "+err.Error(), http.StatusInternalServerError)
return return
} }
@ -89,18 +59,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var tReplace bool var tReplace bool
err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace) err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace)
if err != nil { if err != nil {
errorUpload(r, w, taskid, "Query Library Path", err) slog.ErrorContext(r.Context(), "Query Library Path", "err", err)
http.Error(w, "Error Query Library Path: "+err.Error(), http.StatusInternalServerError)
return return
} }
// When replacing, send to temp, otherwise write directly
var path string var path string
if tReplace { if tReplace {
path = filepath.Join(lPath, fPath+constants.TEMP_FILE_EXTENSION) //path = filepath.Join(lPath, fPath)
// if replace then this is a temp file and should be cleaned up on error slog.ErrorContext(r.Context(), "Replace mode is not implemented")
defer os.Remove(path) http.Error(w, "Replace mode is not implemented", http.StatusNotImplemented)
return
} else { } else {
// todo write this to a temp file first also to be able to run cleanup on error and unify the rename logic
path = filepath.Join(tPath, fPath) path = filepath.Join(tPath, fPath)
} }
@ -108,13 +78,15 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
err = r.ParseMultipartForm(100 << 20) err = r.ParseMultipartForm(100 << 20)
if err != nil { if err != nil {
errorUpload(r, w, taskid, "Parse Multipart Form", err) slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
return return
} }
srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY) srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY)
if err != nil { if err != nil {
errorUpload(r, w, taskid, "Form File", err) slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
return return
} }
@ -122,18 +94,17 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
// MaxBytesReader closes the underlying io.Reader on its Close() is called // MaxBytesReader closes the underlying io.Reader on its Close() is called
defer src.Close() defer src.Close()
// if we are not replacing then we dont know if the destination folder exists err = os.MkdirAll(filepath.Dir(path), 0775)
if !tReplace { if err != nil {
err = os.MkdirAll(filepath.Dir(path), 0775) slog.ErrorContext(r.Context(), "Creating Folder", "err", err)
if err != nil { http.Error(w, "Creating Folder: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Creating Folder", err) return
return
}
} }
out, err := os.Create(path) out, err := os.Create(path)
if err != nil { if err != nil {
errorUpload(r, w, taskid, "Creating File", err) slog.ErrorContext(r.Context(), "Creating File", "err", err)
http.Error(w, "Creating File: "+err.Error(), http.StatusInternalServerError)
return return
} }
defer out.Close() defer out.Close()
@ -142,80 +113,17 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
var maxBytesError *http.MaxBytesError var maxBytesError *http.MaxBytesError
if errors.As(err, &maxBytesError) { if errors.As(err, &maxBytesError) {
slog.ErrorContext(r.Context(), "File to Large", "err", err) slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "File to Large: "+err.Error(), http.StatusRequestEntityTooLarge) http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusRequestEntityTooLarge)
return return
} }
errorUpload(r, w, taskid, "Failed to write the uploaded content", err) slog.ErrorContext(r.Context(), "Failed to write the uploaded content", "err", err)
http.Error(w, "Failed to write the uploaded content: "+err.Error(), http.StatusInternalServerError)
return return
} }
// TODO check file integrity
_ = hash
slog.InfoContext(r.Context(), "upload done", "written", written) slog.InfoContext(r.Context(), "upload done", "written", written)
out.Close()
// if we are replacing, then we need to change the original file to the uploaded file and update the hash in the database
if tReplace {
tx, err := db.Begin(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Begin Transaction", err)
return
}
dstPath := filepath.Join(lPath, fPath)
slog.InfoContext(r.Context(), "Replacing Original file With Uploaded File", "fileid", id, "path", path, "dstPath", dstPath)
err = os.Rename(path, dstPath)
if err != nil {
errorUpload(r, w, taskid, "Replace File", err)
return
}
info, err := os.Stat(dstPath)
if err != nil {
errorUpload(r, w, taskid, "Stat File", err)
return
}
modTime := info.ModTime().UTC().Round(time.Second)
size := info.Size()
ffprobeData, err := ffprobe.ProbeURL(r.Context(), dstPath)
if err != nil {
errorUpload(r, w, taskid, "ffProbe File", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET old_md5 = md5, old_size = size, old_ffprobe_data = ffprobe_data WHERE id = $1", fileid)
if err != nil {
errorUpload(r, w, taskid, "Copy Filed Current Values to Old Values", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET md5 = $2, size = $3, mod_time = $4, ffprobe_data = $5 WHERE id = $1", fileid, hash, size, modTime, ffprobeData)
if err != nil {
errorUpload(r, w, taskid, "Set New File Values", err)
return
}
err = tx.Commit(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Commit File Hash", err)
return
}
slog.InfoContext(r.Context(), "Original file Replaced with Uploaded File", "fileid", id, "dstPath", dstPath)
} else {
// TODO implement "old" fields for non replace libraries, scan also needs to use old field if it is is non replace and the file has been transcoded
}
}
func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string, err error) {
slog.ErrorContext(r.Context(), msg, "err", err)
http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError)
if taskID != 0 {
_, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())})
if err != nil {
slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
}
}
} }

View file

@ -247,8 +247,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
var health constants.FileHealth var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS { if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY health = constants.FILE_HEALTH_HEALTHY
// Auto Queue Transcode for Successfull Transcodes if library setting say so // TODO Auto Queue Transcode for Successfull Transcodes if library setting
go queueOnHealth(context.TODO(), ts.Task.ID)
} else { } else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs // TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED health = constants.FILE_HEALTH_DAMAGED
@ -295,26 +294,3 @@ func updateWorkerTaskStatus(ctx context.Context) {
wg.Wait() wg.Wait()
} }
// queueOnHealth queues a transcode task for the file behind a successful
// health-check task, but only when the file's library has
// health_ok_queue_transcode enabled and a transcode command configured.
// Errors are logged, not returned: this runs fire-and-forget after a
// health check succeeds.
func queueOnHealth(ctx context.Context, taskID int) {
	var transcode_command_id *int
	var file_id *int
	err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
	FROM tasks
	INNER JOIN files ON files.id = tasks.file_id
	INNER JOIN libraries ON files.library_id = libraries.id
	WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
		Scan(&transcode_command_id, &file_id)
	if err == pgx.ErrNoRows {
		// The library doesn't want auto-queueing (or the task/file is gone).
		// BUG FIX: return here — previously execution fell through and
		// inserted a task with nil file_id and nil ffmpeg_command_id.
		slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
		return
	} else if err != nil {
		slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
		return
	}

	slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
	if err != nil {
		slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	}
}

File diff suppressed because one or more lines are too long

View file

@ -1,7 +0,0 @@
// Navigate to the stats page for the chosen library.
// A library_id of "-1" means "all libraries" and maps to the bare /stats URL.
function setLibrary(library_id) {
    const target = library_id == "-1" ? "/stats" : "/stats/" + library_id;
    window.location.href = target;
}

View file

@ -3,65 +3,58 @@
@import url("component.css"); @import url("component.css");
:root { :root {
font-family: monospace; font-family: monospace;
font-size: 12px; font-size: 12px;
overflow-y: auto; overflow-y: auto;
} }
body { body {
padding: 0.5rem; padding: 0.5rem;
gap: 0.5rem; gap: 0.5rem;
} }
.counter-image { .counter-image {
image-rendering: pixelated; image-rendering: pixelated;
image-rendering: -moz-crisp-edges; image-rendering: -moz-crisp-edges;
image-rendering: crisp-edges; image-rendering: crisp-edges;
flex-grow: 1; flex-grow: 1;
max-height: 50vh;
max-width: 22.5vh;
} }
.counter { .counter {
flex-flow: row nowrap; flex-flow: row nowrap;
align-items: end; align-items: end;
font-size: 1.5rem; font-size: 1.5rem;
font-weight: bold; font-weight: bold;
} }
.workers { .workers {
flex-flow: row wrap; flex-flow: row wrap;
gap: 2rem; gap: 2rem;
} }
.workers > * { .workers>* {
gap: 0.5rem; gap: 0.5rem;
border: 1px solid var(--fg); border: 1px solid var(--fg);
} }
.log > p { .log>p {
padding: 0.25rem 0; padding: 0.25rem 0;
user-select: text; user-select: text;
} }
.log > p:hover { .log>p:hover {
background: var(--cl-hl); background: var(--cl-hl);
} }
nav > :first-child { nav> :first-child {
font-size: 2rem; font-size: 2rem;
font-weight: bold; font-weight: bold;
} }
.short-button { .short-button {
align-self: start; align-self: start;
} }
.button-list { .button-list {
flex-direction: row; flex-direction: row;
} }
.stats {
flex-flow: row wrap;
gap: 2rem;
}

View file

@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
l := log.GetTaskLogger(t) l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending // TODO Figure out how to get correct file ending
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v."+t.FileExtension, t.ID, t.FileID)) path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path // Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 { if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -27,7 +27,6 @@ func StartTask(conf config.Config, data types.TaskStart) error {
Type: data.Type, Type: data.Type,
FileID: data.FileID, FileID: data.FileID,
FileMD5: data.FileMD5, FileMD5: data.FileMD5,
FileExtension: data.FileExtension,
FfmpegCommand: data.FfmpegCommand, FfmpegCommand: data.FfmpegCommand,
} }

View file

@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
l := log.GetTaskLogger(t) l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending // TODO Figure out how to get correct file ending
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v."+t.FileExtension, t.ID, t.FileID)) src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v."+t.FileExtension, t.ID, t.FileID)) dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
// Set ffmpeg input path // Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 { if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -9,7 +9,6 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"os" "os"
"strconv"
"time" "time"
"git.lastassault.de/speatzle/morffix/config" "git.lastassault.de/speatzle/morffix/config"
@ -83,7 +82,6 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
req.Header.Add(constants.UUID_HEADER, uuid.String()) req.Header.Add(constants.UUID_HEADER, uuid.String())
req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION)
req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash)) req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash))
req.Header.Add(constants.TASK_ID_HEADER, strconv.Itoa(t.ID))
req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"") req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"")
var client = &http.Client{ var client = &http.Client{
@ -99,8 +97,7 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
l.InfoContext(ctx, "Upload Done") l.InfoContext(ctx, "Upload Done")
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
return fmt.Errorf("Got HTTP Status Code: %v Body: %v", resp.StatusCode, string(body))
} }
return nil return nil

View file

@ -3,8 +3,7 @@
<html> <html>
<head> <head>
<link rel="stylesheet" href="/static/style/style.css"> <link rel="stylesheet" href="/static/style/style.css">
<script src="/static/js/echarts.min.js"></script>
</head> </head>
<body> <body>
{{template "navbar"}} {{template "navbar"}}
{{end}} {{end}}

View file

@ -31,24 +31,6 @@
</select> </select>
<input type="submit" value="Submit"> <input type="submit" value="Submit">
</form> </form>
<h2>Set Parallel Tasks</h2>
<form method="POST" action= "/parallel_tasks">
<label for="worker">Worker:</label>
<select id="worker" name="worker">
<option value="all">All</option>
{{range $w := .Workers}}
<option value="{{$w.ID}}">{{$w.Name}}</option>
{{end}}
</select>
<label for="parallel_tasks">Parallel Tasks</label>
<select id="parallel_tasks" name="parallel_tasks">
<option value="1">1</option>
<option value="2">2</option>
<option value="3">3</option>
<option value="4">4</option>
</select>
<input type="submit" value="Submit">
</form>
<h2>Workers</h2> <h2>Workers</h2>
<div class="workers"> <div class="workers">
{{range $w := .Workers}} {{range $w := .Workers}}
@ -78,22 +60,6 @@
<td> <td>
{{ $w.ConnectionChanged }} {{ $w.ConnectionChanged }}
</td> </td>
</tr>
<tr>
<td>
QueueEnable
</td>
<td>
{{ $w.QueueEnable }}
</td>
</tr>
<tr>
<td>
ParallelTasks
</td>
<td>
{{ $w.ParallelTasks }}
</td>
</tr> </tr>
{{if $w.Connected}} {{if $w.Connected}}
<tr> <tr>
@ -129,8 +95,8 @@
</td> </td>
</tr> </tr>
{{end}} {{end}}
</table> </table>
</div> </div>
{{end}} {{end}}
</div> </div>
{{template "tail"}} {{template "tail"}}

View file

@ -13,8 +13,6 @@
<th>ScanNewQueueTranscode</th> <th>ScanNewQueueTranscode</th>
<th>ScanChangedQueueTranscode</th> <th>ScanChangedQueueTranscode</th>
<th>HealthOkQueueTranscode</th> <th>HealthOkQueueTranscode</th>
<th>HealthCommand</th>
<th>TranscodeCommand</th>
</tr> </tr>
{{range $l := .Libraries}} {{range $l := .Libraries}}
<tr onclick="window.location='/libraries/{{ $l.ID }}';"> <tr onclick="window.location='/libraries/{{ $l.ID }}';">
@ -51,12 +49,6 @@
<td> <td>
{{ $l.HealthOkQueueTranscode }} {{ $l.HealthOkQueueTranscode }}
</td> </td>
<td>
{{ $l.HealthCommandID }}
</td>
<td>
{{ $l.TranscodeCommandID }}
</td>
</tr> </tr>
{{end}} {{end}}
</table> </table>
@ -75,28 +67,14 @@
<input type="text" name="transcode_path"> <input type="text" name="transcode_path">
<label>Queue Healthcheck for new files on Scan:</label> <label>Queue Healthcheck for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_health"> <input type="checkbox" name="scan_new_queue_health">
<label>Queue Healthcheck for changed files on Scan (Not Implemented):</label> <label>Queue Healthcheck for changed files on Scan:</label>
<input type="checkbox" name="scan_changed_queue_health"> <input type="checkbox" name="scan_changed_queue_health">
<label>Queue Transcode for new files on Scan (Not Implemented):</label> <label>Queue Transcode for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_transcode"> <input type="checkbox" name="scan_new_queue_transcode">
<label>Queue Transcode for changed files on Scan (Not Implemented):</label> <label>Queue Transcode for changed files on Scan:</label>
<input type="checkbox" name="scan_changed_queue_transcode"> <input type="checkbox" name="scan_changed_queue_transcode">
<label>Queue Transcode on Healthcheck Success:</label> <label>Queue Transcode on Healthcheck Success:</label>
<input type="checkbox" name="health_ok_queue_transcode"> <input type="checkbox" name="health_ok_queue_transcode">
<label for="health_command_id">Health Command:</label>
<select id="health_command_id" name="health_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<label for="transcode_command_id">Transcode Command:</label>
<select id="transcode_command_id" name="transcode_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<input type="submit" value="Submit"> <input type="submit" value="Submit">
</form> </form>
{{template "tail"}} {{template "tail"}}

View file

@ -49,4 +49,4 @@
{{end}} {{end}}
</table> </table>
</div> </div>
{{template "tail"}} {{template "tail"}}

View file

@ -4,6 +4,5 @@
<a class="button" href="/libraries">Libraries</a> <a class="button" href="/libraries">Libraries</a>
<a class="button" href="/tasks">Tasks</a> <a class="button" href="/tasks">Tasks</a>
<a class="button" href="/ffmpeg_commands">FFmpeg Commands</a> <a class="button" href="/ffmpeg_commands">FFmpeg Commands</a>
<a class="button" href="/stats">Stats</a>
</nav> </nav>
{{end}} {{end}}

View file

@ -1,69 +0,0 @@
{{template "head"}}
{{/* Stats page: renders totals as retro GIF digit counters plus charts.
     Expects: .SelectedLibrary (string id, "-1" = all), .Libraries,
     .Size/.Count/.Saved (sequences of digit characters — presumably a
     stringified number; TODO confirm against the handler), and .Charts.
     `sub` is a custom template func from the server's FuncMap
     (NOTE(review): assumed to be integer subtraction — verify). */}}
<script src="/static/js/library_filter.js"></script>
<h2>Stats</h2>
<label for="library">Library:</label>
{{/* Changing the selection calls setLibrary (library_filter.js), which
     presumably reloads the page filtered to that library. */}}
<select id="library" name="library" onchange="setLibrary(this.value)" class="short-button">
<option {{if eq .SelectedLibrary "-1" }} selected {{end}}value="-1">ALL</option>
{{range $l := $.Libraries}}
<option {{if eq $.SelectedLibrary $l.ID }} selected {{end}}value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<h4>Size</h4>
{{/* One counter image per digit. After the digit with n remaining
     digits (n = len - index), a unit label is emitted at n = 4/7/10/13,
     i.e. between each group of three digits — KB/MB/GB/TB markers. */}}
<div class="counter">
{{ $l := len .Size }}
{{range $i, $c := .Size}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
<h4>Files</h4>
{{/* Same digit-grouping scheme, but a plain "." thousands separator
     instead of byte-unit labels, since this is a file count. */}}
<div class="counter">
{{ $l := len .Count }}
{{range $i, $c := .Count}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
.
{{ else if eq $n 7 }}
.
{{ else if eq $n 10 }}
.
{{ else if eq $n 13 }}
.
{{end}}
{{end}}
</div>
<h4>Saved</h4>
{{/* Bytes saved by transcoding; same KB/MB/GB/TB grouping as Size. */}}
<div class="counter">
{{ $l := len .Saved }}
{{range $i, $c := .Saved}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
{{/* Each chart supplies its own container element and init script —
     presumably go-echarts output (see go.mod); verify renderer. */}}
<div class="stats">
{{range $c := .Charts}}
{{$c.Element}} {{$c.Script}}
{{end}}
</div>
{{template "tail"}}

View file

@ -10,7 +10,6 @@ type TaskStart struct {
ID int `json:"id"` ID int `json:"id"`
FileID int `json:"file_id"` FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5"` FileMD5 []byte `json:"file_md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"` Type constants.TaskType `json:"type"`
Data json.RawMessage Data json.RawMessage
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"` FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`
@ -20,7 +19,6 @@ type Task struct {
ID int `json:"id"` ID int `json:"id"`
FileID int `json:"file_id"` FileID int `json:"file_id"`
FileMD5 []byte `json:"md5"` FileMD5 []byte `json:"md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"` Type constants.TaskType `json:"type"`
Status constants.TaskStatus `json:"status"` Status constants.TaskStatus `json:"status"`
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"` FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`

View file

@ -2,7 +2,6 @@ package worker
import ( import (
"context" "context"
"time"
"github.com/mackerelio/go-osstat/cpu" "github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/memory" "github.com/mackerelio/go-osstat/memory"
@ -15,31 +14,13 @@ func init() {
rpcServer.RegisterMethod("status", status) rpcServer.RegisterMethod("status", status)
} }
var cpuBefore *cpu.Stats
var cpuUsage uint64
var cpuCount uint64
func calcUsage() {
for {
cStats, _ := cpu.Get()
if cStats != nil {
cpuUsage = cStats.Total
cpuCount = uint64(cStats.CPUCount)
if cpuBefore != nil {
total := float64(cStats.Total - cpuBefore.Total)
cpuUsage = uint64(float64((cStats.User-cpuBefore.User)+(cStats.System-cpuBefore.System)) / total * 100)
}
cpuBefore = cStats
}
time.Sleep(time.Second)
}
}
func status(ctx context.Context, req rpc.Request) (any, error) { func status(ctx context.Context, req rpc.Request) (any, error) {
s := types.WorkerStatus{} s := types.WorkerStatus{}
s.CPUUsage = cpuUsage cStats, _ := cpu.Get()
s.CPUCount = cpuCount if cStats != nil {
s.CPUUsage = cStats.Total
s.CPUCount = uint64(cStats.CPUCount)
}
mStats, _ := memory.Get() mStats, _ := memory.Get()
if mStats != nil { if mStats != nil {
s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100) s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100)

View file

@ -60,9 +60,6 @@ func Start(_conf config.Config) {
exit = true exit = true
cancel() cancel()
}() }()
go calcUsage()
for { for {
if exit { if exit {
slog.InfoContext(ctx, "Done") slog.InfoContext(ctx, "Done")

View file

@ -1,67 +0,0 @@
package workerpool
import (
"context"
"sync"
)
type WorkerPool struct {
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
queue chan func(context.Context)
qg sync.WaitGroup
}
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
queue := make(chan func(context.Context), queueSize)
workerPool := WorkerPool{
workers: workers,
ctx: ctx,
cancel: cancel,
queue: queue,
}
for i := 0; i < workerPool.workers; i++ {
workerPool.wg.Add(1)
go workerPool.work(workerPool.ctx)
}
return &workerPool
}
func (wp *WorkerPool) Stop() {
wp.cancel()
wp.wg.Wait()
}
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
wp.qg.Add(1)
wp.queue <- job
}
func (wp *WorkerPool) WaitForEmptyQueue() {
wp.qg.Wait()
}
func (wp *WorkerPool) QueueLength() int {
return len(wp.queue)
}
func (wp *WorkerPool) work(ctx context.Context) {
for {
select {
case <-ctx.Done():
wp.wg.Done()
return
case job := <-wp.queue:
func() {
defer wp.qg.Done()
job(ctx)
}()
}
}
}