package server import ( "context" "log/slog" "net/http" "sync" "time" "git.lastassault.de/speatzle/morffix/constants" "nhooyr.io/websocket" ) type Worker struct { Name string Address string Conn *websocket.Conn Connected bool ConnectionChanged time.Time } var Workers = map[string]*Worker{} var WorkersMutex sync.Mutex func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret { w.WriteHeader(http.StatusUnauthorized) return } c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ InsecureSkipVerify: true, }) if err != nil { slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err) return } defer c.CloseNow() // Connection ID uuid := r.Header.Get(constants.UUID_HEADER) // Track Connection func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() w, ok := Workers[uuid] if ok && w.Connected { slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...") w.Conn.CloseNow() // Since we Closed the Connection, we need to wait for the old connections deferred cleanup to finish before Writing down the new connection or we will be overwritten. for { WorkersMutex.Unlock() time.Sleep(time.Millisecond * 10) WorkersMutex.Lock() if !Workers[uuid].Connected { break } } } Workers[uuid] = &Worker{ Name: r.Header.Get(constants.NAME_HEADER), Address: r.RemoteAddr, Conn: c, Connected: true, ConnectionChanged: time.Now(), } }() // Set Status on Disconnect defer func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() Workers[uuid].Connected = false Workers[uuid].ConnectionChanged = time.Now() }() slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr) for { err = readMessage(r.Context(), c) if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway { slog.InfoContext(r.Context(), "Websocket Closed") return } if err != nil { slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err) return } } } func readMessage(ctx context.Context, c *websocket.Conn) error { t, data, err := c.Read(ctx) if err != nil { return err } slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) return c.Write(ctx, t, data) } func cleanupDeadWorkers(stop chan bool) { ticker := time.NewTicker(time.Second) for { select { case <-ticker.C: func() { WorkersMutex.Lock() defer WorkersMutex.Unlock() for uuid, w := range Workers { if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) delete(Workers, uuid) // TODO Free any Jobs that the Worker had } } }() case <-stop: return } } }