Revert "cleanup semicolons, whitespace and mutable default arguments"

This reverts commit 7265682a4d.
This commit is contained in:
Robin Gloster
2014-08-18 15:15:14 +02:00
parent 7265682a4d
commit 3dd379cdf1
30 changed files with 1067 additions and 1073 deletions

View File

@@ -13,18 +13,15 @@ import logging
class Device(object):
"""
Example implementation of a device readout object.
Example implementation of a device readout object.
Is registered in the XEP_0323.register_node call
The device object may be any custom implementation to support
The device object may be any custom implementation to support
specific devices, but it must implement the functions:
has_field
request_fields
"""
def __init__(self, nodeId, fields=None):
if not fields:
fields = {}
def __init__(self, nodeId, fields={}):
self.nodeId = nodeId
self.fields = fields # see fields described below
# {'type':'numeric',
@@ -41,19 +38,19 @@ class Device(object):
Returns true if the supplied field name exists in this device.
Arguments:
field -- The field name
field -- The field name
"""
if field in self.fields.keys():
return True
return False
return True;
return False;
def refresh(self, fields):
"""
override method to do the refresh work
refresh values from hardware or other
"""
pass
def request_fields(self, fields, flags, session, callback):
"""
@@ -68,7 +65,7 @@ class Device(object):
Formatted as a dictionary like { "flag name": "flag value" ... }
session -- Session id, only used in the callback as identifier
callback -- Callback function to call when data is available.
The callback function must support the following arguments:
session -- Session id, as supplied in the request_fields call
@@ -76,11 +73,11 @@ class Device(object):
result -- The current result status of the readout. Valid values are:
"error" - Readout failed.
"fields" - Contains readout data.
"done" - Indicates that the readout is complete. May contain
"done" - Indicates that the readout is complete. May contain
readout data.
timestamp_block -- [optional] Only applies when result != "error"
timestamp_block -- [optional] Only applies when result != "error"
The readout data. Structured as a dictionary:
{
{
timestamp: timestamp for this datablock,
fields: list of field dictionary (one per readout field).
readout field dictionary format:
@@ -92,10 +89,10 @@ class Device(object):
dataType: The datatype of the field. Only applies to type enum.
flags: [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
}
}
}
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
Error details when a request failed.
"""
logging.debug("request_fields called looking for fields %s",fields)
@@ -104,10 +101,10 @@ class Device(object):
for f in fields:
if f not in self.fields.keys():
self._send_reject(session, callback)
return False
return False;
else:
# Request all fields
fields = self.fields.keys()
fields = self.fields.keys();
# Refresh data from device
@@ -117,27 +114,27 @@ class Device(object):
if "momentary" in flags and flags['momentary'] == "true" or \
"all" in flags and flags['all'] == "true":
ts_block = {}
timestamp = ""
ts_block = {};
timestamp = "";
if len(self.momentary_timestamp) > 0:
timestamp = self.momentary_timestamp
timestamp = self.momentary_timestamp;
else:
timestamp = self._get_timestamp()
timestamp = self._get_timestamp();
field_block = []
field_block = [];
for f in self.momentary_data:
if f in fields:
field_block.append({"name": f,
"type": self.fields[f]["type"],
field_block.append({"name": f,
"type": self.fields[f]["type"],
"unit": self.fields[f]["unit"],
"dataType": self.fields[f]["dataType"],
"value": self.momentary_data[f]["value"],
"flags": self.momentary_data[f]["flags"]})
ts_block["timestamp"] = timestamp
ts_block["fields"] = field_block
"value": self.momentary_data[f]["value"],
"flags": self.momentary_data[f]["flags"]});
ts_block["timestamp"] = timestamp;
ts_block["fields"] = field_block;
callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block)
callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block);
return
from_flag = self._datetime_flag_parser(flags, 'from')
@@ -145,36 +142,36 @@ class Device(object):
for ts in sorted(self.timestamp_data.keys()):
tsdt = datetime.datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
if not from_flag is None:
if tsdt < from_flag:
if not from_flag is None:
if tsdt < from_flag:
#print (str(tsdt) + " < " + str(from_flag))
continue
if not to_flag is None:
if tsdt > to_flag:
if not to_flag is None:
if tsdt > to_flag:
#print (str(tsdt) + " > " + str(to_flag))
continue
ts_block = {}
field_block = []
ts_block = {};
field_block = [];
for f in self.timestamp_data[ts]:
if f in fields:
field_block.append({"name": f,
"type": self.fields[f]["type"],
field_block.append({"name": f,
"type": self.fields[f]["type"],
"unit": self.fields[f]["unit"],
"dataType": self.fields[f]["dataType"],
"value": self.timestamp_data[ts][f]["value"],
"flags": self.timestamp_data[ts][f]["flags"]})
"value": self.timestamp_data[ts][f]["value"],
"flags": self.timestamp_data[ts][f]["flags"]});
ts_block["timestamp"] = ts
ts_block["fields"] = field_block
callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block)
callback(session, result="done", nodeId=self.nodeId, timestamp_block=None)
ts_block["timestamp"] = ts;
ts_block["fields"] = field_block;
callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block);
callback(session, result="done", nodeId=self.nodeId, timestamp_block=None);
def _datetime_flag_parser(self, flags, flagname):
if not flagname in flags:
return None
dt = None
try:
dt = datetime.datetime.strptime(flags[flagname], "%Y-%m-%dT%H:%M:%S")
@@ -198,7 +195,7 @@ class Device(object):
session -- Session id, see definition in request_fields function
callback -- Callback function, see definition in request_fields function
"""
callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject")
callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject");
def _add_field(self, name, typename, unit=None, dataType=None):
"""
@@ -210,7 +207,7 @@ class Device(object):
unit -- [optional] only applies to "numeric". Unit for the field.
dataType -- [optional] only applies to "enum". Datatype for the field.
"""
self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType}
self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType};
def _add_field_timestamp_data(self, name, timestamp, value, flags=None):
"""
@@ -224,12 +221,12 @@ class Device(object):
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
if not name in self.fields.keys():
return False
return False;
if not timestamp in self.timestamp_data:
self.timestamp_data[timestamp] = {}
self.timestamp_data[timestamp] = {};
self.timestamp_data[timestamp][name] = {"value": value, "flags": flags}
return True
self.timestamp_data[timestamp][name] = {"value": value, "flags": flags};
return True;
def _add_field_momentary_data(self, name, value, flags=None):
"""
@@ -242,17 +239,17 @@ class Device(object):
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
if name not in self.fields:
return False
return False;
if flags is None:
flags = {}
flags = {};
flags["momentary"] = "true"
self.momentary_data[name] = {"value": value, "flags": flags}
return True
self.momentary_data[name] = {"value": value, "flags": flags};
return True;
def _set_momentary_timestamp(self, timestamp):
"""
This function is only for unit testing to produce predictable results.
"""
self.momentary_timestamp = timestamp
self.momentary_timestamp = timestamp;

View File

@@ -29,12 +29,12 @@ log = logging.getLogger(__name__)
class XEP_0323(BasePlugin):
"""
XEP-0323: IoT Sensor Data
XEP-0323: IoT Sensor Data
This XEP provides the underlying architecture, basic operations and data
structures for sensor data communication over XMPP networks. It includes
a hardware abstraction model, removing any technical detail implemented
a hardware abstraction model, removing any technical detail implemented
in underlying technologies.
Also see <http://xmpp.org/extensions/xep-0323.html>
@@ -55,10 +55,10 @@ class XEP_0323(BasePlugin):
Sensordata Event:Rejected -- Received a reject from sensor for a request
Sensordata Event:Cancelled -- Received a cancel confirm from sensor
Sensordata Event:Fields -- Received fields from sensor for a request
This may be triggered multiple times since
This may be triggered multiple times since
the sensor can split up its response in
multiple messages.
Sensordata Event:Failure -- Received a failure indication from sensor
Sensordata Event:Failure -- Received a failure indication from sensor
for a request. Typically a comm timeout.
Attributes:
@@ -69,7 +69,7 @@ class XEP_0323(BasePlugin):
relevant to a request's session. This dictionary is used
both by the client and sensor side. On client side, seqnr
is used as key, while on sensor side, a session_id is used
as key. This ensures that the two will not collide, so
as key. This ensures that the two will not collide, so
one instance can be both client and sensor.
Sensor side
-----------
@@ -89,12 +89,12 @@ class XEP_0323(BasePlugin):
Sensor side
-----------
register_node -- Register a sensor as available from this XMPP
register_node -- Register a sensor as available from this XMPP
instance.
Client side
-----------
request_data -- Initiates a request for data from one or more
request_data -- Initiates a request for data from one or more
sensors. Non-blocking, a callback function will
be called when data is available.
@@ -102,7 +102,7 @@ class XEP_0323(BasePlugin):
name = 'xep_0323'
description = 'XEP-0323 Internet of Things - Sensor Data'
dependencies = set(['xep_0030'])
dependencies = set(['xep_0030'])
stanza = stanza
@@ -155,11 +155,11 @@ class XEP_0323(BasePlugin):
self._handle_event_started))
# Server side dicts
self.nodes = {}
self.sessions = {}
self.nodes = {};
self.sessions = {};
self.last_seqnr = 0
self.seqnr_lock = Lock()
self.last_seqnr = 0;
self.seqnr_lock = Lock();
## For testning only
self.test_authenticated_from = ""
@@ -182,7 +182,7 @@ class XEP_0323(BasePlugin):
def plugin_end(self):
""" Stop the XEP-0323 plugin """
self.sessions.clear()
self.sessions.clear();
self.xmpp.remove_handler('Sensordata Event:Req')
self.xmpp.remove_handler('Sensordata Event:Accepted')
self.xmpp.remove_handler('Sensordata Event:Rejected')
@@ -198,9 +198,9 @@ class XEP_0323(BasePlugin):
def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
"""
Register a sensor/device as available for serving of data through this XMPP
instance.
instance.
The device object may by any custom implementation to support
The device object may by any custom implementation to support
specific devices, but it must implement the functions:
has_field
request_fields
@@ -212,25 +212,25 @@ class XEP_0323(BasePlugin):
commTimeout -- Time in seconds to wait between each callback from device during
a data readout. Float.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
self.nodes[nodeId] = {"device": device,
self.nodes[nodeId] = {"device": device,
"commTimeout": commTimeout,
"sourceId": sourceId,
"cacheType": cacheType}
"sourceId": sourceId,
"cacheType": cacheType};
def _set_authenticated(self, auth=''):
""" Internal testing function """
self.test_authenticated_from = auth
self.test_authenticated_from = auth;
def _handle_event_req(self, iq):
"""
Event handler for reception of an Iq with req - this is a request.
Verifies that
Verifies that
- all the requested nodes are available
- at least one of the requested fields is available from at least
- at least one of the requested fields is available from at least
one of the nodes
If the request passes verification, an accept response is sent, and
@@ -238,42 +238,42 @@ class XEP_0323(BasePlugin):
If the verification fails, a reject message is sent.
"""
seqnr = iq['req']['seqnr']
error_msg = ''
req_ok = True
seqnr = iq['req']['seqnr'];
error_msg = '';
req_ok = True;
# Authentication
if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
# Invalid authentication
req_ok = False
error_msg = "Access denied"
req_ok = False;
error_msg = "Access denied";
# Nodes
process_nodes = []
process_nodes = [];
if len(iq['req']['nodes']) > 0:
for n in iq['req']['nodes']:
if not n['nodeId'] in self.nodes:
req_ok = False
error_msg = "Invalid nodeId " + n['nodeId']
process_nodes = [n['nodeId'] for n in iq['req']['nodes']]
req_ok = False;
error_msg = "Invalid nodeId " + n['nodeId'];
process_nodes = [n['nodeId'] for n in iq['req']['nodes']];
else:
process_nodes = self.nodes.keys()
process_nodes = self.nodes.keys();
# Fields - if we just find one we are happy, otherwise we reject
process_fields = []
process_fields = [];
if len(iq['req']['fields']) > 0:
found = False
for f in iq['req']['fields']:
for node in self.nodes:
if self.nodes[node]["device"].has_field(f['name']):
found = True
break
found = True;
break;
if not found:
req_ok = False
error_msg = "Invalid field " + f['name']
process_fields = [f['name'] for n in iq['req']['fields']]
req_ok = False;
error_msg = "Invalid field " + f['name'];
process_fields = [f['name'] for n in iq['req']['fields']];
req_flags = iq['req']._get_flags()
req_flags = iq['req']._get_flags();
request_delay_sec = None
if 'when' in req_flags:
@@ -283,7 +283,7 @@ class XEP_0323(BasePlugin):
try:
dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S")
except ValueError:
req_ok = False
req_ok = False;
error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)."
if not dt is None:
@@ -292,30 +292,30 @@ class XEP_0323(BasePlugin):
dtdiff = dt - dtnow
request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600
if request_delay_sec <= 0:
req_ok = False
error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat()
req_ok = False;
error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat();
if req_ok:
session = self._new_session()
self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr}
self.sessions[session]["commTimers"] = {}
self.sessions[session]["nodeDone"] = {}
session = self._new_session();
self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr};
self.sessions[session]["commTimers"] = {};
self.sessions[session]["nodeDone"] = {};
#print("added session: " + str(self.sessions))
iq.reply()
iq['accepted']['seqnr'] = seqnr
iq.reply();
iq['accepted']['seqnr'] = seqnr;
if not request_delay_sec is None:
iq['accepted']['queued'] = "true"
iq.send(block=False)
iq.send(block=False);
self.sessions[session]["node_list"] = process_nodes
self.sessions[session]["node_list"] = process_nodes;
if not request_delay_sec is None:
# Delay request to requested time
timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags))
self.sessions[session]["commTimers"]["delaytimer"] = timer
timer.start()
self.sessions[session]["commTimers"]["delaytimer"] = timer;
timer.start();
return
if self.threaded:
@@ -324,19 +324,19 @@ class XEP_0323(BasePlugin):
tr_req.start()
#print("started thread")
else:
self._threaded_node_request(session, process_fields, req_flags)
self._threaded_node_request(session, process_fields, req_flags);
else:
iq.reply()
iq['type'] = 'error'
iq['rejected']['seqnr'] = seqnr
iq['rejected']['error'] = error_msg
iq.send(block=False)
iq.reply();
iq['type'] = 'error';
iq['rejected']['seqnr'] = seqnr;
iq['rejected']['error'] = error_msg;
iq.send(block=False);
def _threaded_node_request(self, session, process_fields, flags):
"""
"""
Helper function to handle the device readouts in a separate thread.
Arguments:
session -- The request session id
process_fields -- The fields to request from the devices
@@ -344,41 +344,41 @@ class XEP_0323(BasePlugin):
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
for node in self.sessions[session]["node_list"]:
self.sessions[session]["nodeDone"][node] = False
self.sessions[session]["nodeDone"][node] = False;
for node in self.sessions[session]["node_list"]:
timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node))
self.sessions[session]["commTimers"][node] = timer
timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
self.sessions[session]["commTimers"][node] = timer;
#print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout']))
timer.start()
self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback)
timer.start();
self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback);
def _event_comm_timeout(self, session, nodeId):
"""
"""
Triggered if any of the readout operations timeout.
Sends a failure message back to the client, stops communicating
with the failing device.
Arguments:
session -- The request session id
nodeId -- The id of the device which timed out
"""
msg = self.xmpp.Message()
msg['from'] = self.sessions[session]['to']
msg['to'] = self.sessions[session]['from']
msg['failure']['seqnr'] = self.sessions[session]['seqnr']
msg['failure']['error']['text'] = "Timeout"
msg['failure']['error']['nodeId'] = nodeId
msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat()
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
msg['failure']['error']['text'] = "Timeout";
msg['failure']['error']['nodeId'] = nodeId;
msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
# Drop communication with this device and check if we are done
self.sessions[session]["nodeDone"][nodeId] = True
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
msg['failure']['done'] = 'true'
msg.send()
msg['failure']['done'] = 'true';
msg.send();
# The session is complete, delete it
#print("del session " + session + " due to timeout")
del self.sessions[session]
del self.sessions[session];
def _event_delayed_req(self, session, process_fields, req_flags):
"""
@@ -390,47 +390,47 @@ class XEP_0323(BasePlugin):
flags -- [optional] flags to pass to the devices, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
msg = self.xmpp.Message()
msg['from'] = self.sessions[session]['to']
msg['to'] = self.sessions[session]['from']
msg['started']['seqnr'] = self.sessions[session]['seqnr']
msg.send()
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['started']['seqnr'] = self.sessions[session]['seqnr'];
msg.send();
if self.threaded:
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
tr_req.start()
else:
self._threaded_node_request(session, process_fields, req_flags)
self._threaded_node_request(session, process_fields, req_flags);
def _all_nodes_done(self, session):
"""
"""
Checks wheter all devices are done replying to the readout.
Arguments:
session -- The request session id
"""
for n in self.sessions[session]["nodeDone"]:
if not self.sessions[session]["nodeDone"][n]:
return False
return True
return False;
return True;
def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None):
"""
"""
Callback function called by the devices when they have any additional data.
Composes a message with the data and sends it back to the client, and resets
Composes a message with the data and sends it back to the client, and resets
the timeout timer for the device.
Arguments:
session -- The request session id
nodeId -- The device id which initiated the callback
result -- The current result status of the readout. Valid values are:
"error" - Readout failed.
"fields" - Contains readout data.
"done" - Indicates that the readout is complete. May contain
"done" - Indicates that the readout is complete. May contain
readout data.
timestamp_block -- [optional] Only applies when result != "error"
timestamp_block -- [optional] Only applies when result != "error"
The readout data. Structured as a dictionary:
{
{
timestamp: timestamp for this datablock,
fields: list of field dictionary (one per readout field).
readout field dictionary format:
@@ -442,7 +442,7 @@ class XEP_0323(BasePlugin):
dataType: The datatype of the field. Only applies to type enum.
flags: [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
}
}
}
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
@@ -452,99 +452,99 @@ class XEP_0323(BasePlugin):
return
if result == "error":
self.sessions[session]["commTimers"][nodeId].cancel()
self.sessions[session]["commTimers"][nodeId].cancel();
msg = self.xmpp.Message()
msg['from'] = self.sessions[session]['to']
msg['to'] = self.sessions[session]['from']
msg['failure']['seqnr'] = self.sessions[session]['seqnr']
msg['failure']['error']['text'] = error_msg
msg['failure']['error']['nodeId'] = nodeId
msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat()
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
msg['failure']['error']['text'] = error_msg;
msg['failure']['error']['nodeId'] = nodeId;
msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
# Drop communication with this device and check if we are done
self.sessions[session]["nodeDone"][nodeId] = True
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
msg['failure']['done'] = 'true'
msg['failure']['done'] = 'true';
# The session is complete, delete it
# print("del session " + session + " due to error")
del self.sessions[session]
msg.send()
del self.sessions[session];
msg.send();
else:
msg = self.xmpp.Message()
msg['from'] = self.sessions[session]['to']
msg['to'] = self.sessions[session]['from']
msg['fields']['seqnr'] = self.sessions[session]['seqnr']
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['fields']['seqnr'] = self.sessions[session]['seqnr'];
if timestamp_block is not None and len(timestamp_block) > 0:
node = msg['fields'].add_node(nodeId)
ts = node.add_timestamp(timestamp_block["timestamp"])
node = msg['fields'].add_node(nodeId);
ts = node.add_timestamp(timestamp_block["timestamp"]);
for f in timestamp_block["fields"]:
data = ts.add_data( typename=f['type'],
name=f['name'],
value=f['value'],
unit=f['unit'],
dataType=f['dataType'],
flags=f['flags'])
data = ts.add_data( typename=f['type'],
name=f['name'],
value=f['value'],
unit=f['unit'],
dataType=f['dataType'],
flags=f['flags']);
if result == "done":
self.sessions[session]["commTimers"][nodeId].cancel()
self.sessions[session]["nodeDone"][nodeId] = True
msg['fields']['done'] = 'true'
self.sessions[session]["commTimers"][nodeId].cancel();
self.sessions[session]["nodeDone"][nodeId] = True;
msg['fields']['done'] = 'true';
if (self._all_nodes_done(session)):
# The session is complete, delete it
# print("del session " + session + " due to complete")
del self.sessions[session]
del self.sessions[session];
else:
# Restart comm timer
self.sessions[session]["commTimers"][nodeId].reset()
self.sessions[session]["commTimers"][nodeId].reset();
msg.send()
msg.send();
def _handle_event_cancel(self, iq):
""" Received Iq with cancel - this is a cancel request.
""" Received Iq with cancel - this is a cancel request.
Delete the session and confirm. """
seqnr = iq['cancel']['seqnr']
seqnr = iq['cancel']['seqnr'];
# Find the session
for s in self.sessions:
if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr:
# found it. Cancel all timers
for n in self.sessions[s]["commTimers"]:
self.sessions[s]["commTimers"][n].cancel()
self.sessions[s]["commTimers"][n].cancel();
# Confirm
iq.reply()
iq['type'] = 'result'
iq['cancelled']['seqnr'] = seqnr
iq.send(block=False)
iq.reply();
iq['type'] = 'result';
iq['cancelled']['seqnr'] = seqnr;
iq.send(block=False);
# Delete session
del self.sessions[s]
return
# Could not find session, send reject
iq.reply()
iq['type'] = 'error'
iq['rejected']['seqnr'] = seqnr
iq['rejected']['error'] = "Cancel request received, no matching request is active."
iq.send(block=False)
iq.reply();
iq['type'] = 'error';
iq['rejected']['seqnr'] = seqnr;
iq['rejected']['error'] = "Cancel request received, no matching request is active.";
iq.send(block=False);
# =================================================================
# =================================================================
# Client side (data retriever) API
def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None):
"""
"""
Called on the client side to initiade a data readout.
Composes a message with the request and sends it to the device(s).
Does not block, the callback will be called when data is available.
Arguments:
from_jid -- The jid of the requester
to_jid -- The jid of the device(s)
callback -- The callback function to call when data is availble.
callback -- The callback function to call when data is availble.
The callback function must support the following arguments:
from_jid -- The jid of the responding device(s)
@@ -565,7 +565,7 @@ class XEP_0323(BasePlugin):
The timestamp of data in this callback. One callback will only
contain data from one timestamp.
fields -- [optional] Mandatory when result == "fields".
List of field dictionaries representing the readout data.
List of field dictionaries representing the readout data.
Dictionary format:
{
typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
@@ -575,11 +575,11 @@ class XEP_0323(BasePlugin):
dataType: The datatype of the field. Only applies to type enum.
flags: [optional] data classifier flags for the field, e.g. momentary.
Formatted as a dictionary like { "flag name": "flag value" ... }
}
}
error_msg -- [optional] Mandatory when result == "rejected" or "failure".
Details about why the request is rejected or failed.
"rejected" means that the request is stopped, but note that the
Details about why the request is rejected or failed.
"rejected" means that the request is stopped, but note that the
request will continue even after a "failure". "failure" only means
that communication was stopped to that specific device, other
device(s) (if any) will continue their readout.
@@ -593,131 +593,131 @@ class XEP_0323(BasePlugin):
session -- Session identifier. Client can use this as a reference to cancel
the request.
"""
iq = self.xmpp.Iq()
iq['from'] = from_jid
iq['to'] = to_jid
iq['type'] = "get"
seqnr = self._get_new_seqnr()
iq['id'] = seqnr
iq['req']['seqnr'] = seqnr
iq = self.xmpp.Iq();
iq['from'] = from_jid;
iq['to'] = to_jid;
iq['type'] = "get";
seqnr = self._get_new_seqnr();
iq['id'] = seqnr;
iq['req']['seqnr'] = seqnr;
if nodeIds is not None:
for nodeId in nodeIds:
iq['req'].add_node(nodeId)
iq['req'].add_node(nodeId);
if fields is not None:
for field in fields:
iq['req'].add_field(field)
iq['req'].add_field(field);
iq['req']._set_flags(flags)
iq['req']._set_flags(flags);
self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback}
iq.send(block=False)
self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback};
iq.send(block=False);
return seqnr
return seqnr;
def cancel_request(self, session):
"""
"""
Called on the client side to cancel a request for data readout.
Composes a message with the cancellation and sends it to the device(s).
Does not block, the callback will be called when cancellation is
Does not block, the callback will be called when cancellation is
confirmed.
Arguments:
session -- The session id of the request to cancel
"""
seqnr = session
iq = self.xmpp.Iq()
iq = self.xmpp.Iq();
iq['from'] = self.sessions[seqnr]['from']
iq['to'] = self.sessions[seqnr]['to']
iq['type'] = "get"
iq['id'] = seqnr
iq['cancel']['seqnr'] = seqnr
iq.send(block=False)
iq['to'] = self.sessions[seqnr]['to'];
iq['type'] = "get";
iq['id'] = seqnr;
iq['cancel']['seqnr'] = seqnr;
iq.send(block=False);
def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """
self.seqnr_lock.acquire()
self.last_seqnr += 1
self.seqnr_lock.release()
return str(self.last_seqnr)
self.seqnr_lock.acquire();
self.last_seqnr = self.last_seqnr + 1;
self.seqnr_lock.release();
return str(self.last_seqnr);
def _handle_event_accepted(self, iq):
""" Received Iq with accepted - request was accepted """
seqnr = iq['accepted']['seqnr']
seqnr = iq['accepted']['seqnr'];
result = "accepted"
if iq['accepted']['queued'] == 'true':
result = "queued"
callback = self.sessions[seqnr]["callback"]
callback(from_jid=iq['from'], result=result)
callback = self.sessions[seqnr]["callback"];
callback(from_jid=iq['from'], result=result);
def _handle_event_rejected(self, iq):
""" Received Iq with rejected - this is a reject.
""" Received Iq with rejected - this is a reject.
Delete the session. """
seqnr = iq['rejected']['seqnr']
callback = self.sessions[seqnr]["callback"]
callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error'])
seqnr = iq['rejected']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']);
# Session terminated
del self.sessions[seqnr]
del self.sessions[seqnr];
def _handle_event_cancelled(self, iq):
"""
Received Iq with cancelled - this is a cancel confirm.
Delete the session.
"""
Received Iq with cancelled - this is a cancel confirm.
Delete the session.
"""
#print("Got cancelled")
seqnr = iq['cancelled']['seqnr']
callback = self.sessions[seqnr]["callback"]
callback(from_jid=iq['from'], result="cancelled")
seqnr = iq['cancelled']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=iq['from'], result="cancelled");
# Session cancelled
del self.sessions[seqnr]
del self.sessions[seqnr];
def _handle_event_fields(self, msg):
"""
"""
Received Msg with fields - this is a data reponse to a request.
If this is the last data block, issue a "done" callback.
"""
seqnr = msg['fields']['seqnr']
callback = self.sessions[seqnr]["callback"]
seqnr = msg['fields']['seqnr'];
callback = self.sessions[seqnr]["callback"];
for node in msg['fields']['nodes']:
for ts in node['timestamps']:
fields = []
fields = [];
for d in ts['datas']:
field_block = {}
field_block["name"] = d['name']
field_block["typename"] = d._get_typename()
field_block["value"] = d['value']
field_block = {};
field_block["name"] = d['name'];
field_block["typename"] = d._get_typename();
field_block["value"] = d['value'];
if not d['unit'] == "": field_block["unit"] = d['unit'];
if not d['dataType'] == "": field_block["dataType"] = d['dataType'];
flags = d._get_flags()
flags = d._get_flags();
if not len(flags) == 0:
field_block["flags"] = flags
fields.append(field_block)
callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields)
field_block["flags"] = flags;
fields.append(field_block);
callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields);
if msg['fields']['done'] == "true":
callback(from_jid=msg['from'], result="done")
callback(from_jid=msg['from'], result="done");
# Session done
del self.sessions[seqnr]
del self.sessions[seqnr];
def _handle_event_failure(self, msg):
"""
"""
Received Msg with failure - our request failed
Delete the session.
Delete the session.
"""
seqnr = msg['failure']['seqnr']
callback = self.sessions[seqnr]["callback"]
callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text'])
seqnr = msg['failure']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']);
# Session failed
del self.sessions[seqnr]
del self.sessions[seqnr];
def _handle_event_started(self, msg):
"""
Received Msg with started - our request was queued and is now started.
"""
Received Msg with started - our request was queued and is now started.
"""
seqnr = msg['started']['seqnr']
callback = self.sessions[seqnr]["callback"]
callback(from_jid=msg['from'], result="started")
seqnr = msg['started']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=msg['from'], result="started");

View File

@@ -20,14 +20,14 @@ class Sensordata(ElementBase):
interfaces = set(tuple())
class FieldTypes():
"""
"""
All field types are optional booleans that default to False
"""
field_types = set([ 'momentary','peak','status','computed','identity','historicalSecond','historicalMinute','historicalHour', \
'historicalDay','historicalWeek','historicalMonth','historicalQuarter','historicalYear','historicalOther'])
class FieldStatus():
"""
"""
All field statuses are optional booleans that default to False
"""
field_status = set([ 'missing','automaticEstimate','manualEstimate','manualReadout','automaticReadout','timeOffset','warning','error', \
@@ -38,12 +38,12 @@ class Request(ElementBase):
name = 'req'
plugin_attrib = name
interfaces = set(['seqnr','nodes','fields','serviceToken','deviceToken','userToken','from','to','when','historical','all'])
interfaces.update(FieldTypes.field_types)
_flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all'])
_flags.update(FieldTypes.field_types)
interfaces.update(FieldTypes.field_types);
_flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all']);
_flags.update(FieldTypes.field_types);
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent)
ElementBase.__init__(self, xml, parent);
self._nodes = set()
self._fields = set()
@@ -64,27 +64,27 @@ class Request(ElementBase):
def _get_flags(self):
"""
Helper function for getting of flags. Returns all flags in
dictionary format: { "flag name": "flag value" ... }
Helper function for getting of flags. Returns all flags in
dictionary format: { "flag name": "flag value" ... }
"""
flags = {}
flags = {};
for f in self._flags:
if not self[f] == "":
flags[f] = self[f]
return flags
flags[f] = self[f];
return flags;
def _set_flags(self, flags):
"""
Helper function for setting of flags.
Helper function for setting of flags.
Arguments:
flags -- Flags in dictionary format: { "flag name": "flag value" ... }
flags -- Flags in dictionary format: { "flag name": "flag value" ... }
"""
for f in self._flags:
if flags is not None and f in flags:
self[f] = flags[f]
self[f] = flags[f];
else:
self[f] = None
self[f] = None;
def add_node(self, nodeId, sourceId=None, cacheType=None):
"""
@@ -94,7 +94,7 @@ class Request(ElementBase):
Arguments:
nodeId -- The ID for the node.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
if nodeId not in self._nodes:
self._nodes.add((nodeId))
@@ -269,7 +269,7 @@ class Error(ElementBase):
:param value: string
"""
self.xml.text = value
self.xml.text = value;
return self
def del_text(self):
@@ -292,7 +292,7 @@ class Fields(ElementBase):
interfaces = set(['seqnr','done','nodes'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent)
ElementBase.__init__(self, xml, parent);
self._nodes = set()
def setup(self, xml=None):
@@ -318,7 +318,7 @@ class Fields(ElementBase):
Arguments:
nodeId -- The ID for the node.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
if nodeId not in self._nodes:
self._nodes.add((nodeId))
@@ -392,7 +392,7 @@ class FieldsNode(ElementBase):
interfaces = set(['nodeId','sourceId','cacheType','timestamps'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent)
ElementBase.__init__(self, xml, parent);
self._timestamps = set()
def setup(self, xml=None):
@@ -411,7 +411,7 @@ class FieldsNode(ElementBase):
def add_timestamp(self, timestamp, substanzas=None):
"""
Add a new timestamp element.
Add a new timestamp element.
Arguments:
timestamp -- The timestamp in ISO format.
@@ -423,7 +423,7 @@ class FieldsNode(ElementBase):
ts = Timestamp(parent=self)
ts['value'] = timestamp
if not substanzas is None:
ts.set_datas(substanzas)
ts.set_datas(substanzas);
#print("add_timestamp with substanzas: " + str(substanzas))
self.iterables.append(ts)
#print(str(id(self)) + " added_timestamp: " + str(id(ts)))
@@ -485,7 +485,7 @@ class FieldsNode(ElementBase):
self.iterables.remove(timestamp)
class Field(ElementBase):
"""
"""
Field element in response Timestamp. This is a base class,
all instances of fields added to Timestamp must be of types:
DataNumeric
@@ -494,17 +494,17 @@ class Field(ElementBase):
DataDateTime
DataTimeSpan
DataEnum
"""
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'field'
plugin_attrib = name
interfaces = set(['name','module','stringIds'])
interfaces.update(FieldTypes.field_types)
interfaces.update(FieldStatus.field_status)
interfaces = set(['name','module','stringIds']);
interfaces.update(FieldTypes.field_types);
interfaces.update(FieldStatus.field_status);
_flags = set()
_flags.update(FieldTypes.field_types)
_flags.update(FieldStatus.field_status)
_flags = set();
_flags.update(FieldTypes.field_types);
_flags.update(FieldStatus.field_status);
def set_stringIds(self, value):
"""Verifies stringIds according to regexp from specification XMPP-0323.
@@ -514,7 +514,7 @@ class Field(ElementBase):
pattern = re.compile("^\d+([|]\w+([.]\w+)*([|][^,]*)?)?(,\d+([|]\w+([.]\w+)*([|][^,]*)?)?)*$")
if pattern.match(value) is not None:
self.xml.stringIds = value
self.xml.stringIds = value;
else:
# Bad content, add nothing
pass
@@ -523,30 +523,30 @@ class Field(ElementBase):
def _get_flags(self):
"""
Helper function for getting of flags. Returns all flags in
dictionary format: { "flag name": "flag value" ... }
Helper function for getting of flags. Returns all flags in
dictionary format: { "flag name": "flag value" ... }
"""
flags = {}
flags = {};
for f in self._flags:
if not self[f] == "":
flags[f] = self[f]
return flags
flags[f] = self[f];
return flags;
def _set_flags(self, flags):
"""
Helper function for setting of flags.
Helper function for setting of flags.
Arguments:
flags -- Flags in dictionary format: { "flag name": "flag value" ... }
flags -- Flags in dictionary format: { "flag name": "flag value" ... }
"""
for f in self._flags:
if flags is not None and f in flags:
self[f] = flags[f]
self[f] = flags[f];
else:
self[f] = None
self[f] = None;
def _get_typename(self):
return "invalid type, use subclasses!"
return "invalid type, use subclasses!";
class Timestamp(ElementBase):
@@ -557,7 +557,7 @@ class Timestamp(ElementBase):
interfaces = set(['value','datas'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent)
ElementBase.__init__(self, xml, parent);
self._datas = set()
def setup(self, xml=None):
@@ -576,7 +576,7 @@ class Timestamp(ElementBase):
def add_data(self, typename, name, value, module=None, stringIds=None, unit=None, dataType=None, flags=None):
"""
Add a new data element.
Add a new data element.
Arguments:
typename -- The type of data element (numeric, string, boolean, dateTime, timeSpan or enum)
@@ -587,29 +587,29 @@ class Timestamp(ElementBase):
dataType -- [optional] The dataType. Only applicable for type enum
"""
if name not in self._datas:
dataObj = None
dataObj = None;
if typename == "numeric":
dataObj = DataNumeric(parent=self)
dataObj['unit'] = unit
dataObj = DataNumeric(parent=self);
dataObj['unit'] = unit;
elif typename == "string":
dataObj = DataString(parent=self)
dataObj = DataString(parent=self);
elif typename == "boolean":
dataObj = DataBoolean(parent=self)
dataObj = DataBoolean(parent=self);
elif typename == "dateTime":
dataObj = DataDateTime(parent=self)
dataObj = DataDateTime(parent=self);
elif typename == "timeSpan":
dataObj = DataTimeSpan(parent=self)
dataObj = DataTimeSpan(parent=self);
elif typename == "enum":
dataObj = DataEnum(parent=self)
dataObj['dataType'] = dataType
dataObj = DataEnum(parent=self);
dataObj['dataType'] = dataType;
dataObj['name'] = name
dataObj['value'] = value
dataObj['module'] = module
dataObj['stringIds'] = stringIds
dataObj['name'] = name;
dataObj['value'] = value;
dataObj['module'] = module;
dataObj['stringIds'] = stringIds;
if flags is not None:
dataObj._set_flags(flags)
dataObj._set_flags(flags);
self._datas.add(name)
self.iterables.append(dataObj)
@@ -661,87 +661,87 @@ class Timestamp(ElementBase):
self.iterables.remove(data)
class DataNumeric(Field):
"""
Field data of type numeric.
Note that the value is expressed as a string.
"""
Field data of type numeric.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'numeric'
plugin_attrib = name
interfaces = set(['value', 'unit'])
interfaces.update(Field.interfaces)
interfaces = set(['value', 'unit']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "numeric"
return "numeric"
class DataString(Field):
"""
Field data of type string
"""
Field data of type string
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'string'
plugin_attrib = name
interfaces = set(['value'])
interfaces.update(Field.interfaces)
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "string"
return "string"
class DataBoolean(Field):
"""
"""
Field data of type boolean.
Note that the value is expressed as a string.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'boolean'
plugin_attrib = name
interfaces = set(['value'])
interfaces.update(Field.interfaces)
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "boolean"
return "boolean"
class DataDateTime(Field):
"""
"""
Field data of type dateTime.
Note that the value is expressed as a string.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'dateTime'
plugin_attrib = name
interfaces = set(['value'])
interfaces.update(Field.interfaces)
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "dateTime"
return "dateTime"
class DataTimeSpan(Field):
"""
"""
Field data of type timeSpan.
Note that the value is expressed as a string.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'timeSpan'
plugin_attrib = name
interfaces = set(['value'])
interfaces.update(Field.interfaces)
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "timeSpan"
return "timeSpan"
class DataEnum(Field):
"""
"""
Field data of type enum.
Note that the value is expressed as a string.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'enum'
plugin_attrib = name
interfaces = set(['value', 'dataType'])
interfaces.update(Field.interfaces)
interfaces = set(['value', 'dataType']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "enum"
return "enum"
class Done(ElementBase):
""" Done element used to signal that all data has been transferred """

View File

@@ -23,7 +23,7 @@ class _TimerReset(Thread):
t.cancel() # stop the timer's action if it's still waiting
"""
def __init__(self, interval, function, *args, **kwargs):
def __init__(self, interval, function, args=[], kwargs={}):
Thread.__init__(self)
self.interval = interval
self.function = function