Use database status and health, calculate md5

This commit is contained in:
Samuel Lorch 2024-05-11 00:43:26 +02:00
parent 806f6e7e61
commit 78d818b8d1
7 changed files with 112 additions and 37 deletions

View file

@ -52,7 +52,7 @@ func handleIndex(w http.ResponseWriter, r *http.Request) {
}()
var size int
err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE missing = $1", false).Scan(&size)
err := db.QueryRow(r.Context(), "SELECT SUM(size) AS size FROM files WHERE status = $1", constants.FILE_STATUS_EXISTS).Scan(&size)
if err != nil {
size = 0
}

View file

@ -2,6 +2,7 @@ package server
import (
"bytes"
"fmt"
"log/slog"
"net/http"
@ -11,14 +12,25 @@ import (
type LibraryData struct {
Library Library
Files []File
Files []FileDisplay
}
type File struct {
ID int `db:"id"`
Path string `db:"path"`
Size int64 `db:"size"`
Missing bool `db:"missing"`
ID int `db:"id"`
Path string `db:"path"`
Size int64 `db:"size"`
Status constants.FileStatus `db:"status"`
Health constants.FileHealth `db:"health"`
MD5 []byte `db:"md5"`
}
// FileDisplay is the presentation form of a File that handleLibrary places
// into LibraryData.Files before executing the template. Every value is copied
// from the corresponding File field; Status and Health are the results of
// their String methods and MD5 is the hash formatted with "%x".
type FileDisplay struct {
	ID   int    // copied from File.ID
	Path string // copied from File.Path
	Size int64  // copied from File.Size

	// Pre-rendered textual values, ready to be printed as-is.
	Status, Health, MD5 string
}
func handleLibrary(w http.ResponseWriter, r *http.Request) {
@ -50,7 +62,7 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
// TODO
}
rows, err := db.Query(r.Context(), "SELECT id, path, size, missing FROM files where library_id = $1", id)
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5 FROM files where library_id = $1", id)
if err != nil {
slog.ErrorContext(r.Context(), "Query Files", "err", err)
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)
@ -62,7 +74,17 @@ func handleLibrary(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Error Query Files: "+err.Error(), http.StatusInternalServerError)
return
}
data.Files = files
for i := range files {
data.Files = append(data.Files, FileDisplay{
ID: files[i].ID,
Path: files[i].Path,
Size: files[i].Size,
Status: files[i].Status.String(),
Health: files[i].Health.String(),
MD5: fmt.Sprintf("%x", files[i].MD5),
})
}
buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.LIBRARY_TEMPLATE_NAME, data)

View file

@ -3,17 +3,22 @@ package server
import (
"bytes"
"context"
"crypto/md5"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"path/filepath"
"slices"
"git.lastassault.de/speatzle/morffix/constants"
"github.com/jackc/pgx/v5"
)
// videoFileExtensions lists the file extensions (with leading dot) that scan
// accepts. A file is skipped when filepath.Ext of its path is not an exact,
// case-sensitive match for one of these entries.
var videoFileExtensions = []string{
	".mkv",
	".mp4",
	".webm",
	".flv",
	".avi",
}
func handleScan(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
scanStatus(w, r)
@ -110,7 +115,7 @@ func scan(ctx context.Context, id string) {
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
// Mark all Files as Missing
_, err = tx.Exec(ctx, "UPDATE files SET missing = true where library_id = $1", id)
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
if err != nil {
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
return
@ -129,16 +134,35 @@ func scan(ctx context.Context, id string) {
// We don't care about folders
return nil
}
slog.InfoContext(ctx, "Scanning File", "path", path, "size", info.Size())
if !slices.Contains(videoFileExtensions, filepath.Ext(path)) {
slog.InfoContext(ctx, "Skipping non video file", "path", path)
return nil
}
slog.InfoContext(ctx, "Hashing File", "path", path, "size", info.Size())
file, err := os.Open(path)
if err != nil {
return fmt.Errorf("Opening File: %w", err)
}
hash := md5.New()
if _, err := io.Copy(hash, file); err != nil {
return fmt.Errorf("Reading File: %w", err)
}
newMD5 := hash.Sum(nil)
slog.InfoContext(ctx, "File MD5", "path", path, "size", info.Size(), "md5", newMD5)
var fileID int
err = tx.QueryRow(ctx, "SELECT id FROM files WHERE library_id = $1 AND path = $2", id, path).Scan(&fileID)
var oldMD5 []byte
var health constants.FileHealth
err = tx.QueryRow(ctx, "SELECT id, md5, health FROM files WHERE library_id = $1 AND path = $2", id, path).Scan(&fileID, &oldMD5, &health)
if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet
// TODO check file extension and exclude
slog.InfoContext(ctx, "File is New", "path", path)
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, missing) VALUES ($1, $2, $3, $4)", id, path, info.Size(), false)
_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, md5) VALUES ($1, $2, $3, $4, $5, $6)", id, path, info.Size(), constants.FILE_STATUS_EXISTS, constants.FILE_HEALTH_UNKNOWN, newMD5)
if err != nil {
return fmt.Errorf("Add New File to DB: %w", err)
}
@ -146,8 +170,13 @@ func scan(ctx context.Context, id string) {
} else if err != nil {
return fmt.Errorf("Getting File: %w", err)
}
// File Exists so update Size and missing Status
_, err = tx.Exec(ctx, "UPDATE files SET size = $1, missing = $2 WHERE id = $3", info.Size(), false, fileID)
if slices.Compare[[]byte](newMD5, oldMD5) != 0 {
// File has changed on disk so reset health
health = constants.FILE_HEALTH_UNKNOWN
}
// File Exists so update Size, status and hash
_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, md5 = $5 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_EXISTS, health, newMD5)
if err != nil {
return fmt.Errorf("Updating File in DB: %w", err)
}

View file

@ -141,7 +141,7 @@ func createTask(ctx context.Context, r *http.Request) error {
typ := r.FormValue("type")
slog.Info("Got Task Create", "library", library, "type", typ)
rows, err := db.Query(r.Context(), "SELECT id, path, size, missing FROM files where library_id = $1 AND missing = $2", library, false)
rows, err := db.Query(r.Context(), "SELECT id, path, size, status, health, md5 FROM files where library_id = $1 AND status = $2", library, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Query Files: %w", err)
}
@ -183,14 +183,15 @@ func createTask(ctx context.Context, r *http.Request) error {
}
type QueuedTask struct {
ID int
Type int
FileID int `json:"file_id"`
Data json.RawMessage
ID int
Type int
FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5" db:"md5"`
Data json.RawMessage
}
func assignQueuedTasks(ctx context.Context) error {
rows, err := db.Query(ctx, "SELECT id, type, file_id, data FROM tasks WHERE status = $1", constants.TASK_STATUS_QUEUED)
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data FROM tasks t INNER JOIN files f ON f.id = t.file_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err)
}
@ -238,10 +239,11 @@ func assignQueuedTasks(ctx context.Context) error {
}
taskStart := types.TaskStart{
ID: queuedTasks[lastAssigned].ID,
Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID,
Data: queuedTasks[lastAssigned].Data,
ID: queuedTasks[lastAssigned].ID,
Type: queuedTasks[lastAssigned].Type,
FileID: queuedTasks[lastAssigned].FileID,
FileMD5: queuedTasks[lastAssigned].FileMD5,
Data: queuedTasks[lastAssigned].Data,
}
_, err = rpcServer.Call(ctx, Workers[i].Conn, "task-start", taskStart, nil)

View file

@ -167,6 +167,7 @@ func killDeadWorkers() {
func updateWorkerTaskStatus(ctx context.Context) {
var wg sync.WaitGroup
// TODO figure out why this locks up on worker status error
func() {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
@ -196,7 +197,13 @@ func updateWorkerTaskStatus(ctx context.Context) {
_, err := rpcServer.Call(ctx, w.Conn, "task-status", tsr, &ts)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Task Status", "err", err)
// TODO Mark Task as Unknown?
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2 WHERE id = $1", ts.Task.ID, constants.TASK_STATUS_UNKNOWN)
if err != nil {
slog.ErrorContext(ctx, "Error Updating Unknown Task Status", "err", err)
return
}
return
}
@ -213,14 +220,19 @@ func updateWorkerTaskStatus(ctx context.Context) {
slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err)
return
}
// TODO Update file health for healthcheck tasks
}
}()
}
// TODO Handle tasks with status unknown assigned to this worker
// maybe requeue after 5 minutes?
} else {
// TODO wait for 5 minutes for worker to reconnect
// Set Task Status to Unknown for Unfinished Tasks which were assigned to this disconnected worker; right now they just get stuck
}
}
}()

View file

@ -11,7 +11,9 @@
<th>ID</th>
<th>Path</th>
<th>Size</th>
<th>Missing</th>
<th>Status</th>
<th>Health</th>
<th>MD5</th>
</tr>
{{range $f := .Files}}
<tr>
@ -25,7 +27,13 @@
{{ $f.Size }}
</td>
<td>
{{ $f.Missing }}
{{ $f.Status }}
</td>
<td>
{{ $f.Health }}
</td>
<td>
{{ $f.MD5 }}
</td>
</tr>
{{end}}

View file

@ -7,18 +7,20 @@ import (
)
type TaskStart struct {
ID int `json:"id"`
FileID int `json:"file_id"`
Type int `json:"type"`
Data json.RawMessage
ID int `json:"id"`
FileID int `json:"file_id"`
FileMD5 []byte `json:"file_md5"`
Type int `json:"type"`
Data json.RawMessage
}
type Task struct {
ID int `json:"id"`
FileID int `json:"file_id"`
Type int `json:"type"`
Status constants.TaskStatus `json:"status"`
Log []string `json:"log"`
ID int `json:"id"`
FileID int `json:"file_id"`
FileMD5 []byte `json:"md5"`
Type int `json:"type"`
Status constants.TaskStatus `json:"status"`
Log []string `json:"log"`
}
type TaskStatus struct {