package server

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"path/filepath"
	"slices"
	"sync"
	"time"

	"git.lastassault.de/speatzle/morffix/constants"
	"git.lastassault.de/speatzle/morffix/workerpool"
	"github.com/jackc/pgx/v5"
	"gopkg.in/vansante/go-ffprobe.v2"
)

// videoFileExtensions lists the file extensions (including the leading dot)
// that scanLibrary treats as video files. Matching is case-sensitive.
var videoFileExtensions = []string{".mkv", ".mp4", ".webm", ".flv", ".avi"}

// handleScan is the HTTP handler for manually triggered library scans.
// It currently always answers 501 Not Implemented; the previous
// implementation is kept commented out below for reference.
func handleScan(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "Not Implemented", http.StatusNotImplemented)
	/*
		if r.Method == "GET" {
			scanStatus(w, r)
			return
		}

		id, err := strconv.Atoi(r.FormValue("id"))
		if err != nil {
			http.Error(w, "Parsing ID", http.StatusBadRequest)
			return
		}

		var name string
		var path string
		var enabled bool
		err = db.QueryRow(r.Context(), "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &path, &enabled)
		if err != nil {
			slog.ErrorContext(r.Context(), "Get Library", "err", err)
			http.Error(w, "Error Get Library: "+err.Error(), http.StatusInternalServerError)
			return
		}

		scanCtx := context.Background()
		go scanLibrary(scanCtx, id)

		message := "Scan Started"

		buf := bytes.Buffer{}
		err = templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
		if err != nil {
			slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
			http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
			return
		}

		_, err = w.Write(buf.Bytes())
		if err != nil {
			slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
		}
	*/
}

// scanStatus renders the message template with a placeholder status text and
// writes the result to w. The template is rendered into a buffer first so
// that a template failure can still be reported as a 500.
func scanStatus(w http.ResponseWriter, r *http.Request) {
	message := "TODO"

	var buf bytes.Buffer
	err := templates.ExecuteTemplate(&buf, constants.MESSAGE_TEMPLATE_NAME, message)
	if err != nil {
		slog.ErrorContext(r.Context(), "Executing Library Template", "err", err)
		http.Error(w, "Error Executing Template: "+err.Error(), http.StatusInternalServerError)
		return
	}

	_, err = w.Write(buf.Bytes())
	if err != nil {
		slog.ErrorContext(r.Context(), "Writing http Response", "err", err)
	}
}

// manageScan runs the periodic background work until a value is received on
// stop: once per minute it scans every enabled library, and once per second
// it queues up to ten new or changed files for hashing.
//
// At most one scan pass and one hash pass run at a time; a tick that arrives
// while the previous pass is still running is skipped. When stop fires, the
// context handed to the passes is cancelled and manageScan returns without
// waiting for a pass that is still in flight.
func manageScan(stop chan bool, scanPool *workerpool.WorkerPool, hashPool *workerpool.WorkerPool) {
	scanTicker := time.NewTicker(time.Minute)
	defer scanTicker.Stop()
	hashTicker := time.NewTicker(time.Second)
	defer hashTicker.Stop()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The running flags are only ever touched by this goroutine. Workers
	// report completion through the done channels instead of writing the
	// flags directly, which would race with the reads below. A buffer of one
	// is enough because at most one worker of each kind exists at a time, and
	// it lets a worker finish even after manageScan has returned.
	scanRunning := false
	hashRunning := false
	scanDone := make(chan struct{}, 1)
	hashDone := make(chan struct{}, 1)

	for {
		select {
		case <-scanTicker.C:
			if scanRunning {
				continue
			}
			scanRunning = true

			go func() {
				defer func() { scanDone <- struct{}{} }()

				rows, err := db.Query(ctx, "SELECT id FROM libraries WHERE enable = true")
				if err != nil {
					slog.ErrorContext(ctx, "Error getting Enabled Libraries for Scan", "err", err)
					return
				}

				libraries, err := pgx.CollectRows[int](rows, pgx.RowTo[int])
				if err != nil {
					slog.ErrorContext(ctx, "Error Collecting Enabled Libraries for Scan", "err", err)
					return
				}

				for _, l := range libraries {
					scanLibrary(ctx, l, scanPool)
				}
			}()

		case <-scanDone:
			scanRunning = false

		case <-hashTicker.C:
			if hashRunning {
				continue
			}
			hashRunning = true

			go func() {
				defer func() { hashDone <- struct{}{} }()

				rows, err := db.Query(ctx, "SELECT f.id FROM files f INNER JOIN libraries l ON f.library_id = l.id WHERE l.enable = true AND (f.status = $1 OR f.status = $2) LIMIT 10", constants.FILE_STATUS_CHANGED, constants.FILE_STATUS_NEW)
				if err != nil {
					if !errors.Is(err, pgx.ErrNoRows) {
						slog.ErrorContext(ctx, "Error Getting Files for Hashing", "err", err)
					}
					// Nothing to do
					return
				}

				files, err := pgx.CollectRows[uint](rows, pgx.RowTo[uint])
				if err != nil {
					slog.ErrorContext(ctx, "Collect File Rows", "err", err)
					return
				}

				for _, f := range files {
					f := f // per-iteration copy; the job runs later (needed before Go 1.22)
					hashPool.QueueJob(func(ctx context.Context) {
						// Each job keeps its own err; sharing the enclosing
						// one across pool workers would be a data race.
						if err := hashFile(ctx, f); err != nil {
							slog.ErrorContext(ctx, "Error Hashing File", "err", err)
						}
					})
				}

				// Wait for all jobs to finish since we don't track which ones are in progress
				// and don't want to hash the same file twice.
				// Not ideal but better than just doing one at a time
				hashPool.WaitForEmptyQueue()
			}()

		case <-hashDone:
			hashRunning = false

		case <-stop:
			// The deferred cancel aborts any pass that is still running.
			return
		}
	}
}

// scanLock serializes library scans so that only one scanLibrary call walks
// the filesystem at a time.
var scanLock sync.Mutex

// scanLibrary walks the directory of the library with the given id and
// reconciles the video files found there with the files table.
//
// Files that are not yet in the table are probed with ffprobe and inserted
// with status NEW; if the library has scan_new_queue_health set and a health
// command configured, a health-check task is queued for them as well. Files
// whose size or modification time differs from the stored values are probed
// again and marked CHANGED. The probing and database writes run as jobs on
// scanPool, and the function waits for the pool queue to drain before
// reporting the scan as done.
//
// All failures are logged and abort the scan; nothing is returned.
func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
	slog.InfoContext(ctx, "Acquiring Scan Lock...")
	scanLock.Lock()
	defer scanLock.Unlock()
	slog.InfoContext(ctx, "Starting Scan", "id", id)

	var (
		name             string
		lpath            string
		enabled          bool
		healthCommandID  *int
		queueHealthOnNew bool
	)
	err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &healthCommandID, &queueHealthOnNew)
	if err != nil {
		slog.ErrorContext(ctx, "Get Library", "err", err)
		return
	}

	if !enabled {
		slog.ErrorContext(ctx, "Scan Aborted, Library not Enabled", "id", id)
		return
	}

	dirInfo, err := os.Stat(lpath)
	if err != nil {
		slog.ErrorContext(ctx, "Stating Library Path", "id", id, "path", lpath, "err", err)
		return
	}

	if !dirInfo.IsDir() {
		slog.ErrorContext(ctx, "Library Path is not a Folder", "id", id, "path", lpath)
		return
	}

	slog.InfoContext(ctx, "Checking Files...", "id", id, "path", lpath)

	// TODO Work on getting missing status again
	/*
		// Mark all Files as Missing
		_, err = tx.Exec(ctx, "UPDATE files SET status = $2 where library_id = $1", id, constants.FILE_STATUS_MISSING)
		if err != nil {
			slog.ErrorContext(ctx, "Setting Missing Status", "err", err)
			return
		}
	*/

	err = filepath.Walk(lpath, func(fullPath string, info os.FileInfo, err error) error {
		if errors.Is(err, os.ErrPermission) {
			slog.WarnContext(ctx, "Permission Denied While Scanning File", "path", fullPath)
			return nil
		}
		if err != nil {
			return err
		}

		if info.IsDir() {
			// We don't care about folders
			return nil
		}

		if !slices.Contains(videoFileExtensions, filepath.Ext(fullPath)) {
			slog.DebugContext(ctx, "Skipping non video file", "path", fullPath)
			return nil
		}

		// Paths are stored relative to the library root.
		fPath, err := filepath.Rel(lpath, fullPath)
		if err != nil {
			return fmt.Errorf("Getting Relative Path: %w", err)
		}

		var fileID int
		var oldSize uint
		var oldModTime time.Time

		// Remove Timezone and Round to nearest Second
		newModTime := info.ModTime().UTC().Round(time.Second)

		err = db.QueryRow(ctx, "SELECT id, size, mod_time FROM files WHERE library_id = $1 AND path = $2", id, fPath).Scan(&fileID, &oldSize, &oldModTime)
		if errors.Is(err, pgx.ErrNoRows) {
			// File Does not Exist Yet
			scanPool.QueueJob(func(ctx context.Context) {
				slog.InfoContext(ctx, "File is New, Running FFProbe...", "path", fullPath)

				ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
				if err != nil {
					// Not fatal: the file is still recorded, just without probe data.
					slog.ErrorContext(ctx, "ffprobe New File", "err", err)
				}
				slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)

				var newFileID int
				err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time)
					VALUES ($1, $2, $3, $4, $5, $6, $7)
					RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&newFileID)
				if err != nil {
					slog.ErrorContext(ctx, "Add New File to DB", "err", err)
					// Without a row there is no valid file id to attach a task to.
					return
				}

				if queueHealthOnNew && healthCommandID != nil {
					slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", newFileID, "health_command_id", *healthCommandID)
					_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", newFileID, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, healthCommandID)
					if err != nil {
						slog.ErrorContext(ctx, "Queueing HealthCheck On New File", "err", err, "file_id", newFileID, "health_command_id", *healthCommandID)
					}
				}
			})
			return nil
		}
		if err != nil {
			return fmt.Errorf("Getting File: %w", err)
		}

		// File Already Exists, Check if it has been changed
		if newModTime.Equal(oldModTime) && uint(info.Size()) == oldSize {
			slog.DebugContext(ctx, "File stayed the same", "id", fileID)
			return nil
		}

		scanPool.QueueJob(func(ctx context.Context) {
			slog.InfoContext(ctx, "File Has Changed", "path", fullPath, "old_mod_time", oldModTime, "new_mod_time", newModTime, "old_size", oldSize, "new_size", info.Size())

			ffprobeData, err := ffprobe.ProbeURL(ctx, fullPath)
			if err != nil {
				// Not fatal: size, status and mod time are still updated below.
				slog.ErrorContext(ctx, "ffprobe Changed File", "err", err)
			}
			slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)

			_, err = db.Exec(ctx, "UPDATE files SET size = $2, status = $3, health = $4, mod_time = $5, ffprobe_data = $6 WHERE id = $1", fileID, info.Size(), constants.FILE_STATUS_CHANGED, constants.FILE_HEALTH_UNKNOWN, newModTime, ffprobeData)
			if err != nil {
				slog.ErrorContext(ctx, "Updating Changed File in DB", "err", err)
			}
		})
		return nil
	})
	if err != nil {
		slog.ErrorContext(ctx, "Error Walking Library", "err", err)
		return
	}

	scanPool.WaitForEmptyQueue()

	slog.InfoContext(ctx, "Scan Done", "id", id)
}