Compare commits

...

65 commits

Author SHA1 Message Date
15a960da19 fix sql, counter show 0
All checks were successful
/ release (push) Successful in 46s
2025-03-19 23:13:08 +01:00
a5fc856764 Saved Stat
All checks were successful
/ release (push) Successful in 48s
2025-03-19 23:07:11 +01:00
e7371850d1 Set Parallel Tasks, Respect which worker
All checks were successful
/ release (push) Successful in 47s
2025-03-19 21:50:24 +01:00
1e4016d075 fix parallel_tasks form
All checks were successful
/ release (push) Successful in 43s
2025-03-19 21:46:35 +01:00
553392e73f fix template
All checks were successful
/ release (push) Successful in 44s
2025-03-19 21:44:11 +01:00
57969cc07d Add Parallel Tasks Settings
All checks were successful
/ release (push) Successful in 44s
2025-03-19 21:39:46 +01:00
8c3ba7e883 pass missing field
All checks were successful
/ release (push) Successful in 43s
2025-03-19 21:06:36 +01:00
5ecfd81967 preserve file extension
All checks were successful
/ release (push) Successful in 52s
2025-03-19 20:59:38 +01:00
0ce5b46449 fix sql, improve errors, run ffprobe
All checks were successful
/ release (push) Successful in 47s
2025-03-19 17:47:40 +01:00
64fd3da925 fix struct
All checks were successful
/ release (push) Successful in 41s
2025-03-19 13:33:57 +01:00
fa1deed55a fix migrations
All checks were successful
/ release (push) Successful in 42s
2025-03-19 13:32:37 +01:00
1af3a3d5b9 add missing fields to struct
All checks were successful
/ release (push) Successful in 44s
2025-03-19 13:27:15 +01:00
b98be1fd70 copy over ffprobe_data
All checks were successful
/ release (push) Successful in 43s
2025-03-19 13:18:04 +01:00
808953f46f Add old fields, fix md5, fix migratins.
All checks were successful
/ release (push) Successful in 50s
2025-03-19 13:11:03 +01:00
d25890ad26 Log HTTP Bodye on Upload Error into Task
All checks were successful
/ release (push) Successful in 46s
2025-03-19 11:06:18 +01:00
2630ebb0cf Fix sql
All checks were successful
/ release (push) Successful in 48s
2025-03-19 09:57:18 +00:00
d0d2529570 Update mod_time and size as to not confuse the scan
All checks were successful
/ release (push) Successful in 43s
2025-03-19 00:36:19 +01:00
8a6390fb8d Prioritize Transcode Tasks 2025-03-19 00:35:36 +01:00
a2efbdea4c Implement Transcode Replace mode
All checks were successful
/ release (push) Successful in 1m10s
2025-03-18 23:35:28 +01:00
a6e10369bb Update worker version, add TASK ID HEADER 2025-03-18 23:34:50 +01:00
f2e2236653 fix sql
All checks were successful
/ release (push) Successful in 29s
2024-10-11 17:45:23 +02:00
9298f4cccf fix sql
All checks were successful
/ release (push) Successful in 28s
2024-10-11 17:22:08 +02:00
e05051f34d fix template
All checks were successful
/ release (push) Successful in 29s
2024-10-11 17:17:42 +02:00
c87dab94a0 fix sql
All checks were successful
/ release (push) Successful in 29s
2024-10-11 17:15:38 +02:00
12f700d1d0 Implement Queue Health On New File and Queue Transcode on Health Success
All checks were successful
/ release (push) Successful in 32s
2024-10-11 17:07:52 +02:00
f08a07e87d fix tasks
All checks were successful
/ release (push) Successful in 29s
2024-10-08 16:51:36 +02:00
a60f05be12 add none
All checks were successful
/ release (push) Successful in 28s
2024-10-08 16:48:12 +02:00
6927e31a62 fix template
All checks were successful
/ release (push) Successful in 29s
2024-10-08 16:46:51 +02:00
011f97a3e6 Add Comand to Library
All checks were successful
/ release (push) Successful in 32s
2024-10-08 16:42:25 +02:00
f401127127 fix css
All checks were successful
/ release (push) Successful in 29s
2024-10-08 15:07:15 +02:00
20164f8983 limit counter size
All checks were successful
/ release (push) Successful in 30s
2024-10-08 14:50:24 +02:00
275657e584 adjust css
All checks were successful
/ release (push) Successful in 28s
2024-10-08 13:56:54 +02:00
dbaa8f1de7 add counters
All checks were successful
/ release (push) Successful in 29s
2024-10-08 13:49:01 +02:00
dcbddfc6cd fix js
All checks were successful
/ release (push) Successful in 29s
2024-10-08 13:01:59 +02:00
849b4cf004 fix template
All checks were successful
/ release (push) Successful in 28s
2024-10-08 12:55:54 +02:00
91e3504abe fix template
All checks were successful
/ release (push) Successful in 28s
2024-10-08 12:49:32 +02:00
29c7cdd0bf fix sql
All checks were successful
/ release (push) Successful in 28s
2024-10-08 12:48:12 +02:00
cee5353b54 make -1 all
All checks were successful
/ release (push) Successful in 30s
2024-10-08 12:44:23 +02:00
b6c22df6f1 fix template
All checks were successful
/ release (push) Successful in 31s
2024-10-08 12:36:03 +02:00
7d037be106 Add Library filter to Stats
All checks were successful
/ release (push) Successful in 1m7s
2024-10-08 11:30:04 +02:00
9d2519c085 fix cpu usage reporting
All checks were successful
/ release (push) Successful in 29s
2024-07-14 05:04:25 +02:00
f818a3525b Adjust more colors
All checks were successful
/ release (push) Successful in 30s
2024-07-14 04:15:25 +02:00
043d81b823 Set Chart Colors
All checks were successful
/ release (push) Successful in 28s
2024-07-14 03:54:00 +02:00
04a05284ed deduplicate days
All checks were successful
/ release (push) Successful in 28s
2024-07-14 03:11:22 +02:00
8b6e20ba00 limit task status chart to 7 days
All checks were successful
/ release (push) Successful in 28s
2024-07-14 02:36:53 +02:00
64315356ae Fix Order by
All checks were successful
/ release (push) Successful in 28s
2024-07-14 02:33:17 +02:00
4c3df8fef6 ORder TAsk Status
All checks were successful
/ release (push) Successful in 28s
2024-07-14 02:31:10 +02:00
a75ce3287d Add Task Status Per Day Bar Chart
All checks were successful
/ release (push) Successful in 27s
2024-07-14 02:18:10 +02:00
2e360f4b20 Improve Stats group by 2024-07-14 00:37:16 +02:00
cd78de96b4 Add Resolution chart
All checks were successful
/ release (push) Successful in 29s
2024-07-14 00:08:07 +02:00
8e15cdd6e8 Rework chart gen. Add Health, Tanscode, Task Status Charts
All checks were successful
/ release (push) Successful in 32s
2024-07-13 23:18:47 +02:00
091ef322d5 Fix Chart appearance
All checks were successful
/ release (push) Successful in 30s
2024-07-13 03:46:55 +02:00
38219f7b07 Add Stats Chart
All checks were successful
/ release (push) Successful in 39s
2024-07-13 03:33:43 +02:00
0f5d842a64 Add stats page
All checks were successful
/ release (push) Successful in 29s
2024-07-13 01:45:43 +02:00
8ba0e8f2ab fix sql column
All checks were successful
/ release (push) Successful in 27s
2024-07-12 23:15:40 +02:00
03eb3541a5 Only hash files if library is enabled
All checks were successful
/ release (push) Successful in 30s
2024-07-12 23:04:57 +02:00
1871e1c26f increase hash limit
All checks were successful
/ release (push) Successful in 27s
2024-07-11 22:58:48 +02:00
81fa489417 fix row collection
All checks were successful
/ release (push) Successful in 28s
2024-07-11 22:55:39 +02:00
aeb47c2593 User workerpool for scanning and hashing
All checks were successful
/ release (push) Successful in 31s
2024-07-11 22:47:26 +02:00
f12462250a Set Correct Mod Time for New Files 2024-07-11 19:24:15 +02:00
bae1112074 remove extra commit
All checks were successful
/ release (push) Successful in 28s
2024-07-07 03:50:16 +02:00
5d6a29407d Fix Subseqent scan chaning all New and Changed File to Exists. Brick Missing for now. Make new file async
Some checks failed
/ release (push) Failing after 11s
2024-07-07 03:49:21 +02:00
4019d1764e Reduce non video file log level
All checks were successful
/ release (push) Successful in 27s
2024-07-07 03:14:37 +02:00
67aa389f79 dont error scan on ffprobe failure
All checks were successful
/ release (push) Successful in 26s
2024-07-07 02:18:09 +02:00
75a578fd36 Merge pull request 'scan_rework' (#1) from scan_rework into main
All checks were successful
/ release (push) Successful in 27s
Reviewed-on: #1
2024-07-06 23:53:27 +00:00
38 changed files with 1112 additions and 194 deletions

View file

@ -2,13 +2,14 @@ package constants
import "fmt" import "fmt"
const WORKER_VERSION = "v1" const WORKER_VERSION = "v2"
const WORKER_VERSION_HEADER = "morffix-version" const WORKER_VERSION_HEADER = "morffix-version"
const SHARED_SECRET_HEADER = "morffix-secret" const SHARED_SECRET_HEADER = "morffix-secret"
const NAME_HEADER = "morffix-name" const NAME_HEADER = "morffix-name"
const UUID_HEADER = "morffix-uuid" const UUID_HEADER = "morffix-uuid"
const HASH_HEADER = "morffix-hash" const HASH_HEADER = "morffix-hash"
const TASK_ID_HEADER = "morffix-task-id"
const INDEX_TEMPLATE_NAME = "index.tmpl" const INDEX_TEMPLATE_NAME = "index.tmpl"
const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl" const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl"
@ -17,6 +18,7 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl"
const TASKS_TEMPLATE_NAME = "tasks.tmpl" const TASKS_TEMPLATE_NAME = "tasks.tmpl"
const TASK_TEMPLATE_NAME = "task.tmpl" const TASK_TEMPLATE_NAME = "task.tmpl"
const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl" const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl"
const STATS_TEMPLATE_NAME = "stats.tmpl"
const FORM_FILE_KEY = "file" const FORM_FILE_KEY = "file"
@ -25,6 +27,8 @@ const SORT_ORDER_ASC_PARAM = "order_asc"
const QUERY_LIMIT_PARAM = "limit" const QUERY_LIMIT_PARAM = "limit"
const QUERY_PAGE_PARAM = "page" const QUERY_PAGE_PARAM = "page"
const TEMP_FILE_EXTENSION = ".morffix"
type TaskType int type TaskType int
const ( const (

1
go.mod
View file

@ -12,6 +12,7 @@ require (
) )
require ( require (
github.com/go-echarts/go-echarts/v2 v2.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect

2
go.sum
View file

@ -2,6 +2,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-echarts/go-echarts/v2 v2.4.0 h1:efD46dmAvaZEWrBHAGjE8cfDK48vvFTHz5N9VqW5rYc=
github.com/go-echarts/go-echarts/v2 v2.4.0/go.mod h1:56YlvzhW/a+du15f3S2qUGNDfKnFOeJSThBIrVFHDtI=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4= github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM= github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

View file

@ -1,2 +1,2 @@
ALTER TABLE files ALTER TABLE files
DROP IF EXISTS ffprobe; DROP IF EXISTS ffprobe_data;

View file

@ -0,0 +1,3 @@
ALTER TABLE libraries
DROP IF EXISTS transcode_command_id,
DROP IF EXISTS health_command_id;

View file

@ -0,0 +1,3 @@
ALTER TABLE libraries
ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id),
ADD health_command_id bigint REFERENCES ffmpeg_commands(id);

View file

@ -0,0 +1,4 @@
ALTER TABLE files
DROP IF EXISTS old_md5,
DROP IF EXISTS old_ffprobe_data,
DROP IF EXISTS old_size;

View file

@ -0,0 +1,3 @@
ALTER TABLE files ADD old_md5 uuid,
ADD old_ffprobe_data JSONB,
ADD old_size bigint;

View file

@ -0,0 +1,2 @@
ALTER TABLE workers
DROP IF EXISTS parallel_tasks;

View file

@ -0,0 +1,2 @@
ALTER TABLE workers
ADD parallel_tasks smallint NOT NULL DEFAULT 1;

View file

@ -18,8 +18,9 @@ type IndexData struct {
type IndexWorker struct { type IndexWorker struct {
ID string ID string
Worker Worker
Status *types.WorkerStatus Status *types.WorkerStatus
QueueEnable bool QueueEnable bool
ParallelTasks int
} }
func handleIndex(w http.ResponseWriter, r *http.Request) { func handleIndex(w http.ResponseWriter, r *http.Request) {
@ -31,6 +32,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
WorkersMutex.Lock() WorkersMutex.Lock()
defer WorkersMutex.Unlock() defer WorkersMutex.Unlock()
for i := range Workers { for i := range Workers {
var queueEnable bool
var parallelTasks int
err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil {
w.Write([]byte(err.Error()))
slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err)
}
if Workers[i].Connected { if Workers[i].Connected {
var status types.WorkerStatus var status types.WorkerStatus
_, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status) _, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status)
@ -41,15 +51,19 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
} }
slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status) slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status)
data.Workers = append(data.Workers, IndexWorker{ data.Workers = append(data.Workers, IndexWorker{
ID: i, ID: i,
Worker: *Workers[i], Worker: *Workers[i],
Status: &status, Status: &status,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
}) })
} else { } else {
data.Workers = append(data.Workers, IndexWorker{ data.Workers = append(data.Workers, IndexWorker{
ID: i, ID: i,
Worker: *Workers[i], Worker: *Workers[i],
Status: nil, Status: nil,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
}) })
} }
} }
@ -77,6 +91,10 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
} }
func splitInt(n int) []int { func splitInt(n int) []int {
if n == 0 {
return []int{0}
}
slc := []int{} slc := []int{}
for n > 0 { for n > 0 {
slc = append(slc, n%10) slc = append(slc, n%10)

View file

@ -5,17 +5,21 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strconv"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
) )
type LibrariesData struct { type LibrariesData struct {
Libraries []Library Libraries []Library
FfmpegCommands []FfmpegCommand
} }
type Library struct { type Library struct {
ID string `db:"id"` ID string `db:"id"`
HealthCommandID *int `db:"health_command_id"`
TranscodeCommandID *int `db:"transcode_command_id"`
Name string `db:"name"` Name string `db:"name"`
Path string `db:"path"` Path string `db:"path"`
Enable bool `db:"enable"` Enable bool `db:"enable"`
@ -40,7 +44,7 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
} }
} }
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries") rows, err := db.Query(r.Context(), "SELECT * FROM libraries")
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err) slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -54,6 +58,20 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
} }
data.Libraries = libraries data.Libraries = libraries
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
if err != nil {
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
data.FfmpegCommands = ffmpegCommands
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data) err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data)
if err != nil { if err != nil {
@ -84,9 +102,21 @@ func createLibrary(r *http.Request) error {
scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on" scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on"
health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on" health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on"
var health_command_id *int = nil
var transcode_command_id *int = nil
h, err := strconv.Atoi(r.FormValue("health_command_id"))
if err == nil {
health_command_id = &h
}
t, err := strconv.Atoi(r.FormValue("transcode_command_id"))
if err == nil {
transcode_command_id = &t
}
slog.Info("Got Library Create", "name", name, "path", path, "enable", enable) slog.Info("Got Library Create", "name", name, "path", path, "enable", enable)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path) _, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id)
if err != nil { if err != nil {
return fmt.Errorf("Inserting Library: %w", err) return fmt.Errorf("Inserting Library: %w", err)
} }

View file

@ -20,10 +20,12 @@ type File struct {
ID int `db:"id"` ID int `db:"id"`
Path string `db:"path"` Path string `db:"path"`
Size int64 `db:"size"` Size int64 `db:"size"`
OldSize *int64 `db:"old_size"`
Status constants.FileStatus `db:"status"` Status constants.FileStatus `db:"status"`
Health constants.FileHealth `db:"health"` Health constants.FileHealth `db:"health"`
Transcode constants.FileTranscode `db:"transcode"` Transcode constants.FileTranscode `db:"transcode"`
MD5 []byte `db:"md5"` MD5 []byte `db:"md5"`
OldMD5 *[]byte `db:"old_md5"`
UpdatedAt time.Time `db:"updated_at"` UpdatedAt time.Time `db:"updated_at"`
} }
@ -63,7 +65,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
Enable: enabled, Enable: enabled,
} }
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1", id) rows, err := db.Query(r.Context(), "SELECT id, path, size, old_size, status, health, transcode, md5, old_md5, updated_at FROM files where library_id = $1", id)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Files", "err", err) slog.ErrorContext(r.Context(), "Query Files", "err", err)
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)

41
server/parallel_tasks.go Normal file
View file

@ -0,0 +1,41 @@
package server
import (
"fmt"
"log/slog"
"net/http"
"strconv"
)
func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
http.Error(w, fmt.Sprintf("Parseing Form: %v", err), http.StatusBadRequest)
return
}
parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks"))
if err != nil {
http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest)
return
}
worker := r.FormValue("worker")
slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker)
if worker == "all" {
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks)
if err != nil {
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
return
}
} else {
_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1 where id = $2", parallelTasks, worker)
if err != nil {
http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
return
}
}
http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound)
}

View file

@ -10,11 +10,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"slices" "slices"
"strconv"
"sync" "sync"
"time" "time"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/workerpool"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"gopkg.in/vansante/go-ffprobe.v2" "gopkg.in/vansante/go-ffprobe.v2"
) )
@ -22,44 +22,47 @@ import (
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"} var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
func handleScan(w http.ResponseWriter, r *http.Request) { func handleScan(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" { http.Error(w, "Not Implemented", http.StatusNotImplemented)
scanStatus(w, r) /*
return if r.Method == "GET" {
} scanStatus(w, r)
return
}
id, err := strconv.Atoi(r.FormValue("id")) id, err := strconv.Atoi(r.FormValue("id"))
if err != nil { if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest) http.Error(w, "Parsing ID", http.StatusBadRequest)
return return
} }
var name string var name string
var path string var path string
var enabled bool var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled) err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err) slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return return
} }
scanCtx := context.Background() scanCtx := context.Background()
go scanLibrary(scanCtx, id) go scanLibrary(scanCtx, id)
message := "Scan Started" message := "Scan Started"
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message) err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err) slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return return
} }
_, err = w.Write(buf.Bytes()) _, err = w.Write(buf.Bytes())
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err) slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
} }
*/
} }
func scanStatus(w http.ResponseWriter, r *http.Request) { func scanStatus(w http.ResponseWriter, r *http.Request) {
@ -79,7 +82,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
} }
} }
func manageScan(stop chan bool) { func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
scanTicker := time.NewTicker(time.Minute) scanTicker := time.NewTicker(time.Minute)
hashTicker := time.NewTicker(time.Second) hashTicker := time.NewTicker(time.Second)
scanRunning := false scanRunning := false
@ -111,7 +114,7 @@ func manageScan(stop chan bool) {
} }
for _, l := range libraries { for _, l := range libraries {
scanLibrary(ctx, l) scanLibrary(ctx, l, scanPool)
} }
}() }()
@ -126,19 +129,32 @@ func manageScan(stop chan bool) {
hashRunning = false hashRunning = false
}() }()
var fileID uint rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
if err != nil { if err != nil {
if !errors.Is(err, pgx.ErrNoRows) { if !errors.Is(err, pgx.ErrNoRows) {
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err) slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
} }
// Nothing to do
return return
} }
err = hashFile(ctx, fileID) files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err) slog.ErrorContext(ctx, "Collect File Rows", "err", err)
return
} }
for _, f := range files {
hashPool.QueueJob(func(ctx context.Context) {
err = hashFile(ctx, f)
if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
}
})
}
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
hashPool.WaitForEmptyQueue()
}() }()
case <-stop: case <-stop:
cancel() cancel()
@ -149,7 +165,7 @@ func manageScan(stop chan bool) {
var scanLock sync.Mutex var scanLock sync.Mutex
func scanLibrary(ctx context.Context, id int) { func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
slog.InfoContext(ctx, "Acquiring Scan Lock...") slog.InfoContext(ctx, "Acquiring Scan Lock...")
scanLock.Lock() scanLock.Lock()
defer scanLock.Unlock() defer scanLock.Unlock()
@ -158,7 +174,9 @@ func scanLibrary(ctx context.Context, id int) {
var name string var name string
var lpath string var lpath string
var enabled bool var enabled bool
err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled) var health_command_id *int
var scan_new_queue_health bool
err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Get Library", "err", err) slog.ErrorContext(ctx, "Get Library", "err", err)
return return
@ -180,22 +198,17 @@ func scanLibrary(ctx context.Context, id int) {
return return
} }
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Begin Transaction", "err", err)
return
}
defer tx.Rollback(ctx)
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath) slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
// Mark all Files as Missing // TODO Work on getting missing status again
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING) /*
if err != nil { // Mark all Files as Missing
slog.ErrorContext(ctx, "Setting Missing Status", "err", err) _, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
return if err != nil {
} slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
return
}
*/
err = filepath.Walk(lpath, err = filepath.Walk(lpath,
func(fullPath string, info os.FileInfo, err error) error { func(fullPath string, info os.FileInfo, err error) error {
if errors.Is(err, os.ErrPermission) { if errors.Is(err, os.ErrPermission) {
@ -211,7 +224,7 @@ func scanLibrary(ctx context.Context, id int) {
} }
if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) { if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath) slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
return nil return nil
} }
@ -223,52 +236,63 @@ func scanLibrary(ctx context.Context, id int) {
var fileID int var fileID int
var oldSize uint var oldSize uint
var oldModTime time.Time var oldModTime time.Time
err = tx.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
if errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet // File Does not Exist Yet
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath) scanPool.QueueJob(
func(ctx context.Context) {
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath) ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil { if err != nil {
return fmt.Errorf("ffprobe New File: %w", err) slog.ErrorContext(ctx, "ffprobe New File", "err", err)
} }
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData) var file_id int
if err != nil { err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time)
return fmt.Errorf("Add New File to DB: %w", err) VALUES ($1, $2, $3, $4, $5, $6, $7)
} RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id)
if err != nil {
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
}
if scan_new_queue_health && health_command_id != nil {
slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id)
}
}
})
return nil return nil
} else if err != nil { } else if err != nil {
return fmt.Errorf("Getting File: %w", err) return fmt.Errorf("Getting File: %w", err)
} }
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
// File Already Exists, Check if it has been changed // File Already Exists, Check if it has been changed
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize { if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
slog.Debug("File stayed the same", "id", fileID) slog.Debug("File stayed the same", "id", fileID)
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 WHERE id = $1", fileID, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Updating Non Changed File in DB: %w", err)
}
} else { } else {
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size()) scanPool.QueueJob(func(ctx context.Context) {
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath) ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil { if err != nil {
return fmt.Errorf("ffprobe Changed File: %w", err) slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
} }
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData) _, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil { if err != nil {
return fmt.Errorf("Updating Changed File in DB: %w", err) slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
} }
})
} }
return nil return nil
}) })
if err != nil { if err != nil {
@ -276,10 +300,7 @@ func scanLibrary(ctx context.Context, id int) {
return return
} }
err = tx.Commit(ctx) scanPool.WaitForEmptyQueue()
if err != nil {
slog.ErrorContext(ctx, "Error Committing Changes", "err", err)
return
}
slog.InfoContext(ctx, "Scan Done", "id", id) slog.InfoContext(ctx, "Scan Done", "id", id)
} }

View file

@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"git.lastassault.de/speatzle/morffix/config" "git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/workerpool"
) )
var conf config.Config var conf config.Config
@ -102,6 +103,9 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
mux.HandleFunc("/libraries", handleLibraries) mux.HandleFunc("/libraries", handleLibraries)
mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands)
mux.HandleFunc("/queue_enable", HandleSetQueueEnable) mux.HandleFunc("/queue_enable", HandleSetQueueEnable)
mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks)
mux.HandleFunc("/stats", handleStats)
mux.HandleFunc("/stats/{id}", handleStats)
mux.HandleFunc("/", handleIndex) mux.HandleFunc("/", handleIndex)
@ -129,8 +133,11 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement := make(chan bool, 1) stopWorkerManagement := make(chan bool, 1)
go manageWorkers(stopWorkerManagement) go manageWorkers(stopWorkerManagement)
scanPool := workerpool.NewWorkerPool(10, 100000)
hashPool := workerpool.NewWorkerPool(5, 100000)
stopScanning := make(chan bool, 1) stopScanning := make(chan bool, 1)
go manageScan(stopScanning) go manageScan(stopScanning, scanPool, hashPool)
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)
@ -146,6 +153,10 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement <- true stopWorkerManagement <- true
slog.Info("Stopping Scanning...") slog.Info("Stopping Scanning...")
stopScanning <- true stopScanning <- true
slog.Info("Stopping Scanning Pool...")
scanPool.Stop()
slog.Info("Stopping Hashing Pool...")
hashPool.Stop()
slog.Info("Done") slog.Info("Done")
} }
} }

369
server/stats.go Normal file
View file

@ -0,0 +1,369 @@
package server
import (
"bytes"
"context"
"fmt"
"html/template"
"log/slog"
"net/http"
"slices"
"strings"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/jackc/pgx/v5"
)
type StatsDisplay struct {
Charts []ChartData
Libraries []Library
SelectedLibrary string
Size []int
Duration []int
Count []int
Saved []int
}
type ChartData struct {
Element template.HTML
Script template.HTML
}
type PieValue struct {
Name string
Value int
}
type PieIntValue struct {
Id int
Value int
}
const CHART_COLOR_SUCCESS = "#7cffb2"
const CHART_COLOR_FAILED = "#ff6e76"
const CHART_COLOR_UNKNOWN = "#fddd60"
func generatePie(name string, data []opts.PieData) ChartData {
pie := charts.NewPie()
pie.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{
Theme: "dark",
BackgroundColor: "#111",
}),
charts.WithTitleOpts(opts.Title{
Title: name,
}))
for i := range data {
if data[i].Name == "Success" || data[i].Name == "Healthy" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_SUCCESS,
}
} else if data[i].Name == "Failed" || data[i].Name == "Damaged" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_FAILED,
}
} else if data[i].Name == "Unknown" || data[i].Name == "None" {
data[i].ItemStyle = &opts.ItemStyle{
Color: CHART_COLOR_UNKNOWN,
}
}
}
pie.AddSeries(name, data).SetSeriesOptions(
charts.WithLabelOpts(
opts.Label{
Show: opts.Bool(true),
Formatter: "{b}: {c}",
}),
)
snippet := pie.RenderSnippet()
return ChartData{
Element: template.HTML(snippet.Element),
Script: template.HTML(snippet.Script),
}
}
func generateStats(ctx context.Context, library_id string) ([]ChartData, error) {
data := []ChartData{}
rows, err := db.Query(ctx,
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).codec_name')::text, 'Unknown') AS name, COUNT(*) AS value
FROM files
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Codecs: %w", err)
}
codecCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
if err != nil {
return nil, fmt.Errorf("Collect Codec Data: %w", err)
}
res := []opts.PieData{}
for _, v := range codecCounts {
res = append(res, opts.PieData{
Name: strings.ReplaceAll(v.Name, "\"", ""),
Value: v.Value,
})
}
data = append(data, generatePie("Codecs", res))
rows, err = db.Query(ctx,
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).width')::text, 'Unknown') AS name, COUNT(*) AS value
FROM files
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Resolution: %w", err)
}
resolutionCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
if err != nil {
return nil, fmt.Errorf("Collect Codec Data: %w", err)
}
res = []opts.PieData{}
for _, v := range resolutionCounts {
res = append(res, opts.PieData{
Name: strings.ReplaceAll(v.Name, "\"", ""),
Value: v.Value,
})
}
data = append(data, generatePie("Resolution", res))
rows, err = db.Query(ctx,
`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.format.format_name')::text, 'Unknown') AS name, COUNT(*) AS value
FROM files
WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Container: %w", err)
}
containerCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
if err != nil {
return nil, fmt.Errorf("Collect Container Data: %w", err)
}
res = []opts.PieData{}
for _, v := range containerCounts {
res = append(res, opts.PieData{
Name: strings.ReplaceAll(v.Name, "\"", ""),
Value: v.Value,
})
}
data = append(data, generatePie("Container", res))
rows, err = db.Query(ctx,
`SELECT health AS id, COUNT(*) AS value
FROM files WHERE ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Health: %w", err)
}
healthCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
if err != nil {
return nil, fmt.Errorf("Collect Health Data: %w", err)
}
res = []opts.PieData{}
for _, v := range healthCounts {
res = append(res, opts.PieData{
Name: constants.FileHealth(v.Id).String(),
Value: v.Value,
})
}
data = append(data, generatePie("Health", res))
rows, err = db.Query(ctx,
`SELECT transcode AS id, COUNT(*) AS value
FROM files WHERE ($1 = -1 OR library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Transcode: %w", err)
}
transcodeCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
if err != nil {
return nil, fmt.Errorf("Collect Transcode Data: %w", err)
}
res = []opts.PieData{}
for _, v := range transcodeCounts {
res = append(res, opts.PieData{
Name: constants.FileTranscode(v.Id).String(),
Value: v.Value,
})
}
data = append(data, generatePie("Transcode Status", res))
rows, err = db.Query(ctx,
`SELECT tasks.status AS id, COUNT(*) AS value
FROM tasks INNER JOIN files ON files.id = tasks.file_id
WHERE ($1 = -1 OR files.library_id = $1)
GROUP BY 1;`, library_id)
if err != nil {
return nil, fmt.Errorf("Query Task Status: %w", err)
}
taskStatusCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
if err != nil {
return nil, fmt.Errorf("Collect Task Status Data: %w", err)
}
res = []opts.PieData{}
for _, v := range taskStatusCounts {
res = append(res, opts.PieData{
Name: constants.TaskStatus(v.Id).String(),
Value: v.Value,
})
}
data = append(data, generatePie("Task Status", res))
type BarTaskRowValue struct {
Date time.Time
Status constants.TaskStatus
Count int
}
rows, err = db.Query(ctx,
`SELECT date_trunc('day', tasks.updated_at) date, tasks.status, COUNT(*) AS count
FROM tasks INNER JOIN files ON files.id = tasks.file_id
WHERE ($1 = -1 OR files.library_id = $1) AND tasks.updated_at > CURRENT_DATE - 7 AND (tasks.status = $2 OR tasks.status = $3)
GROUP BY 1,2
ORDER BY date;`, library_id, constants.TASK_STATUS_SUCCESS, constants.TASK_STATUS_FAILED)
if err != nil {
return nil, fmt.Errorf("Query Task Status Day: %w", err)
}
taskStatusDayCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[BarTaskRowValue])
if err != nil {
return nil, fmt.Errorf("Collect Task Status Day Data: %w", err)
}
days := []string{}
successBarData := []opts.BarData{}
failedBarData := []opts.BarData{}
for _, v := range taskStatusDayCounts {
if !slices.Contains(days, v.Date.Format(time.DateOnly)) {
days = append(days, v.Date.Format(time.DateOnly))
}
if v.Status == constants.TASK_STATUS_SUCCESS {
successBarData = append(successBarData, opts.BarData{
Value: v.Count,
ItemStyle: &opts.ItemStyle{
Color: CHART_COLOR_SUCCESS,
},
})
} else if v.Status == constants.TASK_STATUS_FAILED {
failedBarData = append(failedBarData, opts.BarData{
Value: v.Count,
ItemStyle: &opts.ItemStyle{
Color: CHART_COLOR_FAILED,
},
})
}
}
bar := charts.NewBar()
bar.SetGlobalOptions(
charts.WithInitializationOpts(opts.Initialization{
Theme: "dark",
BackgroundColor: "#111",
}),
charts.WithTitleOpts(opts.Title{
Title: "Task Success/Failed Last 7 Days",
}),
)
bar.SetXAxis(days).
AddSeries("Success", successBarData).
AddSeries("Failed", failedBarData).
SetSeriesOptions(charts.WithBarChartOpts(opts.BarChart{
Stack: "stackA",
}))
snippet := bar.RenderSnippet()
data = append(data, ChartData{
Element: template.HTML(snippet.Element),
Script: template.HTML(snippet.Script),
})
return data, nil
}
func handleStats(w http.ResponseWriter, r *http.Request) {
id := r.PathValue("id")
if id == "" {
id = "-1"
}
data, err := generateStats(r.Context(), id)
if err != nil {
slog.ErrorContext(r.Context(), "Generate Stats:", "err", err)
http.Error(w, "Generate Stats: "+err.Error(), http.StatusInternalServerError)
return
}
rows, err := db.Query(r.Context(),
`SELECT *
FROM libraries;`)
if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Query Libraries: "+err.Error(), http.StatusInternalServerError)
return
}
libraries, err := pgx.CollectRows(rows, pgx.RowToStructByName[Library])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Libraries", "err", err)
http.Error(w, "Collect Libraries: "+err.Error(), http.StatusInternalServerError)
return
}
var size int
var count int
var duration int
err = db.QueryRow(r.Context(),
`SELECT SUM(size) AS size,
COUNT(id) as count,
0 as duration
FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2`, id, constants.FILE_STATUS_EXISTS).Scan(&size, &count, &duration)
if err != nil {
size = 0
count = 0
duration = 0
}
var saved int
err = db.QueryRow(r.Context(),
`SELECT (SUM(old_size) - COALESCE(SUM(size), 0)) AS saved
FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2 AND old_size IS NOT NULL`, id, constants.FILE_STATUS_EXISTS).Scan(&saved)
if err != nil {
saved = 0
}
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.STATS_TEMPLATE_NAME, StatsDisplay{
Libraries: libraries,
SelectedLibrary: id,
Size: splitInt(size),
Count: splitInt(count),
Duration: splitInt(duration),
Saved: splitInt(saved),
Charts: data,
})
if err != nil {
slog.ErrorContext(r.Context(), "Executing Stats Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
}

View file

@ -107,7 +107,7 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
return return
} }
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries WHERE enable = $1", true) rows, err := db.Query(r.Context(), "SELECT * FROM libraries WHERE enable = $1", true)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err) slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -328,6 +328,7 @@ type QueuedTask struct {
Type constants.TaskType Type constants.TaskType
FileID int `json:"file_id"` FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5" db:"md5"` FileMD5 []byte `json:"file_md5" db:"md5"`
FileExtension string `json:"file_extension"`
Data json.RawMessage Data json.RawMessage
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"` FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
} }
@ -344,7 +345,7 @@ func assignQueuedTasks(ctx context.Context) error {
assignRunning = false assignRunning = false
}() }()
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, substring(f.path FROM '\\.([^\\.]*)$') AS file_extension, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status = $2 ORDER BY type DESC", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS)
if err != nil { if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err) return fmt.Errorf("Query Queued Tasks: %w", err)
} }
@ -353,6 +354,8 @@ func assignQueuedTasks(ctx context.Context) error {
return fmt.Errorf("Collect Queued Tasks: %w", err) return fmt.Errorf("Collect Queued Tasks: %w", err)
} }
// TODO, allow overwriting of extension
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks)) //slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
if len(queuedTasks) == 0 { if len(queuedTasks) == 0 {
@ -372,7 +375,8 @@ func assignQueuedTasks(ctx context.Context) error {
} }
if Workers[i].Connected { if Workers[i].Connected {
var queueEnable bool var queueEnable bool
err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable) var parallelTasks int
err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil { if err != nil {
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err) return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
} }
@ -390,7 +394,7 @@ func assignQueuedTasks(ctx context.Context) error {
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count) slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
// Allow for Multiple Tasks at once in the future // Allow for Multiple Tasks at once in the future
if count < 1 { if count < parallelTasks {
tx, err := db.Begin(ctx) tx, err := db.Begin(ctx)
defer tx.Rollback(ctx) defer tx.Rollback(ctx)
if err != nil { if err != nil {
@ -406,6 +410,7 @@ func assignQueuedTasks(ctx context.Context) error {
Type: queuedTasks[lastAssigned].Type, Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID, FileID: queuedTasks[lastAssigned].FileID,
FileMD5: queuedTasks[lastAssigned].FileMD5, FileMD5: queuedTasks[lastAssigned].FileMD5,
FileExtension: queuedTasks[lastAssigned].FileExtension,
Data: queuedTasks[lastAssigned].Data, Data: queuedTasks[lastAssigned].Data,
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand, FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
} }

View file

@ -1,15 +1,20 @@
package server package server
import ( import (
"context"
"encoding/base64" "encoding/base64"
"errors" "errors"
"fmt"
"io" "io"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"time"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"gopkg.in/vansante/go-ffprobe.v2"
) )
func handleUpload(w http.ResponseWriter, r *http.Request) { func handleUpload(w http.ResponseWriter, r *http.Request) {
@ -18,30 +23,56 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
return return
} }
id := r.PathValue("id") strid := r.PathValue("id")
if id == "" { if strid == "" {
http.Error(w, "No id", http.StatusBadRequest) http.Error(w, "No id", http.StatusBadRequest)
return return
} }
var count int id, err := strconv.Atoi(strid)
err := db.QueryRow(r.Context(), "SELECT COUNT(*) FROM tasks WHERE file_id = $1 AND worker_id = $2", id, r.Header.Get(constants.UUID_HEADER)).Scan(&count)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Task Count", "err", err) errorUpload(r, w, 0, "Convert File ID to int", err)
http.Error(w, "Error Query Task Count: "+err.Error(), http.StatusInternalServerError)
return return
} }
if count < 1 { if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
slog.ErrorContext(r.Context(), "No Running Task for file", "id", id) http.Error(w, "Wrong Worker Version", http.StatusBadRequest)
http.Error(w, "No Running Task for file: "+id, http.StatusBadRequest) return
}
taskid, err := strconv.Atoi(r.Header.Get(constants.TASK_ID_HEADER))
if err != nil {
errorUpload(r, w, 0, "Convert Task ID to int", err)
return
}
var fileid int
var workerid string
var status constants.TaskStatus
err = db.QueryRow(r.Context(), "SELECT file_id, worker_id, status FROM tasks WHERE id = $1", taskid).Scan(&fileid, &workerid, &status)
if err != nil {
errorUpload(r, w, 0, "Getting Task for Upload", err)
return
}
if workerid != r.Header.Get(constants.UUID_HEADER) {
errorUpload(r, w, taskid, "Worker is not Assigned to Task", err)
return
}
if fileid != id {
errorUpload(r, w, taskid, "File is not for this Task", err)
return
}
if !(status == constants.TASK_STATUS_RUNNING || status == constants.TASK_STATUS_UNKNOWN) {
errorUpload(r, w, taskid, "File can only be uploaded when task status is running or unknown", err)
return return
} }
hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER)) hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER))
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Decode Hash", "err", err) errorUpload(r, w, taskid, "Decode Hash", err)
http.Error(w, "Decode Hash: "+err.Error(), http.StatusBadRequest)
return return
} }
@ -49,8 +80,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var libraryID int var libraryID int
err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID) err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query File Path", "err", err) errorUpload(r, w, taskid, "Query File Path", err)
http.Error(w, "Error Query File Path: "+err.Error(), http.StatusInternalServerError)
return return
} }
@ -59,18 +89,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var tReplace bool var tReplace bool
err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace) err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Query Library Path", "err", err) errorUpload(r, w, taskid, "Query Library Path", err)
http.Error(w, "Error Query Library Path: "+err.Error(), http.StatusInternalServerError)
return return
} }
// When replacing, send to temp, otherwise write directly
var path string var path string
if tReplace { if tReplace {
//path = filepath.Join(lPath, fPath) path = filepath.Join(lPath, fPath+constants.TEMP_FILE_EXTENSION)
slog.ErrorContext(r.Context(), "Replace mode is not implemented") // if replace then this is a temp file and should be cleaned up on error
http.Error(w, "Replace mode is not implemented", http.StatusNotImplemented) defer os.Remove(path)
return
} else { } else {
// todo write this to a temp file first also to be able to run cleanup on error and unify the rename logic
path = filepath.Join(tPath, fPath) path = filepath.Join(tPath, fPath)
} }
@ -78,15 +108,13 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
err = r.ParseMultipartForm(100 << 20) err = r.ParseMultipartForm(100 << 20)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err) errorUpload(r, w, taskid, "Parse Multipart Form", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
return return
} }
srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY) srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err) errorUpload(r, w, taskid, "Form File", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
return return
} }
@ -94,17 +122,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
// MaxBytesReader closes the underlying io.Reader on its Close() is called // MaxBytesReader closes the underlying io.Reader on its Close() is called
defer src.Close() defer src.Close()
err = os.MkdirAll(filepath.Dir(path), 0775) // if we are not replacing then we dont know if the destination folder exists
if err != nil { if !tReplace {
slog.ErrorContext(r.Context(), "Creating Folder", "err", err) err = os.MkdirAll(filepath.Dir(path), 0775)
http.Error(w, "Creating Folder: "+err.Error(), http.StatusInternalServerError) if err != nil {
return errorUpload(r, w, taskid, "Creating Folder", err)
return
}
} }
out, err := os.Create(path) out, err := os.Create(path)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Creating File", "err", err) errorUpload(r, w, taskid, "Creating File", err)
http.Error(w, "Creating File: "+err.Error(), http.StatusInternalServerError)
return return
} }
defer out.Close() defer out.Close()
@ -113,17 +142,80 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
if err != nil { if err != nil {
var maxBytesError *http.MaxBytesError var maxBytesError *http.MaxBytesError
if errors.As(err, &maxBytesError) { if errors.As(err, &maxBytesError) {
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err) slog.ErrorContext(r.Context(), "File to Large", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusRequestEntityTooLarge) http.Error(w, "File to Large: "+err.Error(), http.StatusRequestEntityTooLarge)
return return
} }
slog.ErrorContext(r.Context(), "Failed to write the uploaded content", "err", err) errorUpload(r, w, taskid, "Failed to write the uploaded content", err)
http.Error(w, "Failed to write the uploaded content: "+err.Error(), http.StatusInternalServerError)
return return
} }
// TODO check file integrity
_ = hash
slog.InfoContext(r.Context(), "upload done", "written", written) slog.InfoContext(r.Context(), "upload done", "written", written)
out.Close()
// if we are replacing, then we need to change the original file to the uploaded file and update the hash in the database
if tReplace {
tx, err := db.Begin(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Begin Transaction", err)
return
}
dstPath := filepath.Join(lPath, fPath)
slog.InfoContext(r.Context(), "Replacing Original file With Uploaded File", "fileid", id, "path", path, "dstPath", dstPath)
err = os.Rename(path, dstPath)
if err != nil {
errorUpload(r, w, taskid, "Replace File", err)
return
}
info, err := os.Stat(dstPath)
if err != nil {
errorUpload(r, w, taskid, "Stat File", err)
return
}
modTime := info.ModTime().UTC().Round(time.Second)
size := info.Size()
ffprobeData, err := ffprobe.ProbeURL(r.Context(), dstPath)
if err != nil {
errorUpload(r, w, taskid, "ffProbe File", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET old_md5 = md5, old_size = size, old_ffprobe_data = ffprobe_data WHERE id = $1", fileid)
if err != nil {
errorUpload(r, w, taskid, "Copy Filed Current Values to Old Values", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET md5 = $2, size = $3, mod_time = $4, ffprobe_data = $5 WHERE id = $1", fileid, hash, size, modTime, ffprobeData)
if err != nil {
errorUpload(r, w, taskid, "Set New File Values", err)
return
}
err = tx.Commit(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Commit File Hash", err)
return
}
slog.InfoContext(r.Context(), "Original file Replaced with Uploaded File", "fileid", id, "dstPath", dstPath)
} else {
// TODO implement "old" fields for non replace libraries, scan also needs to use old field if it is is non replace and the file has been transcoded
}
}
func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string, err error) {
slog.ErrorContext(r.Context(), msg, "err", err)
http.Error(w, msg+": "+err.Error(), http.StatusInternalServerError)
if taskID != 0 {
_, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: "+msg+": "+err.Error(), time.Now())})
if err != nil {
slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
}
}
} }

View file

@ -247,7 +247,8 @@ func updateWorkerTaskStatus(ctx context.Context) {
var health constants.FileHealth var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS { if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY health = constants.FILE_HEALTH_HEALTHY
// TODO Auto Queue Transcode for Successfull Transcodes if library setting // Auto Queue Transcode for Successfull Transcodes if library setting say so
go queueOnHealth(context.TODO(), ts.Task.ID)
} else { } else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs // TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED health = constants.FILE_HEALTH_DAMAGED
@ -294,3 +295,26 @@ func updateWorkerTaskStatus(ctx context.Context) {
wg.Wait() wg.Wait()
} }
func queueOnHealth(ctx context.Context, taskID int) {
var transcode_command_id *int
var file_id *int
err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
FROM tasks
INNER JOIN files ON files.id = tasks.file_id
INNER JOIN libraries ON files.library_id = libraries.id
WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
Scan(&transcode_command_id, &file_id)
if err == pgx.ErrNoRows {
slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
} else if err != nil {
slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
return
}
slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
}
}

45
static/js/echarts.min.js vendored Normal file

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,7 @@
function setLibrary(library_id) {
if (library_id == "-1") {
window.location.href = "/stats";
} else {
window.location.href = "/stats/" + library_id;
}
}

View file

@ -3,58 +3,65 @@
@import url("component.css"); @import url("component.css");
:root { :root {
font-family: monospace; font-family: monospace;
font-size: 12px; font-size: 12px;
overflow-y: auto; overflow-y: auto;
} }
body { body {
padding: 0.5rem; padding: 0.5rem;
gap: 0.5rem; gap: 0.5rem;
} }
.counter-image { .counter-image {
image-rendering: pixelated; image-rendering: pixelated;
image-rendering: -moz-crisp-edges; image-rendering: -moz-crisp-edges;
image-rendering: crisp-edges; image-rendering: crisp-edges;
flex-grow: 1; flex-grow: 1;
max-height: 50vh;
max-width: 22.5vh;
} }
.counter { .counter {
flex-flow: row nowrap; flex-flow: row nowrap;
align-items: end; align-items: end;
font-size: 1.5rem; font-size: 1.5rem;
font-weight: bold; font-weight: bold;
} }
.workers { .workers {
flex-flow: row wrap; flex-flow: row wrap;
gap: 2rem; gap: 2rem;
} }
.workers>* { .workers > * {
gap: 0.5rem; gap: 0.5rem;
border: 1px solid var(--fg); border: 1px solid var(--fg);
} }
.log>p { .log > p {
padding: 0.25rem 0; padding: 0.25rem 0;
user-select: text; user-select: text;
} }
.log>p:hover { .log > p:hover {
background: var(--cl-hl); background: var(--cl-hl);
} }
nav> :first-child { nav > :first-child {
font-size: 2rem; font-size: 2rem;
font-weight: bold; font-weight: bold;
} }
.short-button { .short-button {
align-self: start; align-self: start;
} }
.button-list { .button-list {
flex-direction: row; flex-direction: row;
} }
.stats {
flex-flow: row wrap;
gap: 2rem;
}

View file

@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
l := log.GetTaskLogger(t) l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending // TODO Figure out how to get correct file ending
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID)) path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v."+t.FileExtension, t.ID, t.FileID))
// Set ffmpeg input path // Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 { if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -27,6 +27,7 @@ func StartTask(conf config.Config, data types.TaskStart) error {
Type: data.Type, Type: data.Type,
FileID: data.FileID, FileID: data.FileID,
FileMD5: data.FileMD5, FileMD5: data.FileMD5,
FileExtension: data.FileExtension,
FfmpegCommand: data.FfmpegCommand, FfmpegCommand: data.FfmpegCommand,
} }

View file

@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
l := log.GetTaskLogger(t) l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending // TODO Figure out how to get correct file ending
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID)) src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v."+t.FileExtension, t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID)) dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v."+t.FileExtension, t.ID, t.FileID))
// Set ffmpeg input path // Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 { if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -9,6 +9,7 @@ import (
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"os" "os"
"strconv"
"time" "time"
"git.lastassault.de/speatzle/morffix/config" "git.lastassault.de/speatzle/morffix/config"
@ -82,6 +83,7 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
req.Header.Add(constants.UUID_HEADER, uuid.String()) req.Header.Add(constants.UUID_HEADER, uuid.String())
req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION)
req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash)) req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash))
req.Header.Add(constants.TASK_ID_HEADER, strconv.Itoa(t.ID))
req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"") req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"")
var client = &http.Client{ var client = &http.Client{
@ -97,7 +99,8 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
l.InfoContext(ctx, "Upload Done") l.InfoContext(ctx, "Upload Done")
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode) body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("Got HTTP Status Code: %v Body: %v", resp.StatusCode, string(body))
} }
return nil return nil

View file

@ -3,7 +3,8 @@
<html> <html>
<head> <head>
<link rel="stylesheet" href="/static/style/style.css"> <link rel="stylesheet" href="/static/style/style.css">
<script src="/static/js/echarts.min.js"></script>
</head> </head>
<body> <body>
{{template "navbar"}} {{template "navbar"}}
{{end}} {{end}}

View file

@ -31,6 +31,24 @@
</select> </select>
<input type="submit" value="Submit"> <input type="submit" value="Submit">
</form> </form>
<h2>Set Parallel Tasks</h2>
<form method="POST" action= "/parallel_tasks">
<label for="worker">Worker:</label>
<select id="worker" name="worker">
<option value="all">All</option>
{{range $w := .Workers}}
<option value="{{$w.ID}}">{{$w.Name}}</option>
{{end}}
</select>
<label for="parallel_tasks">Parallel Tasks</label>
<select id="parallel_tasks" name="parallel_tasks">
<option value="1">1</option>
<option value="2">2</option>
<option value="3">3</option>
<option value="4">4</option>
</select>
<input type="submit" value="Submit">
</form>
<h2>Workers</h2> <h2>Workers</h2>
<div class="workers"> <div class="workers">
{{range $w := .Workers}} {{range $w := .Workers}}
@ -60,6 +78,22 @@
<td> <td>
{{ $w.ConnectionChanged }} {{ $w.ConnectionChanged }}
</td> </td>
</tr>
<tr>
<td>
QueueEnable
</td>
<td>
{{ $w.QueueEnable }}
</td>
</tr>
<tr>
<td>
ParallelTasks
</td>
<td>
{{ $w.ParallelTasks }}
</td>
</tr> </tr>
{{if $w.Connected}} {{if $w.Connected}}
<tr> <tr>
@ -95,8 +129,8 @@
</td> </td>
</tr> </tr>
{{end}} {{end}}
</table> </table>
</div> </div>
{{end}} {{end}}
</div> </div>
{{template "tail"}} {{template "tail"}}

View file

@ -13,6 +13,8 @@
<th>ScanNewQueueTranscode</th> <th>ScanNewQueueTranscode</th>
<th>ScanChangedQueueTranscode</th> <th>ScanChangedQueueTranscode</th>
<th>HealthOkQueueTranscode</th> <th>HealthOkQueueTranscode</th>
<th>HealthCommand</th>
<th>TranscodeCommand</th>
</tr> </tr>
{{range $l := .Libraries}} {{range $l := .Libraries}}
<tr onclick="window.location='/libraries/{{ $l.ID }}';"> <tr onclick="window.location='/libraries/{{ $l.ID }}';">
@ -49,6 +51,12 @@
<td> <td>
{{ $l.HealthOkQueueTranscode }} {{ $l.HealthOkQueueTranscode }}
</td> </td>
<td>
{{ $l.HealthCommandID }}
</td>
<td>
{{ $l.TranscodeCommandID }}
</td>
</tr> </tr>
{{end}} {{end}}
</table> </table>
@ -67,14 +75,28 @@
<input type="text" name="transcode_path"> <input type="text" name="transcode_path">
<label>Queue Heathcheck for new files on Scan:</label> <label>Queue Heathcheck for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_health"> <input type="checkbox" name="scan_new_queue_health">
<label>Queue Heathcheck for changed files on Scan:</label> <label>Queue Heathcheck for changed files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_changed_queue_health"> <input type="checkbox" name="scan_changed_queue_health">
<label>Queue Transcode for new files on Scan:</label> <label>Queue Transcode for new files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_new_queue_transcode"> <input type="checkbox" name="scan_new_queue_transcode">
<label>Queue Transcode for changed files on Scan:</label> <label>Queue Transcode for changed files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_changed_queue_transcode"> <input type="checkbox" name="scan_changed_queue_transcode">
<label>Queue Transcode on Healthcheck Success:</label> <label>Queue Transcode on Healthcheck Success:</label>
<input type="checkbox" name="health_ok_queue_transcode"> <input type="checkbox" name="health_ok_queue_transcode">
<label for="health_command_id">Health Command:</label>
<select id="health_command_id" name="health_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<label for="transcode_command_id">Transcode Command:</label>
<select id="transcode_command_id" name="transcode_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<input type="submit" value="Submit"> <input type="submit" value="Submit">
</form> </form>
{{template "tail"}} {{template "tail"}}

View file

@ -49,4 +49,4 @@
{{end}} {{end}}
</table> </table>
</div> </div>
{{template "tail"}} {{template "tail"}}

View file

@ -4,5 +4,6 @@
<a class="button" href="/libraries">Libraries</a> <a class="button" href="/libraries">Libraries</a>
<a class="button" href="/tasks">Tasks</a> <a class="button" href="/tasks">Tasks</a>
<a class="button" href="/ffmpeg_commands">FFmpeg Commands</a> <a class="button" href="/ffmpeg_commands">FFmpeg Commands</a>
<a class="button" href="/stats">Stats</a>
</nav> </nav>
{{end}} {{end}}

69
tmpl/stats.tmpl Normal file
View file

@ -0,0 +1,69 @@
{{template "head"}}
<script src="/static/js/library_filter.js"></script>
<h2>Stats</h2>
<label for="library">Library:</label>
<select id="library" name="library" onchange="setLibrary(this.value)" class="short-button">
<option {{if eq .SelectedLibrary "-1" }} selected {{end}}value="-1">ALL</option>
{{range $l := $.Libraries}}
<option {{if eq $.SelectedLibrary $l.ID }} selected {{end}}value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<h4>Size</h4>
<div class="counter">
{{ $l := len .Size }}
{{range $i, $c := .Size}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
<h4>Files</h4>
<div class="counter">
{{ $l := len .Count }}
{{range $i, $c := .Count}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
.
{{ else if eq $n 7 }}
.
{{ else if eq $n 10 }}
.
{{ else if eq $n 13 }}
.
{{end}}
{{end}}
</div>
<h4>Saved</h4>
<div class="counter">
{{ $l := len .Saved }}
{{range $i, $c := .Saved}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
<div class="stats">
{{range $c := .Charts}}
{{$c.Element}} {{$c.Script}}
{{end}}
</div>
{{template "tail"}}

View file

@ -10,6 +10,7 @@ type TaskStart struct {
ID int `json:"id"` ID int `json:"id"`
FileID int `json:"file_id"` FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5"` FileMD5 []byte `json:"file_md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"` Type constants.TaskType `json:"type"`
Data json.RawMessage Data json.RawMessage
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"` FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`
@ -19,6 +20,7 @@ type Task struct {
ID int `json:"id"` ID int `json:"id"`
FileID int `json:"file_id"` FileID int `json:"file_id"`
FileMD5 []byte `json:"md5"` FileMD5 []byte `json:"md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"` Type constants.TaskType `json:"type"`
Status constants.TaskStatus `json:"status"` Status constants.TaskStatus `json:"status"`
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"` FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`

View file

@ -2,6 +2,7 @@ package worker
import ( import (
"context" "context"
"time"
"github.com/mackerelio/go-osstat/cpu" "github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/memory" "github.com/mackerelio/go-osstat/memory"
@ -14,13 +15,31 @@ func init() {
rpcServer.RegisterMethod("status", status) rpcServer.RegisterMethod("status", status)
} }
var cpuBefore *cpu.Stats
var cpuUsage uint64
var cpuCount uint64
func calcUsage() {
for {
cStats, _ := cpu.Get()
if cStats != nil {
cpuUsage = cStats.Total
cpuCount = uint64(cStats.CPUCount)
if cpuBefore != nil {
total := float64(cStats.Total - cpuBefore.Total)
cpuUsage = uint64(float64((cStats.User-cpuBefore.User)+(cStats.System-cpuBefore.System)) / total * 100)
}
cpuBefore = cStats
}
time.Sleep(time.Second)
}
}
func status(ctx context.Context, req rpc.Request) (any, error) { func status(ctx context.Context, req rpc.Request) (any, error) {
s := types.WorkerStatus{} s := types.WorkerStatus{}
cStats, _ := cpu.Get() s.CPUUsage = cpuUsage
if cStats != nil { s.CPUCount = cpuCount
s.CPUUsage = cStats.Total
s.CPUCount = uint64(cStats.CPUCount)
}
mStats, _ := memory.Get() mStats, _ := memory.Get()
if mStats != nil { if mStats != nil {
s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100) s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100)

View file

@ -60,6 +60,9 @@ func Start(_conf config.Config) {
exit = true exit = true
cancel() cancel()
}() }()
go calcUsage()
for { for {
if exit { if exit {
slog.InfoContext(ctx, "Done") slog.InfoContext(ctx, "Done")

67
workerpool/workerpool.go Normal file
View file

@ -0,0 +1,67 @@
package workerpool
import (
"context"
"sync"
)
type WorkerPool struct {
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
queue chan func(context.Context)
qg sync.WaitGroup
}
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
queue := make(chan func(context.Context), queueSize)
workerPool := WorkerPool{
workers: workers,
ctx: ctx,
cancel: cancel,
queue: queue,
}
for i := 0; i < workerPool.workers; i++ {
workerPool.wg.Add(1)
go workerPool.work(workerPool.ctx)
}
return &workerPool
}
func (wp *WorkerPool) Stop() {
wp.cancel()
wp.wg.Wait()
}
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
wp.qg.Add(1)
wp.queue <- job
}
func (wp *WorkerPool) WaitForEmptyQueue() {
wp.qg.Wait()
}
func (wp *WorkerPool) QueueLength() int {
return len(wp.queue)
}
func (wp *WorkerPool) work(ctx context.Context) {
for {
select {
case <-ctx.Done():
wp.wg.Done()
return
case job := <-wp.queue:
func() {
defer wp.qg.Done()
job(ctx)
}()
}
}
}