diff --git a/server/worker.go b/server/worker.go index 2af1738..0390462 100644 --- a/server/worker.go +++ b/server/worker.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "git.lastassault.de/speatzle/morffix/rpc" + "git.lastassault.de/speatzle/morffix/constants" "nhooyr.io/websocket" @@ -23,6 +25,8 @@ type Worker struct { var Workers = map[string]*Worker{} var WorkersMutex sync.Mutex +var rpcServer = rpc.NewServer() + func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret { w.WriteHeader(http.StatusUnauthorized) @@ -68,6 +72,9 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { Connected: true, ConnectionChanged: time.Now(), } + + // TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing) + }() // Set Status on Disconnect @@ -99,7 +106,8 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { return err } slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) - return c.Write(ctx, t, data) + rpcServer.HandleMessage(ctx, c, data) + return nil } func cleanupDeadWorkers(stop chan bool) { @@ -112,6 +120,7 @@ func cleanupDeadWorkers(stop chan bool) { defer WorkersMutex.Unlock() for uuid, w := range Workers { + // TODO Not dead Workers are sometimes also killed? if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) delete(Workers, uuid) diff --git a/worker/worker.go b/worker/worker.go index 973ca3e..3f42d47 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,6 +5,9 @@ import ( "io" "log/slog" "net/http" + + "git.lastassault.de/speatzle/morffix/rpc" + "os" "os/signal" "time" @@ -18,6 +21,7 @@ import ( ) var conf config.Config +var rpcServer = rpc.NewServer() func Start(_conf config.Config) { conf = _conf @@ -69,10 +73,6 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) { } return } - go func() { - time.Sleep(time.Second * 5) - c.Write(ctx, websocket.MessageText, []byte("test")) - }() slog.InfoContext(ctx, "Waiting for Messages") for { @@ -94,5 +94,6 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { return err } slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + rpcServer.HandleMessage(ctx, c, data) return nil }