diff --git a/server/index.go b/server/index.go index b736a00..d755dd8 100644 --- a/server/index.go +++ b/server/index.go @@ -52,7 +52,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) { }() var size int - err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE missing = $1", false).Scan(&size) + err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE status = $1", constants.FILE_STATUS_EXISTS).Scan(&size) if err != nil { size = 0 } diff --git a/server/library.go b/server/library.go index ae29948..690c7f0 100644 --- a/server/library.go +++ b/server/library.go @@ -2,6 +2,7 @@ package server import ( "bytes" + "fmt" "log/slog" "net/http" @@ -11,14 +12,25 @@ import ( type LibraryData struct { Library Library - Files []File + Files []FileDisplay } type File struct { - ID int `db:"id"` - Path string `db:"path"` - Size int64 `db:"size"` - Missing bool `db:"missing"` + ID int `db:"id"` + Path string `db:"path"` + Size int64 `db:"size"` + Status constants.FileStatus `db:"status"` + Health constants.FileHealth `db:"health"` + MD5 []byte `db:"md5"` +} + +type FileDisplay struct { + ID int + Path string + Size int64 + Status string + Health string + MD5 string } func handleLibrary(w http.ResponseWriter, r *http.Request) { @@ -50,7 +62,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) { // TODO } - rows, err := db.Query(r.Context(), "SELECT id, path, size, missing FROM files where library_id = $1", id) + rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5 FROM files where library_id = $1", id) if err != nil { slog.ErrorContext(r.Context(), "Query Files", "err", err) http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError) @@ -62,7 +74,17 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) { http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError) return } - data.Files = files + + for i := range files { + data.Files = append(data.Files, FileDisplay{ + ID: files[i].ID, + Path: files[i].Path, + Size: files[i].Size, + Status: files[i].Status.String(), + Health: files[i].Health.String(), + MD5: fmt.Sprintf("%x", files[i].MD5), + }) + } buf := bytes.Buffer{} err = templates.ExecuteTemplate(&buf, constants.LIBRARY_TEMPLATE_NAME, data) diff --git a/server/scan.go b/server/scan.go index d255826..1404a54 100644 --- a/server/scan.go +++ b/server/scan.go @@ -3,17 +3,22 @@ package server import ( "bytes" "context" + "crypto/md5" "errors" "fmt" + "io" "log/slog" "net/http" "os" "path/filepath" + "slices" "git.lastassault.de/speatzle/morffix/constants" "github.com/jackc/pgx/v5" ) +var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"} + func handleScan(w http.ResponseWriter, r *http.Request) { if r.Method == "GET" { scanStatus(w, r) @@ -110,7 +115,7 @@ func scan(ctx context.Context, id string) { slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath) // Mark all Files as Missing - _, err = tx.Exec(ctx, "UPDATE files SET missing = true where library_id = $1", id) + _, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING) if err != nil { slog.ErrorContext(ctx, "Setting Missing Status", "err", err) return @@ -129,16 +134,35 @@ func scan(ctx context.Context, id string) { // We don't care about folders return nil } - slog.InfoContext(ctx, "Scanning File", "path", path, "size", info.Size()) + + if !slices.Contains(videoFileExtensions, filepath.Ext(path)) { + slog.InfoContext(ctx, "Skipping non video file", "path", path) + return nil + } + slog.InfoContext(ctx, "Hashing File", "path", path, "size", info.Size()) + + file, err := os.Open(path) + if err != nil { + return fmt.Errorf("Opening File: %w", err) + } + + hash := md5.New() + if _, err := io.Copy(hash, file); err != nil { + return fmt.Errorf("Reading File: %w", err) + } + newMD5 := hash.Sum(nil) + + slog.InfoContext(ctx, "File MD5", "path", path, "size", info.Size(), "md5", newMD5) var fileID int - err = tx.QueryRow(ctx, "SELECT id FROM files WHERE library_id = $1 AND path = $2", id, path).Scan(&fileID) + var oldMD5 []byte + var health constants.FileHealth + err = tx.QueryRow(ctx, "SELECT id, md5, health FROM files WHERE library_id = $1 AND path = $2", id, path).Scan(&fileID, &oldMD5, &health) if errors.Is(err, pgx.ErrNoRows) { // File Does not Exist Yet - // TODO check file extension and exclude slog.InfoContext(ctx, "File is New", "path", path) - _, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, missing) VALUES ($1, $2, $3, $4)", id, path, info.Size(), false) + _, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, md5) VALUES ($1, $2, $3, $4, $5, $6)", id, path, info.Size(), constants.FILE_STATUS_EXISTS, constants.FILE_HEALTH_UNKNOWN, newMD5) if err != nil { return fmt.Errorf("Add New File to DB: %w", err) } @@ -146,8 +170,13 @@ func scan(ctx context.Context, id string) { } else if err != nil { return fmt.Errorf("Getting File: %w", err) } - // File Exists so update Size and missing Status - _, err = tx.Exec(ctx, "UPDATE files SET size = $1, missing = $2 WHERE id = $3", info.Size(), false, fileID) + + if slices.Compare[[]byte](newMD5, oldMD5) != 0 { + // File has changed on disk so reset health + health = constants.FILE_HEALTH_UNKNOWN + } + // File Exists so update Size, status and hash + _, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, md5 = $5 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_EXISTS, health, newMD5) if err != nil { return fmt.Errorf("Updating File in DB: %w", err) } diff --git a/server/task.go b/server/task.go index aafc7ee..fd3da92 100644 --- a/server/task.go +++ b/server/task.go @@ -141,7 +141,7 @@ func createTask(ctx context.Context, r *http.Request) error { typ := r.FormValue("type") slog.Info("Got Task Create", "library", library, "type", typ) - rows, err := db.Query(r.Context(), "SELECT id, path, size, missing FROM files where library_id = $1 AND missing = $2", library, false) + rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5 FROM files where library_id = $1 AND status = $2", library, constants.FILE_STATUS_EXISTS) if err != nil { return fmt.Errorf("Query Files: %w", err) } @@ -183,14 +183,15 @@ func createTask(ctx context.Context, r *http.Request) error { } type QueuedTask struct { - ID int - Type int - FileID int `json:"file_id"` - Data json.RawMessage + ID int + Type int + FileID int `json:"file_id"` + FileMD5 []byte `json:"file_md5" db:"md5"` + Data json.RawMessage } func assignQueuedTasks(ctx context.Context) error { - rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED) + rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data FROM tasks t INNER JOIN files f ON f.id = t.file_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) if err != nil { return fmt.Errorf("Query Queued Tasks: %w", err) } @@ -238,10 +239,11 @@ func assignQueuedTasks(ctx context.Context) error { } taskStart := types.TaskStart{ - ID: queuedTasks[lastAssigned].ID, - Type: queuedTasks[lastAssigned].Type, - FileID: queuedTasks[lastAssigned].FileID, - Data: queuedTasks[lastAssigned].Data, + ID: queuedTasks[lastAssigned].ID, + Type: queuedTasks[lastAssigned].Type, + FileID: queuedTasks[lastAssigned].FileID, + FileMD5: queuedTasks[lastAssigned].FileMD5, + Data: queuedTasks[lastAssigned].Data, } _, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil) diff --git a/server/worker.go b/server/worker.go index 17a44e4..05d8c20 100644 --- a/server/worker.go +++ b/server/worker.go @@ -167,6 +167,7 @@ func killDeadWorkers() { func updateWorkerTaskStatus(ctx context.Context) { var wg sync.WaitGroup + // TODO figure out why this locks up on worker status error func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() @@ -196,7 +197,13 @@ func updateWorkerTaskStatus(ctx context.Context) { _, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts) if err != nil { slog.ErrorContext(ctx, "Error Getting Task Status", "err", err) - // TODO Mark Task as Unknown? + + _, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", ts.Task.ID, constants.TASK_STATUS_UNKNOWN) + if err != nil { + slog.ErrorContext(ctx, "Error Updating Unknown Task Status", "err", err) + return + } + return } @@ -213,14 +220,19 @@ func updateWorkerTaskStatus(ctx context.Context) { slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err) return } + + // TODO Update file health for healthcheck tasks } }() } // TODO Handle tasks with status unkown assigned to this worker + // maybe requeue after 5 minutes? } else { + // TODO wait for 5 minutes for worker to reconnect // Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker, right now they just get stuck + } } }() diff --git a/tmpl/library.tmpl b/tmpl/library.tmpl index ac17e3d..7b59ca8 100644 --- a/tmpl/library.tmpl +++ b/tmpl/library.tmpl @@ -11,7 +11,9 @@ ID Path Size - Missing + Status + Health + MD5 {{range $f := .Files}} @@ -25,7 +27,13 @@ {{ $f.Size }} - {{ $f.Missing }} + {{ $f.Status }} + + + {{ $f.Health }} + + + {{ $f.MD5 }} {{end}} diff --git a/types/task.go b/types/task.go index e47fce2..ee7ed44 100644 --- a/types/task.go +++ b/types/task.go @@ -7,18 +7,20 @@ import ( ) type TaskStart struct { - ID int `json:"id"` - FileID int `json:"file_id"` - Type int `json:"type"` - Data json.RawMessage + ID int `json:"id"` + FileID int `json:"file_id"` + FileMD5 []byte `json:"file_md5"` + Type int `json:"type"` + Data json.RawMessage } type Task struct { - ID int `json:"id"` - FileID int `json:"file_id"` - Type int `json:"type"` - Status constants.TaskStatus `json:"status"` - Log []string `json:"log"` + ID int `json:"id"` + FileID int `json:"file_id"` + FileMD5 []byte `json:"md5"` + Type int `json:"type"` + Status constants.TaskStatus `json:"status"` + Log []string `json:"log"` } type TaskStatus struct {