morffix/server/scan.go
Samuel Lorch e47a254cad
All checks were successful
/ release (push) Successful in 42s
Improve Scan, Separate Hashing
2024-07-07 01:12:01 +02:00

270 lines
7.1 KiB
Go

package server
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"path/filepath"
	"slices"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"git.lastassault.de/speatzle/morffix/constants"
	"github.com/jackc/pgx/v5"
)
// videoFileExtensions lists the file extensions (including the leading dot)
// that a library scan treats as video files. Matching is done against the
// result of filepath.Ext, so it is case-sensitive.
var videoFileExtensions = []string{
	".mkv",
	".mp4",
	".webm",
	".flv",
	".avi",
}
// handleScan is the HTTP handler for library scans.
//
// GET requests are forwarded to scanStatus. Any other method is treated as a
// request to scan the library whose numeric ID is given in the "id" form
// value: the library is looked up, and if it is enabled a scan is started in
// the background and a confirmation message is rendered. For a disabled
// library no scan is started and the rendered message says so, because
// scanLibrary would abort such a scan anyway.
//
// Responses:
//   - 400 if the "id" form value is not an integer,
//   - 500 if the library cannot be loaded or the template fails to render,
//   - otherwise the rendered message template.
func handleScan(w http.ResponseWriter, r *http.Request) {
	if r.Method == "GET" {
		scanStatus(w, r)
		return
	}

	id, err := strconv.Atoi(r.FormValue("id"))
	if err != nil {
		http.Error(w, "Parsing ID", http.StatusBadRequest)
		return
	}

	var (
		name    string
		path    string
		enabled bool
	)
	err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
	if err != nil {
		slog.ErrorContext(r.Context(), "Get Library", "err", err)
		http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
		return
	}

	message := "Scan Started"
	if enabled {
		// The scan must outlive this request, so it gets its own background
		// context instead of r.Context(), which ends with the response.
		go scanLibrary(context.Background(), id)
	} else {
		// Do not claim a scan was started when scanLibrary would refuse it.
		message = "Scan Aborted, Library not Enabled"
	}

	// Render into a buffer first so a template error can still produce a
	// clean 500 instead of a half-written response.
	var buf bytes.Buffer
	err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
	if err != nil {
		slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
		http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
		return
	}

	_, err = w.Write(buf.Bytes())
	if err != nil {
		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
	}
}
// scanStatus renders the scan status page. The status itself is not
// implemented yet; the handler renders the message template with the
// placeholder text "TODO".
//
// The template is executed into a buffer so that a rendering failure can be
// reported as a 500 before any bytes reach the client.
func scanStatus(w http.ResponseWriter, r *http.Request) {
	const statusText = "TODO"

	var rendered bytes.Buffer
	if err := templates.ExecuteTemplate(&rendered, constants.MESSAGE_TEMPLATE_NAME, statusText); err != nil {
		slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
		http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
		return
	}

	if _, err := w.Write(rendered.Bytes()); err != nil {
		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
	}
}
// manageScan runs the periodic background work until a value is received on
// stop.
//
// Two tickers drive it:
//   - once per minute, every library with enable = true is scanned in turn
//     via scanLibrary;
//   - once per second, one file whose status is FILE_STATUS_CHANGED or
//     FILE_STATUS_NEW is picked and handed to hashFile.
//
// Each kind of job runs in its own goroutine, and a tick is skipped while the
// previous job of the same kind is still running. When stop fires, the
// context shared by all jobs is cancelled and the function returns without
// waiting for running jobs to finish.
func manageScan(stop chan bool) {
	scanTicker := time.NewTicker(time.Minute)
	defer scanTicker.Stop()
	hashTicker := time.NewTicker(time.Second)
	defer hashTicker.Stop()

	// The flags are set by this loop and cleared by the worker goroutines,
	// so they must be atomic to avoid a data race.
	var scanRunning, hashRunning atomic.Bool

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		select {
		case <-scanTicker.C:
			// Claim the slot; if it is already taken a scan is in progress.
			if !scanRunning.CompareAndSwap(false, true) {
				continue
			}
			go func() {
				defer scanRunning.Store(false)

				rows, err := db.Query(ctx, "SELECT id FROM libraries WHERE enable = true")
				if err != nil {
					slog.ErrorContext(ctx, "Error getting Enabled Libraries for Scan", "err", err)
					return
				}
				libraries, err := pgx.CollectRows[int](rows, pgx.RowTo[int])
				if err != nil {
					slog.ErrorContext(ctx, "Error Collecting Enabled Libraries for Scan", "err", err)
					return
				}
				for _, l := range libraries {
					// Stop starting further scans once shutdown was requested.
					if ctx.Err() != nil {
						return
					}
					scanLibrary(ctx, l)
				}
			}()
		case <-hashTicker.C:
			if !hashRunning.CompareAndSwap(false, true) {
				continue
			}
			go func() {
				defer hashRunning.Store(false)

				var fileID uint
				err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
				if err != nil {
					// No rows simply means there is nothing to hash right now.
					if !errors.Is(err, pgx.ErrNoRows) {
						slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
					}
					return
				}
				err = hashFile(ctx, fileID)
				if err != nil {
					slog.ErrorContext(ctx, "Error Hashing File", "err", err)
				}
			}()
		case <-stop:
			// The deferred cancel() aborts any job that is still running.
			return
		}
	}
}
// scanLock ensures that only one library scan runs at a time.
var scanLock sync.Mutex

// scanLibrary synchronises the files table with the contents of the library
// directory identified by id.
//
// The scan waits for scanLock, loads the library, and returns early (after
// logging) if the library is disabled or its path is not an existing
// directory. All remaining work happens in a single transaction:
//
//  1. every file of the library is marked FILE_STATUS_MISSING;
//  2. the library path is walked, and for each file with an extension from
//     videoFileExtensions the row is either inserted as FILE_STATUS_NEW,
//     set back to FILE_STATUS_EXISTS (same size and modification time), or
//     updated and marked FILE_STATUS_CHANGED;
//  3. the transaction is committed.
//
// Files that are no longer on disk therefore keep the MISSING status. Any
// error aborts the scan and rolls the transaction back; errors are logged,
// not returned.
func scanLibrary(ctx context.Context, id int) {
	slog.InfoContext(ctx, "Acquiring Scan Lock...")
	scanLock.Lock()
	defer scanLock.Unlock()
	slog.InfoContext(ctx, "Starting Scan", "id", id)

	var (
		name    string
		lpath   string
		enabled bool
	)
	err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
	if err != nil {
		slog.ErrorContext(ctx, "Get Library", "err", err)
		return
	}
	if !enabled {
		slog.ErrorContext(ctx, "Scan Aborted, Library not Enabled", "id", id)
		return
	}

	dirInfo, err := os.Stat(lpath)
	if err != nil {
		slog.ErrorContext(ctx, "Stating Library Path", "id", id, "path", lpath, "err", err)
		return
	}
	if !dirInfo.IsDir() {
		slog.ErrorContext(ctx, "Library Path is not a Folder", "id", id, "path", lpath)
		return
	}

	tx, err := db.Begin(ctx)
	if err != nil {
		slog.ErrorContext(ctx, "Begin Transaction", "err", err)
		return
	}
	// Rolls back on every early return. The result is deliberately ignored:
	// after a successful Commit there is nothing left to roll back.
	defer tx.Rollback(ctx)

	slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)

	// Mark all Files as Missing; files found during the walk are flipped
	// back below.
	_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
	if err != nil {
		slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
		return
	}

	err = filepath.Walk(lpath,
		func(fullPath string, info os.FileInfo, walkErr error) error {
			// Abort promptly on cancellation instead of walking the rest of
			// the tree until a database call happens to fail.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}

			switch {
			case errors.Is(walkErr, os.ErrPermission):
				slog.WarnContext(ctx, "Permission Denied While Scanning File", "path", fullPath)
				return nil
			case walkErr != nil:
				return walkErr
			}

			if info.IsDir() {
				// We don't care about folders
				return nil
			}

			if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
				slog.InfoContext(ctx, "Skipping non video file", "path", fullPath)
				return nil
			}

			// Files are stored relative to the library root.
			fPath, err := filepath.Rel(lpath, fullPath)
			if err != nil {
				return fmt.Errorf("Getting Relative Path: %w", err)
			}

			var (
				fileID     int
				oldSize    uint
				oldModTime time.Time
			)
			err = tx.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
			if errors.Is(err, pgx.ErrNoRows) {
				// File Does not Exist Yet
				slog.InfoContext(ctx, "File is New", "path", fullPath)
				_, err = tx.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health) VALUES ($1, $2, $3, $4, $5)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN)
				if err != nil {
					return fmt.Errorf("Add New File to DB: %w", err)
				}
				return nil
			}
			if err != nil {
				return fmt.Errorf("Getting File: %w", err)
			}

			// Remove Timezone and Round to nearest Second
			newModTime := info.ModTime().UTC().Round(time.Second)

			// File Already Exists, Check if it has been changed
			if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
				// NOTE(review): a file marked CHANGED by an earlier scan that
				// has not been hashed yet presumably also lands here and is
				// reset to EXISTS, so manageScan would no longer pick it up
				// for hashing — confirm against hashFile.
				slog.DebugContext(ctx, "File stayed the same", "id", fileID)
				_, err = tx.Exec(ctx, "UPDATE files SET status = $2 WHERE id = $1", fileID, constants.FILE_STATUS_EXISTS)
				if err != nil {
					return fmt.Errorf("Updating Non Changed File in DB: %w", err)
				}
				return nil
			}

			slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
			_, err = tx.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime)
			if err != nil {
				return fmt.Errorf("Updating Changed File in DB: %w", err)
			}
			return nil
		})
	if err != nil {
		slog.ErrorContext(ctx, "Error Walking Library", "err", err)
		return
	}

	err = tx.Commit(ctx)
	if err != nil {
		slog.ErrorContext(ctx, "Error Committing Changes", "err", err)
		return
	}
	slog.InfoContext(ctx, "Scan Done", "id", id)
}