Conflicts: README.rst examples/IoT_TestDevice.py examples/disco_browser.py setup.py sleekxmpp/jid.py sleekxmpp/plugins/google/auth/stanza.py sleekxmpp/plugins/google/gmail/notifications.py sleekxmpp/plugins/google/nosave/stanza.py sleekxmpp/plugins/google/settings/settings.py sleekxmpp/thirdparty/__init__.py sleekxmpp/thirdparty/socks.py sleekxmpp/thirdparty/statemachine.py sleekxmpp/util/__init__.py sleekxmpp/xmlstream/xmlstream.py slixmpp/basexmpp.py slixmpp/plugins/xep_0004/stanza/form.py slixmpp/plugins/xep_0009/rpc.py slixmpp/plugins/xep_0050/adhoc.py slixmpp/plugins/xep_0065/proxy.py slixmpp/plugins/xep_0084/stanza.py slixmpp/plugins/xep_0202/time.py slixmpp/plugins/xep_0323/sensordata.py slixmpp/plugins/xep_0325/control.py slixmpp/plugins/xep_0325/stanza/control.py slixmpp/roster/single.py slixmpp/stanza/atom.py slixmpp/stanza/rootstanza.py slixmpp/test/slixtest.py slixmpp/util/sasl/mechanisms.py slixmpp/version.py slixmpp/xmlstream/stanzabase.py tests/test_stanza_xep_0323.py tests/test_stanza_xep_0325.py tests/test_stream_xep_0323.py tests/test_stream_xep_0325.py
		
			
				
	
	
		
			360 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			360 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
"""
 | 
						|
    Slixmpp: The Slick XMPP Library
 | 
						|
    Implementation of xeps for Internet of Things
 | 
						|
    http://wiki.xmpp.org/web/Tech_pages/IoT_systems
 | 
						|
    Copyright (C) 2013 Sustainable Innovation, Joachim.lindborg@sust.se, bjorn.westrom@consoden.se
 | 
						|
    This file is part of Slixmpp.
 | 
						|
 | 
						|
    See the file LICENSE for copying permission.
 | 
						|
"""
 | 
						|
 | 
						|
import sys
 | 
						|
import datetime
 | 
						|
import time
 | 
						|
import threading
 | 
						|
 | 
						|
from slixmpp.test import *
 | 
						|
from slixmpp.xmlstream import ElementBase
 | 
						|
from slixmpp.plugins.xep_0325.device import Device
 | 
						|
 | 
						|
 | 
						|
class TestStreamControl(SlixTest):
 | 
						|
 | 
						|
    """
 | 
						|
    Test using the XEP-0325 plugin.
 | 
						|
    """
 | 
						|
    def setUp(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def _time_now(self):
 | 
						|
        return datetime.datetime.now().replace(microsecond=0).isoformat()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.stream_close()
 | 
						|
 | 
						|
    def testRequestSetOk(self):
 | 
						|
        self.stream_start(mode='component',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        myDevice = Device("Device22")
 | 
						|
        myDevice._add_control_field(name="Temperature", typename="int", value="15")
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <iq type='set'
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='device@clayster.com'
 | 
						|
                id='1'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <int name="Temperature" value="17"/>
 | 
						|
                </set>
 | 
						|
            </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
            <iq type='result'
 | 
						|
                from='device@clayster.com'
 | 
						|
                to='master@clayster.com/amr'
 | 
						|
                id='1'>
 | 
						|
                <setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.assertEqual(myDevice._get_field_value("Temperature"), "17")
 | 
						|
 | 
						|
    def testRequestSetMulti(self):
 | 
						|
        self.stream_start(mode='component',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        myDevice = Device("Device22")
 | 
						|
        myDevice._add_control_field(name="Temperature", typename="int", value="15")
 | 
						|
        myDevice._add_control_field(name="Startup", typename="date", value="2013-01-03")
 | 
						|
 | 
						|
        myDevice2 = Device("Device23")
 | 
						|
        myDevice2._add_control_field(name="Temperature", typename="int", value="19")
 | 
						|
        myDevice2._add_control_field(name="Startup", typename="date", value="2013-01-09")
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
 | 
						|
        self.xmpp['xep_0325'].register_node(nodeId="Device23", device=myDevice2, commTimeout=0.5)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <iq type='set'
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='device@clayster.com'
 | 
						|
                id='1'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <node nodeId='Device22' />
 | 
						|
                    <int name="Temperature" value="17"/>
 | 
						|
                </set>
 | 
						|
            </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
            <iq type='result'
 | 
						|
                from='device@clayster.com'
 | 
						|
                to='master@clayster.com/amr'
 | 
						|
                id='1'>
 | 
						|
                <setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.assertEqual(myDevice._get_field_value("Temperature"), "17")
 | 
						|
        self.assertEqual(myDevice2._get_field_value("Temperature"), "19")
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <iq type='set'
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='device@clayster.com'
 | 
						|
                id='2'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <node nodeId='Device23' />
 | 
						|
                    <node nodeId='Device22' />
 | 
						|
                    <date name="Startup" value="2013-02-01"/>
 | 
						|
                    <int name="Temperature" value="20"/>
 | 
						|
                </set>
 | 
						|
            </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
            <iq type='result'
 | 
						|
                from='device@clayster.com'
 | 
						|
                to='master@clayster.com/amr'
 | 
						|
                id='2'>
 | 
						|
                <setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.assertEqual(myDevice._get_field_value("Temperature"), "20")
 | 
						|
        self.assertEqual(myDevice2._get_field_value("Temperature"), "20")
 | 
						|
        self.assertEqual(myDevice._get_field_value("Startup"), "2013-02-01")
 | 
						|
        self.assertEqual(myDevice2._get_field_value("Startup"), "2013-02-01")
 | 
						|
 | 
						|
    def testRequestSetFail(self):
 | 
						|
        self.stream_start(mode='component',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        myDevice = Device("Device23")
 | 
						|
        myDevice._add_control_field(name="Temperature", typename="int", value="15")
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].register_node(nodeId="Device23", device=myDevice, commTimeout=0.5)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <iq type='set'
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='device@clayster.com'
 | 
						|
                id='9'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <int name="Voltage" value="17"/>
 | 
						|
                </set>
 | 
						|
            </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
            <iq type='error'
 | 
						|
                from='device@clayster.com'
 | 
						|
                to='master@clayster.com/amr'
 | 
						|
                id='9'>
 | 
						|
                <setResponse xmlns='urn:xmpp:iot:control' responseCode='NotFound'>
 | 
						|
                    <parameter name='Voltage' />
 | 
						|
                    <error var='Output'>Invalid field Voltage</error>
 | 
						|
                </setResponse>
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.assertEqual(myDevice._get_field_value("Temperature"), "15")
 | 
						|
        self.assertFalse(myDevice.has_control_field("Voltage", "int"))
 | 
						|
 | 
						|
    def testDirectSetOk(self):
 | 
						|
        self.stream_start(mode='component',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        myDevice = Device("Device22")
 | 
						|
        myDevice._add_control_field(name="Temperature", typename="int", value="15")
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <message
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='device@clayster.com'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <int name="Temperature" value="17"/>
 | 
						|
                </set>
 | 
						|
            </message>
 | 
						|
        """)
 | 
						|
 | 
						|
        time.sleep(0.5)
 | 
						|
 | 
						|
        self.assertEqual(myDevice._get_field_value("Temperature"), "17")
 | 
						|
 | 
						|
    def testDirectSetFail(self):
 | 
						|
        self.stream_start(mode='component',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        myDevice = Device("Device22")
 | 
						|
        myDevice._add_control_field(name="Temperature", typename="int", value="15")
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].register_node(nodeId="Device22", device=myDevice, commTimeout=0.5)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <message
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='device@clayster.com'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <int name="Voltage" value="17"/>
 | 
						|
                </set>
 | 
						|
            </message>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.assertEqual(myDevice._get_field_value("Temperature"), "15");
 | 
						|
        self.assertFalse(myDevice.has_control_field("Voltage", "int"));
 | 
						|
 | 
						|
 | 
						|
    def testRequestSetOkAPI(self):
 | 
						|
 | 
						|
        self.stream_start(mode='client',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        results = []
 | 
						|
 | 
						|
        def my_callback(from_jid, result, nodeIds=None, fields=None, error_msg=None):
 | 
						|
            results.append(result)
 | 
						|
 | 
						|
        fields = []
 | 
						|
        fields.append(("Temperature", "double", "20.5"))
 | 
						|
        fields.append(("TemperatureAlarmSetting", "string", "High"))
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].set_request(from_jid="tester@localhost", to_jid="you@google.com", fields=fields, nodeIds=['Device33', 'Device22'], callback=my_callback)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
            <iq type='set'
 | 
						|
                from='tester@localhost'
 | 
						|
                to='you@google.com'
 | 
						|
                id='1'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <node nodeId='Device33' />
 | 
						|
                    <node nodeId='Device22' />
 | 
						|
                    <double name="Temperature" value="20.5" />
 | 
						|
                    <string name="TemperatureAlarmSetting" value="High" />
 | 
						|
                </set>
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <iq type='result'
 | 
						|
                from='you@google.com'
 | 
						|
                to='tester@localhost'
 | 
						|
                id='1'>
 | 
						|
                <setResponse xmlns='urn:xmpp:iot:control' responseCode="OK" />
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.assertEqual(results, ["OK"]);
 | 
						|
 | 
						|
    def testRequestSetErrorAPI(self):
 | 
						|
 | 
						|
        self.stream_start(mode='client',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        results = []
 | 
						|
 | 
						|
        def my_callback(from_jid, result, nodeIds=None, fields=None, error_msg=None):
 | 
						|
            results.append(result)
 | 
						|
 | 
						|
        fields = []
 | 
						|
        fields.append(("Temperature", "double", "20.5"))
 | 
						|
        fields.append(("TemperatureAlarmSetting", "string", "High"))
 | 
						|
 | 
						|
        self.xmpp['xep_0325'].set_request(from_jid="tester@localhost", to_jid="you@google.com", fields=fields, nodeIds=['Device33', 'Device22'], callback=my_callback)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
            <iq type='set'
 | 
						|
                from='tester@localhost'
 | 
						|
                to='you@google.com'
 | 
						|
                id='1'>
 | 
						|
                <set xmlns='urn:xmpp:iot:control'>
 | 
						|
                    <node nodeId='Device33' />
 | 
						|
                    <node nodeId='Device22' />
 | 
						|
                    <double name="Temperature" value="20.5" />
 | 
						|
                    <string name="TemperatureAlarmSetting" value="High" />
 | 
						|
                </set>
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
            <iq type='error'
 | 
						|
                from='you@google.com'
 | 
						|
                to='tester@localhost'
 | 
						|
                id='1'>
 | 
						|
                <setResponse xmlns='urn:xmpp:iot:control' responseCode="OtherError" >
 | 
						|
                    <error var='Temperature'>Sensor error</error>
 | 
						|
                </setResponse>
 | 
						|
            </iq>
 | 
						|
            """)
 | 
						|
 | 
						|
        self.assertEqual(results, ["OtherError"])
 | 
						|
 | 
						|
    def testServiceDiscoveryClient(self):
 | 
						|
        self.stream_start(mode='client',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
        <iq type='get'
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='tester@localhost/resource'
 | 
						|
                id='disco1'>
 | 
						|
            <query xmlns='http://jabber.org/protocol/disco#info'/>
 | 
						|
        </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
        <iq type='result'
 | 
						|
            to='master@clayster.com/amr'
 | 
						|
            id='disco1'>
 | 
						|
            <query xmlns='http://jabber.org/protocol/disco#info'>
 | 
						|
                <identity category='client' type='bot'/>
 | 
						|
                <feature var='urn:xmpp:iot:control'/>
 | 
						|
            </query>
 | 
						|
        </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
    def testServiceDiscoveryComponent(self):
 | 
						|
        self.stream_start(mode='component',
 | 
						|
                          plugins=['xep_0030',
 | 
						|
                                   'xep_0325'])
 | 
						|
 | 
						|
        self.recv("""
 | 
						|
        <iq type='get'
 | 
						|
                from='master@clayster.com/amr'
 | 
						|
                to='tester@localhost/resource'
 | 
						|
                id='disco1'>
 | 
						|
            <query xmlns='http://jabber.org/protocol/disco#info'/>
 | 
						|
        </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
        self.send("""
 | 
						|
        <iq type='result'
 | 
						|
            from='tester@localhost/resource'
 | 
						|
            to='master@clayster.com/amr'
 | 
						|
            id='disco1'>
 | 
						|
            <query xmlns='http://jabber.org/protocol/disco#info'>
 | 
						|
                <identity category='component' type='generic'/>
 | 
						|
                <feature var='urn:xmpp:iot:control'/>
 | 
						|
            </query>
 | 
						|
        </iq>
 | 
						|
        """)
 | 
						|
 | 
						|
 | 
						|
suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamControl)
 | 
						|
 |