ZeroMQ之push/pull模式
PUHS: 用于发送消息,定义一个zeromq的socket实例,用于send msg
PULL: 用于接收消息,recv msg
API:http://api.zeromq.org/
?
push/pull模式如下图:(图源自http://zguide.zeromq.org/page:all)
?
用于服务器与客户端消息通信。
下面是服务器给客户端发送消息的代码,服务器PUSH(绑定一个端口号),客户端PULL(连接服务器)
?
?? 服务器端代码??
package pushpull;import org.zeromq.ZMQ;public class Push { public static void main(String[] args) throws InterruptedException {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket sender = context.socket(ZMQ.PUSH);sender.bind("tcp://*:5557");int i = 0;while (true) { Thread.currentThread().sleep(2000); i++; sender.send(("msg" + i).getBytes(), 0);} }}?
?
?
? 客户端A的代码?
package pushpull;import org.zeromq.ZMQ;public class PullA { public static void main(String[] args) {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket receiver = context.socket(ZMQ.PULL);receiver.connect("tcp://localhost:5557");while (true) { // String msg = new String(receiver.recv(0)); System.out.println(new String(receiver.recv(0)));} }}??
??客户端B的代码
?
?
package pushpull;import org.zeromq.ZMQ;public class PullB { public static void main(String[] args) {// TODO Auto-generated method stubZMQ.Context context = ZMQ.context(1);ZMQ.Socket receiver = context.socket(ZMQ.PULL);receiver.connect("tcp://localhost:5557");while (true) { System.out.println(new String(receiver.recv(0)));} }}?
??客户端A接受的消息是??
msg5 msg7 msg9 msg11 ...
?? 客户端B接收的消息是?
msg1msg2 msg3 msg4 msg6 msg8 msg10...
?
可以看出push/pull模式是单向的,很适合消费者能力不足的情况,可以提供多个消费者。一条消息如果被A消费,B将不会再消费这条消息。
如果客户端给服务器发送消息,则服务器PULL(绑定一个端口号),客户端PUSH(连接服务器)