Compare commits

2 commits: bae1112074...aeb47c2593

| Author | SHA1 | Date |
| --- | --- | --- |
| | aeb47c2593 | |
| | f12462250a | |

3 changed files with 164 additions and 67 deletions
```diff
@@ -10,11 +10,11 @@ import (
 	"os"
 	"path/filepath"
 	"slices"
-	"strconv"
 	"sync"
 	"time"
 
 	"git.lastassault.de/speatzle/morffix/constants"
+	"git.lastassault.de/speatzle/morffix/workerpool"
 	"github.com/jackc/pgx/v5"
 	"gopkg.in/vansante/go-ffprobe.v2"
 )
@@ -22,6 +22,8 @@ import (
 var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}
 
 func handleScan(w http.ResponseWriter, r *http.Request) {
+	http.Error(w, "Not Implemented", http.StatusNotImplemented)
+	/*
 	if r.Method == "GET" {
 		scanStatus(w, r)
 		return
@@ -60,6 +62,7 @@ func handleScan(w http.ResponseWriter, r *http.Request) {
 	if err != nil {
 		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
 	}
+	*/
 }
 
 func scanStatus(w http.ResponseWriter, r *http.Request) {
@@ -79,7 +82,7 @@ func scanStatus(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func manageScan(stop chan bool) {
+func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
 	scanTicker := time.NewTicker(time.Minute)
 	hashTicker := time.NewTicker(time.Second)
 	scanRunning := false
@@ -111,7 +114,7 @@ func manageScan(stop chan bool) {
 				}
 
 				for _, l := range libraries {
-					scanLibrary(ctx, l)
+					scanLibrary(ctx, l, scanPool)
 				}
 			}()
 
@@ -126,19 +129,32 @@ func manageScan(stop chan bool) {
 					hashRunning = false
 				}()
 
-				var fileID uint
-				err := db.QueryRow(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW).Scan(&fileID)
+				rows, err := db.Query(ctx, "SELECT id FROM files WHERE status = $1 OR status = $2 LIMIT 1", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
 				if err != nil {
 					if !errors.Is(err, pgx.ErrNoRows) {
 						slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
 					}
+					// Nothing to do
 					return
 				}
 
-				err = hashFile(ctx, fileID)
+				files, err := pgx.CollectRows[uint](rows, pgx.RowToStructByName[uint])
+				if err != nil {
+					slog.ErrorContext(ctx, "Collect File Rows", "err", err)
+					return
+				}
+
+				for _, f := range files {
+					hashPool.QueueJob(func(ctx context.Context) {
+						err = hashFile(ctx, f)
 				if err != nil {
 					slog.ErrorContext(ctx, "Error Hashing File", "err", err)
 				}
+					})
+				}
+
+				// Wait for all jobs to finish since we don't track which ones are in progress and don't want to hash the same file twice. Not ideal but better than just doing one at a time
+				hashPool.WaitForEmptyQueue()
 			}()
 		case <-stop:
 			cancel()
@@ -149,7 +165,7 @@ func manageScan(stop chan bool) {
 
 var scanLock sync.Mutex
 
-func scanLibrary(ctx context.Context, id int) {
+func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
 	slog.InfoContext(ctx, "Acquiring Scan Lock...")
 	scanLock.Lock()
 	defer scanLock.Unlock()
@@ -218,10 +234,16 @@ func scanLibrary(ctx context.Context, id int) {
 		var fileID int
 		var oldSize uint
 		var oldModTime time.Time
 
+		// Remove Timezone and Round to nearest Second
+		newModTime := info.ModTime().UTC().Round(time.Second)
+
 		err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
 		if errors.Is(err, pgx.ErrNoRows) {
 			// File Does not Exist Yet
-			go func() {
+			scanPool.QueueJob(
+				func(ctx context.Context) {
 				slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)
 
 				ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
@@ -230,23 +252,21 @@ func scanLibrary(ctx context.Context, id int) {
 				}
 				slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
 
-				_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data) VALUES ($1, $2, $3, $4, $5, $6)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData)
+				_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
 				if err != nil {
 					slog.ErrorContext(ctx, "Add New File to DB", "err", err)
 				}
-			}()
+				})
 			return nil
 		} else if err != nil {
 			return fmt.Errorf("Getting File: %w", err)
 		}
 
-		// Remove Timezone and Round to nearest Second
-		newModTime := info.ModTime().UTC().Round(time.Second)
-
 		// File Already Exists, Check if it has been changed
 		if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
 			slog.Debug("File stayed the same", "id", fileID)
 		} else {
+			scanPool.QueueJob(func(ctx context.Context) {
 			slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())
 
 			ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
@@ -257,10 +277,10 @@ func scanLibrary(ctx context.Context, id int) {
 
 			_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
 			if err != nil {
-				return fmt.Errorf("Updating Changed File in DB: %w", err)
+				slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
 			}
+			})
 		}
 
 		return nil
 	})
 	if err != nil {
@@ -268,5 +288,7 @@ func scanLibrary(ctx context.Context, id int) {
 		return
 	}
 
+	scanPool.WaitForEmptyQueue()
+
 	slog.InfoContext(ctx, "Scan Done", "id", id)
 }
```
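The substantive change in this file is the hashing branch of manageScan: instead of hashing one file per tick via db.QueryRow, it now collects all candidate file IDs, queues one job per ID on the hash pool, and blocks on WaitForEmptyQueue so the next tick cannot queue files that are still being hashed (the in-diff comment spells out the trade-off). A minimal sketch of that pattern, assuming the workerpool package added in this compare is importable; hashOne is a hypothetical stand-in for the real hashFile:

```go
package main

import (
	"context"
	"log/slog"
	"time"

	"git.lastassault.de/speatzle/morffix/workerpool"
)

// hashOne is a hypothetical stand-in for the real hashFile(ctx, id).
func hashOne(ctx context.Context, id uint) error {
	select {
	case <-time.After(100 * time.Millisecond): // pretend to do the hashing work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	hashPool := workerpool.NewWorkerPool(5, 100000)
	defer hashPool.Stop()

	// Stand-in for the IDs collected from the files table via pgx.CollectRows.
	ids := []uint{1, 2, 3, 4, 5}

	for _, f := range ids {
		f := f // loop-variable capture; redundant on Go 1.22+, harmless before
		hashPool.QueueJob(func(ctx context.Context) {
			if err := hashOne(ctx, f); err != nil {
				slog.ErrorContext(ctx, "Error Hashing File", "err", err)
			}
		})
	}

	// Waiting here mirrors the diff: without per-file "in progress" tracking,
	// blocking until the queue drains prevents hashing the same file twice.
	hashPool.WaitForEmptyQueue()
}
```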
```diff
@@ -17,6 +17,7 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 
 	"git.lastassault.de/speatzle/morffix/config"
+	"git.lastassault.de/speatzle/morffix/workerpool"
 )
 
 var conf config.Config
@@ -129,8 +130,11 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
 	stopWorkerManagement := make(chan bool, 1)
 	go manageWorkers(stopWorkerManagement)
 
+	scanPool := workerpool.NewWorkerPool(10, 100000)
+	hashPool := workerpool.NewWorkerPool(5, 100000)
+
 	stopScanning := make(chan bool, 1)
-	go manageScan(stopScanning)
+	go manageScan(stopScanning, scanPool, hashPool)
 
 	sigs := make(chan os.Signal, 1)
 	signal.Notify(sigs, os.Interrupt)
@@ -146,6 +150,10 @@ func Start(_conf config.Config, tmplFS embed.FS, staticFS embed.FS, migrationsFS
 		stopWorkerManagement <- true
 		slog.Info("Stopping Scanning...")
 		stopScanning <- true
+		slog.Info("Stopping Scanning Pool...")
+		scanPool.Stop()
+		slog.Info("Stopping Hashing Pool...")
+		hashPool.Stop()
 		slog.Info("Done")
 	}
 }
```
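In Start the pools are created before the scan goroutine launches and are stopped only after scanning itself has been signalled to stop, so nothing queues work into a pool whose workers are already shutting down. A condensed sketch of that ordering, under the same assumption that the workerpool package is importable (signal handling and the rest of Start are omitted; manageScanStub stands in for the real manageScan):

```go
package main

import (
	"log/slog"

	"git.lastassault.de/speatzle/morffix/workerpool"
)

// manageScanStub is a hypothetical stand-in for the real manageScan loop.
func manageScanStub(stop chan bool, scanPool, hashPool *workerpool.WorkerPool) {
	<-stop
}

func main() {
	// Worker counts and queue sizes as chosen in this compare.
	scanPool := workerpool.NewWorkerPool(10, 100000)
	hashPool := workerpool.NewWorkerPool(5, 100000)

	stopScanning := make(chan bool, 1)
	go manageScanStub(stopScanning, scanPool, hashPool)

	// ... normally Start blocks here until an os.Interrupt arrives ...

	slog.Info("Stopping Scanning...")
	stopScanning <- true
	slog.Info("Stopping Scanning Pool...")
	scanPool.Stop() // cancels the workers' context and waits for them to exit
	slog.Info("Stopping Hashing Pool...")
	hashPool.Stop()
	slog.Info("Done")
}
```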
workerpool/workerpool.go (new file, 67 lines)

```diff
@@ -0,0 +1,67 @@
+package workerpool
+
+import (
+	"context"
+	"sync"
+)
+
+type WorkerPool struct {
+	workers int
+	ctx     context.Context
+	cancel  context.CancelFunc
+	wg      sync.WaitGroup
+	queue   chan func(context.Context)
+	qg      sync.WaitGroup
+}
+
+func NewWorkerPool(workers int, queueSize int) *WorkerPool {
+	ctx, cancel := context.WithCancel(context.Background())
+	queue := make(chan func(context.Context), queueSize)
+
+	workerPool := WorkerPool{
+		workers: workers,
+		ctx:     ctx,
+		cancel:  cancel,
+		queue:   queue,
+	}
+
+	for i := 0; i < workerPool.workers; i++ {
+		workerPool.wg.Add(1)
+		go workerPool.work(workerPool.ctx)
+	}
+
+	return &workerPool
+}
+
+func (wp *WorkerPool) Stop() {
+	wp.cancel()
+	wp.wg.Wait()
+}
+
+func (wp *WorkerPool) QueueJob(job func(context.Context)) {
+	wp.qg.Add(1)
+	wp.queue <- job
+}
+
+func (wp *WorkerPool) WaitForEmptyQueue() {
+	wp.qg.Wait()
+}
+
+func (wp *WorkerPool) QueueLength() int {
+	return len(wp.queue)
+}
+
+func (wp *WorkerPool) work(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			wp.wg.Done()
+			return
+		case job := <-wp.queue:
+			func() {
+				defer wp.qg.Done()
+				job(ctx)
+			}()
+		}
+	}
+}
```
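For reference, a minimal self-contained usage sketch of the new API (not part of the commit): jobs receive the pool's context, which Stop cancels, so long-running jobs should check it.

```go
package main

import (
	"context"
	"fmt"

	"git.lastassault.de/speatzle/morffix/workerpool"
)

func main() {
	pool := workerpool.NewWorkerPool(4, 64) // 4 workers, queue capacity 64

	for i := 0; i < 10; i++ {
		n := i
		pool.QueueJob(func(ctx context.Context) {
			select {
			case <-ctx.Done():
				return // the pool is shutting down
			default:
				fmt.Println("processed job", n)
			}
		})
	}

	fmt.Println("jobs still queued:", pool.QueueLength())

	// Block until every queued job has run, then shut the workers down.
	pool.WaitForEmptyQueue()
	pool.Stop()
}
```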