diff --git a/task/task.go b/task/task.go index 2b632c1..c0212d1 100644 --- a/task/task.go +++ b/task/task.go @@ -1,9 +1,78 @@ package task -type Task struct { - ID int `json:"id"` - FileID int `json:"file_id"` - Type int `json:"type"` - Status int `json:"status"` - Log []string `json:"log"` +import ( + "encoding/json" + "fmt" + "sync" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/types" +) + +var tasks = map[int]*types.Task{} +var taskMutex sync.Mutex + +func StartTask(conf config.Config, data types.TaskStart) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + tasks[data.ID] = &types.Task{ + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + } + + switch data.Type { + case constants.TASK_TYPE_HEALTHCHECK: + var hData HealthCheckData + err := json.Unmarshal(data.Data, &hData) + if err != nil { + return fmt.Errorf("Unmarshal Healthcheck Data: %w", err) + } + + tasks[data.ID].Status = constants.TASK_STATUS_RUNNING + go func() { + defer func() { + if tasks[data.ID].Status == constants.TASK_STATUS_RUNNING { + tasks[data.ID].Status = constants.TASK_STATUS_FAILED + tasks[data.ID].Log = append(tasks[data.ID].Log, "Task Status Set to Failed by defer") + } + }() + RunHealthCheck(conf, tasks[data.ID], hData) + }() + return nil + case constants.TASK_TYPE_TRANSCODE: + return fmt.Errorf("Transcode Task Not Implemented") + default: + return fmt.Errorf("Unknown Task Type %v", data.Type) + } +} + +func Get() []types.Task { + taskMutex.Lock() + defer taskMutex.Unlock() + + t := []types.Task{} + for i := range tasks { + t = append(t, *tasks[i]) + } + return t +} + +func DeleteTask(id int) error { + taskMutex.Lock() + defer taskMutex.Unlock() + + _, ok := tasks[id] + if !ok { + return fmt.Errorf("Task does not Exist") + } + + if tasks[id].Status == constants.TASK_STATUS_RUNNING { + return fmt.Errorf("Task is Currently Running") + } + + delete(tasks, id) + return nil } diff --git a/worker/task.go b/worker/task.go index 887aeb6..80be85c 100644 --- a/worker/task.go +++ b/worker/task.go @@ -4,20 +4,16 @@ import ( "context" "encoding/json" "fmt" - "sync" - "git.lastassault.de/speatzle/morffix/constants" "git.lastassault.de/speatzle/morffix/rpc" "git.lastassault.de/speatzle/morffix/task" "git.lastassault.de/speatzle/morffix/types" ) -var tasks map[int]*task.Task -var taskMutex sync.Mutex - func init() { rpcServer.RegisterMethod("task-start", taskStart) rpcServer.RegisterMethod("task-status", taskStatus) + rpcServer.RegisterMethod("task-delete", taskDelete) } func taskStart(ctx context.Context, req rpc.Request) (any, error) { @@ -27,40 +23,31 @@ func taskStart(ctx context.Context, req rpc.Request) (any, error) { return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err) } - // TODO move to task module - taskMutex.Lock() - defer taskMutex.Unlock() - - switch data.Type { - case constants.TASK_TYPE_HEALTHCHECK: - tasks[data.ID] = &task.Task{ - ID: data.ID, - Type: data.Type, - FileID: data.FileID, - } - - var hData task.HealthCheckData - err := json.Unmarshal(data.Data, &hData) - if err != nil { - return nil, fmt.Errorf("Unmarshal Healthcheck Data: %w", err) - } - - tasks[data.ID].Status = constants.TASK_STATUS_RUNNING - go tasks[data.ID].RunHealthCheck(hData) - case constants.TASK_TYPE_TRANSCODE: - return nil, fmt.Errorf("Transcode Task Not Implemented") - default: - return nil, fmt.Errorf("Unknown Task Type %v", data.Type) + err = task.StartTask(conf, data) + if err != nil { + return nil, fmt.Errorf("Starting Task: %w", err) } return nil, nil } func taskStatus(ctx context.Context, req rpc.Request) (any, error) { - var id int - err := json.Unmarshal(*req.Params, &id) + return types.TaskStatus{ + Tasks: task.Get(), + }, nil +} + +func taskDelete(ctx context.Context, req rpc.Request) (any, error) { + var data int + err := json.Unmarshal(*req.Params, &data) if err != nil { - return nil, fmt.Errorf("Unmarshal Task Status Params: %w", err) + return nil, fmt.Errorf("Unmarshal Task Delete ID: %w", err) } + + err = task.DeleteTask(data) + if err != nil { + return nil, fmt.Errorf("Deleting Task: %w", err) + } + return nil, nil }