diff --git a/bridge/api/api.go b/bridge/api/api.go index bd0b63c6..59393a4a 100644 --- a/bridge/api/api.go +++ b/bridge/api/api.go @@ -1,9 +1,8 @@ package api import ( - "bytes" "encoding/json" - "io" + "github.com/grafov/bcast" "net/http" "sync" "time" @@ -11,8 +10,10 @@ import ( "github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge/config" "github.com/gorilla/websocket" + //"github.com/grafov/bcast" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" + ring "github.com/zfjagann/golang-ring" ) const ( @@ -20,14 +21,19 @@ const ( writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. - pongWait = 10 * time.Second // TODO: 60 seconds + pongWait = 60 * time.Second // TODO: 60 seconds // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 ) type API struct { - send chan config.Message + Messages ring.Ring + group bcast.Group + //messageMember bcast.Member + //messageChannel chan config.Message + //streamMember bcast.Member + //streamChannel chan config.Message sync.RWMutex *bridge.Config } @@ -45,7 +51,16 @@ func New(cfg *bridge.Config) bridge.Bridger { e := echo.New() e.HideBanner = true e.HidePort = true - b.send = make(chan config.Message, b.GetInt("Buffer")) + b.group = *bcast.NewGroup() + go b.group.Broadcast(0) // TODO: cancel this group broadcast at some point ? + + b.Messages = ring.Ring{} + if b.GetInt("Buffer") != 0 { + b.Messages.SetCapacity(b.GetInt("Buffer")) + } else { + // TODO: set default capacity ? + } + if b.GetString("Token") != "" { e.Use(middleware.KeyAuth(func(key string, c echo.Context) (bool, error) { return key == b.GetString("Token"), nil @@ -81,6 +96,7 @@ func (b *API) Disconnect() error { } func (b *API) JoinChannel(channel config.ChannelInfo) error { + // we could have a `chan config.Message` for each text channel here, instead of hardcoded "api" return nil } @@ -92,7 +108,9 @@ func (b *API) Send(msg config.Message) (string, error) { if msg.Event == config.EventMsgDelete { return "", nil } - b.send <- msg + b.Log.Debugf("enqueueing message from %s to group broadcast", msg.Username) + b.Messages.Enqueue(msg) + b.group.Send(msg) return "", nil } @@ -119,20 +137,8 @@ func (b *API) handlePostMessage(c echo.Context) error { func (b *API) handleMessages(c echo.Context) error { b.Lock() defer b.Unlock() - // collect all messages until the channel has no more messages in the buffer - var messages []config.Message - loop: for { - select { - case msg := <- b.send: - messages = append(messages, msg) - default: - break loop - } - } - // TODO: get all messages from send channel - c.JSONPretty(http.StatusOK, messages, " ") - // TODO: clear send channel ? - //b.send = make(chan config.Message, b.GetInt("Buffer")) + _ = c.JSONPretty(http.StatusOK, b.Messages.Values(), " ") + // not clearing history.. intentionally //b.Messages = ring.Ring{} return nil } @@ -147,15 +153,26 @@ func (b *API) getGreeting() config.Message { func (b *API) handleStream(c echo.Context) error { c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON) c.Response().WriteHeader(http.StatusOK) + greet := b.getGreeting() if err := json.NewEncoder(c.Response()).Encode(greet); err != nil { return err } c.Response().Flush() + + // TODO: currently this skips sending history + // TODO: send history from ringbuffer ? + + member := *b.group.Join() + defer func() { + // i hope this will properly close it.. + member.Close() + }() + for { select { // block until channel has message - case msg := <- b.send: + case msg := <-member.Read: if err := json.NewEncoder(c.Response()).Encode(msg); err != nil { return err } @@ -176,18 +193,20 @@ func (b *API) handleWebsocketMessage(message config.Message) { b.Remote <- message } -func (b *API) writePump(conn *websocket.Conn) { +func (b *API) writePump(conn *websocket.Conn, member bcast.Member) { ticker := time.NewTicker(pingPeriod) defer func() { b.Log.Debug("closing websocket") ticker.Stop() - conn.Close() + _ = conn.Close() + member.Close() }() for { select { - case msg := <-b.send: - conn.SetWriteDeadline(time.Now().Add(writeWait)) + case msg := <-member.Read: + _ = conn.SetWriteDeadline(time.Now().Add(writeWait)) + b.Log.Debugf("sending message %v", msg) err := conn.WriteJSON(msg) if err != nil { b.Log.Errorf("error: %v", err) @@ -195,7 +214,7 @@ func (b *API) writePump(conn *websocket.Conn) { } case <-ticker.C: b.Log.Debug("sending ping") - conn.SetWriteDeadline(time.Now().Add(writeWait)) + _ = conn.SetWriteDeadline(time.Now().Add(writeWait)) if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { b.Log.Errorf("error: %v", err) return @@ -207,36 +226,22 @@ func (b *API) writePump(conn *websocket.Conn) { func (b *API) readPump(conn *websocket.Conn) { defer func() { b.Log.Debug("closing websocket") - conn.Close() + _ = conn.Close() }() _ = conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetPongHandler( func(string) error { b.Log.Debug("received pong") - conn.SetReadDeadline(time.Now().Add(pongWait)) + _ = conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }, ) - for { + for { message := config.Message{} - //err := conn.ReadJSON(&message) - //if err != nil { - // b.Log.Errorf("error: %v", err) - // return - //} - _, messageBytes, err := conn.ReadMessage() + err := conn.ReadJSON(&message) if err != nil { - b.Log.Errorf("error: %v", err) - return - } - err = json.NewDecoder(bytes.NewReader(messageBytes)).Decode(&message) - if err != nil { - if err == io.EOF { - // One value is expected in the message. - err = io.ErrUnexpectedEOF - } if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { b.Log.Errorf("Websocket closed unexpectedly: %v", err) } @@ -257,7 +262,22 @@ func (b *API) handleWebsocket(c echo.Context) error { greet := b.getGreeting() _ = conn.WriteJSON(greet) - go b.writePump(conn) + // TODO: maybe send all history as single message as json array ? + + // send all messages from history + for _, msg := range b.Messages.Values() { + _ = conn.SetWriteDeadline(time.Now().Add(writeWait)) + b.Log.Debugf("sending message %v", msg) + err := conn.WriteJSON(msg) + if err != nil { + b.Log.Errorf("error: %v", err) + break + } + } + + member := *b.group.Join() + + go b.writePump(conn, member) go b.readPump(conn) return nil diff --git a/go.mod b/go.mod index b3c277c6..66fff769 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/gopackage/ddp v0.0.0-20170117053602-652027933df4 // indirect github.com/gorilla/schema v1.1.0 github.com/gorilla/websocket v1.4.2 + github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d github.com/hashicorp/golang-lru v0.5.4 github.com/jpillora/backoff v1.0.0 github.com/keybase/go-keybase-chat-bot v0.0.0-20200505163032-5cacf52379da diff --git a/go.sum b/go.sum index de1929b5..73ac2792 100644 --- a/go.sum +++ b/go.sum @@ -245,6 +245,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d h1:Q2+KsA/1GLC9xyLsDun3/EOJ+83rY/IHRsO1DToPrdo= +github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d/go.mod h1:RInr+B3/Tx70hYm0rpNPMTD7vH0pBG5ny/JsHAs2KcQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= diff --git a/vendor/github.com/grafov/bcast/.gitignore b/vendor/github.com/grafov/bcast/.gitignore new file mode 100644 index 00000000..fd43a0ae --- /dev/null +++ b/vendor/github.com/grafov/bcast/.gitignore @@ -0,0 +1,24 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe + +TAGS diff --git a/vendor/github.com/grafov/bcast/.travis.yml b/vendor/github.com/grafov/bcast/.travis.yml new file mode 100644 index 00000000..4f2ee4d9 --- /dev/null +++ b/vendor/github.com/grafov/bcast/.travis.yml @@ -0,0 +1 @@ +language: go diff --git a/vendor/github.com/grafov/bcast/LICENSE b/vendor/github.com/grafov/bcast/LICENSE new file mode 100644 index 00000000..3dacf484 --- /dev/null +++ b/vendor/github.com/grafov/bcast/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2013, Alexander I.Grafov aka Axel +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + + Neither the name of the author nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/grafov/bcast/README.md b/vendor/github.com/grafov/bcast/README.md new file mode 100644 index 00000000..909ccb60 --- /dev/null +++ b/vendor/github.com/grafov/bcast/README.md @@ -0,0 +1,69 @@ +bcast package for Go +==================== + +Broadcasting on a set of channels in Go. Go channels offer different usage patterns but not ready to use broadcast pattern. +This library solves the problem in direct way. Each routine keeps member structure with own input channel and single for all +members output channel. Central dispatcher accepts broadcasts and resend them to all members. + +Usage [![Go Walker](http://img.shields.io/badge/docs-API-brightgreen.svg?style=flat)](http://gowalker.org/github.com/NimbleIndustry/bcast) +----- + +Firstly import package and create broadcast group. You may create any number of groups for different broadcasts: + + import ( + "github.com/grafov/bcast" + ) + + group := bcast.NewGroup() // create broadcast group + go group.Broadcast(0) // accepts messages and broadcast it to all members + +You may listen broadcasts limited time: + + bcast.Broadcast(2 * time.Minute) // if message not arrived during 2 min. function exits + +Now join to the group from different goroutines: + + member1 := group.Join() // joined member1 from one routine + +Either member may send message which received by all other members of the group: + + member1.Send("test message") // send message to all members + +Also you may send message to group from nonmember of a group: + + group.Send("test message") + +Method `Send` accepts `interface{}` type so any values may be broadcasted. + + member2 := group.Join() // joined member2 form another routine + val := member1.Recv() // broadcasted value received + +Another way to receive broadcasted messages is listen input channel of the member. + + val := <-*member1.In // each member keeps pointer to its own input channel + +It may be convenient for example when `select` used. + +See more examples in a test suit `bcast_test.go`. + +Install +------- + +`go get github.com/grafov/bcast` + +The library doesn't require external packages for build. The next +package required if you want to run unit tests: + +`gopkg.in/fatih/set.v0` + +License +------- + +Library licensed under BSD 3-clause license. See LICENSE. + +Project status [![Build Status](https://img.shields.io/travis/grafov/bcast/master.svg?style=flat)](https://travis-ci.org/grafov/bcast) +-------------- + +WIP again. There is bug found (see #12) and some possible improvements are waiting for review (#9). + +API is stable. No major changes planned, maybe small improvements. diff --git a/vendor/github.com/grafov/bcast/bcast.go b/vendor/github.com/grafov/bcast/bcast.go new file mode 100644 index 00000000..02be7aa8 --- /dev/null +++ b/vendor/github.com/grafov/bcast/bcast.go @@ -0,0 +1,219 @@ +package bcast + +/* + bcast package for Go. Broadcasting on a set of channels. + + Copyright © 2013 Alexander I.Grafov . + All rights reserved. + Use of this source code is governed by a BSD-style + license that can be found in the LICENSE file. +*/ + +import ( + "container/heap" + "errors" + "sync" + "time" +) + +// Message is an internal structure to pack messages together with +// info about sender. +type Message struct { + sender *Member + payload interface{} + clock int +} + +// Member represents member of a Broadcast group. +type Member struct { + group *Group + Read chan interface{} + clock int + messageQueue PriorityQueue + send chan Message + close chan bool +} + +// Group provides a mechanism for the broadcast of messages to a +// collection of channels. +type Group struct { + in chan Message + close chan bool + members []*Member + clock int + memberLock sync.Mutex + clockLock sync.Mutex +} + +// NewGroup creates a new broadcast group. +func NewGroup() *Group { + in := make(chan Message) + close := make(chan bool) + return &Group{in: in, close: close, clock: 0} +} + +// MemberCount returns the number of members in the Broadcast Group. +func (g *Group) MemberCount() int { + return len(g.Members()) +} + +// Members returns a slice of Members that are currently in the Group. +func (g *Group) Members() []*Member { + g.memberLock.Lock() + res := g.members[:] + g.memberLock.Unlock() + return res +} + +// Join returns a new member object and handles the creation of its +// output channel. +func (g *Group) Join() *Member { + memberChannel := make(chan interface{}) + return g.Add(memberChannel) +} + +// Leave removes the provided member from the group and closes him +func (g *Group) Leave(leaving *Member) error { + g.memberLock.Lock() + memberIndex := -1 + for index, member := range g.members { + if member == leaving { + memberIndex = index + break + } + } + if memberIndex == -1 { + g.memberLock.Unlock() + return errors.New("Could not find provided member for removal") + } + g.members = append(g.members[:memberIndex], g.members[memberIndex+1:]...) + leaving.close <- true // TODO: need to handle the case where there + close(leaving.Read) + + // is still stuff in this Members priorityQueue + g.memberLock.Unlock() + return nil +} + +// Add adds a member to the group for the provided interface channel. +func (g *Group) Add(memberChannel chan interface{}) *Member { + g.memberLock.Lock() + g.clockLock.Lock() + member := &Member{ + group: g, + Read: memberChannel, + clock: g.clock, + messageQueue: PriorityQueue{}, + send: make(chan Message), + close: make(chan bool), + } + go member.listen() + g.members = append(g.members, member) + g.clockLock.Unlock() + g.memberLock.Unlock() + return member +} + +// Close terminates the group immediately. +func (g *Group) Close() { + g.close <- true +} + +// Broadcast messages received from one group member to others. +// If incoming messages not arrived during `timeout` then function returns. +func (g *Group) Broadcast(timeout time.Duration) { + var timeoutChannel <-chan time.Time + if timeout != 0 { + timeoutChannel = time.After(timeout) + } + for { + select { + case received := <-g.in: + g.memberLock.Lock() + g.clockLock.Lock() + members := g.members[:] + received.clock = g.clock + g.clock++ + g.clockLock.Unlock() + g.memberLock.Unlock() + for _, member := range members { + // This is done in a goroutine because if it + // weren't it would be a blocking call + go func(member *Member, received Message) { + member.send <- received + }(member, received) + } + case <-timeoutChannel: + if timeout > 0 { + return + } + case <-g.close: + return + } + } +} + +// Send broadcasts a message to every one of a Group's members. +func (g *Group) Send(val interface{}) { + g.in <- Message{sender: nil, payload: val} +} + +// Close removes the member it is called on from its broadcast group +// and closes Read channel. +func (m *Member) Close() { + m.group.Leave(m) +} + +// Send broadcasts a message from one Member to the channels of all +// the other members in its group. +func (m *Member) Send(val interface{}) { + m.group.in <- Message{sender: m, payload: val} +} + +// Recv reads one value from the member's Read channel +func (m *Member) Recv() interface{} { + return <-m.Read +} + +func (m *Member) listen() { + for { + select { + case message := <-m.send: + m.handleMessage(&message) + case <-m.close: + return + } + } +} + +func (m *Member) handleMessage(message *Message) { + if !m.trySend(message) { + heap.Push(&m.messageQueue, &Item{ + priority: message.clock, + value: message, + }) + return + } + if m.messageQueue.Len() > 0 { + nextMessage := m.messageQueue[0].value.(*Message) + for m.trySend(nextMessage) { + heap.Pop(&m.messageQueue) + if m.messageQueue.Len() > 0 { + nextMessage = m.messageQueue[0].value.(*Message) + } else { + break + } + } + } +} + +func (m *Member) trySend(message *Message) bool { + shouldSend := message.clock == m.clock + if shouldSend { + if message.sender != m { + m.Read <- message.payload + } + m.clock++ + } + return shouldSend +} diff --git a/vendor/github.com/grafov/bcast/go.mod b/vendor/github.com/grafov/bcast/go.mod new file mode 100644 index 00000000..de902aca --- /dev/null +++ b/vendor/github.com/grafov/bcast/go.mod @@ -0,0 +1,3 @@ +module github.com/grafov/bcast + +// go: no requirements found in vendor/vendor.json diff --git a/vendor/github.com/grafov/bcast/priority_queue.go b/vendor/github.com/grafov/bcast/priority_queue.go new file mode 100644 index 00000000..bbf7b714 --- /dev/null +++ b/vendor/github.com/grafov/bcast/priority_queue.go @@ -0,0 +1,52 @@ +package bcast + +import ( + "container/heap" +) + +// An Item is something we manage in a priority queue. +type Item struct { + value interface{} + priority int // The priority of the item in the queue. + // The index is needed by update and is maintained by the heap.Interface methods. + index int // The index of the item in the heap. +} + +// A PriorityQueue implements heap.Interface and holds Items. +type PriorityQueue []*Item + +func (pq PriorityQueue) Len() int { return len(pq) } + +func (pq PriorityQueue) Less(i, j int) bool { + // We want Pop to give us the lowest priority so we use less than here. + return pq[i].priority < pq[j].priority +} + +func (pq PriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *PriorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*Item) + item.index = n + *pq = append(*pq, item) +} + +func (pq *PriorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// update modifies the priority and value of an Item in the queue. +func (pq *PriorityQueue) update(item *Item, value string, priority int) { + item.value = value + item.priority = priority + heap.Fix(pq, item.index) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index da0774ff..68ab6572 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -65,6 +65,8 @@ github.com/gopackage/ddp github.com/gorilla/schema # github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket +# github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d +github.com/grafov/bcast # github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru github.com/hashicorp/golang-lru/simplelru