fix: Upgrade status-go to the most recent version of release branch which contains memory fix

Fix #4990
This commit is contained in:
Michal Iskierko
2024-05-13 12:21:03 +02:00
committed by Michał Iskierko
parent 03d490156a
commit 66cf3d21b9
230 changed files with 30930 additions and 14243 deletions

View File

@@ -0,0 +1,191 @@
package storenodes
import (
"bytes"
"database/sql"
"fmt"
"time"
"github.com/status-im/status-go/eth-node/types"
)
type Database struct {
db *sql.DB
}
func NewDB(db *sql.DB) *Database {
return &Database{db: db}
}
// syncSave will sync the storenodes in the DB from the snode slice
// - if a storenode is not in the provided list, it will be soft-deleted
// - if a storenode is in the provided list, it will be inserted or updated
func (d *Database) syncSave(communityID types.HexBytes, snode []Storenode, clock uint64) (err error) {
var tx *sql.Tx
tx, err = d.db.Begin()
if err != nil {
return err
}
defer func() {
if err == nil {
err = tx.Commit()
return
}
_ = tx.Rollback()
}()
now := time.Now().Unix()
dbNodes, err := d.getByCommunityID(communityID, tx)
if err != nil {
return fmt.Errorf("getting storenodes by community id: %w", err)
}
// Soft-delete db nodes that are not in the provided list
for _, dbN := range dbNodes {
if find(dbN, snode) != nil {
continue
}
if clock != 0 && dbN.Clock >= clock {
continue
}
if err := d.softDelete(communityID, dbN.StorenodeID, now, tx); err != nil {
return fmt.Errorf("soft deleting existing storenodes: %w", err)
}
}
// Insert or update the nodes in the provided list
for _, n := range snode {
// defensively validate the communityID
if len(n.CommunityID) == 0 || !bytes.Equal(communityID, n.CommunityID) {
err = fmt.Errorf("communityID mismatch %v != %v", communityID, n.CommunityID)
return err
}
dbN := find(n, dbNodes)
if dbN != nil && n.Clock != 0 && dbN.Clock >= n.Clock {
continue
}
if err := d.upsert(n, tx); err != nil {
return fmt.Errorf("upserting storenodes: %w", err)
}
}
// TODO for now only allow one storenode per community
count, err := d.countByCommunity(communityID, tx)
if err != nil {
return err
}
if count > 1 {
err = fmt.Errorf("only one storenode per community is allowed")
return err
}
return nil
}
func (d *Database) getAll() ([]Storenode, error) {
rows, err := d.db.Query(`
SELECT community_id, storenode_id, name, address, fleet, version, clock, removed, deleted_at
FROM community_storenodes
WHERE removed = 0
`)
if err != nil {
return nil, err
}
defer rows.Close()
return toStorenodes(rows)
}
func (d *Database) getByCommunityID(communityID types.HexBytes, tx ...*sql.Tx) ([]Storenode, error) {
var rows *sql.Rows
var err error
q := `
SELECT community_id, storenode_id, name, address, fleet, version, clock, removed, deleted_at
FROM community_storenodes
WHERE community_id = ? AND removed = 0
`
if len(tx) > 0 {
rows, err = tx[0].Query(q, communityID)
} else {
rows, err = d.db.Query(q, communityID)
}
if err != nil {
return nil, err
}
defer rows.Close()
return toStorenodes(rows)
}
func (d *Database) softDelete(communityID types.HexBytes, storenodeID string, deletedAt int64, tx *sql.Tx) error {
_, err := tx.Exec("UPDATE community_storenodes SET removed = 1, deleted_at = ? WHERE community_id = ? AND storenode_id = ?", deletedAt, communityID, storenodeID)
if err != nil {
return err
}
return nil
}
func (d *Database) upsert(n Storenode, tx *sql.Tx) error {
_, err := tx.Exec(`INSERT OR REPLACE INTO community_storenodes(
community_id,
storenode_id,
name,
address,
fleet,
version,
clock,
removed,
deleted_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
n.CommunityID,
n.StorenodeID,
n.Name,
n.Address,
n.Fleet,
n.Version,
n.Clock,
n.Removed,
n.DeletedAt,
)
if err != nil {
return err
}
return nil
}
func (d *Database) countByCommunity(communityID types.HexBytes, tx *sql.Tx) (int, error) {
var count int
err := tx.QueryRow(`SELECT COUNT(*) FROM community_storenodes WHERE community_id = ? AND removed = 0`, communityID).Scan(&count)
if err != nil {
return 0, err
}
return count, nil
}
func toStorenodes(rows *sql.Rows) ([]Storenode, error) {
var result []Storenode
for rows.Next() {
var m Storenode
if err := rows.Scan(
&m.CommunityID,
&m.StorenodeID,
&m.Name,
&m.Address,
&m.Fleet,
&m.Version,
&m.Clock,
&m.Removed,
&m.DeletedAt,
); err != nil {
return nil, err
}
result = append(result, m)
}
return result, nil
}
func find(n Storenode, nodes []Storenode) *Storenode {
for i, node := range nodes {
if node.StorenodeID == n.StorenodeID && bytes.Equal(node.CommunityID, n.CommunityID) {
return &nodes[i]
}
}
return nil
}

View File

@@ -0,0 +1,6 @@
// package storenodes provides functionality to work with community specific storenodes
// Current limitations:
// - we support only one storenode per community
// - we assume that the storenode is always active
// - we don't support a way to regularly check connection similar to the `messenger_mailserver_cycle.go`
package storenodes

View File

@@ -0,0 +1,69 @@
package storenodes
import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/services/mailservers"
)
// Storenode is a struct that represents a storenode, it is very closely related to `mailservers.Mailserver`
type Storenode struct {
CommunityID types.HexBytes `json:"community_id"`
StorenodeID string `json:"storenode_id"`
Name string `json:"name"`
Address string `json:"address"`
Fleet string `json:"fleet"`
Version uint `json:"version"`
Clock uint64 `json:"-"` // used to sync
Removed bool `json:"-"`
DeletedAt int64 `json:"-"`
}
type Storenodes []Storenode
func (m Storenodes) ToProtobuf() []*protobuf.Storenode {
result := make([]*protobuf.Storenode, 0, len(m))
for _, n := range m {
result = append(result, &protobuf.Storenode{
CommunityId: n.CommunityID,
StorenodeId: n.StorenodeID,
Name: n.Name,
Address: n.Address,
Fleet: n.Fleet,
Version: uint32(n.Version),
Removed: n.Removed,
DeletedAt: n.DeletedAt,
})
}
return result
}
func FromProtobuf(storenodes []*protobuf.Storenode, clock uint64) Storenodes {
result := make(Storenodes, 0, len(storenodes))
for _, s := range storenodes {
result = append(result, Storenode{
CommunityID: s.CommunityId,
StorenodeID: s.StorenodeId,
Name: s.Name,
Address: s.Address,
Fleet: s.Fleet,
Version: uint(s.Version),
Removed: s.Removed,
DeletedAt: s.DeletedAt,
Clock: clock,
})
}
return result
}
func toMailserver(m Storenode) mailservers.Mailserver {
return mailservers.Mailserver{
ID: m.StorenodeID,
Name: m.Name,
Custom: true,
Address: m.Address,
Fleet: m.Fleet,
Version: m.Version,
}
}

View File

@@ -0,0 +1,114 @@
package storenodes
import (
"errors"
"sync"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/services/mailservers"
)
var (
ErrNotFound = errors.New("not found")
)
// CommunityStorenodes has methods to handle the storenodes for a community
type CommunityStorenodes struct {
storenodesByCommunityIDMutex *sync.RWMutex
storenodesByCommunityID map[string]storenodesData
storenodesDB *Database
logger *zap.Logger
}
func NewCommunityStorenodes(storenodesDB *Database, logger *zap.Logger) *CommunityStorenodes {
if logger == nil {
logger = zap.NewNop()
}
return &CommunityStorenodes{
storenodesByCommunityIDMutex: &sync.RWMutex{},
storenodesByCommunityID: make(map[string]storenodesData),
storenodesDB: storenodesDB,
logger: logger.With(zap.Namespace("CommunityStorenodes")),
}
}
type storenodesData struct {
storenodes []Storenode
}
// GetStorenodeByCommunnityID returns the active storenode for a community
func (m *CommunityStorenodes) GetStorenodeByCommunnityID(communityID string) (mailservers.Mailserver, error) {
m.storenodesByCommunityIDMutex.RLock()
defer m.storenodesByCommunityIDMutex.RUnlock()
msData, ok := m.storenodesByCommunityID[communityID]
if !ok || len(msData.storenodes) == 0 {
return mailservers.Mailserver{}, ErrNotFound
}
return toMailserver(msData.storenodes[0]), nil
}
func (m *CommunityStorenodes) IsCommunityStoreNode(id string) bool {
m.storenodesByCommunityIDMutex.RLock()
defer m.storenodesByCommunityIDMutex.RUnlock()
for _, data := range m.storenodesByCommunityID {
for _, snode := range data.storenodes {
if snode.StorenodeID == id {
return true
}
}
}
return false
}
func (m *CommunityStorenodes) HasStorenodeSetup(communityID string) bool {
m.storenodesByCommunityIDMutex.RLock()
defer m.storenodesByCommunityIDMutex.RUnlock()
msData, ok := m.storenodesByCommunityID[communityID]
return ok && len(msData.storenodes) > 0
}
// ReloadFromDB loads or reloads the mailservers from the database (on adding/deleting mailservers)
func (m *CommunityStorenodes) ReloadFromDB() error {
if m.storenodesDB == nil {
return nil
}
m.storenodesByCommunityIDMutex.Lock()
defer m.storenodesByCommunityIDMutex.Unlock()
dbNodes, err := m.storenodesDB.getAll()
if err != nil {
return err
}
// overwrite the in-memory storenodes
m.storenodesByCommunityID = make(map[string]storenodesData)
for _, node := range dbNodes {
communityID := node.CommunityID.String()
if _, ok := m.storenodesByCommunityID[communityID]; !ok {
m.storenodesByCommunityID[communityID] = storenodesData{}
}
data := m.storenodesByCommunityID[communityID]
data.storenodes = append(data.storenodes, node)
m.storenodesByCommunityID[communityID] = data
}
m.logger.Debug("loaded communities storenodes", zap.Int("count", len(dbNodes)))
return nil
}
func (m *CommunityStorenodes) UpdateStorenodesInDB(communityID types.HexBytes, snodes []Storenode, clock uint64) error {
if err := m.storenodesDB.syncSave(communityID, snodes, clock); err != nil {
return err
}
if err := m.ReloadFromDB(); err != nil {
return err
}
return nil
}
func (m *CommunityStorenodes) GetStorenodesFromDB(communityID types.HexBytes) ([]Storenode, error) {
return m.storenodesDB.getByCommunityID(communityID)
}