diff --git a/config/config.go b/config/config.go index ff3fb40..330584d 100644 --- a/config/config.go +++ b/config/config.go @@ -12,4 +12,5 @@ type Server struct { type Worker struct { Address string + Name string } diff --git a/constants/constants.go b/constants/constants.go index 321166a..c2ba5eb 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -1,3 +1,5 @@ package constants const SHARED_SECRET_HEADER = "morffix-secret" +const NAME_HEADER = "morffix-name" +const UUID_HEADER = "morffix-uuid" diff --git a/go.mod b/go.mod index ff6d54f..d1ead06 100644 --- a/go.mod +++ b/go.mod @@ -4,4 +4,7 @@ go 1.21.8 require github.com/BurntSushi/toml v1.3.2 -require nhooyr.io/websocket v1.8.11 // indirect +require ( + github.com/google/uuid v1.6.0 // indirect + nhooyr.io/websocket v1.8.11 // indirect +) diff --git a/go.sum b/go.sum index d3e27ac..354268f 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,6 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/server/worker.go b/server/worker.go index fe111f6..2a2edfc 100644 --- a/server/worker.go +++ b/server/worker.go @@ -4,16 +4,24 @@ import ( "context" "log/slog" "net/http" + "sync" + "time" "git.lastassault.de/speatzle/morffix/constants" "nhooyr.io/websocket" ) -type Connection struct { +type Worker struct { + Name string + Address string + Conn *websocket.Conn + Connected bool + ConnectionChanged time.Time } -var Connections []Connection +var Workers = map[string]*Worker{} +var WorkersMutex sync.Mutex func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret { @@ -29,9 +37,48 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { } defer c.CloseNow() - slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr) + // Connection ID + uuid := r.Header.Get(constants.UUID_HEADER) - // TODO Add Connection Tracking + // Track Connection + func() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + w, ok := Workers[uuid] + if ok && w.Connected { + slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...") + w.Conn.CloseNow() + + // Since we Closed the Connection, we need to wait for the old connections deferred cleanup to finish before Writing down the new connection or we will be overwritten. + for { + WorkersMutex.Unlock() + time.Sleep(time.Millisecond * 10) + WorkersMutex.Lock() + if !Workers[uuid].Connected { + break + } + } + } + + Workers[uuid] = &Worker{ + Name: r.Header.Get(constants.NAME_HEADER), + Address: r.RemoteAddr, + Conn: c, + Connected: true, + ConnectionChanged: time.Now(), + } + }() + + // Set Status on Disconnect + defer func() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + Workers[uuid].Connected = false + Workers[uuid].ConnectionChanged = time.Now() + }() + + slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr) for { err = readMessage(r.Context(), c) diff --git a/worker/worker.go b/worker/worker.go index e686d10..5e6ee6a 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -10,15 +10,15 @@ import ( "git.lastassault.de/speatzle/morffix/config" "git.lastassault.de/speatzle/morffix/constants" + "github.com/google/uuid" + "nhooyr.io/websocket" ) -func Start(conf config.Config) { - ctx := context.Background() - headers := http.Header{} - + uuid := uuid.New() slog.InfoContext(ctx, "Connecting to Server...", "Address", conf.Worker.Address) headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret) + headers.Add(constants.UUID_HEADER, uuid.String()) c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ HTTPHeader: headers, })