From a040d5d56ce3df570978b8b9d60b6ea7b2e7f49a Mon Sep 17 00:00:00 2001 From: speatzle Date: Tue, 7 May 2024 18:52:06 +0200 Subject: [PATCH] task wip --- config/config.go | 6 ++-- constants/constants.go | 10 ++++++- task/healthcheck.go | 17 +++++++++++ task/task.go | 9 ++++++ types/task.go | 18 ++++++++++++ worker/task.go | 66 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 task/healthcheck.go create mode 100644 task/task.go create mode 100644 types/task.go create mode 100644 worker/task.go diff --git a/config/config.go b/config/config.go index 9194e79..cf61b60 100644 --- a/config/config.go +++ b/config/config.go @@ -7,8 +7,10 @@ type Config struct { } type Server struct { - Address string - Database string + Address string + Database string + HealthCheckCommand string + TranscodeCommand string } type Worker struct { diff --git a/constants/constants.go b/constants/constants.go index a1f5cf3..2d8d7b6 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -14,5 +14,13 @@ const MESSAGE_TEMPLATE_NAME = "message.tmpl" const TASK_TEMPLATE_NAME = "tasks.tmpl" const ( - HEALTHCHECK_TASK_TYPE = iota + TASK_TYPE_HEALTHCHECK = iota + TASK_TYPE_TRANSCODE +) + +const ( + TASK_STATUS_UNKNOWN = iota + TASK_STATUS_RUNNING + TASK_STATUS_FAILED + TASK_STATUS_SUCCESS ) diff --git a/task/healthcheck.go b/task/healthcheck.go new file mode 100644 index 0000000..19e0266 --- /dev/null +++ b/task/healthcheck.go @@ -0,0 +1,17 @@ +package task + +import "git.lastassault.de/speatzle/morffix/constants" + +type HealthCheckData struct { + Command string `json:"command"` +} + +func (t *Task) RunHealthCheck(data HealthCheckData) { + defer func() { + if t.Status == constants.TASK_STATUS_RUNNING { + t.Status = constants.TASK_STATUS_FAILED + t.Log = append(t.Log, "Task Status Failed by Defer") + } + }() + +} diff --git a/task/task.go b/task/task.go new file mode 100644 index 0000000..2b632c1 --- /dev/null +++ b/task/task.go @@ -0,0 +1,9 @@ +package task + +type Task struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Status int `json:"status"` + Log []string `json:"log"` +} diff --git a/types/task.go b/types/task.go new file mode 100644 index 0000000..d88958b --- /dev/null +++ b/types/task.go @@ -0,0 +1,18 @@ +package types + +import ( + "encoding/json" + + "git.lastassault.de/speatzle/morffix/task" +) + +type TaskStart struct { + ID int `json:"id"` + FileID int `json:"file_id"` + Type int `json:"type"` + Data json.RawMessage +} + +type TaskStatus struct { + task.Task +} diff --git a/worker/task.go b/worker/task.go new file mode 100644 index 0000000..887aeb6 --- /dev/null +++ b/worker/task.go @@ -0,0 +1,66 @@ +package worker + +import ( + "context" + "encoding/json" + "fmt" + "sync" + + "git.lastassault.de/speatzle/morffix/constants" + "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/task" + "git.lastassault.de/speatzle/morffix/types" +) + +var tasks map[int]*task.Task +var taskMutex sync.Mutex + +func init() { + rpcServer.RegisterMethod("task-start", taskStart) + rpcServer.RegisterMethod("task-status", taskStatus) +} + +func taskStart(ctx context.Context, req rpc.Request) (any, error) { + var data types.TaskStart + err := json.Unmarshal(*req.Params, &data) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Start Params: %w", err) + } + + // TODO move to task module + taskMutex.Lock() + defer taskMutex.Unlock() + + switch data.Type { + case constants.TASK_TYPE_HEALTHCHECK: + tasks[data.ID] = &task.Task{ + ID: data.ID, + Type: data.Type, + FileID: data.FileID, + } + + var hData task.HealthCheckData + err := json.Unmarshal(data.Data, &hData) + if err != nil { + return nil, fmt.Errorf("Unmarshal Healthcheck Data: %w", err) + } + + tasks[data.ID].Status = constants.TASK_STATUS_RUNNING + go tasks[data.ID].RunHealthCheck(hData) + case constants.TASK_TYPE_TRANSCODE: + return nil, fmt.Errorf("Transcode Task Not Implemented") + default: + return nil, fmt.Errorf("Unknown Task Type %v", data.Type) + } + + return nil, nil +} + +func taskStatus(ctx context.Context, req rpc.Request) (any, error) { + var id int + err := json.Unmarshal(*req.Params, &id) + if err != nil { + return nil, fmt.Errorf("Unmarshal Task Status Params: %w", err) + } + return nil, nil +}