Transcode Task Handling
This commit is contained in:
parent
fea955fb79
commit
13ea1cb755
6 changed files with 164 additions and 20 deletions
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
|
@ -174,7 +175,11 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
}
|
||||
library := r.FormValue("library")
|
||||
health := r.FormValue("health")
|
||||
typ := r.FormValue("type")
|
||||
typ, err := strconv.Atoi(r.FormValue("type"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Parsing Task Type: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("Got Task Create", "library", library, "health", health, "type", typ)
|
||||
|
||||
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5, updated_at FROM files where library_id = $1 AND status = $2 AND (-1 = $3 OR health = $3)", library, constants.FILE_STATUS_EXISTS, health)
|
||||
|
@ -193,7 +198,7 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
defer tx.Rollback(ctx)
|
||||
|
||||
var data any
|
||||
if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK {
|
||||
if typ == constants.TASK_TYPE_HEALTHCHECK {
|
||||
|
||||
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
|
||||
data = types.HealthCheckData{Command: types.FFmpegCommand{
|
||||
|
@ -201,6 +206,14 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}}
|
||||
} else if typ == constants.TASK_TYPE_TRANSCODE {
|
||||
data = types.TranscodeData{Command: types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
||||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}}
|
||||
} else {
|
||||
return fmt.Errorf("Unkown Task Type: %v", typ)
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
|
|
|
@ -233,23 +233,36 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK && (ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS) {
|
||||
var health constants.FileHealth
|
||||
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
|
||||
health = constants.FILE_HEALTH_HEALTHY
|
||||
} else {
|
||||
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
|
||||
health = constants.FILE_HEALTH_DAMAGED
|
||||
}
|
||||
_, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating File health", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Tell Worker to Delete Finished Tasks
|
||||
if ts.Task.Status == constants.TASK_STATUS_FAILED || ts.Task.Status == constants.TASK_STATUS_SUCCESS {
|
||||
if ts.Task.Type == constants.TASK_TYPE_HEALTHCHECK {
|
||||
var health constants.FileHealth
|
||||
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
|
||||
health = constants.FILE_HEALTH_HEALTHY
|
||||
// TODO Auto Queue Transcode for Successfull Transcodes if library setting
|
||||
} else {
|
||||
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
|
||||
health = constants.FILE_HEALTH_DAMAGED
|
||||
}
|
||||
_, err = db.Exec(ctx, "UPDATE files SET health = $2 WHERE id = $1", ts.Task.FileID, health)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating File health", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
} else if ts.Task.Type == constants.TASK_TYPE_TRANSCODE {
|
||||
var transcode constants.FileTranscode
|
||||
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
|
||||
transcode = constants.FILE_TRANSCODE_SUCCESS
|
||||
} else {
|
||||
transcode = constants.FILE_TRANSCODE_FAILED
|
||||
}
|
||||
_, err = db.Exec(ctx, "UPDATE files SET transcode = $2 WHERE id = $1", ts.Task.FileID, transcode)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating File transcode", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
// Tell Worker to Delete Finished Tasks
|
||||
_, err := rpcServer.Call(ctx, w.Conn, "task-delete", taskID, nil)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Deleting Finished Task From Worker", "err", err)
|
||||
|
@ -264,7 +277,7 @@ func updateWorkerTaskStatus(ctx context.Context) {
|
|||
|
||||
} else {
|
||||
// TODO wait for 5 minutes for worker to reconnect
|
||||
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck
|
||||
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck if worker does not reconnect
|
||||
|
||||
}
|
||||
}
|
||||
|
|
23
task/task.go
23
task/task.go
|
@ -49,7 +49,28 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
|||
}()
|
||||
return nil
|
||||
case constants.TASK_TYPE_TRANSCODE:
|
||||
return fmt.Errorf("Transcode Task Not Implemented")
|
||||
var tData types.TranscodeData
|
||||
err := json.Unmarshal(data.Data, &tData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unmarshal Transcode Data: %w", err)
|
||||
}
|
||||
|
||||
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
|
||||
go func() {
|
||||
defer func() {
|
||||
taskMutex.Lock()
|
||||
defer taskMutex.Unlock()
|
||||
t, ok := tasks[data.ID]
|
||||
if ok {
|
||||
if t.Status == constants.TASK_STATUS_RUNNING {
|
||||
t.Status = constants.TASK_STATUS_FAILED
|
||||
t.Log = append(t.Log, "Task Status Set to Failed by defer")
|
||||
}
|
||||
}
|
||||
}()
|
||||
RunTranscode(conf, tasks[data.ID], tData)
|
||||
}()
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("Unknown Task Type %v", data.Type)
|
||||
}
|
||||
|
|
79
task/transcode.go
Normal file
79
task/transcode.go
Normal file
|
@ -0,0 +1,79 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/config"
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"git.lastassault.de/speatzle/morffix/task/log"
|
||||
"git.lastassault.de/speatzle/morffix/types"
|
||||
)
|
||||
|
||||
func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) {
|
||||
ctx := context.TODO()
|
||||
l := log.GetTaskLogger(t)
|
||||
|
||||
// TODO Figure out how to get correct file ending
|
||||
src_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("src-%v-%v.mkv", t.ID, t.FileID))
|
||||
dst_path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("dst-%v-%v.mkv", t.ID, t.FileID))
|
||||
|
||||
// Set ffmpeg input path
|
||||
if len(data.Command.InputFiles) == 0 {
|
||||
l.ErrorContext(ctx, "FFmpeg Command has no input files")
|
||||
return
|
||||
}
|
||||
|
||||
data.Command.InputFiles[0].Path = src_path
|
||||
|
||||
// Set ffmpeg output path
|
||||
if len(data.Command.OutputFiles) == 0 {
|
||||
l.ErrorContext(ctx, "FFmpeg Command has no output files")
|
||||
return
|
||||
}
|
||||
|
||||
data.Command.OutputFiles[0].Path = dst_path
|
||||
|
||||
// TODO cleanup file when done
|
||||
defer func() {
|
||||
err := os.Remove(src_path)
|
||||
if err != nil {
|
||||
l.ErrorContext(ctx, "Removing Source File", "err", err, "path", src_path)
|
||||
} else {
|
||||
l.InfoContext(ctx, "Source File Removed Succesfully", "path", src_path)
|
||||
}
|
||||
}()
|
||||
|
||||
err := downloadFile(ctx, l, conf.Worker.Address, src_path, t)
|
||||
if err != nil {
|
||||
l.ErrorContext(ctx, "Source File Download Failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO cleanup file when done
|
||||
defer func() {
|
||||
err := os.Remove(dst_path)
|
||||
if err != nil {
|
||||
l.ErrorContext(ctx, "Removing Destination File", "err", err, "path", dst_path)
|
||||
} else {
|
||||
l.InfoContext(ctx, "File Destination Removed Succesfully", "path", dst_path)
|
||||
}
|
||||
}()
|
||||
|
||||
err = runFfmpegCommand(ctx, l, conf, data.Command)
|
||||
if err != nil {
|
||||
l.ErrorContext(ctx, "FFmpeg Failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
err = uploadFile(ctx, l, conf.Worker.Address, dst_path, t)
|
||||
if err != nil {
|
||||
l.ErrorContext(ctx, "File Upload Failed", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
l.InfoContext(ctx, "Task Success")
|
||||
t.Status = constants.TASK_STATUS_SUCCESS
|
||||
}
|
13
task/upload.go
Normal file
13
task/upload.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/types"
|
||||
)
|
||||
|
||||
func uploadFile(ctx context.Context, l *slog.Logger, address string, path string, t *types.Task) error {
|
||||
return fmt.Errorf("File Upload not Implemented")
|
||||
}
|
5
types/transcode.go
Normal file
5
types/transcode.go
Normal file
|
@ -0,0 +1,5 @@
|
|||
package types
|
||||
|
||||
type TranscodeData struct {
|
||||
Command FFmpegCommand `json:"command"`
|
||||
}
|
Loading…
Add table
Reference in a new issue