Use workerpool for scanning and hashing
All checks were successful
/ release (push) Successful in 31s
All checks were successful
/ release (push) Successful in 31s
This commit is contained in:
parent
f12462250a
commit
aeb47c2593
3 changed files with 160 additions and 64 deletions
147
server/scan.go
147
server/scan.go
|
@ -10,11 +10,11 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/constants"
|
||||
"git.lastassault.de/speatzle/morffix/workerpool"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"gopkg.in/vansante/go-ffprobe.v2"
|
||||
)
|
||||
|
@ -22,44 +22,47 @@ import (
|
|||
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
|
||||
|
||||
func handleScan(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == "GET" {
|
||||
scanStatus(w, r)
|
||||
return
|
||||
}
|
||||
http.Error(w, "Not Implemented", http.StatusNotImplemented)
|
||||
/*
|
||||
if r.Method == "GET" {
|
||||
scanStatus(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
id, err := strconv.Atoi(r.FormValue("id"))
|
||||
if err != nil {
|
||||
http.Error(w, "Parsing ID", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
id, err := strconv.Atoi(r.FormValue("id"))
|
||||
if err != nil {
|
||||
http.Error(w, "Parsing ID", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var name string
|
||||
var path string
|
||||
var enabled bool
|
||||
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Get Library", "err", err)
|
||||
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
var name string
|
||||
var path string
|
||||
var enabled bool
|
||||
err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Get Library", "err", err)
|
||||
http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
scanCtx := context.Background()
|
||||
go scanLibrary(scanCtx, id)
|
||||
scanCtx := context.Background()
|
||||
go scanLibrary(scanCtx, id)
|
||||
|
||||
message := "Scan Started"
|
||||
message := "Scan Started"
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
|
||||
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
|
||||
http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = w.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
||||
}
|
||||
_, err = w.Write(buf.Bytes())
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
func scanStatus(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -79,7 +82,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func manageScan(stop chan bool) {
|
||||
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
|
||||
scanTicker := time.NewTicker(time.Minute)
|
||||
hashTicker := time.NewTicker(time.Second)
|
||||
scanRunning := false
|
||||
|
@ -111,7 +114,7 @@ func manageScan(stop chan bool) {
|
|||
}
|
||||
|
||||
for _, l := range libraries {
|
||||
scanLibrary(ctx, l)
|
||||
scanLibrary(ctx, l, scanPool)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -126,19 +129,32 @@ func manageScan(stop chan bool) {
|
|||
hashRunning = false
|
||||
}()
|
||||
|
||||
var fileID uint
|
||||
err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
|
||||
rows, err := db.Query(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
|
||||
if err != nil {
|
||||
if !errors.Is(err, pgx.ErrNoRows) {
|
||||
slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
|
||||
}
|
||||
// Nothing to do
|
||||
return
|
||||
}
|
||||
|
||||
err = hashFile(ctx, fileID)
|
||||
files, err := pgx.CollectRows[uint](rows, pgx.RowToStructByName[uint])
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
|
||||
slog.ErrorContext(ctx, "Collect File Rows", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
hashPool.QueueJob(func(ctx context.Context) {
|
||||
err = hashFile(ctx, f)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Error Hashing File", "err", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
|
||||
hashPool.WaitForEmptyQueue()
|
||||
}()
|
||||
case <-stop:
|
||||
cancel()
|
||||
|
@ -149,7 +165,7 @@ func manageScan(stop chan bool) {
|
|||
|
||||
var scanLock sync.Mutex
|
||||
|
||||
func scanLibrary(ctx context.Context, id int) {
|
||||
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
|
||||
slog.InfoContext(ctx, "Acquiring Scan Lock...")
|
||||
scanLock.Lock()
|
||||
defer scanLock.Unlock()
|
||||
|
@ -225,20 +241,22 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
// File Does not Exist Yet
|
||||
go func() {
|
||||
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
|
||||
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "ffprobe New File", "err", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
scanPool.QueueJob(
|
||||
func(ctx context.Context) {
|
||||
slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
|
||||
|
||||
_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
|
||||
}
|
||||
}()
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "ffprobe New File", "err", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
|
||||
_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
|
||||
}
|
||||
})
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("Getting File: %w", err)
|
||||
|
@ -248,20 +266,21 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
|
||||
slog.Debug("File stayed the same", "id", fileID)
|
||||
} else {
|
||||
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
|
||||
scanPool.QueueJob(func(ctx context.Context) {
|
||||
slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
|
||||
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
|
||||
}
|
||||
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
|
||||
|
||||
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Updating Changed File in DB: %w", err)
|
||||
}
|
||||
_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
|
||||
if err != nil {
|
||||
slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -269,5 +288,7 @@ func scanLibrary(ctx context.Context, id int) {
|
|||
return
|
||||
}
|
||||
|
||||
scanPool.WaitForEmptyQueue()
|
||||
|
||||
slog.InfoContext(ctx, "Scan Done", "id", id)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"git.lastassault.de/speatzle/morffix/config"
|
||||
"git.lastassault.de/speatzle/morffix/workerpool"
|
||||
)
|
||||
|
||||
var conf config.Config
|
||||
|
@ -129,8 +130,11 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
|
|||
stopWorkerManagement := make(chan bool, 1)
|
||||
go manageWorkers(stopWorkerManagement)
|
||||
|
||||
scanPool := workerpool.NewWorkerPool(10, 100000)
|
||||
hashPool := workerpool.NewWorkerPool(5, 100000)
|
||||
|
||||
stopScanning := make(chan bool, 1)
|
||||
go manageScan(stopScanning)
|
||||
go manageScan(stopScanning, scanPool, hashPool)
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
signal.Notify(sigs, os.Interrupt)
|
||||
|
@ -146,6 +150,10 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
|
|||
stopWorkerManagement <- true
|
||||
slog.Info("Stopping Scanning...")
|
||||
stopScanning <- true
|
||||
slog.Info("Stopping Scanning Pool...")
|
||||
scanPool.Stop()
|
||||
slog.Info("Stopping Hashing Pool...")
|
||||
hashPool.Stop()
|
||||
slog.Info("Done")
|
||||
}
|
||||
}
|
||||
|
|
67
workerpool/workerpool.go
Normal file
67
workerpool/workerpool.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package workerpool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// WorkerPool runs queued jobs on a fixed set of worker goroutines.
// The zero value is not usable; construct with NewWorkerPool.
type WorkerPool struct {
	workers int                        // number of worker goroutines started by NewWorkerPool
	ctx     context.Context            // pool lifetime; cancelled by Stop
	cancel  context.CancelFunc         // cancels ctx, telling workers to exit
	wg      sync.WaitGroup             // counts running worker goroutines
	queue   chan func(context.Context) // buffered job queue consumed by workers
	qg      sync.WaitGroup             // counts jobs queued but not yet finished
}
|
||||
|
||||
func NewWorkerPool(workers int, queueSize int) *WorkerPool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
queue := make(chan func(context.Context), queueSize)
|
||||
|
||||
workerPool := WorkerPool{
|
||||
workers: workers,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
queue: queue,
|
||||
}
|
||||
|
||||
for i := 0; i < workerPool.workers; i++ {
|
||||
workerPool.wg.Add(1)
|
||||
go workerPool.work(workerPool.ctx)
|
||||
}
|
||||
|
||||
return &workerPool
|
||||
}
|
||||
|
||||
// Stop cancels the pool's context and blocks until all worker goroutines
// have exited. Jobs still waiting in the queue are not executed.
func (wp *WorkerPool) Stop() {
	wp.cancel()
	wp.wg.Wait()
}
|
||||
|
||||
// QueueJob enqueues job for execution by one of the pool's workers.
// The call blocks while the queue is at capacity.
// NOTE(review): queueing after Stop strands the job (and its qg count),
// which would make WaitForEmptyQueue block forever — confirm callers always
// call Stop last.
func (wp *WorkerPool) QueueJob(job func(context.Context)) {
	wp.qg.Add(1)
	wp.queue <- job
}
|
||||
|
||||
// WaitForEmptyQueue blocks until every job queued so far has finished
// executing.
func (wp *WorkerPool) WaitForEmptyQueue() {
	wp.qg.Wait()
}
|
||||
|
||||
// QueueLength reports how many jobs are currently buffered in the queue.
// Jobs a worker has already dequeued and is executing are not counted.
func (wp *WorkerPool) QueueLength() int {
	return len(wp.queue)
}
|
||||
|
||||
func (wp *WorkerPool) work(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
wp.wg.Done()
|
||||
return
|
||||
case job := <-wp.queue:
|
||||
func() {
|
||||
defer wp.qg.Done()
|
||||
job(ctx)
|
||||
}()
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue