From c7df166a2d763a8f51dd24467d1c2f1a6c7ab7f5 Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Sat, 11 May 2024 00:43:58 +0200 Subject: [PATCH] worker use md5 --- task/healthcheck.go | 37 +++++++++++++++++++++++++++++++++---- task/task.go | 18 ++++++++++++------ 2 files changed, 45 insertions(+), 10 deletions(-) diff --git a/task/healthcheck.go b/task/healthcheck.go index 5d2fa3e..aa2e1f7 100644 --- a/task/healthcheck.go +++ b/task/healthcheck.go @@ -4,12 +4,14 @@ import ( "bufio" "bytes" "context" + "crypto/md5" "fmt" "io" "net/http" "os" "os/exec" "path/filepath" + "slices" "sync" "git.lastassault.de/speatzle/morffix/config" @@ -71,7 +73,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat } }() - l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path) + l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path, "md5", t.FileMD5) err := func() error { out, err := os.Create(path) @@ -79,20 +81,47 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat return fmt.Errorf("Creating File: %w", err) } defer out.Close() - resp, err := http.Get(fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID)) + + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID), nil) + if err != nil { + return fmt.Errorf("New Request: %w", err) + } + + req.Close = true + + var client = &http.Client{ + Transport: &http.Transport{}, + } + resp, err := client.Do(req) if err != nil { return fmt.Errorf("Getting File: %w", err) } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode) + } + + req.Close = true + // TODO Log at interval logs read + // Calculate hash and write file at the same time + hash := md5.New() + tr := io.TeeReader(resp.Body, hash) + defer resp.Body.Close() - n, err := io.Copy(out, resp.Body) + n, err := io.Copy(out, tr) if err != nil { return fmt.Errorf("Reading File: %w", err) } - l.InfoContext(ctx, "Downloaded File", "bytes", n) + md5 := hash.Sum(nil) + + l.InfoContext(ctx, "Downloaded File", "bytes", n, "md5", md5) + + if slices.Compare[[]byte](md5, t.FileMD5) != 0 { + return fmt.Errorf("Downloaded File does not match md5") + } return nil }() diff --git a/task/task.go b/task/task.go index df7c51a..5dd00bc 100644 --- a/task/task.go +++ b/task/task.go @@ -18,9 +18,10 @@ func StartTask(conf config.Config, data types.TaskStart) error { defer taskMutex.Unlock() tasks[data.ID] = &types.Task{ - ID: data.ID, - Type: data.Type, - FileID: data.FileID, + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + FileMD5: data.FileMD5, } switch data.Type { @@ -34,9 +35,14 @@ func StartTask(conf config.Config, data types.TaskStart) error { tasks[data.ID].Status = constants.TASK_STATUS_RUNNING go func() { defer func() { - if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING { - tasks[data.ID].Status = constants.TASK_STATUS_FAILED - tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer") + taskMutex.Lock() + defer taskMutex.Unlock() + t, ok := tasks[data.ID] + if ok { + if t.Status == constants.TASK_STATUS_RUNNING { + t.Status = constants.TASK_STATUS_FAILED + t.Log = append(t.Log, "Task Status Set to Failed by defer") + } } }() RunHealthCheck(conf, tasks[data.ID], hData)