From 2f72b7e545d337e863c378774afc2a7c90eab287 Mon Sep 17 00:00:00 2001 From: nikky Date: Sat, 8 Aug 2020 02:17:01 +0200 Subject: [PATCH] now websockets see messages from others, but not their own anymore --- bridge/api/api.go | 46 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/bridge/api/api.go b/bridge/api/api.go index 59393a4a..2d2a5e51 100644 --- a/bridge/api/api.go +++ b/bridge/api/api.go @@ -46,6 +46,11 @@ type Message struct { Gateway string `json:"gateway"` } +type MessageWrapper struct { + message config.Message + member *bcast.Member +} + func New(cfg *bridge.Config) bridge.Bridger { b := &API{Config: cfg} e := echo.New() @@ -110,7 +115,7 @@ func (b *API) Send(msg config.Message) (string, error) { } b.Log.Debugf("enqueueing message from %s to group broadcast", msg.Username) b.Messages.Enqueue(msg) - b.group.Send(msg) + b.group.Send(MessageWrapper{msg, nil}) return "", nil } @@ -169,17 +174,23 @@ func (b *API) handleStream(c echo.Context) error { member.Close() }() - for { + loop: for { select { // block until channel has message case msg := <-member.Read: - if err := json.NewEncoder(c.Response()).Encode(msg); err != nil { + messageWrapper, ok := msg.(MessageWrapper) + if !ok { + break loop + } + message := messageWrapper.message + if err := json.NewEncoder(c.Response()).Encode(message); err != nil { return err } c.Response().Flush() } time.Sleep(200 * time.Millisecond) } + return nil } func (b *API) handleWebsocketMessage(message config.Message) { @@ -193,7 +204,7 @@ func (b *API) handleWebsocketMessage(message config.Message) { b.Remote <- message } -func (b *API) writePump(conn *websocket.Conn, member bcast.Member) { +func (b *API) writePump(conn *websocket.Conn, member *bcast.Member) { ticker := time.NewTicker(pingPeriod) defer func() { b.Log.Debug("closing websocket") @@ -202,14 +213,23 @@ func (b *API) writePump(conn *websocket.Conn, member bcast.Member) { member.Close() }() - for { + loop: for { select { case msg := <-member.Read: + messageWrapper, ok := msg.(MessageWrapper) + if !ok { + break loop + } + b.Log.Debugf("compare pointer %p == %p", messageWrapper.member, member) + if messageWrapper.member == member { + continue loop + } + message := messageWrapper.message _ = conn.SetWriteDeadline(time.Now().Add(writeWait)) - b.Log.Debugf("sending message %v", msg) - err := conn.WriteJSON(msg) + b.Log.Debugf("sending message %v", message) + err := conn.WriteJSON(message) if err != nil { - b.Log.Errorf("error: %v", err) + b.Log.Errorf("error: %v", message) return } case <-ticker.C: @@ -223,10 +243,11 @@ func (b *API) writePump(conn *websocket.Conn, member bcast.Member) { } } -func (b *API) readPump(conn *websocket.Conn) { +func (b *API) readPump(conn *websocket.Conn, member *bcast.Member) { defer func() { b.Log.Debug("closing websocket") _ = conn.Close() + member.Close() }() _ = conn.SetReadDeadline(time.Now().Add(pongWait)) @@ -248,6 +269,9 @@ func (b *API) readPump(conn *websocket.Conn) { return } b.handleWebsocketMessage(message) + // this also sends to itself, seems like there is no nice way to prevent that + // should not be a problem as long as clients set their userid correctly and filter away messages from themself + member.Send(MessageWrapper{message, member}) } } @@ -275,10 +299,10 @@ func (b *API) handleWebsocket(c echo.Context) error { } } - member := *b.group.Join() + member := b.group.Join() go b.writePump(conn, member) - go b.readPump(conn) + go b.readPump(conn, member) return nil }