morffix/server/worker.go

216 lines
5.7 KiB
Go

package server
import (
"context"
"log/slog"
"net/http"
"sync"
"time"
"git.lastassault.de/speatzle/morffix/rpc"
"git.lastassault.de/speatzle/morffix/types"
"git.lastassault.de/speatzle/morffix/constants"
"nhooyr.io/websocket"
)
// Worker is the server-side record of one worker and of the websocket
// connection it is (or was last) attached through.
type Worker struct {
	// Name is the name the worker announced in its name header when connecting.
	Name string
	// Address is the remote address of the HTTP request that opened the connection.
	Address string
	// Conn is the accepted websocket; RPC calls to the worker are sent over it.
	Conn *websocket.Conn
	// Connected is true while the websocket handler for Conn is still running.
	Connected bool
	// ConnectionChanged is the time Connected was last set (on connect or disconnect).
	ConnectionChanged time.Time
}
var (
	// Workers holds one entry per known worker, keyed by the UUID the worker
	// sends when it connects.
	Workers = make(map[string]*Worker)

	// WorkersMutex serializes access to Workers and to the Connected /
	// ConnectionChanged fields of its entries.
	WorkersMutex sync.Mutex

	// rpcServer dispatches messages received from workers and issues calls to them.
	rpcServer = rpc.NewServer()
)
// handleWorkerWebsocket is the HTTP handler workers connect to.
//
// Before upgrading it validates the request and answers with:
//   - 401 if the shared secret header does not equal conf.SharedSecret,
//   - 501 if the worker version header does not equal constants.WORKER_VERSION,
//   - 400 if the UUID header is empty,
//   - 500 if upserting the worker row in the database fails.
//
// On success the request is upgraded to a websocket, the connection is
// registered in Workers under the worker's UUID, and messages are read and
// dispatched until the socket is closed or a read fails. When the handler
// returns, the entry it registered is marked as disconnected.
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// NOTE(review): this is a plain string comparison, not a constant-time one;
	// consider crypto/subtle.ConstantTimeCompare for the secret.
	if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	if r.Header.Get(constants.WORKER_VERSION_HEADER) != constants.WORKER_VERSION {
		w.WriteHeader(http.StatusNotImplemented)
		return
	}

	// Connection ID
	uuid := r.Header.Get(constants.UUID_HEADER)
	if uuid == "" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	name := r.Header.Get(constants.NAME_HEADER)

	_, err := db.Exec(ctx, "INSERT INTO workers (id, name) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET name = $2", uuid, name)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		slog.ErrorContext(ctx, "Error Upserting Worker", "err", err)
		return
	}

	c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		// Disables the library's origin verification for this endpoint.
		InsecureSkipVerify: true,
	})
	if err != nil {
		slog.ErrorContext(ctx, "Error Accepting Worker Websocket Connection", "err", err)
		return
	}
	defer c.CloseNow()

	// The handler keeps a pointer to its own registry entry so that the
	// disconnect bookkeeping below never depends on what Workers[uuid] holds
	// at that later point in time.
	entry := &Worker{
		Name:    name,
		Address: r.RemoteAddr,
		Conn:    c,
	}

	// Track Connection
	func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()

		if prev, ok := Workers[uuid]; ok && prev.Connected {
			slog.WarnContext(ctx, "Worker Reconnected before the old connection died, killing...")
			prev.Conn.CloseNow()
			// Since we closed the connection, wait for the old handler's
			// deferred cleanup to finish before writing down the new
			// connection. The mutex is released while sleeping so that the
			// cleanup can actually run. A missing entry counts as
			// "not connected" instead of being dereferenced.
			for {
				WorkersMutex.Unlock()
				time.Sleep(10 * time.Millisecond)
				WorkersMutex.Lock()
				if cur, ok := Workers[uuid]; !ok || !cur.Connected {
					break
				}
			}
		}

		entry.Connected = true
		entry.ConnectionChanged = time.Now()
		Workers[uuid] = entry
		// TODO call function for init worker connect or call function to handle Worker Reconnection (get task status, continue processing)
	}()

	// Set Status on Disconnect. Only this handler's own entry is touched, so a
	// removed or replaced map entry can neither cause a nil dereference nor be
	// marked as disconnected by mistake.
	defer func() {
		WorkersMutex.Lock()
		defer WorkersMutex.Unlock()
		entry.Connected = false
		entry.ConnectionChanged = time.Now()
	}()

	slog.InfoContext(ctx, "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)

	for {
		err := readMessage(ctx, c)

		// Expected ways for the socket to go away are logged at info level.
		switch websocket.CloseStatus(err) {
		case websocket.StatusNormalClosure, websocket.StatusAbnormalClosure, websocket.StatusGoingAway:
			slog.InfoContext(ctx, "Websocket Closed")
			return
		}
		if err != nil {
			slog.ErrorContext(ctx, "Error Reading Websocket Message", "err", err)
			return
		}
	}
}
// readMessage reads the next message from c, logs its type and payload and
// passes the payload to the package RPC server for handling.
// It returns the read error unchanged if the read fails, otherwise nil.
func readMessage(ctx context.Context, c *websocket.Conn) error {
	msgType, payload, readErr := c.Read(ctx)
	if readErr != nil {
		return readErr
	}

	slog.InfoContext(ctx, "Got Websocket Message", "type", msgType.String(), "data", payload)
	rpcServer.HandleMessage(ctx, c, payload)

	return nil
}
// manageWorkers runs the periodic worker housekeeping loop. Once per second
// each it removes dead workers, refreshes task status from connected workers
// and assigns queued tasks; an error from the assignment is logged and the
// loop continues. It returns when a value is received on stop (or stop is
// closed).
func manageWorkers(stop chan bool) {
	// Every ticker is stopped on return so that no timer outlives the loop.
	deadTicker := time.NewTicker(time.Second)
	defer deadTicker.Stop()
	statusTicker := time.NewTicker(time.Second)
	defer statusTicker.Stop()
	assignTicker := time.NewTicker(time.Second)
	defer assignTicker.Stop()

	for {
		select {
		case <-deadTicker.C:
			killDeadWorkers()
		case <-statusTicker.C:
			updateWorkerTaskStatus(context.TODO())
		case <-assignTicker.C:
			if err := assignQueuedTasks(context.TODO()); err != nil {
				slog.Error("Assigning Queued Tasks", "err", err)
			}
		case <-stop:
			return
		}
	}
}
// killDeadWorkers removes from Workers every entry that is not connected and
// whose connection state last changed more than five minutes ago. Each
// removal is logged. WorkersMutex is held for the whole sweep.
func killDeadWorkers() {
	const gracePeriod = 5 * time.Minute

	WorkersMutex.Lock()
	defer WorkersMutex.Unlock()

	for id, worker := range Workers {
		// TODO Not dead Workers are sometimes also killed?
		if worker.Connected {
			continue
		}
		if time.Since(worker.ConnectionChanged) <= gracePeriod {
			// Disconnected, but still within the grace period.
			continue
		}

		slog.Warn("Removing Dead Worker", "uuid", id, "name", worker.Name)
		delete(Workers, id)
		// TODO Free any Jobs that the Worker had
	}
}
func updateWorkerTaskStatus(ctx context.Context) {
var wg sync.WaitGroup
func() {
WorkersMutex.Lock()
defer WorkersMutex.Unlock()
for uuid := range Workers {
if Workers[uuid].Connected {
w := Workers[uuid]
wg.Add(1)
go func() {
defer wg.Done()
var status types.TaskStatus
_, err := rpcServer.Call(ctx, w.Conn, "task-status", nil, &status)
if err != nil {
slog.ErrorContext(ctx, "Error Getting Task Status", "err", err)
// TODO Mark Worker / Tasks as Unknown?
return
}
for _, t := range status.Tasks {
// TODO check if this Task was even assigned to this Worker
_, err = db.Exec(ctx, "UPDATE tasks SET status = $2, log = $3 WHERE id = $1", t.ID, t.Status, t.Log)
if err != nil {
slog.ErrorContext(ctx, "Error Updating Task Status", "err", err)
continue
}
// Tell Worker to Delete Finished Tasks
if t.Status == constants.TASK_STATUS_FAILED || t.Status == constants.TASK_STATUS_SUCCESS {
_, err := rpcServer.Call(ctx, w.Conn, "task-delete", t.ID, nil)
if err != nil {
slog.ErrorContext(ctx, "Error Deleting Finished Task From Worker", "err", err)
continue
}
}
}
// TODO Set Task Status to QUEUED For Unfinished Tasks which where assigned to this worker but where not in the status response
}()
} else {
// Set Task Status to Unknown for Unfinished Tasks which where assigned to this not connected worker
}
}
}()
wg.Wait()
}