[TOC]

A Detailed Guide to Curator, the ZooKeeper Client

Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of a great deal of the low-level detail work of ZooKeeper client development, including connection re-establishment, repeated Watcher registration, the NodeExistsException, and so on. Patrick Hunt (of ZooKeeper) gave Curator high praise with the line "Guava is to Java what Curator is to ZooKeeper".
An aside on the name:
The origin of the ZooKeeper name is a fun story. The following is adapted from the book From Paxos to ZooKeeper: Distributed Consistency Principles and Practice (《从PAXOS到ZOOKEEPER分布式一致性原理与实践》):
ZooKeeper began in a research group at Yahoo! Research. At the time, the researchers found that many large systems inside Yahoo relied on similar components for distributed coordination, but those components tended to suffer from distributed single points of failure. So Yahoo's developers set out to build a general-purpose distributed coordination framework with no single point of failure. Early in the project, given that many projects were named after animals (the well-known Pig project, for example), the engineers wanted an animal name for this project as well. Raghu Ramakrishnan, then the lab's chief scientist, joked: "At this rate, we'll turn into a zoo." At that, everyone agreed it should simply be called the zookeeper: with all the animal-named distributed components put together, Yahoo's distributed systems looked like one big zoo, and this project was exactly what would coordinate that distributed environment. Thus the name ZooKeeper was born.

Curator is undoubtedly the Swiss Army knife among ZooKeeper clients. The word translates as "curator" or "custodian"; whether the development team chose it deliberately is unknown, but the author's guess is that the name declares Curator to be the keeper of ZooKeeper (stretching the idea a little: Curator is the zoo's director).
Curator consists of several packages:
curator-framework: a wrapper around ZooKeeper's low-level API
curator-client: client-side operations, such as retry policies
curator-recipes: wraps high-level features such as Cache event listening, elections, distributed locks, distributed counters, distributed Barriers, and so on
Maven dependencies (this guide uses Curator 2.12.0, which corresponds to ZooKeeper 3.4.x; mixing other versions can cause compatibility problems and may well make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Curator's Basic API

Creating a Session

1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method takes four main parameters:

Parameter  Description
connectionString  The server list, in the format host1:port1,host2:port2,…
retryPolicy  The retry policy; four retry policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs  Session timeout, in milliseconds; defaults to 60000 ms
connectionTimeoutMs  Connection creation timeout, in milliseconds; defaults to 15000 ms
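
The four built-in retry policies live in the org.apache.curator.retry package. A minimal sketch of constructing each one (the parameter values are purely illustrative, not recommendations):

RetryPolicy exponential = new ExponentialBackoffRetry(1000, 3);    // exponential backoff between retries
RetryPolicy nTimes = new RetryNTimes(3, 1000);                     // at most n retries with a fixed sleep
RetryPolicy oneTime = new RetryOneTime(1000);                      // retry exactly once
RetryPolicy untilElapsed = new RetryUntilElapsed(5000, 1000);      // keep retrying until a total time has elapsed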

2. Creating a session with the fluent-style API

The core parameters are set fluently (chained). An example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

3. Creating a session with an isolated namespace

To isolate different ZooKeeper business domains from one another, each domain can be assigned an independent namespace, i.e. a ZooKeeper root path is specified for it (in official terms: the "chroot" feature). For example, when a client specifies the independent namespace "/base" as in the example below, all of that client's operations on data nodes are performed relative to that path. By setting a chroot, a client application can be mapped to one subtree of the ZooKeeper server; in scenarios where multiple applications share one ZooKeeper cluster, this is very useful for isolating the applications from each other.

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

Starting the Client

Once the session has been created successfully and you have the client instance, simply call its start() method:

client.start();

Data Node Operations

Creating Data Nodes

ZooKeeper's node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent, with a sequence number
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral, with a sequence number

Create a node with empty initial content

client.create().forPath("path");

Note: if no creation mode is set, nodes default to the persistent mode, and content defaults to empty.

Create a node with initial content

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and empty content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral) and initial content

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and initial content, automatically creating parent nodes recursively

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() interface is very useful: normally the developer has to check whether the parent node exists before creating a child node, and creating directly when the parent does not exist throws NoNodeException. With creatingParentContainersIfNeeded(), Curator automatically and recursively creates all required parent nodes.

Deleting Data Nodes

Delete a node

client.delete().forPath("path");

Note: this method can only delete leaf nodes; otherwise it throws an exception.

Delete a node, recursively deleting all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing deletion of a specific version

client.delete().withVersion(10086).forPath("path");

Delete a node with a deletion guarantee

client.delete().guaranteed().forPath("path");

The guaranteed() interface is a safeguard: as long as the client session is valid, Curator keeps retrying the deletion in the background until the node is successfully deleted.

Note: the fluent interfaces above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Reading Node Data

Read a node's data content

client.getData().forPath("path");

Note: the return value of this method is byte[].

Read a node's data content while also obtaining the node's stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

Updating Node Data

Update a node's data content

client.setData().forPath("path","data".getBytes());

Note: this interface returns a Stat instance.

Update a node's data content, forcing a specific version

client.setData().withVersion(10086).forPath("path","data".getBytes());

Checking Whether a Node Exists

client.checkExists().forPath("path");

Note: this method returns a Stat instance, which serves as the existence check for the ZNode (a null return means the node does not exist).
Additional methods (watching or background processing) can be chained, finishing with forPath() to specify the ZNode to operate on.
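
For example, a watcher can be attached while checking existence. A minimal sketch (the path and the printed message are illustrative):

Stat stat = client.checkExists()
        .usingWatcher((CuratorWatcher) event ->
                System.out.println("watched event: " + event.getType()))
        .forPath("path");
boolean exists = (stat != null);   // a null Stat means the node does not exist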

Getting All Child Paths of a Node

client.getChildren().forPath("path");

Note: this method returns a List<String>, i.e. the list of the ZNode's child paths.
Additional methods (watch, background processing, or get stat) can be chained, finishing with forPath() to specify the parent ZNode to operate on.

Transactions

A CuratorFramework instance provides the inTransaction() interface method, which opens a ZooKeeper transaction. You can compose create, setData, check, and/or delete operations and then call commit() to commit them as one atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

Asynchronous Interfaces

The create, delete, update and read methods mentioned above are all synchronous. Curator also provides asynchronous interfaces, introducing the BackgroundCallback interface to handle the result information the server returns after an asynchronous call. A key value in the BackgroundCallback callback is the CuratorEvent, which contains the event type, the response code, and the node's details.

CuratorEventType

Event type  Corresponding CuratorFramework method
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

Response codes (#getResultCode())

Code  Meaning
0  OK: the call succeeded
-4  ConnectionLoss: the client was disconnected from the server
-110  NodeExists: the node already exists
-112  SessionExpired: the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() is not given an executor, Curator's EventThread handles the asynchronous callback by default.

Curator Recipes (Advanced Features)

Reminder: first add the curator-recipes dependency. The text below only explains and demonstrates the use of some recipes features; it does not attempt a source-level exploration.

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Important: it is strongly recommended to use a ConnectionStateListener to monitor the connection state. When the connection state is LOST, all the APIs under curator-recipes become invalid or time out, even though none of the examples below use a ConnectionStateListener.
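
A minimal sketch of registering such a listener, assuming the client variable from the earlier examples (what to do on LOST is application-specific):

client.getConnectionStateListenable().addListener((cf, newState) -> {
    if (newState == ConnectionState.LOST) {
        // locks, leases and leadership held through this client are no longer reliable
        System.out.println("Connection LOST - recipe state must not be trusted");
    }
});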

Caches

ZooKeeper natively supports event listening by registering Watchers, but developers have to re-register them over and over (a Watcher fires only once per registration). Cache is Curator's wrapper around event listening; it can be seen as a local cached view of the events, and it automatically re-registers the listeners for the developer. Curator provides three kinds of caches for watching node changes.

Path Cache

A Path Cache watches the children of a ZNode. When a child is added, updated or deleted,
the Path Cache changes its state to contain the current set of children
together with their data and state; state changes are delivered through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

A Path Cache is created with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache, you must call its start() method, and call close() when you are done.
The startup behavior can be set with a StartMode (a usage sketch follows the list below).

StartMode has the following options:

  1. NORMAL: normal (asynchronous) initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns, so the cache is populated synchronously
  3. POST_INITIALIZED_EVENT:
    a PathChildrenCacheEvent.Type#INITIALIZED event is sent once the cache has finished initializing
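
A minimal sketch of starting with an explicit mode (cache as constructed above):

cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// the listener then receives one INITIALIZED event once the initial data has been loaded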

public void addListener(PathChildrenCacheListener listener) adds a listener for changes to the cache.

The getCurrentData() method returns a List<ChildData>, through which you can iterate over all of the children.

Creating/updating and removing nodes is done through the client (CuratorFramework) itself,
not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter of new PathChildrenCache(client, PATH,
true) is set to false, then event.getData().getData() and data.getData() in the example return null, and the cache does not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then fewer listener events fire; this is probably related to PathCache's implementation, and events must not be fired too frequently!

Node Cache

Node Cache is similar to Path Cache, except that a Node
Cache watches just one specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – the node data

Note: as before, you must call the cache's start() method to use it and close() when finished.

getCurrentData() returns the node's current state, from which you can obtain its current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep(100) calls in the example can be commented out, but then fewer listener events fire; this is probably related to NodeCache's implementation, and events must not be fired too frequently!

Note: a NodeCache can only watch the state changes of a single node.

Tree Cache

A Tree Cache can watch all the nodes of an entire subtree, something like a combination of PathCache and NodeCache. It mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the event class that gets fired
  • ChildData – the node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not rely on short Thread.sleep(10) pauses, yet the events still fire the normal number of times.

Note: when a TreeCache is initialized (when its start() method is called), it calls the TreeCacheListener back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that moment event.getData().getPath() is very likely to cause a NullPointerException, so this case should be handled and guarded against.

Leader Election

In distributed computing, leader election is an important capability.
The election process goes like this: a process is designated as the organizer, which distributes the tasks to the nodes.
Before the task begins,
no node knows which one is the leader (or coordinator).
After the election algorithm has run, every node eventually recognizes one unique node as the task leader.
In addition,
elections also frequently happen when the leader crashes unexpectedly, in which case a new leader must be elected.

Within a ZooKeeper cluster, the leader handles write operations and then synchronizes the followers through the ZAB protocol; both the leader and the followers can handle read operations.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients take turns at being leader, without pause (a society of perfect equality, so to speak). The latter elects a single leader who does not hand over leadership unless some client goes down and a re-election is triggered.

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, the LeaderLatch negotiates with all the other LeaderLatch participants that use the same latch
path, and one of them is eventually elected leader. Whether a LeaderLatch instance is the leader can be checked with the hasLeadership method:

leaderLatch.hasLeadership(); // returns true if this instance is currently the leader

Similar to the JDK's CountDownLatch,
a LeaderLatch blocks while requesting leadership. Once you no longer need the LeaderLatch, you must call its close method.
If it is the leader, close releases the leadership, and the other participants then elect a new leader.

public void await() throws InterruptedException, EOFException
/* Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed. */
public boolean await(long timeout, TimeUnit unit) throws InterruptedException

Error handling:
A LeaderLatch instance can add a ConnectionStateListener to watch for connection problems. On
SUSPENDED or LOST,
the leader no longer considers itself the leader. If a LOST connection is then RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. LeaderLatch users must take into account the connection problems that can cause leadership to be lost.
Using a ConnectionStateListener is strongly recommended.

An example of using LeaderLatch:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the curator-test module dependency to test conveniently, with no need to start a real ZooKeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

First we create 10 LeaderLatches; after they are started, one of them is elected leader.
Because the election takes some time, you cannot obtain the leader immediately after start().
hasLeadership checks whether this instance is the leader, returning true if it is.
.getLeader().getId() returns the ID of the current leader.
Leadership can only be released through close().
await() is a blocking method; it tries to acquire leadership, but may not succeed.
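
A minimal sketch of the timed await() variant (latch as in the example above):

if (latch.await(10, TimeUnit.SECONDS)) {
    // this participant acquired leadership within the timeout and keeps it until close()
}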

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are as follows:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Like LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, when this instance acquires leadership your listener's takeLeadership() method is called, and takeLeadership() should only return when the leadership is being released.
When you no longer need the LeaderSelector instance, you should call its close method.

Error handling:
The LeaderSelectorListener interface extends ConnectionStateListener. The LeaderSelector must watch for connection state changes. If the instance has become the leader,
it must respond to SUSPENDED or LOST. When SUSPENDED appears,
the instance must assume it may no longer be the leader until it has successfully reconnected. When LOST appears,
the instance is no longer the leader, and its takeLeadership method should return.

Important: the recommended handling is to throw a CancelLeadershipException when SUSPENDED or
LOST is received. This causes the LeaderSelector instance to interrupt and cancel the thread that is executing the takeLeadership method. This is so important that
you should consider extending LeaderSelectorListenerAdapter,
which provides the recommended handling logic.

The following example is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can do task assignment and so on inside takeLeadership and never return; if you want this instance always to stay leader, you can add an infinite loop. Calling
leaderSelector.autoRequeue() ensures that this instance may acquire leadership again after it has released it.
Here we use an AtomicInteger to record how many times this client has gained leadership; the selection is "fair":
every client has an equal chance of gaining leadership.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By way of comparison: a LeaderLatch releases leadership only when its close() method is called, whereas with a LeaderSelector the LeaderSelectorListener gives you control over the leadership,
so it can be released at the appropriate time and every node has a chance to lead. LeaderSelector is therefore more flexible and controllable, and for leader-election scenarios LeaderSelector is the recommended choice.

Distributed Locks

Reminder:

1. It is recommended to use a ConnectionStateListener to monitor the connection state, because when the connection is LOST you no longer hold the lock.

2. Distributed locks are globally synchronized:
this means that at any point in time, no two clients both believe they hold the same lock.

Reentrant Shared Lock — Shared Reentrant Lock

Shared means the lock is globally visible and any client can request it.
Reentrant means the same as the JDK's ReentrantLock: the lock can be re-entered,
i.e. the same client can acquire it repeatedly while holding it, without being blocked.
It is implemented by the class InterProcessMutex. Its constructor is:

public InterProcessMutex(CuratorFramework client, String path)

The lock is acquired through acquire(), and a timeout variant is provided:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

The lock is released through release(). An InterProcessMutex instance can be reused.

Revoking: the ZooKeeper recipes wiki defines a negotiable revocation mechanism.
To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

If you want to revoke the currently held lock,
call the attemptRevoke() method; note that the RevocationListener is called back so that the lock can be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
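
A minimal sketch tying the two halves together; the path is illustrative, and note that the static attemptRevoke method lives on the Revoker class:

InterProcessMutex mutex = new InterProcessMutex(client, "/examples/revocable");
mutex.makeRevocable(forLock -> {
    // another participant has asked us to give the lock up; release it cooperatively here
    System.out.println("revocation requested");
});
mutex.acquire();

// from another client: ask whoever holds the lock to release it
for (String node : mutex.getParticipantNodes()) {
    Revoker.attemptRevoke(client, node);
}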

A second reminder: error handling.
Once again, it is strongly recommended to handle connection state changes with a ConnectionStateListener.
When the connection is LOST, you no longer hold the lock.

First let's create a simulated shared resource.
This resource is expected to be accessed by only one thread at a time, or concurrency problems will occur.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real environment we would access/maintain a shared resource here
        // With the lock in place, this example never throws the IllegalStateException for concurrent access
        // Without the lock, the sleep below makes the exception very easy to trigger
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then create an InterProcessMutexDemo class; it requests the lock,
uses the resource, and releases the lock: one complete access cycle.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is quite short: it spawns QTY (5) clients, and each client repeats the
request-lock / access-resource / release-lock cycle REPETITIONS (50) times, every client in its own thread.
The output shows that the lock is used exclusively by one instance at a time, in random order.

Since the lock is reentrant, you can call acquire() repeatedly within one thread; it keeps returning true while that thread holds the lock.

You should not use the same InterProcessMutex across multiple threads.
Instead, you can create a new InterProcessMutex instance in each thread, all with the same path; they then share the same lock.
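
A minimal sketch of that per-thread pattern (client and the lock path are assumed from the demo above):

Runnable worker = () -> {
    // each thread builds its own instance; sharing the path makes it the same distributed lock
    InterProcessMutex threadLock = new InterProcessMutex(client, "/examples/locks");
    try {
        threadLock.acquire();
        try {
            // critical section
        } finally {
            threadLock.release();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
};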

Non-Reentrant Shared Lock — Shared Lock

Compared with the InterProcessMutex above, this lock simply lacks the reentrant capability, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, used in the same way as InterProcessMutex:

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex again");
        }
        System.out.println(clientName + " acquired the mutex again");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as the lock was acquired
        }
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Running this, you will find that exactly one client successfully acquires the first lock (its first acquire() call returns true) and then blocks on its own second acquire(), which times out; all the other clients block on their first acquire() until the timeout and then throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Reentrant Read-Write Lock — Shared Reentrant Read Write Lock

This is similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a related pair of locks: one for read operations, the other for writes. The read lock may be held simultaneously by multiple processes as long as the write lock is not in use, while the write lock permits no readers while it is held (they block).

This lock is reentrant. A thread that holds the write lock can re-enter the read lock, but the read lock cannot be upgraded to the write lock. This also means the write lock can be downgraded to a read lock,
for example: acquire the write lock -> acquire the read lock -> release the read lock -> release the write lock.
Upgrading from the read lock to the write lock is not possible.
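
A minimal sketch of the downgrade sequence (client and the path are assumptions):

InterProcessReadWriteLock rw = new InterProcessReadWriteLock(client, "/examples/rw");
rw.writeLock().acquire();
rw.readLock().acquire();    // allowed: the write-lock holder may also take the read lock
rw.writeLock().release();   // downgrade complete; only the read lock is still held
rw.readLock().release();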

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock according to your needs; both locks are of type InterProcessMutex:

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: acquire the write lock before the read lock, never the other way round!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Semaphore — Shared Semaphore

A counting semaphore, similar to the JDK's Semaphore.
What the JDK's Semaphore maintains as a set of permits is called a set of leases in Curator.
There are two ways to bound the semaphore's maximum number of leases. The first is for the user to give a path and specify the maximum lease count. The second is for the user to give a path and a SharedCountReader. If you do not use a SharedCountReader,
you must ensure that all instances across all processes use the same (maximum) lease count; otherwise you might end up with process A's instances holding a maximum of 10 leases while process B's instances hold a maximum of 20, at which point the leases lose their meaning.
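
A minimal sketch of the second approach, bounding the semaphore with a SharedCount (the paths and the count of 10 are illustrative):

// every process reads the same maximum lease count from this shared counter
SharedCount maxLeases = new SharedCount(client, "/examples/leaseCount", 10);
maxLeases.start();
InterProcessSemaphoreV2 semaphore =
        new InterProcessSemaphoreV2(client, "/examples/semaphore", maxLeases);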

A call to acquire() returns a lease object.
Clients must close these lease objects in a finally block, or the leases will be lost. However,
if a client session is lost for some reason, a crash for example,
then the leases held by that client are automatically closed,
so that other clients can go on using those leases. Leases can also be returned through the following methods:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough leases, the requesting thread blocks.
Timeout overloads are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used with Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases, and at the end we return them to the semaphore.
Then we request one more lease; since the semaphore still has 5 leases left, the request can be satisfied: one lease is returned, and 4 remain.
Then we request 5 more leases; since only 4 remain, the request blocks until the timeout and still cannot be satisfied, so acquire returns null (an unsatisfiable lease request blocks until the timeout, then returns null and does not throw; if no timeout is set, it blocks indefinitely).

The locks described up to this point are all fair locks. From ZooKeeper's point of view,
every client obtains the lock in the order in which it asked; there is no unfair preemption.

Multi Shared Lock

A Multi Shared Lock is a container of locks. When acquire() is called,
all the contained locks are acquired; if that fails, every acquired lock is released.
Likewise, a release() call releases all of the locks (failures are ignored).
Basically, it is the representative of a group of locks: requests and releases on it are forwarded to every lock it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take a collection of locks, or a set of ZooKeeper paths:

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as for Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

We create a new InterProcessMultiLock containing one reentrant lock and one non-reentrant lock.
After acquire() you can see that the thread holds both locks at the same time;
after release(), both locks are released.

One last time:
it is strongly recommended to use a ConnectionStateListener to monitor the connection state; when the connection state is LOST, the lock is gone.

Distributed Counters

As the name suggests, a counter counts.
With ZooKeeper you can implement a counter shared by a whole cluster:
as long as the same path is used, you get the latest counter value,
which is guaranteed by ZooKeeper's consistency. Curator has two counters:
one counts with an int (SharedCount), the other with a long (DistributedAtomicLong).

Distributed int Counter — SharedCount

This class counts with an int. Three classes are mainly involved.

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter.
You can add a SharedCountListener to it; when the counter changes, the listener is notified of the change, and a SharedCountReader can read the latest value,
both the plain value and a VersionedValue that carries version information.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example we use baseCount to watch the counter value (addListener is used to add the SharedCountListener).
Any SharedCount can obtain this counter value, as long as it uses the same path.
We then use 5 threads, each adding a random number below 10 to the counter. When a SharedCount with the same path changes the count, the change is delivered to baseCount's SharedCountListener.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to set the counter.
The first parameter supplies the current VersionedValue; if another client updated the count in the meantime,
your update may fail,
but by then your client has refreshed to the latest value, so you can simply try once more.
setCount updates the counter value unconditionally.
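
A minimal sketch of the resulting retry loop (count as in the demo above):

while (true) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break;  // the compare-and-set style update succeeded
    }
    // another client won the race; the local value has been refreshed, so just retry
}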

Note that the counter must be started, and close() must be called on it when you are done.

A ConnectionStateListener is strongly recommended.
In this example, SharedCountListener already extends ConnectionStateListener.

Distributed long Counter — DistributedAtomicLong

Now for a counter of type long. Besides having a larger counting range than SharedCount,
it first tries to set the counter optimistically;
if that does not succeed (for example, the counter was updated by another client in the meantime),
it uses an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
    MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);

    tryOptimistic(result, makeValue);
    if ( !result.succeeded() && (mutex != null) )
    {
        tryWithMutex(result, makeValue);
    }

    return result;
}

This counter provides a series of operations:

  • get(): returns the current value
  • increment(): adds one
  • decrement(): subtracts one
  • add(): adds a specific value
  • subtract(): subtracts a specific value
  • trySet(): tries to set the counter value
  • forceSet(): forcibly sets the counter value

You must check succeeded() on the returned result; it indicates whether the operation succeeded.
If the operation succeeded, preValue() is the value before the operation,
and postValue() the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
    }
}

Distributed Queues

Curator also simplifies operations on Ephemeral Nodes
(temporary nodes), and it provides the ZK recipe distributed queue implementations. Using ZK's
PERSISTENT_SEQUENTIAL nodes,
it can guarantee that the items put into the queue are ordered.
If a single consumer takes data off the queue, it comes out first-in-first-out, which is the defining property of a queue.
If you require strict ordering, you have to use a single consumer; you can use a leader election to let only the leader act as the sole consumer.

However, according to Curator's author at Netflix,
ZooKeeper really is not suited to serving as a queue, or rather, ZK does not implement a good queue; for details, see
Tech Note 4.
There are five reasons:

  1. ZK has a 1 MB transport limit.
    In practice ZNodes must be relatively small, while a queue holds many messages: very large.
  2. With many nodes, ZK starts up quite slowly, and using a queue leads to many ZNodes.
    You will need to significantly increase initLimit and syncLimit.
  3. Very large ZNodes are hard to clean up. Netflix had to build a dedicated program just for this.
  4. When a ZNode has a very large number of children, ZK's performance suffers.
  5. ZK's database lives entirely in memory. A large amount of queue data means a lot of memory is consumed.

Even so, Curator has still built the various Queue implementations.
If the queue's data volume is not too large, you can still use them after weighing the trade-offs.

Distributed Queue — DistributedQueue

DistributedQueue is the most ordinary kind of queue. It uses the following four classes:

  • QueueBuilder – queues are created with the QueueBuilder, which is also the creation class for the other queues
  • QueueConsumer – the consumer interface for messages in the queue
  • QueueSerializer –
    the serialization/deserialization interface for queue messages, providing serialization and deserialization of the objects in the queue
  • DistributedQueue – the queue implementation class

QueueConsumer is the consumer; it receives the queue's data. The code that processes the queue's data goes into QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and only then handed to the consumer. But those are two steps, and they are not atomic. You can call lockPath() on the builder to use a consumer lock (a sketch follows below): while a consumer is consuming a message it holds the lock, so other consumers cannot consume that message, and if consumption fails or the process dies, the message can be handed to another process. This costs some performance. Where possible, the single-consumer pattern is still the better way to use a queue.
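
A minimal sketch of enabling the consumer lock, assuming the client, consumer, serializer and PATH from the demo below (the lock path itself is an illustrative choice):

QueueBuilder<String> safeBuilder = QueueBuilder.builder(client, consumer, serializer, PATH);
DistributedQueue<String> safeQueue = safeBuilder
        .lockPath("/example/queueLock")   // messages stay locked while a consumer processes them
        .buildQueue();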

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; since the PATH is the same, the consumers compete with each other to consume the messages.

Distributed Queue with IDs — DistributedIdQueue

DistributedIdQueue is similar to the queue above, but it lets you assign an ID to every element in the queue,
and arbitrary elements can be removed from the queue by their ID. It involves several classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

It is created with the following method:

builder.buildIdQueue()

Putting in elements:

queue.put(aMessage, messageId);

Removing elements:

int numberRemoved = queue.remove(messageId);

In this example,
some elements are removed before the consumer has consumed them, so the consumer never receives the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Distributed Priority Queue — DistributedPriorityQueue

A priority queue sorts the queue's elements by priority. The smaller the Priority value,
the closer the element sits to the front, and the sooner it is consumed.
It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created with the builder.buildPriorityQueue(minItemsBeforeRefresh) method.
When the priority queue is told that elements have been added or removed, it pauses processing of its current element queue and then refreshes the queue. minItemsBeforeRefresh specifies the minimum number of items from the currently active queue that are processed before a refresh.
Set it to the minimum amount of out-of-order processing your program can tolerate.

A priority must be specified when putting an element into the queue:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that setting the priority has no effect. That is because priority only matters for elements already backed up in the queue; if consumption is fast enough, an element may be consumed before the next one is even enqueued, and in that situation the DistributedPriorityQueue degenerates into a DistributedQueue.

Distributed Delay Queue — DistributedDelayQueue

The JDK also has a DelayQueue; you may be familiar with it.
DistributedDelayQueue provides similar functionality: each element has a delay value,
and a consumer receives an element only after its delay has elapsed. It involves the following four classes.

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with the following statements:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting in an element, you can specify a delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not a time interval from now,
such as 20 milliseconds, but a future timestamp (epoch), such as System.currentTimeMillis() + 10000 (ten seconds from now).
If the delayUntilEpoch time has already passed, the message can be consumed immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

The queues above, as you may have noticed, do not implement an interface like the JDK's.
SimpleDistributedQueue provides an interface essentially consistent with the JDK's (it does not, however, implement the Queue interface).
Creating it is very simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Removing the head element:

public byte[] take() throws Exception

Other methods are provided as well:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method; a take method is provided instead.

The take() method blocks until it returns successfully.
The poll() method returns null immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("message sent successfully: " + "zjc-" + i);
                    } else {
                        System.out.println("failed to send message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // Loop over take(): the original demo called take() only once,
                // so it consumed a single message and then exited.
                while (!Thread.currentThread().isInterrupted()) {
                    byte[] datas = queue.take();
                    System.out.println("consumed one message: " + new String(datas, "UTF-8"));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

With the original one-shot consumer, 100 messages were sent but only the first was ever consumed: run() called take() a single time and then returned, which is why the loop above is needed. The demo recommended by the official documentation uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

Even so, in practice consumption blocking issues can still be observed.
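As a workaround sketch (my own suggestion, not an officially documented fix), the consumer can drain the queue with the timed poll instead of take(), so the thread is never parked indefinitely; consumeLoop below is a hypothetical helper:

private static void consumeLoop(SimpleDistributedQueue queue) throws Exception {
    // Poll with a timeout instead of blocking in take(), so the loop can
    // re-check its exit condition every few seconds.
    while (!Thread.currentThread().isInterrupted()) {
        byte[] data = queue.poll(3, TimeUnit.SECONDS);
        if (data != null) {
            System.out.println("consumed one message: " + new String(data, StandardCharsets.UTF_8));
        }
    }
}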

Distributed Barrier—Barrier

A distributed barrier is a class that blocks the waiting processes on all nodes until a condition is satisfied,
at which point all of the nodes proceed together.

Think of a horse race: the horses arrive at the starting gate one after another,
and at the starting signal they all burst out at once.

DistributedBarrier

The DistributedBarrier class implements this barrier functionality. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you set the barrier; it will block the threads that wait on it:

setBarrier();

Threads that need to block then call this method to wait for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads continue:

removeBarrier();

Error handling: DistributedBarrier monitors the connection state;
when the connection is lost, waitOnBarrier() throws an exception.
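Besides the untimed wait, there is also a timed overload, waitOnBarrier(long, TimeUnit), which returns false if the barrier was not released within the given time. A minimal sketch (assuming barrier is a DistributedBarrier whose barrier has been set):

// Wait at most 10 seconds for the barrier to be removed.
boolean released = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
if (!released) {
    System.out.println("barrier was not released within 10 seconds");
}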

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and then remove the barrier.
Five threads wait on the same barrier path,
and only after the barrier is removed do all of them continue.

If you never set the barrier at the start, none of the threads will block.

Double Barrier—DistributedDoubleBarrier

A double barrier lets clients synchronize at both the start and the end of a computation.
When enough processes have entered the double barrier, the computation starts;
when it is done, they leave the barrier. The double barrier class is DistributedDoubleBarrier.
Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. A call to enter() blocks until all members have called enter();
a call to leave() blocks the calling thread until all members have called leave().
It is like a 100-meter sprint: the starting gun fires,
all runners set off, and the race only ends once every runner has crossed the finish line.

DistributedDoubleBarrier monitors the connection state; when the connection is lost, enter() and leave() throw exceptions.
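Both methods also have timed overloads, enter(long, TimeUnit) and leave(long, TimeUnit), which return false on timeout. A minimal sketch (doWork() stands in for the actual computation and is hypothetical):

// Wait at most 10 seconds for all members to enter; false means we timed out.
if (barrier.enter(10, TimeUnit.SECONDS)) {
    doWork(); // the computation performed while inside the barrier (hypothetical)
    // Wait at most 10 seconds for all members to leave.
    barrier.leave(10, TimeUnit.SECONDS);
}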

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
    }

}

References:
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》 (From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency)
The "跟着实例学习ZooKeeper的用法" (Learning ZooKeeper Usage by Example) blog series

Project repository:
https://github.com/zjcscut/curator-seed
(The markdown source of this article actually carries a [TOC] navigation directory that makes it easy to jump to each section, but Jianshu does not support it. The markdown original lives in the project's /resources/md directory; feel free to take it. The article was written in Typora, and Typora is recommended for opening it.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, working hard in Guangzhou: at the office by day, with occasional overtime at night and on weekends, and writing blog posts in my spare evenings.
I hope my articles bring you something of value. Keep going, together.