首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > perl python >

python透过stomp协议和hornetq进行连接

2012-09-20 
python通过stomp协议和hornetq进行连接再看HornetQ,因为自己学了python,所以不仅仅希望用Java来连接Hornet

python通过stomp协议和hornetq进行连接

再看HornetQ,因为自己学了python,所以不仅仅希望用Java来连接HornetQ,也希望用python来连接,进行开发。看了HornetQ的手册,里面说的很清楚,HornetQ不支持对stomp消息的持久化。这算是很大的一个缺点。但是毕竟支持了跨计算机语言的功能。我尝试了,并且写了简单的测试。发现开始运行的时候,会有数据丢失。这个问题在JBoss那里有提到,具体的我还有继续研究。HornetQ的stomp支持,请参考手册。测试了很久,发现就在开始的短暂时间有数据丢失,中间运行还是很稳定的。如果对于数据丢失不是很敏感的应用,可以进行测试。需要进行深入研究。

下面是例子的代码:

#-*-coding:utf-8-*-'''Created on 2012-2-20'''import loggingimport stompimport time logging.basicConfig()dest = 'jms.queue.TestQueue' #dest = 'jms.topic.TestTopic' logging.basicConfig()class MyListener(stomp.ConnectionListener):         def on_error(self, headers, message):                 print('received an error %s' % message)         def on_message(self, headers, message):        print '--------------------------------------'        #for k, v in headers.iteritems():                     #    print('header: key %s , value %s' % (k, v))                 print('received message\n %s' % message)       def on_disconnected(self):        """        Called by the STOMP connection when a TCP/IP connection to the        STOMP server has been lost.  No messages should be sent via        the connection until it has been reestablished.        """        pass        def on_connecting(self, host_and_port):        """        Called by the STOMP connection once a TCP/IP connection to the        STOMP server has been established or re-established. Note that        at this point, no connection has been established on the STOMP        protocol level. For this, you need to invoke the "connect"        method on the connection.        \param host_and_port a tuple containing the host name and port        number to which the connection has been established.        """        pass    def on_connected(self, headers, body):        """        Called by the STOMP connection when a CONNECTED frame is        received, that is after a connection has been established or        re-established.        \param headers a dictionary containing all headers sent by the        server as key/value pairs.        \param body the frame's payload. This is usually empty for        CONNECTED frames.        """        pass            def on_heartbeat_timeout(self):        """        Called by the STOMP connection when a heartbeat message has not been        received beyond the specified period.        """        pass    def on_receipt(self, headers, body):        """        Called by the STOMP connection when a RECEIPT frame is        received, sent by the server if requested by the client using        the 'receipt' header.        \param headers a dictionary containing all headers sent by the        server as key/value pairs.        \param body the frame's payload. This is usually empty for        RECEIPT frames.        """        pass        def on_send(self, headers, body):        """        Called by the STOMP connection when it is in the process of sending a message                \param headers a dictionary containing the headers that will be sent with this message                \param body the message payload        """        passtry:    conn = stomp.Connection([('192.168.123.74', 61613)])     conn.set_listener('somename', MyListener())    print('set up Connection')         conn.start()     print('started connection')      conn.connect(wait=True)     print('connected')          while True:        num = 0        count = 99999        while num < count:            try:                num += 1                message = 'hello world ' + str(num)                 conn.send(message=message, destination=dest, headers={'type':'textMessage'}, ack='auto')                 #print 'sent message:', message            except Exception , e:                print '==============', e        print 'It has produce ' + str(count) + ' messages'        time.sleep(2) except Exception , e:    print '----------------- ', e    print('slept') conn.disconnect() print('disconnected')        

?

#-*-coding:utf-8-*-'''Created on 2012-2-20'''import loggingimport stompimport timelogging.basicConfig()class MyListener(stomp.ConnectionListener):    def __init__(self,conn,headers):         super(MyListener,self).__init__()        self.conn = conn        self.headers = headers    def on_error(self, headers, message):                 print('received an error %s' % message)         def on_message(self, headers, message):        print '--------------------------------------'        #for k, v in headers.iteritems():                     #    print('header: key %s , value %s' % (k, v))                 print('received message\n %s' % message)       def on_disconnected(self):        """        Called by the STOMP connection when a TCP/IP connection to the        STOMP server has been lost.  No messages should be sent via        the connection until it has been reestablished.        """        if not  self.conn.is_connected():            print 'Error: conn failure! try to connection again'        sleepTime = 5        print '+++++++++++++++++++++++++++++++++++++++++++'        print 'it will sleep ' + str(sleepTime) + ' seconds.'        time.sleep(sleepTime)        consume()        pass        def on_connecting(self, host_and_port):        """        Called by the STOMP connection once a TCP/IP connection to the        STOMP server has been established or re-established. Note that        at this point, no connection has been established on the STOMP        protocol level. For this, you need to invoke the "connect"        method on the connection.        \param host_and_port a tuple containing the host name and port        number to which the connection has been established.        """        pass    def on_connected(self, headers, body):        """        Called by the STOMP connection when a CONNECTED frame is        received, that is after a connection has been established or        re-established.        \param headers a dictionary containing all headers sent by the        server as key/value pairs.        \param body the frame's payload. This is usually empty for        CONNECTED frames.        """        pass            def on_heartbeat_timeout(self):        """        Called by the STOMP connection when a heartbeat message has not been        received beyond the specified period.        """        pass    def on_receipt(self, headers, body):        """        Called by the STOMP connection when a RECEIPT frame is        received, sent by the server if requested by the client using        the 'receipt' header.        \param headers a dictionary containing all headers sent by the        server as key/value pairs.        \param body the frame's payload. This is usually empty for        RECEIPT frames.        """        pass        def on_send(self, headers, body):        """        Called by the STOMP connection when it is in the process of sending a message                \param headers a dictionary containing the headers that will be sent with this message                \param body the message payload        """        passdef consume():        dest = 'jms.queue.TestQueue'     clientId = 919191    headers={'client-id':clientId}    #dest = 'jms.topic.TestTopic'         conn = stomp.Connection([('192.168.123.74', 61613)])     print('set up Connection')     conn.set_listener('somename', MyListener(conn,headers))     print('Set up listener')      conn.start()    print('started connection')      conn.connect(wait=True,headers=headers)     print('connected')        conn.subscribe(destination=dest, ack='auto')     print('subscribed')    while True:        pass    print('slept')     conn.disconnect()     print('disconnected')    if __name__ == '__main__':        consume()
?

热点排行