diff --git a/constants/constants.go b/constants/constants.go index 52ead95..1d34cf4 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -8,6 +8,7 @@ const WORKER_VERSION_HEADER = "morffix-version" const SHARED_SECRET_HEADER = "morffix-secret" const NAME_HEADER = "morffix-name" const UUID_HEADER = "morffix-uuid" +const HASH_HEADER = "morffix-hash" const INDEX_TEMPLATE_NAME = "index.tmpl" const LIBRARIES_TEMPLATE_NAME = "libraries.tmpl" @@ -16,6 +17,8 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl" const TASKS_TEMPLATE_NAME = "tasks.tmpl" const TASK_TEMPLATE_NAME = "task.tmpl" +const FORM_FILE_KEY = "file" + const ( TASK_TYPE_HEALTHCHECK = iota TASK_TYPE_TRANSCODE diff --git a/server/server.go b/server/server.go index af26ec2..c9af803 100644 --- a/server/server.go +++ b/server/server.go @@ -95,6 +95,7 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS mux.Handle("/static/", fs) mux.HandleFunc("/tasks", handleTasks) mux.HandleFunc("/files/{id}", handleFile) + mux.HandleFunc("/upload/{id}", handleUpload) mux.HandleFunc("/tasks/{id}", handleTask) mux.HandleFunc("/scan/{id}", handleScan) mux.HandleFunc("/libraries/{id}", handleLibrary) diff --git a/server/task.go b/server/task.go index a0365cf..6f41437 100644 --- a/server/task.go +++ b/server/task.go @@ -210,7 +210,7 @@ func createTask(ctx context.Context, r *http.Request) error { data = types.TranscodeData{Command: types.FFmpegCommand{ Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}}, InputFiles: []types.File{{Path: "input.mkv"}}, - OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, + OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-c:s", Value: "copy"}, {Flag: "-c:a", Value: "aac"}, {Flag: "-c:v", Value: "libsvtav1"}, {Flag: "-crf", Value: "35"}, {Flag: "-preset", Value: "8"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}}, }} } else { return fmt.Errorf("Unkown Task Type: %v", typ) diff --git a/server/upload.go b/server/upload.go new file mode 100644 index 0000000..3ba279d --- /dev/null +++ b/server/upload.go @@ -0,0 +1,122 @@ +package server + +import ( + "encoding/base64" + "errors" + "io" + "log/slog" + "net/http" + "os" + "path/filepath" + + "git.lastassault.de/speatzle/morffix/constants" +) + +func handleUpload(w http.ResponseWriter, r *http.Request) { + if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret { + w.WriteHeader(http.StatusUnauthorized) + return + } + + id := r.PathValue("id") + if id == "" { + http.Error(w, "No id", http.StatusBadRequest) + return + } + + var count int + err := db.QueryRow(r.Context(), "SELECT COUNT(*) FROM tasks WHERE file_id = $1 AND worker_id = $2", id, r.Header.Get(constants.UUID_HEADER)).Scan(&count) + if err != nil { + slog.ErrorContext(r.Context(), "Query Task Count", "err", err) + http.Error(w, "Error Query Task Count: "+err.Error(), http.StatusInternalServerError) + return + } + + if count < 1 { + slog.ErrorContext(r.Context(), "No Running Task for file", "id", id) + http.Error(w, "No Running Task for file: "+id, http.StatusBadRequest) + return + } + + hash, err := base64.StdEncoding.DecodeString(r.Header.Get(constants.HASH_HEADER)) + if err != nil { + slog.ErrorContext(r.Context(), "Decode Hash", "err", err) + http.Error(w, "Decode Hash: "+err.Error(), http.StatusBadRequest) + return + } + + var fPath string + var libraryID int + err = db.QueryRow(r.Context(), "SELECT path, library_id FROM files WHERE id = $1", id).Scan(&fPath, &libraryID) + if err != nil { + slog.ErrorContext(r.Context(), "Query File Path", "err", err) + http.Error(w, "Error Query File Path: "+err.Error(), http.StatusInternalServerError) + return + } + + var lPath string + var tPath string + var tReplace bool + err = db.QueryRow(r.Context(), "SELECT path, transcode_path, transcode_replace FROM libraries WHERE id = $1", libraryID).Scan(&lPath, &tPath, &tReplace) + if err != nil { + slog.ErrorContext(r.Context(), "Query Library Path", "err", err) + http.Error(w, "Error Query Library Path: "+err.Error(), http.StatusInternalServerError) + return + } + + var path string + if tReplace { + //path = filepath.Join(lPath, fPath) + slog.ErrorContext(r.Context(), "Replace mode is not implemented") + http.Error(w, "Replace mode is not implemented", http.StatusNotImplemented) + return + } else { + path = filepath.Join(tPath, fPath) + } + + slog.InfoContext(r.Context(), "Starting multipart Parsing for file", "id", id, "path", path) + + err = r.ParseMultipartForm(100 << 20) + if err != nil { + slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err) + http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError) + return + } + + srcFile, _, err := r.FormFile(constants.FORM_FILE_KEY) + if err != nil { + slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err) + http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusInternalServerError) + return + } + + src := http.MaxBytesReader(w, srcFile, 1000000<<20) + // MaxBytesReader closes the underlying io.Reader on its Close() is called + defer src.Close() + + out, err := os.Create(path) + if err != nil { + slog.ErrorContext(r.Context(), "Creating File", "err", err) + http.Error(w, "Creating File: "+err.Error(), http.StatusInternalServerError) + return + } + defer out.Close() + + written, err := io.Copy(out, src) + if err != nil { + var maxBytesError *http.MaxBytesError + if errors.As(err, &maxBytesError) { + slog.ErrorContext(r.Context(), "Parse Multipart Form", "err", err) + http.Error(w, "Parse Multipart Form: "+err.Error(), http.StatusRequestEntityTooLarge) + return + } + slog.ErrorContext(r.Context(), "Failed to write the uploaded content", "err", err) + http.Error(w, "Failed to write the uploaded content: "+err.Error(), http.StatusInternalServerError) + return + } + + // TODO check file integrity + _ = hash + + slog.InfoContext(r.Context(), "upload done", "written", written) +} diff --git a/task/hash.go b/task/hash.go new file mode 100644 index 0000000..66523d3 --- /dev/null +++ b/task/hash.go @@ -0,0 +1,21 @@ +package task + +import ( + "crypto/md5" + "fmt" + "io" + "os" +) + +func hashFile(path string) ([]byte, error) { + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("Opening File: %w", err) + } + + hash := md5.New() + if _, err := io.Copy(hash, file); err != nil { + return nil, fmt.Errorf("Reading File: %w", err) + } + return hash.Sum(nil), nil +} diff --git a/task/transcode.go b/task/transcode.go index d5f806c..6ddd781 100644 --- a/task/transcode.go +++ b/task/transcode.go @@ -68,7 +68,13 @@ func RunTranscode(conf config.Config, t *types.Task, data types.TranscodeData) { return } - err = uploadFile(ctx, l, conf.Worker.Address, dst_path, t) + hash, err := hashFile(dst_path) + if err != nil { + l.ErrorContext(ctx, "Generating Hash", "err", err) + return + } + + err = uploadFile(ctx, l, conf, dst_path, t, hash) if err != nil { l.ErrorContext(ctx, "File Upload Failed", "err", err) return diff --git a/task/upload.go b/task/upload.go index 5b204a4..ec988ae 100644 --- a/task/upload.go +++ b/task/upload.go @@ -2,12 +2,103 @@ package task import ( "context" + "encoding/base64" "fmt" + "io" "log/slog" + "mime/multipart" + "net/http" + "os" + "time" + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/types" + "github.com/google/uuid" ) -func uploadFile(ctx context.Context, l *slog.Logger, address string, path string, t *types.Task) error { - return fmt.Errorf("File Upload not Implemented") +func uploadFile(ctx context.Context, l *slog.Logger, conf config.Config, path string, t *types.Task, hash []byte) error { + l.InfoContext(ctx, "Starting File Upload", "task_id", t.ID, "file_id", t.FileID, "path", path, "hash", hash) + + uuid, err := uuid.Parse(conf.Worker.ID) + if err != nil { + return fmt.Errorf("Cannot Parse ID: %w", err) + } + + r, w := io.Pipe() + m := multipart.NewWriter(w) + go func() { + defer w.Close() + defer m.Close() + part, err := m.CreateFormFile(constants.FORM_FILE_KEY, path) + if err != nil { + l.ErrorContext(ctx, "Creating Form File", "err", err) + return + } + file, err := os.Open(path) + if err != nil { + l.ErrorContext(ctx, "Opening File", "err", err) + return + } + defer file.Close() + if _, err = io.Copy(part, file); err != nil { + l.ErrorContext(ctx, "Copy Form File", "err", err) + return + } + }() + + cr := &countReader{Reader: r} + + stopProgress := make(chan bool, 1) + + go func() { + tik := time.NewTicker(time.Second) + + lastCount := 0 + + for { + select { + case <-tik.C: + speed := cr.n - lastCount + l.InfoContext(ctx, "Upload Progress", "bytes", cr.n, "lastCount", lastCount, "speed", speed) + lastCount = cr.n + case <-stopProgress: + tik.Stop() + } + } + }() + + defer func() { + stopProgress <- true + }() + + req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%v/upload/%v", conf.Worker.Address, t.FileID), cr) + if err != nil { + return fmt.Errorf("New Request: %w", err) + } + + req.Header.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret) + req.Header.Add(constants.NAME_HEADER, conf.Worker.Name) + req.Header.Add(constants.UUID_HEADER, uuid.String()) + req.Header.Add(constants.WORKER_VERSION_HEADER, constants.WORKER_VERSION) + req.Header.Add(constants.HASH_HEADER, base64.StdEncoding.EncodeToString(hash)) + req.Header.Add("Content-Type", "multipart/form-data; boundary=\""+m.Boundary()+"\"") + + var client = &http.Client{ + Transport: &http.Transport{}, + } + + l.InfoContext(ctx, "Starting Upload") + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("Uploading File: %w", err) + } + + l.InfoContext(ctx, "Upload Done") + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("Got HTTP Status Code: %v", resp.StatusCode) + } + + return nil }