XEP-0325: Don’t use threading

This commit is contained in:
mathieui 2015-02-28 17:39:32 +01:00
parent 997928de91
commit 93ce318259
No known key found for this signature in database
GPG Key ID: C59F84CEEFD616E3

View File

@ -10,8 +10,9 @@
import logging import logging
import time import time
from threading import Thread, Timer, Lock
from slixmpp import asyncio
from functools import partial
from slixmpp.xmlstream import JID from slixmpp.xmlstream import JID
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
@ -40,10 +41,6 @@ class XEP_0325(BasePlugin):
Also see <http://xmpp.org/extensions/xep-0325.html> Also see <http://xmpp.org/extensions/xep-0325.html>
Configuration Values:
threaded -- Indicates if communication with sensors should be threaded.
Defaults to True.
Events: Events:
Sensor side Sensor side
----------- -----------
@ -58,8 +55,6 @@ class XEP_0325(BasePlugin):
control request, type error control request, type error
Attributes: Attributes:
threaded -- Indicates if command events should be threaded.
Defaults to True.
sessions -- A dictionary or equivalent backend mapping sessions -- A dictionary or equivalent backend mapping
session IDs to dictionaries containing data session IDs to dictionaries containing data
relevant to a request's session. This dictionary is used relevant to a request's session. This dictionary is used
@ -107,7 +102,6 @@ class XEP_0325(BasePlugin):
default_config = { default_config = {
'threaded': True
# 'session_db': None # 'session_db': None
} }
@ -139,7 +133,6 @@ class XEP_0325(BasePlugin):
self.sessions = {} self.sessions = {}
self.last_seqnr = 0 self.last_seqnr = 0
self.seqnr_lock = Lock()
## For testning only ## For testning only
self.test_authenticated_from = "" self.test_authenticated_from = ""
@ -198,9 +191,7 @@ class XEP_0325(BasePlugin):
def _get_new_seqnr(self): def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """ """ Returns a unique sequence number (unique across threads) """
self.seqnr_lock.acquire()
self.last_seqnr = self.last_seqnr + 1 self.last_seqnr = self.last_seqnr + 1
self.seqnr_lock.release()
return str(self.last_seqnr) return str(self.last_seqnr)
def _handle_set_req(self, iq): def _handle_set_req(self, iq):
@ -264,14 +255,7 @@ class XEP_0325(BasePlugin):
self.sessions[session]["reply"] = True self.sessions[session]["reply"] = True
self.sessions[session]["node_list"] = process_nodes self.sessions[session]["node_list"] = process_nodes
if self.threaded: self._node_request(session, process_fields)
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
tr_req.start()
#print("started thread")
else:
self._threaded_node_request(session, process_fields)
else: else:
iq = iq.reply() iq = iq.reply()
iq['type'] = 'error' iq['type'] = 'error'
@ -332,16 +316,10 @@ class XEP_0325(BasePlugin):
self.sessions[session]["reply"] = False self.sessions[session]["reply"] = False
self.sessions[session]["node_list"] = process_nodes self.sessions[session]["node_list"] = process_nodes
if self.threaded: self._node_request(session, process_fields)
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
tr_req.start()
#print("started thread")
else:
self._threaded_node_request(session, process_fields)
def _threaded_node_request(self, session, process_fields): def _node_request(self, session, process_fields):
""" """
Helper function to handle the device control in a separate thread. Helper function to handle the device control in a separate thread.
@ -354,9 +332,8 @@ class XEP_0325(BasePlugin):
self.sessions[session]["nodeDone"][node] = False self.sessions[session]["nodeDone"][node] = False
for node in self.sessions[session]["node_list"]: for node in self.sessions[session]["node_list"]:
timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node)) timer = asyncio.get_event_loop().call_later(self.nodes[node]['commTimeout'], partial(self._event_comm_timeout, args=(session, node)))
self.sessions[session]["commTimers"][node] = timer self.sessions[session]["commTimers"][node] = timer
timer.start()
self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback) self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback)
def _event_comm_timeout(self, session, nodeId): def _event_comm_timeout(self, session, nodeId):