morffix/server/worker.go

139 lines
3.5 KiB
Go

package server
import (
"context"
"log/slog"
"net/http"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/constants"
"nhooyr.io/websocket"
)
// Worker describes one worker known to the server and the state of its
// websocket connection.
type Worker struct {
	// Name and Address are taken from the worker's connect request: the name
	// header and the request's remote address respectively.
	Name, Address string

	// Conn is the websocket connection the worker was registered with.
	Conn *websocket.Conn

	// Connected is true while the handler serving Conn is running.
	Connected bool

	// ConnectionChanged is the time Connected was last set.
	ConnectionChanged time.Time
}
var (
	// Workers maps a worker's connection UUID to its tracked state.
	// Access it only while holding WorkersMutex.
	Workers = make(map[string]*Worker)

	// WorkersMutex guards Workers and the Worker values stored in it.
	WorkersMutex sync.Mutex

	// rpcServer dispatches every message received from a worker websocket.
	rpcServer = rpc.NewServer()
)
// handleWorkerWebsocket authenticates a worker, upgrades the request to a
// websocket, registers the connection in Workers under the worker's UUID and
// then passes every incoming message to readMessage until the socket closes
// or a read fails.
//
// Before the upgrade it answers:
//   - 401 Unauthorized if the shared secret header does not match conf.SharedSecret
//   - 501 Not Implemented if the worker version header does not match
//     constants.WORKER_VERSION
//
// When the handler returns, the worker's entry is marked as disconnected so
// cleanupDeadWorkers can eventually remove it.
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
		w.WriteHeader(http.StatusNotImplemented)
		return
	}

	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		// NOTE(review): this turns off the library's origin verification;
		// access is gated by the shared secret check above.
		InsecureSkipVerify: true,
	})
	if err != nil {
		slog.ErrorContext(ctx, "Error Accepting Worker Websocket Connection", "err", err)
		return
	}
	defer c.CloseNow()

	// Connection ID
	uuid := r.Header.Get(constants.UUID_HEADER)

	// Track Connection
	registered := func() bool {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		if old, ok := Workers[uuid]; ok && old.Connected {
			slog.WarnContext(ctx, "Worker Reconnected before the old connection died, killing...")
			old.Conn.CloseNow()

			// Since we closed the connection, wait for the old handler's
			// deferred cleanup to finish before writing down the new
			// connection, otherwise that cleanup would overwrite our state.
			for {
				// Release the lock while sleeping so the old handler can
				// actually run its cleanup.
				WorkersMutex.Unlock()
				time.Sleep(time.Millisecond * 10)
				WorkersMutex.Lock()

				// The entry may be gone entirely; treat that like a
				// finished cleanup instead of dereferencing nil.
				cur, present := Workers[uuid]
				if !present || !cur.Connected {
					break
				}
				// Do not spin forever for a request that is already dead.
				if ctx.Err() != nil {
					slog.WarnContext(ctx, "Request cancelled while waiting for old Worker connection to die", "err", ctx.Err())
					return false
				}
			}
		}

		Workers[uuid] = &Worker{
			Name:              r.Header.Get(constants.NAME_HEADER),
			Address:           r.RemoteAddr,
			Conn:              c,
			Connected:         true,
			ConnectionChanged: time.Now(),
		}
		// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
		return true
	}()
	if !registered {
		return
	}

	// Set Status on Disconnect
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		// Only touch the entry if it still exists and still belongs to this
		// connection; otherwise we would panic on a missing entry or mark
		// another connection's worker as disconnected.
		cur, ok := Workers[uuid]
		if !ok || cur.Conn != c {
			return
		}
		cur.Connected = false
		cur.ConnectionChanged = time.Now()
	}()

	slog.InfoContext(ctx, "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)

	for {
		err = readMessage(ctx, c)

		// These close codes are an expected end of the connection, not an error.
		switch websocket.CloseStatus(err) {
		case websocket.StatusNormalClosure, websocket.StatusAbnormalClosure, websocket.StatusGoingAway:
			slog.InfoContext(ctx, "Websocket Closed")
			return
		}
		if err != nil {
			slog.ErrorContext(ctx, "Error Reading Websocket Message", "err", err)
			return
		}
	}
}
// readMessage reads a single message from c, logs it and hands its payload
// to rpcServer. It returns the read error, if any; on a read error nothing
// is logged or dispatched.
func readMessage(ctx context.Context, c *websocket.Conn) error {
	msgType, payload, readErr := c.Read(ctx)
	if readErr == nil {
		slog.InfoContext(ctx, "Got Websocket Message", "type", msgType.String(), "data", payload)
		rpcServer.HandleMessage(ctx, c, payload)
	}
	return readErr
}
// cleanupDeadWorkers sweeps Workers once per second and removes every worker
// that is marked as disconnected and whose connection state last changed more
// than five minutes ago. It blocks until a value can be received from stop
// (a send or a close), so it is meant to run in its own goroutine.
func cleanupDeadWorkers(stop chan bool) {
	// How long a worker may stay disconnected before it is dropped.
	const deadAfter = 5 * time.Minute

	ticker := time.NewTicker(time.Second)
	// Release the ticker's resources once we are told to stop.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Run the sweep in a closure so the deferred unlock fires at the
			// end of each sweep rather than when the function returns.
			func() {
				WorkersMutex.Lock()
				defer WorkersMutex.Unlock()

				for uuid, w := range Workers {
					// TODO Not dead Workers are sometimes also killed?
					if !w.Connected && time.Since(w.ConnectionChanged) > deadAfter {
						slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name)
						delete(Workers, uuid)
						// TODO Free any Jobs that the Worker had
					}
				}
			}()
		case <-stop:
			return
		}
	}
}