move ffmpeg command to seperate table
This commit is contained in:
parent
72dfd2408f
commit
cdeb266e38
20 changed files with 245 additions and 84 deletions
128
server/task.go
128
server/task.go
|
@ -16,9 +16,10 @@ import (
|
|||
)
|
||||
|
||||
type TasksData struct {
|
||||
Libraries []Library
|
||||
Tasks []TaskDisplay
|
||||
Stats TaskStats
|
||||
Libraries []Library
|
||||
FfmpegCommands []FfmpegCommand
|
||||
Tasks []TaskDisplay
|
||||
Stats TaskStats
|
||||
}
|
||||
|
||||
type TaskStats struct {
|
||||
|
@ -34,23 +35,25 @@ type TaskStats struct {
|
|||
}
|
||||
|
||||
type TaskDisplay struct {
|
||||
ID int `db:"id"`
|
||||
Library int `db:"library"`
|
||||
Worker *string `db:"worker"`
|
||||
Type int `db:"type"`
|
||||
Status string `db:"status"`
|
||||
File string `db:"file"`
|
||||
UpdatedAt string `db:"updated_at"`
|
||||
ID int `db:"id"`
|
||||
Library int `db:"library"`
|
||||
Worker *string `db:"worker"`
|
||||
Type int `db:"type"`
|
||||
FfmpegCommand *string `db:"ffmpeg_command"`
|
||||
Status string `db:"status"`
|
||||
File string `db:"file"`
|
||||
UpdatedAt string `db:"updated_at"`
|
||||
}
|
||||
|
||||
type TaskDB struct {
|
||||
ID int `db:"id"`
|
||||
Library int `db:"library"`
|
||||
Worker *string `db:"worker"`
|
||||
Type int `db:"type"`
|
||||
Status constants.TaskStatus `db:"status"`
|
||||
File string `db:"file"`
|
||||
UpdatedAt time.Time `db:"updated_at"`
|
||||
ID int `db:"id"`
|
||||
Library int `db:"library"`
|
||||
Worker *string `db:"worker"`
|
||||
Type int `db:"type"`
|
||||
FfmpegCommand *string `db:"ffmpeg_command"`
|
||||
Status constants.TaskStatus `db:"status"`
|
||||
File string `db:"file"`
|
||||
UpdatedAt time.Time `db:"updated_at"`
|
||||
}
|
||||
|
||||
func handleTasks(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -92,12 +95,26 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
|
|||
libraries, err := pgx.CollectRows[Library](rows, pgx.RowToStructByName[Library])
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Collect Rows", "err", err)
|
||||
http.Error(w, "Error Query Libraries: "+err.Error(), http.StatusInternalServerError)
|
||||
http.Error(w, "Error Collect Libraries: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
data.Libraries = libraries
|
||||
|
||||
rows, err = db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, t.status AS status, f.path AS file, t.updated_at AS updated_at FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id ORDER BY CASE t.type WHEN 3 THEN -1 ELSE t.type END, t.id")
|
||||
rows, err = db.Query(r.Context(), "SELECT id, name, data FROM ffmpeg_commands")
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Ffmpeg Commands", "err", err)
|
||||
http.Error(w, "Error Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
ffmpegCommands, err := pgx.CollectRows[FfmpegCommand](rows, pgx.RowToStructByName[FfmpegCommand])
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Collect Ffmpeg Commands", "err", err)
|
||||
http.Error(w, "Error Collect Ffmpeg Commands: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
data.FfmpegCommands = ffmpegCommands
|
||||
|
||||
rows, err = db.Query(r.Context(), "SELECT t.id AS id, l.id AS library, t.worker_id AS worker, t.type AS type, fc.name AS ffmpeg_command, t.status AS status, f.path AS file, t.updated_at AS updated_at FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN libraries l ON l.id = f.library_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id ORDER BY CASE t.type WHEN 3 THEN -1 ELSE t.type END, t.id")
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Query Tasks", "err", err)
|
||||
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
|
||||
|
@ -111,13 +128,14 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
for i := range tasks {
|
||||
data.Tasks = append(data.Tasks, TaskDisplay{
|
||||
ID: tasks[i].ID,
|
||||
Library: tasks[i].Library,
|
||||
Worker: tasks[i].Worker,
|
||||
Type: tasks[i].Type,
|
||||
File: tasks[i].File,
|
||||
Status: tasks[i].Status.String(),
|
||||
UpdatedAt: tasks[i].UpdatedAt.Format(time.DateTime),
|
||||
ID: tasks[i].ID,
|
||||
Library: tasks[i].Library,
|
||||
Worker: tasks[i].Worker,
|
||||
Type: tasks[i].Type,
|
||||
FfmpegCommand: tasks[i].FfmpegCommand,
|
||||
File: tasks[i].File,
|
||||
Status: tasks[i].Status.String(),
|
||||
UpdatedAt: tasks[i].UpdatedAt.Format(time.DateTime),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -198,23 +216,25 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
defer tx.Rollback(ctx)
|
||||
|
||||
var data any
|
||||
if typ == constants.TASK_TYPE_HEALTHCHECK {
|
||||
/*
|
||||
if typ == constants.TASK_TYPE_HEALTHCHECK {
|
||||
|
||||
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
|
||||
data = types.HealthCheckData{Command: types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
||||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}}
|
||||
} else if typ == constants.TASK_TYPE_TRANSCODE {
|
||||
data = types.TranscodeData{Command: types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
||||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-map", Value: "0"}, {Flag: "-c", Value: "copy"}, {Flag: "-c:a", Value: "aac"}, {Flag: "-c:v", Value: "libsvtav1"}, {Flag: "-crf", Value: "35"}, {Flag: "-preset", Value: "8"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}}
|
||||
} else {
|
||||
return fmt.Errorf("Unkown Task Type: %v", typ)
|
||||
}
|
||||
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
|
||||
data = types.HealthCheckData{Command: types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
||||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}}
|
||||
} else if typ == constants.TASK_TYPE_TRANSCODE {
|
||||
data = types.TranscodeData{Command: types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
||||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-map", Value: "0"}, {Flag: "-c", Value: "copy"}, {Flag: "-c:a", Value: "aac"}, {Flag: "-c:v", Value: "libsvtav1"}, {Flag: "-crf", Value: "35"}, {Flag: "-preset", Value: "8"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}}
|
||||
} else {
|
||||
return fmt.Errorf("Unkown Task Type: %v", typ)
|
||||
}
|
||||
*/
|
||||
|
||||
for _, file := range files {
|
||||
_, err = db.Exec(r.Context(), "INSERT INTO tasks (file_id, type, status, data) VALUES ($1,$2,$3,$4)", file.ID, typ, constants.TASK_STATUS_QUEUED, data)
|
||||
|
@ -232,15 +252,16 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
}
|
||||
|
||||
type QueuedTask struct {
|
||||
ID int
|
||||
Type int
|
||||
FileID int `json:"file_id"`
|
||||
FileMD5 []byte `json:"file_md5" db:"md5"`
|
||||
Data json.RawMessage
|
||||
ID int
|
||||
Type int
|
||||
FileID int `json:"file_id"`
|
||||
FileMD5 []byte `json:"file_md5" db:"md5"`
|
||||
Data json.RawMessage
|
||||
FfmpegCommand types.FFmpegCommand `json:"ffmpeg_command" db:"ffmpeg_command"`
|
||||
}
|
||||
|
||||
func assignQueuedTasks(ctx context.Context) error {
|
||||
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data FROM tasks t INNER JOIN files f ON f.id = t.file_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
|
||||
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Query Queued Tasks: %w", err)
|
||||
}
|
||||
|
@ -288,11 +309,12 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
}
|
||||
|
||||
taskStart := types.TaskStart{
|
||||
ID: queuedTasks[lastAssigned].ID,
|
||||
Type: queuedTasks[lastAssigned].Type,
|
||||
FileID: queuedTasks[lastAssigned].FileID,
|
||||
FileMD5: queuedTasks[lastAssigned].FileMD5,
|
||||
Data: queuedTasks[lastAssigned].Data,
|
||||
ID: queuedTasks[lastAssigned].ID,
|
||||
Type: queuedTasks[lastAssigned].Type,
|
||||
FileID: queuedTasks[lastAssigned].FileID,
|
||||
FileMD5: queuedTasks[lastAssigned].FileMD5,
|
||||
Data: queuedTasks[lastAssigned].Data,
|
||||
FfmpegCommand: queuedTasks[lastAssigned].FfmpegCommand,
|
||||
}
|
||||
|
||||
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue