分布式助手Zookeeper(四)
Zookeeper是分布式环境下一个重要的组件,因为它能在分布式环境下,给我带来很多便利,大大简化了分布式编程的复杂性,本篇散仙将给出一个模拟例子,来演示下如何使用Zookeeper的API编程,来完成分布式环境下配置的同步。大家都知道在一个中大型的规模的集群中,配置文件通常是必不可少的的东西,很多时候,我都需要将在Master上配置好的配置文件,给分发到各个Slave上,以确保整体配置的一致性,在集群规模小的时候我们可能简单的使用远程拷贝或复制即可完成,但是,当集群规模越来越大的时候,我们发现这种方式不仅繁琐,而且容易出错,最要命的是,以后如果改动配置文件的很少一部分的东西,都得需要把所有的配置文件,给重新远程拷贝覆盖一次,那么,怎样才能避免这种牵一发而动全身的事情呢?
事实上,利用Zookeeper,就能够很容易的,高可靠的帮我们完成这件事,我们只需要把配置文件保存在Zookeeper的znode里,然后通过Watch来监听数据变化,进而帮我们实现同步。一个简单的工作图如下所示:
总结流程如下:
package com.sanjiesanxian;import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.data.Stat;/*** * Zookeeper实现分布式配置同步 * * @author 秦东亮 * * ***/public class SyscConfig implements Watcher{//Zookeeper实例private ZooKeeper zk;private CountDownLatch countDown=new CountDownLatch(1);//同步工具private static final int TIMIOUT=5000;//超时时间private static final String PATH="/sanxian";public SyscConfig(String hosts) { try{zk=new ZooKeeper(hosts, TIMIOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) { if(event.getState().SyncConnected==Event.KeeperState.SyncConnected){//防止在未连接Zookeeper服务器前,执行相关的CURD操作countDown.countDown();//连接初始化,完成,清空计数器}}});}catch(Exception e){e.printStackTrace();}}/*** * 写入或更新 * 数据 * @param path 写入路径 * @param value 写入的值 * **/ public void addOrUpdateData(String path,String data)throws Exception { Stat stat=zk.exists(path, false); if(stat==null){ //没有就创建,并写入 zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("新建,并写入数据成功.. "); }else{ //存在,就更新 zk.setData(path, data.getBytes(), -1); System.out.println("更新成功!"); } } /** * 读取数据 * @param path 读取的路径 * @return 读取数据的内容 * * **/ public String readData()throws Exception{ String s=new String(zk.getData(PATH, this, null)); return s; }/** * 关闭zookeeper连接 * 释放资源 * * **/public void close(){try{zk.close();}catch(Exception e){e.printStackTrace();}} public static void main(String[] args)throws Exception {SyscConfig conf=new SyscConfig("10.2.143.5:2181"); conf.addOrUpdateData(PATH, "修真天劫,九死一生。"); conf.addOrUpdateData(PATH, "圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁."); conf.addOrUpdateData(PATH, "努力奋斗,实力才是王道! ");//System.out.println("监听器开始监听........");// conf.readData();// Thread.sleep(Long.MAX_VALUE);//conf.readData();conf.close();}@Overridepublic void process(WatchedEvent event){ try{if(event.getType()==Event.EventType.NodeDataChanged){System.out.println("变化数据: "+readData());} }catch(Exception e){ e.printStackTrace(); }}}
模拟客户端输出如下://客户端监听代码SyscConfig conf=new SyscConfig("10.2.143.5:2181"); conf.addOrUpdateData(PATH, "修真天劫,九死一生。"); conf.addOrUpdateData(PATH, "圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁."); conf.addOrUpdateData(PATH, "努力奋斗,实力才是王道! ");//System.out.println("监听器开始监听........");// conf.readData();// Thread.sleep(Long.MAX_VALUE);//conf.readData();conf.close();更新成功!更新成功!更新成功!
模拟服务端输出如下:public static void main(String[] args)throws Exception {//服务端监听代码SyscConfig conf=new SyscConfig("10.2.143.36:2181");//conf.addOrUpdateData(PATH, "");System.out.println("模拟服务监听器开始监听........"); conf.readData(); Thread.sleep(Long.MAX_VALUE);conf.close();}模拟服务监听器开始监听........数据更新了: 修真天劫,九死一生。数据更新了: 圣人之下,皆为蝼蚁,就算再大的蝼蚁,还是蝼蚁.数据更新了: 努力奋斗,实力才是王道!
至此,使用zookeeper来完成配置同步的服务就完成了,我们可以发现,使用zookeeper来编写分布式程序是非常简单可靠的。
//防止在未连接Zookeeper服务器前,执行相关的CURD操作 countDown.countDown();//连接初始化,完成,清空计数器
但实际没有应用起来
要在读写前加入conf.countDown.await();
这样才能确保链接上了Zookeeper//防止在未连接Zookeeper服务器前,执行相关的CURD操作 countDown.countDown();//连接初始化,完成,清空计数器
但实际没有应用起来
要在读写前加入conf.countDown.await();
这样才能确保链接上了Zookeeper
嗯,谢谢指正!