学习Zookeeper第一课
安装单机版Zookeeper
基本命令
ACL权限模块
curator客户端操作ZK
Watcher机制
zk的应用场景
安装单机版Zookeeper
下载地址:https://zookeeper.apache.org/
复制到/usr/tools然后解压缩到/usr/apps tar -xzvf /usr/tools/zookeeper.tar.gz -C /usr/apps
创建一个软连接指向 /usr/apps ln -s /usr/apps/zookeeper/ /usr/apps/zk
进入conf目录下,找到zoo_sample.cfg文件复制一份重命名为zoo.cfg
修改配置为中的 dataDir配置,指定zk数据存放目录,并且创建该文件夹。
打开/etc/profile文件修改环境变量,注意zk依赖JDK所以也要下载JDK并且配置环境变量。最后刷新环境变量source/etc/profile
基本命令
zkServer.sh start 启动zk。
zkServer.sh status 查看zk状态。
zkServer.sh restart 重启zk。
zkServer.sh stop 关闭zk。
zkCli.sh 打开客户端,如果想要连接其他机器的zk命令为 zkCli.sh -sever ip:port 第二张图是我的Windows连接的Linux虚拟机的zk。
zk的整体结构是树形结构,可以想象成文件夹,每个节点有名称和值,每个节点也可以有子节点,子节点也可以有子节点。根目录为 / 默认的情况下会有一个/zookeeper的节点,它的下边有config和quota节点。
使用 ls path 命令查看,当前我的根路径还有一个dubbo节点,这是我的dubbo服务。有子节点的会显示[子节点列表],没有子节点的就是空数组[]
查看节点内容 get path 节点是可以没有内容的,下图dubbo节点有内容,而zookeeper节点没有内容,dubbo子节点api.service.ProtocolService是有内容的。
create path value 创建节点,这个命令不支持多层目录创建。
create -s path value 创建带顺序序号的节点,被创建的节点会在名称后边自动追加编号,类似于mysql的自增id,这个特性意味着在一个节点下创建N个顺序子节点这些子节点的名称不会重复。
一般的需要是从一串0000000000000开始的,下图是因为我这个目录下已经创建并且删除过很多节点了。
create -e path value (也可以带 -s 带编号)创建临时节点,临时节点的意思是当前客户端连接如果关闭了,那么这个节点就会自动删除,并且临时节点不能有子节点。
如果发现客户端被关闭后临时节点没有被删除,就等一会,在查看。
set path value 修改节点内容。
delete path 删除节点,如果该节点有子节点则无法删除,只能一级一级删除。
ACL权限模块
zk支持对某个节点(子节点不会继承父节点的权限)做权限限制,ACL分为三部分,总结起来说就是给某个节点设置通过某种方式上一把锁,这把锁有不同的权限。
权限授权
授权分为四个维度,ip,digest,word(默认操作,不限制权限),super(超级用户拥有所有节点的所有权限)
授权对象
ip:限制节点仅能为指定IP操作。digest:设置账号密码,只有当前连接使用该账号密码验证后才可以操作。word:授权对象是anyone。super:超级用户拥有所有节点的所有权限,也是账号密码验证。
权限permission
c:允许授权对象在当前节点下创建子节点。d:允许授权对象删除当前节点。r:允许授权对象读取当前节点的数据内容,及子节点列表。w:允许授权对象修改当前节点的数据内容,及子节点列表。a:允许授权对象对当前节点进行 ACL 相关的设置。
getAcl path 查看节点权限。创建的节点默认都是word没有做限制。
addAcl path ip p 给节点设置某个IP才可以访问。
有一个删除BUG,就算是客户端对这个节点没有任何权限但是它却可以删除这个节点。我当前的zk是3.6.2不指定是不是只有这一个版本有这个问题。
此处需要补充的是这里的删除并不是BUG,我查阅了官方文档,d权限是针对子节点的,也就是说d是保护子节点不被删除,而不是节点本身。
设置账号密码权限有两个步骤
1 给当前客户端增加一个权限用户 addauth digest 账号:密码
2 给节点设置权限用到的账号密码就是上一步创建的 setAcl path auth:账号:密码:权限
下图在虚拟机客户端设置了一个用户密码并且给节点设置了权限,在本机则无法访问。
给本机客户端登录了用户后则可以做操作。
此处需要补充的是这里的删除并不是BUG,我查阅了官方文档,d权限是针对子节点的,也就是说d是保护子节点不被删除,而不是节点本身。
超级用户super。如果设置了密码但是给忘记了,可以登录super用户登录。super用户需要在zk启动时配置,具体是在zkServer.sh
启动脚本中配置。这个密码需要SHA1加密后在base64Encode,在文章下边的 curator 客户端中有专门的加密包。
curator客户端操作ZK
Watcher机制
zk中最大的特点就是watcher机制,它能够在节点上注册一些事件,节点触发事件后调用注册者的回调。
watcher的三个特性
zk的应用场景
1 集群状态监控
ClusterMonitoringTest(监控平台)的main方法启动后首先创建一个/consumer节点,注册节点子节点变更watcher可以看到watcher触发后又调用了自己,原因是每次watcher触发后就会从WatcherManager中被删除。
使用watcher时必须要注意这一点。三个server类模拟三个服务端,每个服务端启动后在/consumer创建一个临时节点,节点创建后会触发监控平台的watcher,得知有新的服务上线了。当server关闭后节点被删除,
同样触发监控平台的watcher,监控平台就知道服务下线了。
package zk.cluster; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Server1 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().withMode(CreateMode.EPHEMERAL).forPath("/consumer/c1"); System.out.println("服务已经启动"); TimeUnit.MINUTES.sleep(10); } }
View Code
package zk.cluster; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Server2 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().withMode(CreateMode.EPHEMERAL).forPath("/consumer/c2"); System.out.println("服务已经启动"); TimeUnit.MINUTES.sleep(10); } }
View Code
package zk.cluster; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Server3 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().withMode(CreateMode.EPHEMERAL).forPath("/consumer/c3"); System.out.println("服务已经启动"); TimeUnit.MINUTES.sleep(10); } }
View Code
package zk.cluster; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; //集群状态监控Test public class ClusterMonitoringTest { public static void childrenWatcher(CuratorFramework client) throws Exception { client.getChildren().usingWatcher((CuratorWatcher) e1 -> { List<String> serverList = client.getChildren().forPath("/consumer"); System.out.println("当前/consumer服务列表" + serverList); System.out.println("-----------------------"); childrenWatcher(client); }).forPath("/consumer"); } public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().forPath("/consumer"); childrenWatcher(client); System.out.println("监控平台已经启动"); TimeUnit.MINUTES.sleep(10); } }
View Code
2 DNS服务
DNS服务其实就是集群状态监控的扩展。server启动后会在/consumer节点下创建临时节点,节点值是代表自己的ip。DNSTest启动后每10秒获取一次/consumer的子节点列表,并随机获取其中一个调用这个节点的方法。模拟的效果就是对请求负载均衡。
package zk.dns; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Server1 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/consumer/c1", "192.168.0.1:8080".getBytes()); System.out.println("服务已经启动"); TimeUnit.MINUTES.sleep(10); } public static String show() { return "我是服务1"; } }
View Code
package zk.dns; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Server2 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/consumer/c2", "192.168.0.2:8080".getBytes()); System.out.println("服务已经启动"); TimeUnit.MINUTES.sleep(10); } public static String show() { return "我是服务2"; } }
View Code
package zk.dns; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; public class Server3 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/consumer/c3", "192.168.0.3:8080".getBytes()); System.out.println("服务已经启动"); TimeUnit.MINUTES.sleep(10); } public static String show() { return "我是服务3"; } }
View Code
package zk.dns; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; //DNS负载均衡 public class DNSTest { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); for (int i = 0; i < 10; i++) { List<String> serverList = client.getChildren().forPath("/consumer"); Random r = new Random(); String server = serverList.get(r.nextInt(serverList.size())); byte[] url = client.getData().forPath("/consumer/" + server); String URL = new String(url); if (URL.equals("192.168.0.1:8080")) { System.out.println(Server1.show()); } else if (URL.equals("192.168.0.2:8080")) { System.out.println(Server2.show()); } else { System.out.println(Server3.show()); } TimeUnit.SECONDS.sleep(10); } } }
View Code
3 远程配置服务
ConfigMaintainServer充当客户端,每个客户端都会对/serverConfig注册节点内容变更的watcher,main方法启动后创建三个客户端,修改/serverConfig节点内容,就会触发ConfigMaintainServer的watcher,客户端修改自己的配置。
package zk; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; //配置文件同步Test public class ConfigMaintainTest { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); client.create().forPath("/serverConfig", "1000".getBytes()); ConfigMaintainTest configMaintain = new ConfigMaintainTest(); configMaintain.new ConfigMaintainServer(client, "SERVER1"); configMaintain.new ConfigMaintainServer(client, "SERVER2"); configMaintain.new ConfigMaintainServer(client, "SERVER3"); client.setData().forPath("/serverConfig", "2000".getBytes()); TimeUnit.SECONDS.sleep(10); client.setData().forPath("/serverConfig", "3000".getBytes()); TimeUnit.MINUTES.sleep(1); } class ConfigMaintainServer { private String timeOut; private String name; public ConfigMaintainServer(CuratorFramework client, String name) throws Exception { byte[] data = client.getData().forPath("/serverConfig"); this.setTimeOut(new String(data)); System.out.println(name + "初始化timeOut为" + timeOut); changeDataWatcher(client); this.name = name; } public String getTimeOut() { return timeOut; } private void setTimeOut(String timeOut) { this.timeOut = timeOut; } public void changeDataWatcher(CuratorFramework client) throws Exception { client.getData().usingWatcher((CuratorWatcher) e1 -> { byte[] data = client.getData().forPath("/serverConfig"); this.setTimeOut(new String(data)); System.out.println(name + "内容修改了当前内容为" + this.getTimeOut()); changeDataWatcher(client); }).forPath("/serverConfig"); } } }
View Code
4 获取唯一的id
利用zk可以创建有序节点的特性,创建节点后获取节点的名称这个名称同一个节点下是唯一的,即获取唯一的id值。
package zk; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; //获取不重复IDTest public class GetIdTest { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); for (int i = 0; i < 10; i++) { String id = client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL) .forPath("/order/id_"); client.delete().forPath(id); System.out.println(id); } } }
View Code
5 master选举
当多个集群服务(不限于服务类型)需要选择一个master节点,当master节点挂掉后系统自动选择其他节点作为master节点。每个服务在启动后会调用elect()这个方法会在/master节点上创建一个节点是否存在的watcher,并且判断
如果/master不存在则调用regist()方法,regist()会立刻创建一个临时节点/master,但如果节点已经存在则不做任何操作,只是在重新注册一个watcher监听。例如三个服务同时启动,便会同时触发自身regist()但是只有一个服务会注
册成功,注册失败则表示没有抢先成为master。
package zk.master; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; //master选举 public class Server1 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); System.out.println("服务已经启动"); elect(client); regist(client); TimeUnit.MINUTES.sleep(10); } public static void regist(CuratorFramework client) { try { client.create().withMode(CreateMode.EPHEMERAL).forPath("/master", "server1".getBytes()); System.out.println("server1顺利成为master"); } catch (Exception e) { System.out.println("server1选举master失败"); } } public static void elect(CuratorFramework client) throws Exception { client.checkExists().usingWatcher((CuratorWatcher) e -> { if (client.checkExists().forPath("/master") == null) { regist(client); } elect(client); }).forPath("/master"); } }
View Code
package zk.master; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; //master选举 public class Server2 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); System.out.println("服务已经启动"); elect(client); regist(client); TimeUnit.MINUTES.sleep(10); } public static void regist(CuratorFramework client) { try { client.create().withMode(CreateMode.EPHEMERAL).forPath("/master", "server2".getBytes()); System.out.println("server2顺利成为master"); } catch (Exception e) { System.out.println("server2选举master失败"); } } public static void elect(CuratorFramework client) throws Exception { client.checkExists().usingWatcher((CuratorWatcher) e -> { if (client.checkExists().forPath("/master") == null) { regist(client); } elect(client); }).forPath("/master"); } }
View Code
package zk.master; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; //master选举 public class Server3 { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.149:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); System.out.println("服务已经启动"); elect(client); regist(client); TimeUnit.MINUTES.sleep(10); } public static void regist(CuratorFramework client) { try { client.create().withMode(CreateMode.EPHEMERAL).forPath("/master", "server3".getBytes()); System.out.println("server3顺利成为master"); } catch (Exception e) { System.out.println("server3选举master失败"); } } public static void elect(CuratorFramework client) throws Exception { client.checkExists().usingWatcher((CuratorWatcher) e -> { if (client.checkExists().forPath("/master") == null) { regist(client); } elect(client); }).forPath("/master"); } }
View Code
6 简单队列
Produce定时向/message节点下创建子节点。Consumer启动后获取/message下的节点列表,然后一个一个消费,消费后删除该节点。当全部节点消费完毕在重新注册/message节点的watcher事件,再次消费。。。
package zk.queue; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; //简单的队列消费 public class Produce { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.150:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); System.out.println("生产者已经启动"); TimeUnit.SECONDS.sleep(10); for (int i = 0; i < 10; i++) { client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/message/msg", ("我是消息" + i).getBytes()); TimeUnit.SECONDS.sleep(10); } } }
View Code
package zk.queue; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; public class Consumer { public static void main(String[] args) throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.31.150:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); client.start(); consumerMsg(client); System.out.println("消费者已经启动"); TimeUnit.MINUTES.sleep(10); } public static void consumerMsg(CuratorFramework client) throws Exception { client.getChildren().usingWatcher((CuratorWatcher) e -> { List<String> mes = client.getChildren().forPath("/message"); mes.forEach(s -> { try { System.out.println(new String(client.getData().forPath("/message/" + s))); client.delete().forPath("/message/" + s); } catch (Exception e1) { e1.printStackTrace(); } }); consumerMsg(client); }).forPath("/message"); } }
View Code
7 分布式锁
当我们需要对一个公众资源做操作时就要什么样的情况下要允许操作,什么样的情况下不允许操作,也就是只有获取到锁才可以操作,与Redis锁不同的是Redis锁定一个资源后只有获取该资源的锁才能进行操作,也就是该资源只有一把锁。
而zk分布式锁则将线程操作分为共享锁和同步锁,也就是写写互斥,读写互斥,读读不互斥。当有一个线程操作要对共享资源做操作后首先在公共父节点下创建一个节点,表示一个线程的访问。该节点创建后要根据它自身的情况(读,写)如果
当前线程是读操作,它只需判断它之前有没有写节点,如果没有则可以执行具体的业务操作,如果它之前有写操作,则找到距离自己最近的一个写节点,在这个节点上注册watcher,直到该写节点被删除,则自己才可以执行。如果节点本身是
写节点,那么它前边无论有任何一个节点它都无法执行。于是要找到它的上一个节点,然后注册watcher,直到上一个节点被删除,但此时依然还要再次判断自己前边有没有节点,因为如果该写节点前有N个读节点,那么多个读节点之前是不互斥的
不见得说上一个读节点一定是最后执行结束的。
以下代码片段main启动后会创建6个节点,如果/lock下没有任何节点,那么这6个节点会一个一个执行结束。如果在/lock下预设一个节点无论预设节点是读还是写则只有手动删除该节点后,main创建的6个节点才会自动执行并且删除。setExists
中做的事情就是不断的判断自己前边有没有节点,如果没有则执行自己。getExists则只判断自己前边有没有写节点。recoverWatcher方法是一个恢复watcher的方法,它的场景是预设了一个节点后,程序又创建了6个但是测试我没有手动
删除预设的节点,而是关闭了服务。那么此时预设的6个节点的watcher全部失效。当服务器再次启动则会创建6个新的节点加上之前的7个就是13个但是这13个中只有后边6个有对应的watcher,也就是说此时在删除第一个节点也不会出发后续
节点的自动删除。所以recoverWatcher在启动后获取到节点列表然后重新设置watcher关系。
额外需要注意的是:我们创建的节点一定要是有序节点,getChildren()获取的节点列表是无序的所以需要使用Collections.sort()手动排序。
package zk.lock; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CuratorWatcher; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import com.google.common.base.Objects; public class Main { private static int age = 0; private final static String ROTT_NODE_PATH = "/lock"; static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("21.163.13.143:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); public static void main(String[] args) throws Exception { client.start(); recoverWatcher(client); lock("set-1"); lock("set-2"); lock("get-G"); lock("get-G"); lock("set-3"); lock("get-G"); System.out.println("多线程执行结束"); // 删除节点,包括子节点。 // client.delete().deletingChildrenIfNeeded().forPath(ROTT_NODE_PATH); TimeUnit.MINUTES.sleep(10); } // 恢复之前节点的watcher public static void recoverWatcher(CuratorFramework client) throws Exception { List<String> childList = client.getChildren().forPath(ROTT_NODE_PATH); // 因为getChildren()返回的集合是无序的,所以先进行排序 Collections.sort(childList); // 跳过第一个,例如现在的节点有10个,但是第一个不恢复,因为第一个不会监听任何节点,如果把他恢复了全部节点都会顺序消费,出不了效果。 for (int i = 1; i < childList.size(); i++) { String path = ROTT_NODE_PATH + "/" + childList.get(i); byte[] data = client.getData().forPath(path); String value = new String(data); String[] spl = value.split("-", 2); if (Objects.equal(spl[0], "get")) { getExists(path); } else { setExists(path); } System.out.println(path + "节点已经恢复watcher"); } } // 添加节点 public static void lock(String value) throws Exception { String cur = client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL) .forPath(ROTT_NODE_PATH + "/a", value.getBytes()); System.out.println("已创建节点:" + cur); String[] spl = value.split("-", 2); if (Objects.equal(spl[0], "get")) { getExists(cur); } else { setExists(cur); } } // get请求只需要判断自己前边有没有set public static void getExists(String cur) throws Exception { List<String> childList = client.getChildren().forPath(ROTT_NODE_PATH); // 因为getChildren()返回的集合是无序的,所以先进行排序 Collections.sort(childList); // set节点 String setNodeName = null; for (String node : childList) { System.out.println("get遍历的节点" + node); try { // 遍历节点 byte[] value = client.getData().forPath(ROTT_NODE_PATH + "/" + node); String v = new String(value); String[] spl = v.split("-", 2); // 找到set节点 if (spl[0].equals("set")) { setNodeName = ROTT_NODE_PATH + "/" + node; } } catch (Exception e) { System.out.println("get请求时/lock/" + node + "节点已经不存在"); e.printStackTrace(); } if ((ROTT_NODE_PATH + "/" + node).equals(cur)) { break; } } // 当前节点前不存在set节点则可以直接读 if (setNodeName == null) { System.out.println("获取当前age为:" + Main.age); client.delete().forPath(cur); } else { // 如果存在则给这个写节点设置watcher,当写节点不存在后才可以读 System.out.println("当前get操作等待当前节点" + cur + "我等待的节点是" + setNodeName); client.checkExists().usingWatcher((CuratorWatcher) e -> { System.out.println("我是" + cur + "我等待的节点" + e.getPath() + "已经解锁"); System.out.println("获取当前age为:" + Main.age); client.delete().forPath(cur); }).forPath(setNodeName); } } // set节点需要递归判断自己前边有没有节点 public static void setExists(String cur) throws Exception { List<String> childList = client.getChildren().forPath(ROTT_NODE_PATH); // 因为getChildren()返回的集合是无序的,所以先进行排序 Collections.sort(childList); String inNode = null; for (String node : childList) { System.out.println("set遍历的节点" + node); try { if ((ROTT_NODE_PATH + "/" + node).equals(cur)) { break; } // 存上一个节点 inNode = ROTT_NODE_PATH + "/" + node; } catch (Exception e) { System.out.println("set请求时/lock/" + node + "节点已经不存在"); e.printStackTrace(); } } // 如果上一个节点是自己说明只有这一个节点,则可以直接写 if (inNode == null) { byte[] value = client.getData().forPath(cur); String val = new String(value); String[] spl = val.split("-", 2); Main.age = Integer.parseInt(spl[1]); System.out.println("写入后的age为:" + age); client.delete().forPath(cur); } else { System.out.println("当前set操作等待当前节点" + cur + "我等待的节点是" + inNode); // 如果之前有节点要等之前的节点被删除,但是被删除后还需要再次判断前边还有没有其他节点 client.checkExists().usingWatcher((CuratorWatcher) e -> { System.out.println("我是" + cur + "我等待的节点" + e.getPath() + "已经解锁"); setExists(cur); }).forPath(inNode); } } }
View Code