From 13ea1cb755a71441eca7ee41a8bec0e9c6cfe4c0 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 22 Jun 2024 21:12:52 +0200 Subject: [PATCH] Transcode Task Handling --- server/task.go | 17 ++++++++-- server/worker.go | 47 +++++++++++++++++---------- task/task.go | 23 +++++++++++++- task/transcode.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++ task/upload.go | 13 ++++++++ types/transcode.go | 5 +++ 6 files changed, 164 insertions(+), 20 deletions(-) create mode 100644 task/transcode.go create mode 100644 task/upload.go create mode 100644 types/transcode.go diff --git a/server/task.go b/server/task.go index 526bcb7..9714e57 100644 --- a/server/task.go +++ b/server/task.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "net/http" + "strconv" "time" "git.lastassault.de/speatzle/morffix/constants" @@ -174,7 +175,11 @@ func createTask(ctx context.Context, r *http.Request) error { } library := r.FormValue("library") health := r.FormValue("health") - typ := r.FormValue("type") + typ, err := strconv.Atoi(r.FormValue("type")) + if err != nil { + return fmt.Errorf("Parsing Task Type: %w", err) + } + slog.Info("Got Task Create", "library", library, "health", health, "type", typ) rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3)", library, constants.FILE_STATUS_EXISTS, health) @@ -193,7 +198,7 @@ func createTask(ctx context.Context, r *http.Request) error { defer tx.Rollback(ctx) var data any - if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK { + if typ == constants.TASK_TYPE_HEALTHCHECK { // ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4" data = types.HealthCheckData{Command: types.FFmpegCommand{ @@ -201,6 +206,14 @@ func createTask(ctx context.Context, r *http.Request) error { InputFiles: []types.File{{Path: "input.mkv"}}, OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, }} + } else if typ == constants.TASK_TYPE_TRANSCODE { + data = types.TranscodeData{Command: types.FFmpegCommand{ + Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, + InputFiles: []types.File{{Path: "input.mkv"}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + }} + } else { + return fmt.Errorf("Unkown Task Type: %v", typ) } for _, file := range files { diff --git a/server/worker.go b/server/worker.go index c3b2141..573afea 100644 --- a/server/worker.go +++ b/server/worker.go @@ -233,23 +233,36 @@ func updateWorkerTaskStatus(ctx context.Context) { return } - if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK && (ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS) { - var health constants.FileHealth - if ts.Task.Status == constants.TASK_STATUS_SUCCESS { - health = constants.FILE_HEALTH_HEALTHY - } else { - // TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs - health = constants.FILE_HEALTH_DAMAGED - } - _, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health) - if err != nil { - slog.ErrorContext(ctx, "Updating File health", "err", err) - return - } - } - - // Tell Worker to Delete Finished Tasks if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS { + if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK { + var health constants.FileHealth + if ts.Task.Status == constants.TASK_STATUS_SUCCESS { + health = constants.FILE_HEALTH_HEALTHY + // TODO Auto Queue Transcode for Successfull Transcodes if library setting + } else { + // TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs + health = constants.FILE_HEALTH_DAMAGED + } + _, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health) + if err != nil { + slog.ErrorContext(ctx, "Updating File health", "err", err) + return + } + + } else if ts.Task.Type == constants.TASK_TYPE_TRANSCODE { + var transcode constants.FileTranscode + if ts.Task.Status == constants.TASK_STATUS_SUCCESS { + transcode = constants.FILE_TRANSCODE_SUCCESS + } else { + transcode = constants.FILE_TRANSCODE_FAILED + } + _, err = db.Exec(ctx, "UPDATE files SET transcode = $2 WHERE id = $1", ts.Task.FileID, transcode) + if err != nil { + slog.ErrorContext(ctx, "Updating File transcode", "err", err) + return + } + } + // Tell Worker to Delete Finished Tasks _, err := rpcServer.Call(ctx, w.Conn, "task-delete", taskID, nil) if err != nil { slog.ErrorContext(ctx, "Deleting Finished Task From Worker", "err", err) @@ -264,7 +277,7 @@ func updateWorkerTaskStatus(ctx context.Context) { } else { // TODO wait for 5 minutes for worker to reconnect - // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck + // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck if worker does not reconnect } } diff --git a/task/task.go b/task/task.go index c2d3ef3..39e2856 100644 --- a/task/task.go +++ b/task/task.go @@ -49,7 +49,28 @@ func StartTask(conf config.Config, data types.TaskStart) error { }() return nil case constants.TASK_TYPE_TRANSCODE: - return fmt.Errorf("Transcode Task Not Implemented") + var tData types.TranscodeData + err := json.Unmarshal(data.Data, &tData) + if err != nil { + return fmt.Errorf("Unmarshal Transcode Data: %w", err) + } + + tasks[data.ID].Status = constants.TASK_STATUS_RUNNING + go func() { + defer func() { + taskMutex.Lock() + defer taskMutex.Unlock() + t, ok := tasks[data.ID] + if ok { + if t.Status == constants.TASK_STATUS_RUNNING { + t.Status = constants.TASK_STATUS_FAILED + t.Log = append(t.Log, "Task Status Set to Failed by defer") + } + } + }() + RunTranscode(conf, tasks[data.ID], tData) + }() + return nil default: return fmt.Errorf("Unknown Task Type %v", data.Type) } diff --git a/task/transcode.go b/task/transcode.go new file mode 100644 index 0000000..7bec3a2 --- /dev/null +++ b/task/transcode.go @@ -0,0 +1,79 @@ +package task + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/task/log" + "git.lastassault.de/speatzle/morffix/types" +) + +func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) { + ctx := context.TODO() + l := log.GetTaskLogger(t) + + // TODO Figure out how to get correct file ending + src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID)) + dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID)) + + // Set ffmpeg input path + if len(data.Command.InputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no input files") + return + } + + data.Command.InputFiles[0].Path = src_path + + // Set ffmpeg output path + if len(data.Command.OutputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no output files") + return + } + + data.Command.OutputFiles[0].Path = dst_path + + // TODO cleanup file when done + defer func() { + err := os.Remove(src_path) + if err != nil { + l.ErrorContext(ctx, "Removing Source File", "err", err, "path", src_path) + } else { + l.InfoContext(ctx, "Source File Removed Succesfully", "path", src_path) + } + }() + + err := downloadFile(ctx, l, conf.Worker.Address, src_path, t) + if err != nil { + l.ErrorContext(ctx, "Source File Download Failed", "err", err) + return + } + + // TODO cleanup file when done + defer func() { + err := os.Remove(dst_path) + if err != nil { + l.ErrorContext(ctx, "Removing Destination File", "err", err, "path", dst_path) + } else { + l.InfoContext(ctx, "File Destination Removed Succesfully", "path", dst_path) + } + }() + + err = runFfmpegCommand(ctx, l, conf, data.Command) + if err != nil { + l.ErrorContext(ctx, "FFmpeg Failed", "err", err) + return + } + + err = uploadFile(ctx, l, conf.Worker.Address, dst_path, t) + if err != nil { + l.ErrorContext(ctx, "File Upload Failed", "err", err) + return + } + + l.InfoContext(ctx, "Task Success") + t.Status = constants.TASK_STATUS_SUCCESS +} diff --git a/task/upload.go b/task/upload.go new file mode 100644 index 0000000..5b204a4 --- /dev/null +++ b/task/upload.go @@ -0,0 +1,13 @@ +package task + +import ( + "context" + "fmt" + "log/slog" + + "git.lastassault.de/speatzle/morffix/types" +) + +func uploadFile(ctx context.Context, l *slog.Logger, address string, path string, t *types.Task) error { + return fmt.Errorf("File Upload not Implemented") +} diff --git a/types/transcode.go b/types/transcode.go new file mode 100644 index 0000000..62f1258 --- /dev/null +++ b/types/transcode.go @@ -0,0 +1,5 @@ +package types + +type TranscodeData struct { + Command FFmpegCommand `json:"command"` +}