morffix/server/scan.go
Samuel Lorch 03eb3541a5
All checks were successful
/ release (push) Successful in 30s
Only hash files if library is enabled
2024-07-12 23:04:57 +02:00

294 lines
8.3 KiB
Go

package server
import (
"bytes"
"context"
"errors"
"fmt"
"log/slog"
"net/http"
"os"
"path/filepath"
"slices"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/workerpool"
"github.com/jackc/pgx/v5"
"gopkg.in/vansante/go-ffprobe.v2"
)
// videoFileExtensions lists the file name extensions (with the leading dot)
// that a library scan treats as video files. scanLibrary compares them
// against filepath.Ext of each walked path; every other file is skipped.
var videoFileExtensions = []string{
	".mkv",
	".mp4",
	".webm",
	".flv",
	".avi",
}
// handleScan is the HTTP handler for manually triggering a library scan.
// Manual scans are currently disabled: every request, whatever its method,
// is answered with 501 Not Implemented.
//
// The earlier implementation is kept in the block comment below for
// reference. NOTE(review): it is stale and will not compile as-is. It calls
// scanLibrary without the scanPool argument scanLibrary now takes, and it
// uses strconv, which this file does not import. Update it before
// re-enabling.
func handleScan(w http.ResponseWriter, r *http.Request) {
	status := http.StatusNotImplemented
	http.Error(w, "Not Implemented", status)
	/*
		if r.Method == "GET" {
			scanStatus(w, r)
			return
		}
		id, err := strconv.Atoi(r.FormValue("id"))
		if err != nil {
			http.Error(w, "Parsing ID", http.StatusBadRequest)
			return
		}
		var name string
		var path string
		var enabled bool
		err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
		if err != nil {
			slog.ErrorContext(r.Context(), "Get Library", "err", err)
			http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
			return
		}
		scanCtx := context.Background()
		go scanLibrary(scanCtx, id)
		message := "Scan Started"
		buf := bytes.Buffer{}
		err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
		if err != nil {
			slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
			http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
			return
		}
		_, err = w.Write(buf.Bytes())
		if err != nil {
			slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
		}
	*/
}
// scanStatus renders the scan-status message through the shared message
// template and writes it to w. Template failures are logged and reported as
// 500; a failed write is only logged.
//
// NOTE(review): the message is still the placeholder "TODO", and within this
// file the function is referenced only from handleScan's commented-out code.
func scanStatus(w http.ResponseWriter, r *http.Request) {
	var out bytes.Buffer
	if err := templates.ExecuteTemplate(&out, constants.MESSAGE_TEMPLATE_NAME, "TODO"); err != nil {
		slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
		http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
		return
	}
	if _, err := w.Write(out.Bytes()); err != nil {
		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
	}
}
// manageScan runs the server's periodic background work until a value is
// received on stop (or stop is closed):
//   - every minute it scans every enabled library in turn via scanLibrary,
//     using scanPool for the per-file jobs;
//   - every second it queues up to 10 files for hashFile on hashPool. Only
//     files whose status is FILE_STATUS_CHANGED or FILE_STATUS_NEW and whose
//     library is enabled are picked.
//
// At most one scan round and one hash round are in flight at a time. A tick
// that fires while the previous round of the same kind is still running is
// skipped. The context given to both rounds is cancelled when manageScan
// returns.
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
	scanTicker := time.NewTicker(time.Minute)
	defer scanTicker.Stop()
	hashTicker := time.NewTicker(time.Second)
	defer hashTicker.Stop()

	// One-slot semaphores marking a round as in progress. The non-blocking
	// send succeeds only when no round of that kind is running, and the
	// round's goroutine frees the slot when it finishes. Plain bools were
	// previously written by those goroutines and read by this loop without
	// synchronization, which is a data race.
	scanBusy := make(chan struct{}, 1)
	hashBusy := make(chan struct{}, 1)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	for {
		select {
		case <-scanTicker.C:
			select {
			case scanBusy <- struct{}{}:
			default:
				continue // previous scan round still running
			}
			go func() {
				defer func() { <-scanBusy }()
				rows, err := db.Query(ctx, "SELECT id FROM libraries WHERE enable = true")
				if err != nil {
					slog.ErrorContext(ctx, "Error getting Enabled Libraries for Scan", "err", err)
					return
				}
				libraries, err := pgx.CollectRows[int](rows, pgx.RowTo[int])
				if err != nil {
					slog.ErrorContext(ctx, "Error Collecting Enabled Libraries for Scan", "err", err)
					return
				}
				for _, l := range libraries {
					scanLibrary(ctx, l, scanPool)
				}
			}()
		case <-hashTicker.C:
			select {
			case hashBusy <- struct{}{}:
			default:
				continue // previous hash round still running
			}
			go func() {
				defer func() { <-hashBusy }()
				// The libraries column is named "enable", as in the scan query
				// above and in scanLibrary. The previous "l.enabled" did not
				// match it.
				rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
				if err != nil {
					// Query does not report "no rows" as an error. An empty
					// result just yields an empty slice below, so nothing is
					// queued.
					slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
					return
				}
				files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
				if err != nil {
					slog.ErrorContext(ctx, "Collect File Rows", "err", err)
					return
				}
				for _, f := range files {
					f := f // per-iteration copy: before Go 1.22 closures share the loop variable
					hashPool.QueueJob(func(ctx context.Context) {
						// err is local to the job. Jobs may run concurrently and
						// must not write the enclosing goroutine's err.
						if err := hashFile(ctx, f); err != nil {
							slog.ErrorContext(ctx, "Error Hashing File", "id", f, "err", err)
						}
					})
				}
				// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
				hashPool.WaitForEmptyQueue()
			}()
		case <-stop:
			return
		}
	}
}
// scanLock serializes calls to scanLibrary, so only one library walk (and the
// probe jobs it queues) is in progress at a time.
var scanLock sync.Mutex

// scanLibrary walks the directory of the library with the given id and brings
// the files table in line with the video files found there:
//   - a file with no row yet is probed with ffprobe and inserted with status
//     FILE_STATUS_NEW and health FILE_HEALTH_UNKNOWN;
//   - a file whose size or modification time (UTC, rounded to the second)
//     differs from its row is re-probed and updated with status
//     FILE_STATUS_CHANGED and health FILE_HEALTH_UNKNOWN;
//   - an unchanged file is left alone.
//
// Paths are stored relative to the library root. Probing and the database
// writes run as jobs on scanPool. Afterwards scanLibrary calls
// scanPool.WaitForEmptyQueue while still holding scanLock, even if the walk
// failed. NOTE(review): this presumably blocks until the queued jobs are
// handled; confirm against workerpool.
//
// Disabled libraries are not scanned. Failures are logged rather than
// returned. The walk is aborted once ctx is cancelled.
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
	slog.InfoContext(ctx, "Acquiring Scan Lock...")
	scanLock.Lock()
	defer scanLock.Unlock()
	slog.InfoContext(ctx, "Starting Scan", "id", id)

	var name string
	var lpath string
	var enabled bool
	err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
	if err != nil {
		slog.ErrorContext(ctx, "Get Library", "err", err)
		return
	}
	if !enabled {
		slog.ErrorContext(ctx, "Scan Aborted, Library not Enabled", "id", id)
		return
	}

	dirInfo, err := os.Stat(lpath)
	if err != nil {
		slog.ErrorContext(ctx, "Stating Library Path", "id", id, "path", lpath, "err", err)
		return
	}
	if !dirInfo.IsDir() {
		slog.ErrorContext(ctx, "Library Path is not a Folder", "id", id, "path", lpath)
		return
	}

	slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)

	// TODO Work on getting missing status again
	/*
		// Mark all Files as Missing
		_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
		if err != nil {
			slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
			return
		}
	*/

	err = filepath.Walk(lpath, func(fullPath string, info os.FileInfo, err error) error {
		// Stop promptly on shutdown. Otherwise the rest of a large library
		// would still be walked, since non-video files never hit the DB.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		switch {
		case errors.Is(err, os.ErrPermission):
			// Skip unreadable entries instead of aborting the whole scan.
			slog.WarnContext(ctx, "Permission Denied While Scanning File", "path", fullPath)
			return nil
		case err != nil:
			return err
		}

		if info.IsDir() {
			// We don't care about folders
			return nil
		}
		// NOTE(review): this extension match is case-sensitive, so e.g.
		// "movie.MKV" is skipped.
		if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
			slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
			return nil
		}

		fPath, err := filepath.Rel(lpath, fullPath)
		if err != nil {
			return fmt.Errorf("Getting Relative Path: %w", err)
		}

		// Remove Timezone and Round to nearest Second
		newModTime := info.ModTime().UTC().Round(time.Second)
		size := info.Size()

		var fileID int
		var oldSize uint
		var oldModTime time.Time
		err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
		switch {
		case errors.Is(err, pgx.ErrNoRows):
			// File Does not Exist Yet
			scanPool.QueueJob(func(ctx context.Context) {
				slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
				ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
				if err != nil {
					// NOTE(review): the file is still inserted, with nil probe
					// data. Presumably intentional so it gets recorded; confirm.
					slog.ErrorContext(ctx, "ffprobe New File", "path", fullPath, "err", err)
				} else {
					slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
				}
				_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, size, constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
				if err != nil {
					slog.ErrorContext(ctx, "Add New File to DB", "path", fullPath, "err", err)
				}
			})
			return nil
		case err != nil:
			return fmt.Errorf("Getting File: %w", err)
		}

		// File Already Exists, Check if it has been changed
		if newModTime.Equal(oldModTime) && uint(size) == oldSize {
			slog.DebugContext(ctx, "File stayed the same", "id", fileID)
			return nil
		}
		scanPool.QueueJob(func(ctx context.Context) {
			slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", size)
			ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
			if err != nil {
				// NOTE(review): as for new files, the row is still updated
				// with nil probe data.
				slog.ErrorContext(ctx, "ffprobe Changed File", "path", fullPath, "err", err)
			} else {
				slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
			}
			_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, size, constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
			if err != nil {
				slog.ErrorContext(ctx, "Updating Changed File in DB", "path", fullPath, "err", err)
			}
		})
		return nil
	})

	// Drain the queue before looking at the walk error. Jobs queued before a
	// failure must not outlive scanLock, or the next scan could queue the
	// same file again (e.g. a second INSERT for a still-pending new file).
	scanPool.WaitForEmptyQueue()
	if err != nil {
		slog.ErrorContext(ctx, "Error Walking Library", "err", err)
		return
	}
	slog.InfoContext(ctx, "Scan Done", "id", id)
}