Move Task Starting into task

This commit is contained in:
Samuel Lorch 2024-05-09 04:45:54 +02:00
parent bce27eddf3
commit b17e55de7d
2 changed files with 94 additions and 38 deletions

View file

@ -1,9 +1,78 @@
package task
type Task struct {
ID int `json:"id"`
FileID int `json:"file_id"`
Type int `json:"type"`
Status int `json:"status"`
Log []string `json:"log"`
import (
"encoding/json"
"fmt"
"sync"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/types"
)
var tasks = map[int]*types.Task{}
var taskMutex sync.Mutex
func StartTask(conf config.Config, data types.TaskStart) error {
taskMutex.Lock()
defer taskMutex.Unlock()
tasks[data.ID] = &types.Task{
ID: data.ID,
Type: data.Type,
FileID: data.FileID,
}
switch data.Type {
case constants.TASK_TYPE_HEALTHCHECK:
var hData HealthCheckData
err := json.Unmarshal(data.Data, &hData)
if err != nil {
return fmt.Errorf("Unmarshal Healthcheck Data: %w", err)
}
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
go func() {
defer func() {
if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING {
tasks[data.ID].Status = constants.TASK_STATUS_FAILED
tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer")
}
}()
RunHealthCheck(conf, tasks[data.ID], hData)
}()
return nil
case constants.TASK_TYPE_TRANSCODE:
return fmt.Errorf("Transcode Task Not Implemented")
default:
return fmt.Errorf("Unknown Task Type %v", data.Type)
}
}
func Get() []types.Task {
taskMutex.Lock()
defer taskMutex.Unlock()
t := []types.Task{}
for i := range tasks {
t = append(t, *tasks[i])
}
return t
}
func DeleteTask(id int) error {
taskMutex.Lock()
defer taskMutex.Unlock()
_, ok := tasks[id]
if !ok {
return fmt.Errorf("Task does not Exist")
}
if tasks[id].Status == constants.TASK_STATUS_RUNNING {
return fmt.Errorf("Task is Currently Running")
}
delete(tasks, id)
return nil
}

View file

@ -4,20 +4,16 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"git.lastassault.de/speatzle/morffix/constants"
"git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/task"
"git.lastassault.de/speatzle/morffix/types"
)
var tasks map[int]*task.Task
var taskMutex sync.Mutex
func init() {
rpcServer.RegisterMethod("task-start", taskStart)
rpcServer.RegisterMethod("task-status", taskStatus)
rpcServer.RegisterMethod("task-delete", taskDelete)
}
func taskStart(ctx context.Context, req rpc.Request) (any, error) {
@ -27,40 +23,31 @@ func taskStart(ctx context.Context, req rpc.Request) (any, error) {
return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err)
}
// TODO move to task module
taskMutex.Lock()
defer taskMutex.Unlock()
switch data.Type {
case constants.TASK_TYPE_HEALTHCHECK:
tasks[data.ID] = &task.Task{
ID: data.ID,
Type: data.Type,
FileID: data.FileID,
}
var hData task.HealthCheckData
err := json.Unmarshal(data.Data, &hData)
if err != nil {
return nil, fmt.Errorf("Unmarshal Healthcheck Data: %w", err)
}
tasks[data.ID].Status = constants.TASK_STATUS_RUNNING
go tasks[data.ID].RunHealthCheck(hData)
case constants.TASK_TYPE_TRANSCODE:
return nil, fmt.Errorf("Transcode Task Not Implemented")
default:
return nil, fmt.Errorf("Unknown Task Type %v", data.Type)
err = task.StartTask(conf, data)
if err != nil {
return nil, fmt.Errorf("Starting Task: %w", err)
}
return nil, nil
}
func taskStatus(ctx context.Context, req rpc.Request) (any, error) {
var id int
err := json.Unmarshal(*req.Params, &id)
return types.TaskStatus{
Tasks: task.Get(),
}, nil
}
func taskDelete(ctx context.Context, req rpc.Request) (any, error) {
var data int
err := json.Unmarshal(*req.Params, &data)
if err != nil {
return nil, fmt.Errorf("Unmarshal Task Status Params: %w", err)
return nil, fmt.Errorf("Unmarshal Task Delete ID: %w", err)
}
err = task.DeleteTask(data)
if err != nil {
return nil, fmt.Errorf("Deleting Task: %w", err)
}
return nil, nil
}