Implement Queue Health On New File and Queue Transcode on Health Success
All checks were successful
/ release (push) Successful in 32s

This commit is contained in:
speatzle 2024-10-11 17:07:52 +02:00
parent f08a07e87d
commit 12f700d1d0
7 changed files with 60 additions and 24 deletions

View file

@ -1,3 +1,3 @@
ALTER TABLE libraries
DROP IF EXISTS transcode_command,
DROP IF EXISTS health_command;
DROP IF EXISTS transcode_command_id,
DROP IF EXISTS health_command_id;

View file

@ -1,3 +1,3 @@
ALTER TABLE libraries
ADD transcode_command bigint REFERENCES ffmpeg_commands(id),
ADD health_command bigint REFERENCES ffmpeg_commands(id);
ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id),
ADD health_command_id bigint REFERENCES ffmpeg_commands(id);

View file

@ -18,8 +18,8 @@ type LibrariesData struct {
type Library struct {
ID string `db:"id"`
HealthCommand *int `db:"health_command"`
TranscodeCommand *int `db:"transcode_command"`
HealthCommandID *int `db:"health_command_id"`
TranscodeCommandID *int `db:"transcode_command_id"`
Name string `db:"name"`
Path string `db:"path"`
Enable bool `db:"enable"`
@ -102,21 +102,21 @@ func createLibrary(r *http.Request) error {
scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on"
health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on"
var health_command *int = nil
var transcode_command *int = nil
var health_command_id *int = nil
var transcode_command_id *int = nil
h, err := strconv.Atoi(r.FormValue("health_command"))
h, err := strconv.Atoi(r.FormValue("health_command_id"))
if err == nil {
health_command = &h
health_command_id = &h
}
t, err := strconv.Atoi(r.FormValue("transcode_command"))
t, err := strconv.Atoi(r.FormValue("transcode_command_id"))
if err == nil {
transcode_command = &t
transcode_command_id = &t
}
slog.Info("Got Library Create", "name", name, "path", path, "enable", enable)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command, transcode_command) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command, transcode_command)
_, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id)
if err != nil {
return fmt.Errorf("Inserting Library: %w", err)
}

View file

@ -174,7 +174,9 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
var name string
var lpath string
var enabled bool
err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled)
var health_command_id *int
var scan_new_queue_health bool
err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health)
if err != nil {
slog.ErrorContext(ctx, "Get Library", "err", err)
return
@ -252,10 +254,20 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) {
}
slog.InfoContext(ctx, "ffprobe Done", "path", fullPath)
_, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime)
var file_id int
err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time)
VALUES ($1, $2, $3, $4, $5, $6, $7)
RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id)
if err != nil {
slog.ErrorContext(ctx, "Add New File to DB", "err", err)
}
if scan_new_queue_health && health_command_id != nil {
slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id)
}
}
})
return nil
} else if err != nil {

View file

@ -344,7 +344,7 @@ func assignQueuedTasks(ctx context.Context) error {
assignRunning = false
}()
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED)
rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS)
if err != nil {
return fmt.Errorf("Query Queued Tasks: %w", err)
}

View file

@ -247,7 +247,8 @@ func updateWorkerTaskStatus(ctx context.Context) {
var health constants.FileHealth
if ts.Task.Status == constants.TASK_STATUS_SUCCESS {
health = constants.FILE_HEALTH_HEALTHY
// TODO Auto Queue Transcode for Successfull Transcodes if library setting
// Auto Queue Transcode for Successfull Transcodes if library setting say so
go queueOnHealth(context.TODO(), ts.Task.ID)
} else {
// TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs
health = constants.FILE_HEALTH_DAMAGED
@ -294,3 +295,26 @@ func updateWorkerTaskStatus(ctx context.Context) {
wg.Wait()
}
func queueOnHealth(ctx context.Context, taskID int) {
var transcode_command_id *int
var file_id *int
err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, file.id
FROM tasks
INNER JOIN files ON files.id = tasks.file_id
INNER JOIN libraries ON files.library_id = libraries.id
WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = t AND libraries.transcode_command_id IS NOT NULL`, taskID).
Scan(&transcode_command_id, &file_id)
if err == pgx.ErrNoRows {
slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID)
} else if err != nil {
slog.ErrorContext(ctx, "Unable to Query if and how she should queue a Transcode on Health Check Success", "err", err)
return
}
slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id)
_, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id)
if err != nil {
slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id)
}
}

View file

@ -75,23 +75,23 @@
<input type="text" name="transcode_path">
<label>Queue Heathcheck for new files on Scan:</label>
<input type="checkbox" name="scan_new_queue_health">
<label>Queue Heathcheck for changed files on Scan:</label>
<label>Queue Heathcheck for changed files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_changed_queue_health">
<label>Queue Transcode for new files on Scan:</label>
<label>Queue Transcode for new files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_new_queue_transcode">
<label>Queue Transcode for changed files on Scan:</label>
<label>Queue Transcode for changed files on Scan (Not Implemented):</label>
<input type="checkbox" name="scan_changed_queue_transcode">
<label>Queue Transcode on Healthcheck Success:</label>
<input type="checkbox" name="health_ok_queue_transcode">
<label for="health_command">Health Command:</label>
<select id="health_command" name="health_command">
<label for="health_command_id">Health Command:</label>
<select id="health_command_id" name="health_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>
{{end}}
</select>
<label for="transcode_command">Transcode Command:</label>
<select id="transcode_command" name="transcode_command">
<label for="transcode_command_id">Transcode Command:</label>
<select id="transcode_command_id" name="transcode_command_id">
<option value="">None</option>
{{range $l := .FfmpegCommands}}
<option value="{{$l.ID}}">{{$l.Name}}</option>