From f6a9836fdf8f4d911eb2baee82afd5ad8f368d46 Mon Sep 17 00:00:00 2001 From: Kale Blankenship Date: Tue, 26 Mar 2024 11:02:05 -0700 Subject: [PATCH] move nextMutex to Client to prevent blocking separate Clients Avoids a global mutex which could end up unexpectedly blocking a separate client. For example, if there were a client with few messages and a client with many messages, the client with few could hold the lock waiting for a token blocking the client with many from receiving. --- xmpp.go | 50 ++++++++++++++++++++++++-------------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/xmpp.go b/xmpp.go index f2eac8c..7951a22 100644 --- a/xmpp.go +++ b/xmpp.go @@ -61,9 +61,6 @@ var DefaultConfig = &tls.Config{} // DebugWriter is the writer used to write debugging output to. var DebugWriter io.Writer = os.Stderr -// Mutex to prevent multiple access to xml.Decoder -var nextMutex sync.Mutex - // Cookie is a unique XMPP session identifier type Cookie uint64 @@ -80,6 +77,7 @@ type Client struct { conn net.Conn // connection to server jid string // Jabber ID for our connection domain string + nextMutex sync.Mutex // Mutex to prevent multiple access to xml.Decoder p *xml.Decoder stanzaWriter io.Writer Mechanism string @@ -341,7 +339,7 @@ func (c *Client) Close() error { case <-time.After(10 * time.Second): break default: - ee, err := nextEnd(c.p) + ee, err := c.nextEnd() // If the server already closed the stream it is // likely to receive an error when trying to parse // the stream. Therefore the connection is also closed @@ -540,7 +538,7 @@ func (c *Client) init(o *Options) error { fmt.Fprintf(c.stanzaWriter, "%s\n", nsSASL, mechanism, base64.StdEncoding.EncodeToString([]byte(clientFirstMessage))) var sfm string - _, val, err := next(c.p) + _, val, err := c.next() if err != nil { return err } @@ -706,7 +704,7 @@ func (c *Client) init(o *Options) error { return fmt.Errorf("no viable authentication method available: %v", f.Mechanisms.Mechanism) } // Next message should be either success or failure. - name, val, err := next(c.p) + name, val, err := c.next() if err != nil { return err } @@ -757,7 +755,7 @@ func (c *Client) init(o *Options) error { } else { fmt.Fprintf(c.stanzaWriter, "%s\n", cookie, nsBind, o.Resource) } - _, val, err = next(c.p) + _, val, err = c.next() if err != nil { return err } @@ -854,7 +852,7 @@ func (c *Client) startStream(o *Options, domain string) (*streamFeatures, error) } // We expect the server to start a . - se, err := nextStart(c.p) + se, err := c.nextStart() if err != nil { return nil, err } @@ -923,7 +921,7 @@ type IQ struct { // Recv waits to receive the next XMPP stanza. func (c *Client) Recv() (stanza interface{}, err error) { for { - _, val, err := next(c.p) + _, val, err := c.next() if err != nil { return Chat{}, err } @@ -1462,57 +1460,57 @@ type rosterItem struct { } // Scan XML token stream to find next StartElement. -func nextStart(p *xml.Decoder) (xml.StartElement, error) { +func (c *Client) nextStart() (xml.StartElement, error) { for { - nextMutex.Lock() - t, err := p.Token() + c.nextMutex.Lock() + t, err := c.p.Token() if err != nil || t == nil { - nextMutex.Unlock() + c.nextMutex.Unlock() return xml.StartElement{}, err } switch t := t.(type) { case xml.StartElement: - nextMutex.Unlock() + c.nextMutex.Unlock() return t, nil // Also check for stream end element and stop waiting // for new start elements if we received a closing stream // element. case xml.EndElement: if t.Name.Local == "stream" { - nextMutex.Unlock() + c.nextMutex.Unlock() return xml.StartElement{}, nil } } - nextMutex.Unlock() + c.nextMutex.Unlock() } } // Scan XML token stream to find next EndElement -func nextEnd(p *xml.Decoder) (xml.EndElement, error) { - p.Strict = false +func (c *Client) nextEnd() (xml.EndElement, error) { + c.p.Strict = false for { - nextMutex.Lock() - to, err := p.RawToken() + c.nextMutex.Lock() + to, err := c.p.RawToken() if err != nil || to == nil { - nextMutex.Unlock() + c.nextMutex.Unlock() return xml.EndElement{}, err } t := xml.CopyToken(to) switch t := t.(type) { case xml.EndElement: - nextMutex.Unlock() + c.nextMutex.Unlock() return t, nil } - nextMutex.Unlock() + c.nextMutex.Unlock() } } // Scan XML token stream for next element and save into val. // If val == nil, allocate new element based on proto map. // Either way, return val. -func next(p *xml.Decoder) (xml.Name, interface{}, error) { +func (c *Client) next() (xml.Name, interface{}, error) { // Read start element to find out what type we want. - se, err := nextStart(p) + se, err := c.nextStart() if err != nil { return xml.Name{}, nil, err } @@ -1560,7 +1558,7 @@ func next(p *xml.Decoder) (xml.Name, interface{}, error) { } // Unmarshal into that storage. - if err = p.DecodeElement(nv, &se); err != nil { + if err = c.p.DecodeElement(nv, &se); err != nil { return xml.Name{}, nil, err }