121 lines
2.6 KiB
Go
121 lines
2.6 KiB
Go
package task
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"git.lastassault.de/speatzle/morffix/config"
|
|
"git.lastassault.de/speatzle/morffix/constants"
|
|
"git.lastassault.de/speatzle/morffix/types"
|
|
)
|
|
|
|
var tasks = map[int]*types.Task{}
|
|
var taskMutex sync.Mutex
|
|
|
|
func StartTask(conf config.Config, data types.TaskStart) error {
|
|
taskMutex.Lock()
|
|
defer taskMutex.Unlock()
|
|
|
|
_, ok := tasks[data.ID]
|
|
if ok {
|
|
return constants.ErrTaskIsAlreadyRunning
|
|
}
|
|
|
|
tasks[data.ID] = &types.Task{
|
|
ID: data.ID,
|
|
Type: data.Type,
|
|
FileID: data.FileID,
|
|
FileMD5: data.FileMD5,
|
|
FfmpegCommand: data.FfmpegCommand,
|
|
}
|
|
|
|
switch data.Type {
|
|
case constants.TASK_TYPE_HEALTHCHECK:
|
|
var hData types.HealthCheckData
|
|
err := json.Unmarshal(data.Data, &hData)
|
|
if err != nil {
|
|
return fmt.Errorf("Unmarshal Healthcheck Data: %w", err)
|
|
}
|
|
|
|
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
|
|
go func() {
|
|
defer func() {
|
|
taskMutex.Lock()
|
|
defer taskMutex.Unlock()
|
|
t, ok := tasks[data.ID]
|
|
if ok {
|
|
if t.Status == constants.TASK_STATUS_RUNNING {
|
|
t.Status = constants.TASK_STATUS_FAILED
|
|
t.Log = append(t.Log, "Task Status Set to Failed by defer")
|
|
}
|
|
}
|
|
}()
|
|
RunHealthCheck(conf, tasks[data.ID], hData)
|
|
}()
|
|
return nil
|
|
case constants.TASK_TYPE_TRANSCODE:
|
|
var tData types.TranscodeData
|
|
err := json.Unmarshal(data.Data, &tData)
|
|
if err != nil {
|
|
return fmt.Errorf("Unmarshal Transcode Data: %w", err)
|
|
}
|
|
|
|
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
|
|
go func() {
|
|
defer func() {
|
|
taskMutex.Lock()
|
|
defer taskMutex.Unlock()
|
|
t, ok := tasks[data.ID]
|
|
if ok {
|
|
if t.Status == constants.TASK_STATUS_RUNNING {
|
|
t.Status = constants.TASK_STATUS_FAILED
|
|
t.Log = append(t.Log, "Task Status Set to Failed by defer")
|
|
}
|
|
}
|
|
}()
|
|
RunTranscode(conf, tasks[data.ID], tData)
|
|
}()
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("Unknown Task Type %v", data.Type)
|
|
}
|
|
}
|
|
|
|
func Get(r types.TaskStatusRequest) (*types.Task, error) {
|
|
taskMutex.Lock()
|
|
defer taskMutex.Unlock()
|
|
|
|
t, ok := tasks[r.ID]
|
|
if !ok {
|
|
return nil, constants.ErrTaskDoesNotExist
|
|
}
|
|
|
|
res := *t
|
|
|
|
// Send only new logs if there are any
|
|
if len(res.Log) >= r.LogOffset {
|
|
res.Log = res.Log[r.LogOffset:]
|
|
} else {
|
|
res.Log = []string{}
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
func DeleteTask(id int) error {
|
|
taskMutex.Lock()
|
|
defer taskMutex.Unlock()
|
|
|
|
_, ok := tasks[id]
|
|
if !ok {
|
|
return constants.ErrTaskDoesNotExist
|
|
}
|
|
|
|
if tasks[id].Status == constants.TASK_STATUS_RUNNING {
|
|
return fmt.Errorf("Task is Currently Running")
|
|
}
|
|
|
|
delete(tasks, id)
|
|
return nil
|
|
}
|