From 4194ab5944801134b7c37c4b61e689f291d828ef Mon Sep 17 00:00:00 2001 From: Samuel Lorch Date: Fri, 26 Apr 2024 22:48:09 +0200 Subject: [PATCH] Add Worker Cleanup --- server/server.go | 4 ++++ server/worker.go | 23 +++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/server/server.go b/server/server.go index 96de468..82d96a7 100644 --- a/server/server.go +++ b/server/server.go @@ -40,6 +40,9 @@ func Start(_conf config.Config) { serverClose <- true }() + stopCleanup := make(chan bool, 1) + go cleanupDeadWorkers(stopCleanup) + sigs := make(chan os.Signal, 1) signal.Notify(sigs, os.Interrupt) select { @@ -50,6 +53,7 @@ func Start(_conf config.Config) { stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) server.Shutdown(stopCtx) cancel() + stopCleanup <- true slog.Info("Done") } } diff --git a/server/worker.go b/server/worker.go index 2a2edfc..2af1738 100644 --- a/server/worker.go +++ b/server/worker.go @@ -101,3 +101,26 @@ func readMessage(ctx context.Context, c *websocket.Conn) error { slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) return c.Write(ctx, t, data) } + +func cleanupDeadWorkers(stop chan bool) { + ticker := time.NewTicker(time.Second) + for { + select { + case <-ticker.C: + func() { + WorkersMutex.Lock() + defer WorkersMutex.Unlock() + + for uuid, w := range Workers { + if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) { + slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name) + delete(Workers, uuid) + // TODO Free any Jobs that the Worker had + } + } + }() + case <-stop: + return + } + } +}