zeroMQ初体验-2.发布订阅模式(pub/sub)
pub/sub模式:
发布端(pub)
# Publisher (PUB) side of the pub/sub example: binds a PUB socket and
# broadcasts a numbered message on each topic in turn until Ctrl-C.
import itertools
import sys
import time

import zmq


def main():
    """Broadcast counter messages on a fixed list of topics.

    Expects exactly one command-line argument, the endpoint to bind to;
    otherwise prints a usage line and exits with status 1.  Runs until
    interrupted with Ctrl-C, then pauses briefly and closes the socket.
    """
    if len(sys.argv) != 2:
        print('usage: publisher <bind-to>')
        sys.exit(1)

    bind_to = sys.argv[1]

    all_topics = [
        'sports.general', 'sports.football', 'sports.basketball',
        'stocks.general', 'stocks.GOOG', 'stocks.AAPL',
        'weather',
    ]

    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    try:
        s.bind(bind_to)

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("Starting broadcast on topics:")
        print(" %s" % all_topics)
        print("Hit Ctrl-C to stop broadcasting.")
        print("Waiting so subscriber sockets can connect...")
        print("")
        # Give subscribers a moment to connect before the first send.
        time.sleep(1.0)

        msg_counter = itertools.count()
        try:
            for topic in itertools.cycle(all_topics):
                # next() works on both Python 2.6+ and 3; .next() is 2-only.
                msg_body = str(next(msg_counter))
                print(' Topic: %s, msg:%s' % (topic, msg_body))
                # NOTE(review): send_pyobj pickles the [topic, body] list, so
                # the bytes on the wire do not start with the topic and
                # ZeroMQ's subscriber-side prefix filter cannot match them.
                # s.send_multipart([topic, msg_body]) (topic as first frame)
                # would enable that filtering, but the subscriber must then
                # switch to recv_multipart in the same change.
                s.send_pyobj([topic, msg_body])
                # short wait so we don't hog the cpu
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass

        print("Waiting for message queues to flush...")
        time.sleep(0.5)
    finally:
        # Release the socket even when something other than Ctrl-C ends the run.
        s.close()
    print("Done.")


if __name__ == "__main__":
    main()
# Subscriber (SUB) side of the pub/sub example: connects to a publisher and
# prints every message whose topic matches one of the requested prefixes.
import sys
import time

import zmq


def _topic_wanted(topic, prefixes):
    """Return True if *topic* starts with any of *prefixes*, or none were given."""
    return not prefixes or topic.startswith(tuple(prefixes))


def main():
    """Print messages received from a publisher, optionally filtered by topic.

    Command line: ``subscriber <connect_to> [topic topic ...]``.  With no
    topics every message is printed; otherwise only messages whose topic
    starts with one of the given strings.  Prints a usage line and exits
    with status 1 when the endpoint is missing.  Runs until Ctrl-C.
    """
    if len(sys.argv) < 2:
        print('usage: subscriber <connect_to> [topic topic ...]')
        sys.exit(1)

    connect_to = sys.argv[1]
    topics = sys.argv[2:]

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    try:
        s.connect(connect_to)

        # manage subscriptions
        # ZeroMQ matches a SUBSCRIBE prefix against the raw message bytes.
        # Messages read with recv_pyobj are pickled, so they never begin with
        # the topic text and a per-topic SUBSCRIBE would receive nothing.
        # Subscribe to everything and apply the prefix filter after decoding.
        # The option value is bytes so the call also works on Python 3.
        s.setsockopt(zmq.SUBSCRIBE, b'')
        if not topics:
            print("Receiving messages on ALL topics...")
        else:
            print("Receiving messages on topics: %s ..." % topics)
        print("")

        try:
            while True:
                # NOTE(review): recv_pyobj unpickles whatever arrives; only
                # connect to publishers you trust.
                topic, msg = s.recv_pyobj()
                if _topic_wanted(topic, topics):
                    print(' Topic: %s, msg:%s' % (topic, msg))
        except KeyboardInterrupt:
            pass
    finally:
        # The original never closed the socket; release it on every exit path.
        s.close()
    print("Done.")


if __name__ == "__main__":
    main()