package server

import (
	"context"
	"log/slog"
	"net/http"
	"sync"
	"time"

	"git.lastassault.de/speatzle/morffix/constants"
	"git.lastassault.de/speatzle/morffix/rpc"

	"nhooyr.io/websocket"
)

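// Worker holds the server-side state of a single worker connection.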
type Worker struct {
	Name              string
	Address           string
	Conn              *websocket.Conn
	Connected         bool
	ConnectionChanged time.Time
}

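// Workers maps a worker's connection UUID to its state; all access must hold WorkersMutex.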
var Workers = map[string]*Worker{}
var WorkersMutex sync.Mutex

var rpcServer = rpc.NewServer()

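// handleWorkerWebsocket authenticates a worker via the shared-secret header,
// upgrades the request to a websocket, registers the connection in Workers,
// and reads messages until the connection closes.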
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		InsecureSkipVerify: true,
	})
	if err != nil {
		slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err)
		return
	}
	defer c.CloseNow()

	// Connection ID, sent by the worker; it identifies the worker across reconnects.
	uuid := r.Header.Get(constants.UUID_HEADER)

	// Track Connection
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		w, ok := Workers[uuid]
		if ok && w.Connected {
			slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...")
			w.Conn.CloseNow()

			// Since we closed the old connection, wait for its deferred cleanup to finish
			// before writing down the new connection, or the new entry would be overwritten.
			for {
				WorkersMutex.Unlock()
				time.Sleep(time.Millisecond * 10)
				WorkersMutex.Lock()
				if !Workers[uuid].Connected {
					break
				}
			}
		}

		Workers[uuid] = &Worker{
			Name:              r.Header.Get(constants.NAME_HEADER),
			Address:           r.RemoteAddr,
			Conn:              c,
			Connected:         true,
			ConnectionChanged: time.Now(),
		}

		// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
	}()

	// Set Status on Disconnect
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		Workers[uuid].Connected = false
		Workers[uuid].ConnectionChanged = time.Now()
	}()

	slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)

	for {
		err = readMessage(r.Context(), c)
		// These close statuses mean the worker disconnected; anything else is treated as a read error.
		if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway {
			slog.InfoContext(r.Context(), "Websocket Closed")
			return
		}
		if err != nil {
			slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err)
			return
		}
	}
}

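// readMessage reads a single websocket message and passes it to the RPC server.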
func readMessage(ctx context.Context, c *websocket.Conn) error {
	t, data, err := c.Read(ctx)
	if err != nil {
		return err
	}
	slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
	rpcServer.HandleMessage(ctx, c, data)
	return nil
}

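// cleanupDeadWorkers periodically removes workers that have been disconnected
// for more than five minutes, until stop is signaled.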
func cleanupDeadWorkers(stop chan bool) {
	ticker := time.NewTicker(time.Second)
	for {
		select {
		case <-ticker.C:
			func() {
				WorkersMutex.Lock()
				defer WorkersMutex.Unlock()

				for uuid, w := range Workers {
					// TODO Not dead Workers are sometimes also killed?
					if !w.Connected && w.ConnectionChanged.Add(time.Minute*5).Before(time.Now()) {
						slog.Warn("Removing Dead Worker", "uuid", uuid, "name", w.Name)
						delete(Workers, uuid)
						// TODO Free any Jobs that the Worker had
					}
				}
			}()
		case <-stop:
			return
		}
	}
}
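
// A minimal sketch (not part of this file) of how these pieces might be wired up
// at server start; the mux variable, route path, and stop channel name are
// assumptions for illustration only:
//
//	stop := make(chan bool)
//	go cleanupDeadWorkers(stop)
//	defer close(stop)
//	mux.HandleFunc("/worker/websocket", handleWorkerWebsocket)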