now websockets see messages from others, but not their own anymore
This commit is contained in:
@@ -46,6 +46,11 @@ type Message struct {
|
|||||||
Gateway string `json:"gateway"`
|
Gateway string `json:"gateway"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MessageWrapper struct {
|
||||||
|
message config.Message
|
||||||
|
member *bcast.Member
|
||||||
|
}
|
||||||
|
|
||||||
func New(cfg *bridge.Config) bridge.Bridger {
|
func New(cfg *bridge.Config) bridge.Bridger {
|
||||||
b := &API{Config: cfg}
|
b := &API{Config: cfg}
|
||||||
e := echo.New()
|
e := echo.New()
|
||||||
@@ -110,7 +115,7 @@ func (b *API) Send(msg config.Message) (string, error) {
|
|||||||
}
|
}
|
||||||
b.Log.Debugf("enqueueing message from %s to group broadcast", msg.Username)
|
b.Log.Debugf("enqueueing message from %s to group broadcast", msg.Username)
|
||||||
b.Messages.Enqueue(msg)
|
b.Messages.Enqueue(msg)
|
||||||
b.group.Send(msg)
|
b.group.Send(MessageWrapper{msg, nil})
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -169,17 +174,23 @@ func (b *API) handleStream(c echo.Context) error {
|
|||||||
member.Close()
|
member.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
loop: for {
|
||||||
select {
|
select {
|
||||||
// block until channel has message
|
// block until channel has message
|
||||||
case msg := <-member.Read:
|
case msg := <-member.Read:
|
||||||
if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
|
messageWrapper, ok := msg.(MessageWrapper)
|
||||||
|
if !ok {
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
message := messageWrapper.message
|
||||||
|
if err := json.NewEncoder(c.Response()).Encode(message); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
c.Response().Flush()
|
c.Response().Flush()
|
||||||
}
|
}
|
||||||
time.Sleep(200 * time.Millisecond)
|
time.Sleep(200 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *API) handleWebsocketMessage(message config.Message) {
|
func (b *API) handleWebsocketMessage(message config.Message) {
|
||||||
@@ -193,7 +204,7 @@ func (b *API) handleWebsocketMessage(message config.Message) {
|
|||||||
b.Remote <- message
|
b.Remote <- message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *API) writePump(conn *websocket.Conn, member bcast.Member) {
|
func (b *API) writePump(conn *websocket.Conn, member *bcast.Member) {
|
||||||
ticker := time.NewTicker(pingPeriod)
|
ticker := time.NewTicker(pingPeriod)
|
||||||
defer func() {
|
defer func() {
|
||||||
b.Log.Debug("closing websocket")
|
b.Log.Debug("closing websocket")
|
||||||
@@ -202,14 +213,23 @@ func (b *API) writePump(conn *websocket.Conn, member bcast.Member) {
|
|||||||
member.Close()
|
member.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
loop: for {
|
||||||
select {
|
select {
|
||||||
case msg := <-member.Read:
|
case msg := <-member.Read:
|
||||||
|
messageWrapper, ok := msg.(MessageWrapper)
|
||||||
|
if !ok {
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
b.Log.Debugf("compare pointer %p == %p", messageWrapper.member, member)
|
||||||
|
if messageWrapper.member == member {
|
||||||
|
continue loop
|
||||||
|
}
|
||||||
|
message := messageWrapper.message
|
||||||
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
|
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
b.Log.Debugf("sending message %v", msg)
|
b.Log.Debugf("sending message %v", message)
|
||||||
err := conn.WriteJSON(msg)
|
err := conn.WriteJSON(message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Log.Errorf("error: %v", err)
|
b.Log.Errorf("error: %v", message)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
@@ -223,10 +243,11 @@ func (b *API) writePump(conn *websocket.Conn, member bcast.Member) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *API) readPump(conn *websocket.Conn) {
|
func (b *API) readPump(conn *websocket.Conn, member *bcast.Member) {
|
||||||
defer func() {
|
defer func() {
|
||||||
b.Log.Debug("closing websocket")
|
b.Log.Debug("closing websocket")
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
|
member.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
|
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
|
||||||
@@ -248,6 +269,9 @@ func (b *API) readPump(conn *websocket.Conn) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
b.handleWebsocketMessage(message)
|
b.handleWebsocketMessage(message)
|
||||||
|
// this also sends to itself, seems like there is no nice way to prevent that
|
||||||
|
// should not be a problem as long as clients set their userid correctly and filter away messages from themself
|
||||||
|
member.Send(MessageWrapper{message, member})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -275,10 +299,10 @@ func (b *API) handleWebsocket(c echo.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
member := *b.group.Join()
|
member := b.group.Join()
|
||||||
|
|
||||||
go b.writePump(conn, member)
|
go b.writePump(conn, member)
|
||||||
go b.readPump(conn)
|
go b.readPump(conn, member)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user