274 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			274 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from sleekxmpp.xmlstream.matcher import MatchXPath
 | |
| from sleekxmpp.xmlstream.handler import Callback
 | |
| from sleekxmpp.exceptions import XMPPError
 | |
| import unittest
 | |
| from sleekxmpp.test import SleekTest
 | |
| 
 | |
| 
 | |
| class TestStreamExceptions(SleekTest):
 | |
|     """
 | |
|     Test handling roster updates.
 | |
|     """
 | |
| 
 | |
|     def tearDown(self):
 | |
|         self.stream_close()
 | |
| 
 | |
|     def testExceptionReply(self):
 | |
|         """Test that raising an exception replies with the original stanza."""
 | |
| 
 | |
|         def message(msg):
 | |
|             msg.reply()
 | |
|             msg['body'] = 'Body changed'
 | |
|             raise XMPPError(clear=False)
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.add_event_handler('message', message)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <body>This is going to cause an error.</body>
 | |
|             <error type="cancel" code="500">
 | |
|               <undefined-condition
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|     def testExceptionContinueWorking(self):
 | |
|         """Test that Sleek continues to respond after an XMPPError is raised."""
 | |
| 
 | |
|         def message(msg):
 | |
|             msg.reply()
 | |
|             msg['body'] = 'Body changed'
 | |
|             raise XMPPError(clear=False)
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.add_event_handler('message', message)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <body>This is going to cause an error.</body>
 | |
|             <error type="cancel" code="500">
 | |
|               <undefined-condition
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <body>This is going to cause an error.</body>
 | |
|             <error type="cancel" code="500">
 | |
|               <undefined-condition
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|     def testXMPPErrorException(self):
 | |
|         """Test raising an XMPPError exception."""
 | |
| 
 | |
|         def message(msg):
 | |
|             raise XMPPError(condition='feature-not-implemented',
 | |
|                             text="We don't do things that way here.",
 | |
|                             etype='cancel',
 | |
|                             extension='foo',
 | |
|                             extension_ns='foo:error',
 | |
|                             extension_args={'test': 'true'})
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.add_event_handler('message', message)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <error type="cancel" code="501">
 | |
|               <feature-not-implemented
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|               <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
 | |
|                 We don't do things that way here.
 | |
|               </text>
 | |
|               <foo xmlns="foo:error" test="true" />
 | |
|             </error>
 | |
|           </message>
 | |
|         """, use_values=False)
 | |
| 
 | |
|     def testIqErrorException(self):
 | |
|         """Test using error exceptions with Iq stanzas."""
 | |
| 
 | |
|         def handle_iq(iq):
 | |
|             raise XMPPError(condition='feature-not-implemented',
 | |
|                             text="We don't do things that way here.",
 | |
|                             etype='cancel',
 | |
|                             clear=False)
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.register_handler(
 | |
|                 Callback(
 | |
|                     'Test Iq',
 | |
|                      MatchXPath('{%s}iq/{test}query' % self.xmpp.default_ns),
 | |
|                      handle_iq))
 | |
| 
 | |
|         self.recv("""
 | |
|           <iq type="get" id="0">
 | |
|             <query xmlns="test" />
 | |
|           </iq>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <iq type="error" id="0">
 | |
|             <query xmlns="test" />
 | |
|             <error type="cancel" code="501">
 | |
|               <feature-not-implemented
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|               <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
 | |
|                 We don't do things that way here.
 | |
|               </text>
 | |
|             </error>
 | |
|           </iq>
 | |
|         """, use_values=False)
 | |
| 
 | |
|     def testThreadedXMPPErrorException(self):
 | |
|         """Test raising an XMPPError exception in a threaded handler."""
 | |
| 
 | |
|         def message(msg):
 | |
|             raise XMPPError(condition='feature-not-implemented',
 | |
|                             text="We don't do things that way here.",
 | |
|                             etype='cancel')
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.add_event_handler('message', message,
 | |
|                                     threaded=True)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <error type="cancel" code="501">
 | |
|               <feature-not-implemented
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|               <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
 | |
|                 We don't do things that way here.
 | |
|               </text>
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|     def testUnknownException(self):
 | |
|         """Test raising an generic exception in a threaded handler."""
 | |
| 
 | |
|         raised_errors = []
 | |
| 
 | |
|         def message(msg):
 | |
|             raise ValueError("Did something wrong")
 | |
| 
 | |
|         def catch_error(*args, **kwargs):
 | |
|             raised_errors.append(True)
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.exception = catch_error
 | |
|         self.xmpp.add_event_handler('message', message)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <error type="cancel" code="500">
 | |
|               <undefined-condition
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|               <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
 | |
|                 SleekXMPP got into trouble.
 | |
|               </text>
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.assertEqual(raised_errors, [True], "Exception was not raised: %s" % raised_errors)
 | |
| 
 | |
|     def testUnknownException(self):
 | |
|         """Test Sleek continues to respond after an unknown exception."""
 | |
| 
 | |
|         raised_errors = []
 | |
| 
 | |
|         def message(msg):
 | |
|             raise ValueError("Did something wrong")
 | |
| 
 | |
|         def catch_error(*args, **kwargs):
 | |
|             raised_errors.append(True)
 | |
| 
 | |
|         self.stream_start()
 | |
|         self.xmpp.exception = catch_error
 | |
|         self.xmpp.add_event_handler('message', message)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <error type="cancel" code="500">
 | |
|               <undefined-condition
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|               <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
 | |
|                 SleekXMPP got into trouble.
 | |
|               </text>
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.recv("""
 | |
|           <message>
 | |
|             <body>This is going to cause an error.</body>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.send("""
 | |
|           <message type="error">
 | |
|             <error type="cancel" code="500">
 | |
|               <undefined-condition
 | |
|                   xmlns="urn:ietf:params:xml:ns:xmpp-stanzas" />
 | |
|               <text xmlns="urn:ietf:params:xml:ns:xmpp-stanzas">
 | |
|                 SleekXMPP got into trouble.
 | |
|               </text>
 | |
|             </error>
 | |
|           </message>
 | |
|         """)
 | |
| 
 | |
|         self.assertEqual(raised_errors, [True, True], "Exceptions were not raised: %s" % raised_errors)
 | |
| 
 | |
| 
 | |
| suite = unittest.TestLoader().loadTestsFromTestCase(TestStreamExceptions)
 | 
