Compare commits

..

No commits in common. "aeb47c259367e61afa3246c71a2d5ca1e3417c0f" and "bae1112074f2ffdbbe2f094caf5680c70cc34fce" have entirely different histories.

3 changed files with 67 additions and 164 deletions

View file

@ -10,11 +10,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"slices" "slices"
"strconv"
"sync" "sync"
"time" "time"
"git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/workerpool"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"gopkg.in/vansante/go-ffprobe.v2" "gopkg.in/vansante/go-ffprobe.v2"
) )
@ -22,47 +22,44 @@ import (
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"} var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
func handleScan(w http.ResponseWriter, r *http.Request) { func handleScan(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Not Implemented", http.StatusNotImplemented) if r.Method == "GET" {
/* scanStatus(w, r)
if r.Method == "GET" { return
scanStatus(w, r) }
return
}
id, err := strconv.Atoi(r.FormValue("id")) id, err := strconv.Atoi(r.FormValue("id"))
if err != nil { if err != nil {
http.Error(w, "Parsing ID", http.StatusBadRequest) http.Error(w, "Parsing ID", http.StatusBadRequest)
return return
} }
var name string var name string
var path string var path string
var enabled bool var enabled bool
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled) err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Get Library", "err", err) slog.ErrorContext(r.Context(), "Get Library", "err", err)
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
return return
} }
scanCtx := context.Background() scanCtx := context.Background()
go scanLibrary(scanCtx, id) go scanLibrary(scanCtx, id)
message := "Scan Started" message := "Scan Started"
buf := bytes.Buffer{} buf := bytes.Buffer{}
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message) err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err) slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError) http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
return return
} }
_, err = w.Write(buf.Bytes()) _, err = w.Write(buf.Bytes())
if err != nil { if err != nil {
slog.ErrorContext(r.Context(), "Writing http Response", "err", err) slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
} }
*/
} }
func scanStatus(w http.ResponseWriter, r *http.Request) { func scanStatus(w http.ResponseWriter, r *http.Request) {
@ -82,7 +79,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
} }
} }
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) { func manageScan(stop chan bool) {
scanTicker := time.NewTicker(time.Minute) scanTicker := time.NewTicker(time.Minute)
hashTicker := time.NewTicker(time.Second) hashTicker := time.NewTicker(time.Second)
scanRunning := false scanRunning := false
@ -114,7 +111,7 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
} }
for _, l := range libraries { for _, l := range libraries {
scanLibrary(ctx, l, scanPool) scanLibrary(ctx, l)
} }
}() }()
@ -129,32 +126,19 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
hashRunning = false hashRunning = false
}() }()
rows, err := db.Query(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW) var fileID uint
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
if err != nil { if err != nil {
if !errors.Is(err, pgx.ErrNoRows) { if !errors.Is(err, pgx.ErrNoRows) {
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err) slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
} }
// Nothing to do
return return
} }
files, err := pgx.CollectRows[uint](rows, pgx.RowToStructByName[uint]) err = hashFile(ctx, fileID)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Collect File Rows", "err", err) slog.ErrorContext(ctx, "Error Hashing File", "err", err)
return
} }
for _, f := range files {
hashPool.QueueJob(func(ctx context.Context) {
err = hashFile(ctx, f)
if err != nil {
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
}
})
}
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
hashPool.WaitForEmptyQueue()
}() }()
case <-stop: case <-stop:
cancel() cancel()
@ -165,7 +149,7 @@ func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *worke
var scanLock sync.Mutex var scanLock sync.Mutex
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) { func scanLibrary(ctx context.Context, id int) {
slog.InfoContext(ctx, "Acquiring Scan Lock...") slog.InfoContext(ctx, "Acquiring Scan Lock...")
scanLock.Lock() scanLock.Lock()
defer scanLock.Unlock() defer scanLock.Unlock()
@ -234,53 +218,49 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
var fileID int var fileID int
var oldSize uint var oldSize uint
var oldModTime time.Time var oldModTime time.Time
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime) err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
if errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, pgx.ErrNoRows) {
// File Does not Exist Yet // File Does not Exist Yet
go func() {
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
scanPool.QueueJob( ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
func(ctx context.Context) { if err != nil {
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath) slog.ErrorContext(ctx, "ffprobe New File", "err", err)
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath) _, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "ffprobe New File", "err", err) slog.ErrorContext(ctx, "Add New File to DB", "err", err)
} }
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) }()
_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
if err != nil {
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
}
})
return nil return nil
} else if err != nil { } else if err != nil {
return fmt.Errorf("Getting File: %w", err) return fmt.Errorf("Getting File: %w", err)
} }
// Remove Timezone and Round to nearest Second
newModTime := info.ModTime().UTC().Round(time.Second)
// File Already Exists, Check if it has been changed // File Already Exists, Check if it has been changed
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize { if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
slog.Debug("File stayed the same", "id", fileID) slog.Debug("File stayed the same", "id", fileID)
} else { } else {
scanPool.QueueJob(func(ctx context.Context) { slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath) ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err) slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
} }
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData) _, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
if err != nil { if err != nil {
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err) return fmt.Errorf("Updating Changed File in DB: %w", err)
} }
})
} }
return nil return nil
}) })
if err != nil { if err != nil {
@ -288,7 +268,5 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
return return
} }
scanPool.WaitForEmptyQueue()
slog.InfoContext(ctx, "Scan Done", "id", id) slog.InfoContext(ctx, "Scan Done", "id", id)
} }

View file

@ -17,7 +17,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"git.lastassault.de/speatzle/morffix/config" "git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/workerpool"
) )
var conf config.Config var conf config.Config
@ -130,11 +129,8 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement := make(chan bool, 1) stopWorkerManagement := make(chan bool, 1)
go manageWorkers(stopWorkerManagement) go manageWorkers(stopWorkerManagement)
scanPool := workerpool.NewWorkerPool(10, 100000)
hashPool := workerpool.NewWorkerPool(5, 100000)
stopScanning := make(chan bool, 1) stopScanning := make(chan bool, 1)
go manageScan(stopScanning, scanPool, hashPool) go manageScan(stopScanning)
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt) signal.Notify(sigs, os.Interrupt)
@ -150,10 +146,6 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
stopWorkerManagement <- true stopWorkerManagement <- true
slog.Info("Stopping Scanning...") slog.Info("Stopping Scanning...")
stopScanning <- true stopScanning <- true
slog.Info("Stopping Scanning Pool...")
scanPool.Stop()
slog.Info("Stopping Hashing Pool...")
hashPool.Stop()
slog.Info("Done") slog.Info("Done")
} }
} }

View file

@ -1,67 +0,0 @@
package workerpool
import (
"context"
"sync"
)
type WorkerPool struct {
workers int
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
queue chan func(context.Context)
qg sync.WaitGroup
}
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
queue := make(chan func(context.Context), queueSize)
workerPool := WorkerPool{
workers: workers,
ctx: ctx,
cancel: cancel,
queue: queue,
}
for i := 0; i < workerPool.workers; i++ {
workerPool.wg.Add(1)
go workerPool.work(workerPool.ctx)
}
return &workerPool
}
func (wp *WorkerPool) Stop() {
wp.cancel()
wp.wg.Wait()
}
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
wp.qg.Add(1)
wp.queue <- job
}
func (wp *WorkerPool) WaitForEmptyQueue() {
wp.qg.Wait()
}
func (wp *WorkerPool) QueueLength() int {
return len(wp.queue)
}
func (wp *WorkerPool) work(ctx context.Context) {
for {
select {
case <-ctx.Done():
wp.wg.Done()
return
case job := <-wp.queue:
func() {
defer wp.qg.Done()
job(ctx)
}()
}
}
}