First implementation of the xep_0323 and xep_325 used in IoT systems. Tests are added for stanza and streams

This commit is contained in:
Joachim Lindborg 2013-08-30 02:29:52 +02:00
parent b7adaafb3e
commit 45689fd879
18 changed files with 5234 additions and 77 deletions

View File

@ -79,5 +79,6 @@ __all__ = [
'xep_0302', # XMPP Compliance Suites 2012
'xep_0308', # Last Message Correction
'xep_0313', # Message Archive Management
'xep_0323', # IoT Sensor Data
'xep_0323', # IoT Systems Sensor Data
'xep_0325', # IoT Systems Control
]

View File

@ -2,7 +2,7 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.

View File

@ -0,0 +1,243 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import datetime
class Device(object):
"""
Example implementation of a device readout object.
Is registered in the XEP_0323.register_node call
The device object may be any custom implementation to support
specific devices, but it must implement the functions:
has_field
request_fields
"""
def __init__(self, nodeId):
self.nodeId = nodeId;
self.fields = {};
# {'type':'numeric',
# 'name':'myname',
# 'value': 42,
# 'unit':'Z'}];
self.timestamp_data = {};
self.momentary_data = {};
self.momentary_timestamp = "";
def has_field(self, field):
"""
Returns true if the supplied field name exists in this device.
Arguments:
field -- The field name
"""
if field in self.fields:
return True;
return False;
def request_fields(self, fields, flags, session, callback):
"""
Starts a data readout. Verifies the requested fields,
refreshes the data (if needed) and calls the callback
with requested data.
Arguments:
fields -- List of field names to readout
flags -- [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
session -- Session id, only used in the callback as identifier
callback -- Callback function to call when data is available.
The callback function must support the following arguments:
session -- Session id, as supplied in the request_fields call
nodeId -- Identifier for this device
result -- The current result status of the readout. Valid values are:
"error" - Readout failed.
"fields" - Contains readout data.
"done" - Indicates that the readout is complete. May contain
readout data.
timestamp_block -- [optional] Only applies when result != "error"
The readout data. Structured as a dictionary:
{
timestamp: timestamp for this datablock,
fields: list of field dictionary (one per readout field).
readout field dictionary format:
{
type: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
name: The field name
value: The field value
unit: The unit of the field. Only applies to type numeric.
dataType: The datatype of the field. Only applies to type enum.
flags: [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
}
}
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
"""
if len(fields) > 0:
# Check availiability
for f in fields:
if f not in self.fields:
self._send_reject(session, callback)
return False;
else:
# Request all fields
fields = self.fields;
# Refresh data from device
# ...
if "momentary" in flags and flags['momentary'] == "true" or \
"all" in flags and flags['all'] == "true":
ts_block = {};
timestamp = "";
if len(self.momentary_timestamp) > 0:
timestamp = self.momentary_timestamp;
else:
timestamp = self._get_timestamp();
field_block = [];
for f in self.momentary_data:
if f in fields:
field_block.append({"name": f,
"type": self.fields[f]["type"],
"unit": self.fields[f]["unit"],
"dataType": self.fields[f]["dataType"],
"value": self.momentary_data[f]["value"],
"flags": self.momentary_data[f]["flags"]});
ts_block["timestamp"] = timestamp;
ts_block["fields"] = field_block;
callback(session, result="done", nodeId=self.nodeId, timestamp_block=ts_block);
return
from_flag = self._datetime_flag_parser(flags, 'from')
to_flag = self._datetime_flag_parser(flags, 'to')
for ts in sorted(self.timestamp_data.keys()):
tsdt = datetime.datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S")
if not from_flag is None:
if tsdt < from_flag:
#print (str(tsdt) + " < " + str(from_flag))
continue
if not to_flag is None:
if tsdt > to_flag:
#print (str(tsdt) + " > " + str(to_flag))
continue
ts_block = {};
field_block = [];
for f in self.timestamp_data[ts]:
if f in fields:
field_block.append({"name": f,
"type": self.fields[f]["type"],
"unit": self.fields[f]["unit"],
"dataType": self.fields[f]["dataType"],
"value": self.timestamp_data[ts][f]["value"],
"flags": self.timestamp_data[ts][f]["flags"]});
ts_block["timestamp"] = ts;
ts_block["fields"] = field_block;
callback(session, result="fields", nodeId=self.nodeId, timestamp_block=ts_block);
callback(session, result="done", nodeId=self.nodeId, timestamp_block=None);
def _datetime_flag_parser(self, flags, flagname):
if not flagname in flags:
return None
dt = None
try:
dt = datetime.datetime.strptime(flags[flagname], "%Y-%m-%dT%H:%M:%S")
except ValueError:
# Badly formatted datetime, ignore it
pass
return dt
def _get_timestamp(self):
"""
Generates a properly formatted timestamp of current time
"""
return datetime.datetime.now().replace(microsecond=0).isoformat()
def _send_reject(self, session, callback):
"""
Sends a reject to the caller
Arguments:
session -- Session id, see definition in request_fields function
callback -- Callback function, see definition in request_fields function
"""
callback(session, result="error", nodeId=self.nodeId, timestamp_block=None, error_msg="Reject");
def _add_field(self, name, typename, unit=None, dataType=None):
"""
Adds a field to the device
Arguments:
name -- Name of the field
typename -- Type of the field (numeric, boolean, dateTime, timeSpan, string, enum)
unit -- [optional] only applies to "numeric". Unit for the field.
dataType -- [optional] only applies to "enum". Datatype for the field.
"""
self.fields[name] = {"type": typename, "unit": unit, "dataType": dataType};
def _add_field_timestamp_data(self, name, timestamp, value, flags=None):
"""
Adds timestamped data to a field
Arguments:
name -- Name of the field
timestamp -- Timestamp for the data (string)
value -- Field value at the timestamp
flags -- [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
if not name in self.fields:
return False;
if not timestamp in self.timestamp_data:
self.timestamp_data[timestamp] = {};
self.timestamp_data[timestamp][name] = {"value": value, "flags": flags};
return True;
def _add_field_momentary_data(self, name, value, flags=None):
"""
Sets momentary data to a field
Arguments:
name -- Name of the field
value -- Field value at the timestamp
flags -- [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
if self.fields[name] is None:
return False;
if flags is None:
flags = {};
flags["momentary"] = "true"
self.momentary_data[name] = {"value": value, "flags": flags};
return True;
def _set_momentary_timestamp(self, timestamp):
"""
This function is only for unit testing to produce predictable results.
"""
self.momentary_timestamp = timestamp;

View File

@ -2,19 +2,25 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
import time
import datetime
from threading import Thread, Lock, Timer
from sleekxmpp.plugins.xep_0323.timerreset import TimerReset
from sleekxmpp.xmlstream import JID
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.plugins.base import BasePlugin
from sleekxmpp.plugins.xep_0323 import stanza
from sleekxmpp.plugins.xep_0323.stanza import Sensordata
log = logging.getLogger(__name__)
@ -23,43 +29,690 @@ log = logging.getLogger(__name__)
class XEP_0323(BasePlugin):
"""
XEP-0323 IoT Sensor Data
XEP-0323: IoT Sensor Data
This XEP provides the underlying architecture, basic operations and data
structures for sensor data communication over XMPP networks. It includes
a hardware abstraction model, removing any technical detail implemented
in underlying technologies.
Also see <http://xmpp.org/extensions/xep-0323.html>
Configuration Values:
threaded -- Indicates if communication with sensors should be threaded.
Defaults to True.
Events:
Sensor side
-----------
Sensordata Event:Req -- Received a request for data
Sensordata Event:Cancel -- Received a cancellation for a request
Client side
-----------
Sensordata Event:Accepted -- Received a accept from sensor for a request
Sensordata Event:Rejected -- Received a reject from sensor for a request
Sensordata Event:Cancelled -- Received a cancel confirm from sensor
Sensordata Event:Fields -- Received fields from sensor for a request
This may be triggered multiple times since
the sensor can split up its response in
multiple messages.
Sensordata Event:Failure -- Received a failure indication from sensor
for a request. Typically a comm timeout.
Attributes:
threaded -- Indicates if command events should be threaded.
Defaults to True.
sessions -- A dictionary or equivalent backend mapping
session IDs to dictionaries containing data
relevant to a request's session. This dictionary is used
both by the client and sensor side. On client side, seqnr
is used as key, while on sensor side, a session_id is used
as key. This ensures that the two will not collide, so
one instance can be both client and sensor.
Sensor side
-----------
nodes -- A dictionary mapping sensor nodes that are serviced through
this XMPP instance to their device handlers ("drivers").
Client side
-----------
last_seqnr -- The last used sequence number (integer). One sequence of
communication (e.g. -->request, <--accept, <--fields)
between client and sensor is identified by a unique
sequence number (unique between the client/sensor pair)
Methods:
plugin_init -- Overrides base_plugin.plugin_init
post_init -- Overrides base_plugin.post_init
plugin_end -- Overrides base_plugin.plugin_end
Sensor side
-----------
register_node -- Register a sensor as available from this XMPP
instance.
Client side
-----------
request_data -- Initiates a request for data from one or more
sensors. Non-blocking, a callback function will
be called when data is available.
"""
name = 'xep_0323'
description = 'XEP-0323 Internet of Things - Sensor Data'
dependencies = set(['xep_0030']) # set(['xep_0030', 'xep_0004', 'xep_0082', 'xep_0131'])
dependencies = set(['xep_0030'])
stanza = stanza
def plugin_init(self):
pass
# self.node_event_map = {}
# self.xmpp.register_handler(
# Callback('Sensordata Event: Get',
# StanzaPath('message/sensordata_event/get'),
# self._handle_event_get))
default_config = {
'threaded': True
# 'session_db': None
}
def plugin_init(self):
""" Start the XEP-0323 plugin """
self.xmpp.register_handler(
Callback('Sensordata Event:Req',
StanzaPath('iq@type=get/req'),
self._handle_event_req))
self.xmpp.register_handler(
Callback('Sensordata Event:Accepted',
StanzaPath('iq@type=result/accepted'),
self._handle_event_accepted))
self.xmpp.register_handler(
Callback('Sensordata Event:Rejected',
StanzaPath('iq@type=error/rejected'),
self._handle_event_rejected))
self.xmpp.register_handler(
Callback('Sensordata Event:Cancel',
StanzaPath('iq@type=get/cancel'),
self._handle_event_cancel))
self.xmpp.register_handler(
Callback('Sensordata Event:Cancelled',
StanzaPath('iq@type=result/cancelled'),
self._handle_event_cancelled))
self.xmpp.register_handler(
Callback('Sensordata Event:Fields',
StanzaPath('message/fields'),
self._handle_event_fields))
self.xmpp.register_handler(
Callback('Sensordata Event:Failure',
StanzaPath('message/failure'),
self._handle_event_failure))
self.xmpp.register_handler(
Callback('Sensordata Event:Started',
StanzaPath('message/started'),
self._handle_event_started))
# Server side dicts
self.nodes = {};
self.sessions = {};
self.last_seqnr = 0;
self.seqnr_lock = Lock();
## For testning only
self.test_authenticated_from = ""
def post_init(self):
""" Init complete. Register our features in Serivce discovery. """
BasePlugin.post_init(self)
self.xmpp['xep_0030'].add_feature(Sensordata.namespace)
self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple())
def _new_session(self):
""" Return a new session ID. """
return str(time.time()) + '-' + self.xmpp.new_id()
def plugin_end(self):
# self.xmpp.remove_handler('Sensordata Event: Get')
pass
def get_value(self, jid, msg):
""" Stop the XEP-0323 plugin """
self.sessions.clear();
self.xmpp.remove_handler('Sensordata Event:Req')
self.xmpp.remove_handler('Sensordata Event:Accepted')
self.xmpp.remove_handler('Sensordata Event:Rejected')
self.xmpp.remove_handler('Sensordata Event:Cancel')
self.xmpp.remove_handler('Sensordata Event:Cancelled')
self.xmpp.remove_handler('Sensordata Event:Fields')
self.xmpp['xep_0030'].del_feature(feature=Sensordata.namespace)
self.xmpp['xep_0030'].set_items(node=Sensordata.namespace, items=tuple());
# =================================================================
# Sensor side (data provider) API
def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
"""
Recieving a stanza for erading values
# verify provisioning
Register a sensor/device as available for serving of data through this XMPP
instance.
# verify requested values and categories
The device object may by any custom implementation to support
specific devices, but it must implement the functions:
has_field
request_fields
according to the interfaces shown in the example device.py file.
# Send accepted
# Thread of the readout
Arguments:
nodeId -- The identifier for the device
device -- The device object
commTimeout -- Time in seconds to wait between each callback from device during
a data readout. Float.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
self.nodes[nodeId] = {"device": device,
"commTimeout": commTimeout,
"sourceId": sourceId,
"cacheType": cacheType};
# send started
def _set_authenticated(self, auth=''):
""" Internal testing function """
self.test_authenticated_from = auth;
# send data messages
# send done
"""
pass
def _handle_event_req(self, iq):
"""
Event handler for reception of an Iq with req - this is a request.
Verifies that
- all the requested nodes are available
- at least one of the requested fields is available from at least
one of the nodes
If the request passes verification, an accept response is sent, and
the readout process is started in a separate thread.
If the verification fails, a reject message is sent.
"""
seqnr = iq['req']['seqnr'];
error_msg = '';
req_ok = True;
# Authentication
if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
# Invalid authentication
req_ok = False;
error_msg = "Access denied";
# Nodes
process_nodes = [];
if len(iq['req']['nodes']) > 0:
for n in iq['req']['nodes']:
if not n['nodeId'] in self.nodes:
req_ok = False;
error_msg = "Invalid nodeId " + n['nodeId'];
process_nodes = [n['nodeId'] for n in iq['req']['nodes']];
else:
process_nodes = self.nodes.keys();
# Fields - if we just find one we are happy, otherwise we reject
process_fields = [];
if len(iq['req']['fields']) > 0:
found = False
for f in iq['req']['fields']:
for node in self.nodes:
if self.nodes[node]["device"].has_field(f['name']):
found = True;
break;
if not found:
req_ok = False;
error_msg = "Invalid field " + f['name'];
process_fields = [f['name'] for n in iq['req']['fields']];
req_flags = iq['req']._get_flags();
request_delay_sec = None
if 'when' in req_flags:
# Timed request - requires datetime string in iso format
# ex. 2013-04-05T15:00:03
dt = None
try:
dt = datetime.datetime.strptime(req_flags['when'], "%Y-%m-%dT%H:%M:%S")
except ValueError:
req_ok = False;
error_msg = "Invalid datetime in 'when' flag, please use ISO format (i.e. 2013-04-05T15:00:03)."
if not dt is None:
# Datetime properly formatted
dtnow = datetime.datetime.now()
dtdiff = dt - dtnow
request_delay_sec = dtdiff.seconds + dtdiff.days * 24 * 3600
if request_delay_sec <= 0:
req_ok = False;
error_msg = "Invalid datetime in 'when' flag, cannot set a time in the past. Current time: " + dtnow.isoformat();
if req_ok:
session = self._new_session();
self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr};
self.sessions[session]["commTimers"] = {};
self.sessions[session]["nodeDone"] = {};
#print("added session: " + str(self.sessions))
iq.reply();
iq['accepted']['seqnr'] = seqnr;
if not request_delay_sec is None:
iq['accepted']['queued'] = "true"
iq.send(block=False);
self.sessions[session]["node_list"] = process_nodes;
if not request_delay_sec is None:
# Delay request to requested time
timer = Timer(request_delay_sec, self._event_delayed_req, args=(session, process_fields, req_flags))
self.sessions[session]["commTimers"]["delaytimer"] = timer;
timer.start();
return
if self.threaded:
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
tr_req.start()
#print("started thread")
else:
self._threaded_node_request(session, process_fields, req_flags);
else:
iq.reply();
iq['type'] = 'error';
iq['rejected']['seqnr'] = seqnr;
iq['rejected']['error'] = error_msg;
iq.send(block=False);
def _threaded_node_request(self, session, process_fields, flags):
"""
Helper function to handle the device readouts in a separate thread.
Arguments:
session -- The request session id
process_fields -- The fields to request from the devices
flags -- [optional] flags to pass to the devices, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
for node in self.sessions[session]["node_list"]:
self.sessions[session]["nodeDone"][node] = False;
for node in self.sessions[session]["node_list"]:
timer = TimerReset(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
self.sessions[session]["commTimers"][node] = timer;
#print("Starting timer " + str(timer) + ", timeout: " + str(self.nodes[node]['commTimeout']))
timer.start();
self.nodes[node]['device'].request_fields(process_fields, flags=flags, session=session, callback=self._device_field_request_callback);
def _event_comm_timeout(self, session, nodeId):
"""
Triggered if any of the readout operations timeout.
Sends a failure message back to the client, stops communicating
with the failing device.
Arguments:
session -- The request session id
nodeId -- The id of the device which timed out
"""
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
msg['failure']['error']['text'] = "Timeout";
msg['failure']['error']['nodeId'] = nodeId;
msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
# Drop communication with this device and check if we are done
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
msg['failure']['done'] = 'true';
msg.send();
# The session is complete, delete it
#print("del session " + session + " due to timeout")
del self.sessions[session];
def _event_delayed_req(self, session, process_fields, req_flags):
"""
Triggered when the timer from a delayed request fires.
Arguments:
session -- The request session id
process_fields -- The fields to request from the devices
flags -- [optional] flags to pass to the devices, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
"""
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['started']['seqnr'] = self.sessions[session]['seqnr'];
msg.send();
if self.threaded:
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields, req_flags))
tr_req.start()
else:
self._threaded_node_request(session, process_fields, req_flags);
def _all_nodes_done(self, session):
"""
Checks wheter all devices are done replying to the readout.
Arguments:
session -- The request session id
"""
for n in self.sessions[session]["nodeDone"]:
if not self.sessions[session]["nodeDone"][n]:
return False;
return True;
def _device_field_request_callback(self, session, nodeId, result, timestamp_block, error_msg=None):
"""
Callback function called by the devices when they have any additional data.
Composes a message with the data and sends it back to the client, and resets
the timeout timer for the device.
Arguments:
session -- The request session id
nodeId -- The device id which initiated the callback
result -- The current result status of the readout. Valid values are:
"error" - Readout failed.
"fields" - Contains readout data.
"done" - Indicates that the readout is complete. May contain
readout data.
timestamp_block -- [optional] Only applies when result != "error"
The readout data. Structured as a dictionary:
{
timestamp: timestamp for this datablock,
fields: list of field dictionary (one per readout field).
readout field dictionary format:
{
type: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
name: The field name
value: The field value
unit: The unit of the field. Only applies to type numeric.
dataType: The datatype of the field. Only applies to type enum.
flags: [optional] data classifier flags for the field, e.g. momentary
Formatted as a dictionary like { "flag name": "flag value" ... }
}
}
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
"""
if not session in self.sessions:
# This can happend if a session was deleted, like in a cancellation. Just drop the data.
return
if result == "error":
self.sessions[session]["commTimers"][nodeId].cancel();
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['failure']['seqnr'] = self.sessions[session]['seqnr'];
msg['failure']['error']['text'] = error_msg;
msg['failure']['error']['nodeId'] = nodeId;
msg['failure']['error']['timestamp'] = datetime.datetime.now().replace(microsecond=0).isoformat();
# Drop communication with this device and check if we are done
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
msg['failure']['done'] = 'true';
# The session is complete, delete it
# print("del session " + session + " due to error")
del self.sessions[session];
msg.send();
else:
msg = self.xmpp.Message();
msg['from'] = self.sessions[session]['to'];
msg['to'] = self.sessions[session]['from'];
msg['fields']['seqnr'] = self.sessions[session]['seqnr'];
if timestamp_block is not None and len(timestamp_block) > 0:
node = msg['fields'].add_node(nodeId);
ts = node.add_timestamp(timestamp_block["timestamp"]);
for f in timestamp_block["fields"]:
data = ts.add_data( typename=f['type'],
name=f['name'],
value=f['value'],
unit=f['unit'],
dataType=f['dataType'],
flags=f['flags']);
if result == "done":
self.sessions[session]["commTimers"][nodeId].cancel();
self.sessions[session]["nodeDone"][nodeId] = True;
msg['fields']['done'] = 'true';
if (self._all_nodes_done(session)):
# The session is complete, delete it
# print("del session " + session + " due to complete")
del self.sessions[session];
else:
# Restart comm timer
self.sessions[session]["commTimers"][nodeId].reset();
msg.send();
def _handle_event_cancel(self, iq):
""" Received Iq with cancel - this is a cancel request.
Delete the session and confirm. """
seqnr = iq['cancel']['seqnr'];
# Find the session
for s in self.sessions:
if self.sessions[s]['from'] == iq['from'] and self.sessions[s]['to'] == iq['to'] and self.sessions[s]['seqnr'] == seqnr:
# found it. Cancel all timers
for n in self.sessions[s]["commTimers"]:
self.sessions[s]["commTimers"][n].cancel();
# Confirm
iq.reply();
iq['type'] = 'result';
iq['cancelled']['seqnr'] = seqnr;
iq.send(block=False);
# Delete session
del self.sessions[s]
return
# Could not find session, send reject
iq.reply();
iq['type'] = 'error';
iq['rejected']['seqnr'] = seqnr;
iq['rejected']['error'] = "Cancel request received, no matching request is active.";
iq.send(block=False);
# =================================================================
# Client side (data retriever) API
def request_data(self, from_jid, to_jid, callback, nodeIds=None, fields=None, flags=None):
"""
Called on the client side to initiade a data readout.
Composes a message with the request and sends it to the device(s).
Does not block, the callback will be called when data is available.
Arguments:
from_jid -- The jid of the requester
to_jid -- The jid of the device(s)
callback -- The callback function to call when data is availble.
The callback function must support the following arguments:
from_jid -- The jid of the responding device(s)
result -- The current result status of the readout. Valid values are:
"accepted" - Readout request accepted
"queued" - Readout request accepted and queued
"rejected" - Readout request rejected
"failure" - Readout failed.
"cancelled" - Confirmation of request cancellation.
"started" - Previously queued request is now started
"fields" - Contains readout data.
"done" - Indicates that the readout is complete.
nodeId -- [optional] Mandatory when result == "fields" or "failure".
The node Id of the responding device. One callback will only
contain data from one device.
timestamp -- [optional] Mandatory when result == "fields".
The timestamp of data in this callback. One callback will only
contain data from one timestamp.
fields -- [optional] Mandatory when result == "fields".
List of field dictionaries representing the readout data.
Dictionary format:
{
typename: The field type (numeric, boolean, dateTime, timeSpan, string, enum)
name: The field name
value: The field value
unit: The unit of the field. Only applies to type numeric.
dataType: The datatype of the field. Only applies to type enum.
flags: [optional] data classifier flags for the field, e.g. momentary.
Formatted as a dictionary like { "flag name": "flag value" ... }
}
error_msg -- [optional] Mandatory when result == "rejected" or "failure".
Details about why the request is rejected or failed.
"rejected" means that the request is stopped, but note that the
request will continue even after a "failure". "failure" only means
that communication was stopped to that specific device, other
device(s) (if any) will continue their readout.
nodeIds -- [optional] Limits the request to the node Ids in this list.
fields -- [optional] Limits the request to the field names in this list.
flags -- [optional] Limits the request according to the flags, or sets
readout conditions such as timing.
Return value:
session -- Session identifier. Client can use this as a reference to cancel
the request.
"""
iq = self.xmpp.Iq();
iq['from'] = from_jid;
iq['to'] = to_jid;
iq['type'] = "get";
seqnr = self._get_new_seqnr();
iq['id'] = seqnr;
iq['req']['seqnr'] = seqnr;
if nodeIds is not None:
for nodeId in nodeIds:
iq['req'].add_node(nodeId);
if fields is not None:
for field in fields:
iq['req'].add_field(field);
iq['req']._set_flags(flags);
self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "seqnr": seqnr, "callback": callback};
iq.send(block=False);
return seqnr;
def cancel_request(self, session):
"""
Called on the client side to cancel a request for data readout.
Composes a message with the cancellation and sends it to the device(s).
Does not block, the callback will be called when cancellation is
confirmed.
Arguments:
session -- The session id of the request to cancel
"""
seqnr = session
iq = self.xmpp.Iq();
iq['from'] = self.sessions[seqnr]['from']
iq['to'] = self.sessions[seqnr]['to'];
iq['type'] = "get";
iq['id'] = seqnr;
iq['cancel']['seqnr'] = seqnr;
iq.send(block=False);
def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """
self.seqnr_lock.acquire();
self.last_seqnr = self.last_seqnr + 1;
self.seqnr_lock.release();
return str(self.last_seqnr);
def _handle_event_accepted(self, iq):
""" Received Iq with accepted - request was accepted """
seqnr = iq['accepted']['seqnr'];
result = "accepted"
if iq['accepted']['queued'] == 'true':
result = "queued"
callback = self.sessions[seqnr]["callback"];
callback(from_jid=iq['from'], result=result);
def _handle_event_rejected(self, iq):
""" Received Iq with rejected - this is a reject.
Delete the session. """
seqnr = iq['rejected']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=iq['from'], result="rejected", error_msg=iq['rejected']['error']);
# Session terminated
del self.sessions[seqnr];
def _handle_event_cancelled(self, iq):
"""
Received Iq with cancelled - this is a cancel confirm.
Delete the session.
"""
#print("Got cancelled")
seqnr = iq['cancelled']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=iq['from'], result="cancelled");
# Session cancelled
del self.sessions[seqnr];
def _handle_event_fields(self, msg):
"""
Received Msg with fields - this is a data reponse to a request.
If this is the last data block, issue a "done" callback.
"""
seqnr = msg['fields']['seqnr'];
callback = self.sessions[seqnr]["callback"];
for node in msg['fields']['nodes']:
for ts in node['timestamps']:
fields = [];
for d in ts['datas']:
field_block = {};
field_block["name"] = d['name'];
field_block["typename"] = d._get_typename();
field_block["value"] = d['value'];
if not d['unit'] == "": field_block["unit"] = d['unit'];
if not d['dataType'] == "": field_block["dataType"] = d['dataType'];
flags = d._get_flags();
if not len(flags) == 0:
field_block["flags"] = flags;
fields.append(field_block);
callback(from_jid=msg['from'], result="fields", nodeId=node['nodeId'], timestamp=ts['value'], fields=fields);
if msg['fields']['done'] == "true":
callback(from_jid=msg['from'], result="done");
# Session done
del self.sessions[seqnr];
def _handle_event_failure(self, msg):
"""
Received Msg with failure - our request failed
Delete the session.
"""
seqnr = msg['failure']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=msg['from'], result="failure", nodeId=msg['failure']['error']['nodeId'], timestamp=msg['failure']['error']['timestamp'], error_msg=msg['failure']['error']['text']);
# Session failed
del self.sessions[seqnr];
def _handle_event_started(self, msg):
"""
Received Msg with started - our request was queued and is now started.
"""
seqnr = msg['started']['seqnr'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=msg['from'], result="started");

View File

@ -2,7 +2,7 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.

View File

@ -1,9 +1,10 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""

View File

@ -2,7 +2,7 @@
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Joachim Lindborg, Joachim.lindborg@lsys.se
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
@ -10,55 +10,783 @@
from sleekxmpp import Iq, Message
from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from re import match
class Sensordata(ElementBase):
namespace = 'http://xmpp.org/iot/sensordata'
""" Placeholder for the namespace, not used as a stanza """
namespace = 'urn:xmpp:iot:sensordata'
name = 'sensordata'
plugin_attrib = name
interfaces = set(tuple())
class FieldTypes():
"""
All field types are optional booleans that default to False
"""
field_types = set([ 'momentary','peak','status','computed','identity','historicalSecond','historicalMinute','historicalHour', \
'historicalDay','historicalWeek','historicalMonth','historicalQuarter','historicalYear','historicalOther'])
class FieldStatus():
"""
All field statuses are optional booleans that default to False
"""
field_status = set([ 'missing','automaticEstimate','manualEstimate','manualReadout','automaticReadout','timeOffset','warning','error', \
'signed','invoiced','endOfSeries','powerFailure','invoiceConfirmed'])
class Request(ElementBase):
namespace = 'http://xmpp.org/iot/sensordata'
namespace = 'urn:xmpp:iot:sensordata'
name = 'req'
plugin_attrib = name
interfaces = set(('seqnr','momentary'))
interfaces = set(['seqnr','nodes','fields','serviceToken','deviceToken','userToken','from','to','when','historical','all'])
interfaces.update(FieldTypes.field_types);
_flags = set(['serviceToken','deviceToken','userToken','from','to','when','historical','all']);
_flags.update(FieldTypes.field_types);
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent);
self._nodes = set()
self._fields = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._nodes = set([node['nodeId'] for node in self['nodes']])
self._fields = set([field['name'] for field in self['fields']])
def _get_flags(self):
"""
Helper function for getting of flags. Returns all flags in
dictionary format: { "flag name": "flag value" ... }
"""
flags = {};
for f in self._flags:
if not self[f] == "":
flags[f] = self[f];
return flags;
def _set_flags(self, flags):
"""
Helper function for setting of flags.
Arguments:
flags -- Flags in dictionary format: { "flag name": "flag value" ... }
"""
for f in self._flags:
if flags is not None and f in flags:
self[f] = flags[f];
else:
self[f] = None;
def add_node(self, nodeId, sourceId=None, cacheType=None):
"""
Add a new node element. Each item is required to have a
nodeId, but may also specify a sourceId value and cacheType.
Arguments:
nodeId -- The ID for the node.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
if nodeId not in self._nodes:
self._nodes.add((nodeId))
node = RequestNode(parent=self)
node['nodeId'] = nodeId
node['sourceId'] = sourceId
node['cacheType'] = cacheType
self.iterables.append(node)
return node
return None
def del_node(self, nodeId):
"""
Remove a single node.
Arguments:
nodeId -- Node ID of the item to remove.
"""
if nodeId in self._nodes:
nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
for node in nodes:
if node['nodeId'] == nodeId:
self.xml.remove(node.xml)
self.iterables.remove(node)
return True
return False
def get_nodes(self):
"""Return all nodes."""
nodes = set()
for node in self['substanzas']:
if isinstance(node, RequestNode):
nodes.add(node)
return nodes
def set_nodes(self, nodes):
"""
Set or replace all nodes. The given nodes must be in a
list or set where each item is a tuple of the form:
(nodeId, sourceId, cacheType)
Arguments:
nodes -- A series of nodes in tuple format.
"""
self.del_nodes()
for node in nodes:
if isinstance(node, RequestNode):
self.add_node(node['nodeId'], node['sourceId'], node['cacheType'])
else:
nodeId, sourceId, cacheType = node
self.add_node(nodeId, sourceId, cacheType)
def del_nodes(self):
"""Remove all nodes."""
self._nodes = set()
nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
for node in nodes:
self.xml.remove(node.xml)
self.iterables.remove(node)
def add_field(self, name):
"""
Add a new field element. Each item is required to have a
name.
Arguments:
name -- The name of the field.
"""
if name not in self._fields:
self._fields.add((name))
field = RequestField(parent=self)
field['name'] = name
self.iterables.append(field)
return field
return None
def del_field(self, name):
"""
Remove a single field.
Arguments:
name -- name of field to remove.
"""
if name in self._fields:
fields = [i for i in self.iterables if isinstance(i, RequestField)]
for field in fields:
if field['name'] == name:
self.xml.remove(field.xml)
self.iterables.remove(field)
return True
return False
def get_fields(self):
"""Return all fields."""
fields = set()
for field in self['substanzas']:
if isinstance(field, RequestField):
fields.add(field)
return fields
def set_fields(self, fields):
"""
Set or replace all fields. The given fields must be in a
list or set where each item is RequestField or string
Arguments:
fields -- A series of fields in RequestField or string format.
"""
self.del_fields()
for field in fields:
if isinstance(field, RequestField):
self.add_field(field['name'])
else:
self.add_field(field)
def del_fields(self):
"""Remove all fields."""
self._fields = set()
fields = [i for i in self.iterables if isinstance(i, RequestField)]
for field in fields:
self.xml.remove(field.xml)
self.iterables.remove(field)
class RequestNode(ElementBase):
""" Node element in a request """
namespace = 'urn:xmpp:iot:sensordata'
name = 'node'
plugin_attrib = name
interfaces = set(['nodeId','sourceId','cacheType'])
class RequestField(ElementBase):
""" Field element in a request """
namespace = 'urn:xmpp:iot:sensordata'
name = 'field'
plugin_attrib = name
interfaces = set(['name'])
class Accepted(ElementBase):
namespace = 'http://xmpp.org/iot/sensordata'
namespace = 'urn:xmpp:iot:sensordata'
name = 'accepted'
plugin_attrib = name
interfaces = set(('seqnr'))
interfaces = set(['seqnr','queued'])
class Started(ElementBase):
namespace = 'urn:xmpp:iot:sensordata'
name = 'started'
plugin_attrib = name
interfaces = set(['seqnr'])
class Failure(ElementBase):
namespace = 'http://xmpp.org/iot/sensordata'
namespace = 'urn:xmpp:iot:sensordata'
name = 'failure'
plugin_attrib = name
interfaces = set(('seqnr','done'))
interfaces = set(['seqnr','done'])
class Error(ElementBase):
""" Error element in a request failure """
namespace = 'urn:xmpp:iot:sensordata'
name = 'error'
plugin_attrib = name
interfaces = set(['nodeId','timestamp','sourceId','cacheType','text'])
def get_text(self):
"""Return then contents inside the XML tag."""
return self.xml.text
def set_text(self, value):
"""Set then contents inside the XML tag.
:param value: string
"""
self.xml.text = value;
return self
def del_text(self):
"""Remove the contents inside the XML tag."""
self.xml.text = ""
return self
class Rejected(ElementBase):
namespace = 'urn:xmpp:iot:sensordata'
name = 'rejected'
plugin_attrib = name
interfaces = set(['seqnr','error'])
sub_interfaces = set(['error'])
class Fields(ElementBase):
""" Fields element, top level in a response message with data """
namespace = 'urn:xmpp:iot:sensordata'
name = 'fields'
plugin_attrib = name
interfaces = set(['seqnr','done','nodes'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent);
self._nodes = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._nodes = set([node['nodeId'] for node in self['nodes']])
def add_node(self, nodeId, sourceId=None, cacheType=None, substanzas=None):
"""
Add a new node element. Each item is required to have a
nodeId, but may also specify a sourceId value and cacheType.
register_stanza_plugin(Iq, Sensordata)
register_stanza_plugin(Sensordata, Request)
register_stanza_plugin(Sensordata, Accepted)
register_stanza_plugin(Sensordata, Failure)
# register_stanza_plugin(Pubsub, Default)
# register_stanza_plugin(Pubsub, Items)
# register_stanza_plugin(Pubsub, Options)
# register_stanza_plugin(Pubsub, Publish)
# register_stanza_plugin(Pubsub, PublishOptions)
# register_stanza_plugin(Pubsub, Retract)
# register_stanza_plugin(Pubsub, Subscribe)
# register_stanza_plugin(Pubsub, Subscription)
# register_stanza_plugin(Pubsub, Subscriptions)
# register_stanza_plugin(Pubsub, Unsubscribe)
# register_stanza_plugin(Affiliations, Affiliation, iterable=True)
# register_stanza_plugin(Configure, xep_0004.Form)
# register_stanza_plugin(Items, Item, iterable=True)
# register_stanza_plugin(Publish, Item, iterable=True)
# register_stanza_plugin(Retract, Item)
# register_stanza_plugin(Subscribe, Options)
# register_stanza_plugin(Subscription, SubscribeOptions)
# register_stanza_plugin(Subscriptions, Subscription, iterable=True)
Arguments:
nodeId -- The ID for the node.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
if nodeId not in self._nodes:
self._nodes.add((nodeId))
node = FieldsNode(parent=self)
node['nodeId'] = nodeId
node['sourceId'] = sourceId
node['cacheType'] = cacheType
if substanzas is not None:
node.set_timestamps(substanzas)
self.iterables.append(node)
return node
return None
def del_node(self, nodeId):
"""
Remove a single node.
Arguments:
nodeId -- Node ID of the item to remove.
"""
if nodeId in self._nodes:
nodes = [i for i in self.iterables if isinstance(i, FieldsNode)]
for node in nodes:
if node['nodeId'] == nodeId:
self.xml.remove(node.xml)
self.iterables.remove(node)
return True
return False
def get_nodes(self):
"""Return all nodes."""
nodes = set()
for node in self['substanzas']:
if isinstance(node, FieldsNode):
nodes.add(node)
return nodes
def set_nodes(self, nodes):
"""
Set or replace all nodes. The given nodes must be in a
list or set where each item is a tuple of the form:
(nodeId, sourceId, cacheType)
Arguments:
nodes -- A series of nodes in tuple format.
"""
#print(str(id(self)) + " set_nodes: got " + str(nodes))
self.del_nodes()
for node in nodes:
if isinstance(node, FieldsNode):
self.add_node(node['nodeId'], node['sourceId'], node['cacheType'], substanzas=node['substanzas'])
else:
nodeId, sourceId, cacheType = node
self.add_node(nodeId, sourceId, cacheType)
def del_nodes(self):
"""Remove all nodes."""
self._nodes = set()
nodes = [i for i in self.iterables if isinstance(i, FieldsNode)]
for node in nodes:
self.xml.remove(node.xml)
self.iterables.remove(node)
class FieldsNode(ElementBase):
""" Node element in response fields """
namespace = 'urn:xmpp:iot:sensordata'
name = 'node'
plugin_attrib = name
interfaces = set(['nodeId','sourceId','cacheType','timestamps'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent);
self._timestamps = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._timestamps = set([ts['value'] for ts in self['timestamps']])
def add_timestamp(self, timestamp, substanzas=None):
"""
Add a new timestamp element.
Arguments:
timestamp -- The timestamp in ISO format.
"""
#print(str(id(self)) + " add_timestamp: " + str(timestamp))
if timestamp not in self._timestamps:
self._timestamps.add((timestamp))
ts = Timestamp(parent=self)
ts['value'] = timestamp
if not substanzas is None:
ts.set_datas(substanzas);
#print("add_timestamp with substanzas: " + str(substanzas))
self.iterables.append(ts)
#print(str(id(self)) + " added_timestamp: " + str(id(ts)))
return ts
return None
def del_timestamp(self, timestamp):
"""
Remove a single timestamp.
Arguments:
timestamp -- timestamp (in ISO format) of the item to remove.
"""
#print("del_timestamp: ")
if timestamp in self._timestamps:
timestamps = [i for i in self.iterables if isinstance(i, Timestamp)]
for ts in timestamps:
if ts['value'] == timestamp:
self.xml.remove(ts.xml)
self.iterables.remove(ts)
return True
return False
def get_timestamps(self):
"""Return all timestamps."""
#print(str(id(self)) + " get_timestamps: ")
timestamps = set()
for timestamp in self['substanzas']:
if isinstance(timestamp, Timestamp):
timestamps.add(timestamp)
return timestamps
def set_timestamps(self, timestamps):
"""
Set or replace all timestamps. The given timestamps must be in a
list or set where each item is a timestamp
Arguments:
timestamps -- A series of timestamps.
"""
#print(str(id(self)) + " set_timestamps: got " + str(timestamps))
self.del_timestamps()
for timestamp in timestamps:
#print("set_timestamps: subset " + str(timestamp))
#print("set_timestamps: subset.substanzas " + str(timestamp['substanzas']))
if isinstance(timestamp, Timestamp):
self.add_timestamp(timestamp['value'], substanzas=timestamp['substanzas'])
else:
#print("set_timestamps: got " + str(timestamp))
self.add_timestamp(timestamp)
def del_timestamps(self):
"""Remove all timestamps."""
#print(str(id(self)) + " del_timestamps: ")
self._timestamps = set()
timestamps = [i for i in self.iterables if isinstance(i, Timestamp)]
for timestamp in timestamps:
self.xml.remove(timestamp.xml)
self.iterables.remove(timestamp)
class Field(ElementBase):
"""
Field element in response Timestamp. This is a base class,
all instances of fields added to Timestamp must be of types:
DataNumeric
DataString
DataBoolean
DataDateTime
DataTimeSpan
DataEnum
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'field'
plugin_attrib = name
interfaces = set(['name','module','stringIds']);
interfaces.update(FieldTypes.field_types);
interfaces.update(FieldStatus.field_status);
_flags = set();
_flags.update(FieldTypes.field_types);
_flags.update(FieldStatus.field_status);
def set_stringIds(self, value):
"""Verifies stringIds according to regexp from specification XMPP-0323.
:param value: string
"""
pattern = re.compile("^\d+([|]\w+([.]\w+)*([|][^,]*)?)?(,\d+([|]\w+([.]\w+)*([|][^,]*)?)?)*$")
if pattern.match(value) is not None:
self.xml.stringIds = value;
else:
# Bad content, add nothing
pass
return self
def _get_flags(self):
"""
Helper function for getting of flags. Returns all flags in
dictionary format: { "flag name": "flag value" ... }
"""
flags = {};
for f in self._flags:
if not self[f] == "":
flags[f] = self[f];
return flags;
def _set_flags(self, flags):
"""
Helper function for setting of flags.
Arguments:
flags -- Flags in dictionary format: { "flag name": "flag value" ... }
"""
for f in self._flags:
if flags is not None and f in flags:
self[f] = flags[f];
else:
self[f] = None;
def _get_typename(self):
return "invalid type, use subclasses!";
class Timestamp(ElementBase):
""" Timestamp element in response Node """
namespace = 'urn:xmpp:iot:sensordata'
name = 'timestamp'
plugin_attrib = name
interfaces = set(['value','datas'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent);
self._datas = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._datas = set([data['name'] for data in self['datas']])
def add_data(self, typename, name, value, module=None, stringIds=None, unit=None, dataType=None, flags=None):
"""
Add a new data element.
Arguments:
typename -- The type of data element (numeric, string, boolean, dateTime, timeSpan or enum)
value -- The value of the data element
module -- [optional] language module to use for the data element
stringIds -- [optional] The stringIds used to find associated text in the language module
unit -- [optional] The unit. Only applicable for type numeric
dataType -- [optional] The dataType. Only applicable for type enum
"""
if name not in self._datas:
dataObj = None;
if typename == "numeric":
dataObj = DataNumeric(parent=self);
dataObj['unit'] = unit;
elif typename == "string":
dataObj = DataString(parent=self);
elif typename == "boolean":
dataObj = DataBoolean(parent=self);
elif typename == "dateTime":
dataObj = DataDateTime(parent=self);
elif typename == "timeSpan":
dataObj = DataTimeSpan(parent=self);
elif typename == "enum":
dataObj = DataEnum(parent=self);
dataObj['dataType'] = dataType;
dataObj['name'] = name;
dataObj['value'] = value;
dataObj['module'] = module;
dataObj['stringIds'] = stringIds;
if flags is not None:
dataObj._set_flags(flags);
self._datas.add(name)
self.iterables.append(dataObj)
return dataObj
return None
def del_data(self, name):
"""
Remove a single data element.
Arguments:
data_name -- The data element name to remove.
"""
if name in self._datas:
datas = [i for i in self.iterables if isinstance(i, Field)]
for data in datas:
if data['name'] == name:
self.xml.remove(data.xml)
self.iterables.remove(data)
return True
return False
def get_datas(self):
""" Return all data elements. """
datas = set()
for data in self['substanzas']:
if isinstance(data, Field):
datas.add(data)
return datas
def set_datas(self, datas):
"""
Set or replace all data elements. The given elements must be in a
list or set where each item is a data element (numeric, string, boolean, dateTime, timeSpan or enum)
Arguments:
datas -- A series of data elements.
"""
self.del_datas()
for data in datas:
self.add_data(typename=data._get_typename(), name=data['name'], value=data['value'], module=data['module'], stringIds=data['stringIds'], unit=data['unit'], dataType=data['dataType'], flags=data._get_flags())
def del_datas(self):
"""Remove all data elements."""
self._datas = set()
datas = [i for i in self.iterables if isinstance(i, Field)]
for data in datas:
self.xml.remove(data.xml)
self.iterables.remove(data)
class DataNumeric(Field):
"""
Field data of type numeric.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'numeric'
plugin_attrib = name
interfaces = set(['value', 'unit']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "numeric"
class DataString(Field):
"""
Field data of type string
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'string'
plugin_attrib = name
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "string"
class DataBoolean(Field):
"""
Field data of type boolean.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'boolean'
plugin_attrib = name
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "boolean"
class DataDateTime(Field):
"""
Field data of type dateTime.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'dateTime'
plugin_attrib = name
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "dateTime"
class DataTimeSpan(Field):
"""
Field data of type timeSpan.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'timeSpan'
plugin_attrib = name
interfaces = set(['value']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "timeSpan"
class DataEnum(Field):
"""
Field data of type enum.
Note that the value is expressed as a string.
"""
namespace = 'urn:xmpp:iot:sensordata'
name = 'enum'
plugin_attrib = name
interfaces = set(['value', 'dataType']);
interfaces.update(Field.interfaces);
def _get_typename(self):
return "enum"
class Done(ElementBase):
""" Done element used to signal that all data has been transferred """
namespace = 'urn:xmpp:iot:sensordata'
name = 'done'
plugin_attrib = name
interfaces = set(['seqnr'])
class Cancel(ElementBase):
""" Cancel element used to signal that a request shall be cancelled """
namespace = 'urn:xmpp:iot:sensordata'
name = 'cancel'
plugin_attrib = name
interfaces = set(['seqnr'])
class Cancelled(ElementBase):
""" Cancelled element used to signal that cancellation is confirmed """
namespace = 'urn:xmpp:iot:sensordata'
name = 'cancelled'
plugin_attrib = name
interfaces = set(['seqnr'])
register_stanza_plugin(Iq, Request)
register_stanza_plugin(Request, RequestNode, iterable=True)
register_stanza_plugin(Request, RequestField, iterable=True)
register_stanza_plugin(Iq, Accepted)
register_stanza_plugin(Message, Failure)
register_stanza_plugin(Failure, Error)
register_stanza_plugin(Iq, Rejected)
register_stanza_plugin(Message, Fields)
register_stanza_plugin(Fields, FieldsNode, iterable=True)
register_stanza_plugin(FieldsNode, Timestamp, iterable=True)
register_stanza_plugin(Timestamp, Field, iterable=True)
register_stanza_plugin(Timestamp, DataNumeric, iterable=True)
register_stanza_plugin(Timestamp, DataString, iterable=True)
register_stanza_plugin(Timestamp, DataBoolean, iterable=True)
register_stanza_plugin(Timestamp, DataDateTime, iterable=True)
register_stanza_plugin(Timestamp, DataTimeSpan, iterable=True)
register_stanza_plugin(Timestamp, DataEnum, iterable=True)
register_stanza_plugin(Message, Started)
register_stanza_plugin(Iq, Cancel)
register_stanza_plugin(Iq, Cancelled)

View File

@ -0,0 +1,64 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from threading import Thread, Event, Timer
import time
def TimerReset(*args, **kwargs):
""" Global function for Timer """
return _TimerReset(*args, **kwargs)
class _TimerReset(Thread):
"""Call a function after a specified number of seconds:
t = TimerReset(30.0, f, args=[], kwargs={})
t.start()
t.cancel() # stop the timer's action if it's still waiting
"""
def __init__(self, interval, function, args=[], kwargs={}):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()
self.resetted = True
def cancel(self):
"""Stop the timer if it hasn't finished yet"""
self.finished.set()
def run(self):
#print "Time: %s - timer running..." % time.asctime()
while self.resetted:
#print "Time: %s - timer waiting for timeout in %.2f..." % (time.asctime(), self.interval)
self.resetted = False
self.finished.wait(self.interval)
if not self.finished.isSet():
self.function(*self.args, **self.kwargs)
self.finished.set()
#print "Time: %s - timer finished!" % time.asctime()
def reset(self, interval=None):
""" Reset the timer """
if interval:
#print "Time: %s - timer resetting to %.2f..." % (time.asctime(), interval)
self.interval = interval
else:
#print "Time: %s - timer resetting..." % time.asctime()
pass
self.resetted = True
self.finished.set()
self.finished.clear()

View File

@ -0,0 +1,18 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.base import register_plugin
from sleekxmpp.plugins.xep_0325.control import XEP_0325
from sleekxmpp.plugins.xep_0325 import stanza
register_plugin(XEP_0325)
xep_0325=XEP_0325

View File

@ -0,0 +1,574 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
import time
from threading import Thread, Timer, Lock
from sleekxmpp.xmlstream import JID
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.plugins.base import BasePlugin
from sleekxmpp.plugins.xep_0325 import stanza
from sleekxmpp.plugins.xep_0325.stanza import Control
log = logging.getLogger(__name__)
class XEP_0325(BasePlugin):
"""
XEP-0325: IoT Control
Actuators are devices in sensor networks that can be controlled through
the network and act with the outside world. In sensor networks and
Internet of Things applications, actuators make it possible to automate
real-world processes.
This plugin implements a mechanism whereby actuators can be controlled
in XMPP-based sensor networks, making it possible to integrate sensors
and actuators of different brands, makes and models into larger
Internet of Things applications.
Also see <http://xmpp.org/extensions/xep-0325.html>
Configuration Values:
threaded -- Indicates if communication with sensors should be threaded.
Defaults to True.
Events:
Sensor side
-----------
Control Event:DirectSet -- Received a control message
Control Event:SetReq -- Received a control request
Client side
-----------
Control Event:SetResponse -- Received a response to a
control request, type result
Control Event:SetResponseError -- Received a response to a
control request, type error
Attributes:
threaded -- Indicates if command events should be threaded.
Defaults to True.
sessions -- A dictionary or equivalent backend mapping
session IDs to dictionaries containing data
relevant to a request's session. This dictionary is used
both by the client and sensor side. On client side, seqnr
is used as key, while on sensor side, a session_id is used
as key. This ensures that the two will not collide, so
one instance can be both client and sensor.
Sensor side
-----------
nodes -- A dictionary mapping sensor nodes that are serviced through
this XMPP instance to their device handlers ("drivers").
Client side
-----------
last_seqnr -- The last used sequence number (integer). One sequence of
communication (e.g. -->request, <--accept, <--fields)
between client and sensor is identified by a unique
sequence number (unique between the client/sensor pair)
Methods:
plugin_init -- Overrides base_plugin.plugin_init
post_init -- Overrides base_plugin.post_init
plugin_end -- Overrides base_plugin.plugin_end
Sensor side
-----------
register_node -- Register a sensor as available from this XMPP
instance.
Client side
-----------
set_request -- Initiates a control request to modify data in
sensor(s). Non-blocking, a callback function will
be called when the sensor has responded.
set_command -- Initiates a control command to modify data in
sensor(s). Non-blocking. The sensor(s) will not
respond regardless of the result of the command,
so no callback is made.
"""
name = 'xep_0325'
description = 'XEP-0325 Internet of Things - Control'
dependencies = set(['xep_0030'])
stanza = stanza
default_config = {
'threaded': True
# 'session_db': None
}
def plugin_init(self):
""" Start the XEP-0325 plugin """
self.xmpp.register_handler(
Callback('Control Event:DirectSet',
StanzaPath('message/set'),
self._handle_direct_set))
self.xmpp.register_handler(
Callback('Control Event:SetReq',
StanzaPath('iq@type=set/set'),
self._handle_set_req))
self.xmpp.register_handler(
Callback('Control Event:SetResponse',
StanzaPath('iq@type=result/setResponse'),
self._handle_set_response))
self.xmpp.register_handler(
Callback('Control Event:SetResponseError',
StanzaPath('iq@type=error/setResponse'),
self._handle_set_response))
# Server side dicts
self.nodes = {};
self.sessions = {};
self.last_seqnr = 0;
self.seqnr_lock = Lock();
## For testning only
self.test_authenticated_from = ""
def post_init(self):
""" Init complete. Register our features in Serivce discovery. """
BasePlugin.post_init(self)
self.xmpp['xep_0030'].add_feature(Control.namespace)
self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple())
def _new_session(self):
""" Return a new session ID. """
return str(time.time()) + '-' + self.xmpp.new_id()
def plugin_end(self):
""" Stop the XEP-0325 plugin """
self.sessions.clear();
self.xmpp.remove_handler('Control Event:DirectSet')
self.xmpp.remove_handler('Control Event:SetReq')
self.xmpp.remove_handler('Control Event:SetResponse')
self.xmpp.remove_handler('Control Event:SetResponseError')
self.xmpp['xep_0030'].del_feature(feature=Control.namespace)
self.xmpp['xep_0030'].set_items(node=Control.namespace, items=tuple());
# =================================================================
# Sensor side (data provider) API
def register_node(self, nodeId, device, commTimeout, sourceId=None, cacheType=None):
"""
Register a sensor/device as available for control requests/commands
through this XMPP instance.
The device object may by any custom implementation to support
specific devices, but it must implement the functions:
has_control_field
set_control_fields
according to the interfaces shown in the example device.py file.
Arguments:
nodeId -- The identifier for the device
device -- The device object
commTimeout -- Time in seconds to wait between each callback from device during
a data readout. Float.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
self.nodes[nodeId] = {"device": device,
"commTimeout": commTimeout,
"sourceId": sourceId,
"cacheType": cacheType};
def _set_authenticated(self, auth=''):
""" Internal testing function """
self.test_authenticated_from = auth;
def _get_new_seqnr(self):
""" Returns a unique sequence number (unique across threads) """
self.seqnr_lock.acquire();
self.last_seqnr = self.last_seqnr + 1;
self.seqnr_lock.release();
return str(self.last_seqnr);
def _handle_set_req(self, iq):
"""
Event handler for reception of an Iq with set req - this is a
control request.
Verifies that
- all the requested nodes are available
(if no nodes are specified in the request, assume all nodes)
- all the control fields are available from all requested nodes
(if no nodes are specified in the request, assume all nodes)
If the request passes verification, the control request is passed
to the devices (in a separate thread).
If the verification fails, a setResponse with error indication
is sent.
"""
error_msg = '';
req_ok = True;
missing_node = None;
missing_field = None;
# Authentication
if len(self.test_authenticated_from) > 0 and not iq['from'] == self.test_authenticated_from:
# Invalid authentication
req_ok = False;
error_msg = "Access denied";
# Nodes
process_nodes = [];
if len(iq['set']['nodes']) > 0:
for n in iq['set']['nodes']:
if not n['nodeId'] in self.nodes:
req_ok = False;
missing_node = n['nodeId'];
error_msg = "Invalid nodeId " + n['nodeId'];
process_nodes = [n['nodeId'] for n in iq['set']['nodes']];
else:
process_nodes = self.nodes.keys();
# Fields - for control we need to find all in all devices, otherwise we reject
process_fields = [];
if len(iq['set']['datas']) > 0:
for f in iq['set']['datas']:
for node in self.nodes:
if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
req_ok = False;
missing_field = f['name'];
error_msg = "Invalid field " + f['name'];
break;
process_fields = [(f['name'], f._get_typename(), f['value']) for f in iq['set']['datas']];
if req_ok:
session = self._new_session();
self.sessions[session] = {"from": iq['from'], "to": iq['to'], "seqnr": iq['id']};
self.sessions[session]["commTimers"] = {};
self.sessions[session]["nodeDone"] = {};
# Flag that a reply is exected when we are done
self.sessions[session]["reply"] = True;
self.sessions[session]["node_list"] = process_nodes;
if self.threaded:
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
tr_req.start()
#print("started thread")
else:
self._threaded_node_request(session, process_fields);
else:
iq.reply();
iq['type'] = 'error';
iq['setResponse']['responseCode'] = "NotFound";
if missing_node is not None:
iq['setResponse'].add_node(missing_node);
if missing_field is not None:
iq['setResponse'].add_data(missing_field);
iq['setResponse']['error']['var'] = "Output";
iq['setResponse']['error']['text'] = error_msg;
iq.send(block=False);
def _handle_direct_set(self, msg):
"""
Event handler for reception of a Message with set command - this is a
direct control command.
Verifies that
- all the requested nodes are available
(if no nodes are specified in the request, assume all nodes)
- all the control fields are available from all requested nodes
(if no nodes are specified in the request, assume all nodes)
If the request passes verification, the control request is passed
to the devices (in a separate thread).
If the verification fails, do nothing.
"""
req_ok = True;
# Nodes
process_nodes = [];
if len(msg['set']['nodes']) > 0:
for n in msg['set']['nodes']:
if not n['nodeId'] in self.nodes:
req_ok = False;
error_msg = "Invalid nodeId " + n['nodeId'];
process_nodes = [n['nodeId'] for n in msg['set']['nodes']];
else:
process_nodes = self.nodes.keys();
# Fields - for control we need to find all in all devices, otherwise we reject
process_fields = [];
if len(msg['set']['datas']) > 0:
for f in msg['set']['datas']:
for node in self.nodes:
if not self.nodes[node]["device"].has_control_field(f['name'], f._get_typename()):
req_ok = False;
missing_field = f['name'];
error_msg = "Invalid field " + f['name'];
break;
process_fields = [(f['name'], f._get_typename(), f['value']) for f in msg['set']['datas']];
if req_ok:
session = self._new_session();
self.sessions[session] = {"from": msg['from'], "to": msg['to']};
self.sessions[session]["commTimers"] = {};
self.sessions[session]["nodeDone"] = {};
self.sessions[session]["reply"] = False;
self.sessions[session]["node_list"] = process_nodes;
if self.threaded:
#print("starting thread")
tr_req = Thread(target=self._threaded_node_request, args=(session, process_fields))
tr_req.start()
#print("started thread")
else:
self._threaded_node_request(session, process_fields);
def _threaded_node_request(self, session, process_fields):
"""
Helper function to handle the device control in a separate thread.
Arguments:
session -- The request session id
process_fields -- The fields to set in the devices. List of tuple format:
(name, datatype, value)
"""
for node in self.sessions[session]["node_list"]:
self.sessions[session]["nodeDone"][node] = False;
for node in self.sessions[session]["node_list"]:
timer = Timer(self.nodes[node]['commTimeout'], self._event_comm_timeout, args=(session, node));
self.sessions[session]["commTimers"][node] = timer;
timer.start();
self.nodes[node]['device'].set_control_fields(process_fields, session=session, callback=self._device_set_command_callback);
def _event_comm_timeout(self, session, nodeId):
"""
Triggered if any of the control operations timeout.
Stop communicating with the failing device.
If the control command was an Iq request, sends a failure
message back to the client.
Arguments:
session -- The request session id
nodeId -- The id of the device which timed out
"""
if self.sessions[session]["reply"]:
# Reply is exected when we are done
iq = self.xmpp.Iq();
iq['from'] = self.sessions[session]['to'];
iq['to'] = self.sessions[session]['from'];
iq['type'] = "error";
iq['id'] = self.sessions[session]['seqnr'];
iq['setResponse']['responseCode'] = "OtherError";
iq['setResponse'].add_node(nodeId);
iq['setResponse']['error']['var'] = "Output";
iq['setResponse']['error']['text'] = "Timeout.";
iq.send(block=False);
## TODO - should we send one timeout per node??
# Drop communication with this device and check if we are done
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
# The session is complete, delete it
del self.sessions[session];
def _all_nodes_done(self, session):
"""
Checks wheter all devices are done replying to the control command.
Arguments:
session -- The request session id
"""
for n in self.sessions[session]["nodeDone"]:
if not self.sessions[session]["nodeDone"][n]:
return False;
return True;
def _device_set_command_callback(self, session, nodeId, result, error_field=None, error_msg=None):
"""
Callback function called by the devices when the control command is
complete or failed.
If needed, composes a message with the result and sends it back to the
client.
Arguments:
session -- The request session id
nodeId -- The device id which initiated the callback
result -- The current result status of the control command. Valid values are:
"error" - Set fields failed.
"ok" - All fields were set.
error_field -- [optional] Only applies when result == "error"
The field name that failed (usually means it is missing)
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
"""
if not session in self.sessions:
# This can happend if a session was deleted, like in a timeout. Just drop the data.
return
if result == "error":
self.sessions[session]["commTimers"][nodeId].cancel();
if self.sessions[session]["reply"]:
# Reply is exected when we are done
iq = self.xmpp.Iq();
iq['from'] = self.sessions[session]['to'];
iq['to'] = self.sessions[session]['from'];
iq['type'] = "error";
iq['id'] = self.sessions[session]['seqnr'];
iq['setResponse']['responseCode'] = "OtherError";
iq['setResponse'].add_node(nodeId);
if error_field is not None:
iq['setResponse'].add_data(error_field);
iq['setResponse']['error']['var'] = error_field;
iq['setResponse']['error']['text'] = error_msg;
iq.send(block=False);
# Drop communication with this device and check if we are done
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
# The session is complete, delete it
del self.sessions[session];
else:
self.sessions[session]["commTimers"][nodeId].cancel();
self.sessions[session]["nodeDone"][nodeId] = True;
if (self._all_nodes_done(session)):
if self.sessions[session]["reply"]:
# Reply is exected when we are done
iq = self.xmpp.Iq();
iq['from'] = self.sessions[session]['to'];
iq['to'] = self.sessions[session]['from'];
iq['type'] = "result";
iq['id'] = self.sessions[session]['seqnr'];
iq['setResponse']['responseCode'] = "OK";
iq.send(block=False);
# The session is complete, delete it
del self.sessions[session];
# =================================================================
# Client side (data controller) API
def set_request(self, from_jid, to_jid, callback, fields, nodeIds=None):
"""
Called on the client side to initiade a control request.
Composes a message with the request and sends it to the device(s).
Does not block, the callback will be called when the device(s)
has responded.
Arguments:
from_jid -- The jid of the requester
to_jid -- The jid of the device(s)
callback -- The callback function to call when data is availble.
The callback function must support the following arguments:
from_jid -- The jid of the responding device(s)
result -- The result of the control request. Valid values are:
"OK" - Control request completed successfully
"NotFound" - One or more nodes or fields are missing
"InsufficientPrivileges" - Not authorized.
"Locked" - Field(s) is locked and cannot
be changed at the moment.
"NotImplemented" - Request feature not implemented.
"FormError" - Error while setting with
a form (not implemented).
"OtherError" - Indicates other types of
errors, such as timeout.
Details in the error_msg.
nodeId -- [optional] Only applicable when result == "error"
List of node Ids of failing device(s).
fields -- [optional] Only applicable when result == "error"
List of fields that failed.[optional] Mandatory when result == "rejected" or "failure".
error_msg -- Details about why the request failed.
fields -- Fields to set. List of tuple format: (name, typename, value).
nodeIds -- [optional] Limits the request to the node Ids in this list.
"""
iq = self.xmpp.Iq();
iq['from'] = from_jid;
iq['to'] = to_jid;
seqnr = self._get_new_seqnr();
iq['id'] = seqnr;
iq['type'] = "set";
if nodeIds is not None:
for nodeId in nodeIds:
iq['set'].add_node(nodeId);
if fields is not None:
for name, typename, value in fields:
iq['set'].add_data(name=name, typename=typename, value=value);
self.sessions[seqnr] = {"from": iq['from'], "to": iq['to'], "callback": callback};
iq.send(block=False);
def set_command(self, from_jid, to_jid, fields, nodeIds=None):
"""
Called on the client side to initiade a control command.
Composes a message with the set commandand sends it to the device(s).
Does not block. Device(s) will not respond, regardless of result.
Arguments:
from_jid -- The jid of the requester
to_jid -- The jid of the device(s)
fields -- Fields to set. List of tuple format: (name, typename, value).
nodeIds -- [optional] Limits the request to the node Ids in this list.
"""
msg = self.xmpp.Message();
msg['from'] = from_jid;
msg['to'] = to_jid;
msg['type'] = "set";
if nodeIds is not None:
for nodeId in nodeIds:
msg['set'].add_node(nodeId);
if fields is not None:
for name, typename, value in fields:
msg['set'].add_data(name, typename, value);
# We won't get any reply, so don't create a session
msg.send();
def _handle_set_response(self, iq):
""" Received response from device(s) """
#print("ooh")
seqnr = iq['id'];
from_jid = str(iq['from']);
result = iq['setResponse']['responseCode'];
nodeIds = [n['name'] for n in iq['setResponse']['nodes']];
fields = [f['name'] for f in iq['setResponse']['datas']];
error_msg = None;
if not iq['setResponse'].find('error') is None and not iq['setResponse']['error']['text'] == "":
error_msg = iq['setResponse']['error']['text'];
callback = self.sessions[seqnr]["callback"];
callback(from_jid=from_jid, result=result, nodeIds=nodeIds, fields=fields, error_msg=error_msg);

View File

@ -0,0 +1,125 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import datetime
class Device(object):
"""
Example implementation of a device control object.
The device object may by any custom implementation to support
specific devices, but it must implement the functions:
has_control_field
set_control_fields
"""
def __init__(self, nodeId):
self.nodeId = nodeId;
self.control_fields = {};
def has_control_field(self, field, typename):
"""
Returns true if the supplied field name exists
and the type matches for control in this device.
Arguments:
field -- The field name
typename -- The expected type
"""
if field in self.control_fields and self.control_fields[field]["type"] == typename:
return True;
return False;
def set_control_fields(self, fields, session, callback):
"""
Starts a control setting procedure. Verifies the fields,
sets the data and (if needed) and calls the callback.
Arguments:
fields -- List of control fields in tuple format:
(name, typename, value)
session -- Session id, only used in the callback as identifier
callback -- Callback function to call when control set is complete.
The callback function must support the following arguments:
session -- Session id, as supplied in the
request_fields call
nodeId -- Identifier for this device
result -- The current result status of the readout.
Valid values are:
"error" - Set fields failed.
"ok" - All fields were set.
error_field -- [optional] Only applies when result == "error"
The field name that failed
(usually means it is missing)
error_msg -- [optional] Only applies when result == "error".
Error details when a request failed.
"""
if len(fields) > 0:
# Check availiability
for name, typename, value in fields:
if not self.has_control_field(name, typename):
self._send_control_reject(session, name, "NotFound", callback)
return False;
for name, typename, value in fields:
self._set_field_value(name, value)
callback(session, result="ok", nodeId=self.nodeId);
return True
def _send_control_reject(self, session, field, message, callback):
"""
Sends a reject to the caller
Arguments:
session -- Session id, see definition in
set_control_fields function
callback -- Callback function, see definition in
set_control_fields function
"""
callback(session, result="error", nodeId=self.nodeId, error_field=field, error_msg=message);
def _add_control_field(self, name, typename, value):
"""
Adds a control field to the device
Arguments:
name -- Name of the field
typename -- Type of the field, one of:
(boolean, color, string, date, dateTime,
double, duration, int, long, time)
value -- Field value
"""
self.control_fields[name] = {"type": typename, "value": value};
def _set_field_value(self, name, value):
"""
Set the value of a control field
Arguments:
name -- Name of the field
value -- New value for the field
"""
if name in self.control_fields:
self.control_fields[name]["value"] = value;
def _get_field_value(self, name):
"""
Get the value of a control field. Only used for unit testing.
Arguments:
name -- Name of the field
"""
if name in self.control_fields:
return self.control_fields[name]["value"];
return None;

View File

@ -0,0 +1,12 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.xep_0325.stanza.control import *

View File

@ -0,0 +1,13 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.xmlstream import ET
pass

View File

@ -0,0 +1,526 @@
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp import Iq, Message
from sleekxmpp.xmlstream import register_stanza_plugin, ElementBase, ET, JID
from re import match
class Control(ElementBase):
""" Placeholder for the namespace, not used as a stanza """
namespace = 'urn:xmpp:iot:control'
name = 'control'
plugin_attrib = name
interfaces = set(tuple())
class ControlSet(ElementBase):
namespace = 'urn:xmpp:iot:control'
name = 'set'
plugin_attrib = name
interfaces = set(['nodes','datas'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent);
self._nodes = set()
self._datas = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._nodes = set([node['nodeId'] for node in self['nodes']])
self._datas = set([data['name'] for data in self['datas']])
def add_node(self, nodeId, sourceId=None, cacheType=None):
"""
Add a new node element. Each item is required to have a
nodeId, but may also specify a sourceId value and cacheType.
Arguments:
nodeId -- The ID for the node.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
if nodeId not in self._nodes:
self._nodes.add((nodeId))
node = RequestNode(parent=self)
node['nodeId'] = nodeId
node['sourceId'] = sourceId
node['cacheType'] = cacheType
self.iterables.append(node)
return node
return None
def del_node(self, nodeId):
"""
Remove a single node.
Arguments:
nodeId -- Node ID of the item to remove.
"""
if nodeId in self._nodes:
nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
for node in nodes:
if node['nodeId'] == nodeId:
self.xml.remove(node.xml)
self.iterables.remove(node)
return True
return False
def get_nodes(self):
"""Return all nodes."""
nodes = set()
for node in self['substanzas']:
if isinstance(node, RequestNode):
nodes.add(node)
return nodes
def set_nodes(self, nodes):
"""
Set or replace all nodes. The given nodes must be in a
list or set where each item is a tuple of the form:
(nodeId, sourceId, cacheType)
Arguments:
nodes -- A series of nodes in tuple format.
"""
self.del_nodes()
for node in nodes:
if isinstance(node, RequestNode):
self.add_node(node['nodeId'], node['sourceId'], node['cacheType'])
else:
nodeId, sourceId, cacheType = node
self.add_node(nodeId, sourceId, cacheType)
def del_nodes(self):
"""Remove all nodes."""
self._nodes = set()
nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
for node in nodes:
self.xml.remove(node.xml)
self.iterables.remove(node)
def add_data(self, name, typename, value):
"""
Add a new data element.
Arguments:
name -- The name of the data element
typename -- The type of data element
(boolean, color, string, date, dateTime,
double, duration, int, long, time)
value -- The value of the data element
"""
if name not in self._datas:
dataObj = None;
if typename == "boolean":
dataObj = BooleanParameter(parent=self);
elif typename == "color":
dataObj = ColorParameter(parent=self);
elif typename == "string":
dataObj = StringParameter(parent=self);
elif typename == "date":
dataObj = DateParameter(parent=self);
elif typename == "dateTime":
dataObj = DateTimeParameter(parent=self);
elif typename == "double":
dataObj = DoubleParameter(parent=self);
elif typename == "duration":
dataObj = DurationParameter(parent=self);
elif typename == "int":
dataObj = IntParameter(parent=self);
elif typename == "long":
dataObj = LongParameter(parent=self);
elif typename == "time":
dataObj = TimeParameter(parent=self);
dataObj['name'] = name;
dataObj['value'] = value;
self._datas.add(name)
self.iterables.append(dataObj)
return dataObj
return None
def del_data(self, name):
"""
Remove a single data element.
Arguments:
data_name -- The data element name to remove.
"""
if name in self._datas:
datas = [i for i in self.iterables if isinstance(i, BaseParameter)]
for data in datas:
if data['name'] == name:
self.xml.remove(data.xml)
self.iterables.remove(data)
return True
return False
def get_datas(self):
""" Return all data elements. """
datas = set()
for data in self['substanzas']:
if isinstance(data, BaseParameter):
datas.add(data)
return datas
def set_datas(self, datas):
"""
Set or replace all data elements. The given elements must be in a
list or set where each item is a data element (numeric, string, boolean, dateTime, timeSpan or enum)
Arguments:
datas -- A series of data elements.
"""
self.del_datas()
for data in datas:
self.add_data(name=data['name'], typename=data._get_typename(), value=data['value'])
def del_datas(self):
"""Remove all data elements."""
self._datas = set()
datas = [i for i in self.iterables if isinstance(i, BaseParameter)]
for data in datas:
self.xml.remove(data.xml)
self.iterables.remove(data)
class RequestNode(ElementBase):
""" Node element in a request """
namespace = 'urn:xmpp:iot:control'
name = 'node'
plugin_attrib = name
interfaces = set(['nodeId','sourceId','cacheType'])
class ControlSetResponse(ElementBase):
namespace = 'urn:xmpp:iot:control'
name = 'setResponse'
plugin_attrib = name
interfaces = set(['responseCode'])
def __init__(self, xml=None, parent=None):
ElementBase.__init__(self, xml, parent);
self._nodes = set()
self._datas = set()
def setup(self, xml=None):
"""
Populate the stanza object using an optional XML object.
Overrides ElementBase.setup
Caches item information.
Arguments:
xml -- Use an existing XML object for the stanza's values.
"""
ElementBase.setup(self, xml)
self._nodes = set([node['nodeId'] for node in self['nodes']])
self._datas = set([data['name'] for data in self['datas']])
def add_node(self, nodeId, sourceId=None, cacheType=None):
"""
Add a new node element. Each item is required to have a
nodeId, but may also specify a sourceId value and cacheType.
Arguments:
nodeId -- The ID for the node.
sourceId -- [optional] identifying the data source controlling the device
cacheType -- [optional] narrowing down the search to a specific kind of node
"""
if nodeId not in self._nodes:
self._nodes.add(nodeId)
node = RequestNode(parent=self)
node['nodeId'] = nodeId
node['sourceId'] = sourceId
node['cacheType'] = cacheType
self.iterables.append(node)
return node
return None
def del_node(self, nodeId):
"""
Remove a single node.
Arguments:
nodeId -- Node ID of the item to remove.
"""
if nodeId in self._nodes:
nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
for node in nodes:
if node['nodeId'] == nodeId:
self.xml.remove(node.xml)
self.iterables.remove(node)
return True
return False
def get_nodes(self):
"""Return all nodes."""
nodes = set()
for node in self['substanzas']:
if isinstance(node, RequestNode):
nodes.add(node)
return nodes
def set_nodes(self, nodes):
"""
Set or replace all nodes. The given nodes must be in a
list or set where each item is a tuple of the form:
(nodeId, sourceId, cacheType)
Arguments:
nodes -- A series of nodes in tuple format.
"""
self.del_nodes()
for node in nodes:
if isinstance(node, RequestNode):
self.add_node(node['nodeId'], node['sourceId'], node['cacheType'])
else:
nodeId, sourceId, cacheType = node
self.add_node(nodeId, sourceId, cacheType)
def del_nodes(self):
"""Remove all nodes."""
self._nodes = set()
nodes = [i for i in self.iterables if isinstance(i, RequestNode)]
for node in nodes:
self.xml.remove(node.xml)
self.iterables.remove(node)
def add_data(self, name):
"""
Add a new ResponseParameter element.
Arguments:
name -- Name of the parameter
"""
if name not in self._datas:
self._datas.add(name)
data = ResponseParameter(parent=self)
data['name'] = name;
self.iterables.append(data)
return data
return None
def del_data(self, name):
"""
Remove a single ResponseParameter element.
Arguments:
name -- The data element name to remove.
"""
if name in self._datas:
datas = [i for i in self.iterables if isinstance(i, ResponseParameter)]
for data in datas:
if data['name'] == name:
self.xml.remove(data.xml)
self.iterables.remove(data)
return True
return False
def get_datas(self):
""" Return all ResponseParameter elements. """
datas = set()
for data in self['substanzas']:
if isinstance(data, ResponseParameter):
datas.add(data)
return datas
def set_datas(self, datas):
"""
Set or replace all data elements. The given elements must be in a
list or set of ResponseParameter elements
Arguments:
datas -- A series of data element names.
"""
self.del_datas()
for data in datas:
self.add_data(name=data['name'])
def del_datas(self):
"""Remove all ResponseParameter elements."""
self._datas = set()
datas = [i for i in self.iterables if isinstance(i, ResponseParameter)]
for data in datas:
self.xml.remove(data.xml)
self.iterables.remove(data)
class Error(ElementBase):
namespace = 'urn:xmpp:iot:control'
name = 'error'
plugin_attrib = name
interfaces = set(['var','text'])
def get_text(self):
"""Return then contents inside the XML tag."""
return self.xml.text
def set_text(self, value):
"""Set then contents inside the XML tag.
Arguments:
value -- string
"""
self.xml.text = value;
return self
def del_text(self):
"""Remove the contents inside the XML tag."""
self.xml.text = ""
return self
class ResponseParameter(ElementBase):
"""
Parameter element in ControlSetResponse.
"""
namespace = 'urn:xmpp:iot:control'
name = 'parameter'
plugin_attrib = name
interfaces = set(['name']);
class BaseParameter(ElementBase):
"""
Parameter element in SetCommand. This is a base class,
all instances of parameters added to SetCommand must be of types:
BooleanParameter
ColorParameter
StringParameter
DateParameter
DateTimeParameter
DoubleParameter
DurationParameter
IntParameter
LongParameter
TimeParameter
"""
namespace = 'urn:xmpp:iot:control'
name = 'baseParameter'
plugin_attrib = name
interfaces = set(['name','value']);
def _get_typename(self):
return self.name;
class BooleanParameter(BaseParameter):
"""
Field data of type boolean.
Note that the value is expressed as a string.
"""
name = 'boolean'
plugin_attrib = name
class ColorParameter(BaseParameter):
"""
Field data of type color.
Note that the value is expressed as a string.
"""
name = 'color'
plugin_attrib = name
class StringParameter(BaseParameter):
"""
Field data of type string.
"""
name = 'string'
plugin_attrib = name
class DateParameter(BaseParameter):
"""
Field data of type date.
Note that the value is expressed as a string.
"""
name = 'date'
plugin_attrib = name
class DateTimeParameter(BaseParameter):
"""
Field data of type dateTime.
Note that the value is expressed as a string.
"""
name = 'dateTime'
plugin_attrib = name
class DoubleParameter(BaseParameter):
"""
Field data of type double.
Note that the value is expressed as a string.
"""
name = 'double'
plugin_attrib = name
class DurationParameter(BaseParameter):
"""
Field data of type duration.
Note that the value is expressed as a string.
"""
name = 'duration'
plugin_attrib = name
class IntParameter(BaseParameter):
"""
Field data of type int.
Note that the value is expressed as a string.
"""
name = 'int'
plugin_attrib = name
class LongParameter(BaseParameter):
"""
Field data of type long (64-bit int).
Note that the value is expressed as a string.
"""
name = 'long'
plugin_attrib = name
class TimeParameter(BaseParameter):
"""
Field data of type time.
Note that the value is expressed as a string.
"""
name = 'time'
plugin_attrib = name
register_stanza_plugin(Iq, ControlSet)
register_stanza_plugin(Message, ControlSet)
register_stanza_plugin(ControlSet, RequestNode, iterable=True)
register_stanza_plugin(ControlSet, BooleanParameter, iterable=True)
register_stanza_plugin(ControlSet, ColorParameter, iterable=True)
register_stanza_plugin(ControlSet, StringParameter, iterable=True)
register_stanza_plugin(ControlSet, DateParameter, iterable=True)
register_stanza_plugin(ControlSet, DateTimeParameter, iterable=True)
register_stanza_plugin(ControlSet, DoubleParameter, iterable=True)
register_stanza_plugin(ControlSet, DurationParameter, iterable=True)
register_stanza_plugin(ControlSet, IntParameter, iterable=True)
register_stanza_plugin(ControlSet, LongParameter, iterable=True)
register_stanza_plugin(ControlSet, TimeParameter, iterable=True)
register_stanza_plugin(Iq, ControlSetResponse)
register_stanza_plugin(ControlSetResponse, Error)
register_stanza_plugin(ControlSetResponse, RequestNode, iterable=True)
register_stanza_plugin(ControlSetResponse, ResponseParameter, iterable=True)

View File

@ -1,15 +1,24 @@
# -*- coding: utf-8 -*-
from sleekxmpp.test import *
import sleekxmpp.plugins.xep_0323 as xep_0323
namespace='sn'
class TestChatStates(SleekTest):
class TestSensorDataStanzas(SleekTest):
def setUp(self):
register_stanza_plugin(Message, xep_0323.stanza.Request)
register_stanza_plugin(Message, xep_0323.stanza.Accepted)
register_stanza_plugin(Message, xep_0323.stanza.Failure)
pass
#register_stanza_plugin(Iq, xep_0323.stanza.Request)
#register_stanza_plugin(Iq, xep_0323.stanza.Accepted)
#register_stanza_plugin(Message, xep_0323.stanza.Failure)
#register_stanza_plugin(xep_0323.stanza.Failure, xep_0323.stanza.Error)
#register_stanza_plugin(Iq, xep_0323.stanza.Rejected)
#register_stanza_plugin(Message, xep_0323.stanza.Fields)
#register_stanza_plugin(Message, xep_0323.stanza.Request)
#register_stanza_plugin(Message, xep_0323.stanza.Accepted)
#register_stanza_plugin(Message, xep_0323.stanza.Failure)
# register_stanza_plugin(Message, xep_0323.stanza.Result)
# register_stanza_plugin(Message, xep_0323.stanza.Gone)
# register_stanza_plugin(Message, xep_0323.stanza.Inactive)
@ -21,37 +30,364 @@ class TestChatStates(SleekTest):
"""
iq = self.Iq()
iq['type'] = 'get'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['sensordata']['req']['seqnr'] = '1'
iq['sensordata']['req']['momentary'] = 'true'
iq['req']['seqnr'] = '1'
iq['req']['momentary'] = 'true'
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'/>
</iq>
"""
)
def testRequestNodes(self):
"""
test of request nodes stanza
"""
iq = self.Iq()
iq['type'] = 'get'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['req']['seqnr'] = '1'
iq['req']['momentary'] = 'true'
iq['req'].add_node("Device02", "Source02", "CacheType");
iq['req'].add_node("Device44");
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'>
<node nodeId='Device02' sourceId='Source02' cacheType='CacheType'/>
<node nodeId='Device44'/>
</req>
</iq>
"""
)
iq['req'].del_node("Device02");
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'>
<node nodeId='Device44'/>
</req>
</iq>
"""
)
iq['req'].del_nodes();
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'>
</req>
</iq>
"""
)
def testRequestField(self):
"""
test of request field stanza
"""
iq = self.Iq()
iq['type'] = 'get'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['req']['seqnr'] = '1'
iq['req']['momentary'] = 'true'
iq['req'].add_field("Top temperature");
iq['req'].add_field("Bottom temperature");
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'>
<field name='Top temperature'/>
<field name='Bottom temperature'/>
</req>
</iq>
"""
)
iq['req'].del_field("Top temperature")
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'>
<field name='Bottom temperature'/>
</req>
</iq>
"""
)
iq['req'].del_fields()
self.check(iq,"""
<iq type='get'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<req xmlns='urn:xmpp:iot:sensordata' seqnr='1' momentary='true'>
</req>
</iq>
"""
)
def testAccepted(self):
"""
test of request stanza
"""
iq = self.Iq()
iq['type'] = 'result'
iq['from'] = 'device@clayster.com'
iq['to'] = 'master@clayster.com/amr'
iq['id'] = '2'
iq['sensordata']['accepted']['seqnr'] = '2'
iq['accepted']['seqnr'] = '2'
print(str(iq))
self.check(iq,"""
<iq type='result'
from='device@clayster.com'
to='master@clayster.com/amr'
id='2'>
<accepted xmlns='urn:xmpp:iot:sensordata' seqnr='2'/>
</iq>
"""
)
def testRejected(self):
"""
test of request stanza
"""
iq = self.Iq()
iq['type'] = 'error'
iq['from'] = 'device@clayster.com'
iq['to'] = 'master@clayster.com/amr'
iq['id'] = '4'
iq['rejected']['seqnr'] = '4'
iq['rejected']['error'] = 'Access denied.'
self.check(iq,"""
<iq type='error'
from='device@clayster.com'
to='master@clayster.com/amr'
id='4'>
<rejected xmlns='urn:xmpp:iot:sensordata' seqnr='4'>
<error>Access denied.</error>
</rejected>
</iq>
"""
)
def testFailure(self):
"""
test of failure stanza
"""
msg = self.Message()
msg['from'] = 'device@clayster.com'
msg['to'] = 'master@clayster.com/amr'
msg['failure']['seqnr'] = '3'
msg['failure']['done'] = 'true'
msg['failure']['error']['nodeId'] = 'Device01'
msg['failure']['error']['timestamp'] = '2013-03-07T17:13:30'
msg['failure']['error']['text'] = 'Timeout.'
self.check(msg,"""
<message from='device@clayster.com'
to='master@clayster.com/amr'>
<failure xmlns='urn:xmpp:iot:sensordata' seqnr='3' done='true'>
<error nodeId='Device01' timestamp='2013-03-07T17:13:30'>
Timeout.</error>
</failure>
</message>
"""
)
def testFields(self):
"""
test of fields stanza
"""
msg = self.Message()
msg['from'] = 'device@clayster.com'
msg['to'] = 'master@clayster.com/amr'
msg['fields']['seqnr'] = '1'
node = msg['fields'].add_node("Device02");
ts = node.add_timestamp("2013-03-07T16:24:30");
data = ts.add_data(typename="numeric", name="Temperature", value="-12.42", unit='K');
data['momentary'] = 'true';
data['automaticReadout'] = 'true';
self.check(msg,"""
<message from='device@clayster.com'
to='master@clayster.com/amr'>
<fields xmlns='urn:xmpp:iot:sensordata' seqnr='1'>
<node nodeId='Device02'>
<timestamp value='2013-03-07T16:24:30'>
<numeric name='Temperature' momentary='true' automaticReadout='true' value='-12.42' unit='K'/>
</timestamp>
</node>
</fields>
</message>
"""
)
node = msg['fields'].add_node("EmptyDevice");
node = msg['fields'].add_node("Device04");
ts = node.add_timestamp("EmptyTimestamp");
self.check(msg,"""
<message from='device@clayster.com'
to='master@clayster.com/amr'>
<fields xmlns='urn:xmpp:iot:sensordata' seqnr='1'>
<node nodeId='Device02'>
<timestamp value='2013-03-07T16:24:30'>
<numeric name='Temperature' momentary='true' automaticReadout='true' value='-12.42' unit='K'/>
</timestamp>
</node>
<node nodeId='EmptyDevice'/>
<node nodeId='Device04'>
<timestamp value='EmptyTimestamp'/>
</node>
</fields>
</message>
"""
)
node = msg['fields'].add_node("Device77");
ts = node.add_timestamp("2013-05-03T12:00:01");
data = ts.add_data(typename="numeric", name="Temperature", value="-12.42", unit='K');
data['historicalDay'] = 'true';
data = ts.add_data(typename="numeric", name="Speed", value="312.42", unit='km/h');
data['historicalWeek'] = 'false';
data = ts.add_data(typename="string", name="Temperature name", value="Bottom oil");
data['historicalMonth'] = 'true';
data = ts.add_data(typename="string", name="Speed name", value="Top speed");
data['historicalQuarter'] = 'false';
data = ts.add_data(typename="dateTime", name="T1", value="1979-01-01T00:00:00");
data['historicalYear'] = 'true';
data = ts.add_data(typename="dateTime", name="T2", value="2000-01-01T01:02:03");
data['historicalOther'] = 'false';
data = ts.add_data(typename="timeSpan", name="TS1", value="P5Y");
data['missing'] = 'true';
data = ts.add_data(typename="timeSpan", name="TS2", value="PT2M1S");
data['manualEstimate'] = 'false';
data = ts.add_data(typename="enum", name="top color", value="red", dataType="string");
data['invoiced'] = 'true';
data = ts.add_data(typename="enum", name="bottom color", value="black", dataType="string");
data['powerFailure'] = 'false';
data = ts.add_data(typename="boolean", name="Temperature real", value="false");
data['historicalDay'] = 'true';
data = ts.add_data(typename="boolean", name="Speed real", value="true");
data['historicalWeek'] = 'false';
self.check(msg,"""
<message from='device@clayster.com'
to='master@clayster.com/amr'>
<fields xmlns='urn:xmpp:iot:sensordata' seqnr='1'>
<node nodeId='Device02'>
<timestamp value='2013-03-07T16:24:30'>
<numeric name='Temperature' momentary='true' automaticReadout='true' value='-12.42' unit='K'/>
</timestamp>
</node>
<node nodeId='EmptyDevice'/>
<node nodeId='Device04'>
<timestamp value='EmptyTimestamp'/>
</node>
<node nodeId='Device77'>
<timestamp value='2013-05-03T12:00:01'>
<numeric name='Temperature' historicalDay='true' value='-12.42' unit='K'/>
<numeric name='Speed' historicalWeek='false' value='312.42' unit='km/h'/>
<string name='Temperature name' historicalMonth='true' value='Bottom oil'/>
<string name='Speed name' historicalQuarter='false' value='Top speed'/>
<dateTime name='T1' historicalYear='true' value='1979-01-01T00:00:00'/>
<dateTime name='T2' historicalOther='false' value='2000-01-01T01:02:03'/>
<timeSpan name='TS1' missing='true' value='P5Y'/>
<timeSpan name='TS2' manualEstimate='false' value='PT2M1S'/>
<enum name='top color' invoiced='true' value='red' dataType='string'/>
<enum name='bottom color' powerFailure='false' value='black' dataType='string'/>
<boolean name='Temperature real' historicalDay='true' value='false'/>
<boolean name='Speed real' historicalWeek='false' value='true'/>
</timestamp>
</node>
</fields>
</message>
"""
)
def testReadOutMomentary_multiple(self):
"""
test of reading momentary value from a nde with multiple responses
"""
iq = self.Iq()
print(str(iq))
def testTimestamp(self):
msg = self.Message();
self.check(iq,"""
"""
msg['from'] = 'device@clayster.com'
msg['to'] = 'master@clayster.com/amr'
msg['fields']['seqnr'] = '1'
node = msg['fields'].add_node("Device02");
node = msg['fields'].add_node("Device03");
ts = node.add_timestamp("2013-03-07T16:24:30");
ts = node.add_timestamp("2013-03-07T16:24:31");
self.check(msg,"""
<message from='device@clayster.com'
to='master@clayster.com/amr'>
<fields xmlns='urn:xmpp:iot:sensordata' seqnr='1'>
<node nodeId='Device02'/>
<node nodeId='Device03'>
<timestamp value='2013-03-07T16:24:30'/>
<timestamp value='2013-03-07T16:24:31'/>
</node>
</fields>
</message>
"""
)
def testStringIdsMatcher(self):
"""
test of StringIds follow spec
"""
emptyStringIdXML='<message xmlns="jabber:client"><fields xmlns="urn:xmpp:iot:sensordata" /></message>'
msg = self.Message()
msg['fields']['stringIds'] = "Nisse"
self.check(msg,emptyStringIdXML)
msg['fields']['stringIds'] = "Nisse___nje#"
self.check(msg,emptyStringIdXML)
msg['fields']['stringIds'] = "1"
self.check(msg,emptyStringIdXML)
suite = unittest.TestLoader().loadTestsFromTestCase(TestChatStates)
suite = unittest.TestLoader().loadTestsFromTestCase(TestSensorDataStanzas)

View File

@ -0,0 +1,248 @@
# -*- coding: utf-8 -*-
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.test import *
import sleekxmpp.plugins.xep_0325 as xep_0325
namespace='sn'
class TestControlStanzas(SleekTest):
def setUp(self):
pass
def testSetRequest(self):
"""
test of set request stanza
"""
iq = self.Iq()
iq['type'] = 'set'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['set'].add_node("Device02", "Source02", "MyCacheType");
iq['set'].add_node("Device15");
iq['set'].add_data("Tjohej", "boolean", "true")
self.check(iq,"""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device02' sourceId='Source02' cacheType='MyCacheType'/>
<node nodeId='Device15'/>
<boolean name='Tjohej' value='true'/>
</set>
</iq>
"""
)
iq['set'].del_node("Device02");
self.check(iq,"""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device15'/>
<boolean name='Tjohej' value='true'/>
</set>
</iq>
"""
)
iq['set'].del_nodes();
self.check(iq,"""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<boolean name='Tjohej' value='true'/>
</set>
</iq>
"""
)
def testDirectSet(self):
"""
test of direct set stanza
"""
msg = self.Message()
msg['from'] = 'master@clayster.com/amr'
msg['to'] = 'device@clayster.com'
msg['set'].add_node("Device02");
msg['set'].add_node("Device15");
msg['set'].add_data("Tjohej", "boolean", "true")
self.check(msg,"""
<message
from='master@clayster.com/amr'
to='device@clayster.com'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device02'/>
<node nodeId='Device15'/>
<boolean name='Tjohej' value='true'/>
</set>
</message>
"""
)
def testSetResponse(self):
"""
test of set response stanza
"""
iq = self.Iq()
iq['type'] = 'result'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '8'
iq['setResponse']['responseCode'] = "OK";
self.check(iq,"""
<iq type='result'
from='master@clayster.com/amr'
to='device@clayster.com'
id='8'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode='OK' />
</iq>
"""
)
iq = self.Iq()
iq['type'] = 'error'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '9'
iq['setResponse']['responseCode'] = "OtherError";
iq['setResponse']['error']['var'] = "Output";
iq['setResponse']['error']['text'] = "Test of other error.!";
self.check(iq,"""
<iq type='error'
from='master@clayster.com/amr'
to='device@clayster.com'
id='9'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode='OtherError'>
<error var='Output'>Test of other error.!</error>
</setResponse>
</iq>
"""
)
iq = self.Iq()
iq['type'] = 'error'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '9'
iq['setResponse']['responseCode'] = "NotFound";
iq['setResponse'].add_node("Device17", "Source09");
iq['setResponse'].add_node("Device18", "Source09");
iq['setResponse'].add_data("Tjohopp");
self.check(iq,"""
<iq type='error'
from='master@clayster.com/amr'
to='device@clayster.com'
id='9'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode='NotFound'>
<node nodeId='Device17' sourceId='Source09'/>
<node nodeId='Device18' sourceId='Source09'/>
<parameter name='Tjohopp' />
</setResponse>
</iq>
"""
)
def testSetRequestDatas(self):
"""
test of set request data stanzas
"""
iq = self.Iq()
iq['type'] = 'set'
iq['from'] = 'master@clayster.com/amr'
iq['to'] = 'device@clayster.com'
iq['id'] = '1'
iq['set'].add_node("Device02", "Source02", "MyCacheType");
iq['set'].add_node("Device15");
iq['set'].add_data("Tjohej", "boolean", "true");
iq['set'].add_data("Tjohej2", "boolean", "false");
iq['set'].add_data("TjohejC", "color", "FF00FF");
iq['set'].add_data("TjohejC2", "color", "00FF00");
iq['set'].add_data("TjohejS", "string", "String1");
iq['set'].add_data("TjohejS2", "string", "String2");
iq['set'].add_data("TjohejDate", "date", "2012-01-01");
iq['set'].add_data("TjohejDate2", "date", "1900-12-03");
iq['set'].add_data("TjohejDateT4", "dateTime", "1900-12-03 12:30");
iq['set'].add_data("TjohejDateT2", "dateTime", "1900-12-03 11:22");
iq['set'].add_data("TjohejDouble2", "double", "200.22");
iq['set'].add_data("TjohejDouble3", "double", "-12232131.3333");
iq['set'].add_data("TjohejDur", "duration", "P5Y");
iq['set'].add_data("TjohejDur2", "duration", "PT2M1S");
iq['set'].add_data("TjohejInt", "int", "1");
iq['set'].add_data("TjohejInt2", "int", "-42");
iq['set'].add_data("TjohejLong", "long", "123456789098");
iq['set'].add_data("TjohejLong2", "long", "-90983243827489374");
iq['set'].add_data("TjohejTime", "time", "23:59");
iq['set'].add_data("TjohejTime2", "time", "12:00");
self.check(iq,"""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device02' sourceId='Source02' cacheType='MyCacheType'/>
<node nodeId='Device15'/>
<boolean name='Tjohej' value='true'/>
<boolean name='Tjohej2' value='false'/>
<color name='TjohejC' value='FF00FF'/>
<color name='TjohejC2' value='00FF00'/>
<string name='TjohejS' value='String1'/>
<string name='TjohejS2' value='String2'/>
<date name='TjohejDate' value='2012-01-01'/>
<date name='TjohejDate2' value='1900-12-03'/>
<dateTime name='TjohejDateT4' value='1900-12-03 12:30'/>
<dateTime name='TjohejDateT2' value='1900-12-03 11:22'/>
<double name='TjohejDouble2' value='200.22'/>
<double name='TjohejDouble3' value='-12232131.3333'/>
<duration name='TjohejDur' value='P5Y'/>
<duration name='TjohejDur2' value='PT2M1S'/>
<int name='TjohejInt' value='1'/>
<int name='TjohejInt2' value='-42'/>
<long name='TjohejLong' value='123456789098'/>
<long name='TjohejLong2' value='-90983243827489374'/>
<time name='TjohejTime' value='23:59'/>
<time name='TjohejTime2' value='12:00'/>
</set>
</iq>
"""
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestControlStanzas)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,365 @@
# -*- coding: utf-8 -*-
"""
SleekXMPP: The Sleek XMPP Library
Implementation of xeps for Internet of Things
http://wiki.xmpp.org/web/Tech_pages/IoT_systems
Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import sys
import datetime
import time
import threading
from sleekxmpp.test import *
from sleekxmpp.xmlstream import ElementBase
from sleekxmpp.plugins.xep_0325.device import Device
class TestStreamControl(SleekTest):
"""
Test using the XEP-0325 plugin.
"""
def setUp(self):
pass
def _time_now(self):
return datetime.datetime.now().replace(microsecond=0).isoformat();
def tearDown(self):
self.stream_close()
def testRequestSetOk(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22");
myDevice._add_control_field(name="Temperature", typename="int", value="15");
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5);
self.recv("""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<int name="Temperature" value="17"/>
</set>
</iq>
""")
self.send("""
<iq type='result'
from='device@clayster.com'
to='master@clayster.com/amr'
id='1'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
</iq>
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "17");
def testRequestSetMulti(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22");
myDevice._add_control_field(name="Temperature", typename="int", value="15");
myDevice._add_control_field(name="Startup", typename="date", value="2013-01-03");
myDevice2 = Device("Device23");
myDevice2._add_control_field(name="Temperature", typename="int", value="19");
myDevice2._add_control_field(name="Startup", typename="date", value="2013-01-09");
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5);
self.xmpp['xep_0325'].register_node(nodeId="Device23", device=myDevice2, commTimeout=0.5);
self.recv("""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device22' />
<int name="Temperature" value="17"/>
</set>
</iq>
""")
self.send("""
<iq type='result'
from='device@clayster.com'
to='master@clayster.com/amr'
id='1'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
</iq>
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "17");
self.assertEqual(myDevice2._get_field_value("Temperature"), "19");
self.recv("""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='2'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device23' />
<node nodeId='Device22' />
<date name="Startup" value="2013-02-01"/>
<int name="Temperature" value="20"/>
</set>
</iq>
""")
self.send("""
<iq type='result'
from='device@clayster.com'
to='master@clayster.com/amr'
id='2'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
</iq>
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "20");
self.assertEqual(myDevice2._get_field_value("Temperature"), "20");
self.assertEqual(myDevice._get_field_value("Startup"), "2013-02-01");
self.assertEqual(myDevice2._get_field_value("Startup"), "2013-02-01");
def testRequestSetFail(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device23");
myDevice._add_control_field(name="Temperature", typename="int", value="15");
self.xmpp['xep_0325'].register_node(nodeId="Device23", device=myDevice, commTimeout=0.5);
self.recv("""
<iq type='set'
from='master@clayster.com/amr'
to='device@clayster.com'
id='9'>
<set xmlns='urn:xmpp:iot:control'>
<int name="Voltage" value="17"/>
</set>
</iq>
""")
self.send("""
<iq type='error'
from='device@clayster.com'
to='master@clayster.com/amr'
id='9'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode='NotFound'>
<parameter name='Voltage' />
<error var='Output'>Invalid field Voltage</error>
</setResponse>
</iq>
""")
self.assertEqual(myDevice._get_field_value("Temperature"), "15");
self.assertFalse(myDevice.has_control_field("Voltage", "int"));
def testDirectSetOk(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22");
myDevice._add_control_field(name="Temperature", typename="int", value="15");
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5);
self.recv("""
<message
from='master@clayster.com/amr'
to='device@clayster.com'>
<set xmlns='urn:xmpp:iot:control'>
<int name="Temperature" value="17"/>
</set>
</message>
""")
time.sleep(.5)
self.assertEqual(myDevice._get_field_value("Temperature"), "17");
def testDirectSetFail(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325'])
myDevice = Device("Device22");
myDevice._add_control_field(name="Temperature", typename="int", value="15");
self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5);
self.recv("""
<message
from='master@clayster.com/amr'
to='device@clayster.com'>
<set xmlns='urn:xmpp:iot:control'>
<int name="Voltage" value="17"/>
</set>
</message>
""")
time.sleep(.5)
self.assertEqual(myDevice._get_field_value("Temperature"), "15");
self.assertFalse(myDevice.has_control_field("Voltage", "int"));
def testRequestSetOkAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0325'])
results = [];
def my_callback(from_jid, result, nodeIds=None, fields=None, error_msg=None):
results.append(result);
fields = []
fields.append(("Temperature", "double", "20.5"))
fields.append(("TemperatureAlarmSetting", "string", "High"))
self.xmpp['xep_0325'].set_request(from_jid="tester@localhost", to_jid="you@google.com", fields=fields, nodeIds=['Device33', 'Device22'], callback=my_callback);
self.send("""
<iq type='set'
from='tester@localhost'
to='you@google.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device33' />
<node nodeId='Device22' />
<double name="Temperature" value="20.5" />
<string name="TemperatureAlarmSetting" value="High" />
</set>
</iq>
""")
self.recv("""
<iq type='result'
from='you@google.com'
to='tester@localhost'
id='1'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
</iq>
""")
time.sleep(.5)
self.assertEqual(results, ["OK"]);
def testRequestSetErrorAPI(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0325'])
results = [];
def my_callback(from_jid, result, nodeIds=None, fields=None, error_msg=None):
results.append(result);
fields = []
fields.append(("Temperature", "double", "20.5"))
fields.append(("TemperatureAlarmSetting", "string", "High"))
self.xmpp['xep_0325'].set_request(from_jid="tester@localhost", to_jid="you@google.com", fields=fields, nodeIds=['Device33', 'Device22'], callback=my_callback);
self.send("""
<iq type='set'
from='tester@localhost'
to='you@google.com'
id='1'>
<set xmlns='urn:xmpp:iot:control'>
<node nodeId='Device33' />
<node nodeId='Device22' />
<double name="Temperature" value="20.5" />
<string name="TemperatureAlarmSetting" value="High" />
</set>
</iq>
""")
self.recv("""
<iq type='error'
from='you@google.com'
to='tester@localhost'
id='1'>
<setResponse xmlns='urn:xmpp:iot:control' responseCode="OtherError" >
<error var='Temperature'>Sensor error</error>
</setResponse>
</iq>
""")
time.sleep(.5)
self.assertEqual(results, ["OtherError"]);
def testServiceDiscoveryClient(self):
self.stream_start(mode='client',
plugins=['xep_0030',
'xep_0325']);
self.recv("""
<iq type='get'
from='master@clayster.com/amr'
to='tester@localhost'
id='disco1'>
<query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>
""")
self.send("""
<iq type='result'
to='master@clayster.com/amr'
id='disco1'>
<query xmlns='http://jabber.org/protocol/disco#info'>
<identity category='client' type='bot'/>
<feature var='urn:xmpp:iot:control'/>
</query>
</iq>
""")
def testServiceDiscoveryComponent(self):
self.stream_start(mode='component',
plugins=['xep_0030',
'xep_0325']);
self.recv("""
<iq type='get'
from='master@clayster.com/amr'
to='tester@localhost'
id='disco1'>
<query xmlns='http://jabber.org/protocol/disco#info'/>
</iq>
""")
self.send("""
<iq type='result'
from='tester@localhost'
to='master@clayster.com/amr'
id='disco1'>
<query xmlns='http://jabber.org/protocol/disco#info'>
<identity category='component' type='generic'/>
<feature var='urn:xmpp:iot:control'/>
</query>
</iq>
""")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamControl)