diff --git a/migrations/000019_alter_libraries_table_command.down.sql b/migrations/000019_alter_libraries_table_command.down.sql index f88eea8..7b8bc97 100644 --- a/migrations/000019_alter_libraries_table_command.down.sql +++ b/migrations/000019_alter_libraries_table_command.down.sql @@ -1,3 +1,3 @@ ALTER TABLE libraries -DROP IF EXISTS transcode_command, -DROP IF EXISTS health_command; +DROP IF EXISTS transcode_command_id, +DROP IF EXISTS health_command_id; diff --git a/migrations/000019_alter_libraries_table_command.up.sql b/migrations/000019_alter_libraries_table_command.up.sql index b813289..54b6404 100644 --- a/migrations/000019_alter_libraries_table_command.up.sql +++ b/migrations/000019_alter_libraries_table_command.up.sql @@ -1,3 +1,3 @@ ALTER TABLE libraries -ADD transcode_command bigint REFERENCES ffmpeg_commands(id), -ADD health_command bigint REFERENCES ffmpeg_commands(id); +ADD transcode_command_id bigint REFERENCES ffmpeg_commands(id), +ADD health_command_id bigint REFERENCES ffmpeg_commands(id); diff --git a/server/libraries.go b/server/libraries.go index 29912a7..13f0d6e 100644 --- a/server/libraries.go +++ b/server/libraries.go @@ -18,8 +18,8 @@ type LibrariesData struct { type Library struct { ID string `db:"id"` - HealthCommand *int `db:"health_command"` - TranscodeCommand *int `db:"transcode_command"` + HealthCommandID *int `db:"health_command_id"` + TranscodeCommandID *int `db:"transcode_command_id"` Name string `db:"name"` Path string `db:"path"` Enable bool `db:"enable"` @@ -102,21 +102,21 @@ func createLibrary(r *http.Request) error { scan_changed_queue_transcode := r.FormValue("scan_changed_queue_transcode") == "on" health_ok_queue_transcode := r.FormValue("health_ok_queue_transcode") == "on" - var health_command *int = nil - var transcode_command *int = nil + var health_command_id *int = nil + var transcode_command_id *int = nil - h, err := strconv.Atoi(r.FormValue("health_command")) + h, err := strconv.Atoi(r.FormValue("health_command_id")) if err == nil { - health_command = &h + health_command_id = &h } - t, err := strconv.Atoi(r.FormValue("transcode_command")) + t, err := strconv.Atoi(r.FormValue("transcode_command_id")) if err == nil { - transcode_command = &t + transcode_command_id = &t } slog.Info("Got Library Create", "name", name, "path", path, "enable", enable) - _, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command, transcode_command) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command, transcode_command) + _, err = db.Exec(r.Context(), "INSERT INTO Libraries (name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)", name, path, enable, scan_new_queue_health, scan_changed_queue_health, scan_new_queue_transcode, scan_changed_queue_transcode, health_ok_queue_transcode, transcode_replace, transcode_path, health_command_id, transcode_command_id) if err != nil { return fmt.Errorf("Inserting Library: %w", err) } diff --git a/server/scan.go b/server/scan.go index f9b237e..9bf7a03 100644 --- a/server/scan.go +++ b/server/scan.go @@ -174,7 +174,9 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) { var name string var lpath string var enabled bool - err := db.QueryRow(ctx, "SELECT name, path, enable FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled) + var health_command_id *int + var scan_new_queue_health bool + err := db.QueryRow(ctx, "SELECT name, path, enable, health_command_id, scan_new_queue_health FROM libraries WHERE id = $1", id).Scan(&name, &lpath, &enabled, &health_command_id, &scan_new_queue_health) if err != nil { slog.ErrorContext(ctx, "Get Library", "err", err) return @@ -252,10 +254,20 @@ func scanLibrary(ctx context.Context, id int, scanPool *workerpool.WorkerPool) { } slog.InfoContext(ctx, "ffprobe Done", "path", fullPath) - _, err = db.Exec(ctx, "INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) VALUES ($1, $2, $3, $4, $5, $6, $7)", id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime) + var file_id int + err = db.QueryRow(ctx, `INSERT INTO files (library_id, path, size, status, health, ffprobe_data, mod_time) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING id`, id, fPath, info.Size(), constants.FILE_STATUS_NEW, constants.FILE_HEALTH_UNKNOWN, ffprobeData, newModTime).Scan(&file_id) if err != nil { slog.ErrorContext(ctx, "Add New File to DB", "err", err) } + if scan_new_queue_health && health_command_id != nil { + slog.InfoContext(ctx, "Queueing HealthCheck On New File", "file_id", file_id, "health_command_id", health_command_id) + _, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_HEALTHCHECK, constants.TASK_STATUS_QUEUED, nil, health_command_id) + if err != nil { + slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "file_id", file_id, "health_command_id", health_command_id) + } + } }) return nil } else if err != nil { diff --git a/server/task.go b/server/task.go index 57e124b..119a7ad 100644 --- a/server/task.go +++ b/server/task.go @@ -344,7 +344,7 @@ func assignQueuedTasks(ctx context.Context) error { assignRunning = false }() - rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1", constants.TASK_STATUS_QUEUED) + rows, err := db.Query(ctx, "SELECT t.id as id, t.type as type, t.file_id as file_id, f.md5 as md5, t.data as data, fc.data as ffmpeg_command FROM tasks t INNER JOIN files f ON f.id = t.file_id INNER JOIN ffmpeg_commands fc ON fc.id = t.ffmpeg_command_id WHERE t.status = $1 AND f.status", constants.TASK_STATUS_QUEUED, constants.FILE_STATUS_EXISTS) if err != nil { return fmt.Errorf("Query Queued Tasks: %w", err) } diff --git a/server/worker.go b/server/worker.go index caffd57..9668c45 100644 --- a/server/worker.go +++ b/server/worker.go @@ -247,7 +247,8 @@ func updateWorkerTaskStatus(ctx context.Context) { var health constants.FileHealth if ts.Task.Status == constants.TASK_STATUS_SUCCESS { health = constants.FILE_HEALTH_HEALTHY - // TODO Auto Queue Transcode for Successfull Transcodes if library setting + // Auto Queue Transcode for Successfull Transcodes if library setting say so + go queueOnHealth(context.TODO(), ts.Task.ID) } else { // TODO, not all failures mean the file is damaged, only update on success and track ffmpeg errors in task result data. also remove -xerror and scan for errors manually to see all problems in logs health = constants.FILE_HEALTH_DAMAGED @@ -294,3 +295,26 @@ func updateWorkerTaskStatus(ctx context.Context) { wg.Wait() } + +func queueOnHealth(ctx context.Context, taskID int) { + var transcode_command_id *int + var file_id *int + err := db.QueryRow(ctx, `SELECT libraries.transcode_command_id, file.id + FROM tasks + INNER JOIN files ON files.id = tasks.file_id + INNER JOIN libraries ON files.library_id = libraries.id + WHERE tasks.id = $1 AND libraries.health_ok_queue_transcode = t AND libraries.transcode_command_id IS NOT NULL`, taskID). + Scan(&transcode_command_id, &file_id) + if err == pgx.ErrNoRows { + slog.DebugContext(ctx, "No need to queue Transcode", "task_id", taskID) + } else if err != nil { + slog.ErrorContext(ctx, "Unable to Query if and how she should queue a Transcode on Health Check Success", "err", err) + return + } + + slog.InfoContext(ctx, "Queueing Transcode On HealthCheck Success", "health_task_id", taskID, "transcode_command_id", transcode_command_id) + _, err = db.Exec(ctx, "INSERT INTO tasks (file_id, type, status, data, ffmpeg_command_id) VALUES ($1,$2,$3,$4,$5)", file_id, constants.TASK_TYPE_TRANSCODE, constants.TASK_STATUS_QUEUED, nil, transcode_command_id) + if err != nil { + slog.ErrorContext(ctx, "Queueing Transcode On HealthCheck Success", "err", err, "health_task_id", taskID, "transcode_command_id", transcode_command_id) + } +} diff --git a/tmpl/libraries.tmpl b/tmpl/libraries.tmpl index 0a205ad..317d066 100644 --- a/tmpl/libraries.tmpl +++ b/tmpl/libraries.tmpl @@ -75,23 +75,23 @@ - + - + - + - - {{range $l := .FfmpegCommands}} {{end}} - - {{range $l := .FfmpegCommands}}