diff --git a/bridge/api/api.go b/bridge/api/api.go index e9871548..2b034568 100644 --- a/bridge/api/api.go +++ b/bridge/api/api.go @@ -108,13 +108,17 @@ func (b *API) handleMessages(c echo.Context) error { return nil } -func (b *API) handleStream(c echo.Context) error { - c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON) - c.Response().WriteHeader(http.StatusOK) - greet := config.Message{ +func (b *API) getGreeting() config.Message { + return config.Message{ Event: config.EventAPIConnected, Timestamp: time.Now(), } +} + +func (b *API) handleStream(c echo.Context) error { + c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON) + c.Response().WriteHeader(http.StatusOK) + greet := b.getGreeting() if err := json.NewEncoder(c.Response()).Encode(greet); err != nil { return err } @@ -142,28 +146,40 @@ func (b *API) handleWebsocketMessage(message config.Message) { b.Remote <- message } +func (b *API) writePump(conn *websocket.Conn) { + for { + msg := b.Messages.Dequeue() + if msg != nil { + err := conn.WriteJSON(msg) + if err != nil { + break + } + } + } +} + +func (b *API) readPump(conn *websocket.Conn) { + for { + message := config.Message{} + err := conn.ReadJSON(&message) + if err != nil { + break + } + b.handleWebsocketMessage(message) + } +} + func (b *API) handleWebsocket(c echo.Context) error { conn, err := websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024) if err != nil { return err } - greet := config.Message{ - Event: config.EventAPIConnected, - Timestamp: time.Now(), - } - conn.WriteJSON(greet) + greet := b.getGreeting() + _ = conn.WriteJSON(greet) - for { - msg := b.Messages.Dequeue() - if msg != nil { - _ = conn.WriteJSON(msg) - } + go b.writePump(conn) + go b.readPump(conn) - message := config.Message{} - err := conn.ReadJSON(&message) - if err == nil { - b.handleWebsocketMessage(message) - } - } + return nil }