From 8dc7d907a8b7f346ff841589e33d75776eb8ec8c Mon Sep 17 00:00:00 2001 From: speatzle Date: Thu, 9 May 2024 20:07:38 +0200 Subject: [PATCH] Download file for healthcheck --- task/healthcheck.go | 64 +++++++++++++++++++++++++++++++++++++++++---- worker/worker.go | 2 +- 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/task/healthcheck.go b/task/healthcheck.go index 824d609..5d2fa3e 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -4,7 +4,12 @@ import ( "bufio" "bytes" "context" + "fmt" + "io" + "net/http" + "os" "os/exec" + "path/filepath" "sync" "git.lastassault.de/speatzle/morffix/config" @@ -44,12 +49,61 @@ func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) { func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckData) { ctx := context.TODO() l := log.GetTaskLogger(t) - l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs()) - path := conf.Worker.FFmpegPath - if path == "" { - path = "ffmpeg" + + // TODO Figure out how to get correct file ending + path := filepath.Join(conf.Worker.TempDir, fmt.Sprintf("%v-%v.mkv", t.ID, t.FileID)) + + // Set ffmpeg input path + if len(data.Command.InputFiles) == 0 { + l.ErrorContext(ctx, "FFmpeg Command has no input files") + return } - cmd := exec.CommandContext(ctx, path, data.Command.GetArgs()...) + + data.Command.InputFiles[0].Path = path + + // TODO cleanup file when done + defer func() { + err := os.Remove(path) + if err != nil { + l.ErrorContext(ctx, "Removing File", "err", err, "path", path) + } else { + l.ErrorContext(ctx, "File Removed Succesfully", "path", path) + } + }() + + l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path) + + err := func() error { + out, err := os.Create(path) + if err != nil { + return fmt.Errorf("Creating File: %w", err) + } + defer out.Close() + resp, err := http.Get(fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID)) + if err != nil { + return fmt.Errorf("Getting File: %w", err) + } + + // TODO Log at interval logs read + + defer resp.Body.Close() + n, err := io.Copy(out, resp.Body) + if err != nil { + return fmt.Errorf("Reading File: %w", err) + } + + l.InfoContext(ctx, "Downloaded File", "bytes", n) + + return nil + }() + + if err != nil { + l.ErrorContext(ctx, "File Download Failed", "err", err) + return + } + + l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs()) + cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...) var wg sync.WaitGroup diff --git a/worker/worker.go b/worker/worker.go index 54ef359..07dbad6 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -68,7 +68,7 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) { headers.Add(constants.UUID_HEADER, uuid.String()) headers.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) - c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ + c, res, err := websocket.Dial(ctx, conf.Worker.Address+"/worker", &websocket.DialOptions{ HTTPHeader: headers, }) if err != nil {