making the linter happy
This commit is contained in:
@@ -2,7 +2,6 @@ package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/grafov/bcast"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -10,7 +9,7 @@ import (
|
||||
"github.com/42wim/matterbridge/bridge"
|
||||
"github.com/42wim/matterbridge/bridge/config"
|
||||
"github.com/gorilla/websocket"
|
||||
//"github.com/grafov/bcast"
|
||||
"github.com/grafov/bcast"
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/labstack/echo/v4/middleware"
|
||||
ring "github.com/zfjagann/golang-ring"
|
||||
@@ -28,12 +27,8 @@ const (
|
||||
)
|
||||
|
||||
type API struct {
|
||||
Messages ring.Ring
|
||||
group bcast.Group
|
||||
//messageMember bcast.Member
|
||||
//messageChannel chan config.Message
|
||||
//streamMember bcast.Member
|
||||
//streamChannel chan config.Message
|
||||
Messages ring.Ring
|
||||
group bcast.Group
|
||||
sync.RWMutex
|
||||
*bridge.Config
|
||||
}
|
||||
@@ -48,7 +43,7 @@ type Message struct {
|
||||
|
||||
type MessageWrapper struct {
|
||||
message config.Message
|
||||
member *bcast.Member
|
||||
member *bcast.Member
|
||||
}
|
||||
|
||||
func New(cfg *bridge.Config) bridge.Bridger {
|
||||
@@ -62,8 +57,6 @@ func New(cfg *bridge.Config) bridge.Bridger {
|
||||
b.Messages = ring.Ring{}
|
||||
if b.GetInt("Buffer") != 0 {
|
||||
b.Messages.SetCapacity(b.GetInt("Buffer"))
|
||||
} else {
|
||||
// TODO: set default capacity ?
|
||||
}
|
||||
|
||||
if b.GetString("Token") != "" {
|
||||
@@ -96,14 +89,14 @@ func New(cfg *bridge.Config) bridge.Bridger {
|
||||
func (b *API) Connect() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *API) Disconnect() error {
|
||||
return nil
|
||||
|
||||
}
|
||||
func (b *API) JoinChannel(channel config.ChannelInfo) error {
|
||||
// we could have a `chan config.Message` for each text channel here, instead of hardcoded "api"
|
||||
return nil
|
||||
|
||||
func (b *API) JoinChannel(channel config.ChannelInfo) error {
|
||||
// we could have a `chan config.Message` for each text channel here, instead of hardcoded "api" ?
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *API) Send(msg config.Message) (string, error) {
|
||||
@@ -142,9 +135,40 @@ func (b *API) handlePostMessage(c echo.Context) error {
|
||||
func (b *API) handleMessages(c echo.Context) error {
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
_ = c.JSONPretty(http.StatusOK, b.Messages.Values(), " ")
|
||||
|
||||
// filter using header skip_before
|
||||
timestamp := time.Time{}
|
||||
hasTimestamp := false
|
||||
timestampStr := c.Request().Header.Get("skip_before")
|
||||
if timestampStr != "" {
|
||||
b.Log.Debugf("received timestamp '%s'", timestampStr)
|
||||
decodedTimestamp, err := time.Parse(time.RFC3339, timestampStr)
|
||||
if err != nil {
|
||||
b.Log.Errorf("failed to parse timestamp '%s'", timestampStr)
|
||||
}
|
||||
timestamp = decodedTimestamp
|
||||
hasTimestamp = true
|
||||
b.Log.Debugf("parsed time '%v'", timestamp)
|
||||
}
|
||||
|
||||
messages := []config.Message{}
|
||||
|
||||
for _, msg := range b.Messages.Values() {
|
||||
msg, ok := msg.(config.Message)
|
||||
if !ok {
|
||||
b.Log.Errorf("failed casting %v", msg)
|
||||
break
|
||||
}
|
||||
if hasTimestamp && (timestamp.After(msg.Timestamp) || timestamp.Equal(msg.Timestamp)) {
|
||||
// b.Log.Debugf("skipping mesage with timestamp %v", msg.Timestamp)
|
||||
continue
|
||||
}
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
_ = c.JSONPretty(http.StatusOK, messages, " ")
|
||||
// not clearing history.. intentionally
|
||||
//b.Messages = ring.Ring{}
|
||||
// b.Messages = ring.Ring{}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -165,8 +189,36 @@ func (b *API) handleStream(c echo.Context) error {
|
||||
}
|
||||
c.Response().Flush()
|
||||
|
||||
// TODO: currently this skips sending history
|
||||
// TODO: send history from ringbuffer ?
|
||||
// filter using header skip_before
|
||||
timestamp := time.Time{}
|
||||
hasTimestamp := false
|
||||
timestampStr := c.Request().Header.Get("skip_before")
|
||||
if timestampStr != "" {
|
||||
b.Log.Debugf("received timestamp '%s'", timestampStr)
|
||||
decodedTimestamp, err := time.Parse(time.RFC3339, timestampStr)
|
||||
if err != nil {
|
||||
b.Log.Errorf("failed to parse timestamp '%s'", timestampStr)
|
||||
}
|
||||
timestamp = decodedTimestamp
|
||||
hasTimestamp = true
|
||||
b.Log.Debugf("parsed time '%v'", timestamp)
|
||||
}
|
||||
|
||||
// send messages from history
|
||||
for _, msg := range b.Messages.Values() {
|
||||
msg, ok := msg.(config.Message)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if hasTimestamp && (timestamp.After(msg.Timestamp) || timestamp.Equal(msg.Timestamp)) {
|
||||
continue
|
||||
}
|
||||
if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
c.Response().Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
|
||||
member := *b.group.Join()
|
||||
defer func() {
|
||||
@@ -174,20 +226,19 @@ func (b *API) handleStream(c echo.Context) error {
|
||||
member.Close()
|
||||
}()
|
||||
|
||||
loop: for {
|
||||
select {
|
||||
loop:
|
||||
for {
|
||||
// block until channel has message
|
||||
case msg := <-member.Read:
|
||||
messageWrapper, ok := msg.(MessageWrapper)
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
message := messageWrapper.message
|
||||
if err := json.NewEncoder(c.Response()).Encode(message); err != nil {
|
||||
return err
|
||||
}
|
||||
c.Response().Flush()
|
||||
msg := <-member.Read
|
||||
messageWrapper, ok := msg.(MessageWrapper)
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
message := messageWrapper.message
|
||||
if err := json.NewEncoder(c.Response()).Encode(message); err != nil {
|
||||
return err
|
||||
}
|
||||
c.Response().Flush()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
return nil
|
||||
@@ -213,14 +264,15 @@ func (b *API) writePump(conn *websocket.Conn, member *bcast.Member) {
|
||||
member.Close()
|
||||
}()
|
||||
|
||||
loop: for {
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case msg := <-member.Read:
|
||||
messageWrapper, ok := msg.(MessageWrapper)
|
||||
if !ok {
|
||||
break loop
|
||||
}
|
||||
b.Log.Debugf("compare pointer %p == %p", messageWrapper.member, member)
|
||||
b.Log.Debugf("compare pointer %p == %p", messageWrapper.member, member)
|
||||
if messageWrapper.member == member {
|
||||
continue loop
|
||||
}
|
||||
@@ -278,7 +330,7 @@ func (b *API) readPump(conn *websocket.Conn, member *bcast.Member) {
|
||||
func (b *API) handleWebsocket(c echo.Context) error {
|
||||
u := websocket.Upgrader{ReadBufferSize: 1024, WriteBufferSize: 1024}
|
||||
conn, err := u.Upgrade(c.Response().Writer, c.Request(), nil)
|
||||
//websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024)
|
||||
// websocket.Upgrade(c.Response().Writer, c.Request(), nil, 1024, 1024)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -288,8 +340,31 @@ func (b *API) handleWebsocket(c echo.Context) error {
|
||||
|
||||
// TODO: maybe send all history as single message as json array ?
|
||||
|
||||
// filter using header skip_before
|
||||
timestamp := time.Time{}
|
||||
hasTimestamp := false
|
||||
timestampStr := c.Request().Header.Get("skip_before")
|
||||
if timestampStr != "" {
|
||||
b.Log.Debugf("received timestamp '%s'", timestampStr)
|
||||
decodedTimestamp, err := time.Parse(time.RFC3339, timestampStr)
|
||||
if err != nil {
|
||||
b.Log.Errorf("failed to parse timestamp '%s'", timestampStr)
|
||||
}
|
||||
timestamp = decodedTimestamp
|
||||
hasTimestamp = true
|
||||
b.Log.Debugf("parsed time '%v'", timestamp)
|
||||
}
|
||||
|
||||
// send all messages from history
|
||||
for _, msg := range b.Messages.Values() {
|
||||
msg, ok := msg.(config.Message)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if hasTimestamp && (timestamp.After(msg.Timestamp) || timestamp.Equal(msg.Timestamp)) {
|
||||
continue
|
||||
}
|
||||
|
||||
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
b.Log.Debugf("sending message %v", msg)
|
||||
err := conn.WriteJSON(msg)
|
||||
|
||||
Reference in New Issue
Block a user