From 05a28c06442f86ea6a5cf105cfd8214fbc7fc7f7 Mon Sep 17 00:00:00 2001 From: nikky Date: Sun, 9 Aug 2020 22:20:57 +0200 Subject: [PATCH] making the linter happy --- bridge/api/api.go | 143 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 109 insertions(+), 34 deletions(-) diff --git a/bridge/api/api.go b/bridge/api/api.go index 2d2a5e51..5e1b0692 100644 --- a/bridge/api/api.go +++ b/bridge/api/api.go @@ -2,7 +2,6 @@ package api import ( "encoding/json" - "github.com/grafov/bcast" "net/http" "sync" "time" @@ -10,7 +9,7 @@ import ( "github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge/config" "github.com/gorilla/websocket" - //"github.com/grafov/bcast" + "github.com/grafov/bcast" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" ring "github.com/zfjagann/golang-ring" @@ -28,12 +27,8 @@ const ( ) type API struct { - Messages ring.Ring - group bcast.Group - //messageMember bcast.Member - //messageChannel chan config.Message - //streamMember bcast.Member - //streamChannel chan config.Message + Messages ring.Ring + group bcast.Group sync.RWMutex *bridge.Config } @@ -48,7 +43,7 @@ type Message struct { type MessageWrapper struct { message config.Message - member *bcast.Member + member *bcast.Member } func New(cfg *bridge.Config) bridge.Bridger { @@ -62,8 +57,6 @@ func New(cfg *bridge.Config) bridge.Bridger { b.Messages = ring.Ring{} if b.GetInt("Buffer") != 0 { b.Messages.SetCapacity(b.GetInt("Buffer")) - } else { - // TODO: set default capacity ? } if b.GetString("Token") != "" { @@ -96,14 +89,14 @@ func New(cfg *bridge.Config) bridge.Bridger { func (b *API) Connect() error { return nil } + func (b *API) Disconnect() error { return nil - } -func (b *API) JoinChannel(channel config.ChannelInfo) error { - // we could have a `chan config.Message` for each text channel here, instead of hardcoded "api" - return nil +func (b *API) JoinChannel(channel config.ChannelInfo) error { + // we could have a `chan config.Message` for each text channel here, instead of hardcoded "api" ? + return nil } func (b *API) Send(msg config.Message) (string, error) { @@ -142,9 +135,40 @@ func (b *API) handlePostMessage(c echo.Context) error { func (b *API) handleMessages(c echo.Context) error { b.Lock() defer b.Unlock() - _ = c.JSONPretty(http.StatusOK, b.Messages.Values(), " ") + + // filter using header skip_before + timestamp := time.Time{} + hasTimestamp := false + timestampStr := c.Request().Header.Get("skip_before") + if timestampStr != "" { + b.Log.Debugf("received timestamp '%s'", timestampStr) + decodedTimestamp, err := time.Parse(time.RFC3339, timestampStr) + if err != nil { + b.Log.Errorf("failed to parse timestamp '%s'", timestampStr) + } + timestamp = decodedTimestamp + hasTimestamp = true + b.Log.Debugf("parsed time '%v'", timestamp) + } + + messages := []config.Message{} + + for _, msg := range b.Messages.Values() { + msg, ok := msg.(config.Message) + if !ok { + b.Log.Errorf("failed casting %v", msg) + break + } + if hasTimestamp && (timestamp.After(msg.Timestamp) || timestamp.Equal(msg.Timestamp)) { + // b.Log.Debugf("skipping mesage with timestamp %v", msg.Timestamp) + continue + } + messages = append(messages, msg) + } + + _ = c.JSONPretty(http.StatusOK, messages, " ") // not clearing history.. intentionally - //b.Messages = ring.Ring{} + // b.Messages = ring.Ring{} return nil } @@ -165,8 +189,36 @@ func (b *API) handleStream(c echo.Context) error { } c.Response().Flush() - // TODO: currently this skips sending history - // TODO: send history from ringbuffer ? + // filter using header skip_before + timestamp := time.Time{} + hasTimestamp := false + timestampStr := c.Request().Header.Get("skip_before") + if timestampStr != "" { + b.Log.Debugf("received timestamp '%s'", timestampStr) + decodedTimestamp, err := time.Parse(time.RFC3339, timestampStr) + if err != nil { + b.Log.Errorf("failed to parse timestamp '%s'", timestampStr) + } + timestamp = decodedTimestamp + hasTimestamp = true + b.Log.Debugf("parsed time '%v'", timestamp) + } + + // send messages from history + for _, msg := range b.Messages.Values() { + msg, ok := msg.(config.Message) + if !ok { + break + } + if hasTimestamp && (timestamp.After(msg.Timestamp) || timestamp.Equal(msg.Timestamp)) { + continue + } + if err := json.NewEncoder(c.Response()).Encode(msg); err != nil { + return err + } + c.Response().Flush() + time.Sleep(200 * time.Millisecond) + } member := *b.group.Join() defer func() { @@ -174,20 +226,19 @@ func (b *API) handleStream(c echo.Context) error { member.Close() }() - loop: for { - select { +loop: + for { // block until channel has message - case msg := <-member.Read: - messageWrapper, ok := msg.(MessageWrapper) - if !ok { - break loop - } - message := messageWrapper.message - if err := json.NewEncoder(c.Response()).Encode(message); err != nil { - return err - } - c.Response().Flush() + msg := <-member.Read + messageWrapper, ok := msg.(MessageWrapper) + if !ok { + break loop } + message := messageWrapper.message + if err := json.NewEncoder(c.Response()).Encode(message); err != nil { + return err + } + c.Response().Flush() time.Sleep(200 * time.Millisecond) } return nil @@ -213,14 +264,15 @@ func (b *API) writePump(conn *websocket.Conn, member *bcast.Member) { member.Close() }() - loop: for { +loop: + for { select { case msg := <-member.Read: messageWrapper, ok := msg.(MessageWrapper) if !ok { break loop } - b.Log.Debugf("compare pointer %p == %p", messageWrapper.member, member) + b.Log.Debugf("compare pointer %p == %p", messageWrapper.member, member) if messageWrapper.member == member { continue loop } @@ -278,7 +330,7 @@ func (b *API) readPump(conn *websocket.Conn, member *bcast.Member) { func (b *API) handleWebsocket(c echo.Context) error { u := websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024} conn, err := u.Upgrade(c.Response().Writer, c.Request(), nil) - //websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024) + // websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024) if err != nil { return err } @@ -288,8 +340,31 @@ func (b *API) handleWebsocket(c echo.Context) error { // TODO: maybe send all history as single message as json array ? + // filter using header skip_before + timestamp := time.Time{} + hasTimestamp := false + timestampStr := c.Request().Header.Get("skip_before") + if timestampStr != "" { + b.Log.Debugf("received timestamp '%s'", timestampStr) + decodedTimestamp, err := time.Parse(time.RFC3339, timestampStr) + if err != nil { + b.Log.Errorf("failed to parse timestamp '%s'", timestampStr) + } + timestamp = decodedTimestamp + hasTimestamp = true + b.Log.Debugf("parsed time '%v'", timestamp) + } + // send all messages from history for _, msg := range b.Messages.Values() { + msg, ok := msg.(config.Message) + if !ok { + break + } + if hasTimestamp && (timestamp.After(msg.Timestamp) || timestamp.Equal(msg.Timestamp)) { + continue + } + _ = conn.SetWriteDeadline(time.Now().Add(writeWait)) b.Log.Debugf("sending message %v", msg) err := conn.WriteJSON(msg)