diff --git a/constants/constants.go b/constants/constants.go index 1d34cf4..3d9911a 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -16,6 +16,7 @@ const LIBRARY_TEMPLATE_NAME = "library.tmpl" const MESSAGE_TEMPLATE_NAME = "message.tmpl" const TASKS_TEMPLATE_NAME = "tasks.tmpl" const TASK_TEMPLATE_NAME = "task.tmpl" +const FFMPEG_COMMANDS_TEMPLATE_NAME = "ffmpeg_commands.tmpl" const FORM_FILE_KEY = "file" diff --git a/migrations/000008_alter_files_table_updated_at.down.sql b/migrations/000008_alter_files_table_updated_at.down.sql index 60f8fb6..9471db8 100644 --- a/migrations/000008_alter_files_table_updated_at.down.sql +++ b/migrations/000008_alter_files_table_updated_at.down.sql @@ -1,2 +1,2 @@ ALTER TABLE files -DROP updated_at; \ No newline at end of file +DROP IF EXISTS updated_at; \ No newline at end of file diff --git a/migrations/000012_create_ffmpeg_commands_table.down.sql b/migrations/000012_create_ffmpeg_commands_table.down.sql new file mode 100644 index 0000000..d5bf8c1 --- /dev/null +++ b/migrations/000012_create_ffmpeg_commands_table.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS ffmpeg_commands; \ No newline at end of file diff --git a/migrations/000012_create_ffmpeg_commands_table.up.sql b/migrations/000012_create_ffmpeg_commands_table.up.sql new file mode 100644 index 0000000..8c111ef --- /dev/null +++ b/migrations/000012_create_ffmpeg_commands_table.up.sql @@ -0,0 +1,6 @@ +CREATE TABLE IF NOT EXISTS ffmpeg_commands( + id bigserial PRIMARY KEY, + name VARCHAR (50) NOT NULL, + data JSONB +); + diff --git a/migrations/000013_insert_healthcheck.down.sql b/migrations/000013_insert_healthcheck.down.sql new file mode 100644 index 0000000..b1a081e --- /dev/null +++ b/migrations/000013_insert_healthcheck.down.sql @@ -0,0 +1 @@ +DELETE FROM ffmpeg_commands where id = 1; \ No newline at end of file diff --git a/migrations/000013_insert_healthcheck.up.sql b/migrations/000013_insert_healthcheck.up.sql new file mode 100644 index 0000000..5fa0bb5 --- /dev/null +++ b/migrations/000013_insert_healthcheck.up.sql @@ -0,0 +1 @@ +INSERT INTO ffmpeg_commands (name, data) VALUES ('healthcheck_command','{"args": [{"flag": "-stats", "value": ""}, {"flag": "-v", "value": "error"}, {"flag": "-xerror", "value": ""}], "input_files": [{"args": null, "path": "input.mkv"}], "output_files": [{"args": [{"flag": "-f", "value": "null"}, {"flag": "-max_muxing_queue_size", "value": "9999"}], "path": "output.mkv"}]}'); \ No newline at end of file diff --git a/migrations/000014_alter_task_table_ffmpeg_command.down.sql b/migrations/000014_alter_task_table_ffmpeg_command.down.sql new file mode 100644 index 0000000..15e1eef --- /dev/null +++ b/migrations/000014_alter_task_table_ffmpeg_command.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE tasks +DROP IF EXISTS ffmpeg_command; \ No newline at end of file diff --git a/migrations/000014_alter_task_table_ffmpeg_command.up.sql b/migrations/000014_alter_task_table_ffmpeg_command.up.sql new file mode 100644 index 0000000..5d159be --- /dev/null +++ b/migrations/000014_alter_task_table_ffmpeg_command.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE tasks +ADD ffmpeg_command_id bigint REFERENCES ffmpeg_commands(id) NOT NULL DEFAULT 1; \ No newline at end of file diff --git a/server/ffmpeg_command.go b/server/ffmpeg_command.go new file mode 100644 index 0000000..7afc31c --- /dev/null +++ b/server/ffmpeg_command.go @@ -0,0 +1,79 @@ +package server + +import ( + "bytes" + "fmt" + "log/slog" + "net/http" + + "git.lastassault.de/speatzle/morffix/constants" + "github.com/jackc/pgx/v5" +) + +type FfmpegCommandsData struct { + FfmpegCommands []FfmpegCommand +} + +type FfmpegCommand struct { + ID int `db:"id"` + Name string `db:"name"` + Data string `db:"data"` +} + +func handleFfmpegCommands(w http.ResponseWriter, r *http.Request) { + data := FfmpegCommandsData{} + + if r.Method == "POST" { + err := createFfmpegCommand(r) + if err != nil { + slog.ErrorContext(r.Context(), "Create FfmpegCommand", "err", err) + http.Error(w, "Error Create FfmpegCommand: "+err.Error(), http.StatusInternalServerError) + return + } + } + + rows, err := db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands") + if err != nil { + slog.ErrorContext(r.Context(), "Query ffmpegCommands", "err", err) + http.Error(w, "Error Query ffmpegCommands: "+err.Error(), http.StatusInternalServerError) + return + } + ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand]) + if err != nil { + slog.ErrorContext(r.Context(), "Collect Rows", "err", err) + http.Error(w, "Error Query ffmpegCommands: "+err.Error(), http.StatusInternalServerError) + return + } + data.FfmpegCommands = ffmpegCommands + + buf := bytes.Buffer{} + err = templates.ExecuteTemplate(&buf, constants.FFMPEG_COMMANDS_TEMPLATE_NAME, data) + if err != nil { + slog.ErrorContext(r.Context(), "Executing ffmpegCommands Template", "err", err) + http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) + return + } + + _, err = w.Write(buf.Bytes()) + if err != nil { + slog.ErrorContext(r.Context(), "Writing http Response", "err", err) + } +} + +func createFfmpegCommand(r *http.Request) error { + err := r.ParseForm() + if err != nil { + return fmt.Errorf("Parseing Form: %w", err) + } + name := r.FormValue("name") + data := r.FormValue("data") + + slog.Info("Got FfmpegCommand Create", "name", name, "data", data) + + _, err = db.Exec(r.Context(), "INSERT INTO ffmpegCommands (name, data) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)", name, data) + if err != nil { + return fmt.Errorf("Inserting FfmpegCommand: %w", err) + } + + return nil +} diff --git a/server/server.go b/server/server.go index c9af803..cb79411 100644 --- a/server/server.go +++ b/server/server.go @@ -100,6 +100,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/libraries/{id}", handleLibrary) mux.HandleFunc("/libraries", handleLibraries) + mux.HandleFunc("/ffmpeg_commands", handleFfmpegCommands) + mux.HandleFunc("/", handleIndex) server := &http.Server{ diff --git a/server/task.go b/server/task.go index 6fc255c..65c670f 100644 --- a/server/task.go +++ b/server/task.go @@ -16,9 +16,10 @@ import ( ) type TasksData struct { - Libraries []Library - Tasks []TaskDisplay - Stats TaskStats + Libraries []Library + FfmpegCommands []FfmpegCommand + Tasks []TaskDisplay + Stats TaskStats } type TaskStats struct { @@ -34,23 +35,25 @@ type TaskStats struct { } type TaskDisplay struct { - ID int `db:"id"` - Library int `db:"library"` - Worker *string `db:"worker"` - Type int `db:"type"` - Status string `db:"status"` - File string `db:"file"` - UpdatedAt string `db:"updated_at"` + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + FfmpegCommand *string `db:"ffmpeg_command"` + Status string `db:"status"` + File string `db:"file"` + UpdatedAt string `db:"updated_at"` } type TaskDB struct { - ID int `db:"id"` - Library int `db:"library"` - Worker *string `db:"worker"` - Type int `db:"type"` - Status constants.TaskStatus `db:"status"` - File string `db:"file"` - UpdatedAt time.Time `db:"updated_at"` + ID int `db:"id"` + Library int `db:"library"` + Worker *string `db:"worker"` + Type int `db:"type"` + FfmpegCommand *string `db:"ffmpeg_command"` + Status constants.TaskStatus `db:"status"` + File string `db:"file"` + UpdatedAt time.Time `db:"updated_at"` } func handleTasks(w http.ResponseWriter, r *http.Request) { @@ -92,12 +95,26 @@ func handleTasks(w http.ResponseWriter, r *http.Request) { libraries, err := pgx.CollectRows[Library](rows, pgx.RowToStructByName[Library]) if err != nil { slog.ErrorContext(r.Context(), "Collect Rows", "err", err) - http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError) + http.Error(w, "Error Collect Libraries: "+err.Error(), http.StatusInternalServerError) return } data.Libraries = libraries - rows, err = db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file, t.updated_at AS updated_at FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id ORDER BY CASE t.type WHEN 3 THEN -1 ELSE t.type END, t.id") + rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands") + if err != nil { + slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err) + http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError) + return + } + ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand]) + if err != nil { + slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err) + http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError) + return + } + data.FfmpegCommands = ffmpegCommands + + rows, err = db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, fc.name AS ffmpeg_command, t.status AS status, f.path AS file, t.updated_at AS updated_at FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id ORDER BY CASE t.type WHEN 3 THEN -1 ELSE t.type END, t.id") if err != nil { slog.ErrorContext(r.Context(), "Query Tasks", "err", err) http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError) @@ -111,13 +128,14 @@ func handleTasks(w http.ResponseWriter, r *http.Request) { } for i := range tasks { data.Tasks = append(data.Tasks, TaskDisplay{ - ID: tasks[i].ID, - Library: tasks[i].Library, - Worker: tasks[i].Worker, - Type: tasks[i].Type, - File: tasks[i].File, - Status: tasks[i].Status.String(), - UpdatedAt: tasks[i].UpdatedAt.Format(time.DateTime), + ID: tasks[i].ID, + Library: tasks[i].Library, + Worker: tasks[i].Worker, + Type: tasks[i].Type, + FfmpegCommand: tasks[i].FfmpegCommand, + File: tasks[i].File, + Status: tasks[i].Status.String(), + UpdatedAt: tasks[i].UpdatedAt.Format(time.DateTime), }) } @@ -198,23 +216,25 @@ func createTask(ctx context.Context, r *http.Request) error { defer tx.Rollback(ctx) var data any - if typ == constants.TASK_TYPE_HEALTHCHECK { + /* + if typ == constants.TASK_TYPE_HEALTHCHECK { - // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" - data = types.HealthCheckData{Command: types.FFmpegCommand{ - Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, - InputFiles: []types.File{{Path: "input.mkv"}}, - OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, - }} - } else if typ == constants.TASK_TYPE_TRANSCODE { - data = types.TranscodeData{Command: types.FFmpegCommand{ - Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, - InputFiles: []types.File{{Path: "input.mkv"}}, - OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-map", Value: "0"}, {Flag: "-c", Value: "copy"}, {Flag: "-c:a", Value: "aac"}, {Flag: "-c:v", Value: "libsvtav1"}, {Flag: "-crf", Value: "35"}, {Flag: "-preset", Value: "8"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, - }} - } else { - return fmt.Errorf("Unkown Task Type: %v", typ) - } + // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" + data = types.HealthCheckData{Command: types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, + InputFiles: []types.File{{Path: "input.mkv"}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + }} + } else if typ == constants.TASK_TYPE_TRANSCODE { + data = types.TranscodeData{Command: types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, + InputFiles: []types.File{{Path: "input.mkv"}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-map", Value: "0"}, {Flag: "-c", Value: "copy"}, {Flag: "-c:a", Value: "aac"}, {Flag: "-c:v", Value: "libsvtav1"}, {Flag: "-crf", Value: "35"}, {Flag: "-preset", Value: "8"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + }} + } else { + return fmt.Errorf("Unkown Task Type: %v", typ) + } + */ for _, file := range files { _, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data) @@ -232,15 +252,16 @@ func createTask(ctx context.Context, r *http.Request) error { } type QueuedTask struct { - ID int - Type int - FileID int `json:"file_id"` - FileMD5 []byte `json:"file_md5" db:"md5"` - Data json.RawMessage + ID int + Type int + FileID int `json:"file_id"` + FileMD5 []byte `json:"file_md5" db:"md5"` + Data json.RawMessage + FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"` } func assignQueuedTasks(ctx context.Context) error { - rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data FROM tasks t INNER JOIN files f ON f.id = t.file_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) + rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) if err != nil { return fmt.Errorf("Query Queued Tasks: %w", err) } @@ -288,11 +309,12 @@ func assignQueuedTasks(ctx context.Context) error { } taskStart := types.TaskStart{ - ID: queuedTasks[lastAssigned].ID, - Type: queuedTasks[lastAssigned].Type, - FileID: queuedTasks[lastAssigned].FileID, - FileMD5: queuedTasks[lastAssigned].FileMD5, - Data: queuedTasks[lastAssigned].Data, + ID: queuedTasks[lastAssigned].ID, + Type: queuedTasks[lastAssigned].Type, + FileID: queuedTasks[lastAssigned].FileID, + FileMD5: queuedTasks[lastAssigned].FileMD5, + Data: queuedTasks[lastAssigned].Data, + FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand, } _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) diff --git a/task/healthcheck.go b/task/healthcheck.go index f4b8687..a8ab6a2 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -20,12 +20,12 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID)) // Set ffmpeg input path - if len(data.Command.InputFiles) == 0 { - l.ErrorContext(ctx, "FFmpeg Command has no input files") + if len(t.FfmpegCommand.InputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no input files", "command", t.FfmpegCommand) return } - data.Command.InputFiles[0].Path = path + t.FfmpegCommand.InputFiles[0].Path = path // TODO cleanup file when done defer func() { @@ -43,7 +43,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat return } - err = runFfmpegCommand(ctx, l, conf, data.Command) + err = runFfmpegCommand(ctx, l, conf, t.FfmpegCommand) if err != nil { l.ErrorContext(ctx, "FFmpeg Failed", "err", err) return diff --git a/task/task.go b/task/task.go index 39e2856..6e2ad61 100644 --- a/task/task.go +++ b/task/task.go @@ -18,10 +18,11 @@ func StartTask(conf config.Config, data types.TaskStart) error { defer taskMutex.Unlock() tasks[data.ID] = &types.Task{ - ID: data.ID, - Type: data.Type, - FileID: data.FileID, - FileMD5: data.FileMD5, + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + FileMD5: data.FileMD5, + FfmpegCommand: data.FfmpegCommand, } switch data.Type { diff --git a/task/transcode.go b/task/transcode.go index 6ddd781..3a1e863 100644 --- a/task/transcode.go +++ b/task/transcode.go @@ -21,20 +21,20 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) { dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID)) // Set ffmpeg input path - if len(data.Command.InputFiles) == 0 { - l.ErrorContext(ctx, "FFmpeg Command has no input files") + if len(t.FfmpegCommand.InputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no input files", "command", t.FfmpegCommand) return } - data.Command.InputFiles[0].Path = src_path + t.FfmpegCommand.InputFiles[0].Path = src_path // Set ffmpeg output path - if len(data.Command.OutputFiles) == 0 { - l.ErrorContext(ctx, "FFmpeg Command has no output files") + if len(t.FfmpegCommand.OutputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no output files", "command", t.FfmpegCommand) return } - data.Command.OutputFiles[0].Path = dst_path + t.FfmpegCommand.OutputFiles[0].Path = dst_path // TODO cleanup file when done defer func() { @@ -62,7 +62,7 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) { } }() - err = runFfmpegCommand(ctx, l, conf, data.Command) + err = runFfmpegCommand(ctx, l, conf, t.FfmpegCommand) if err != nil { l.ErrorContext(ctx, "FFmpeg Failed", "err", err) return diff --git a/tmpl/ffmpeg_commands.tmpl b/tmpl/ffmpeg_commands.tmpl new file mode 100644 index 0000000..2d6f23e --- /dev/null +++ b/tmpl/ffmpeg_commands.tmpl @@ -0,0 +1,32 @@ +{{template "head"}} +
ID | +Name | +Data | +
---|---|---|
+ {{ $t.ID }} + | ++ {{ $t.Name }} + | ++ {{ $t.Data }} + | +