make parallel websocket handling work

This commit is contained in:
nikky
2020-08-08 01:20:30 +02:00
parent 433b1528ad
commit 2d79cb03ef
11 changed files with 465 additions and 45 deletions

View File

@@ -1,9 +1,8 @@
package api
import (
"bytes"
"encoding/json"
"io"
"github.com/grafov/bcast"
"net/http"
"sync"
"time"
@@ -11,8 +10,10 @@ import (
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
"github.com/gorilla/websocket"
//"github.com/grafov/bcast"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
ring "github.com/zfjagann/golang-ring"
)
const (
@@ -20,14 +21,19 @@ const (
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 10 * time.Second // TODO: 60 seconds
pongWait = 60 * time.Second // TODO: 60 seconds
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)
type API struct {
send chan config.Message
Messages ring.Ring
group bcast.Group
//messageMember bcast.Member
//messageChannel chan config.Message
//streamMember bcast.Member
//streamChannel chan config.Message
sync.RWMutex
*bridge.Config
}
@@ -45,7 +51,16 @@ func New(cfg *bridge.Config) bridge.Bridger {
e := echo.New()
e.HideBanner = true
e.HidePort = true
b.send = make(chan config.Message, b.GetInt("Buffer"))
b.group = *bcast.NewGroup()
go b.group.Broadcast(0) // TODO: cancel this group broadcast at some point ?
b.Messages = ring.Ring{}
if b.GetInt("Buffer") != 0 {
b.Messages.SetCapacity(b.GetInt("Buffer"))
} else {
// TODO: set default capacity ?
}
if b.GetString("Token") != "" {
e.Use(middleware.KeyAuth(func(key string, c echo.Context) (bool, error) {
return key == b.GetString("Token"), nil
@@ -81,6 +96,7 @@ func (b *API) Disconnect() error {
}
func (b *API) JoinChannel(channel config.ChannelInfo) error {
// we could have a `chan config.Message` for each text channel here, instead of hardcoded "api"
return nil
}
@@ -92,7 +108,9 @@ func (b *API) Send(msg config.Message) (string, error) {
if msg.Event == config.EventMsgDelete {
return "", nil
}
b.send <- msg
b.Log.Debugf("enqueueing message from %s to group broadcast", msg.Username)
b.Messages.Enqueue(msg)
b.group.Send(msg)
return "", nil
}
@@ -119,20 +137,8 @@ func (b *API) handlePostMessage(c echo.Context) error {
func (b *API) handleMessages(c echo.Context) error {
b.Lock()
defer b.Unlock()
// collect all messages until the channel has no more messages in the buffer
var messages []config.Message
loop: for {
select {
case msg := <- b.send:
messages = append(messages, msg)
default:
break loop
}
}
// TODO: get all messages from send channel
c.JSONPretty(http.StatusOK, messages, " ")
// TODO: clear send channel ?
//b.send = make(chan config.Message, b.GetInt("Buffer"))
_ = c.JSONPretty(http.StatusOK, b.Messages.Values(), " ")
// not clearing history.. intentionally
//b.Messages = ring.Ring{}
return nil
}
@@ -147,15 +153,26 @@ func (b *API) getGreeting() config.Message {
func (b *API) handleStream(c echo.Context) error {
c.Response().Header().Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
c.Response().WriteHeader(http.StatusOK)
greet := b.getGreeting()
if err := json.NewEncoder(c.Response()).Encode(greet); err != nil {
return err
}
c.Response().Flush()
// TODO: currently this skips sending history
// TODO: send history from ringbuffer ?
member := *b.group.Join()
defer func() {
// i hope this will properly close it..
member.Close()
}()
for {
select {
// block until channel has message
case msg := <- b.send:
case msg := <-member.Read:
if err := json.NewEncoder(c.Response()).Encode(msg); err != nil {
return err
}
@@ -176,18 +193,20 @@ func (b *API) handleWebsocketMessage(message config.Message) {
b.Remote <- message
}
func (b *API) writePump(conn *websocket.Conn) {
func (b *API) writePump(conn *websocket.Conn, member bcast.Member) {
ticker := time.NewTicker(pingPeriod)
defer func() {
b.Log.Debug("closing websocket")
ticker.Stop()
conn.Close()
_ = conn.Close()
member.Close()
}()
for {
select {
case msg := <-b.send:
conn.SetWriteDeadline(time.Now().Add(writeWait))
case msg := <-member.Read:
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
b.Log.Debugf("sending message %v", msg)
err := conn.WriteJSON(msg)
if err != nil {
b.Log.Errorf("error: %v", err)
@@ -195,7 +214,7 @@ func (b *API) writePump(conn *websocket.Conn) {
}
case <-ticker.C:
b.Log.Debug("sending ping")
conn.SetWriteDeadline(time.Now().Add(writeWait))
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
b.Log.Errorf("error: %v", err)
return
@@ -207,36 +226,22 @@ func (b *API) writePump(conn *websocket.Conn) {
func (b *API) readPump(conn *websocket.Conn) {
defer func() {
b.Log.Debug("closing websocket")
conn.Close()
_ = conn.Close()
}()
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(
func(string) error {
b.Log.Debug("received pong")
conn.SetReadDeadline(time.Now().Add(pongWait))
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
},
)
for {
for {
message := config.Message{}
//err := conn.ReadJSON(&message)
//if err != nil {
// b.Log.Errorf("error: %v", err)
// return
//}
_, messageBytes, err := conn.ReadMessage()
err := conn.ReadJSON(&message)
if err != nil {
b.Log.Errorf("error: %v", err)
return
}
err = json.NewDecoder(bytes.NewReader(messageBytes)).Decode(&message)
if err != nil {
if err == io.EOF {
// One value is expected in the message.
err = io.ErrUnexpectedEOF
}
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
b.Log.Errorf("Websocket closed unexpectedly: %v", err)
}
@@ -257,7 +262,22 @@ func (b *API) handleWebsocket(c echo.Context) error {
greet := b.getGreeting()
_ = conn.WriteJSON(greet)
go b.writePump(conn)
// TODO: maybe send all history as single message as json array ?
// send all messages from history
for _, msg := range b.Messages.Values() {
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
b.Log.Debugf("sending message %v", msg)
err := conn.WriteJSON(msg)
if err != nil {
b.Log.Errorf("error: %v", err)
break
}
}
member := *b.group.Join()
go b.writePump(conn, member)
go b.readPump(conn)
return nil

1
go.mod
View File

@@ -15,6 +15,7 @@ require (
github.com/gopackage/ddp v0.0.0-20170117053602-652027933df4 // indirect
github.com/gorilla/schema v1.1.0
github.com/gorilla/websocket v1.4.2
github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d
github.com/hashicorp/golang-lru v0.5.4
github.com/jpillora/backoff v1.0.0
github.com/keybase/go-keybase-chat-bot v0.0.0-20200505163032-5cacf52379da

2
go.sum
View File

@@ -245,6 +245,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/gregjones/httpcache v0.0.0-20190212212710-3befbb6ad0cc/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d h1:Q2+KsA/1GLC9xyLsDun3/EOJ+83rY/IHRsO1DToPrdo=
github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d/go.mod h1:RInr+B3/Tx70hYm0rpNPMTD7vH0pBG5ny/JsHAs2KcQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=

24
vendor/github.com/grafov/bcast/.gitignore generated vendored Normal file
View File

@@ -0,0 +1,24 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
TAGS

1
vendor/github.com/grafov/bcast/.travis.yml generated vendored Normal file
View File

@@ -0,0 +1 @@
language: go

27
vendor/github.com/grafov/bcast/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,27 @@
Copyright (c) 2013, Alexander I.Grafov aka Axel
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
Neither the name of the author nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

69
vendor/github.com/grafov/bcast/README.md generated vendored Normal file
View File

@@ -0,0 +1,69 @@
bcast package for Go
====================
Broadcasting on a set of channels in Go. Go channels offer different usage patterns but not ready to use broadcast pattern.
This library solves the problem in direct way. Each routine keeps member structure with own input channel and single for all
members output channel. Central dispatcher accepts broadcasts and resend them to all members.
Usage [![Go Walker](http://img.shields.io/badge/docs-API-brightgreen.svg?style=flat)](http://gowalker.org/github.com/NimbleIndustry/bcast)
-----
Firstly import package and create broadcast group. You may create any number of groups for different broadcasts:
import (
"github.com/grafov/bcast"
)
group := bcast.NewGroup() // create broadcast group
go group.Broadcast(0) // accepts messages and broadcast it to all members
You may listen broadcasts limited time:
bcast.Broadcast(2 * time.Minute) // if message not arrived during 2 min. function exits
Now join to the group from different goroutines:
member1 := group.Join() // joined member1 from one routine
Either member may send message which received by all other members of the group:
member1.Send("test message") // send message to all members
Also you may send message to group from nonmember of a group:
group.Send("test message")
Method `Send` accepts `interface{}` type so any values may be broadcasted.
member2 := group.Join() // joined member2 form another routine
val := member1.Recv() // broadcasted value received
Another way to receive broadcasted messages is listen input channel of the member.
val := <-*member1.In // each member keeps pointer to its own input channel
It may be convenient for example when `select` used.
See more examples in a test suit `bcast_test.go`.
Install
-------
`go get github.com/grafov/bcast`
The library doesn't require external packages for build. The next
package required if you want to run unit tests:
`gopkg.in/fatih/set.v0`
License
-------
Library licensed under BSD 3-clause license. See LICENSE.
Project status [![Build Status](https://img.shields.io/travis/grafov/bcast/master.svg?style=flat)](https://travis-ci.org/grafov/bcast)
--------------
WIP again. There is bug found (see #12) and some possible improvements are waiting for review (#9).
API is stable. No major changes planned, maybe small improvements.

219
vendor/github.com/grafov/bcast/bcast.go generated vendored Normal file
View File

@@ -0,0 +1,219 @@
package bcast
/*
bcast package for Go. Broadcasting on a set of channels.
Copyright © 2013 Alexander I.Grafov <grafov@gmail.com>.
All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file.
*/
import (
"container/heap"
"errors"
"sync"
"time"
)
// Message is an internal structure to pack messages together with
// info about sender.
type Message struct {
sender *Member
payload interface{}
clock int
}
// Member represents member of a Broadcast group.
type Member struct {
group *Group
Read chan interface{}
clock int
messageQueue PriorityQueue
send chan Message
close chan bool
}
// Group provides a mechanism for the broadcast of messages to a
// collection of channels.
type Group struct {
in chan Message
close chan bool
members []*Member
clock int
memberLock sync.Mutex
clockLock sync.Mutex
}
// NewGroup creates a new broadcast group.
func NewGroup() *Group {
in := make(chan Message)
close := make(chan bool)
return &Group{in: in, close: close, clock: 0}
}
// MemberCount returns the number of members in the Broadcast Group.
func (g *Group) MemberCount() int {
return len(g.Members())
}
// Members returns a slice of Members that are currently in the Group.
func (g *Group) Members() []*Member {
g.memberLock.Lock()
res := g.members[:]
g.memberLock.Unlock()
return res
}
// Join returns a new member object and handles the creation of its
// output channel.
func (g *Group) Join() *Member {
memberChannel := make(chan interface{})
return g.Add(memberChannel)
}
// Leave removes the provided member from the group and closes him
func (g *Group) Leave(leaving *Member) error {
g.memberLock.Lock()
memberIndex := -1
for index, member := range g.members {
if member == leaving {
memberIndex = index
break
}
}
if memberIndex == -1 {
g.memberLock.Unlock()
return errors.New("Could not find provided member for removal")
}
g.members = append(g.members[:memberIndex], g.members[memberIndex+1:]...)
leaving.close <- true // TODO: need to handle the case where there
close(leaving.Read)
// is still stuff in this Members priorityQueue
g.memberLock.Unlock()
return nil
}
// Add adds a member to the group for the provided interface channel.
func (g *Group) Add(memberChannel chan interface{}) *Member {
g.memberLock.Lock()
g.clockLock.Lock()
member := &Member{
group: g,
Read: memberChannel,
clock: g.clock,
messageQueue: PriorityQueue{},
send: make(chan Message),
close: make(chan bool),
}
go member.listen()
g.members = append(g.members, member)
g.clockLock.Unlock()
g.memberLock.Unlock()
return member
}
// Close terminates the group immediately.
func (g *Group) Close() {
g.close <- true
}
// Broadcast messages received from one group member to others.
// If incoming messages not arrived during `timeout` then function returns.
func (g *Group) Broadcast(timeout time.Duration) {
var timeoutChannel <-chan time.Time
if timeout != 0 {
timeoutChannel = time.After(timeout)
}
for {
select {
case received := <-g.in:
g.memberLock.Lock()
g.clockLock.Lock()
members := g.members[:]
received.clock = g.clock
g.clock++
g.clockLock.Unlock()
g.memberLock.Unlock()
for _, member := range members {
// This is done in a goroutine because if it
// weren't it would be a blocking call
go func(member *Member, received Message) {
member.send <- received
}(member, received)
}
case <-timeoutChannel:
if timeout > 0 {
return
}
case <-g.close:
return
}
}
}
// Send broadcasts a message to every one of a Group's members.
func (g *Group) Send(val interface{}) {
g.in <- Message{sender: nil, payload: val}
}
// Close removes the member it is called on from its broadcast group
// and closes Read channel.
func (m *Member) Close() {
m.group.Leave(m)
}
// Send broadcasts a message from one Member to the channels of all
// the other members in its group.
func (m *Member) Send(val interface{}) {
m.group.in <- Message{sender: m, payload: val}
}
// Recv reads one value from the member's Read channel
func (m *Member) Recv() interface{} {
return <-m.Read
}
func (m *Member) listen() {
for {
select {
case message := <-m.send:
m.handleMessage(&message)
case <-m.close:
return
}
}
}
func (m *Member) handleMessage(message *Message) {
if !m.trySend(message) {
heap.Push(&m.messageQueue, &Item{
priority: message.clock,
value: message,
})
return
}
if m.messageQueue.Len() > 0 {
nextMessage := m.messageQueue[0].value.(*Message)
for m.trySend(nextMessage) {
heap.Pop(&m.messageQueue)
if m.messageQueue.Len() > 0 {
nextMessage = m.messageQueue[0].value.(*Message)
} else {
break
}
}
}
}
func (m *Member) trySend(message *Message) bool {
shouldSend := message.clock == m.clock
if shouldSend {
if message.sender != m {
m.Read <- message.payload
}
m.clock++
}
return shouldSend
}

3
vendor/github.com/grafov/bcast/go.mod generated vendored Normal file
View File

@@ -0,0 +1,3 @@
module github.com/grafov/bcast
// go: no requirements found in vendor/vendor.json

52
vendor/github.com/grafov/bcast/priority_queue.go generated vendored Normal file
View File

@@ -0,0 +1,52 @@
package bcast
import (
"container/heap"
)
// An Item is something we manage in a priority queue.
type Item struct {
value interface{}
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the lowest priority so we use less than here.
return pq[i].priority < pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue) update(item *Item, value string, priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)
}

2
vendor/modules.txt vendored
View File

@@ -65,6 +65,8 @@ github.com/gopackage/ddp
github.com/gorilla/schema
# github.com/gorilla/websocket v1.4.2
github.com/gorilla/websocket
# github.com/grafov/bcast v0.0.0-20190217190352-1447f067e08d
github.com/grafov/bcast
# github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/golang-lru
github.com/hashicorp/golang-lru/simplelru