feat: Waku v2 bridge

Issue #12610
Michal Iskierko
2023-11-12 13:29:38 +01:00
parent 56e7bd01ca
commit 6d31343205
6716 changed files with 1982502 additions and 5891 deletions

@@ -0,0 +1,43 @@
MailServer
==========
This document collects various pieces of information about our MailServer implementation.
## Syncing between mail servers
It might happen that one mail server falls behind another for various reasons, such as its machine being down for a few minutes.
There is an option to fix such a mail server:
1. SSH to a machine where this broken mail server runs,
2. Add a mail server from which you want to sync:
```
# sudo might be not needed in your setup
$ echo '{"jsonrpc":"2.0","method":"admin_addPeer", "params": ["enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504"], "id":1}' | \
sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc
```
3. Mark it as a trusted peer:
```
# sudo might be not needed in your setup
$ echo '{"jsonrpc":"2.0","method":"shh_markTrustedPeer", "params": ["enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504"], "id":1}' | \
sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc
```
4. Finally, trigger the sync command:
```
# sudo might be not needed in your setup
$ echo '{"jsonrpc":"2.0","method":"shhext_syncMessages","params":[{"mailServerPeer":"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504", "to": 1550479953, "from": 1550393583, "limit": 1000}],"id":1}' | \
sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc
```
You can add `"followCursor": true` if you want it to automatically download messages until the cursor is empty, meaning all data was synced.
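
For example, the sync request above with `followCursor` enabled (same mail server and socket):
```
# sudo might be not needed in your setup
$ echo '{"jsonrpc":"2.0","method":"shhext_syncMessages","params":[{"mailServerPeer":"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504", "to": 1550479953, "from": 1550393583, "limit": 1000, "followCursor": true}],"id":1}' | \
sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc
```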
### Debugging
To verify that your mail server received any responses, watch the logs and look for entries like this:
```
INFO [02-18|09:08:54.257] received sync response count=217 final=false err= cursor=[]
```
And it should finish with:
```
INFO [02-18|09:08:54.431] received sync response count=0 final=true err= cursor=[]
```

@@ -0,0 +1,85 @@
package mailserver
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
const (
dbCleanerBatchSize = 1000
dbCleanerPeriod = time.Hour
)
// dbCleaner removes old messages from a db.
type dbCleaner struct {
sync.RWMutex
db DB
batchSize int
retention time.Duration
period time.Duration
cancel chan struct{}
}
// newDBCleaner returns a new cleaner for db.
func newDBCleaner(db DB, retention time.Duration) *dbCleaner {
return &dbCleaner{
db: db,
retention: retention,
batchSize: dbCleanerBatchSize,
period: dbCleanerPeriod,
}
}
// Start starts a loop that cleans up old messages.
func (c *dbCleaner) Start() {
log.Info("Starting cleaning envelopes", "period", c.period, "retention", c.retention)
cancel := make(chan struct{})
c.Lock()
c.cancel = cancel
c.Unlock()
go c.schedule(c.period, cancel)
}
// Stop stops the cleaning loop.
func (c *dbCleaner) Stop() {
c.Lock()
defer c.Unlock()
if c.cancel == nil {
return
}
close(c.cancel)
c.cancel = nil
}
func (c *dbCleaner) schedule(period time.Duration, cancel <-chan struct{}) {
t := time.NewTicker(period)
defer t.Stop()
for {
select {
case <-t.C:
count, err := c.PruneEntriesOlderThan(time.Now().Add(-c.retention))
if err != nil {
log.Error("failed to prune data", "err", err)
}
log.Info("Prunned some some messages successfully", "count", count)
case <-cancel:
return
}
}
}
// PruneEntriesOlderThan removes messages older than the given time
// and returns how many have been removed.
func (c *dbCleaner) PruneEntriesOlderThan(t time.Time) (int, error) {
return c.db.Prune(t, c.batchSize)
}
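
For context, a minimal usage sketch; the hypothetical `startCleaner` helper below mirrors what `mailServer.setupCleaner` does with `cfg.DataRetention` later in this commit:
```
// startCleaner is a hypothetical helper: it prunes envelopes older than
// the retention period, checking once per dbCleanerPeriod (one hour).
func startCleaner(db DB, retentionDays int) *dbCleaner {
	cleaner := newDBCleaner(db, time.Duration(retentionDays)*24*time.Hour)
	cleaner.Start()
	return cleaner // call Stop() on shutdown
}
```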

@@ -0,0 +1,53 @@
package mailserver
import (
"encoding/binary"
"errors"
"github.com/status-im/status-go/eth-node/types"
)
const (
// DBKeyLength is the size of the envelope key.
DBKeyLength = types.HashLength + timestampLength + types.TopicLength
// CursorLength is the size of the cursor.
CursorLength = types.HashLength + timestampLength
)
var (
// ErrInvalidByteSize is returned when a DBKey can't be created
// from a byte slice because it has an invalid length.
ErrInvalidByteSize = errors.New("byte slice has invalid length")
)
// DBKey is a key to be stored in the db.
type DBKey struct {
raw []byte
}
// Bytes returns a bytes representation of the DBKey.
func (k *DBKey) Bytes() []byte {
return k.raw
}
// Topic returns the envelope topic encoded in the key.
func (k *DBKey) Topic() types.TopicType {
return types.BytesToTopic(k.raw[timestampLength+types.HashLength:])
}
// EnvelopeHash returns the envelope hash encoded in the key.
func (k *DBKey) EnvelopeHash() types.Hash {
return types.BytesToHash(k.raw[timestampLength : types.HashLength+timestampLength])
}
// Cursor returns the timestamp+hash prefix of the key, used for pagination.
func (k *DBKey) Cursor() []byte {
// We don't use the whole key for backward compatibility (also it's not needed)
return k.raw[:CursorLength]
}
// NewDBKey creates a new DBKey with the given values.
func NewDBKey(timestamp uint32, topic types.TopicType, h types.Hash) *DBKey {
var k DBKey
k.raw = make([]byte, DBKeyLength)
binary.BigEndian.PutUint32(k.raw, timestamp)
copy(k.raw[timestampLength:], h[:])
copy(k.raw[timestampLength+types.HashLength:], topic[:])
return &k
}
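
Because the key starts with a big-endian timestamp, lexicographic key order matches chronological order, which is what lets the databases below answer time-range queries with a plain key-range scan. A sketch of that property (hypothetical helper, same package; it mirrors `mailServer.createIterator` later in this commit):
```
// timeRangeKeys is a hypothetical helper: the [kl, ku) key range covers
// every envelope whose timestamp falls in [from, to], regardless of
// topic or hash, because the big-endian timestamp is the key prefix.
func timeRangeKeys(from, to uint32) (kl, ku *DBKey) {
	var emptyTopic types.TopicType
	var emptyHash types.Hash
	kl = NewDBKey(from, emptyTopic, emptyHash)
	ku = NewDBKey(to+1, emptyTopic, emptyHash)
	return kl, ku
}
```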

@@ -0,0 +1,88 @@
package mailserver
import (
"sync"
"time"
)
type rateLimiter struct {
sync.RWMutex
lifespan time.Duration // duration of the limit
db map[string]time.Time
period time.Duration
cancel chan struct{}
}
func newRateLimiter(duration time.Duration) *rateLimiter {
return &rateLimiter{
lifespan: duration,
db: make(map[string]time.Time),
period: time.Second,
}
}
func (l *rateLimiter) Start() {
cancel := make(chan struct{})
l.Lock()
l.cancel = cancel
l.Unlock()
go l.cleanUp(l.period, cancel)
}
func (l *rateLimiter) Stop() {
l.Lock()
defer l.Unlock()
if l.cancel == nil {
return
}
close(l.cancel)
l.cancel = nil
}
func (l *rateLimiter) Add(id string) {
l.Lock()
l.db[id] = time.Now()
l.Unlock()
}
func (l *rateLimiter) IsAllowed(id string) bool {
l.RLock()
defer l.RUnlock()
if lastRequestTime, ok := l.db[id]; ok {
return lastRequestTime.Add(l.lifespan).Before(time.Now())
}
return true
}
func (l *rateLimiter) cleanUp(period time.Duration, cancel <-chan struct{}) {
t := time.NewTicker(period)
defer t.Stop()
for {
select {
case <-t.C:
l.deleteExpired()
case <-cancel:
return
}
}
}
func (l *rateLimiter) deleteExpired() {
l.Lock()
defer l.Unlock()
now := time.Now()
for id, lastRequestTime := range l.db {
if lastRequestTime.Add(l.lifespan).Before(now) {
delete(l.db, id)
}
}
}
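
A minimal sketch of the intended call pattern; the hypothetical `allowAndRecord` wrapper below mirrors `mailServer.exceedsPeerRequests` later in this commit:
```
// allowAndRecord is a hypothetical wrapper: a peer gets one request per
// lifespan window; an allowed request is recorded immediately so the
// next one within the window is rejected.
func allowAndRecord(l *rateLimiter, peerID string) bool {
	if !l.IsAllowed(peerID) {
		return false
	}
	l.Add(peerID)
	return true
}
```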

@@ -0,0 +1,948 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package mailserver
import (
"crypto/ecdsa"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"sync"
"time"
prom "github.com/prometheus/client_golang/prometheus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common"
)
const (
maxQueryRange = 24 * time.Hour
maxQueryLimit = 1000
// When we default the upper limit, we want to extend the range a bit
// to accommodate envelopes with slightly higher timestamps; value in seconds
whisperTTLSafeThreshold = 60
)
var (
errDirectoryNotProvided = errors.New("data directory not provided")
errDecryptionMethodNotProvided = errors.New("decryption method is not provided")
)
const (
timestampLength = 4
requestLimitLength = 4
requestTimeRangeLength = timestampLength * 2
processRequestTimeout = time.Minute
)
type Config struct {
// DataDir points to a directory where mailserver's data is stored.
DataDir string
// Password is used to create a symmetric key to decrypt requests.
Password string
// AsymKey is an asymmetric key to decrypt requests.
AsymKey string
// MinimumPoW is a minimum PoW for requests.
MinimumPoW float64
// RateLimit is a maximum number of requests per second from a peer.
RateLimit int
// DataRetention specifies a number of days an envelope should be stored for.
DataRetention int
PostgresEnabled bool
PostgresURI string
}
// --------------
// WakuMailServer
// --------------
type WakuMailServer struct {
ms *mailServer
shh *waku.Waku
minRequestPoW float64
symFilter *wakucommon.Filter
asymFilter *wakucommon.Filter
}
func (s *WakuMailServer) Init(waku *waku.Waku, cfg *params.WakuConfig) error {
s.shh = waku
s.minRequestPoW = cfg.MinimumPoW
config := Config{
DataDir: cfg.DataDir,
Password: cfg.MailServerPassword,
MinimumPoW: cfg.MinimumPoW,
DataRetention: cfg.MailServerDataRetention,
RateLimit: cfg.MailServerRateLimit,
PostgresEnabled: cfg.DatabaseConfig.PGConfig.Enabled,
PostgresURI: cfg.DatabaseConfig.PGConfig.URI,
}
var err error
s.ms, err = newMailServer(
config,
&wakuAdapter{},
&wakuService{Waku: waku},
)
if err != nil {
return err
}
if err := s.setupDecryptor(config.Password, config.AsymKey); err != nil {
return err
}
return nil
}
func (s *WakuMailServer) Close() {
s.ms.Close()
}
func (s *WakuMailServer) Archive(env *wakucommon.Envelope) {
s.ms.Archive(gethbridge.NewWakuEnvelope(env))
}
func (s *WakuMailServer) Deliver(peerID []byte, req wakucommon.MessagesRequest) {
s.ms.DeliverMail(types.BytesToHash(peerID), types.BytesToHash(req.ID), MessagesRequestPayload{
Lower: req.From,
Upper: req.To,
Bloom: req.Bloom,
Topics: req.Topics,
Limit: req.Limit,
Cursor: req.Cursor,
Batch: true,
})
}
// DEPRECATED; use Deliver instead
func (s *WakuMailServer) DeliverMail(peerID []byte, req *wakucommon.Envelope) {
payload, err := s.decodeRequest(peerID, req)
if err != nil {
deliveryFailuresCounter.WithLabelValues("validation").Inc()
log.Error(
"[mailserver:DeliverMail] request failed validaton",
"peerID", types.BytesToHash(peerID),
"requestID", req.Hash().String(),
"err", err,
)
s.ms.sendHistoricMessageErrorResponse(types.BytesToHash(peerID), types.Hash(req.Hash()), err)
return
}
s.ms.DeliverMail(types.BytesToHash(peerID), types.Hash(req.Hash()), payload)
}
// bloomFromReceivedMessage extracts the bloom filter used in a given
// wakucommon.ReceivedMessage.
func (s *WakuMailServer) bloomFromReceivedMessage(msg *wakucommon.ReceivedMessage) ([]byte, error) {
payloadSize := len(msg.Payload)
if payloadSize < 8 {
return nil, errors.New("Undersized p2p request")
} else if payloadSize == 8 {
return wakucommon.MakeFullNodeBloom(), nil
} else if payloadSize < 8+wakucommon.BloomFilterSize {
return nil, errors.New("Undersized bloom filter in p2p request")
}
return msg.Payload[8 : 8+wakucommon.BloomFilterSize], nil
}
func (s *WakuMailServer) decompositeRequest(peerID []byte, request *wakucommon.Envelope) (MessagesRequestPayload, error) {
var (
payload MessagesRequestPayload
err error
)
if s.minRequestPoW > 0.0 && request.PoW() < s.minRequestPoW {
return payload, fmt.Errorf("PoW() is too low")
}
decrypted := s.openEnvelope(request)
if decrypted == nil {
return payload, fmt.Errorf("failed to decrypt p2p request")
}
if err := checkMsgSignature(decrypted.Src, peerID); err != nil {
return payload, err
}
payload.Bloom, err = s.bloomFromReceivedMessage(decrypted)
if err != nil {
return payload, err
}
payload.Lower = binary.BigEndian.Uint32(decrypted.Payload[:4])
payload.Upper = binary.BigEndian.Uint32(decrypted.Payload[4:8])
if payload.Upper < payload.Lower {
err := fmt.Errorf("query range is invalid: from > to (%d > %d)", payload.Lower, payload.Upper)
return payload, err
}
lowerTime := time.Unix(int64(payload.Lower), 0)
upperTime := time.Unix(int64(payload.Upper), 0)
if upperTime.Sub(lowerTime) > maxQueryRange {
err := fmt.Errorf("query range too big for peer %s", string(peerID))
return payload, err
}
if len(decrypted.Payload) >= requestTimeRangeLength+wakucommon.BloomFilterSize+requestLimitLength {
payload.Limit = binary.BigEndian.Uint32(decrypted.Payload[requestTimeRangeLength+wakucommon.BloomFilterSize:])
}
if len(decrypted.Payload) == requestTimeRangeLength+wakucommon.BloomFilterSize+requestLimitLength+DBKeyLength {
payload.Cursor = decrypted.Payload[requestTimeRangeLength+wakucommon.BloomFilterSize+requestLimitLength:]
}
return payload, nil
}
func (s *WakuMailServer) setupDecryptor(password, asymKey string) error {
s.symFilter = nil
s.asymFilter = nil
if password != "" {
keyID, err := s.shh.AddSymKeyFromPassword(password)
if err != nil {
return fmt.Errorf("create symmetric key: %v", err)
}
symKey, err := s.shh.GetSymKey(keyID)
if err != nil {
return fmt.Errorf("save symmetric key: %v", err)
}
s.symFilter = &wakucommon.Filter{KeySym: symKey}
}
if asymKey != "" {
keyAsym, err := crypto.HexToECDSA(asymKey)
if err != nil {
return err
}
s.asymFilter = &wakucommon.Filter{KeyAsym: keyAsym}
}
return nil
}
// openEnvelope tries to decrypt an envelope, first with the asymmetric key (if
// provided) and then with the symmetric key (if provided)
func (s *WakuMailServer) openEnvelope(request *wakucommon.Envelope) *wakucommon.ReceivedMessage {
if s.asymFilter != nil {
if d := request.Open(s.asymFilter); d != nil {
return d
}
}
if s.symFilter != nil {
if d := request.Open(s.symFilter); d != nil {
return d
}
}
return nil
}
func (s *WakuMailServer) decodeRequest(peerID []byte, request *wakucommon.Envelope) (MessagesRequestPayload, error) {
var payload MessagesRequestPayload
if s.minRequestPoW > 0.0 && request.PoW() < s.minRequestPoW {
return payload, errors.New("PoW too low")
}
decrypted := s.openEnvelope(request)
if decrypted == nil {
log.Warn("Failed to decrypt p2p request")
return payload, errors.New("failed to decrypt p2p request")
}
if err := checkMsgSignature(decrypted.Src, peerID); err != nil {
log.Warn("Check message signature failed", "err", err.Error())
return payload, fmt.Errorf("check message signature failed: %v", err)
}
if err := rlp.DecodeBytes(decrypted.Payload, &payload); err != nil {
return payload, fmt.Errorf("failed to decode data: %v", err)
}
if payload.Upper == 0 {
payload.Upper = uint32(time.Now().Unix() + whisperTTLSafeThreshold)
}
if payload.Upper < payload.Lower {
log.Error("Query range is invalid: lower > upper", "lower", payload.Lower, "upper", payload.Upper)
return payload, errors.New("query range is invalid: lower > upper")
}
return payload, nil
}
// -------
// adapter
// -------
type adapter interface {
CreateRequestFailedPayload(reqID types.Hash, err error) []byte
CreateRequestCompletedPayload(reqID, lastEnvelopeHash types.Hash, cursor []byte) []byte
CreateSyncResponse(envelopes []types.Envelope, cursor []byte, final bool, err string) interface{}
CreateRawSyncResponse(envelopes []rlp.RawValue, cursor []byte, final bool, err string) interface{}
}
// -----------
// wakuAdapter
// -----------
type wakuAdapter struct{}
var _ adapter = (*wakuAdapter)(nil)
func (wakuAdapter) CreateRequestFailedPayload(reqID types.Hash, err error) []byte {
return waku.CreateMailServerRequestFailedPayload(common.Hash(reqID), err)
}
func (wakuAdapter) CreateRequestCompletedPayload(reqID, lastEnvelopeHash types.Hash, cursor []byte) []byte {
return waku.CreateMailServerRequestCompletedPayload(common.Hash(reqID), common.Hash(lastEnvelopeHash), cursor)
}
func (wakuAdapter) CreateSyncResponse(_ []types.Envelope, _ []byte, _ bool, _ string) interface{} {
return nil
}
func (wakuAdapter) CreateRawSyncResponse(_ []rlp.RawValue, _ []byte, _ bool, _ string) interface{} {
return nil
}
// -------
// service
// -------
type service interface {
SendHistoricMessageResponse(peerID []byte, payload []byte) error
SendRawP2PDirect(peerID []byte, envelopes ...rlp.RawValue) error
MaxMessageSize() uint32
SendRawSyncResponse(peerID []byte, data interface{}) error // optional
SendSyncResponse(peerID []byte, data interface{}) error // optional
}
// -----------
// wakuService
// -----------
type wakuService struct {
*waku.Waku
}
func (s *wakuService) SendRawSyncResponse(peerID []byte, data interface{}) error {
return errors.New("syncing mailservers is not support by Waku")
}
func (s *wakuService) SendSyncResponse(peerID []byte, data interface{}) error {
return errors.New("syncing mailservers is not support by Waku")
}
// ----------
// mailServer
// ----------
type mailServer struct {
adapter adapter
service service
db DB
cleaner *dbCleaner // removes old envelopes
muRateLimiter sync.RWMutex
rateLimiter *rateLimiter
}
func newMailServer(cfg Config, adapter adapter, service service) (*mailServer, error) {
if len(cfg.DataDir) == 0 {
return nil, errDirectoryNotProvided
}
// TODO: move out
if len(cfg.Password) == 0 && len(cfg.AsymKey) == 0 {
return nil, errDecryptionMethodNotProvided
}
s := mailServer{
adapter: adapter,
service: service,
}
if cfg.RateLimit > 0 {
s.setupRateLimiter(time.Duration(cfg.RateLimit) * time.Second)
}
// Open the database as the last step so that a failed init does not
// leave the database open by accident.
if cfg.PostgresEnabled {
log.Info("Connecting to postgres database")
database, err := NewPostgresDB(cfg.PostgresURI)
if err != nil {
return nil, fmt.Errorf("open DB: %s", err)
}
s.db = database
log.Info("Connected to postgres database")
} else {
// Defaults to LevelDB
database, err := NewLevelDB(cfg.DataDir)
if err != nil {
return nil, fmt.Errorf("open DB: %s", err)
}
s.db = database
}
if cfg.DataRetention > 0 {
// MailServerDataRetention is a number of days.
s.setupCleaner(time.Duration(cfg.DataRetention) * time.Hour * 24)
}
return &s, nil
}
// setupRateLimiter creates a rate limiter with the given window and
// starts its periodic cleanup.
func (s *mailServer) setupRateLimiter(limit time.Duration) {
s.rateLimiter = newRateLimiter(limit)
s.rateLimiter.Start()
}
func (s *mailServer) setupCleaner(retention time.Duration) {
s.cleaner = newDBCleaner(s.db, retention)
s.cleaner.Start()
}
func (s *mailServer) Archive(env types.Envelope) {
err := s.db.SaveEnvelope(env)
if err != nil {
log.Error("Could not save envelope", "hash", env.Hash().String())
}
}
func (s *mailServer) DeliverMail(peerID, reqID types.Hash, req MessagesRequestPayload) {
timer := prom.NewTimer(mailDeliveryDuration)
defer timer.ObserveDuration()
deliveryAttemptsCounter.Inc()
log.Info(
"[mailserver:DeliverMail] delivering mail",
"peerID", peerID.String(),
"requestID", reqID.String(),
)
req.SetDefaults()
log.Info(
"[mailserver:DeliverMail] processing request",
"peerID", peerID.String(),
"requestID", reqID.String(),
"lower", req.Lower,
"upper", req.Upper,
"bloom", req.Bloom,
"topics", req.Topics,
"limit", req.Limit,
"cursor", req.Cursor,
"batch", req.Batch,
)
if err := req.Validate(); err != nil {
syncFailuresCounter.WithLabelValues("req_invalid").Inc()
log.Error(
"[mailserver:DeliverMail] request invalid",
"peerID", peerID.String(),
"requestID", reqID.String(),
"err", err,
)
s.sendHistoricMessageErrorResponse(peerID, reqID, fmt.Errorf("request is invalid: %v", err))
return
}
if s.exceedsPeerRequests(peerID) {
deliveryFailuresCounter.WithLabelValues("peer_req_limit").Inc()
log.Error(
"[mailserver:DeliverMail] peer exceeded the limit",
"peerID", peerID.String(),
"requestID", reqID.String(),
)
s.sendHistoricMessageErrorResponse(peerID, reqID, fmt.Errorf("rate limit exceeded"))
return
}
if req.Batch {
requestsBatchedCounter.Inc()
}
iter, err := s.createIterator(req)
if err != nil {
log.Error(
"[mailserver:DeliverMail] request failed",
"peerID", peerID.String(),
"requestID", reqID.String(),
"err", err,
)
return
}
defer func() { _ = iter.Release() }()
bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error)
cancelProcessing := make(chan struct{})
go func() {
counter := 0
for bundle := range bundles {
if err := s.sendRawEnvelopes(peerID, bundle, req.Batch); err != nil {
close(cancelProcessing)
errCh <- err
break
}
counter++
}
close(errCh)
log.Info(
"[mailserver:DeliverMail] finished sending bundles",
"peerID", peerID,
"requestID", reqID.String(),
"counter", counter,
)
}()
nextPageCursor, lastEnvelopeHash := s.processRequestInBundles(
iter,
req.Bloom,
req.Topics,
int(req.Limit),
processRequestTimeout,
reqID.String(),
bundles,
cancelProcessing,
)
// Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil {
deliveryFailuresCounter.WithLabelValues("process").Inc()
log.Error(
"[mailserver:DeliverMail] error while processing",
"err", err,
"peerID", peerID,
"requestID", reqID,
)
s.sendHistoricMessageErrorResponse(peerID, reqID, err)
return
}
// Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil {
deliveryFailuresCounter.WithLabelValues("iterator").Inc()
log.Error(
"[mailserver:DeliverMail] iterator failed",
"err", err,
"peerID", peerID,
"requestID", reqID,
)
s.sendHistoricMessageErrorResponse(peerID, reqID, err)
return
}
log.Info(
"[mailserver:DeliverMail] sending historic message response",
"peerID", peerID,
"requestID", reqID,
"last", lastEnvelopeHash,
"next", nextPageCursor,
)
s.sendHistoricMessageResponse(peerID, reqID, lastEnvelopeHash, nextPageCursor)
}
func (s *mailServer) SyncMail(peerID types.Hash, req MessagesRequestPayload) error {
log.Info("Started syncing envelopes", "peer", peerID.String(), "req", req)
requestID := fmt.Sprintf("%d-%d", time.Now().UnixNano(), rand.Intn(1000)) // nolint: gosec
syncAttemptsCounter.Inc()
// Check rate limiting for a requesting peer.
if s.exceedsPeerRequests(peerID) {
syncFailuresCounter.WithLabelValues("req_per_sec_limit").Inc()
log.Error("Peer exceeded request per seconds limit", "peerID", peerID.String())
return fmt.Errorf("requests per seconds limit exceeded")
}
req.SetDefaults()
if err := req.Validate(); err != nil {
syncFailuresCounter.WithLabelValues("req_invalid").Inc()
return fmt.Errorf("request is invalid: %v", err)
}
iter, err := s.createIterator(req)
if err != nil {
syncFailuresCounter.WithLabelValues("iterator").Inc()
return err
}
defer func() { _ = iter.Release() }()
bundles := make(chan []rlp.RawValue, 5)
errCh := make(chan error)
cancelProcessing := make(chan struct{})
go func() {
for bundle := range bundles {
resp := s.adapter.CreateRawSyncResponse(bundle, nil, false, "")
if err := s.service.SendRawSyncResponse(peerID.Bytes(), resp); err != nil {
close(cancelProcessing)
errCh <- fmt.Errorf("failed to send sync response: %v", err)
break
}
}
close(errCh)
}()
nextCursor, _ := s.processRequestInBundles(
iter,
req.Bloom,
req.Topics,
int(req.Limit),
processRequestTimeout,
requestID,
bundles,
cancelProcessing,
)
// Wait for the goroutine to finish the work. It may return an error.
if err := <-errCh; err != nil {
syncFailuresCounter.WithLabelValues("routine").Inc()
_ = s.service.SendSyncResponse(
peerID.Bytes(),
s.adapter.CreateSyncResponse(nil, nil, false, "failed to send a response"),
)
return err
}
// Processing of the request could be finished earlier due to iterator error.
if err := iter.Error(); err != nil {
syncFailuresCounter.WithLabelValues("iterator").Inc()
_ = s.service.SendSyncResponse(
peerID.Bytes(),
s.adapter.CreateSyncResponse(nil, nil, false, "failed to process all envelopes"),
)
return fmt.Errorf("LevelDB iterator failed: %v", err)
}
log.Info("Finished syncing envelopes", "peer", peerID.String())
err = s.service.SendSyncResponse(
peerID.Bytes(),
s.adapter.CreateSyncResponse(nil, nextCursor, true, ""),
)
if err != nil {
syncFailuresCounter.WithLabelValues("response_send").Inc()
return fmt.Errorf("failed to send the final sync response: %v", err)
}
return nil
}
// Close the mailserver and its associated db connection.
func (s *mailServer) Close() {
if s.db != nil {
if err := s.db.Close(); err != nil {
log.Error("closing database failed", "err", err)
}
}
if s.rateLimiter != nil {
s.rateLimiter.Stop()
}
if s.cleaner != nil {
s.cleaner.Stop()
}
}
func (s *mailServer) exceedsPeerRequests(peerID types.Hash) bool {
s.muRateLimiter.RLock()
defer s.muRateLimiter.RUnlock()
if s.rateLimiter == nil {
return false
}
if s.rateLimiter.IsAllowed(peerID.String()) {
s.rateLimiter.Add(peerID.String())
return false
}
log.Info("peerID exceeded the number of requests per second", "peerID", peerID.String())
return true
}
func (s *mailServer) createIterator(req MessagesRequestPayload) (Iterator, error) {
var (
emptyHash types.Hash
emptyTopic types.TopicType
ku, kl *DBKey
)
ku = NewDBKey(req.Upper+1, emptyTopic, emptyHash)
kl = NewDBKey(req.Lower, emptyTopic, emptyHash)
query := CursorQuery{
start: kl.Bytes(),
end: ku.Bytes(),
cursor: req.Cursor,
topics: req.Topics,
bloom: req.Bloom,
limit: req.Limit,
}
return s.db.BuildIterator(query)
}
func (s *mailServer) processRequestInBundles(
iter Iterator,
bloom []byte,
topics [][]byte,
limit int,
timeout time.Duration,
requestID string,
output chan<- []rlp.RawValue,
cancel <-chan struct{},
) ([]byte, types.Hash) {
timer := prom.NewTimer(requestsInBundlesDuration)
defer timer.ObserveDuration()
var (
bundle []rlp.RawValue
bundleSize uint32
batches [][]rlp.RawValue
processedEnvelopes int
processedEnvelopesSize int64
nextCursor []byte
lastEnvelopeHash types.Hash
)
log.Info(
"[mailserver:processRequestInBundles] processing request",
"requestID", requestID,
"limit", limit,
)
var topicsMap map[types.TopicType]bool
if len(topics) != 0 {
topicsMap = make(map[types.TopicType]bool)
for _, t := range topics {
topicsMap[types.BytesToTopic(t)] = true
}
}
// We iterate over the envelopes and collect them in bundles.
// If there is still room and we haven't reached the limit,
// append and continue.
// Otherwise, publish what we have so far, reset the bundle to the
// current envelope, and leave if we hit the limit.
for iter.Next() {
var rawValue []byte
var err error
if len(topicsMap) != 0 {
rawValue, err = iter.GetEnvelopeByTopicsMap(topicsMap)
} else if len(bloom) != 0 {
rawValue, err = iter.GetEnvelopeByBloomFilter(bloom)
} else {
err = errors.New("either topics or bloom must be specified")
}
if err != nil {
log.Error(
"[mailserver:processRequestInBundles]Failed to get envelope from iterator",
"err", err,
"requestID", requestID,
)
continue
}
if rawValue == nil {
continue
}
key, err := iter.DBKey()
if err != nil {
log.Error(
"[mailserver:processRequestInBundles] failed getting key",
"requestID", requestID,
)
break
}
// TODO(adam): this is invalid code. If the limit is 1000,
// it will only send 999 items and send a cursor.
lastEnvelopeHash = key.EnvelopeHash()
processedEnvelopes++
envelopeSize := uint32(len(rawValue))
limitReached := processedEnvelopes >= limit
newSize := bundleSize + envelopeSize
// If we still have some room for messages, add and continue
if !limitReached && newSize < s.service.MaxMessageSize() {
bundle = append(bundle, rawValue)
bundleSize = newSize
continue
}
// Publish if anything is in the bundle (there should always be
// something unless limit = 1)
if len(bundle) != 0 {
batches = append(batches, bundle)
processedEnvelopesSize += int64(bundleSize)
}
// Reset the bundle with the current envelope
bundle = []rlp.RawValue{rawValue}
bundleSize = envelopeSize
// Leave if we reached the limit
if limitReached {
nextCursor = key.Cursor()
break
}
}
if len(bundle) > 0 {
batches = append(batches, bundle)
processedEnvelopesSize += int64(bundleSize)
}
log.Info(
"[mailserver:processRequestInBundles] publishing envelopes",
"requestID", requestID,
"batchesCount", len(batches),
"envelopeCount", processedEnvelopes,
"processedEnvelopesSize", processedEnvelopesSize,
"cursor", nextCursor,
)
// Publish
batchLoop:
for _, batch := range batches {
select {
case output <- batch:
// It might happen that during producing the batches,
// the connection with the peer goes down and
// the consumer of `output` channel exits prematurely.
// In such a case, we should stop pushing batches and exit.
case <-cancel:
log.Info(
"[mailserver:processRequestInBundles] failed to push all batches",
"requestID", requestID,
)
break batchLoop
case <-time.After(timeout):
log.Error(
"[mailserver:processRequestInBundles] timed out pushing a batch",
"requestID", requestID,
)
break batchLoop
}
}
envelopesCounter.Inc()
sentEnvelopeBatchSizeMeter.Observe(float64(processedEnvelopesSize))
log.Info(
"[mailserver:processRequestInBundles] envelopes published",
"requestID", requestID,
)
close(output)
return nextCursor, lastEnvelopeHash
}
func (s *mailServer) sendRawEnvelopes(peerID types.Hash, envelopes []rlp.RawValue, batch bool) error {
timer := prom.NewTimer(sendRawEnvelopeDuration)
defer timer.ObserveDuration()
if batch {
return s.service.SendRawP2PDirect(peerID.Bytes(), envelopes...)
}
for _, env := range envelopes {
if err := s.service.SendRawP2PDirect(peerID.Bytes(), env); err != nil {
return err
}
}
return nil
}
func (s *mailServer) sendHistoricMessageResponse(peerID, reqID, lastEnvelopeHash types.Hash, cursor []byte) {
payload := s.adapter.CreateRequestCompletedPayload(reqID, lastEnvelopeHash, cursor)
err := s.service.SendHistoricMessageResponse(peerID.Bytes(), payload)
if err != nil {
deliveryFailuresCounter.WithLabelValues("historic_msg_resp").Inc()
log.Error(
"[mailserver:DeliverMail] error sending historic message response",
"err", err,
"peerID", peerID,
"requestID", reqID,
)
}
}
func (s *mailServer) sendHistoricMessageErrorResponse(peerID, reqID types.Hash, errorToReport error) {
payload := s.adapter.CreateRequestFailedPayload(reqID, errorToReport)
err := s.service.SendHistoricMessageResponse(peerID.Bytes(), payload)
// if we can't report an error, probably something is wrong with p2p connection,
// so we just print a log entry to document this sad fact
if err != nil {
log.Error("Error while reporting error response", "err", err, "peerID", peerID.String())
}
}
func extractBloomFromEncodedEnvelope(rawValue rlp.RawValue) ([]byte, error) {
var envelope wakucommon.Envelope
decodeErr := rlp.DecodeBytes(rawValue, &envelope)
if decodeErr != nil {
return nil, decodeErr
}
return envelope.Bloom(), nil
}
// checkMsgSignature returns an error in case the message is not correctly signed.
func checkMsgSignature(reqSrc *ecdsa.PublicKey, id []byte) error {
src := crypto.FromECDSAPub(reqSrc)
if len(src)-len(id) == 1 {
src = src[1:]
}
// if you want to check the signature, you can do it here. e.g.:
// if !bytes.Equal(peerID, src) {
if src == nil {
return errors.New("wrong signature of p2p request")
}
return nil
}

@@ -0,0 +1,42 @@
package mailserver
import (
"time"
"github.com/status-im/status-go/eth-node/types"
)
// envelopeCountCheckInterval is how often, in seconds, the real envelope count is refreshed
const envelopeCountCheckInterval = 60
// DB is an interface that abstracts interactions with the db so that the
// mailserver is agnostic to the underlying storage technology.
type DB interface {
Close() error
// SaveEnvelope stores an envelope
SaveEnvelope(types.Envelope) error
// GetEnvelope returns an rlp encoded envelope from the datastore
GetEnvelope(*DBKey) ([]byte, error)
// Prune removes envelopes older than time
Prune(time.Time, int) (int, error)
// BuildIterator returns an iterator over envelopes
BuildIterator(query CursorQuery) (Iterator, error)
}
type Iterator interface {
Next() bool
DBKey() (*DBKey, error)
Release() error
Error() error
GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error)
GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error)
}
type CursorQuery struct {
start []byte
end []byte
cursor []byte
limit uint32
bloom []byte
topics [][]byte
}

@@ -0,0 +1,242 @@
package mailserver
import (
"fmt"
"time"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/types"
waku "github.com/status-im/status-go/waku/common"
)
type LevelDB struct {
// We can't embed as there are some state problems with go-routines
ldb *leveldb.DB
name string
done chan struct{}
}
type LevelDBIterator struct {
iterator.Iterator
}
func (i *LevelDBIterator) DBKey() (*DBKey, error) {
return &DBKey{
raw: i.Key(),
}, nil
}
func (i *LevelDBIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
rawValue := make([]byte, len(i.Value()))
copy(rawValue, i.Value())
key, err := i.DBKey()
if err != nil {
return nil, err
}
if !topics[key.Topic()] {
return nil, nil
}
return rawValue, nil
}
func (i *LevelDBIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
var envelopeBloom []byte
rawValue := make([]byte, len(i.Value()))
copy(rawValue, i.Value())
key, err := i.DBKey()
if err != nil {
return nil, err
}
if len(key.Bytes()) != DBKeyLength {
var err error
envelopeBloom, err = extractBloomFromEncodedEnvelope(rawValue)
if err != nil {
return nil, err
}
} else {
envelopeBloom = types.TopicToBloom(key.Topic())
}
if !types.BloomFilterMatch(bloom, envelopeBloom) {
return nil, nil
}
return rawValue, nil
}
func (i *LevelDBIterator) Release() error {
i.Iterator.Release()
return nil
}
func NewLevelDB(dataDir string) (*LevelDB, error) {
// OpenFile opens the LevelDB database at dataDir, creating it if missing
db, err := leveldb.OpenFile(dataDir, nil)
if _, corrupted := err.(*errors.ErrCorrupted); corrupted {
log.Info("database is corrupted trying to recover", "path", dataDir)
db, err = leveldb.RecoverFile(dataDir, nil)
}
instance := LevelDB{
ldb: db,
name: dataDir, // name is used for metrics labels
done: make(chan struct{}),
}
// initialize the metric value
instance.updateArchivedEnvelopesCount()
// checking count on every insert is inefficient
go func() {
for {
select {
case <-instance.done:
return
case <-time.After(time.Second * envelopeCountCheckInterval):
instance.updateArchivedEnvelopesCount()
}
}
}()
return &instance, err
}
// GetEnvelope gets an envelope by its key
func (db *LevelDB) GetEnvelope(key *DBKey) ([]byte, error) {
defer recoverLevelDBPanics("GetEnvelope")
return db.ldb.Get(key.Bytes(), nil)
}
func (db *LevelDB) updateArchivedEnvelopesCount() {
if count, err := db.envelopesCount(); err != nil {
log.Warn("db query for envelopes count failed", "err", err)
} else {
archivedEnvelopesGauge.WithLabelValues(db.name).Set(float64(count))
}
}
// BuildIterator returns an iterator given a start/end range and an optional cursor
func (db *LevelDB) BuildIterator(query CursorQuery) (Iterator, error) {
defer recoverLevelDBPanics("BuildIterator")
i := db.ldb.NewIterator(&util.Range{Start: query.start, Limit: query.end}, nil)
envelopeQueriesCounter.WithLabelValues("unknown", "unknown").Inc()
// if a valid cursor is provided, seek to it so that iteration resumes
// where the previous page ended
if len(query.cursor) == CursorLength {
i.Seek(query.cursor)
}
return &LevelDBIterator{i}, nil
}
// Prune removes envelopes older than time
func (db *LevelDB) Prune(t time.Time, batchSize int) (int, error) {
defer recoverLevelDBPanics("Prune")
var zero types.Hash
var emptyTopic types.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
query := CursorQuery{
start: kl.Bytes(),
end: ku.Bytes(),
}
i, err := db.BuildIterator(query)
if err != nil {
return 0, err
}
defer func() { _ = i.Release() }()
batch := leveldb.Batch{}
removed := 0
for i.Next() {
dbKey, err := i.DBKey()
if err != nil {
return 0, err
}
batch.Delete(dbKey.Bytes())
if batch.Len() == batchSize {
if err := db.ldb.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
batch.Reset()
}
}
if batch.Len() > 0 {
if err := db.ldb.Write(&batch, nil); err != nil {
return removed, err
}
removed = removed + batch.Len()
}
return removed, nil
}
func (db *LevelDB) envelopesCount() (int, error) {
defer recoverLevelDBPanics("envelopesCount")
iterator, err := db.BuildIterator(CursorQuery{})
if err != nil {
return 0, err
}
// LevelDB does not have an API for getting a count
var count int
for iterator.Next() {
count++
}
return count, nil
}
// SaveEnvelope stores an envelope in leveldb and increments the metrics
func (db *LevelDB) SaveEnvelope(env types.Envelope) error {
defer recoverLevelDBPanics("SaveEnvelope")
key := NewDBKey(env.Expiry()-env.TTL(), env.Topic(), env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.WithLabelValues(db.name).Inc()
return err
}
if err = db.ldb.Put(key.Bytes(), rawEnvelope, nil); err != nil {
log.Error(fmt.Sprintf("Writing to DB failed: %s", err))
archivedErrorsCounter.WithLabelValues(db.name).Inc()
}
archivedEnvelopesGauge.WithLabelValues(db.name).Inc()
archivedEnvelopeSizeMeter.WithLabelValues(db.name).Observe(
float64(waku.EnvelopeHeaderLength + env.Size()))
return err
}
func (db *LevelDB) Close() error {
select {
case <-db.done:
default:
close(db.done)
}
return db.ldb.Close()
}
func recoverLevelDBPanics(calledMethodName string) {
// Recover from possible goleveldb panics
if r := recover(); r != nil {
if errString, ok := r.(string); ok {
log.Error(fmt.Sprintf("recovered from panic in %s: %s", calleMethodName, errString))
}
}
}

@@ -0,0 +1,309 @@
package mailserver
import (
"database/sql"
"errors"
"fmt"
"time"
"github.com/lib/pq"
// Import postgres driver
_ "github.com/lib/pq"
"github.com/status-im/migrate/v4"
"github.com/status-im/migrate/v4/database/postgres"
bindata "github.com/status-im/migrate/v4/source/go_bindata"
"github.com/status-im/status-go/mailserver/migrations"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/eth-node/types"
waku "github.com/status-im/status-go/waku/common"
)
type PostgresDB struct {
db *sql.DB
name string
done chan struct{}
}
func NewPostgresDB(uri string) (*PostgresDB, error) {
db, err := sql.Open("postgres", uri)
if err != nil {
return nil, err
}
instance := &PostgresDB{
db: db,
done: make(chan struct{}),
}
if err := instance.setup(); err != nil {
return nil, err
}
// name is used for metrics labels
if name, err := instance.getDBName(uri); err == nil {
instance.name = name
}
// initialize the metric value
instance.updateArchivedEnvelopesCount()
// checking count on every insert is inefficient
go func() {
for {
select {
case <-instance.done:
return
case <-time.After(time.Second * envelopeCountCheckInterval):
instance.updateArchivedEnvelopesCount()
}
}
}()
return instance, nil
}
type postgresIterator struct {
*sql.Rows
}
func (i *PostgresDB) getDBName(uri string) (string, error) {
query := "SELECT current_database()"
var dbName string
return dbName, i.db.QueryRow(query).Scan(&dbName)
}
func (i *PostgresDB) envelopesCount() (int, error) {
query := "SELECT count(*) FROM envelopes"
var count int
return count, i.db.QueryRow(query).Scan(&count)
}
func (i *PostgresDB) updateArchivedEnvelopesCount() {
if count, err := i.envelopesCount(); err != nil {
log.Warn("db query for envelopes count failed", "err", err)
} else {
archivedEnvelopesGauge.WithLabelValues(i.name).Set(float64(count))
}
}
func (i *postgresIterator) DBKey() (*DBKey, error) {
var value []byte
var id []byte
if err := i.Scan(&id, &value); err != nil {
return nil, err
}
return &DBKey{raw: id}, nil
}
func (i *postgresIterator) Error() error {
return i.Err()
}
func (i *postgresIterator) Release() error {
return i.Close()
}
func (i *postgresIterator) GetEnvelopeByBloomFilter(bloom []byte) ([]byte, error) {
var value []byte
var id []byte
if err := i.Scan(&id, &value); err != nil {
return nil, err
}
return value, nil
}
func (i *postgresIterator) GetEnvelopeByTopicsMap(topics map[types.TopicType]bool) ([]byte, error) {
var value []byte
var id []byte
if err := i.Scan(&id, &value); err != nil {
return nil, err
}
return value, nil
}
func (i *PostgresDB) BuildIterator(query CursorQuery) (Iterator, error) {
var args []interface{}
stmtString := "SELECT id, data FROM envelopes"
var historyRange string
if len(query.cursor) > 0 {
args = append(args, query.start, query.cursor)
// If we have a cursor, we don't want to include that envelope in the result set
stmtString += " " + "WHERE id >= $1 AND id < $2"
historyRange = "partial" //nolint: goconst
} else {
args = append(args, query.start, query.end)
stmtString += " " + "WHERE id >= $1 AND id <= $2"
historyRange = "full" //nolint: goconst
}
var filterRange string
if len(query.topics) > 0 {
args = append(args, pq.Array(query.topics))
stmtString += " " + "AND topic = any($3)"
filterRange = "partial" //nolint: goconst
} else {
stmtString += " " + fmt.Sprintf("AND bloom & b'%s'::bit(512) = bloom", toBitString(query.bloom))
filterRange = "full" //nolint: goconst
}
// The positional argument depends on whether the query uses topics or a
// bloom filter. If topics are used, the list of topics is passed as an
// argument to the query; if a bloom filter is used, it is inlined into
// the query statement.
args = append(args, query.limit)
stmtString += " " + fmt.Sprintf("ORDER BY ID DESC LIMIT $%d", len(args))
stmt, err := i.db.Prepare(stmtString)
if err != nil {
return nil, err
}
envelopeQueriesCounter.WithLabelValues(filterRange, historyRange).Inc()
rows, err := stmt.Query(args...)
if err != nil {
return nil, err
}
return &postgresIterator{rows}, nil
}
func (i *PostgresDB) setup() error {
resources := bindata.Resource(
migrations.AssetNames(),
migrations.Asset,
)
source, err := bindata.WithInstance(resources)
if err != nil {
return err
}
driver, err := postgres.WithInstance(i.db, &postgres.Config{})
if err != nil {
return err
}
m, err := migrate.NewWithInstance(
"go-bindata",
source,
"postgres",
driver)
if err != nil {
return err
}
if err = m.Up(); err != migrate.ErrNoChange {
return err
}
return nil
}
func (i *PostgresDB) Close() error {
select {
case <-i.done:
default:
close(i.done)
}
return i.db.Close()
}
func (i *PostgresDB) GetEnvelope(key *DBKey) ([]byte, error) {
statement := `SELECT data FROM envelopes WHERE id = $1`
stmt, err := i.db.Prepare(statement)
if err != nil {
return nil, err
}
defer stmt.Close()
var envelope []byte
if err = stmt.QueryRow(key.Bytes()).Scan(&envelope); err != nil {
return nil, err
}
return envelope, nil
}
func (i *PostgresDB) Prune(t time.Time, batch int) (int, error) {
var zero types.Hash
var emptyTopic types.TopicType
kl := NewDBKey(0, emptyTopic, zero)
ku := NewDBKey(uint32(t.Unix()), emptyTopic, zero)
statement := "DELETE FROM envelopes WHERE id BETWEEN $1 AND $2"
stmt, err := i.db.Prepare(statement)
if err != nil {
return 0, err
}
defer stmt.Close()
result, err := stmt.Exec(kl.Bytes(), ku.Bytes())
if err != nil {
return 0, err
}
rows, err := result.RowsAffected()
if err != nil {
return 0, err
}
return int(rows), nil
}
func (i *PostgresDB) SaveEnvelope(env types.Envelope) error {
topic := env.Topic()
key := NewDBKey(env.Expiry()-env.TTL(), topic, env.Hash())
rawEnvelope, err := rlp.EncodeToBytes(env.Unwrap())
if err != nil {
log.Error(fmt.Sprintf("rlp.EncodeToBytes failed: %s", err))
archivedErrorsCounter.WithLabelValues(i.name).Inc()
return err
}
if rawEnvelope == nil {
archivedErrorsCounter.WithLabelValues(i.name).Inc()
return errors.New("failed to encode envelope to bytes")
}
statement := "INSERT INTO envelopes (id, data, topic, bloom) VALUES ($1, $2, $3, B'"
statement += toBitString(env.Bloom())
statement += "'::bit(512)) ON CONFLICT (id) DO NOTHING;"
stmt, err := i.db.Prepare(statement)
if err != nil {
return err
}
defer stmt.Close()
_, err = stmt.Exec(
key.Bytes(),
rawEnvelope,
topicToByte(topic),
)
if err != nil {
archivedErrorsCounter.WithLabelValues(i.name).Inc()
return err
}
archivedEnvelopesGauge.WithLabelValues(i.name).Inc()
archivedEnvelopeSizeMeter.WithLabelValues(i.name).Observe(
float64(waku.EnvelopeHeaderLength + env.Size()))
return nil
}
func topicToByte(t types.TopicType) []byte {
return []byte{t[0], t[1], t[2], t[3]}
}
func toBitString(bloom []byte) string {
val := ""
for _, n := range bloom {
val += fmt.Sprintf("%08b", n)
}
return val
}
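
The `bloom & b'…'::bit(512) = bloom` predicate in `BuildIterator` matches an envelope when every bit set in its stored bloom filter is also set in the request filter, and `toBitString` produces the 512-character bit literal that predicate needs. A small illustration (values are made up):
```
// Illustration only: each byte expands to 8 binary digits, so a
// 64-byte bloom filter becomes a 512-character string for bit(512).
fmt.Println(toBitString([]byte{0xA5, 0x01})) // prints "1010010100000001"
```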

@@ -0,0 +1,84 @@
package mailserver
import prom "github.com/prometheus/client_golang/prometheus"
// By default the /metrics endpoint is not available.
// It is exposed only if -metrics flag is set.
var (
envelopesCounter = prom.NewCounter(prom.CounterOpts{
Name: "mailserver_envelopes_total",
Help: "Number of envelopes processed.",
})
deliveryFailuresCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "mailserver_delivery_failures_total",
Help: "Number of requests that failed processing.",
}, []string{"type"})
deliveryAttemptsCounter = prom.NewCounter(prom.CounterOpts{
Name: "mailserver_delivery_attempts_total",
Help: "Number of Whisper envelopes processed.",
})
requestsBatchedCounter = prom.NewCounter(prom.CounterOpts{
Name: "mailserver_requests_batched_total",
Help: "Number of processed batched requests.",
})
requestsInBundlesDuration = prom.NewHistogram(prom.HistogramOpts{
Name: "mailserver_requests_bundle_process_duration_seconds",
Help: "The time it took to process message bundles.",
})
syncFailuresCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "mailserver_sync_failures_total",
Help: "Number of failures processing a sync requests.",
}, []string{"type"})
syncAttemptsCounter = prom.NewCounter(prom.CounterOpts{
Name: "mailserver_sync_attempts_total",
Help: "Number of attempts are processing a sync requests.",
})
sendRawEnvelopeDuration = prom.NewHistogram(prom.HistogramOpts{
Name: "mailserver_send_raw_envelope_duration_seconds",
Help: "The time it took to send a Whisper envelope.",
})
sentEnvelopeBatchSizeMeter = prom.NewHistogram(prom.HistogramOpts{
Name: "mailserver_sent_envelope_batch_size_bytes",
Help: "Size of processed Whisper envelopes in bytes.",
Buckets: prom.ExponentialBuckets(1024, 4, 10),
})
mailDeliveryDuration = prom.NewHistogram(prom.HistogramOpts{
Name: "mailserver_delivery_duration_seconds",
Help: "Time it takes to deliver messages to a Whisper peer.",
})
archivedErrorsCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "mailserver_archived_envelopes_failures_total",
Help: "Number of failures storing a Whisper envelope.",
}, []string{"db"})
archivedEnvelopesGauge = prom.NewGaugeVec(prom.GaugeOpts{
Name: "mailserver_archived_envelopes_total",
Help: "Number of envelopes saved in the DB.",
}, []string{"db"})
archivedEnvelopeSizeMeter = prom.NewHistogramVec(prom.HistogramOpts{
Name: "mailserver_archived_envelope_size_bytes",
Help: "Size of envelopes saved.",
Buckets: prom.ExponentialBuckets(1024, 2, 11),
}, []string{"db"})
envelopeQueriesCounter = prom.NewCounterVec(prom.CounterOpts{
Name: "mailserver_envelope_queries_total",
Help: "Number of queries for envelopes in the DB.",
}, []string{"filter", "history"})
)
func init() {
prom.MustRegister(envelopesCounter)
prom.MustRegister(deliveryFailuresCounter)
prom.MustRegister(deliveryAttemptsCounter)
prom.MustRegister(requestsBatchedCounter)
prom.MustRegister(requestsInBundlesDuration)
prom.MustRegister(syncFailuresCounter)
prom.MustRegister(syncAttemptsCounter)
prom.MustRegister(sendRawEnvelopeDuration)
prom.MustRegister(sentEnvelopeBatchSizeMeter)
prom.MustRegister(mailDeliveryDuration)
prom.MustRegister(archivedErrorsCounter)
prom.MustRegister(archivedEnvelopesGauge)
prom.MustRegister(archivedEnvelopeSizeMeter)
prom.MustRegister(envelopeQueriesCounter)
}
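
These collectors live on the default Prometheus registry (registered in `init` above), so exposing them is a matter of mounting the standard handler. A self-contained sketch, under the assumption that status-go's `-metrics` flag does the equivalent elsewhere (the listen address is illustrative):
```
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Sketch: serve the default registry, where init() registered the
// collectors above, on /metrics.
func main() {
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9090", nil)
}
```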

@@ -0,0 +1,321 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1557732988_initialize_db.down.sql (72B)
// 1557732988_initialize_db.up.sql (278B)
// static.go (178B)
package migrations
import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
)
func bindataRead(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("read %q: %w", name, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, gz)
clErr := gz.Close()
if err != nil {
return nil, fmt.Errorf("read %q: %w", name, err)
}
if clErr != nil {
return nil, err
}
return buf.Bytes(), nil
}
type asset struct {
bytes []byte
info os.FileInfo
digest [sha256.Size]byte
}
type bindataFileInfo struct {
name string
size int64
mode os.FileMode
modTime time.Time
}
func (fi bindataFileInfo) Name() string {
return fi.name
}
func (fi bindataFileInfo) Size() int64 {
return fi.size
}
func (fi bindataFileInfo) Mode() os.FileMode {
return fi.mode
}
func (fi bindataFileInfo) ModTime() time.Time {
return fi.modTime
}
func (fi bindataFileInfo) IsDir() bool {
return false
}
func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1557732988_initialize_dbDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xc8\x4c\x89\x4f\xca\xc9\xcf\xcf\x8d\xcf\x4c\xa9\xb0\xe6\x42\x95\x28\xc9\x2f\xc8\x4c\x46\x92\x08\x71\x74\xf2\x71\x55\x48\xcd\x2b\x4b\xcd\xc9\x2f\x48\x2d\xb6\xe6\x02\x04\x00\x00\xff\xff\x6b\x93\xaa\x08\x48\x00\x00\x00")
func _1557732988_initialize_dbDownSqlBytes() ([]byte, error) {
return bindataRead(
__1557732988_initialize_dbDownSql,
"1557732988_initialize_db.down.sql",
)
}
func _1557732988_initialize_dbDownSql() (*asset, error) {
bytes, err := _1557732988_initialize_dbDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1557732988_initialize_db.down.sql", size: 72, mode: os.FileMode(0644), modTime: time.Unix(1704726859, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x77, 0x40, 0x78, 0xb7, 0x71, 0x3c, 0x20, 0x3b, 0xc9, 0xb, 0x2f, 0x49, 0xe4, 0xff, 0x1c, 0x84, 0x54, 0xa1, 0x30, 0xe3, 0x90, 0xf8, 0x73, 0xda, 0xb0, 0x2a, 0xea, 0x8e, 0xf1, 0x82, 0xe7, 0xd2}}
return a, nil
}
var __1557732988_initialize_dbUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x0e\x72\x75\x0c\x71\x55\x08\x71\x74\xf2\x71\x55\x48\xcd\x2b\x4b\xcd\xc9\x2f\x48\x2d\x56\xd0\xc8\x4c\x51\x70\x8a\x0c\x71\x75\x54\xf0\xf3\x0f\x51\xf0\x0b\xf5\xf1\x51\x08\xf5\xf3\x0c\x0c\x75\xd5\x51\x48\x49\x2c\x49\x44\x93\xd3\x51\x28\xc9\x2f\xc8\x4c\xc6\x10\x4d\xca\xc9\xcf\xcf\x55\x70\xf2\x0c\xd1\x30\x35\x34\xd2\x84\x4b\x68\x5a\x73\x71\x41\xed\xf5\xf4\x73\x71\x8d\x50\xc8\x4c\x89\x07\x2b\x8d\xcf\x4c\xa9\x50\xf0\xf7\x43\x73\x87\x8b\x6b\xb0\x33\xd4\x2c\x4d\x6b\x0c\x8d\x60\x9b\xf1\x69\x04\x2b\x40\xd7\x88\x5d\x97\x06\x4c\x2d\x20\x00\x00\xff\xff\x0b\x7d\x91\x3e\x16\x01\x00\x00")
func _1557732988_initialize_dbUpSqlBytes() ([]byte, error) {
return bindataRead(
__1557732988_initialize_dbUpSql,
"1557732988_initialize_db.up.sql",
)
}
func _1557732988_initialize_dbUpSql() (*asset, error) {
bytes, err := _1557732988_initialize_dbUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1557732988_initialize_db.up.sql", size: 278, mode: os.FileMode(0644), modTime: time.Unix(1704726859, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xf5, 0x85, 0x41, 0x7a, 0xba, 0x4f, 0xa3, 0x43, 0xc0, 0x63, 0xfa, 0x2c, 0xd1, 0xc5, 0xbb, 0x20, 0xa0, 0x64, 0xa8, 0x3b, 0x65, 0x82, 0xa2, 0x14, 0x28, 0x18, 0x7c, 0x8b, 0x3a, 0x7a, 0xfd, 0xe0}}
return a, nil
}
var _staticGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x8c\x41\x6a\xc3\x40\x0c\x45\xf7\x73\x8a\xbf\x6c\xa1\x1e\xed\x7b\x82\x52\x12\x08\x24\x17\x90\x6d\x21\x0b\xc7\x33\x46\x52\x72\xfe\x6c\x12\x42\x96\x8f\xc7\x7b\x44\x38\xf1\xb4\xb2\x0a\x22\x39\x6d\x82\x6c\xa3\xcc\xf1\xa2\xaf\xff\xf3\x0f\xfe\x2e\xc7\xc3\x37\x5c\xa2\xdf\x7c\x92\x80\x9b\x2e\x09\x6b\xd9\x91\x8b\x60\xb4\xc6\x6e\x12\x65\xff\x38\x95\x42\xa4\xfd\x57\xa5\x89\x73\x0a\xb4\x0f\xa3\xb5\x99\x93\x31\xec\xab\x62\x33\x75\x4e\xeb\x2d\x30\x74\xd4\x4a\xb5\xd2\xc6\x76\x0d\xf1\xbb\x38\xbd\x35\x3d\xb3\xaa\x1d\xb5\x3c\x02\x00\x00\xff\xff\xf4\xe4\x35\xe2\xb2\x00\x00\x00")
func staticGoBytes() ([]byte, error) {
return bindataRead(
_staticGo,
"static.go",
)
}
func staticGo() (*asset, error) {
bytes, err := staticGoBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "static.go", size: 178, mode: os.FileMode(0644), modTime: time.Unix(1704726859, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xab, 0x8a, 0xf4, 0x27, 0x24, 0x9d, 0x2a, 0x1, 0x7b, 0x54, 0xea, 0xae, 0x4a, 0x35, 0x40, 0x92, 0xb5, 0xf9, 0xb3, 0x54, 0x3e, 0x3a, 0x1a, 0x2b, 0xae, 0xfb, 0x9e, 0x82, 0xeb, 0x4c, 0xf, 0x6}}
return a, nil
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
}
return a.bytes, nil
}
return nil, fmt.Errorf("Asset %s not found", name)
}
// AssetString returns the asset contents as a string (instead of a []byte).
func AssetString(name string) (string, error) {
data, err := Asset(name)
return string(data), err
}
// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
a, err := Asset(name)
if err != nil {
panic("asset: Asset(" + name + "): " + err.Error())
}
return a
}
// MustAssetString is like AssetString but panics when Asset would return an
// error. It simplifies safe initialization of global variables.
func MustAssetString(name string) string {
return string(MustAsset(name))
}
// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
}
return a.info, nil
}
return nil, fmt.Errorf("AssetInfo %s not found", name)
}
// AssetDigest returns the digest of the file with the given name. It returns an
// error if the asset could not be found or the digest could not be loaded.
func AssetDigest(name string) ([sha256.Size]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err)
}
return a.digest, nil
}
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name)
}
// Digests returns a map of all known files and their checksums.
func Digests() (map[string][sha256.Size]byte, error) {
mp := make(map[string][sha256.Size]byte, len(_bindata))
for name := range _bindata {
a, err := _bindata[name]()
if err != nil {
return nil, err
}
mp[name] = a.digest
}
return mp, nil
}
// AssetNames returns the names of the assets.
func AssetNames() []string {
names := make([]string, 0, len(_bindata))
for name := range _bindata {
names = append(names, name)
}
return names
}
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1557732988_initialize_db.down.sql": _1557732988_initialize_dbDownSql,
"1557732988_initialize_db.up.sql": _1557732988_initialize_dbUpSql,
"static.go": staticGo,
}
// AssetDebug is true if the assets were built with the debug flag enabled.
const AssetDebug = false
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
//
// data/
// foo.txt
// img/
// a.png
// b.png
//
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
node := _bintree
if len(name) != 0 {
canonicalName := strings.Replace(name, "\\", "/", -1)
pathList := strings.Split(canonicalName, "/")
for _, p := range pathList {
node = node.Children[p]
if node == nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
}
}
if node.Func != nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
rv := make([]string, 0, len(node.Children))
for childName := range node.Children {
rv = append(rv, childName)
}
return rv, nil
}
type bintree struct {
Func func() (*asset, error)
Children map[string]*bintree
}
var _bintree = &bintree{nil, map[string]*bintree{
"1557732988_initialize_db.down.sql": {_1557732988_initialize_dbDownSql, map[string]*bintree{}},
"1557732988_initialize_db.up.sql": {_1557732988_initialize_dbUpSql, map[string]*bintree{}},
"static.go": {staticGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.
func RestoreAsset(dir, name string) error {
data, err := Asset(name)
if err != nil {
return err
}
info, err := AssetInfo(name)
if err != nil {
return err
}
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
if err != nil {
return err
}
err = os.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}
return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
}
// RestoreAssets restores an asset under the given directory recursively.
func RestoreAssets(dir, name string) error {
children, err := AssetDir(name)
// File
if err != nil {
return RestoreAsset(dir, name)
}
// Dir
for _, child := range children {
err = RestoreAssets(dir, filepath.Join(name, child))
if err != nil {
return err
}
}
return nil
}
func _filePath(dir, name string) string {
canonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...)
}

@@ -0,0 +1,51 @@
package mailserver
import (
"errors"
"time"
)
const (
maxMessagesRequestPayloadLimit = 1000
)
// MessagesRequestPayload is a payload sent to the Mail Server.
type MessagesRequestPayload struct {
// Lower is the lower bound of the time range for which messages are requested.
Lower uint32
// Upper is the upper bound of the time range for which messages are requested.
Upper uint32
// Bloom is a bloom filter to filter envelopes.
Bloom []byte
// Topics is a list of topics to filter envelopes.
Topics [][]byte
// Limit is the max number of envelopes to return.
Limit uint32
// Cursor is used for pagination of the results.
Cursor []byte
// Batch set to true indicates that the client supports batched response.
Batch bool
}
func (r *MessagesRequestPayload) SetDefaults() {
if r.Limit == 0 {
r.Limit = maxQueryLimit
}
if r.Upper == 0 {
r.Upper = uint32(time.Now().Unix() + whisperTTLSafeThreshold)
}
}
func (r MessagesRequestPayload) Validate() error {
if r.Upper < r.Lower {
return errors.New("query range is invalid: lower > upper")
}
if len(r.Bloom) == 0 && len(r.Topics) == 0 {
return errors.New("bloom filter and topics is empty")
}
if r.Limit > maxMessagesRequestPayloadLimit {
return errors.New("limit exceeds the maximum allowed value")
}
return nil
}
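
Since `decodeRequest` in the mailserver RLP-decodes the decrypted envelope payload straight into this struct, a client building such a request encodes it the same way. A hypothetical client-side sketch (`bloom` is assumed to be a 64-byte filter; field order matters for RLP):
```
// Hypothetical sketch: encode a request for one day of history.
payload := MessagesRequestPayload{
	Lower: 1550393583,
	Upper: 1550479953,
	Bloom: bloom, // alternatively, set Topics
	Limit: 1000,
	Batch: true,
}
data, err := rlp.EncodeToBytes(payload)
if err != nil {
	// handle the encoding error
}
_ = data // data becomes the (encrypted) payload of the request envelope
```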