diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..ff3fb40 --- /dev/null +++ b/config/config.go @@ -0,0 +1,15 @@ +package config + +type Config struct { + SharedSecret string + Server Server + Worker Worker +} + +type Server struct { + Address string +} + +type Worker struct { + Address string +} diff --git a/constants/constants.go b/constants/constants.go new file mode 100644 index 0000000..321166a --- /dev/null +++ b/constants/constants.go @@ -0,0 +1,3 @@ +package constants + +const SHARED_SECRET_HEADER = "morffix-secret" diff --git a/go.mod b/go.mod index 65df1c9..ff6d54f 100644 --- a/go.mod +++ b/go.mod @@ -3,3 +3,5 @@ module git.lastassault.de/speatzle/morffix go 1.21.8 require github.com/BurntSushi/toml v1.3.2 + +require nhooyr.io/websocket v1.8.11 // indirect diff --git a/go.sum b/go.sum index ef0f966..d3e27ac 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +nhooyr.io/websocket v1.8.11 h1:f/qXNc2/3DpoSZkHt1DQu6rj4zGC8JmkkLkWss0MgN0= +nhooyr.io/websocket v1.8.11/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= diff --git a/main.go b/main.go index 9350d03..93562f7 100644 --- a/main.go +++ b/main.go @@ -1,31 +1,39 @@ package main import ( + "flag" "log/slog" "os" + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/server" + "git.lastassault.de/speatzle/morffix/worker" "github.com/BurntSushi/toml" ) -type Config struct { - Server bool -} - -var conf Config +var conf config.Config func main() { - slog.Info("Starting...") - + isserver := flag.Bool("server", false, "Run as Server") + flag.Parse() confPath := "config.toml" _, err := os.Stat(confPath) if err != nil { confPath = "/etc/morffix/config.toml" } + slog.Info("Loading Config", "path", confPath) _, err = toml.DecodeFile(confPath, &conf) if err != nil { slog.Error("Error Loading Config", "err", err) return } + if *isserver { + slog.Info("Starting Server...") + server.Start(conf) + } else { + slog.Info("Starting Worker...") + worker.Start(conf) + } } diff --git a/server/index.go b/server/index.go new file mode 100644 index 0000000..ab08922 --- /dev/null +++ b/server/index.go @@ -0,0 +1,9 @@ +package server + +import ( + "net/http" +) + +func handleIndex(w http.ResponseWriter, r *http.Request) { + w.Write([]byte("Alive")) +} diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..96de468 --- /dev/null +++ b/server/server.go @@ -0,0 +1,55 @@ +package server + +import ( + "context" + "log/slog" + "net/http" + "os" + "os/signal" + "time" + + "git.lastassault.de/speatzle/morffix/config" +) + +var conf config.Config + +func Start(_conf config.Config) { + conf = _conf + mux := http.NewServeMux() + mux.HandleFunc("/worker", handleWorkerWebsocket) + mux.HandleFunc("/", handleIndex) + + server := &http.Server{ + Addr: conf.Server.Address, + Handler: mux, + ReadTimeout: 1 * time.Second, + WriteTimeout: 1 * time.Second, + IdleTimeout: 30 * time.Second, + ReadHeaderTimeout: 2 * time.Second, + } + + serverClose := make(chan bool) + go func() { + slog.Info("Listening...", "Address", conf.Server.Address) + err := server.ListenAndServe() + if err != http.ErrServerClosed { + slog.Error("Listen Failed", "err", err) + } else { + slog.Info("Server Closed") + } + serverClose <- true + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt) + select { + case <-serverClose: + slog.Info("Exiting due to Listen Failure") + case sig := <-sigs: + slog.Info("Stopping Server...", "signal", sig) + stopCtx, cancel := context.WithTimeout(context.Background(), time.Second*10) + server.Shutdown(stopCtx) + cancel() + slog.Info("Done") + } +} diff --git a/server/worker.go b/server/worker.go new file mode 100644 index 0000000..fe111f6 --- /dev/null +++ b/server/worker.go @@ -0,0 +1,56 @@ +package server + +import ( + "context" + "log/slog" + "net/http" + + "git.lastassault.de/speatzle/morffix/constants" + + "nhooyr.io/websocket" +) + +type Connection struct { +} + +var Connections []Connection + +func handleWorkerWebsocket(w http.ResponseWriter, r *http.Request) { + if r.Header.Get(constants.SHARED_SECRET_HEADER) != conf.SharedSecret { + w.WriteHeader(http.StatusUnauthorized) + return + } + c, err := websocket.Accept(w, r, &websocket.AcceptOptions{ + InsecureSkipVerify: true, + }) + if err != nil { + slog.ErrorContext(r.Context(), "Error Accepting Worker Websocket Connection", "err", err) + return + } + defer c.CloseNow() + + slog.InfoContext(r.Context(), "Worker Websocket Connection", "RemoteAddress", r.RemoteAddr) + + // TODO Add Connection Tracking + + for { + err = readMessage(r.Context(), c) + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway { + slog.InfoContext(r.Context(), "Websocket Closed") + return + } + if err != nil { + slog.ErrorContext(r.Context(), "Error Reading Websocket Message", "err", err) + return + } + } +} + +func readMessage(ctx context.Context, c *websocket.Conn) error { + t, data, err := c.Read(ctx) + if err != nil { + return err + } + slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + return c.Write(ctx, t, data) +} diff --git a/worker/worker.go b/worker/worker.go new file mode 100644 index 0000000..e686d10 --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,61 @@ +package worker + +import ( + "context" + "io" + "log/slog" + "net/http" + "time" + + "git.lastassault.de/speatzle/morffix/config" + "git.lastassault.de/speatzle/morffix/constants" + + "nhooyr.io/websocket" +) + +func Start(conf config.Config) { + ctx := context.Background() + headers := http.Header{} + + slog.InfoContext(ctx, "Connecting to Server...", "Address", conf.Worker.Address) + headers.Add(constants.SHARED_SECRET_HEADER, conf.SharedSecret) + c, res, err := websocket.Dial(ctx, conf.Worker.Address, &websocket.DialOptions{ + HTTPHeader: headers, + }) + if err != nil { + if res != nil { + b, _ := io.ReadAll(res.Body) + slog.ErrorContext(ctx, "Error Connecting to Server", "err", err, "code", res.Status, "body", b) + + } else { + slog.ErrorContext(ctx, "Error Connecting to Server", "err", err) + } + return + } + go func() { + time.Sleep(time.Second * 5) + c.Write(ctx, websocket.MessageText, []byte("test")) + }() + + slog.InfoContext(ctx, "Waiting for Messages") + for { + err = readMessage(ctx, c) + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusAbnormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway { + slog.InfoContext(ctx, "Websocket Closed") + return + } + if err != nil { + slog.ErrorContext(ctx, "Error Reading Websocket Message", "err", err) + return + } + } +} + +func readMessage(ctx context.Context, c *websocket.Conn) error { + t, data, err := c.Read(ctx) + if err != nil { + return err + } + slog.InfoContext(ctx, "Got Websocket Message", "type", t.String(), "data", data) + return nil +}