Compare commits
65 commits
scan_rewor
...
main
Author | SHA1 | Date | |
---|---|---|---|
15a960da19 | |||
a5fc856764 | |||
e7371850d1 | |||
1e4016d075 | |||
553392e73f | |||
57969cc07d | |||
8c3ba7e883 | |||
5ecfd81967 | |||
0ce5b46449 | |||
64fd3da925 | |||
fa1deed55a | |||
1af3a3d5b9 | |||
b98be1fd70 | |||
808953f46f | |||
d25890ad26 | |||
2630ebb0cf | |||
d0d2529570 | |||
8a6390fb8d | |||
a2efbdea4c | |||
a6e10369bb | |||
f2e2236653 | |||
9298f4cccf | |||
e05051f34d | |||
c87dab94a0 | |||
12f700d1d0 | |||
f08a07e87d | |||
a60f05be12 | |||
6927e31a62 | |||
011f97a3e6 | |||
f401127127 | |||
20164f8983 | |||
275657e584 | |||
dbaa8f1de7 | |||
dcbddfc6cd | |||
849b4cf004 | |||
91e3504abe | |||
29c7cdd0bf | |||
cee5353b54 | |||
b6c22df6f1 | |||
7d037be106 | |||
9d2519c085 | |||
f818a3525b | |||
043d81b823 | |||
04a05284ed | |||
8b6e20ba00 | |||
64315356ae | |||
4c3df8fef6 | |||
a75ce3287d | |||
2e360f4b20 | |||
cd78de96b4 | |||
8e15cdd6e8 | |||
091ef322d5 | |||
38219f7b07 | |||
0f5d842a64 | |||
8ba0e8f2ab | |||
03eb3541a5 | |||
1871e1c26f | |||
81fa489417 | |||
aeb47c2593 | |||
f12462250a | |||
bae1112074 | |||
5d6a29407d | |||
4019d1764e | |||
67aa389f79 | |||
75a578fd36 |
38 changed files with 1112 additions and 194 deletions
|
@ -2,13 +2,14 @@ package constants
|
|||
|
||||
import "fmt"
|
||||
|
||||
const WORKER_VERSION = "v1"
|
||||
const WORKER_VERSION = "v2"
|
||||
|
||||
const WORKER_VERSION_HEADER = "morffix-version"
|
||||
const SHARED_SECRET_HEADER = "morffix-secret"
|
||||
const NAME_HEADER = "morffix-name"
|
||||
const UUID_HEADER = "morffix-uuid"
|
||||
const HASH_HEADER = "morffix-hash"
|
||||
const TASK_ID_HEADER = "morffix-task-id"
|
||||
|
||||
const INDEX_TEMPLATE_NAME = "index.tmpl"
|
||||
const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl"
|
||||
|
@ -17,6 +18,7 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl"
|
|||
const TASKS_TEMPLATE_NAME = "tasks.tmpl"
|
||||
const TASK_TEMPLATE_NAME = "task.tmpl"
|
||||
const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl"
|
||||
const STATS_TEMPLATE_NAME = "stats.tmpl"
|
||||
|
||||
const FORM_FILE_KEY = "file"
|
||||
|
||||
|
@ -25,6 +27,8 @@ const SORT_ORDER_ASC_PARAM = "order_asc"
|
|||
const QUERY_LIMIT_PARAM = "limit"
|
||||
const QUERY_PAGE_PARAM = "page"
|
||||
|
||||
const TEMP_FILE_EXTENSION = ".morffix"
|
||||
|
||||
type TaskType int
|
||||
|
||||
const (
|
||||
|
|
1
go.mod
1
go.mod
|
@ -12,6 +12,7 @@ require (
|
|||
)
|
||||
|
||||
require (
|
||||
github.com/go-echarts/go-echarts/v2 v2.4.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -2,6 +2,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
|
|||
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-echarts/go-echarts/v2 v2.4.0 h1:efD46dmAvaZEWrBHAGjE8cfDK48vvFTHz5N9VqW5rYc=
|
||||
github.com/go-echarts/go-echarts/v2 v2.4.0/go.mod h1:56YlvzhW/a+du15f3S2qUGNDfKnFOeJSThBIrVFHDtI=
|
||||
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
|
||||
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
ALTER TABLE files
|
||||
DROP IF EXISTS ffprobe;
|
||||
DROP IF EXISTS ffprobe_data;
|
||||
|
|
3
migrations/000019_alter_libraries_table_command.down.sql
Normal file
3
migrations/000019_alter_libraries_table_command.down.sql
Normal file
|
@ -0,0 +1,3 @@
|
|||
ALTER TABLE libraries
|
||||
DROP IF EXISTS transcode_command_id,
|
||||
DROP IF EXISTS health_command_id;
|
3
migrations/000019_alter_libraries_table_command.up.sql
Normal file
3
migrations/000019_alter_libraries_table_command.up.sql
Normal file
|
@ -0,0 +1,3 @@
|
|||
ALTER TABLE libraries
|
||||
ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id),
|
||||
ADD health_command_id bigint REFERENCES ffmpeg_commands(id);
|
4
migrations/000020_alter_files_table_transcode.down.sql
Normal file
4
migrations/000020_alter_files_table_transcode.down.sql
Normal file
|
@ -0,0 +1,4 @@
|
|||
ALTER TABLE files
|
||||
DROP IF EXISTS old_md5,
|
||||
DROP IF EXISTS old_ffprobe_data,
|
||||
DROP IF EXISTS old_size;
|
3
migrations/000020_alter_files_table_transcode.up.sql
Normal file
3
migrations/000020_alter_files_table_transcode.up.sql
Normal file
|
@ -0,0 +1,3 @@
|
|||
ALTER TABLE files ADD old_md5 uuid,
|
||||
ADD old_ffprobe_data JSONB,
|
||||
ADD old_size bigint;
|
|
@ -0,0 +1,2 @@
|
|||
ALTER TABLE workers
|
||||
DROP IF EXISTS parallel_tasks;
|
|
@ -0,0 +1,2 @@
|
|||
ALTER TABLE workers
|
||||
ADD parallel_tasks smallint NOT NULL DEFAULT 1;
|
|
@ -18,8 +18,9 @@ type IndexData struct {
|
|||
type IndexWorker struct {
|
||||
ID string
|
||||
Worker
|
||||
Status *types.WorkerStatus
|
||||
QueueEnable bool
|
||||
Status *types.WorkerStatus
|
||||
QueueEnable bool
|
||||
ParallelTasks int
|
||||
}
|
||||
|
||||
func handleIndex(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -31,6 +32,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
|
|||
WorkersMutex.Lock()
|
||||
defer WorkersMutex.Unlock()
|
||||
for i := range Workers {
|
||||
|
||||
var queueEnable bool
|
||||
var parallelTasks int
|
||||
err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks)
|
||||
if err != nil {
|
||||
w.Write([]byte(err.Error()))
|
||||
slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err)
|
||||
}
|
||||
|
||||
if Workers[i].Connected {
|
||||
var status types.WorkerStatus
|
||||
_, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status)
|
||||
|
@ -41,15 +51,19 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status)
|
||||
data.Workers = append(data.Workers, IndexWorker{
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: &status,
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: &status,
|
||||
QueueEnable: queueEnable,
|
||||
ParallelTasks: parallelTasks,
|
||||
})
|
||||
} else {
|
||||
data.Workers = append(data.Workers, IndexWorker{
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: nil,
|
||||
ID: i,
|
||||
Worker: *Workers[i],
|
||||
Status: nil,
|
||||
QueueEnable: queueEnable,
|
||||
ParallelTasks: parallelTasks,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -77,6 +91,10 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func splitInt(n int) []int {
|
||||
if n == 0 {
|
||||
return []int{0}
|
||||
}
|
||||
|
||||
slc := []int{}
|
||||
for n > 0 {
|
||||
slc = append(slc, n%10)
|
||||
|
|
|
@ -5,17 +5,21 @@ import (
|
|||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
type LibrariesData struct {
|
||||
Libraries []Library
|
||||
Libraries []Library
|
||||
FfmpegCommands []FfmpegCommand
|
||||
}
|
||||
|
||||
type Library struct {
|
||||
ID string `db:"id"`
|
||||
HealthCommandID *int `db:"health_command_id"`
|
||||
TranscodeCommandID *int `db:"transcode_command_id"`
|
||||
Name string `db:"name"`
|
||||
Path string `db:"path"`
|
||||
Enable bool `db:"enable"`
|
||||
|
@ -40,7 +44,7 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries")
|
||||
rows, err := db.Query(r.Context(), "SELECT * FROM libraries")
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
|
||||
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
|
||||
|
@ -54,6 +58,20 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
data.Libraries = libraries
|
||||
|
||||
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
|
||||
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
|
||||
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
data.FfmpegCommands = ffmpegCommands
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data)
|
||||
if err != nil {
|
||||
|
@ -84,9 +102,21 @@ func createLibrary(r *http.Request) error {
|
|||
scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on"
|
||||
health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on"
|
||||
|
||||
var health_command_id *int = nil
|
||||
var transcode_command_id *int = nil
|
||||
|
||||
h, err := strconv.Atoi(r.FormValue("health_command_id"))
|
||||
if err == nil {
|
||||
health_command_id = &h
|
||||
}
|
||||
t, err := strconv.Atoi(r.FormValue("transcode_command_id"))
|
||||
if err == nil {
|
||||
transcode_command_id = &t
|
||||
}
|
||||
|
||||
slog.Info("Got Library Create", "name", name, "path", path, "enable", enable)
|
||||
|
||||
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path)
|
||||
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Inserting Library: %w", err)
|
||||
}
|
||||
|
|
|
@ -20,10 +20,12 @@ type File struct {
|
|||
ID int `db:"id"`
|
||||
Path string `db:"path"`
|
||||
Size int64 `db:"size"`
|
||||
OldSize *int64 `db:"old_size"`
|
||||
Status constants.FileStatus `db:"status"`
|
||||
Health constants.FileHealth `db:"health"`
|
||||
Transcode constants.FileTranscode `db:"transcode"`
|
||||
MD5 []byte `db:"md5"`
|
||||
OldMD5 *[]byte `db:"old_md5"`
|
||||
UpdatedAt time.Time `db:"updated_at"`
|
||||
}
|
||||
|
||||
|
@ -63,7 +65,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
|
|||
Enable: enabled,
|
||||
}
|
||||
|
||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1", id)
|
||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, old_size, status, health, transcode, md5, old_md5, updated_at FROM files where library_id = $1", id)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Files", "err", err)
|
||||
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)
|
||||
|
|
41
server/parallel_tasks.go
Normal file
41
server/parallel_tasks.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) {
|
||||
err := r.ParseForm()
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks"))
|
||||
if err != nil {
|
||||
http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
worker := r.FormValue("worker")
|
||||
|
||||
slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker)
|
||||
|
||||
if worker == "all" {
|
||||
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1 where id = $2", parallelTasks, worker)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound)
|
||||
}
|
201
server/scan.go
201
server/scan.go
|
@ -10,11 +10,11 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"git.lastassault.de/speatzle/morffix/workerpool"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"gopkg.in/vansante/go-ffprobe.v2"
|
||||
)
|
||||
|
@ -22,44 +22,47 @@ import (
|
|||
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
|
||||
|
||||
func handleScan(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == "GET" {
|
||||
scanStatus(w, r)
|
||||
return
|
||||
}
|
||||
http.Error(w, "Not Implemented", http.StatusNotImplemented)
|
||||
/*
|
||||
if r.Method == "GET" {
|
||||
scanStatus(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
id, err := strconv.Atoi(r.FormValue("id"))
|
||||
if err != nil {
|
||||
http.Error(w, "Parsing ID", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
id, err := strconv.Atoi(r.FormValue("id"))
|
||||
if err != nil {
|
||||
http.Error(w, "Parsing ID", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var name string
|
||||
var path string
|
||||
var enabled bool
|
||||
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Get Library", "err", err)
|
||||
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var name string
|
||||
var path string
|
||||
var enabled bool
|
||||
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Get Library", "err", err)
|
||||
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
scanCtx := context.Background()
|
||||
go scanLibrary(scanCtx, id)
|
||||
scanCtx := context.Background()
|
||||
go scanLibrary(scanCtx, id)
|
||||
|
||||
message := "Scan Started"
|
||||
message := "Scan Started"
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
|
||||
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
|
||||
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = w.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
||||
}
|
||||
_, err = w.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
func scanStatus(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -79,7 +82,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func manageScan(stop chan bool) {
|
||||
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
|
||||
scanTicker := time.NewTicker(time.Minute)
|
||||
hashTicker := time.NewTicker(time.Second)
|
||||
scanRunning := false
|
||||
|
@ -111,7 +114,7 @@ func manageScan(stop chan bool) {
|
|||
}
|
||||
|
||||
for _, l := range libraries {
|
||||
scanLibrary(ctx, l)
|
||||
scanLibrary(ctx, l, scanPool)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -126,19 +129,32 @@ func manageScan(stop chan bool) {
|
|||
hashRunning = false
|
||||
}()
|
||||
|
||||
var fileID uint
|
||||
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
|
||||
rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pgx.ErrNoRows) {
|
||||
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
|
||||
}
|
||||
// Nothing to do
|
||||
return
|
||||
}
|
||||
|
||||
err = hashFile(ctx, fileID)
|
||||
files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
|
||||
slog.ErrorContext(ctx, "Collect File Rows", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
hashPool.QueueJob(func(ctx context.Context) {
|
||||
err = hashFile(ctx, f)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
|
||||
hashPool.WaitForEmptyQueue()
|
||||
}()
|
||||
case <-stop:
|
||||
cancel()
|
||||
|
@ -149,7 +165,7 @@ func manageScan(stop chan bool) {
|
|||
|
||||
var scanLock sync.Mutex
|
||||
|
||||
func scanLibrary(ctx context.Context, id int) {
|
||||
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
|
||||
slog.InfoContext(ctx, "Acquiring Scan Lock...")
|
||||
scanLock.Lock()
|
||||
defer scanLock.Unlock()
|
||||
|
@ -158,7 +174,9 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
var name string
|
||||
var lpath string
|
||||
var enabled bool
|
||||
err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
|
||||
var health_command_id *int
|
||||
var scan_new_queue_health bool
|
||||
err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Get Library", "err", err)
|
||||
return
|
||||
|
@ -180,22 +198,17 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
return
|
||||
}
|
||||
|
||||
tx, err := db.Begin(ctx)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Begin Transaction", "err", err)
|
||||
return
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
|
||||
|
||||
// Mark all Files as Missing
|
||||
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO Work on getting missing status again
|
||||
/*
|
||||
// Mark all Files as Missing
|
||||
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
|
||||
return
|
||||
}
|
||||
*/
|
||||
err = filepath.Walk(lpath,
|
||||
func(fullPath string, info os.FileInfo, err error) error {
|
||||
if errors.Is(err, os.ErrPermission) {
|
||||
|
@ -211,7 +224,7 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
}
|
||||
|
||||
if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
|
||||
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
|
||||
slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -223,52 +236,63 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
var fileID int
|
||||
var oldSize uint
|
||||
var oldModTime time.Time
|
||||
err = tx.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
|
||||
|
||||
// Remove Timezone and Round to nearest Second
|
||||
newModTime := info.ModTime().UTC().Round(time.Second)
|
||||
|
||||
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
// File Does not Exist Yet
|
||||
|
||||
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
|
||||
scanPool.QueueJob(
|
||||
func(ctx context.Context) {
|
||||
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
|
||||
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ffprobe New File: %w", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "ffprobe New File", "err", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
|
||||
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Add New File to DB: %w", err)
|
||||
}
|
||||
var file_id int
|
||||
err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
|
||||
}
|
||||
if scan_new_queue_health && health_command_id != nil {
|
||||
slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id)
|
||||
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id)
|
||||
}
|
||||
}
|
||||
})
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("Getting File: %w", err)
|
||||
}
|
||||
|
||||
// Remove Timezone and Round to nearest Second
|
||||
newModTime := info.ModTime().UTC().Round(time.Second)
|
||||
|
||||
// File Already Exists, Check if it has been changed
|
||||
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
|
||||
slog.Debug("File stayed the same", "id", fileID)
|
||||
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 WHERE id = $1", fileID, constants.FILE_STATUS_EXISTS)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Non Changed File in DB: %w", err)
|
||||
}
|
||||
} else {
|
||||
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
|
||||
scanPool.QueueJob(func(ctx context.Context) {
|
||||
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
|
||||
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("ffprobe Changed File: %w", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
|
||||
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Changed File in DB: %w", err)
|
||||
}
|
||||
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -276,10 +300,7 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
return
|
||||
}
|
||||
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Error Committing Changes", "err", err)
|
||||
return
|
||||
}
|
||||
scanPool.WaitForEmptyQueue()
|
||||
|
||||
slog.InfoContext(ctx, "Scan Done", "id", id)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/config"
|
||||
"git.lastassault.de/speatzle/morffix/workerpool"
|
||||
)
|
||||
|
||||
var conf config.Config
|
||||
|
@ -102,6 +103,9 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
|
|||
mux.HandleFunc("/libraries", handleLibraries)
|
||||
mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands)
|
||||
mux.HandleFunc("/queue_enable", HandleSetQueueEnable)
|
||||
mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks)
|
||||
mux.HandleFunc("/stats", handleStats)
|
||||
mux.HandleFunc("/stats/{id}", handleStats)
|
||||
|
||||
mux.HandleFunc("/", handleIndex)
|
||||
|
||||
|
@ -129,8 +133,11 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
|
|||
stopWorkerManagement := make(chan bool, 1)
|
||||
go manageWorkers(stopWorkerManagement)
|
||||
|
||||
scanPool := workerpool.NewWorkerPool(10, 100000)
|
||||
hashPool := workerpool.NewWorkerPool(5, 100000)
|
||||
|
||||
stopScanning := make(chan bool, 1)
|
||||
go manageScan(stopScanning)
|
||||
go manageScan(stopScanning, scanPool, hashPool)
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, os.Interrupt)
|
||||
|
@ -146,6 +153,10 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
|
|||
stopWorkerManagement <- true
|
||||
slog.Info("Stopping Scanning...")
|
||||
stopScanning <- true
|
||||
slog.Info("Stopping Scanning Pool...")
|
||||
scanPool.Stop()
|
||||
slog.Info("Stopping Hashing Pool...")
|
||||
hashPool.Stop()
|
||||
slog.Info("Done")
|
||||
}
|
||||
}
|
||||
|
|
369
server/stats.go
Normal file
369
server/stats.go
Normal file
|
@ -0,0 +1,369 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"html/template"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"github.com/go-echarts/go-echarts/v2/charts"
|
||||
"github.com/go-echarts/go-echarts/v2/opts"
|
||||
"github.com/jackc/pgx/v5"
|
||||
)
|
||||
|
||||
type StatsDisplay struct {
|
||||
Charts []ChartData
|
||||
Libraries []Library
|
||||
SelectedLibrary string
|
||||
Size []int
|
||||
Duration []int
|
||||
Count []int
|
||||
Saved []int
|
||||
}
|
||||
|
||||
type ChartData struct {
|
||||
Element template.HTML
|
||||
Script template.HTML
|
||||
}
|
||||
|
||||
type PieValue struct {
|
||||
Name string
|
||||
Value int
|
||||
}
|
||||
|
||||
type PieIntValue struct {
|
||||
Id int
|
||||
Value int
|
||||
}
|
||||
|
||||
const CHART_COLOR_SUCCESS = "#7cffb2"
|
||||
const CHART_COLOR_FAILED = "#ff6e76"
|
||||
const CHART_COLOR_UNKNOWN = "#fddd60"
|
||||
|
||||
func generatePie(name string, data []opts.PieData) ChartData {
|
||||
pie := charts.NewPie()
|
||||
pie.SetGlobalOptions(
|
||||
charts.WithInitializationOpts(opts.Initialization{
|
||||
Theme: "dark",
|
||||
BackgroundColor: "#111",
|
||||
}),
|
||||
charts.WithTitleOpts(opts.Title{
|
||||
Title: name,
|
||||
}))
|
||||
|
||||
for i := range data {
|
||||
if data[i].Name == "Success" || data[i].Name == "Healthy" {
|
||||
data[i].ItemStyle = &opts.ItemStyle{
|
||||
Color: CHART_COLOR_SUCCESS,
|
||||
}
|
||||
} else if data[i].Name == "Failed" || data[i].Name == "Damaged" {
|
||||
data[i].ItemStyle = &opts.ItemStyle{
|
||||
Color: CHART_COLOR_FAILED,
|
||||
}
|
||||
} else if data[i].Name == "Unknown" || data[i].Name == "None" {
|
||||
data[i].ItemStyle = &opts.ItemStyle{
|
||||
Color: CHART_COLOR_UNKNOWN,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pie.AddSeries(name, data).SetSeriesOptions(
|
||||
charts.WithLabelOpts(
|
||||
opts.Label{
|
||||
Show: opts.Bool(true),
|
||||
Formatter: "{b}: {c}",
|
||||
}),
|
||||
)
|
||||
|
||||
snippet := pie.RenderSnippet()
|
||||
|
||||
return ChartData{
|
||||
Element: template.HTML(snippet.Element),
|
||||
Script: template.HTML(snippet.Script),
|
||||
}
|
||||
}
|
||||
|
||||
func generateStats(ctx context.Context, library_id string) ([]ChartData, error) {
|
||||
data := []ChartData{}
|
||||
|
||||
rows, err := db.Query(ctx,
|
||||
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).codec_name')::text, 'Unknown') AS name, COUNT(*) AS value
|
||||
FROM files
|
||||
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
|
||||
GROUP BY 1;`, library_id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Codecs: %w", err)
|
||||
}
|
||||
|
||||
codecCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Codec Data: %w", err)
|
||||
}
|
||||
res := []opts.PieData{}
|
||||
for _, v := range codecCounts {
|
||||
res = append(res, opts.PieData{
|
||||
Name: strings.ReplaceAll(v.Name, "\"", ""),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
data = append(data, generatePie("Codecs", res))
|
||||
|
||||
rows, err = db.Query(ctx,
|
||||
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).width')::text, 'Unknown') AS name, COUNT(*) AS value
|
||||
FROM files
|
||||
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
|
||||
GROUP BY 1;`, library_id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Resolution: %w", err)
|
||||
}
|
||||
|
||||
resolutionCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Codec Data: %w", err)
|
||||
}
|
||||
res = []opts.PieData{}
|
||||
for _, v := range resolutionCounts {
|
||||
res = append(res, opts.PieData{
|
||||
Name: strings.ReplaceAll(v.Name, "\"", ""),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
data = append(data, generatePie("Resolution", res))
|
||||
|
||||
rows, err = db.Query(ctx,
|
||||
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.format.format_name')::text, 'Unknown') AS name, COUNT(*) AS value
|
||||
FROM files
|
||||
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
|
||||
GROUP BY 1;`, library_id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Container: %w", err)
|
||||
}
|
||||
|
||||
containerCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Container Data: %w", err)
|
||||
}
|
||||
res = []opts.PieData{}
|
||||
for _, v := range containerCounts {
|
||||
res = append(res, opts.PieData{
|
||||
Name: strings.ReplaceAll(v.Name, "\"", ""),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
data = append(data, generatePie("Container", res))
|
||||
|
||||
rows, err = db.Query(ctx,
|
||||
`SELECT health AS id, COUNT(*) AS value
|
||||
FROM files WHERE ($1 = -1 OR library_id = $1)
|
||||
GROUP BY 1;`, library_id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Health: %w", err)
|
||||
}
|
||||
|
||||
healthCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Health Data: %w", err)
|
||||
}
|
||||
res = []opts.PieData{}
|
||||
for _, v := range healthCounts {
|
||||
res = append(res, opts.PieData{
|
||||
Name: constants.FileHealth(v.Id).String(),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
data = append(data, generatePie("Health", res))
|
||||
|
||||
rows, err = db.Query(ctx,
|
||||
`SELECT transcode AS id, COUNT(*) AS value
|
||||
FROM files WHERE ($1 = -1 OR library_id = $1)
|
||||
GROUP BY 1;`, library_id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Transcode: %w", err)
|
||||
}
|
||||
|
||||
transcodeCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Transcode Data: %w", err)
|
||||
}
|
||||
res = []opts.PieData{}
|
||||
for _, v := range transcodeCounts {
|
||||
res = append(res, opts.PieData{
|
||||
Name: constants.FileTranscode(v.Id).String(),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
data = append(data, generatePie("Transcode Status", res))
|
||||
|
||||
rows, err = db.Query(ctx,
|
||||
`SELECT tasks.status AS id, COUNT(*) AS value
|
||||
FROM tasks INNER JOIN files ON files.id = tasks.file_id
|
||||
WHERE ($1 = -1 OR files.library_id = $1)
|
||||
GROUP BY 1;`, library_id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Task Status: %w", err)
|
||||
}
|
||||
|
||||
taskStatusCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Task Status Data: %w", err)
|
||||
}
|
||||
res = []opts.PieData{}
|
||||
for _, v := range taskStatusCounts {
|
||||
res = append(res, opts.PieData{
|
||||
Name: constants.TaskStatus(v.Id).String(),
|
||||
Value: v.Value,
|
||||
})
|
||||
}
|
||||
data = append(data, generatePie("Task Status", res))
|
||||
|
||||
type BarTaskRowValue struct {
|
||||
Date time.Time
|
||||
Status constants.TaskStatus
|
||||
Count int
|
||||
}
|
||||
|
||||
rows, err = db.Query(ctx,
|
||||
`SELECT date_trunc('day', tasks.updated_at) date, tasks.status, COUNT(*) AS count
|
||||
FROM tasks INNER JOIN files ON files.id = tasks.file_id
|
||||
WHERE ($1 = -1 OR files.library_id = $1) AND tasks.updated_at > CURRENT_DATE - 7 AND (tasks.status = $2 OR tasks.status = $3)
|
||||
GROUP BY 1,2
|
||||
ORDER BY date;`, library_id, constants.TASK_STATUS_SUCCESS, constants.TASK_STATUS_FAILED)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Query Task Status Day: %w", err)
|
||||
}
|
||||
|
||||
taskStatusDayCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[BarTaskRowValue])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Collect Task Status Day Data: %w", err)
|
||||
}
|
||||
|
||||
days := []string{}
|
||||
successBarData := []opts.BarData{}
|
||||
failedBarData := []opts.BarData{}
|
||||
|
||||
for _, v := range taskStatusDayCounts {
|
||||
if !slices.Contains(days, v.Date.Format(time.DateOnly)) {
|
||||
days = append(days, v.Date.Format(time.DateOnly))
|
||||
}
|
||||
if v.Status == constants.TASK_STATUS_SUCCESS {
|
||||
successBarData = append(successBarData, opts.BarData{
|
||||
Value: v.Count,
|
||||
ItemStyle: &opts.ItemStyle{
|
||||
Color: CHART_COLOR_SUCCESS,
|
||||
},
|
||||
})
|
||||
} else if v.Status == constants.TASK_STATUS_FAILED {
|
||||
failedBarData = append(failedBarData, opts.BarData{
|
||||
Value: v.Count,
|
||||
ItemStyle: &opts.ItemStyle{
|
||||
Color: CHART_COLOR_FAILED,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
bar := charts.NewBar()
|
||||
bar.SetGlobalOptions(
|
||||
charts.WithInitializationOpts(opts.Initialization{
|
||||
Theme: "dark",
|
||||
BackgroundColor: "#111",
|
||||
}),
|
||||
charts.WithTitleOpts(opts.Title{
|
||||
Title: "Task Success/Failed Last 7 Days",
|
||||
}),
|
||||
)
|
||||
bar.SetXAxis(days).
|
||||
AddSeries("Success", successBarData).
|
||||
AddSeries("Failed", failedBarData).
|
||||
SetSeriesOptions(charts.WithBarChartOpts(opts.BarChart{
|
||||
Stack: "stackA",
|
||||
}))
|
||||
|
||||
snippet := bar.RenderSnippet()
|
||||
|
||||
data = append(data, ChartData{
|
||||
Element: template.HTML(snippet.Element),
|
||||
Script: template.HTML(snippet.Script),
|
||||
})
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func handleStats(w http.ResponseWriter, r *http.Request) {
|
||||
id := r.PathValue("id")
|
||||
if id == "" {
|
||||
id = "-1"
|
||||
}
|
||||
|
||||
data, err := generateStats(r.Context(), id)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Generate Stats:", "err", err)
|
||||
http.Error(w, "Generate Stats: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
rows, err := db.Query(r.Context(),
|
||||
`SELECT *
|
||||
FROM libraries;`)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
|
||||
http.Error(w, "Query Libraries: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
libraries, err := pgx.CollectRows(rows, pgx.RowToStructByName[Library])
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Collect Libraries", "err", err)
|
||||
http.Error(w, "Collect Libraries: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var size int
|
||||
var count int
|
||||
var duration int
|
||||
err = db.QueryRow(r.Context(),
|
||||
`SELECT SUM(size) AS size,
|
||||
COUNT(id) as count,
|
||||
0 as duration
|
||||
FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2`, id, constants.FILE_STATUS_EXISTS).Scan(&size, &count, &duration)
|
||||
if err != nil {
|
||||
size = 0
|
||||
count = 0
|
||||
duration = 0
|
||||
}
|
||||
|
||||
var saved int
|
||||
err = db.QueryRow(r.Context(),
|
||||
`SELECT (SUM(old_size) - COALESCE(SUM(size), 0)) AS saved
|
||||
FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2 AND old_size IS NOT NULL`, id, constants.FILE_STATUS_EXISTS).Scan(&saved)
|
||||
if err != nil {
|
||||
saved = 0
|
||||
}
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.STATS_TEMPLATE_NAME, StatsDisplay{
|
||||
Libraries: libraries,
|
||||
SelectedLibrary: id,
|
||||
Size: splitInt(size),
|
||||
Count: splitInt(count),
|
||||
Duration: splitInt(duration),
|
||||
Saved: splitInt(saved),
|
||||
Charts: data,
|
||||
})
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Executing Stats Template", "err", err)
|
||||
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = w.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
||||
}
|
||||
}
|
|
@ -107,7 +107,7 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries WHERE enable = $1", true)
|
||||
rows, err := db.Query(r.Context(), "SELECT * FROM libraries WHERE enable = $1", true)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
|
||||
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
|
||||
|
@ -328,6 +328,7 @@ type QueuedTask struct {
|
|||
Type constants.TaskType
|
||||
FileID int `json:"file_id"`
|
||||
FileMD5 []byte `json:"file_md5" db:"md5"`
|
||||
FileExtension string `json:"file_extension"`
|
||||
Data json.RawMessage
|
||||
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
|
||||
}
|
||||
|
@ -344,7 +345,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
assignRunning = false
|
||||
}()
|
||||
|
||||
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
|
||||
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, substring(f.path FROM '\\.([^\\.]*)$') AS file_extension, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status = $2 ORDER BY type DESC", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Query Queued Tasks: %w", err)
|
||||
}
|
||||
|
@ -353,6 +354,8 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
return fmt.Errorf("Collect Queued Tasks: %w", err)
|
||||
}
|
||||
|
||||
// TODO, allow overwriting of extension
|
||||
|
||||
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
|
||||
|
||||
if len(queuedTasks) == 0 {
|
||||
|
@ -372,7 +375,8 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
}
|
||||
if Workers[i].Connected {
|
||||
var queueEnable bool
|
||||
err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable)
|
||||
var parallelTasks int
|
||||
err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, ¶llelTasks)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
|
||||
}
|
||||
|
@ -390,7 +394,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
|
||||
|
||||
// Allow for Multiple Tasks at once in the future
|
||||
if count < 1 {
|
||||
if count < parallelTasks {
|
||||
tx, err := db.Begin(ctx)
|
||||
defer tx.Rollback(ctx)
|
||||
if err != nil {
|
||||
|
@ -406,6 +410,7 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
Type: queuedTasks[lastAssigned].Type,
|
||||
FileID: queuedTasks[lastAssigned].FileID,
|
||||
FileMD5: queuedTasks[lastAssigned].FileMD5,
|
||||
FileExtension: queuedTasks[lastAssigned].FileExtension,
|
||||
Data: queuedTasks[lastAssigned].Data,
|
||||
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
|
||||
}
|
||||
|
|
166
server/upload.go
166
server/upload.go
|
@ -1,15 +1,20 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"gopkg.in/vansante/go-ffprobe.v2"
|
||||
)
|
||||
|
||||
func handleUpload(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -18,30 +23,56 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
id := r.PathValue("id")
|
||||
if id == "" {
|
||||
strid := r.PathValue("id")
|
||||
if strid == "" {
|
||||
http.Error(w, "No id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var count int
|
||||
err := db.QueryRow(r.Context(), "SELECT COUNT(*) FROM tasks WHERE file_id = $1 AND worker_id = $2", id, r.Header.Get(constants.UUID_HEADER)).Scan(&count)
|
||||
id, err := strconv.Atoi(strid)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Task Count", "err", err)
|
||||
http.Error(w, "Error Query Task Count: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, 0, "Convert File ID to int", err)
|
||||
return
|
||||
}
|
||||
|
||||
if count < 1 {
|
||||
slog.ErrorContext(r.Context(), "No Running Task for file", "id", id)
|
||||
http.Error(w, "No Running Task for file: "+id, http.StatusBadRequest)
|
||||
if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
|
||||
http.Error(w, "Wrong Worker Version", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
taskid, err := strconv.Atoi(r.Header.Get(constants.TASK_ID_HEADER))
|
||||
if err != nil {
|
||||
errorUpload(r, w, 0, "Convert Task ID to int", err)
|
||||
return
|
||||
}
|
||||
|
||||
var fileid int
|
||||
var workerid string
|
||||
var status constants.TaskStatus
|
||||
err = db.QueryRow(r.Context(), "SELECT file_id, worker_id, status FROM tasks WHERE id = $1", taskid).Scan(&fileid, &workerid, &status)
|
||||
if err != nil {
|
||||
errorUpload(r, w, 0, "Getting Task for Upload", err)
|
||||
return
|
||||
}
|
||||
|
||||
if workerid != r.Header.Get(constants.UUID_HEADER) {
|
||||
errorUpload(r, w, taskid, "Worker is not Assigned to Task", err)
|
||||
return
|
||||
}
|
||||
|
||||
if fileid != id {
|
||||
errorUpload(r, w, taskid, "File is not for this Task", err)
|
||||
return
|
||||
}
|
||||
|
||||
if !(status == constants.TASK_STATUS_RUNNING || status == constants.TASK_STATUS_UNKNOWN) {
|
||||
errorUpload(r, w, taskid, "File can only be uploaded when task status is running or unknown", err)
|
||||
return
|
||||
}
|
||||
|
||||
hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER))
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Decode Hash", "err", err)
|
||||
http.Error(w, "Decode Hash: "+err.Error(), http.StatusBadRequest)
|
||||
errorUpload(r, w, taskid, "Decode Hash", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -49,8 +80,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
|||
var libraryID int
|
||||
err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query File Path", "err", err)
|
||||
http.Error(w, "Error Query File Path: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, taskid, "Query File Path", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -59,18 +89,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
|||
var tReplace bool
|
||||
err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Library Path", "err", err)
|
||||
http.Error(w, "Error Query Library Path: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, taskid, "Query Library Path", err)
|
||||
return
|
||||
}
|
||||
|
||||
// When replacing, send to temp, otherwise write directly
|
||||
var path string
|
||||
if tReplace {
|
||||
//path = filepath.Join(lPath, fPath)
|
||||
slog.ErrorContext(r.Context(), "Replace mode is not implemented")
|
||||
http.Error(w, "Replace mode is not implemented", http.StatusNotImplemented)
|
||||
return
|
||||
path = filepath.Join(lPath, fPath+constants.TEMP_FILE_EXTENSION)
|
||||
// if replace then this is a temp file and should be cleaned up on error
|
||||
defer os.Remove(path)
|
||||
} else {
|
||||
// todo write this to a temp file first also to be able to run cleanup on error and unify the rename logic
|
||||
path = filepath.Join(tPath, fPath)
|
||||
}
|
||||
|
||||
|
@ -78,15 +108,13 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
err = r.ParseMultipartForm(100 << 20)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
|
||||
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, taskid, "Parse Multipart Form", err)
|
||||
return
|
||||
}
|
||||
|
||||
srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
|
||||
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, taskid, "Form File", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -94,17 +122,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
|||
// MaxBytesReader closes the underlying io.Reader on its Close() is called
|
||||
defer src.Close()
|
||||
|
||||
err = os.MkdirAll(filepath.Dir(path), 0775)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Creating Folder", "err", err)
|
||||
http.Error(w, "Creating Folder: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
// if we are not replacing then we dont know if the destination folder exists
|
||||
if !tReplace {
|
||||
err = os.MkdirAll(filepath.Dir(path), 0775)
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Creating Folder", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
out, err := os.Create(path)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Creating File", "err", err)
|
||||
http.Error(w, "Creating File: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, taskid, "Creating File", err)
|
||||
return
|
||||
}
|
||||
defer out.Close()
|
||||
|
@ -113,17 +142,80 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
|
|||
if err != nil {
|
||||
var maxBytesError *http.MaxBytesError
|
||||
if errors.As(err, &maxBytesError) {
|
||||
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
|
||||
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusRequestEntityTooLarge)
|
||||
slog.ErrorContext(r.Context(), "File to Large", "err", err)
|
||||
http.Error(w, "File to Large: "+err.Error(), http.StatusRequestEntityTooLarge)
|
||||
return
|
||||
}
|
||||
slog.ErrorContext(r.Context(), "Failed to write the uploaded content", "err", err)
|
||||
http.Error(w, "Failed to write the uploaded content: "+err.Error(), http.StatusInternalServerError)
|
||||
errorUpload(r, w, taskid, "Failed to write the uploaded content", err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO check file integrity
|
||||
_ = hash
|
||||
|
||||
slog.InfoContext(r.Context(), "upload done", "written", written)
|
||||
|
||||
out.Close()
|
||||
|
||||
// if we are replacing, then we need to change the original file to the uploaded file and update the hash in the database
|
||||
if tReplace {
|
||||
tx, err := db.Begin(r.Context())
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Begin Transaction", err)
|
||||
return
|
||||
}
|
||||
|
||||
dstPath := filepath.Join(lPath, fPath)
|
||||
slog.InfoContext(r.Context(), "Replacing Original file With Uploaded File", "fileid", id, "path", path, "dstPath", dstPath)
|
||||
|
||||
err = os.Rename(path, dstPath)
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Replace File", err)
|
||||
return
|
||||
}
|
||||
|
||||
info, err := os.Stat(dstPath)
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Stat File", err)
|
||||
return
|
||||
}
|
||||
modTime := info.ModTime().UTC().Round(time.Second)
|
||||
size := info.Size()
|
||||
|
||||
ffprobeData, err := ffprobe.ProbeURL(r.Context(), dstPath)
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "ffProbe File", err)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = tx.Exec(r.Context(), "UPDATE files SET old_md5 = md5, old_size = size, old_ffprobe_data = ffprobe_data WHERE id = $1", fileid)
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Copy Filed Current Values to Old Values", err)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = tx.Exec(r.Context(), "UPDATE files SET md5 = $2, size = $3, mod_time = $4, ffprobe_data = $5 WHERE id = $1", fileid, hash, size, modTime, ffprobeData)
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Set New File Values", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = tx.Commit(r.Context())
|
||||
if err != nil {
|
||||
errorUpload(r, w, taskid, "Commit File Hash", err)
|
||||
return
|
||||
}
|
||||
|
||||
slog.InfoContext(r.Context(), "Original file Replaced with Uploaded File", "fileid", id, "dstPath", dstPath)
|
||||
} else {
|
||||
// TODO implement "old" fields for non replace libraries, scan also needs to use old field if it is is non replace and the file has been transcoded
|
||||
}
|
||||
}
|
||||
|
||||
func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string, err error) {
|
||||
slog.ErrorContext(r.Context(), msg, "err", err)
|
||||
http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError)
|
||||
if taskID != 0 {
|
||||
_, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())})
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -247,7 +247,8 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
var health constants.FileHealth
|
||||
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
|
||||
health = constants.FILE_HEALTH_HEALTHY
|
||||
// TODO Auto Queue Transcode for Successfull Transcodes if library setting
|
||||
// Auto Queue Transcode for Successfull Transcodes if library setting say so
|
||||
go queueOnHealth(context.TODO(), ts.Task.ID)
|
||||
} else {
|
||||
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
|
||||
health = constants.FILE_HEALTH_DAMAGED
|
||||
|
@ -294,3 +295,26 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func queueOnHealth(ctx context.Context, taskID int) {
|
||||
var transcode_command_id *int
|
||||
var file_id *int
|
||||
err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
|
||||
FROM tasks
|
||||
INNER JOIN files ON files.id = tasks.file_id
|
||||
INNER JOIN libraries ON files.library_id = libraries.id
|
||||
WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
|
||||
Scan(&transcode_command_id, &file_id)
|
||||
if err == pgx.ErrNoRows {
|
||||
slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
|
||||
} else if err != nil {
|
||||
slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
|
||||
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
|
||||
}
|
||||
}
|
||||
|
|
45
static/js/echarts.min.js
vendored
Normal file
45
static/js/echarts.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
7
static/js/library_filter.js
Normal file
7
static/js/library_filter.js
Normal file
|
@ -0,0 +1,7 @@
|
|||
function setLibrary(library_id) {
|
||||
if (library_id == "-1") {
|
||||
window.location.href = "/stats";
|
||||
} else {
|
||||
window.location.href = "/stats/" + library_id;
|
||||
}
|
||||
}
|
|
@ -3,58 +3,65 @@
|
|||
@import url("component.css");
|
||||
|
||||
:root {
|
||||
font-family: monospace;
|
||||
font-size: 12px;
|
||||
overflow-y: auto;
|
||||
font-family: monospace;
|
||||
font-size: 12px;
|
||||
overflow-y: auto;
|
||||
}
|
||||
|
||||
body {
|
||||
padding: 0.5rem;
|
||||
gap: 0.5rem;
|
||||
padding: 0.5rem;
|
||||
gap: 0.5rem;
|
||||
}
|
||||
|
||||
.counter-image {
|
||||
image-rendering: pixelated;
|
||||
image-rendering: -moz-crisp-edges;
|
||||
image-rendering: crisp-edges;
|
||||
flex-grow: 1;
|
||||
image-rendering: pixelated;
|
||||
image-rendering: -moz-crisp-edges;
|
||||
image-rendering: crisp-edges;
|
||||
flex-grow: 1;
|
||||
max-height: 50vh;
|
||||
max-width: 22.5vh;
|
||||
}
|
||||
|
||||
.counter {
|
||||
flex-flow: row nowrap;
|
||||
align-items: end;
|
||||
font-size: 1.5rem;
|
||||
font-weight: bold;
|
||||
flex-flow: row nowrap;
|
||||
align-items: end;
|
||||
font-size: 1.5rem;
|
||||
font-weight: bold;
|
||||
}
|
||||
|
||||
.workers {
|
||||
flex-flow: row wrap;
|
||||
gap: 2rem;
|
||||
flex-flow: row wrap;
|
||||
gap: 2rem;
|
||||
}
|
||||
|
||||
.workers>* {
|
||||
gap: 0.5rem;
|
||||
border: 1px solid var(--fg);
|
||||
.workers > * {
|
||||
gap: 0.5rem;
|
||||
border: 1px solid var(--fg);
|
||||
}
|
||||
|
||||
.log>p {
|
||||
padding: 0.25rem 0;
|
||||
user-select: text;
|
||||
.log > p {
|
||||
padding: 0.25rem 0;
|
||||
user-select: text;
|
||||
}
|
||||
|
||||
.log>p:hover {
|
||||
background: var(--cl-hl);
|
||||
.log > p:hover {
|
||||
background: var(--cl-hl);
|
||||
}
|
||||
|
||||
nav> :first-child {
|
||||
font-size: 2rem;
|
||||
font-weight: bold;
|
||||
nav > :first-child {
|
||||
font-size: 2rem;
|
||||
font-weight: bold;
|
||||
}
|
||||
|
||||
.short-button {
|
||||
align-self: start;
|
||||
align-self: start;
|
||||
}
|
||||
|
||||
.button-list {
|
||||
flex-direction: row;
|
||||
flex-direction: row;
|
||||
}
|
||||
|
||||
.stats {
|
||||
flex-flow: row wrap;
|
||||
gap: 2rem;
|
||||
}
|
|
@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
|||
l := log.GetTaskLogger(t)
|
||||
|
||||
// TODO Figure out how to get correct file ending
|
||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
|
||||
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v."+t.FileExtension, t.ID, t.FileID))
|
||||
|
||||
// Set ffmpeg input path
|
||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||
|
|
|
@ -27,6 +27,7 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
|||
Type: data.Type,
|
||||
FileID: data.FileID,
|
||||
FileMD5: data.FileMD5,
|
||||
FileExtension: data.FileExtension,
|
||||
FfmpegCommand: data.FfmpegCommand,
|
||||
}
|
||||
|
||||
|
|
|
@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
|
|||
l := log.GetTaskLogger(t)
|
||||
|
||||
// TODO Figure out how to get correct file ending
|
||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
|
||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
|
||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v."+t.FileExtension, t.ID, t.FileID))
|
||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v."+t.FileExtension, t.ID, t.FileID))
|
||||
|
||||
// Set ffmpeg input path
|
||||
if len(t.FfmpegCommand.InputFiles) == 0 {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"mime/multipart"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/config"
|
||||
|
@ -82,6 +83,7 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
|
|||
req.Header.Add(constants.UUID_HEADER, uuid.String())
|
||||
req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION)
|
||||
req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash))
|
||||
req.Header.Add(constants.TASK_ID_HEADER, strconv.Itoa(t.ID))
|
||||
req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"")
|
||||
|
||||
var client = &http.Client{
|
||||
|
@ -97,7 +99,8 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
|
|||
l.InfoContext(ctx, "Upload Done")
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
return fmt.Errorf("Got HTTP Status Code: %v Body: %v", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
<html>
|
||||
<head>
|
||||
<link rel="stylesheet" href="/static/style/style.css">
|
||||
<script src="/static/js/echarts.min.js"></script>
|
||||
</head>
|
||||
<body>
|
||||
{{template "navbar"}}
|
||||
|
|
|
@ -31,6 +31,24 @@
|
|||
</select>
|
||||
<input type="submit" value="Submit">
|
||||
</form>
|
||||
<h2>Set Parallel Tasks</h2>
|
||||
<form method="POST" action= "/parallel_tasks">
|
||||
<label for="worker">Worker:</label>
|
||||
<select id="worker" name="worker">
|
||||
<option value="all">All</option>
|
||||
{{range $w := .Workers}}
|
||||
<option value="{{$w.ID}}">{{$w.Name}}</option>
|
||||
{{end}}
|
||||
</select>
|
||||
<label for="parallel_tasks">Parallel Tasks</label>
|
||||
<select id="parallel_tasks" name="parallel_tasks">
|
||||
<option value="1">1</option>
|
||||
<option value="2">2</option>
|
||||
<option value="3">3</option>
|
||||
<option value="4">4</option>
|
||||
</select>
|
||||
<input type="submit" value="Submit">
|
||||
</form>
|
||||
<h2>Workers</h2>
|
||||
<div class="workers">
|
||||
{{range $w := .Workers}}
|
||||
|
@ -60,6 +78,22 @@
|
|||
<td>
|
||||
{{ $w.ConnectionChanged }}
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
QueueEnable
|
||||
</td>
|
||||
<td>
|
||||
{{ $w.QueueEnable }}
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
ParallelTasks
|
||||
</td>
|
||||
<td>
|
||||
{{ $w.ParallelTasks }}
|
||||
</td>
|
||||
</tr>
|
||||
{{if $w.Connected}}
|
||||
<tr>
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
<th>ScanNewQueueTranscode</th>
|
||||
<th>ScanChangedQueueTranscode</th>
|
||||
<th>HealthOkQueueTranscode</th>
|
||||
<th>HealthCommand</th>
|
||||
<th>TranscodeCommand</th>
|
||||
</tr>
|
||||
{{range $l := .Libraries}}
|
||||
<tr onclick="window.location='/libraries/{{ $l.ID }}';">
|
||||
|
@ -49,6 +51,12 @@
|
|||
<td>
|
||||
{{ $l.HealthOkQueueTranscode }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $l.HealthCommandID }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $l.TranscodeCommandID }}
|
||||
</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</table>
|
||||
|
@ -67,14 +75,28 @@
|
|||
<input type="text" name="transcode_path">
|
||||
<label>Queue Heathcheck for new files on Scan:</label>
|
||||
<input type="checkbox" name="scan_new_queue_health">
|
||||
<label>Queue Heathcheck for changed files on Scan:</label>
|
||||
<label>Queue Heathcheck for changed files on Scan (Not Implemented):</label>
|
||||
<input type="checkbox" name="scan_changed_queue_health">
|
||||
<label>Queue Transcode for new files on Scan:</label>
|
||||
<label>Queue Transcode for new files on Scan (Not Implemented):</label>
|
||||
<input type="checkbox" name="scan_new_queue_transcode">
|
||||
<label>Queue Transcode for changed files on Scan:</label>
|
||||
<label>Queue Transcode for changed files on Scan (Not Implemented):</label>
|
||||
<input type="checkbox" name="scan_changed_queue_transcode">
|
||||
<label>Queue Transcode on Healthcheck Success:</label>
|
||||
<input type="checkbox" name="health_ok_queue_transcode">
|
||||
<label for="health_command_id">Health Command:</label>
|
||||
<select id="health_command_id" name="health_command_id">
|
||||
<option value="">None</option>
|
||||
{{range $l := .FfmpegCommands}}
|
||||
<option value="{{$l.ID}}">{{$l.Name}}</option>
|
||||
{{end}}
|
||||
</select>
|
||||
<label for="transcode_command_id">Transcode Command:</label>
|
||||
<select id="transcode_command_id" name="transcode_command_id">
|
||||
<option value="">None</option>
|
||||
{{range $l := .FfmpegCommands}}
|
||||
<option value="{{$l.ID}}">{{$l.Name}}</option>
|
||||
{{end}}
|
||||
</select>
|
||||
<input type="submit" value="Submit">
|
||||
</form>
|
||||
{{template "tail"}}
|
|
@ -4,5 +4,6 @@
|
|||
<a class="button" href="/libraries">Libraries</a>
|
||||
<a class="button" href="/tasks">Tasks</a>
|
||||
<a class="button" href="/ffmpeg_commands">FFmpeg Commands</a>
|
||||
<a class="button" href="/stats">Stats</a>
|
||||
</nav>
|
||||
{{end}}
|
69
tmpl/stats.tmpl
Normal file
69
tmpl/stats.tmpl
Normal file
|
@ -0,0 +1,69 @@
|
|||
{{template "head"}}
|
||||
<script src="/static/js/library_filter.js"></script>
|
||||
<h2>Stats</h2>
|
||||
<label for="library">Library:</label>
|
||||
<select id="library" name="library" onchange="setLibrary(this.value)" class="short-button">
|
||||
<option {{if eq .SelectedLibrary "-1" }} selected {{end}}value="-1">ALL</option>
|
||||
{{range $l := $.Libraries}}
|
||||
<option {{if eq $.SelectedLibrary $l.ID }} selected {{end}}value="{{$l.ID}}">{{$l.Name}}</option>
|
||||
{{end}}
|
||||
</select>
|
||||
|
||||
<h4>Size</h4>
|
||||
<div class="counter">
|
||||
{{ $l := len .Size }}
|
||||
{{range $i, $c := .Size}}
|
||||
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
|
||||
{{ $n := sub $l $i }}
|
||||
{{ if eq $n 4 }}
|
||||
KB
|
||||
{{ else if eq $n 7 }}
|
||||
MB
|
||||
{{ else if eq $n 10 }}
|
||||
GB
|
||||
{{ else if eq $n 13 }}
|
||||
TB
|
||||
{{end}}
|
||||
{{end}}
|
||||
</div>
|
||||
|
||||
<h4>Files</h4>
|
||||
<div class="counter">
|
||||
{{ $l := len .Count }}
|
||||
{{range $i, $c := .Count}}
|
||||
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
|
||||
{{ $n := sub $l $i }}
|
||||
{{ if eq $n 4 }}
|
||||
.
|
||||
{{ else if eq $n 7 }}
|
||||
.
|
||||
{{ else if eq $n 10 }}
|
||||
.
|
||||
{{ else if eq $n 13 }}
|
||||
.
|
||||
{{end}}
|
||||
{{end}}
|
||||
</div>
|
||||
<h4>Saved</h4>
|
||||
<div class="counter">
|
||||
{{ $l := len .Saved }}
|
||||
{{range $i, $c := .Saved}}
|
||||
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
|
||||
{{ $n := sub $l $i }}
|
||||
{{ if eq $n 4 }}
|
||||
KB
|
||||
{{ else if eq $n 7 }}
|
||||
MB
|
||||
{{ else if eq $n 10 }}
|
||||
GB
|
||||
{{ else if eq $n 13 }}
|
||||
TB
|
||||
{{end}}
|
||||
{{end}}
|
||||
</div>
|
||||
<div class="stats">
|
||||
{{range $c := .Charts}}
|
||||
{{$c.Element}} {{$c.Script}}
|
||||
{{end}}
|
||||
</div>
|
||||
{{template "tail"}}
|
|
@ -10,6 +10,7 @@ type TaskStart struct {
|
|||
ID int `json:"id"`
|
||||
FileID int `json:"file_id"`
|
||||
FileMD5 []byte `json:"file_md5"`
|
||||
FileExtension string `json:"file_extension"`
|
||||
Type constants.TaskType `json:"type"`
|
||||
Data json.RawMessage
|
||||
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`
|
||||
|
@ -19,6 +20,7 @@ type Task struct {
|
|||
ID int `json:"id"`
|
||||
FileID int `json:"file_id"`
|
||||
FileMD5 []byte `json:"md5"`
|
||||
FileExtension string `json:"file_extension"`
|
||||
Type constants.TaskType `json:"type"`
|
||||
Status constants.TaskStatus `json:"status"`
|
||||
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`
|
||||
|
|
|
@ -2,6 +2,7 @@ package worker
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/mackerelio/go-osstat/cpu"
|
||||
"github.com/mackerelio/go-osstat/memory"
|
||||
|
@ -14,13 +15,31 @@ func init() {
|
|||
rpcServer.RegisterMethod("status", status)
|
||||
}
|
||||
|
||||
var cpuBefore *cpu.Stats
|
||||
var cpuUsage uint64
|
||||
var cpuCount uint64
|
||||
|
||||
func calcUsage() {
|
||||
for {
|
||||
cStats, _ := cpu.Get()
|
||||
if cStats != nil {
|
||||
cpuUsage = cStats.Total
|
||||
cpuCount = uint64(cStats.CPUCount)
|
||||
|
||||
if cpuBefore != nil {
|
||||
total := float64(cStats.Total - cpuBefore.Total)
|
||||
cpuUsage = uint64(float64((cStats.User-cpuBefore.User)+(cStats.System-cpuBefore.System)) / total * 100)
|
||||
}
|
||||
cpuBefore = cStats
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func status(ctx context.Context, req rpc.Request) (any, error) {
|
||||
s := types.WorkerStatus{}
|
||||
cStats, _ := cpu.Get()
|
||||
if cStats != nil {
|
||||
s.CPUUsage = cStats.Total
|
||||
s.CPUCount = uint64(cStats.CPUCount)
|
||||
}
|
||||
s.CPUUsage = cpuUsage
|
||||
s.CPUCount = cpuCount
|
||||
mStats, _ := memory.Get()
|
||||
if mStats != nil {
|
||||
s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100)
|
||||
|
|
|
@ -60,6 +60,9 @@ func Start(_conf config.Config) {
|
|||
exit = true
|
||||
cancel()
|
||||
}()
|
||||
|
||||
go calcUsage()
|
||||
|
||||
for {
|
||||
if exit {
|
||||
slog.InfoContext(ctx, "Done")
|
||||
|
|
67
workerpool/workerpool.go
Normal file
67
workerpool/workerpool.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package workerpool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type WorkerPool struct {
|
||||
workers int
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
queue chan func(context.Context)
|
||||
qg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
queue := make(chan func(context.Context), queueSize)
|
||||
|
||||
workerPool := WorkerPool{
|
||||
workers: workers,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
queue: queue,
|
||||
}
|
||||
|
||||
for i := 0; i < workerPool.workers; i++ {
|
||||
workerPool.wg.Add(1)
|
||||
go workerPool.work(workerPool.ctx)
|
||||
}
|
||||
|
||||
return &workerPool
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) Stop() {
|
||||
wp.cancel()
|
||||
wp.wg.Wait()
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
|
||||
wp.qg.Add(1)
|
||||
wp.queue <- job
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) WaitForEmptyQueue() {
|
||||
wp.qg.Wait()
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) QueueLength() int {
|
||||
return len(wp.queue)
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) work(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
wp.wg.Done()
|
||||
return
|
||||
case job := <-wp.queue:
|
||||
func() {
|
||||
defer wp.qg.Done()
|
||||
job(ctx)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue