Track Workers by uuid for Reconnection
This commit is contained in:
parent
9ac9ed8fe2
commit
d0f220fd0a
6 changed files with 64 additions and 9 deletions
|
@ -12,4 +12,5 @@ type Server struct {
|
||||||
|
|
||||||
type Worker struct {
|
type Worker struct {
|
||||||
Address string
|
Address string
|
||||||
|
Name string
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
package constants
|
package constants
|
||||||
|
|
||||||
const SHARED_SECRET_HEADER = "morffix-secret"
|
const SHARED_SECRET_HEADER = "morffix-secret"
|
||||||
|
const NAME_HEADER = "morffix-name"
|
||||||
|
const UUID_HEADER = "morffix-uuid"
|
||||||
|
|
5
go.mod
5
go.mod
|
@ -4,4 +4,7 @@ go 1.21.8
|
||||||
|
|
||||||
require github.com/BurntSushi/toml v1.3.2
|
require github.com/BurntSushi/toml v1.3.2
|
||||||
|
|
||||||
require nhooyr.io/websocket v1.8.11 // indirect
|
require (
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
nhooyr.io/websocket v1.8.11 // indirect
|
||||||
|
)
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -1,4 +1,6 @@
|
||||||
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
|
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
|
||||||
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
|
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
|
||||||
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
|
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
|
||||||
|
|
|
@ -4,16 +4,24 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"git.lastassault.de/speatzle/morffix/constants"
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
|
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Connection struct {
|
type Worker struct {
|
||||||
|
Name string
|
||||||
|
Address string
|
||||||
|
Conn *websocket.Conn
|
||||||
|
Connected bool
|
||||||
|
ConnectionChanged time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
var Connections []Connection
|
var Workers = map[string]*Worker{}
|
||||||
|
var WorkersMutex sync.Mutex
|
||||||
|
|
||||||
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
|
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||||
if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
|
if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
|
||||||
|
@ -29,9 +37,48 @@ func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
defer c.CloseNow()
|
defer c.CloseNow()
|
||||||
|
|
||||||
slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)
|
// Connection ID
|
||||||
|
uuid := r.Header.Get(constants.UUID_HEADER)
|
||||||
|
|
||||||
// TODO Add Connection Tracking
|
// Track Connection
|
||||||
|
func() {
|
||||||
|
WorkersMutex.Lock()
|
||||||
|
defer WorkersMutex.Unlock()
|
||||||
|
|
||||||
|
w, ok := Workers[uuid]
|
||||||
|
if ok && w.Connected {
|
||||||
|
slog.WarnContext(r.Context(), "Worker Reconnected before the old connection died, killing...")
|
||||||
|
w.Conn.CloseNow()
|
||||||
|
|
||||||
|
// Since we Closed the Connection, we need to wait for the old connections deferred cleanup to finish before Writing down the new connection or we will be overwritten.
|
||||||
|
for {
|
||||||
|
WorkersMutex.Unlock()
|
||||||
|
time.Sleep(time.Millisecond * 10)
|
||||||
|
WorkersMutex.Lock()
|
||||||
|
if !Workers[uuid].Connected {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Workers[uuid] = &Worker{
|
||||||
|
Name: r.Header.Get(constants.NAME_HEADER),
|
||||||
|
Address: r.RemoteAddr,
|
||||||
|
Conn: c,
|
||||||
|
Connected: true,
|
||||||
|
ConnectionChanged: time.Now(),
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Set Status on Disconnect
|
||||||
|
defer func() {
|
||||||
|
WorkersMutex.Lock()
|
||||||
|
defer WorkersMutex.Unlock()
|
||||||
|
Workers[uuid].Connected = false
|
||||||
|
Workers[uuid].ConnectionChanged = time.Now()
|
||||||
|
}()
|
||||||
|
|
||||||
|
slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
err = readMessage(r.Context(), c)
|
err = readMessage(r.Context(), c)
|
||||||
|
|
|
@ -10,15 +10,15 @@ import (
|
||||||
"git.lastassault.de/speatzle/morffix/config"
|
"git.lastassault.de/speatzle/morffix/config"
|
||||||
"git.lastassault.de/speatzle/morffix/constants"
|
"git.lastassault.de/speatzle/morffix/constants"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
|
||||||
"nhooyr.io/websocket"
|
"nhooyr.io/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
func Start(conf config.Config) {
|
uuid := uuid.New()
|
||||||
ctx := context.Background()
|
|
||||||
headers := http.Header{}
|
|
||||||
|
|
||||||
slog.InfoContext(ctx, "Connecting to Server...", "Address", conf.Worker.Address)
|
slog.InfoContext(ctx, "Connecting to Server...", "Address", conf.Worker.Address)
|
||||||
headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret)
|
headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret)
|
||||||
|
headers.Add(constants.UUID_HEADER, uuid.String())
|
||||||
c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{
|
c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{
|
||||||
HTTPHeader: headers,
|
HTTPHeader: headers,
|
||||||
})
|
})
|
||||||
|
|
Loading…
Add table
Reference in a new issue