Setup RPC Server

This commit is contained in:
Samuel Lorch 2024-04-28 01:59:38 +02:00
parent 496d5f412f
commit cf000931e4
2 changed files with 15 additions and 5 deletions

View file

@ -7,6 +7,8 @@ import (
"sync"
"time"
"git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/constants"
"nhooyr.io/websocket"
@ -23,6 +25,8 @@ type Worker struct {
var Workers = map[string]*Worker{}
var WorkersMutex sync.Mutex
var rpcServer = rpc.NewServer()
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
w.WriteHeader(http.StatusUnauthorized)
@ -68,6 +72,9 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
Connected: true,
ConnectionChanged: time.Now(),
}
// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
}()
// Set Status on Disconnect
@ -99,7 +106,8 @@ func readMessage(ctx context.Context, c *websocket.Conn) error {
return err
}
slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
return c.Write(ctx, t, data)
rpcServer.HandleMessage(ctx, c, data)
return nil
}
func cleanupDeadWorkers(stop chan bool) {
@ -112,6 +120,7 @@ func cleanupDeadWorkers(stop chan bool) {
defer WorkersMutex.Unlock()
for uuid, w := range Workers {
// TODO Not dead Workers are sometimes also killed?
if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) {
slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name)
delete(Workers, uuid)

View file

@ -5,6 +5,9 @@ import (
"io"
"log/slog"
"net/http"
"git.lastassault.de/speatzle/morffix/rpc"
"os"
"os/signal"
"time"
@ -18,6 +21,7 @@ import (
)
var conf config.Config
var rpcServer = rpc.NewServer()
func Start(_conf config.Config) {
conf = _conf
@ -69,10 +73,6 @@ func connectToServer(ctx context.Context, uuid uuid.UUID) {
}
return
}
go func() {
time.Sleep(time.Second * 5)
c.Write(ctx, websocket.MessageText, []byte("test"))
}()
slog.InfoContext(ctx, "Waiting for Messages")
for {
@ -94,5 +94,6 @@ func readMessage(ctx context.Context, c *websocket.Conn) error {
return err
}
slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
rpcServer.HandleMessage(ctx, c, data)
return nil
}