Add Basic Websocket Server/Worker Connection

This commit is contained in:
speatzle 2024-04-26 18:33:49 +02:00
parent 01e34936e9
commit 9ac9ed8fe2
9 changed files with 218 additions and 7 deletions

15
config/config.go Normal file
View file

@ -0,0 +1,15 @@
package config
type Config struct {
SharedSecret string
Server Server
Worker Worker
}
type Server struct {
Address string
}
type Worker struct {
Address string
}

3
constants/constants.go Normal file
View file

@ -0,0 +1,3 @@
package constants
const SHARED_SECRET_HEADER = "morffix-secret"

2
go.mod
View file

@ -3,3 +3,5 @@ module git.lastassault.de/speatzle/morffix
go 1.21.8
require github.com/BurntSushi/toml v1.3.2
require nhooyr.io/websocket v1.8.11 // indirect

2
go.sum
View file

@ -1,2 +1,4 @@
github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8=
github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0=
nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=

22
main.go
View file

@ -1,31 +1,39 @@
package main
import (
"flag"
"log/slog"
"os"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/server"
"git.lastassault.de/speatzle/morffix/worker"
"github.com/BurntSushi/toml"
)
type Config struct {
Server bool
}
var conf Config
var conf config.Config
func main() {
slog.Info("Starting...")
isserver := flag.Bool("server", false, "Run as Server")
flag.Parse()
confPath := "config.toml"
_, err := os.Stat(confPath)
if err != nil {
confPath = "/etc/morffix/config.toml"
}
slog.Info("Loading Config", "path", confPath)
_, err = toml.DecodeFile(confPath, &conf)
if err != nil {
slog.Error("Error Loading Config", "err", err)
return
}
if *isserver {
slog.Info("Starting Server...")
server.Start(conf)
} else {
slog.Info("Starting Worker...")
worker.Start(conf)
}
}

9
server/index.go Normal file
View file

@ -0,0 +1,9 @@
package server
import (
"net/http"
)
func handleIndex(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("Alive"))
}

55
server/server.go Normal file
View file

@ -0,0 +1,55 @@
package server
import (
"context"
"log/slog"
"net/http"
"os"
"os/signal"
"time"
"git.lastassault.de/speatzle/morffix/config"
)
var conf config.Config
func Start(_conf config.Config) {
conf = _conf
mux := http.NewServeMux()
mux.HandleFunc("/worker", handleWorkerWebsocket)
mux.HandleFunc("/", handleIndex)
server := &http.Server{
Addr: conf.Server.Address,
Handler: mux,
ReadTimeout: 1 * time.Second,
WriteTimeout: 1 * time.Second,
IdleTimeout: 30 * time.Second,
ReadHeaderTimeout: 2 * time.Second,
}
serverClose := make(chan bool)
go func() {
slog.Info("Listening...", "Address", conf.Server.Address)
err := server.ListenAndServe()
if err != http.ErrServerClosed {
slog.Error("Listen Failed", "err", err)
} else {
slog.Info("Server Closed")
}
serverClose <- true
}()
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, os.Interrupt)
select {
case <-serverClose:
slog.Info("Exiting due to Listen Failure")
case sig := <-sigs:
slog.Info("Stopping Server...", "signal", sig)
stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
server.Shutdown(stopCtx)
cancel()
slog.Info("Done")
}
}

56
server/worker.go Normal file
View file

@ -0,0 +1,56 @@
package server
import (
"context"
"log/slog"
"net/http"
"git.lastassault.de/speatzle/morffix/constants"
"nhooyr.io/websocket"
)
type Connection struct {
}
var Connections []Connection
func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) {
if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret {
w.WriteHeader(http.StatusUnauthorized)
return
}
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
InsecureSkipVerify: true,
})
if err != nil {
slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err)
return
}
defer c.CloseNow()
slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr)
// TODO Add Connection Tracking
for {
err = readMessage(r.Context(), c)
if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway {
slog.InfoContext(r.Context(), "Websocket Closed")
return
}
if err != nil {
slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err)
return
}
}
}
func readMessage(ctx context.Context, c *websocket.Conn) error {
t, data, err := c.Read(ctx)
if err != nil {
return err
}
slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
return c.Write(ctx, t, data)
}

61
worker/worker.go Normal file
View file

@ -0,0 +1,61 @@
package worker
import (
"context"
"io"
"log/slog"
"net/http"
"time"
"git.lastassault.de/speatzle/morffix/config"
"git.lastassault.de/speatzle/morffix/constants"
"nhooyr.io/websocket"
)
func Start(conf config.Config) {
ctx := context.Background()
headers := http.Header{}
slog.InfoContext(ctx, "Connecting to Server...", "Address", conf.Worker.Address)
headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret)
c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{
HTTPHeader: headers,
})
if err != nil {
if res != nil {
b, _ := io.ReadAll(res.Body)
slog.ErrorContext(ctx, "Error Connecting to Server", "err", err, "code", res.Status, "body", b)
} else {
slog.ErrorContext(ctx, "Error Connecting to Server", "err", err)
}
return
}
go func() {
time.Sleep(time.Second * 5)
c.Write(ctx, websocket.MessageText, []byte("test"))
}()
slog.InfoContext(ctx, "Waiting for Messages")
for {
err = readMessage(ctx, c)
if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway {
slog.InfoContext(ctx, "Websocket Closed")
return
}
if err != nil {
slog.ErrorContext(ctx, "Error Reading Websocket Message", "err", err)
return
}
}
}
func readMessage(ctx context.Context, c *websocket.Conn) error {
t, data, err := c.Read(ctx)
if err != nil {
return err
}
slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data)
return nil
}