Compare commits

...

65 commits

Author SHA1 Message Date
15a960da19 fix sql, counter show 0
All checks were successful
/ release (push) Successful in 46s
2025-03-19 23:13:08 +01:00
a5fc856764 Saved Stat
All checks were successful
/ release (push) Successful in 48s
2025-03-19 23:07:11 +01:00
e7371850d1 Set Parallel Tasks, Respect which worker
All checks were successful
/ release (push) Successful in 47s
2025-03-19 21:50:24 +01:00
1e4016d075 fix parallel_tasks form
All checks were successful
/ release (push) Successful in 43s
2025-03-19 21:46:35 +01:00
553392e73f fix template
All checks were successful
/ release (push) Successful in 44s
2025-03-19 21:44:11 +01:00
57969cc07d Add Parallel Tasks Settings
All checks were successful
/ release (push) Successful in 44s
2025-03-19 21:39:46 +01:00
8c3ba7e883 pass missing field
All checks were successful
/ release (push) Successful in 43s
2025-03-19 21:06:36 +01:00
5ecfd81967 preserve file extension
All checks were successful
/ release (push) Successful in 52s
2025-03-19 20:59:38 +01:00
0ce5b46449 fix sql, improve errors, run ffprobe
All checks were successful
/ release (push) Successful in 47s
2025-03-19 17:47:40 +01:00
64fd3da925 fix struct
All checks were successful
/ release (push) Successful in 41s
2025-03-19 13:33:57 +01:00
fa1deed55a fix migrations
All checks were successful
/ release (push) Successful in 42s
2025-03-19 13:32:37 +01:00
1af3a3d5b9 add missing fields to struct
All checks were successful
/ release (push) Successful in 44s
2025-03-19 13:27:15 +01:00
b98be1fd70 copy over ffprobe_data
All checks were successful
/ release (push) Successful in 43s
2025-03-19 13:18:04 +01:00
808953f46f Add old fields, fix md5, fix migratins.
All checks were successful
/ release (push) Successful in 50s
2025-03-19 13:11:03 +01:00
d25890ad26 Log HTTP Bodye on Upload Error into Task
All checks were successful
/ release (push) Successful in 46s
2025-03-19 11:06:18 +01:00
2630ebb0cf Fix sql
All checks were successful
/ release (push) Successful in 48s
2025-03-19 09:57:18 +00:00
d0d2529570 Update mod_time and size as to not confuse the scan
All checks were successful
/ release (push) Successful in 43s
2025-03-19 00:36:19 +01:00
8a6390fb8d Prioritize Transcode Tasks 2025-03-19 00:35:36 +01:00
a2efbdea4c Implement Transcode Replace mode
All checks were successful
/ release (push) Successful in 1m10s
2025-03-18 23:35:28 +01:00
a6e10369bb Update worker version, add TASK ID HEADER 2025-03-18 23:34:50 +01:00
f2e2236653 fix sql
All checks were successful
/ release (push) Successful in 29s
2024-10-11 17:45:23 +02:00
9298f4cccf fix sql
All checks were successful
/ release (push) Successful in 28s
2024-10-11 17:22:08 +02:00
e05051f34d fix template
All checks were successful
/ release (push) Successful in 29s
2024-10-11 17:17:42 +02:00
c87dab94a0 fix sql
All checks were successful
/ release (push) Successful in 29s
2024-10-11 17:15:38 +02:00
12f700d1d0 Implement Queue Health On New File and Queue Transcode on Health Success
All checks were successful
/ release (push) Successful in 32s
2024-10-11 17:07:52 +02:00
f08a07e87d fix tasks
All checks were successful
/ release (push) Successful in 29s
2024-10-08 16:51:36 +02:00
a60f05be12 add none
All checks were successful
/ release (push) Successful in 28s
2024-10-08 16:48:12 +02:00
6927e31a62 fix template
All checks were successful
/ release (push) Successful in 29s
2024-10-08 16:46:51 +02:00
011f97a3e6 Add Comand to Library
All checks were successful
/ release (push) Successful in 32s
2024-10-08 16:42:25 +02:00
f401127127 fix css
All checks were successful
/ release (push) Successful in 29s
2024-10-08 15:07:15 +02:00
20164f8983 limit counter size
All checks were successful
/ release (push) Successful in 30s
2024-10-08 14:50:24 +02:00
275657e584 adjust css
All checks were successful
/ release (push) Successful in 28s
2024-10-08 13:56:54 +02:00
dbaa8f1de7 add counters
All checks were successful
/ release (push) Successful in 29s
2024-10-08 13:49:01 +02:00
dcbddfc6cd fix js
All checks were successful
/ release (push) Successful in 29s
2024-10-08 13:01:59 +02:00
849b4cf004 fix template
All checks were successful
/ release (push) Successful in 28s
2024-10-08 12:55:54 +02:00
91e3504abe fix template
All checks were successful
/ release (push) Successful in 28s
2024-10-08 12:49:32 +02:00
29c7cdd0bf fix sql
All checks were successful
/ release (push) Successful in 28s
2024-10-08 12:48:12 +02:00
cee5353b54 make -1 all
All checks were successful
/ release (push) Successful in 30s
2024-10-08 12:44:23 +02:00
b6c22df6f1 fix template
All checks were successful
/ release (push) Successful in 31s
2024-10-08 12:36:03 +02:00
7d037be106 Add Library filter to Stats
All checks were successful
/ release (push) Successful in 1m7s
2024-10-08 11:30:04 +02:00
9d2519c085 fix cpu usage reporting
All checks were successful
/ release (push) Successful in 29s
2024-07-14 05:04:25 +02:00
f818a3525b Adjust more colors
All checks were successful
/ release (push) Successful in 30s
2024-07-14 04:15:25 +02:00
043d81b823 Set Chart Colors
All checks were successful
/ release (push) Successful in 28s
2024-07-14 03:54:00 +02:00
04a05284ed deduplicate days
All checks were successful
/ release (push) Successful in 28s
2024-07-14 03:11:22 +02:00
8b6e20ba00 limit task status chart to 7 days
All checks were successful
/ release (push) Successful in 28s
2024-07-14 02:36:53 +02:00
64315356ae Fix Order by
All checks were successful
/ release (push) Successful in 28s
2024-07-14 02:33:17 +02:00
4c3df8fef6 ORder TAsk Status
All checks were successful
/ release (push) Successful in 28s
2024-07-14 02:31:10 +02:00
a75ce3287d Add Task Status Per Day Bar Chart
All checks were successful
/ release (push) Successful in 27s
2024-07-14 02:18:10 +02:00
2e360f4b20 Improve Stats group by 2024-07-14 00:37:16 +02:00
cd78de96b4 Add Resolution chart
All checks were successful
/ release (push) Successful in 29s
2024-07-14 00:08:07 +02:00
8e15cdd6e8 Rework chart gen. Add Health, Tanscode, Task Status Charts
All checks were successful
/ release (push) Successful in 32s
2024-07-13 23:18:47 +02:00
091ef322d5 Fix Chart appearance
All checks were successful
/ release (push) Successful in 30s
2024-07-13 03:46:55 +02:00
38219f7b07 Add Stats Chart
All checks were successful
/ release (push) Successful in 39s
2024-07-13 03:33:43 +02:00
0f5d842a64 Add stats page
All checks were successful
/ release (push) Successful in 29s
2024-07-13 01:45:43 +02:00
8ba0e8f2ab fix sql column
All checks were successful
/ release (push) Successful in 27s
2024-07-12 23:15:40 +02:00
03eb3541a5 Only hash files if library is enabled
All checks were successful
/ release (push) Successful in 30s
2024-07-12 23:04:57 +02:00
1871e1c26f increase hash limit
All checks were successful
/ release (push) Successful in 27s
2024-07-11 22:58:48 +02:00
81fa489417 fix row collection
All checks were successful
/ release (push) Successful in 28s
2024-07-11 22:55:39 +02:00
aeb47c2593 User workerpool for scanning and hashing
All checks were successful
/ release (push) Successful in 31s
2024-07-11 22:47:26 +02:00
f12462250a Set Correct Mod Time for New Files 2024-07-11 19:24:15 +02:00
bae1112074 remove extra commit
All checks were successful
/ release (push) Successful in 28s
2024-07-07 03:50:16 +02:00
5d6a29407d Fix Subseqent scan chaning all New and Changed File to Exists. Brick Missing for now. Make new file async
Some checks failed
/ release (push) Failing after 11s
2024-07-07 03:49:21 +02:00
4019d1764e Reduce non video file log level
All checks were successful
/ release (push) Successful in 27s
2024-07-07 03:14:37 +02:00
67aa389f79 dont error scan on ffprobe failure
All checks were successful
/ release (push) Successful in 26s
2024-07-07 02:18:09 +02:00
75a578fd36 Merge pull request 'scan_rework' (#1) from scan_rework into main
All checks were successful
/ release (push) Successful in 27s
Reviewed-on: #1
2024-07-06 23:53:27 +00:00
38 changed files with 1112 additions and 194 deletions

View file

@ -2,13 +2,14 @@ package constants
import "fmt"
const WORKER_VERSION = "v1"
const WORKER_VERSION = "v2"
const WORKER_VERSION_HEADER = "morffix-version"
const SHARED_SECRET_HEADER = "morffix-secret"
const NAME_HEADER = "morffix-name"
const UUID_HEADER = "morffix-uuid"
const HASH_HEADER = "morffix-hash"
const TASK_ID_HEADER = "morffix-task-id"
const INDEX_TEMPLATE_NAME = "index.tmpl"
const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl"
@ -17,6 +18,7 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl"
const TASKS_TEMPLATE_NAME = "tasks.tmpl"
const TASK_TEMPLATE_NAME = "task.tmpl"
const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl"
const STATS_TEMPLATE_NAME = "stats.tmpl"
const FORM_FILE_KEY = "file"
@ -25,6 +27,8 @@ const SORT_ORDER_ASC_PARAM = "order_asc"
const QUERY_LIMIT_PARAM = "limit"
const QUERY_PAGE_PARAM = "page"
const TEMP_FILE_EXTENSION = ".morffix"
type TaskType int
const (

1
go.mod
View file

@ -12,6 +12,7 @@ require (
)
require (
github.com/go-echarts/go-echarts/v2 v2.4.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect

2
go.sum
View file

@ -2,6 +2,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-echarts/go-echarts/v2 v2.4.0 h1:efD46dmAvaZEWrBHAGjE8cfDK48vvFTHz5N9VqW5rYc=
github.com/go-echarts/go-echarts/v2 v2.4.0/go.mod h1:56YlvzhW/a+du15f3S2qUGNDfKnFOeJSThBIrVFHDtI=
github.com/golang-migrate/migrate/v4 v4.17.1 h1:4zQ6iqL6t6AiItphxJctQb3cFqWiSpMnX7wLTPnnYO4=
github.com/golang-migrate/migrate/v4 v4.17.1/go.mod h1:m8hinFyWBn0SA4QKHuKh175Pm9wjmxj3S2Mia7dbXzM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

View file

@ -1,2 +1,2 @@
ALTER TABLE files
DROP IF EXISTS ffprobe;
DROP IF EXISTS ffprobe_data;

View file

@ -0,0 +1,3 @@
-- Down migration (presumably -- paired with the migration below that adds
-- these columns): remove the per-library default ffmpeg command references.
-- DROP IF EXISTS keeps the migration idempotent (COLUMN keyword is optional
-- in PostgreSQL).
ALTER TABLE libraries
DROP IF EXISTS transcode_command_id,
DROP IF EXISTS health_command_id;

View file

@ -0,0 +1,3 @@
-- Up migration: let each library reference a default transcode command and a
-- default health-check command from ffmpeg_commands. Nullable on purpose: a
-- library without a command simply has no default (see the *int handling in
-- createLibrary).
ALTER TABLE libraries
ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id),
ADD health_command_id bigint REFERENCES ffmpeg_commands(id);

View file

@ -0,0 +1,4 @@
-- Down migration: drop the snapshot columns that preserve a file's original
-- md5/ffprobe/size (added by the paired up migration below).
ALTER TABLE files
DROP IF EXISTS old_md5,
DROP IF EXISTS old_ffprobe_data,
DROP IF EXISTS old_size;

View file

@ -0,0 +1,3 @@
-- Up migration: keep the original file's hash, ffprobe output and size so
-- they survive a transcode-replace (cf. commit "copy over ffprobe_data").
-- NOTE(review): old_md5 is typed uuid, presumably to store the 16-byte MD5
-- digest compactly -- confirm this matches how files.md5 is stored.
ALTER TABLE files ADD old_md5 uuid,
ADD old_ffprobe_data JSONB,
ADD old_size bigint;

View file

@ -0,0 +1,2 @@
-- Down migration: remove the per-worker parallel task limit.
ALTER TABLE workers
DROP IF EXISTS parallel_tasks;

View file

@ -0,0 +1,2 @@
-- Up migration: how many tasks a worker may run concurrently. NOT NULL with
-- DEFAULT 1 so existing worker rows keep the previous one-task behavior.
ALTER TABLE workers
ADD parallel_tasks smallint NOT NULL DEFAULT 1;

View file

@ -18,8 +18,9 @@ type IndexData struct {
type IndexWorker struct {
ID string
Worker
Status *types.WorkerStatus
QueueEnable bool
Status *types.WorkerStatus
QueueEnable bool
ParallelTasks int
}
func handleIndex(w http.ResponseWriter, r *http.Request) {
@ -31,6 +32,15 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for i := range Workers {
var queueEnable bool
var parallelTasks int
err := db.QueryRow(r.Context(), "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil {
w.Write([]byte(err.Error()))
slog.ErrorContext(r.Context(), "Error Querying Worker", "err", err)
}
if Workers[i].Connected {
var status types.WorkerStatus
_, err := rpcServer.Call(r.Context(), Workers[i].Conn, "status", nil, &status)
@ -41,15 +51,19 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
}
slog.InfoContext(r.Context(), "Got Worker Status", "id", i, "status", status)
data.Workers = append(data.Workers, IndexWorker{
ID: i,
Worker: *Workers[i],
Status: &status,
ID: i,
Worker: *Workers[i],
Status: &status,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
})
} else {
data.Workers = append(data.Workers, IndexWorker{
ID: i,
Worker: *Workers[i],
Status: nil,
ID: i,
Worker: *Workers[i],
Status: nil,
QueueEnable: queueEnable,
ParallelTasks: parallelTasks,
})
}
}
@ -77,6 +91,10 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
}
func splitInt(n int) []int {
if n == 0 {
return []int{0}
}
slc := []int{}
for n > 0 {
slc = append(slc, n%10)

View file

@ -5,17 +5,21 @@ import (
"fmt"
"log/slog"
"net/http"
"strconv"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/jackc/pgx/v5"
)
type LibrariesData struct {
Libraries []Library
Libraries []Library
FfmpegCommands []FfmpegCommand
}
type Library struct {
ID string `db:"id"`
HealthCommandID *int `db:"health_command_id"`
TranscodeCommandID *int `db:"transcode_command_id"`
Name string `db:"name"`
Path string `db:"path"`
Enable bool `db:"enable"`
@ -40,7 +44,7 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
}
}
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries")
rows, err := db.Query(r.Context(), "SELECT * FROM libraries")
if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -54,6 +58,20 @@ func handleLibraries(w http.ResponseWriter, r *http.Request) {
}
data.Libraries = libraries
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
if err != nil {
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
if err != nil {
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
return
}
data.FfmpegCommands = ffmpegCommands
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.LIBRARIES_TEMPLATE_NAME, data)
if err != nil {
@ -84,9 +102,21 @@ func createLibrary(r *http.Request) error {
scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on"
health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on"
var health_command_id *int = nil
var transcode_command_id *int = nil
h, err := strconv.Atoi(r.FormValue("health_command_id"))
if err == nil {
health_command_id = &h
}
t, err := strconv.Atoi(r.FormValue("transcode_command_id"))
if err == nil {
transcode_command_id = &t
}
slog.Info("Got Library Create", "name", name, "path", path, "enable", enable)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id)
if err != nil {
return fmt.Errorf("Inserting Library: %w", err)
}

View file

@ -20,10 +20,12 @@ type File struct {
ID int `db:"id"`
Path string `db:"path"`
Size int64 `db:"size"`
OldSize *int64 `db:"old_size"`
Status constants.FileStatus `db:"status"`
Health constants.FileHealth `db:"health"`
Transcode constants.FileTranscode `db:"transcode"`
MD5 []byte `db:"md5"`
OldMD5 *[]byte `db:"old_md5"`
UpdatedAt time.Time `db:"updated_at"`
}
@ -63,7 +65,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
Enable: enabled,
}
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, transcode, md5, updated_at FROM files where library_id = $1", id)
rows, err := db.Query(r.Context(), "SELECT id, path, size, old_size, status, health, transcode, md5, old_md5, updated_at FROM files where library_id = $1", id)
if err != nil {
slog.ErrorContext(r.Context(), "Query Files", "err", err)
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)

41
server/parallel_tasks.go Normal file
View file

@ -0,0 +1,41 @@
package server
import (
"fmt"
"log/slog"
"net/http"
"strconv"
)
// HandleSetParallelTasks updates the number of tasks a worker may run in
// parallel. The form supplies "parallel_tasks" (an integer) and "worker"
// (a worker id, or the literal "all" to update every worker at once).
// On success the client is redirected back to the referring page.
func HandleSetParallelTasks(w http.ResponseWriter, r *http.Request) {
	err := r.ParseForm()
	if err != nil {
		// Fixed typo in the user-facing message ("Parseing" -> "Parsing").
		http.Error(w, fmt.Sprintf("Parsing Form: %v", err), http.StatusBadRequest)
		return
	}

	parallelTasks, err := strconv.Atoi(r.FormValue("parallel_tasks"))
	if err != nil {
		http.Error(w, "Can't parse parallel_tasks: "+err.Error(), http.StatusBadRequest)
		return
	}
	worker := r.FormValue("worker")

	slog.Info("Got set Parallel Tasks", "parallel_tasks", parallelTasks, "worker", worker)

	if worker == "all" {
		// Bulk update: deliberately no WHERE clause, every worker is changed.
		_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1", parallelTasks)
		if err != nil {
			http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
			return
		}
	} else {
		_, err = db.Exec(r.Context(), "UPDATE workers SET parallel_tasks = $1 where id = $2", parallelTasks, worker)
		if err != nil {
			http.Error(w, fmt.Sprintf("Setting Worker Parallel Tasks: %v", err), http.StatusInternalServerError)
			return
		}
	}

	http.Redirect(w, r, r.Header.Get("Referer"), http.StatusFound)
}

View file

@ -10,11 +10,11 @@ import (
"os"
"path/filepath"
"slices"
"strconv"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/workerpool"
"github.com/jackc/pgx/v5"
"gopkg.in/vansante/go-ffprobe.v2"
)
@ -22,44 +22,47 @@ import (
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
func handleScan(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
scanStatus(w, r)
return
}
http.Error(w, "Not Implemented", http.StatusNotImplemented)
/*
if r.Method == "GET" {
scanStatus(w, r)
return
}
id, err := strconv.Atoi(r.FormValue("id"))
if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest)
return
}
id, err := strconv.Atoi(r.FormValue("id"))
if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest)
return
}
var name string
var path string
var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return
}
var name string
var path string
var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return
}
scanCtx := context.Background()
go scanLibrary(scanCtx, id)
scanCtx := context.Background()
go scanLibrary(scanCtx, id)
message := "Scan Started"
message := "Scan Started"
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
_, err = w.Write(buf.Bytes())
if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
}
*/
}
func scanStatus(w http.ResponseWriter, r *http.Request) {
@ -79,7 +82,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
}
}
func manageScan(stop chan bool) {
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
scanTicker := time.NewTicker(time.Minute)
hashTicker := time.NewTicker(time.Second)
scanRunning := false
@ -111,7 +114,7 @@ func manageScan(stop chan bool) {
}
for _, l := range libraries {
scanLibrary(ctx, l)
scanLibrary(ctx, l, scanPool)
}
}()
@ -126,19 +129,32 @@ func manageScan(stop chan bool) {
hashRunning = false
}()
var fileID uint
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
if err != nil {
if !errors.Is(err, pgx.ErrNoRows) {
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
}
// Nothing to do
return
}
err = hashFile(ctx, fileID)
files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
slog.ErrorContext(ctx, "Collect File Rows", "err", err)
return
}
for _, f := range files {
hashPool.QueueJob(func(ctx context.Context) {
err = hashFile(ctx, f)
if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
}
})
}
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
hashPool.WaitForEmptyQueue()
}()
case <-stop:
cancel()
@ -149,7 +165,7 @@ func manageScan(stop chan bool) {
var scanLock sync.Mutex
func scanLibrary(ctx context.Context, id int) {
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
slog.InfoContext(ctx, "Acquiring Scan Lock...")
scanLock.Lock()
defer scanLock.Unlock()
@ -158,7 +174,9 @@ func scanLibrary(ctx context.Context, id int) {
var name string
var lpath string
var enabled bool
err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
var health_command_id *int
var scan_new_queue_health bool
err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health)
if err != nil {
slog.ErrorContext(ctx, "Get Library", "err", err)
return
@ -180,22 +198,17 @@ func scanLibrary(ctx context.Context, id int) {
return
}
tx, err := db.Begin(ctx)
if err != nil {
slog.ErrorContext(ctx, "Begin Transaction", "err", err)
return
}
defer tx.Rollback(ctx)
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
// Mark all Files as Missing
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
if err != nil {
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
return
}
// TODO Work on getting missing status again
/*
// Mark all Files as Missing
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
if err != nil {
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
return
}
*/
err = filepath.Walk(lpath,
func(fullPath string, info os.FileInfo, err error) error {
if errors.Is(err, os.ErrPermission) {
@ -211,7 +224,7 @@ func scanLibrary(ctx context.Context, id int) {
}
if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
return nil
}
@ -223,52 +236,63 @@ func scanLibrary(ctx context.Context, id int) {
var fileID int
var oldSize uint
var oldModTime time.Time
err = tx.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
scanPool.QueueJob(
func(ctx context.Context) {
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
return fmt.Errorf("ffprobe New File: %w", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
slog.ErrorContext(ctx, "ffprobe New File", "err", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData)
if err != nil {
return fmt.Errorf("Add New File to DB: %w", err)
}
var file_id int
err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id)
if err != nil {
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
}
if scan_new_queue_health && health_command_id != nil {
slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id)
}
}
})
return nil
} else if err != nil {
return fmt.Errorf("Getting File: %w", err)
}
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
// File Already Exists, Check if it has been changed
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
slog.Debug("File stayed the same", "id", fileID)
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 WHERE id = $1", fileID, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Updating Non Changed File in DB: %w", err)
}
} else {
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
scanPool.QueueJob(func(ctx context.Context) {
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
return fmt.Errorf("ffprobe Changed File: %w", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil {
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil {
return fmt.Errorf("Updating Changed File in DB: %w", err)
}
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil {
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
}
})
}
return nil
})
if err != nil {
@ -276,10 +300,7 @@ func scanLibrary(ctx context.Context, id int) {
return
}
err = tx.Commit(ctx)
if err != nil {
slog.ErrorContext(ctx, "Error Committing Changes", "err", err)
return
}
scanPool.WaitForEmptyQueue()
slog.InfoContext(ctx, "Scan Done", "id", id)
}

View file

@ -17,6 +17,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/workerpool"
)
var conf config.Config
@ -102,6 +103,9 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
mux.HandleFunc("/libraries", handleLibraries)
mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands)
mux.HandleFunc("/queue_enable", HandleSetQueueEnable)
mux.HandleFunc("/parallel_tasks", HandleSetParallelTasks)
mux.HandleFunc("/stats", handleStats)
mux.HandleFunc("/stats/{id}", handleStats)
mux.HandleFunc("/", handleIndex)
@ -129,8 +133,11 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement := make(chan bool, 1)
go manageWorkers(stopWorkerManagement)
scanPool := workerpool.NewWorkerPool(10, 100000)
hashPool := workerpool.NewWorkerPool(5, 100000)
stopScanning := make(chan bool, 1)
go manageScan(stopScanning)
go manageScan(stopScanning, scanPool, hashPool)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
@ -146,6 +153,10 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement <- true
slog.Info("Stopping Scanning...")
stopScanning <- true
slog.Info("Stopping Scanning Pool...")
scanPool.Stop()
slog.Info("Stopping Hashing Pool...")
hashPool.Stop()
slog.Info("Done")
}
}

369
server/stats.go Normal file
View file

@ -0,0 +1,369 @@
package server
import (
"bytes"
"context"
"fmt"
"html/template"
"log/slog"
"net/http"
"slices"
"strings"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/go-echarts/go-echarts/v2/charts"
"github.com/go-echarts/go-echarts/v2/opts"
"github.com/jackc/pgx/v5"
)
// StatsDisplay is the data passed to the stats page template.
type StatsDisplay struct {
Charts []ChartData // rendered chart element/script pairs to embed in the page
Libraries []Library // libraries offered by the library filter
SelectedLibrary string // filtered library id; "-1" selects all (see the "$1 = -1 OR library_id = $1" queries)
// NOTE(review): the four slices below look like per-digit counter values
// for the stats counters (cf. splitInt and the "add counters" commits) --
// confirm against the stats template.
Size []int
Duration []int
Count []int
Saved []int
}
// ChartData holds one go-echarts chart rendered to raw HTML: the DOM element
// and the script snippet that initializes it (see generatePie).
type ChartData struct {
Element template.HTML
Script template.HTML
}
// PieValue is a name/count row collected from the stats aggregation queries
// via pgx.RowToStructByName (columns "name" and "value").
type PieValue struct {
Name string
Value int
}
// PieIntValue is a count keyed by an integer id.
// NOTE(review): not used in the visible portion of this file -- presumably
// collected from id/value query columns like PieValue; verify at the callers.
type PieIntValue struct {
Id int
Value int
}
// Fixed slice colors so the same state is colored consistently across charts:
// green for Success/Healthy, red for Failed/Damaged, yellow for Unknown/None
// (applied by name in generatePie).
const CHART_COLOR_SUCCESS = "#7cffb2"
const CHART_COLOR_FAILED = "#ff6e76"
const CHART_COLOR_UNKNOWN = "#fddd60"
// generatePie builds a dark-themed pie chart titled name from the given
// slices and returns its rendered HTML element plus the initialization
// script, ready for embedding in the stats template. Slices whose names
// denote a success, failure, or unknown state receive fixed colors so the
// same state looks identical on every chart.
func generatePie(name string, data []opts.PieData) ChartData {
	chart := charts.NewPie()
	chart.SetGlobalOptions(
		charts.WithInitializationOpts(opts.Initialization{
			Theme:           "dark",
			BackgroundColor: "#111",
		}),
		charts.WithTitleOpts(opts.Title{
			Title: name,
		}))

	// Apply the consistent state colors by slice name.
	for i := range data {
		switch data[i].Name {
		case "Success", "Healthy":
			data[i].ItemStyle = &opts.ItemStyle{Color: CHART_COLOR_SUCCESS}
		case "Failed", "Damaged":
			data[i].ItemStyle = &opts.ItemStyle{Color: CHART_COLOR_FAILED}
		case "Unknown", "None":
			data[i].ItemStyle = &opts.ItemStyle{Color: CHART_COLOR_UNKNOWN}
		}
	}

	chart.AddSeries(name, data).SetSeriesOptions(
		charts.WithLabelOpts(
			opts.Label{
				Show:      opts.Bool(true),
				Formatter: "{b}: {c}",
			}),
	)

	snip := chart.RenderSnippet()
	return ChartData{
		Element: template.HTML(snip.Element),
		Script:  template.HTML(snip.Script),
	}
}
// pieFromNameQuery runs query (which must return (name, value) rows matching
// PieValue), strips stray JSON quotes from the names, and renders the rows as
// a pie chart titled title. Error messages are derived from title.
func pieFromNameQuery(ctx context.Context, title string, query string, args ...any) (ChartData, error) {
	rows, err := db.Query(ctx, query, args...)
	if err != nil {
		return ChartData{}, fmt.Errorf("Query %v: %w", title, err)
	}
	counts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieValue])
	if err != nil {
		return ChartData{}, fmt.Errorf("Collect %v Data: %w", title, err)
	}
	res := make([]opts.PieData, 0, len(counts))
	for _, v := range counts {
		res = append(res, opts.PieData{
			// jsonb_path_query_first()::text yields quoted JSON strings; drop the quotes.
			Name:  strings.ReplaceAll(v.Name, "\"", ""),
			Value: v.Value,
		})
	}
	return generatePie(title, res), nil
}

// pieFromIDQuery runs query (which must return (id, value) rows matching
// PieIntValue), converts each numeric id to a display name with idName, and
// renders the rows as a pie chart titled title.
func pieFromIDQuery(ctx context.Context, title string, idName func(int) string, query string, args ...any) (ChartData, error) {
	rows, err := db.Query(ctx, query, args...)
	if err != nil {
		return ChartData{}, fmt.Errorf("Query %v: %w", title, err)
	}
	counts, err := pgx.CollectRows(rows, pgx.RowToStructByName[PieIntValue])
	if err != nil {
		return ChartData{}, fmt.Errorf("Collect %v Data: %w", title, err)
	}
	res := make([]opts.PieData, 0, len(counts))
	for _, v := range counts {
		res = append(res, opts.PieData{
			Name:  idName(v.Id),
			Value: v.Value,
		})
	}
	return generatePie(title, res), nil
}

// generateStats builds every chart shown on the stats page, restricted to
// library_id; "-1" selects all libraries (matching the SQL "$1 = -1" guards).
//
// Fixed: a failure collecting the Resolution rows was previously reported as
// "Collect Codec Data" (copy-paste error). The query/collect/render
// boilerplate is now shared by pieFromNameQuery/pieFromIDQuery, which derive
// their error messages from the chart title.
func generateStats(ctx context.Context, library_id string) ([]ChartData, error) {
	data := []ChartData{}

	// Distribution of video codecs, ignoring attached-picture streams.
	chart, err := pieFromNameQuery(ctx, "Codecs",
		`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).codec_name')::text, 'Unknown') AS name, COUNT(*) AS value
	FROM files
	WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, library_id)
	if err != nil {
		return nil, err
	}
	data = append(data, chart)

	// Distribution of video widths.
	chart, err = pieFromNameQuery(ctx, "Resolution",
		`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.streams[*] ? (@.codec_type == "video") ? (@.disposition.attached_pic == 0).width')::text, 'Unknown') AS name, COUNT(*) AS value
	FROM files
	WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, library_id)
	if err != nil {
		return nil, err
	}
	data = append(data, chart)

	// Distribution of container formats.
	chart, err = pieFromNameQuery(ctx, "Container",
		`SELECT COALESCE(jsonb_path_query_first(ffprobe_data, '$.format.format_name')::text, 'Unknown') AS name, COUNT(*) AS value
	FROM files
	WHERE ffprobe_data IS NOT NULL AND ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, library_id)
	if err != nil {
		return nil, err
	}
	data = append(data, chart)

	// File health states (enum ids mapped to names).
	chart, err = pieFromIDQuery(ctx, "Health",
		func(id int) string { return constants.FileHealth(id).String() },
		`SELECT health AS id, COUNT(*) AS value
	FROM files WHERE ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, library_id)
	if err != nil {
		return nil, err
	}
	data = append(data, chart)

	// File transcode states.
	chart, err = pieFromIDQuery(ctx, "Transcode Status",
		func(id int) string { return constants.FileTranscode(id).String() },
		`SELECT transcode AS id, COUNT(*) AS value
	FROM files WHERE ($1 = -1 OR library_id = $1)
	GROUP BY 1;`, library_id)
	if err != nil {
		return nil, err
	}
	data = append(data, chart)

	// Task states for tasks whose file belongs to the selected library.
	chart, err = pieFromIDQuery(ctx, "Task Status",
		func(id int) string { return constants.TaskStatus(id).String() },
		`SELECT tasks.status AS id, COUNT(*) AS value
	FROM tasks INNER JOIN files ON files.id = tasks.file_id
	WHERE ($1 = -1 OR files.library_id = $1)
	GROUP BY 1;`, library_id)
	if err != nil {
		return nil, err
	}
	data = append(data, chart)

	// Stacked bar chart: successful vs. failed tasks per day over the last week.
	type BarTaskRowValue struct {
		Date   time.Time
		Status constants.TaskStatus
		Count  int
	}
	rows, err := db.Query(ctx,
		`SELECT date_trunc('day', tasks.updated_at) date, tasks.status, COUNT(*) AS count
	FROM tasks INNER JOIN files ON files.id = tasks.file_id
	WHERE ($1 = -1 OR files.library_id = $1) AND tasks.updated_at > CURRENT_DATE - 7 AND (tasks.status = $2 OR tasks.status = $3)
	GROUP BY 1,2
	ORDER BY date;`, library_id, constants.TASK_STATUS_SUCCESS, constants.TASK_STATUS_FAILED)
	if err != nil {
		return nil, fmt.Errorf("Query Task Status Day: %w", err)
	}
	taskStatusDayCounts, err := pgx.CollectRows(rows, pgx.RowToStructByName[BarTaskRowValue])
	if err != nil {
		return nil, fmt.Errorf("Collect Task Status Day Data: %w", err)
	}

	days := []string{}
	successBarData := []opts.BarData{}
	failedBarData := []opts.BarData{}
	for _, v := range taskStatusDayCounts {
		// Rows arrive ordered by date; a day can appear twice (once per
		// status), hence the Contains check before appending the axis label.
		if !slices.Contains(days, v.Date.Format(time.DateOnly)) {
			days = append(days, v.Date.Format(time.DateOnly))
		}
		// NOTE(review): bars are appended positionally; a day that has only
		// one of the two statuses could shift the other series against the
		// day axis — confirm whether days are always fully populated.
		if v.Status == constants.TASK_STATUS_SUCCESS {
			successBarData = append(successBarData, opts.BarData{
				Value: v.Count,
				ItemStyle: &opts.ItemStyle{
					Color: CHART_COLOR_SUCCESS,
				},
			})
		} else if v.Status == constants.TASK_STATUS_FAILED {
			failedBarData = append(failedBarData, opts.BarData{
				Value: v.Count,
				ItemStyle: &opts.ItemStyle{
					Color: CHART_COLOR_FAILED,
				},
			})
		}
	}

	bar := charts.NewBar()
	bar.SetGlobalOptions(
		charts.WithInitializationOpts(opts.Initialization{
			Theme:           "dark",
			BackgroundColor: "#111",
		}),
		charts.WithTitleOpts(opts.Title{
			Title: "Task Success/Failed Last 7 Days",
		}),
	)
	bar.SetXAxis(days).
		AddSeries("Success", successBarData).
		AddSeries("Failed", failedBarData).
		SetSeriesOptions(charts.WithBarChartOpts(opts.BarChart{
			Stack: "stackA",
		}))
	snippet := bar.RenderSnippet()
	data = append(data, ChartData{
		Element: template.HTML(snippet.Element),
		Script:  template.HTML(snippet.Script),
	})

	return data, nil
}
// handleStats renders the stats page. The optional "id" path value restricts
// the statistics to one library; when absent, "-1" means all libraries.
func handleStats(w http.ResponseWriter, r *http.Request) {
	libraryID := r.PathValue("id")
	if libraryID == "" {
		libraryID = "-1"
	}

	chartData, err := generateStats(r.Context(), libraryID)
	if err != nil {
		slog.ErrorContext(r.Context(), "Generate Stats:", "err", err)
		http.Error(w, "Generate Stats: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// All libraries are listed so the page can render the filter dropdown.
	rows, err := db.Query(r.Context(),
		`SELECT *
	FROM libraries;`)
	if err != nil {
		slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
		http.Error(w, "Query Libraries: "+err.Error(), http.StatusInternalServerError)
		return
	}
	libraries, err := pgx.CollectRows(rows, pgx.RowToStructByName[Library])
	if err != nil {
		slog.ErrorContext(r.Context(), "Collect Libraries", "err", err)
		http.Error(w, "Collect Libraries: "+err.Error(), http.StatusInternalServerError)
		return
	}

	// The counter totals are best-effort: any scan error just shows zeros
	// instead of failing the whole page. Duration is not tracked yet, so the
	// query selects a constant 0 for it.
	var size, count, duration int
	if err := db.QueryRow(r.Context(),
		`SELECT SUM(size) AS size,
	COUNT(id) as count,
	0 as duration
	FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2`, libraryID, constants.FILE_STATUS_EXISTS).Scan(&size, &count, &duration); err != nil {
		size, count, duration = 0, 0, 0
	}

	// Bytes saved by transcoding: original size minus current size, summed
	// over files that have been replaced. Best-effort as well.
	var saved int
	if err := db.QueryRow(r.Context(),
		`SELECT (SUM(old_size) - COALESCE(SUM(size), 0)) AS saved
	FROM files WHERE ($1 = -1 OR files.library_id = $1) AND status = $2 AND old_size IS NOT NULL`, libraryID, constants.FILE_STATUS_EXISTS).Scan(&saved); err != nil {
		saved = 0
	}

	// Render into a buffer first so a template error can still produce a
	// clean HTTP 500 instead of a half-written page.
	var buf bytes.Buffer
	err = templates.ExecuteTemplate(&buf, constants.STATS_TEMPLATE_NAME, StatsDisplay{
		Charts:          chartData,
		Libraries:       libraries,
		SelectedLibrary: libraryID,
		Size:            splitInt(size),
		Count:           splitInt(count),
		Duration:        splitInt(duration),
		Saved:           splitInt(saved),
	})
	if err != nil {
		slog.ErrorContext(r.Context(), "Executing Stats Template", "err", err)
		http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
		return
	}

	if _, err := w.Write(buf.Bytes()); err != nil {
		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
	}
}

View file

@ -107,7 +107,7 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
return
}
rows, err := db.Query(r.Context(), "SELECT id, name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path FROM libraries WHERE enable = $1", true)
rows, err := db.Query(r.Context(), "SELECT * FROM libraries WHERE enable = $1", true)
if err != nil {
slog.ErrorContext(r.Context(), "Query Libraries", "err", err)
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
@ -328,6 +328,7 @@ type QueuedTask struct {
Type constants.TaskType
FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5" db:"md5"`
FileExtension string `json:"file_extension"`
Data json.RawMessage
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
}
@ -344,7 +345,7 @@ func assignQueuedTasks(ctx context.Context) error {
assignRunning = false
}()
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, substring(f.path FROM '\\.([^\\.]*)$') AS file_extension, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status = $2 ORDER BY type DESC", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err)
}
@ -353,6 +354,8 @@ func assignQueuedTasks(ctx context.Context) error {
return fmt.Errorf("Collect Queued Tasks: %w", err)
}
// TODO, allow overwriting of extension
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
if len(queuedTasks) == 0 {
@ -372,7 +375,8 @@ func assignQueuedTasks(ctx context.Context) error {
}
if Workers[i].Connected {
var queueEnable bool
err := db.QueryRow(ctx, "SELECT queue_enable FROM workers WHERE id = $1", i).Scan(&queueEnable)
var parallelTasks int
err := db.QueryRow(ctx, "SELECT queue_enable, parallel_tasks FROM workers WHERE id = $1", i).Scan(&queueEnable, &parallelTasks)
if err != nil {
return fmt.Errorf("Error Querying Worker Queue Enable: %w", err)
}
@ -390,7 +394,7 @@ func assignQueuedTasks(ctx context.Context) error {
slog.Debug("Assigning Queued Tasks Worker", "worker", i, "count", count)
// Allow for Multiple Tasks at once in the future
if count < 1 {
if count < parallelTasks {
tx, err := db.Begin(ctx)
defer tx.Rollback(ctx)
if err != nil {
@ -406,6 +410,7 @@ func assignQueuedTasks(ctx context.Context) error {
Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID,
FileMD5: queuedTasks[lastAssigned].FileMD5,
FileExtension: queuedTasks[lastAssigned].FileExtension,
Data: queuedTasks[lastAssigned].Data,
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
}

View file

@ -1,15 +1,20 @@
package server
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"strconv"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"gopkg.in/vansante/go-ffprobe.v2"
)
func handleUpload(w http.ResponseWriter, r *http.Request) {
@ -18,30 +23,56 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
return
}
id := r.PathValue("id")
if id == "" {
strid := r.PathValue("id")
if strid == "" {
http.Error(w, "No id", http.StatusBadRequest)
return
}
var count int
err := db.QueryRow(r.Context(), "SELECT COUNT(*) FROM tasks WHERE file_id = $1 AND worker_id = $2", id, r.Header.Get(constants.UUID_HEADER)).Scan(&count)
id, err := strconv.Atoi(strid)
if err != nil {
slog.ErrorContext(r.Context(), "Query Task Count", "err", err)
http.Error(w, "Error Query Task Count: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, 0, "Convert File ID to int", err)
return
}
if count < 1 {
slog.ErrorContext(r.Context(), "No Running Task for file", "id", id)
http.Error(w, "No Running Task for file: "+id, http.StatusBadRequest)
if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
http.Error(w, "Wrong Worker Version", http.StatusBadRequest)
return
}
taskid, err := strconv.Atoi(r.Header.Get(constants.TASK_ID_HEADER))
if err != nil {
errorUpload(r, w, 0, "Convert Task ID to int", err)
return
}
var fileid int
var workerid string
var status constants.TaskStatus
err = db.QueryRow(r.Context(), "SELECT file_id, worker_id, status FROM tasks WHERE id = $1", taskid).Scan(&fileid, &workerid, &status)
if err != nil {
errorUpload(r, w, 0, "Getting Task for Upload", err)
return
}
if workerid != r.Header.Get(constants.UUID_HEADER) {
errorUpload(r, w, taskid, "Worker is not Assigned to Task", err)
return
}
if fileid != id {
errorUpload(r, w, taskid, "File is not for this Task", err)
return
}
if !(status == constants.TASK_STATUS_RUNNING || status == constants.TASK_STATUS_UNKNOWN) {
errorUpload(r, w, taskid, "File can only be uploaded when task status is running or unknown", err)
return
}
hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER))
if err != nil {
slog.ErrorContext(r.Context(), "Decode Hash", "err", err)
http.Error(w, "Decode Hash: "+err.Error(), http.StatusBadRequest)
errorUpload(r, w, taskid, "Decode Hash", err)
return
}
@ -49,8 +80,7 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var libraryID int
err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID)
if err != nil {
slog.ErrorContext(r.Context(), "Query File Path", "err", err)
http.Error(w, "Error Query File Path: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Query File Path", err)
return
}
@ -59,18 +89,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
var tReplace bool
err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace)
if err != nil {
slog.ErrorContext(r.Context(), "Query Library Path", "err", err)
http.Error(w, "Error Query Library Path: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Query Library Path", err)
return
}
// When replacing, send to temp, otherwise write directly
var path string
if tReplace {
//path = filepath.Join(lPath, fPath)
slog.ErrorContext(r.Context(), "Replace mode is not implemented")
http.Error(w, "Replace mode is not implemented", http.StatusNotImplemented)
return
path = filepath.Join(lPath, fPath+constants.TEMP_FILE_EXTENSION)
// if replace then this is a temp file and should be cleaned up on error
defer os.Remove(path)
} else {
// TODO: write this to a temp file first as well, so cleanup can run on error and the rename logic is unified
path = filepath.Join(tPath, fPath)
}
@ -78,15 +108,13 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
err = r.ParseMultipartForm(100 << 20)
if err != nil {
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Parse Multipart Form", err)
return
}
srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY)
if err != nil {
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Form File", err)
return
}
@ -94,17 +122,18 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
// MaxBytesReader closes the underlying io.Reader on its Close() is called
defer src.Close()
err = os.MkdirAll(filepath.Dir(path), 0775)
if err != nil {
slog.ErrorContext(r.Context(), "Creating Folder", "err", err)
http.Error(w, "Creating Folder: "+err.Error(), http.StatusInternalServerError)
return
// if we are not replacing then we dont know if the destination folder exists
if !tReplace {
err = os.MkdirAll(filepath.Dir(path), 0775)
if err != nil {
errorUpload(r, w, taskid, "Creating Folder", err)
return
}
}
out, err := os.Create(path)
if err != nil {
slog.ErrorContext(r.Context(), "Creating File", "err", err)
http.Error(w, "Creating File: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Creating File", err)
return
}
defer out.Close()
@ -113,17 +142,80 @@ func handleUpload(w http.ResponseWriter, r *http.Request) {
if err != nil {
var maxBytesError *http.MaxBytesError
if errors.As(err, &maxBytesError) {
slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err)
http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusRequestEntityTooLarge)
slog.ErrorContext(r.Context(), "File to Large", "err", err)
http.Error(w, "File to Large: "+err.Error(), http.StatusRequestEntityTooLarge)
return
}
slog.ErrorContext(r.Context(), "Failed to write the uploaded content", "err", err)
http.Error(w, "Failed to write the uploaded content: "+err.Error(), http.StatusInternalServerError)
errorUpload(r, w, taskid, "Failed to write the uploaded content", err)
return
}
// TODO check file integrity
_ = hash
slog.InfoContext(r.Context(), "upload done", "written", written)
out.Close()
// if we are replacing, then we need to change the original file to the uploaded file and update the hash in the database
if tReplace {
tx, err := db.Begin(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Begin Transaction", err)
return
}
dstPath := filepath.Join(lPath, fPath)
slog.InfoContext(r.Context(), "Replacing Original file With Uploaded File", "fileid", id, "path", path, "dstPath", dstPath)
err = os.Rename(path, dstPath)
if err != nil {
errorUpload(r, w, taskid, "Replace File", err)
return
}
info, err := os.Stat(dstPath)
if err != nil {
errorUpload(r, w, taskid, "Stat File", err)
return
}
modTime := info.ModTime().UTC().Round(time.Second)
size := info.Size()
ffprobeData, err := ffprobe.ProbeURL(r.Context(), dstPath)
if err != nil {
errorUpload(r, w, taskid, "ffProbe File", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET old_md5 = md5, old_size = size, old_ffprobe_data = ffprobe_data WHERE id = $1", fileid)
if err != nil {
errorUpload(r, w, taskid, "Copy Filed Current Values to Old Values", err)
return
}
_, err = tx.Exec(r.Context(), "UPDATE files SET md5 = $2, size = $3, mod_time = $4, ffprobe_data = $5 WHERE id = $1", fileid, hash, size, modTime, ffprobeData)
if err != nil {
errorUpload(r, w, taskid, "Set New File Values", err)
return
}
err = tx.Commit(r.Context())
if err != nil {
errorUpload(r, w, taskid, "Commit File Hash", err)
return
}
slog.InfoContext(r.Context(), "Original file Replaced with Uploaded File", "fileid", id, "dstPath", dstPath)
} else {
// TODO implement "old" fields for non replace libraries, scan also needs to use old field if it is is non replace and the file has been transcoded
}
}
// errorUpload logs an upload failure, reports it to the client as an HTTP 500,
// and — when a task id is known — appends the error to that task's log so the
// failure is visible in the task view.
//
// err may be nil: several callers report pure validation failures (wrong
// worker, wrong file, wrong status) without an underlying error. Previously
// this function called err.Error() unconditionally and would have panicked on
// those paths; it also checked the wrong variable (err instead of err2) after
// the task-log update.
func errorUpload(r *http.Request, w http.ResponseWriter, taskID int, msg string, err error) {
	detail := msg
	if err != nil {
		detail = msg + ": " + err.Error()
	}
	slog.ErrorContext(r.Context(), msg, "err", err)
	http.Error(w, detail, http.StatusInternalServerError)
	if taskID != 0 {
		// context.TODO: the request context may already be cancelled, but the
		// task log entry should still be written.
		_, err2 := db.Exec(context.TODO(), "UPDATE tasks SET log = log || $2 WHERE id = $1", taskID, []string{fmt.Sprintf("%v MASTER: upload error: %v", time.Now(), detail)})
		if err2 != nil {
			slog.ErrorContext(r.Context(), "Updating task log with upload error", "err", err2)
		}
	}
}

View file

@ -247,7 +247,8 @@ func updateWorkerTaskStatus(ctx context.Context) {
var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY
// TODO Auto Queue Transcode for Successfull Transcodes if library setting
// Auto-queue a Transcode for successful health checks if the library settings say so
go queueOnHealth(context.TODO(), ts.Task.ID)
} else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED
@ -294,3 +295,26 @@ func updateWorkerTaskStatus(ctx context.Context) {
wg.Wait()
}
// queueOnHealth queues a transcode task for the file behind a successful
// health-check task, but only when the file's library has
// health_ok_queue_transcode enabled and a transcode command configured.
// It is best-effort: failures are logged, not returned.
func queueOnHealth(ctx context.Context, taskID int) {
	var transcode_command_id *int
	var file_id *int
	err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, files.id
	FROM tasks
	INNER JOIN files ON files.id = tasks.file_id
	INNER JOIN libraries ON files.library_id = libraries.id
	WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = true AND libraries.transcode_command_id IS NOT NULL`, taskID).
		Scan(&transcode_command_id, &file_id)
	if err == pgx.ErrNoRows {
		slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
		// BUG FIX: previously fell through and inserted a task with nil
		// file_id/ffmpeg_command_id; nothing must be queued in this case.
		return
	} else if err != nil {
		slog.ErrorContext(ctx, "Unable to Query if and how we should queue a Transcode on Health Check Success", "err", err)
		return
	}
	slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
	if err != nil {
		slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
	}
}

45
static/js/echarts.min.js vendored Normal file

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,7 @@
// Navigate to the stats page for the selected library.
// A value of "-1" means "all libraries" and maps to the unfiltered /stats page.
function setLibrary(library_id) {
    const target = library_id == "-1" ? "/stats" : "/stats/" + library_id;
    window.location.href = target;
}

View file

@ -3,58 +3,65 @@
@import url("component.css");
:root {
font-family: monospace;
font-size: 12px;
overflow-y: auto;
font-family: monospace;
font-size: 12px;
overflow-y: auto;
}
body {
padding: 0.5rem;
gap: 0.5rem;
padding: 0.5rem;
gap: 0.5rem;
}
.counter-image {
image-rendering: pixelated;
image-rendering: -moz-crisp-edges;
image-rendering: crisp-edges;
flex-grow: 1;
image-rendering: pixelated;
image-rendering: -moz-crisp-edges;
image-rendering: crisp-edges;
flex-grow: 1;
max-height: 50vh;
max-width: 22.5vh;
}
.counter {
flex-flow: row nowrap;
align-items: end;
font-size: 1.5rem;
font-weight: bold;
flex-flow: row nowrap;
align-items: end;
font-size: 1.5rem;
font-weight: bold;
}
.workers {
flex-flow: row wrap;
gap: 2rem;
flex-flow: row wrap;
gap: 2rem;
}
.workers>* {
gap: 0.5rem;
border: 1px solid var(--fg);
.workers > * {
gap: 0.5rem;
border: 1px solid var(--fg);
}
.log>p {
padding: 0.25rem 0;
user-select: text;
.log > p {
padding: 0.25rem 0;
user-select: text;
}
.log>p:hover {
background: var(--cl-hl);
.log > p:hover {
background: var(--cl-hl);
}
nav> :first-child {
font-size: 2rem;
font-weight: bold;
nav > :first-child {
font-size: 2rem;
font-weight: bold;
}
.short-button {
align-self: start;
align-self: start;
}
.button-list {
flex-direction: row;
flex-direction: row;
}
.stats {
flex-flow: row wrap;
gap: 2rem;
}

View file

@ -17,7 +17,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v.mkv", t.ID, t.FileID))
path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-health-%v-%v."+t.FileExtension, t.ID, t.FileID))
// Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -27,6 +27,7 @@ func StartTask(conf config.Config, data types.TaskStart) error {
Type: data.Type,
FileID: data.FileID,
FileMD5: data.FileMD5,
FileExtension: data.FileExtension,
FfmpegCommand: data.FfmpegCommand,
}

View file

@ -17,8 +17,8 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
l := log.GetTaskLogger(t)
// TODO Figure out how to get correct file ending
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v.mkv", t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v.mkv", t.ID, t.FileID))
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-src-%v-%v."+t.FileExtension, t.ID, t.FileID))
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("morffix-dst-%v-%v."+t.FileExtension, t.ID, t.FileID))
// Set ffmpeg input path
if len(t.FfmpegCommand.InputFiles) == 0 {

View file

@ -9,6 +9,7 @@ import (
"mime/multipart"
"net/http"
"os"
"strconv"
"time"
"git.lastassault.de/speatzle/morffix/config"
@ -82,6 +83,7 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
req.Header.Add(constants.UUID_HEADER, uuid.String())
req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION)
req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash))
req.Header.Add(constants.TASK_ID_HEADER, strconv.Itoa(t.ID))
req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"")
var client = &http.Client{
@ -97,7 +99,8 @@ func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path st
l.InfoContext(ctx, "Upload Done")
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("Got HTTP Status Code: %v Body: %v", resp.StatusCode, string(body))
}
return nil

View file

@ -3,6 +3,7 @@
<html>
<head>
<link rel="stylesheet" href="/static/style/style.css">
<script src="/static/js/echarts.min.js"></script>
</head>
<body>
{{template "navbar"}}

View file

@ -31,6 +31,24 @@
</select>
<input type="submit" value="Submit">
</form>
<h2>Set Parallel Tasks</h2>
<form method="POST" action= "/parallel_tasks">
<label for="worker">Worker:</label>
<select id="worker" name="worker">
<option value="all">All</option>
{{range $w := .Workers}}
<option value="{{$w.ID}}">{{$w.Name}}</option>
{{end}}
</select>
<label for="parallel_tasks">Parallel Tasks</label>
<select id="parallel_tasks" name="parallel_tasks">
<option value="1">1</option>
<option value="2">2</option>
<option value="3">3</option>
<option value="4">4</option>
</select>
<input type="submit" value="Submit">
</form>
<h2>Workers</h2>
<div class="workers">
{{range $w := .Workers}}
@ -60,6 +78,22 @@
<td>
{{ $w.ConnectionChanged }}
</td>
</tr>
<tr>
<td>
QueueEnable
</td>
<td>
{{ $w.QueueEnable }}
</td>
</tr>
<tr>
<td>
ParallelTasks
</td>
<td>
{{ $w.ParallelTasks }}
</td>
</tr>
{{if $w.Connected}}
<tr>

View file

@ -13,6 +13,8 @@
<th>ScanNewQueueTranscode</th>
<th>ScanChangedQueueTranscode</th>
<th>HealthOkQueueTranscode</th>
<th>HealthCommand</th>
<th>TranscodeCommand</th>
</tr>
{{range $l := .Libraries}}
<tr onclick="window.location='/libraries/{{ $l.ID }}';">
@ -49,6 +51,12 @@
<td>
{{ $l.HealthOkQueueTranscode }}
</td>
<td>
{{ $l.HealthCommandID }}
</td>
<td>
{{ $l.TranscodeCommandID }}
</td>
</tr>
{{end}}
</table>
@ -67,14 +75,28 @@
<input type="text" name="transcode_path">
<label>Queue Heathcheck for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_health">
<label>Queue Heathcheck for changed files on Scan:</label>
<label>Queue Heathcheck for changed files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_changed_queue_health">
<label>Queue Transcode for new files on Scan:</label>
<label>Queue Transcode for new files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_new_queue_transcode">
<label>Queue Transcode for changed files on Scan:</label>
<label>Queue Transcode for changed files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_changed_queue_transcode">
<label>Queue Transcode on Healthcheck Success:</label>
<input type="checkbox" name="health_ok_queue_transcode">
<label for="health_command_id">Health Command:</label>
<select id="health_command_id" name="health_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<label for="transcode_command_id">Transcode Command:</label>
<select id="transcode_command_id" name="transcode_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<input type="submit" value="Submit">
</form>
{{template "tail"}}

View file

@ -4,5 +4,6 @@
<a class="button" href="/libraries">Libraries</a>
<a class="button" href="/tasks">Tasks</a>
<a class="button" href="/ffmpeg_commands">FFmpeg Commands</a>
<a class="button" href="/stats">Stats</a>
</nav>
{{end}}

69
tmpl/stats.tmpl Normal file
View file

@ -0,0 +1,69 @@
{{template "head"}}
{{/* Stats page: library filter, odometer-style digit counters, and the
     server-rendered go-echarts snippets. Context is server.StatsDisplay. */}}
<script src="/static/js/library_filter.js"></script>
<h2>Stats</h2>
<label for="library">Library:</label>
{{/* Changing the selection navigates via setLibrary() in library_filter.js. */}}
<select id="library" name="library" onchange="setLibrary(this.value)" class="short-button">
<option {{if eq .SelectedLibrary "-1" }} selected {{end}}value="-1">ALL</option>
{{range $l := $.Libraries}}
<option {{if eq $.SelectedLibrary $l.ID }} selected {{end}}value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<h4>Size</h4>
{{/* Each decimal digit renders as one counter image. $n is the number of
     digits remaining; a unit label is emitted when 4/7/10/13 digits remain,
     i.e. at the KB/MB/GB/TB thousands boundaries. */}}
<div class="counter">
{{ $l := len .Size }}
{{range $i, $c := .Size}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
<h4>Files</h4>
{{/* Same digit layout as Size, but with "." thousands separators. */}}
<div class="counter">
{{ $l := len .Count }}
{{range $i, $c := .Count}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
.
{{ else if eq $n 7 }}
.
{{ else if eq $n 10 }}
.
{{ else if eq $n 13 }}
.
{{end}}
{{end}}
</div>
<h4>Saved</h4>
{{/* Bytes saved by transcoding (old_size - size), same layout as Size. */}}
<div class="counter">
{{ $l := len .Saved }}
{{range $i, $c := .Saved}}
<img class="counter-image" alt="{{$c}}" src="/static/counter/{{$c}}.gif">
{{ $n := sub $l $i }}
{{ if eq $n 4 }}
KB
{{ else if eq $n 7 }}
MB
{{ else if eq $n 10 }}
GB
{{ else if eq $n 13 }}
TB
{{end}}
{{end}}
</div>
{{/* Each chart is an element plus the script that initializes it. */}}
<div class="stats">
{{range $c := .Charts}}
{{$c.Element}} {{$c.Script}}
{{end}}
</div>
{{template "tail"}}

View file

@ -10,6 +10,7 @@ type TaskStart struct {
ID int `json:"id"`
FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"`
Data json.RawMessage
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`
@ -19,6 +20,7 @@ type Task struct {
ID int `json:"id"`
FileID int `json:"file_id"`
FileMD5 []byte `json:"md5"`
FileExtension string `json:"file_extension"`
Type constants.TaskType `json:"type"`
Status constants.TaskStatus `json:"status"`
FfmpegCommand FFmpegCommand `json:"ffmpeg_command"`

View file

@ -2,6 +2,7 @@ package worker
import (
"context"
"time"
"github.com/mackerelio/go-osstat/cpu"
"github.com/mackerelio/go-osstat/memory"
@ -14,13 +15,31 @@ func init() {
rpcServer.RegisterMethod("status", status)
}
// cpuBefore holds the previous CPU sample so a usage delta can be computed.
var cpuBefore *cpu.Stats
// cpuUsage is the most recently computed CPU usage; once two samples exist it
// is a percentage (0-100) derived from the user+system tick deltas.
var cpuUsage uint64
// cpuCount is the CPU count reported by the most recent sample.
var cpuCount uint64
// calcUsage samples CPU statistics once per second forever and publishes the
// results in the package-level cpuUsage/cpuCount variables read by status().
// NOTE(review): the globals are written here and read from status() without
// synchronization — presumably tolerated for monitoring data; confirm.
func calcUsage() {
for {
cStats, _ := cpu.Get()
if cStats != nil {
cpuUsage = cStats.Total
cpuCount = uint64(cStats.CPUCount)
if cpuBefore != nil {
// Usage = (user+system delta) / total delta, as a percentage.
total := float64(cStats.Total - cpuBefore.Total)
cpuUsage = uint64(float64((cStats.User-cpuBefore.User)+(cStats.System-cpuBefore.System)) / total * 100)
}
cpuBefore = cStats
}
time.Sleep(time.Second)
}
}
func status(ctx context.Context, req rpc.Request) (any, error) {
s := types.WorkerStatus{}
cStats, _ := cpu.Get()
if cStats != nil {
s.CPUUsage = cStats.Total
s.CPUCount = uint64(cStats.CPUCount)
}
s.CPUUsage = cpuUsage
s.CPUCount = cpuCount
mStats, _ := memory.Get()
if mStats != nil {
s.MemoryUsage = uint64(float64(mStats.Used) / float64(mStats.Total) * 100)

View file

@ -60,6 +60,9 @@ func Start(_conf config.Config) {
exit = true
cancel()
}()
go calcUsage()
for {
if exit {
slog.InfoContext(ctx, "Done")

67
workerpool/workerpool.go Normal file
View file

@ -0,0 +1,67 @@
package workerpool
import (
"context"
"sync"
)
// WorkerPool runs queued jobs on a fixed number of worker goroutines.
// Jobs receive a context that is cancelled when the pool is stopped.
type WorkerPool struct {
	workers int                // number of worker goroutines
	ctx     context.Context    // cancelled by Stop to shut the workers down
	cancel  context.CancelFunc // cancels ctx
	wg      sync.WaitGroup     // tracks running worker goroutines
	queue   chan func(context.Context)
	qg      sync.WaitGroup // tracks queued-but-unfinished jobs
}

// NewWorkerPool starts workers goroutines consuming from a queue buffering up
// to queueSize pending jobs and returns the running pool.
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())

	workerPool := WorkerPool{
		workers: workers,
		ctx:     ctx,
		cancel:  cancel,
		queue:   make(chan func(context.Context), queueSize),
	}

	for i := 0; i < workerPool.workers; i++ {
		workerPool.wg.Add(1)
		go workerPool.work(workerPool.ctx)
	}

	return &workerPool
}

// Stop cancels the pool's context and waits for all workers to exit.
// Jobs still sitting in the queue are abandoned (their WaitForEmptyQueue
// accounting is never released), so call WaitForEmptyQueue first when every
// queued job must run.
func (wp *WorkerPool) Stop() {
	wp.cancel()
	wp.wg.Wait()
}

// QueueJob enqueues job for execution; it blocks while the queue buffer is full.
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
	wp.qg.Add(1)
	wp.queue <- job
}

// WaitForEmptyQueue blocks until every job queued so far has finished running.
func (wp *WorkerPool) WaitForEmptyQueue() {
	wp.qg.Wait()
}

// QueueLength reports the number of jobs currently waiting in the queue buffer.
func (wp *WorkerPool) QueueLength() int {
	return len(wp.queue)
}

// work is the worker goroutine loop: run jobs until the pool context is cancelled.
func (wp *WorkerPool) work(ctx context.Context) {
	// BUG FIX: Done was previously only called on the ctx.Done path, so a
	// panicking job killed the worker goroutine without releasing the
	// WaitGroup and Stop would deadlock. Deferring it covers every exit.
	defer wp.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case job := <-wp.queue:
			func() {
				// Release the queue accounting even if job panics; the panic
				// itself still propagates (and now also releases wg above).
				defer wp.qg.Done()
				job(ctx)
			}()
		}
	}
}