worker use md5
This commit is contained in:
parent
981b745c9e
commit
c7df166a2d
2 changed files with 45 additions and 10 deletions
|
@ -4,12 +4,14 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/config"
|
||||
|
@ -71,7 +73,7 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
|||
}
|
||||
}()
|
||||
|
||||
l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path)
|
||||
l.InfoContext(ctx, "Starting File Download", "task_id", t.ID, "file_id", t.FileID, "path", path, "md5", t.FileMD5)
|
||||
|
||||
err := func() error {
|
||||
out, err := os.Create(path)
|
||||
|
@ -79,20 +81,47 @@ func RunHealthCheck(conf config.Config, t *types.Task, data types.HealthCheckDat
|
|||
return fmt.Errorf("Creating File: %w", err)
|
||||
}
|
||||
defer out.Close()
|
||||
resp, err := http.Get(fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID))
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%v/files/%v", conf.Worker.Address, t.FileID), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("New Request: %w", err)
|
||||
}
|
||||
|
||||
req.Close = true
|
||||
|
||||
var client = &http.Client{
|
||||
Transport: &http.Transport{},
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Getting File: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode)
|
||||
}
|
||||
|
||||
req.Close = true
|
||||
|
||||
// TODO Log at interval logs read
|
||||
|
||||
// Calculate hash and write file at the same time
|
||||
hash := md5.New()
|
||||
tr := io.TeeReader(resp.Body, hash)
|
||||
|
||||
defer resp.Body.Close()
|
||||
n, err := io.Copy(out, resp.Body)
|
||||
n, err := io.Copy(out, tr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Reading File: %w", err)
|
||||
}
|
||||
|
||||
l.InfoContext(ctx, "Downloaded File", "bytes", n)
|
||||
md5 := hash.Sum(nil)
|
||||
|
||||
l.InfoContext(ctx, "Downloaded File", "bytes", n, "md5", md5)
|
||||
|
||||
if slices.Compare[[]byte](md5, t.FileMD5) != 0 {
|
||||
return fmt.Errorf("Downloaded File does not match md5")
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
|
18
task/task.go
18
task/task.go
|
@ -18,9 +18,10 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
|||
defer taskMutex.Unlock()
|
||||
|
||||
tasks[data.ID] = &types.Task{
|
||||
ID: data.ID,
|
||||
Type: data.Type,
|
||||
FileID: data.FileID,
|
||||
ID: data.ID,
|
||||
Type: data.Type,
|
||||
FileID: data.FileID,
|
||||
FileMD5: data.FileMD5,
|
||||
}
|
||||
|
||||
switch data.Type {
|
||||
|
@ -34,9 +35,14 @@ func StartTask(conf config.Config, data types.TaskStart) error {
|
|||
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
|
||||
go func() {
|
||||
defer func() {
|
||||
if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING {
|
||||
tasks[data.ID].Status = constants.TASK_STATUS_FAILED
|
||||
tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer")
|
||||
taskMutex.Lock()
|
||||
defer taskMutex.Unlock()
|
||||
t, ok := tasks[data.ID]
|
||||
if ok {
|
||||
if t.Status == constants.TASK_STATUS_RUNNING {
|
||||
t.Status = constants.TASK_STATUS_FAILED
|
||||
t.Log = append(t.Log, "Task Status Set to Failed by defer")
|
||||
}
|
||||
}
|
||||
}()
|
||||
RunHealthCheck(conf, tasks[data.ID], hData)
|
||||
|
|
Loading…
Add table
Reference in a new issue