From fea955fb7932d72364a9276790d9fc961b41f3b0 Mon Sep 17 00:00:00 2001
From: Samuel Lorch
Date: Sat, 22 Jun 2024 21:04:03 +0200
Subject: [PATCH] Make Healthcheck download and ffmpeg reusable

Move the file download and the ffmpeg invocation out of RunHealthCheck into
downloadFile and runFfmpegCommand so that other tasks can use them.

While moving the code:
- stop the download progress goroutine when the download ends, it used to
  loop forever after the ticker was stopped
- count downloaded bytes atomically, the counter is read by the progress
  goroutine while io.Copy updates it
- close the response body when the server does not answer with 200 OK
- bind the HTTP request to ctx
- create both ffmpeg pipes before starting the reader goroutines and keep
  draining a pipe after a scanner error
- return ffmpeg errors without logging them, the caller already logs them
---
Review notes (not part of the commit message):
 * The line structure and tab indentation of this patch were restored from a
   copy whose line breaks had been collapsed; indentation assumes gofmt.
 * The "index" lines of the two new files were dropped because their blob ids
   no longer match the changed content.
 * The last healthcheck.go hunk carries two lines of trailing context, hence
   -85,136 +37,14.
 * The From: header has no e-mail address, which git am presumably rejects;
   git apply does not need it.

 task/download.go    | 125 +++++++++++++++++++++++++++++++
 task/ffmpeg.go      | 105 ++++++++++++++++++++++++++
 task/healthcheck.go | 176 +-------------------------------------------
 3 files changed, 233 insertions(+), 173 deletions(-)
 create mode 100644 task/download.go
 create mode 100644 task/ffmpeg.go

diff --git a/task/download.go b/task/download.go
new file mode 100644
--- /dev/null
+++ b/task/download.go
@@ -0,0 +1,125 @@
+package task
+
+import (
+	"bytes"
+	"context"
+	"crypto/md5"
+	"fmt"
+	"io"
+	"log/slog"
+	"net/http"
+	"os"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"git.lastassault.de/speatzle/morffix/types"
+)
+
+// countReader wraps an io.Reader and counts the bytes read through it. The
+// counter is atomic so that the progress logger can read it from another
+// goroutine while the copy is still running.
+type countReader struct {
+	io.Reader
+	n atomic.Int64
+}
+
+// Read reads from the wrapped reader and adds the number of bytes read to n.
+func (w *countReader) Read(p []byte) (int, error) {
+	n, err := w.Reader.Read(p)
+	w.n.Add(int64(n))
+	return n, err
+}
+
+// logDownloadProgress logs the number of bytes read through cr once per second.
+// The returned function stops the logging goroutine and waits for it to exit.
+func logDownloadProgress(ctx context.Context, l *slog.Logger, cr *countReader, length int64) func() {
+	done := make(chan struct{})
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		tik := time.NewTicker(time.Second)
+		defer tik.Stop()
+
+		var lastCount int64
+		for {
+			select {
+			case <-tik.C:
+				count := cr.n.Load()
+				l.InfoContext(ctx, "Download Progress", "bytes", count, "lastCount", lastCount, "length", length, "speed", count-lastCount)
+				lastCount = count
+			case <-done:
+				return
+			}
+		}
+	}()
+
+	return func() {
+		close(done)
+		wg.Wait()
+	}
+}
+
+// downloadFile fetches the file of task t from the server at address and stores
+// it at path, calculating the MD5 sum of the data while it is written.
+//
+// The request is bound to ctx, so cancelling ctx aborts the transfer. An error
+// is returned when the file cannot be created, the request fails, the server
+// does not answer with 200 OK, the copy fails or the MD5 sum of the received
+// data differs from t.FileMD5.
+func downloadFile(ctx context.Context, l *slog.Logger, address string, path string, t *types.Task) error {
+	l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path, "md5", t.FileMD5)
+
+	out, err := os.Create(path)
+	if err != nil {
+		return fmt.Errorf("Creating File: %w", err)
+	}
+	defer out.Close()
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%v/files/%v", address, t.FileID), nil)
+	if err != nil {
+		return fmt.Errorf("New Request: %w", err)
+	}
+
+	// Have the connection closed once this single request is done.
+	req.Close = true
+
+	var client = &http.Client{
+		Transport: &http.Transport{},
+	}
+	resp, err := client.Do(req)
+	if err != nil {
+		return fmt.Errorf("Getting File: %w", err)
+	}
+	// Deferred right away so the body is also closed when the status check fails.
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
+	}
+
+	// Calculate hash and write file at the same time
+	hash := md5.New()
+	cr := &countReader{Reader: io.TeeReader(resp.Body, hash)}
+
+	stopProgress := logDownloadProgress(ctx, l, cr, resp.ContentLength)
+	defer stopProgress()
+
+	n, err := io.Copy(out, cr)
+	if err != nil {
+		return fmt.Errorf("Reading File: %w", err)
+	}
+
+	sum := hash.Sum(nil)
+
+	l.InfoContext(ctx, "Downloaded File", "bytes", n, "md5", sum)
+
+	if !bytes.Equal(sum, t.FileMD5) {
+		return fmt.Errorf("Downloaded File does not match md5")
+	}
+
+	return nil
+}
diff --git a/task/ffmpeg.go b/task/ffmpeg.go
new file mode 100644
--- /dev/null
+++ b/task/ffmpeg.go
@@ -0,0 +1,105 @@
+package task
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"log/slog"
+	"os/exec"
+	"sync"
+
+	"git.lastassault.de/speatzle/morffix/config"
+	"git.lastassault.de/speatzle/morffix/types"
+)
+
+// dropCR drops a terminal \r from the data.
+func dropCR(data []byte) []byte {
+	if len(data) > 0 && data[len(data)-1] == '\r' {
+		return data[0 : len(data)-1]
+	}
+	return data
+}
+
+// scanLines is a bufio.SplitFunc like bufio.ScanLines that additionally accepts
+// a lone \r as line terminator. A \n anywhere in data takes precedence over an
+// earlier \r.
+func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
+	if atEOF && len(data) == 0 {
+		return 0, nil, nil
+	}
+	if i := bytes.IndexByte(data, '\n'); i >= 0 {
+		// We have a full newline-terminated line.
+		return i + 1, dropCR(data[0:i]), nil
+	}
+	if i := bytes.IndexByte(data, '\r'); i >= 0 {
+		// We have a return line.
+		return i + 1, dropCR(data[0:i]), nil
+	}
+	// If we're at EOF, we have a final, non-terminated line. Return it.
+	if atEOF {
+		return len(data), dropCR(data), nil
+	}
+	// Request more data.
+	return 0, nil, nil
+}
+
+// logLines logs every line read from r. If scanning stops early, the rest of r
+// is discarded so that the writing process cannot block on a full pipe.
+func logLines(ctx context.Context, l *slog.Logger, r io.Reader) {
+	scanner := bufio.NewScanner(r)
+	scanner.Split(scanLines)
+	for scanner.Scan() {
+		l.InfoContext(ctx, scanner.Text())
+	}
+	if err := scanner.Err(); err != nil {
+		l.WarnContext(ctx, "Error Reading ffmpeg Output", "err", err)
+		_, _ = io.Copy(io.Discard, r)
+	}
+}
+
+// runFfmpegCommand runs the ffmpeg binary at conf.Worker.FFmpegPath with the
+// arguments of command and logs every line it writes to stdout and stderr.
+//
+// The process is started with exec.CommandContext, so it is killed when ctx is
+// cancelled. An error is returned when the output pipes cannot be created, when
+// ffmpeg cannot be started or when waiting for it returns an error.
+func runFfmpegCommand(ctx context.Context, l *slog.Logger, conf config.Config, command types.FFmpegCommand) error {
+	l.InfoContext(ctx, "Running ffmpeg", "args", command.GetArgs())
+	cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, command.GetArgs()...)
+
+	// Create both pipes before any reader goroutine is started, so a failure
+	// here cannot leave a goroutine blocked on a pipe that is never closed.
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return fmt.Errorf("Error getting StdoutPipe: %w", err)
+	}
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return fmt.Errorf("Error getting StderrPipe: %w", err)
+	}
+
+	err = cmd.Start()
+	if err != nil {
+		return fmt.Errorf("Error Starting ffmpeg: %w", err)
+	}
+
+	var wg sync.WaitGroup
+	for _, pipe := range []io.Reader{stdout, stderr} {
+		wg.Add(1)
+		go func(r io.Reader) {
+			defer wg.Done()
+			logLines(ctx, l, r)
+		}(pipe)
+	}
+
+	// Wait closes the pipes, so all output has to be read before calling it.
+	wg.Wait()
+
+	err = cmd.Wait()
+	if err != nil {
+		return fmt.Errorf("Error Running ffmpeg: %w", err)
+	}
+	return nil
+}
diff --git a/task/healthcheck.go b/task/healthcheck.go
index 0a65843..d97283f 100644
--- a/task/healthcheck.go
+++ b/task/healthcheck.go
@@ -1,19 +1,10 @@
 package task
 
 import (
-	"bufio"
-	"bytes"
 	"context"
-	"crypto/md5"
 	"fmt"
-	"io"
-	"net/http"
 	"os"
-	"os/exec"
 	"path/filepath"
-	"slices"
-	"sync"
-	"time"
 
 	"git.lastassault.de/speatzle/morffix/config"
 	"git.lastassault.de/speatzle/morffix/constants"
@@ -21,45 +12,6 @@ import (
 	"git.lastassault.de/speatzle/morffix/types"
 )
 
-type countReader struct {
-	io.Reader
-	n int
-}
-
-func (w *countReader) Read(p []byte) (int, error) {
-	n, err := w.Reader.Read(p)
-	w.n += n
-	return n, err
-}
-
-// dropCR drops a terminal \r from the data.
-func dropCR(data []byte) []byte {
-	if len(data) > 0 && data[len(data)-1] == '\r' {
-		return data[0 : len(data)-1]
-	}
-	return data
-}
-
-func scanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
-	if atEOF && len(data) == 0 {
-		return 0, nil, nil
-	}
-	if i := bytes.IndexByte(data, '\n'); i >= 0 {
-		// We have a full newline-terminated line.
-		return i + 1, dropCR(data[0:i]), nil
-	}
-	if i := bytes.IndexByte(data, '\r'); i >= 0 {
-		// We have a return line.
-		return i + 1, dropCR(data[0:i]), nil
-	}
-	// If we're at EOF, we have a final, non-terminated line. Return it.
-	if atEOF {
-		return len(data), dropCR(data), nil
-	}
-	// Request more data.
-	return 0, nil, nil
-}
-
 func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckData) {
 	ctx := context.TODO()
 	l := log.GetTaskLogger(t)
@@ -85,136 +37,14 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
 		}
 	}()
 
-	l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path, "md5", t.FileMD5)
-
-	err := func() error {
-		out, err := os.Create(path)
-		if err != nil {
-			return fmt.Errorf("Creating File: %w", err)
-		}
-		defer out.Close()
-
-		req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID), nil)
-		if err != nil {
-			return fmt.Errorf("New Request: %w", err)
-		}
-
-		req.Close = true
-
-		var client = &http.Client{
-			Transport: &http.Transport{},
-		}
-		resp, err := client.Do(req)
-		if err != nil {
-			return fmt.Errorf("Getting File: %w", err)
-		}
-
-		if resp.StatusCode != http.StatusOK {
-			return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
-		}
-
-		req.Close = true
-
-		// TODO Log at interval logs read
-
-		// Calculate hash and write file at the same time
-		hash := md5.New()
-		tr := io.TeeReader(resp.Body, hash)
-
-		cr := &countReader{Reader: tr}
-
-		stopProgress := make(chan bool, 1)
-
-		go func() {
-			tik := time.NewTicker(time.Second)
-
-			lastCount := 0
-
-			for {
-				select {
-				case <-tik.C:
-					speed := cr.n - lastCount
-					l.InfoContext(ctx, "Download Progress", "bytes", cr.n, "lastCount", lastCount, "length", resp.ContentLength, "speed", speed)
-					lastCount = cr.n
-				case <-stopProgress:
-					tik.Stop()
-				}
-
-			}
-		}()
-
-		defer func() {
-			stopProgress <- true
-		}()
-
-		defer resp.Body.Close()
-		n, err := io.Copy(out, cr)
-		if err != nil {
-			return fmt.Errorf("Reading File: %w", err)
-		}
-
-		md5 := hash.Sum(nil)
-
-		l.InfoContext(ctx, "Downloaded File", "bytes", n, "md5", md5)
-
-		if slices.Compare[[]byte](md5, t.FileMD5) != 0 {
-			return fmt.Errorf("Downloaded File does not match md5")
-		}
-
-		return nil
-	}()
-
+	err := downloadFile(ctx, l, conf.Worker.Address, path, t)
 	if err != nil {
 		l.ErrorContext(ctx, "File Download Failed", "err", err)
 		return
 	}
 
-	l.InfoContext(ctx, "Running ffmpeg", "args", data.Command.GetArgs())
-	cmd := exec.CommandContext(ctx, conf.Worker.FFmpegPath, data.Command.GetArgs()...)
-
-	var wg sync.WaitGroup
-
-	stdout, err := cmd.StdoutPipe()
+	err = runFfmpegCommand(ctx, l, conf, data.Command)
 	if err != nil {
-		l.ErrorContext(ctx, "Error getting StdoutPipe", "err", err)
-		return
-	}
-
-	wg.Add(1)
-	outScanner := bufio.NewScanner(stdout)
-	outScanner.Split(scanLines)
-	go func() {
-		for outScanner.Scan() {
-			l.InfoContext(ctx, outScanner.Text())
-		}
-		wg.Done()
-	}()
-
-	stderr, err := cmd.StderrPipe()
-	if err != nil {
-		l.ErrorContext(ctx, "Error getting StderrPipe", "err", err)
-		return
-	}
-	wg.Add(1)
-	errScanner := bufio.NewScanner(stderr)
-	errScanner.Split(scanLines)
-	go func() {
-		for errScanner.Scan() {
-			l.InfoContext(ctx, errScanner.Text())
-		}
-		wg.Done()
-	}()
-
-	err = cmd.Start()
-	if err != nil {
-		l.ErrorContext(ctx, "Error Starting ffmpeg", "err", err)
-		return
-	}
-
-	wg.Wait()
-
-	err = cmd.Wait()
-	if err != nil {
-		l.ErrorContext(ctx, "Error Running ffmpeg", "err", err)
+		l.ErrorContext(ctx, "FFmpeg Failed", "err", err)
 		return
 	}