Show Task Status better
This commit is contained in:
parent
755b650df5
commit
a9adb5ebdd
3 changed files with 79 additions and 8 deletions
|
@ -1,5 +1,7 @@
|
|||
package constants
|
||||
|
||||
import "fmt"
|
||||
|
||||
const WORKER_VERSION = "v1"
|
||||
|
||||
const WORKER_VERSION_HEADER = "morffix-version"
|
||||
|
@ -19,9 +21,11 @@ const (
|
|||
TASK_TYPE_TRANSCODE
|
||||
)
|
||||
|
||||
type TaskStatus int
|
||||
|
||||
// Non Append Changes Need Worker Version Bump
|
||||
const (
|
||||
TASK_STATUS_UNKNOWN = iota
|
||||
TASK_STATUS_UNKNOWN TaskStatus = iota
|
||||
TASK_STATUS_FAILED
|
||||
TASK_STATUS_SUCCESS
|
||||
TASK_STATUS_RUNNING
|
||||
|
@ -30,3 +34,26 @@ const (
|
|||
TASK_STATUS_PAUSED
|
||||
TASK_STATUS_WAITING
|
||||
)
|
||||
|
||||
func (s TaskStatus) String() string {
|
||||
switch s {
|
||||
case TASK_STATUS_UNKNOWN:
|
||||
return "Unknown"
|
||||
case TASK_STATUS_FAILED:
|
||||
return "Failed"
|
||||
case TASK_STATUS_SUCCESS:
|
||||
return "Success"
|
||||
case TASK_STATUS_RUNNING:
|
||||
return "Running"
|
||||
case TASK_STATUS_QUEUED:
|
||||
return "Queued"
|
||||
case TASK_STATUS_ASSIGNED:
|
||||
return "Assigned"
|
||||
case TASK_STATUS_PAUSED:
|
||||
return "Paused"
|
||||
case TASK_STATUS_WAITING:
|
||||
return "Waiting"
|
||||
default:
|
||||
return fmt.Sprintf("%d", int(s))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,24 @@ type TasksData struct {
|
|||
Tasks []TaskDisplay
|
||||
}
|
||||
|
||||
type TaskDisplay struct {
|
||||
ID int `db:"id"`
|
||||
Library int `db:"library"`
|
||||
Worker *string `db:"worker"`
|
||||
Type int `db:"type"`
|
||||
Status string `db:"status"`
|
||||
File string `db:"file"`
|
||||
}
|
||||
|
||||
type TaskDB struct {
|
||||
ID int `db:"id"`
|
||||
Library int `db:"library"`
|
||||
Worker *string `db:"worker"`
|
||||
Type int `db:"type"`
|
||||
Status constants.TaskStatus `db:"status"`
|
||||
File string `db:"file"`
|
||||
}
|
||||
|
||||
func handleTasks(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method == "POST" {
|
||||
err := createTask(r.Context(), r)
|
||||
|
@ -50,13 +68,22 @@ func handleTasks(w http.ResponseWriter, r *http.Request) {
|
|||
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
tasks, err := pgx.CollectRows[TaskDisplay](rows, pgx.RowToStructByName[TaskDisplay])
|
||||
tasks, err := pgx.CollectRows[TaskDB](rows, pgx.RowToStructByName[TaskDB])
|
||||
if err != nil {
|
||||
slog.ErrorContext(r.Context(), "Collect Tasks", "err", err)
|
||||
http.Error(w, "Error Query Tasks: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
data.Tasks = tasks
|
||||
for i := range tasks {
|
||||
data.Tasks = append(data.Tasks, TaskDisplay{
|
||||
ID: tasks[i].ID,
|
||||
Library: tasks[i].Library,
|
||||
Worker: tasks[i].Worker,
|
||||
Type: tasks[i].Type,
|
||||
File: tasks[i].File,
|
||||
Status: tasks[i].Status.String(),
|
||||
})
|
||||
}
|
||||
|
||||
buf := bytes.Buffer{}
|
||||
err = templates.ExecuteTemplate(&buf, constants.TASKS_TEMPLATE_NAME, data)
|
||||
|
@ -133,11 +160,11 @@ func createTask(ctx context.Context, r *http.Request) error {
|
|||
if true { // TODO typ == constants.TASK_TYPE_HEALTHCHECK {
|
||||
|
||||
// ffmpeg.exe -stats -v error -i "in.mp4" -f null -max_muxing_queue_size 9999 "out.mp4"
|
||||
data = types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}},
|
||||
data = types.HealthCheckData{Command: types.FFmpegCommand{
|
||||
Args: []types.Arg{{Flag: "-stats"}, {Flag: "-v", Value: "error"}, {Flag: "-xerror"}},
|
||||
InputFiles: []types.File{{Path: "input.mkv"}},
|
||||
OutputFiles: []types.File{{Path: "output.mkv", Arguments: []types.Arg{{Flag: "-f", Value: "null"}, {Flag: "-max_muxing_queue_size", Value: "9999"}}}},
|
||||
}
|
||||
}}
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
|
@ -172,6 +199,8 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
return fmt.Errorf("Collect Queued Tasks: %w", err)
|
||||
}
|
||||
|
||||
//slog.Info("Assigning Queued Tasks", "count", len(queuedTasks))
|
||||
|
||||
if len(queuedTasks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -182,17 +211,20 @@ func assignQueuedTasks(ctx context.Context) error {
|
|||
lastAssigned := 0
|
||||
|
||||
for i := range Workers {
|
||||
if lastAssigned == len(queuedTasks)-1 {
|
||||
if lastAssigned == len(queuedTasks) {
|
||||
slog.Info("All Tasks assigned")
|
||||
// All Tasks have been Assigned
|
||||
return nil
|
||||
}
|
||||
if Workers[i].Connected {
|
||||
var count int
|
||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4 OR status = $5)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING, constants.TASK_STATUS_UNKNOWN).Scan(&count)
|
||||
err := db.QueryRow(ctx, "SELECT COUNT(*) FROM tasks WHERE worker_id = $1 AND (status = $2 OR status = $3 OR status = $4)", i, constants.TASK_STATUS_ASSIGNED, constants.TASK_STATUS_RUNNING, constants.TASK_STATUS_WAITING).Scan(&count)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error Querying Worker Task Count: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("Assigning Queued Tasks Worker", "worker", i, "count", count)
|
||||
|
||||
// Allow for Multiple Tasks at once in the future
|
||||
if count < 1 {
|
||||
tx, err := db.Begin(ctx)
|
||||
|
|
|
@ -19,7 +19,10 @@
|
|||
<table>
|
||||
<tr>
|
||||
<th>ID</th>
|
||||
<th>Library</th>
|
||||
<th>Worker</th>
|
||||
<th>Type</th>
|
||||
<th>Status</th>
|
||||
<th>File</th>
|
||||
</tr>
|
||||
{{range $t := .Tasks}}
|
||||
|
@ -27,9 +30,18 @@
|
|||
<td>
|
||||
{{ $t.ID }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $t.Library }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $t.Worker }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $t.Type }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $t.Status }}
|
||||
</td>
|
||||
<td>
|
||||
{{ $t.File }}
|
||||
</td>
|
||||
|
|
Loading…
Add table
Reference in a new issue