首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

JGroups(二)

2012-08-09 
JGroups(2)转载自:http://whitesock.iteye.com/blog/199269?2 API2.1 Interfaces2.1.1 Transport??? Trans

JGroups(2)

转载自:http://whitesock.iteye.com/blog/199269

?

2 API
2.1 Interfaces
2.1.1 Transport
??? Transport接口只定义了最简单的方法,用于发送和接收消息。其定义如下:

Java代码??JGroups(二)
  1. public?interface?Transport?{??
  2. ????void?send(Message?msg)?throws?Exception;??
  3. ????Object?receive(long?timeout)?throws?Exception;??
  4. }??

2.1.2 MessageListener
??? 如果说Transport接口是以pull的方式接收消息,那么MessageListener则是以push的方式处理消息。当收到消息时,receive方法会被调用。getState() 和setState()方法用于在实例间传递状态。其定义如下:

Java代码??JGroups(二)
  1. public?interface?MessageListener?{??
  2. ????void?receive(Message?msg);??
  3. ????byte[]?getState();??
  4. ????void?setState(byte[]?state);??
  5. }??

2.1.3 ExtendedMessageListener
??? ExtendedMessageListener继承自MessageListener,它定义了用来在实例间部分传递状态的方法。如果需要传递的状态数据量很大,那么通过配置协议栈,也可以指定使用流的方式传递状态。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ExtendedMessageListener?extends?MessageListener?{??
  2. ????byte[]?getState(String?state_id);??
  3. ????void?setState(String?state_id,?byte[]?state);??
  4. ??
  5. ????void?getState(OutputStream?ostream);??
  6. ????void?setState(InputStream?istream);??
  7. ??
  8. ????void?getState(String?state_id,?OutputStream?ostream);??
  9. ????void?setState(String?state_id,?InputStream?istream);??
  10. }??

2.1.4 MembershipListener
??? 当收到view、suspicion message和block event 的时候,相应的方法会被调用。这个接口常用的方法是viewAccepted(),以便在新的实例加入(或者离开)到集群时得到通知。当JGroups推测某个实例可能崩溃时(此时该实例并未离开集群),suspect()方法会被调用,目前没有unsuspect()方法。当JGroups需要通知集群中的实例不要发送消息时,block()方法会被调用。这通常需要配置FLUSH协议,例如为了确保在进行状态传递的时候,没有实例在发送消息。在block()方法返回后,所有发送消息的线程都会被阻塞,知道FLUSH协议解除阻塞。需要注意的是,block()方法内不应该执行耗时的操作,否则整个FLUSH协议都会被阻塞。其定义如下:

Java代码??JGroups(二)
  1. public?interface?MembershipListener?{??
  2. ????void?viewAccepted(View?new_view);??
  3. ????void?suspect(Address?suspected_mbr);??
  4. ????void?block();??
  5. }??

2.1.5 ExtendedMembershipListener
??? ExtendedMembershipListener继承自MembershipListener。当FLUSH协议解除阻塞的时候,unblock()方法会被调用,所有发送消息的线程可以继续发送消息。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ExtendedMembershipListener?extends?MembershipListener?{??
  2. ????void?unblock();??
  3. }??

2.1.6 ChannelListener
??? 可以通过调用JChannel接口的addChannelListener(ChannelListener listener)方法来添加ChannelListener。当Channel被连接或者关闭时,相应的方法会北调用。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ChannelListener?{??
  2. ????void?channelConnected(Channel?channel);??
  3. ????void?channelDisconnected(Channel?channel);??
  4. ????void?channelClosed(Channel?channel);??
  5. ????void?channelShunned();??
  6. ????void?channelReconnected(Address?addr);??
  7. }??

2.1.7 Receiver
??? Receiver继承自MessageListener和MembershipListener。其定义如下:

Java代码??JGroups(二)
  1. public?interface?Receiver?extends?MessageListener,?MembershipListener?{??
  2. }??

2.1.8 ExtendedReceiver
??? ExtendedReceiver继承自Receiver、ExtendedMessageListener和ExtendedMembershipListener。其定义如下:

Java代码??JGroups(二)
  1. public?interface?ExtendedReceiver?extends?Receiver,?ExtendedMessageListener,?ExtendedMembershipListener?{??
  2. }??

?

2.2 Channel
2.2.1 Creating a channel
??? 最常见的创建Channel的方法是通过构造函数,此外也可以通过工厂方法。需要注意的是,集群中所有的实例必须有相同的协议栈。JChannel的构造函数之一如下:

Java代码??JGroups(二)
  1. public?JChannel(String?properties)?throws?ChannelException?{??
  2. ????this(ConfiguratorFactory.getStackConfigurator(properties));??
  3. }??

??? 以上的构造函数中,properties参数是冒号分割的字符串,用来配置协议栈。字符串的最左端的元素定义了最底层的协议。如果properties为null,那么将使用缺省的协议栈,即jgroups-all.jar中的udp.xml。以下是个properties参数的例子:

Java代码??JGroups(二)
  1. String?props="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):"?+??
  2. "PING(timeout=3000;num_initial_members=6):"?+??
  3. "FD(timeout=5000):"?+??
  4. "VERIFY_SUSPECT(timeout=1500):"?+??
  5. "pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):"?+??
  6. "UNICAST(timeout=300,600,1200):"?+??
  7. "FRAG:"?+??
  8. "pbcast.GMS(join_timeout=5000;shun=false;print_local_addr=true)";??

??? 此外,也可以用File和URL作为构造函数的参数,这种方式允许以本地或者远程的XML文件配置协议栈。XML文件的config节点中的每个子节点定义一个协议,第一个子节点定义了最底层的协议。每个子节点名都对应一个Java类名,缺省的协议名不必是全限定类名,它们位于org.jgroups.stack.protocols包中。如果是自定义的协议,那么则必须是全限定类名。每个协议可以有零个或多个属性,以name/value对的方式指定。以下是jgroups-all.jar中的udp.xml的内容:

Xml代码??JGroups(二)
  1. <config>??
  2. ????<UDP??
  3. ?????????mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"??
  4. ?????????mcast_port="${jgroups.udp.mcast_port:45588}"??
  5. ?????????tos="8"??
  6. ?????????ucast_recv_buf_size="20000000"??
  7. ?????????ucast_send_buf_size="640000"??
  8. ?????????mcast_recv_buf_size="25000000"??
  9. ?????????mcast_send_buf_size="640000"??
  10. ?????????loopback="false"??
  11. ?????????discard_incompatible_packets="true"??
  12. ?????????max_bundle_size="64000"??
  13. ?????????max_bundle_timeout="30"??
  14. ?????????use_incoming_packet_handler="true"??
  15. ?????????ip_ttl="${jgroups.udp.ip_ttl:2}"??
  16. ?????????enable_bundling="true"??
  17. ?????????enable_diagnostics="true"??
  18. ?????????thread_naming_pattern="cl"??
  19. ??
  20. ?????????use_concurrent_stack="true"??
  21. ??
  22. ?????????thread_pool.enabled="true"??
  23. ?????????thread_pool.min_threads="2"??
  24. ?????????thread_pool.max_threads="8"??
  25. ?????????thread_pool.keep_alive_time="5000"??
  26. ?????????thread_pool.queue_enabled="true"??
  27. ?????????thread_pool.queue_max_size="1000"??
  28. ?????????thread_pool.rejection_policy="Run"??
  29. ??
  30. ?????????oob_thread_pool.enabled="true"??
  31. ?????????oob_thread_pool.min_threads="1"??
  32. ?????????oob_thread_pool.max_threads="8"??
  33. ?????????oob_thread_pool.keep_alive_time="5000"??
  34. ?????????oob_thread_pool.queue_enabled="false"??
  35. ?????????oob_thread_pool.queue_max_size="100"??
  36. ?????????oob_thread_pool.rejection_policy="Run"/>??
  37. ??
  38. ????<PING?timeout="2000"??
  39. ????????????num_initial_members="3"/>??
  40. ????<MERGE2?max_interval="30000"??
  41. ????????????min_interval="10000"/>??
  42. ????<FD_SOCK/>??
  43. ????<FD?timeout="10000"?max_tries="5"???shun="true"/>??
  44. ????<VERIFY_SUSPECT?timeout="1500"??/>??
  45. ????<BARRIER?/>??
  46. ????<pbcast.NAKACK?use_stats_for_retransmission="false"??
  47. ???????????????????exponential_backoff="150"??
  48. ???????????????????use_mcast_xmit="true"?gc_lag="0"??
  49. ???????????????????retransmit_timeout="50,300,600,1200"??
  50. ???????????????????discard_delivered_msgs="true"/>??
  51. ????<UNICAST?timeout="300,600,1200"/>??
  52. ????<pbcast.STABLE?stability_delay="1000"?desired_avg_gossip="50000"??
  53. ???????????????????max_bytes="1000000"/>??
  54. ????<VIEW_SYNC?avg_send_interval="60000"???/>??
  55. ????<pbcast.GMS?print_local_addr="true"?join_timeout="3000"??
  56. ????????????????shun="false"??
  57. ????????????????view_bundling="true"/>??
  58. ????<FC?max_credits="500000"??
  59. ????????????????????min_threshold="0.20"/>??
  60. ????<FRAG2?frag_size="60000"??/>??
  61. ????<!--pbcast.STREAMING_STATE_TRANSFER?/-->??
  62. ????<pbcast.STATE_TRANSFER??/>??
  63. ????<!--?pbcast.FLUSH??/-->??
  64. </config>??

??? 以上XML文件中,UDP协议的mcast_addr属性被指定使用jgroups.udp.mcast_addr系统属性,如果没有配置这个系统属性,那么使用缺省值228.10.10.10。

2.2.2 Setting options
??? 通过setOpt(int option, Object value)方法可以给Channel设置属性,目前支持的属性有:

  • Channel.BLOCK 这是一个Boolean型的属性,缺省是false。如果设置成true,那么会接收到block message。
  • Channel.LOCAL这是一个Boolean型的属性,缺省设置成true。如果是true,那么当集群中的实例向集群发送消息时,这个实例本身也会收到这个消息。
  • Channel.AUTO_RECONNECT这是一个Boolean型的属性,缺省是false。如果设置成true,那么shunned channel 在离开集群后,会自动重新加入到集群中。
  • Channel.AUTO_GETSTATE 这是一个Boolean型的属性,缺省是false。如果设置成true,那么shunned channel在自动重新加入到集群后,会自动尝试向集群的coordinator 获得集群的状态(需要AUTO_RECONNECT也设置成true)。

    ??? 通过Object getOpt(int option)可以或者channel的相关属性值。

    2.2.3 Connecting/Disconnecting
    ??? 通过调用connect(String cluster_name) throws ChannelException方法连接到集群。cluster_name参数指定了集群的名称。集群中的第一个实例被称为coordinator。当集群中的成员发生变化的时候,coordinator会向集群中的其它实例发送view message。
    ??? 也可以通过调用void connect(String cluster_name, Address target, String state_id,long timeout) throws ChannelException方法连接到集群,并从集群中请求当前的状态。与将这两个操作分开进行相比,在一个方法调用内完成这两个操作,可以允许JGroups对发送的消息进行优化,更重要的是,从调用者角度来看,这两个操作被合并成一个原子操作。cluster_name参数用于指定集群的名称。target参数指定了从集群中的哪个实例获得状态,如果该参数为null,那么会从coordinator获得。如果希望传递部分的状态,那么state_id参数可以用于指定状态id。
    ??? 当Channel连接到集群后,通过调用String getClusterName()方法可以获得当前连接到的集群名称。通过调用Address getLocalAddress()方法可以获得channel的地址。通过调用View getView()方法可以获得channel的当前view。每当Channel收到view message的时候,channel的当前view就会被更新。
    ??? 通过调用void disconnect()方法以断开到集群的连接。如果channel已经并没有连接到集群,或者chaneel已经被close,那么调用这个方法没有任何效果。如果channel已经连接到集群,那么这个方法内会向coordinator发送一个离开请求,同时coordinator会向集群中的所有其它实例发送view message,以通知它们该实例的离开。断开连接的channel可以通过调用connect()方法重新连接到集群。
    ??? 通过void close()方法以释放channel占有的资源。Channel被close之后,调用channel上的任何方法都可能会导致异常。

    2.2.4 Sending messages
    ??? 当channel连接到集群后,可以通过以下方法发送消息。第一个send方法接受一个Message型的参数,如果msg的目标地址不是null,那么消息会发送到指定地址,否则会发送到集群内的所有成员。msg的源地址可以不必手工设置,如果是null,那么会被协议栈的最底层协议(传输协议)设置成channel的本地地址。第二个send方法在内部使用了一个send方法。

    Java代码??JGroups(二)
    1. void?send(Message?msg)?throws?ChannelNotConnectedException,?ChannelClosedException???
    2. void?send(Address?dst,?Address?src,?Serializable?obj)?throws?ChannelNotConnectedException,?ChannelClosedException??

    ??? 以下是个发送消息的例子:

    Java代码??JGroups(二)
    1. Hashtable?data;??
    2. try?{??
    3. ????Address?receiver=channel.getView().getMembers().first();??
    4. ????channel.send(receiver,?null,?data);??
    5. }??
    6. catch(Exception?ex)?{??
    7. ????//?handle?errors??
    8. }??

    2.2.5 Receiving messages
    ??? Channel 以异步的方式从网络上接收消息,然后把消息存放在一个无界队列中。当调用receive()方法时,会尝试返回队列中取得下一个可用的消息。如果队列中没有消息,那么会被阻塞。如果timeout小于等于零,那么会永远等待下去;否则会等待timeout指定的毫秒数,直到收到消息或者抛出TimeoutException。需要注意的是,JChannel. receive(long timeout)方法已经被标记为deprecated。根据channel options的不同,receive()方法可能返回以下类型的对象:

    • Message 普通消息。Message.makeReply()可以同于创建消息的应答,即以当前消息的源地址作为应答消息的目标地址。
    • View 当集群的成员发生变化的时候,集群的每个成员都会收到view message。当两个或者多个子集群(subgroups)合并成一个的时候,集群中的成员会收到MergeView message。如果需要在子集群合并时处理子集群状态的合并,那么通常需要在单独的线程里执行耗时的操作。
    • SuspectEvent 当集群中的某个成员被怀疑崩溃时,集群中的其它成员会收到SuspectEvent message。通过调用SuspectEvent.getMember()可以得到可疑成员的地址。在收到这个消息后,通常还会收到view message。
    • BlockEvent 当收到BlockEvent message后,实例应该停止发送消息,然后应该调用Channel.blockOk()方法(目前JChannel.blockOk()方法是一个空方法)确认已经停止发送消息。当Channel.blockOk()方法调用完毕之后,所有发送消息的线程都会被阻塞直到FLUSH协议解除阻塞。为了接收BlockEvent message,需要设置Channel.BLOCK属性为true。
    • UnblockEvent 当收到UnblockEvent message后,实例可以继续发送消息。
    • GetStateEvent 当收到GetStateEvent message后,实例应该保存当前的状态,并将当前状态的一份拷贝作为参数调用Channel.returnState()方法,然后JGroups会将这个状态返回给请求状态的实例。为了接收GetStateEvent message,需要在协议栈中配置pbcast.STATE_TRANSFER协议。
    • StreamingGetStateEvent当收到StreamingGetStateEvent message后,实例应该通过StreamingGetStateEvent.getArg()返回的输出流返回状态。为了接收StreamingGetStateEvent message,需要在协议栈中配置pbcast.STREAMING_STATE_TRANSFER协议。
    • SetStateEvent当收到SetStateEvent message后,实例应该通过SetStateEvent.getArg()返回的字节数组取得状态。
    • StreamingSetStateEvent当收到StreamingSetStateEvent message后,实例应该通过StreamingSetStateEvent.getArg()返回的输入流取得状态。为了接收StreamingSetStateEvent message,需要在协议栈中配置pbcast.STREAMING_STATE_TRANSFER协议。

      ??? 以下是个使用pull方式接收消息的例子:

      Java代码??JGroups(二)
      1. import?java.io.BufferedReader;??
      2. import?java.io.InputStreamReader;??
      3. import?java.util.HashMap;??
      4. import?java.util.Iterator;??
      5. import?java.util.Map;??
      6. ??
      7. import?org.jgroups.BlockEvent;??
      8. import?org.jgroups.Channel;??
      9. import?org.jgroups.GetStateEvent;??
      10. import?org.jgroups.JChannel;??
      11. import?org.jgroups.Message;??
      12. import?org.jgroups.SetStateEvent;??
      13. import?org.jgroups.UnblockEvent;??
      14. import?org.jgroups.View;??
      15. import?org.jgroups.util.Util;??
      16. ??
      17. public?class?PollStyleReceiver?implements?Runnable?{??
      18. ????//??
      19. ????private?JChannel?channel;??
      20. ????private?Map<String,?String>?state?=?new?HashMap<String,?String>();??
      21. ????private?String?properties?=?"UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):"?+??
      22. ????"PING(timeout=3000;num_initial_members=6):"?+??
      23. ????"FD(timeout=5000):"?+??
      24. ????"VERIFY_SUSPECT(timeout=1500):"?+??
      25. ????"pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):"?+??
      26. ????"UNICAST(timeout=300,600,1200):"?+??
      27. ????"FRAG:"?+??
      28. ????"pbcast.GMS(join_timeout=5000;shun=false;print_local_addr=true):"?+???
      29. ????"pbcast.STATE_TRANSFER:"?+???
      30. ????"pbcast.FLUSH";??
      31. ??????
      32. ??????
      33. ????public?void?start()?throws?Exception?{??
      34. ????????//??
      35. ????????channel?=?new?JChannel(properties);??
      36. ????????channel.connect("PollStyleReceiver");??
      37. ????????channel.setOpt(Channel.BLOCK,?Boolean.TRUE);??
      38. ????????channel.getState(null,?10000);??
      39. ??????????
      40. ????????new?Thread(this).start();??
      41. ??????????
      42. ????????sendMessage();??
      43. ??????????
      44. ????????channel.close();??
      45. ????}??
      46. ??????
      47. ????@SuppressWarnings({?"unchecked",?"deprecation"?})??
      48. ????public?void?run()?{??
      49. ????????while(true)?{??
      50. ????????????try?{??
      51. ????????????????Object?obj?=?channel.receive(0);??
      52. ????????????????if(obj?instanceof?Message)?{??
      53. ????????????????????System.out.println("received?a?regular?message:?"?+?(Message)obj);??
      54. ????????????????????String?s?=?(String)((Message)obj).getObject();??
      55. ????????????????????String?key?=?s.substring(0,?s.indexOf("="));??
      56. ????????????????????String?value?=?s.substring(s.indexOf("=")?+?1);??
      57. ????????????????????state.put(key,?value);??
      58. ????????????????}?else?if(obj?instanceof?View)?{??
      59. ????????????????????System.out.println("received?a?View?message:?"?+?(View)obj);??
      60. ????????????????}?else?if(obj?instanceof?BlockEvent)?{??
      61. ????????????????????System.out.println("received?a?BlockEvent?message:?"?+?(BlockEvent)obj);??
      62. ????????????????????channel.blockOk();??
      63. ????????????????}?else?if(obj?instanceof?UnblockEvent)?{??
      64. ????????????????????System.out.println("received?a?UnblockEvent?message:?"?+?(UnblockEvent)obj);??
      65. ????????????????}?else?if(obj?instanceof?GetStateEvent)?{??
      66. ????????????????????System.out.println("received?a?GetStateEvent?message:?"?+?(GetStateEvent)obj);??
      67. ????????????????????channel.returnState(Util.objectToByteBuffer(copyState(state)));??
      68. ????????????????}?else?if(obj?instanceof?SetStateEvent)?{??
      69. ????????????????????System.out.println("received?a?SetStateEvent?message:?"?+?(SetStateEvent)obj);??
      70. ????????????????????this.state?=?(Map<String,?String>)Util.objectFromByteBuffer(((SetStateEvent)obj).getArg());??
      71. ????????????????????System.out.println("current?state:?"?+?printState(this.state));??
      72. ????????????????}?else?{??
      73. ????????????????????System.out.println(obj);??
      74. ????????????????}??
      75. ????????????}?catch(Exception?e)?{??
      76. ????????????????e.printStackTrace();??
      77. ????????????????break;??
      78. ????????????}??
      79. ????????}??
      80. ????}??
      81. ??????
      82. ????private?void?sendMessage()?throws?Exception?{??
      83. ????????boolean?succeed?=?false;??
      84. ????????BufferedReader?br?=?null;??
      85. ????????try?{??
      86. ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
      87. ????????????while(true)?{??
      88. ????????????????System.out.print(">?");??
      89. ????????????????System.out.flush();??
      90. ????????????????String?line?=?br.readLine();??
      91. ????????????????if(line?!=?null?&&?line.equals("exit"))?{??
      92. ????????????????????break;??
      93. ????????????????}?else?if(line.indexOf("=")?>?0?||?line.indexOf("=")?==?line.length()?-?1){??
      94. ????????????????????Message?msg?=?new?Message(null,?null,?line);??
      95. ????????????????????channel.send(msg);??
      96. ????????????????}?else?{??
      97. ????????????????????System.out.println("invalid?input:?"?+?line);??
      98. ????????????????}??
      99. ????????????}??
      100. ????????????succeed?=?true;??
      101. ????????}?finally?{??
      102. ????????????if(br?!=?null)?{??
      103. ????????????????try?{??
      104. ????????????????????br.close();??
      105. ????????????????}?catch?(Exception?e)?{??
      106. ????????????????????if(succeed)?{??
      107. ????????????????????????throw?e;??
      108. ????????????????????}??
      109. ????????????????}??
      110. ????????????}??
      111. ????????}??
      112. ????}??
      113. ??????
      114. ????private?Map<String,?String>?copyState(Map<String,?String>?s)?{??
      115. ????????Map<String,?String>?m?=?new?HashMap<String,?String>();???
      116. ????????for(String?key?:?s.keySet())?{??
      117. ????????????m.put(key,?s.get(key));??
      118. ????????}??
      119. ????????return?m;??
      120. ????}??
      121. ??????
      122. ????private?String?printState(Map<String,?String>?s)?{??
      123. ????????StringBuffer?sb?=?new?StringBuffer();??
      124. ????????sb.append("[");??
      125. ????????for(Iterator<String>?iter?=?s.keySet().iterator();?iter.hasNext();?)?{??
      126. ????????????String?key?=?iter.next();??
      127. ????????????sb.append(key).append("=");??
      128. ????????????sb.append(s.get(key));??
      129. ????????????if(iter.hasNext())?{??
      130. ????????????????sb.append(",?");??
      131. ????????????}??
      132. ????????}??
      133. ????????sb.append("]");??
      134. ????????return?sb.toString();??
      135. ????}??
      136. ??????
      137. ????public?static?void?main(String?args[])?throws?Exception?{??
      138. ????????new?PollStyleReceiver().start();??
      139. ????}??
      140. }??

      ??? 程序启动后,程序会将在命令行键入键值对(例如key1=value1)保存到HashMap中。并允许在不同的实例间传递状态。
      ??? 除了以poll的方式接收消息外,JGroups也支持以push的方式处理消息。通过向JChannel注册Receiver,允许程序以回调的方式处理消息,而不必启动额外的线程来接收消息,同时JGroups在内部也不用使用无界队列来保存消息。一下是个使用push方式处理消息的例子:

      Java代码??JGroups(二)
      1. JChannel?ch?=?new?JChannel();??
      2. ch.setReceiver(new?ExtendedReceiverAdapter()?{??
      3. ????public?void?receive(Message?msg)?{??
      4. ????????System.out.println("received?message?"?+?msg);??
      5. ????}??
      6. ????public?void?viewAccepted(View?new_view)?{??
      7. ????????System.out.println("received?view?"?+?new_view);??
      8. ????}??
      9. });??
      10. ch.connect("bla");??

      2.2.6 State transfer
      ??? JGroups支持在集群中维护和传递状态(state),例如web server的Http Sessions等。集群中的某个实例可以通过JChannel.send()方法发送消息,从而把对状态的修改同步到集群的其它实例中。当一个新的实例加入到集群后,可以调用JChannel.getState()方法向集群中的某个实例(缺省是coordinator)请求获得当前的状态。需要注意的是,JChannel.getState()方法返回的是boolean类型。如果该实例是集群中的第一个实例,那么该方法返回false(也就是说目前没有状态),否则返回true。在接下来JChannel.receive()方法的返回值中会包含SetStateEvent message,或者通过MembershipListener.setState()方法获得状态。JChannel.getState()方法不直接返回状态的原因是,如果JChannel的消息队列中还有未被处理的消息,那么让JChannel.getState()直接返回状态,会破坏消息接收的FIFO顺序保证,传递的状态也会不正确。

      ??? 假设某个集群中包含A、B 和C三个成员,当D加入到集群的时候,如果D调用了JChannel.getState(),那么会发生以下的调用序列:

      1. D 调用 JChannel.getState()方法。假设状态从集群中的A成员取得。
      2. A 收到GetStateEvent message或者A注册的Receiver的getState() 方法被调用。A返回了当前状态。
      3. D 调用 JChannel.getState()方法返回,返回值是true。
      4. D 收到SetStateEvent message或者D注册的Receiver的setState()方法被调用。D取得状态。

      ??? 2.2.5节的例子中包含了状态传递相关的代码,需要注意的是,在调用JChannel.returnState()方法的时候,为了防止在状态被通过网络发送前,程序的其它地方对状态进行了修改(比如接收到新的消息并更新状态),需要传递当前状态的一份拷贝。
      ??? 除了通过处理GetStateEvent 和 SetStateEvent消息来传递状态之外,JGroups也支持通过Reciever传递状态,例如在第一章中演示的例子。
      ??? JGroups支持传递部分状态和以流的形式传递状态,详细内容可以参考JGroups Manual。

热点排行