294 lines
8.2 KiB
Go
294 lines
8.2 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.lastassault.de/speatzle/morffix/constants"
|
|
"git.lastassault.de/speatzle/morffix/workerpool"
|
|
"github.com/jackc/pgx/v5"
|
|
"gopkg.in/vansante/go-ffprobe.v2"
|
|
)
|
|
|
|
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
|
|
|
|
func handleScan(w http.ResponseWriter, r *http.Request) {
|
|
http.Error(w, "Not Implemented", http.StatusNotImplemented)
|
|
/*
|
|
if r.Method == "GET" {
|
|
scanStatus(w, r)
|
|
return
|
|
}
|
|
|
|
id, err := strconv.Atoi(r.FormValue("id"))
|
|
if err != nil {
|
|
http.Error(w, "Parsing ID", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
var name string
|
|
var path string
|
|
var enabled bool
|
|
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Get Library", "err", err)
|
|
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
scanCtx := context.Background()
|
|
go scanLibrary(scanCtx, id)
|
|
|
|
message := "Scan Started"
|
|
|
|
buf := bytes.Buffer{}
|
|
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
|
|
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
_, err = w.Write(buf.Bytes())
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
|
}
|
|
*/
|
|
}
|
|
|
|
func scanStatus(w http.ResponseWriter, r *http.Request) {
|
|
message := "TODO"
|
|
|
|
buf := bytes.Buffer{}
|
|
err := templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
|
|
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
_, err = w.Write(buf.Bytes())
|
|
if err != nil {
|
|
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
|
}
|
|
}
|
|
|
|
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
|
|
scanTicker := time.NewTicker(time.Minute)
|
|
hashTicker := time.NewTicker(time.Second)
|
|
scanRunning := false
|
|
hashRunning := false
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
for {
|
|
select {
|
|
case <-scanTicker.C:
|
|
if scanRunning {
|
|
continue
|
|
}
|
|
scanRunning = true
|
|
go func() {
|
|
defer func() {
|
|
scanRunning = false
|
|
}()
|
|
|
|
rows, err := db.Query(ctx, "SELECT id FROM libraries WHERE enable = true")
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Error getting Enabled Libraries for Scan", "err", err)
|
|
return
|
|
}
|
|
|
|
libraries, err := pgx.CollectRows[int](rows, pgx.RowTo[int])
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Error Collecting Enabled Libraries for Scan", "err", err)
|
|
return
|
|
}
|
|
|
|
for _, l := range libraries {
|
|
scanLibrary(ctx, l, scanPool)
|
|
}
|
|
}()
|
|
|
|
case <-hashTicker.C:
|
|
if hashRunning {
|
|
continue
|
|
}
|
|
|
|
hashRunning = true
|
|
go func() {
|
|
defer func() {
|
|
hashRunning = false
|
|
}()
|
|
|
|
rows, err := db.Query(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
|
|
if err != nil {
|
|
if !errors.Is(err, pgx.ErrNoRows) {
|
|
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
|
|
}
|
|
// Nothing to do
|
|
return
|
|
}
|
|
|
|
files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Collect File Rows", "err", err)
|
|
return
|
|
}
|
|
|
|
for _, f := range files {
|
|
hashPool.QueueJob(func(ctx context.Context) {
|
|
err = hashFile(ctx, f)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
|
|
hashPool.WaitForEmptyQueue()
|
|
}()
|
|
case <-stop:
|
|
cancel()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
var scanLock sync.Mutex
|
|
|
|
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
|
|
slog.InfoContext(ctx, "Acquiring Scan Lock...")
|
|
scanLock.Lock()
|
|
defer scanLock.Unlock()
|
|
slog.InfoContext(ctx, "Starting Scan", "id", id)
|
|
|
|
var name string
|
|
var lpath string
|
|
var enabled bool
|
|
err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Get Library", "err", err)
|
|
return
|
|
}
|
|
|
|
if !enabled {
|
|
slog.ErrorContext(ctx, "Scan Aborted, Library not Enabled", "id", id)
|
|
return
|
|
}
|
|
|
|
dirInfo, err := os.Stat(lpath)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Stating Library Path", "id", id, "path", lpath, "err", err)
|
|
return
|
|
}
|
|
|
|
if !dirInfo.IsDir() {
|
|
slog.ErrorContext(ctx, "Library Path is not a Folder", "id", id, "path", lpath)
|
|
return
|
|
}
|
|
|
|
slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)
|
|
|
|
// TODO Work on getting missing status again
|
|
/*
|
|
// Mark all Files as Missing
|
|
_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
|
|
return
|
|
}
|
|
*/
|
|
err = filepath.Walk(lpath,
|
|
func(fullPath string, info os.FileInfo, err error) error {
|
|
if errors.Is(err, os.ErrPermission) {
|
|
slog.WarnContext(ctx, "Permission Denied While Scanning File", "path", fullPath)
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
if info.IsDir() {
|
|
// We don't care about folders
|
|
return nil
|
|
}
|
|
|
|
if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
|
|
slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
|
|
return nil
|
|
}
|
|
|
|
fPath, err := filepath.Rel(lpath, fullPath)
|
|
if err != nil {
|
|
return fmt.Errorf("Getting Relative Path: %w", err)
|
|
}
|
|
|
|
var fileID int
|
|
var oldSize uint
|
|
var oldModTime time.Time
|
|
|
|
// Remove Timezone and Round to nearest Second
|
|
newModTime := info.ModTime().UTC().Round(time.Second)
|
|
|
|
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
// File Does not Exist Yet
|
|
|
|
scanPool.QueueJob(
|
|
func(ctx context.Context) {
|
|
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
|
|
|
|
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "ffprobe New File", "err", err)
|
|
}
|
|
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
|
|
|
_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
|
|
}
|
|
})
|
|
return nil
|
|
} else if err != nil {
|
|
return fmt.Errorf("Getting File: %w", err)
|
|
}
|
|
|
|
// File Already Exists, Check if it has been changed
|
|
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
|
|
slog.Debug("File stayed the same", "id", fileID)
|
|
} else {
|
|
scanPool.QueueJob(func(ctx context.Context) {
|
|
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
|
|
|
|
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
|
|
}
|
|
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
|
|
|
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
|
|
}
|
|
})
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
slog.ErrorContext(ctx, "Error Walking Library", "err", err)
|
|
return
|
|
}
|
|
|
|
scanPool.WaitForEmptyQueue()
|
|
|
|
slog.InfoContext(ctx, "Scan Done", "id", id)
|
|
}
|