
2.使用Fluent风格的Api创建会话

核心参数变为流式设置,一个例子如下:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();


LeaderLatch

LeaderLatch有两个构造函数:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的启动:

leaderLatch.start( );

一旦启动,LeaderLatch会和其它使用相同latch path的LeaderLatch协商,然后其中一个最终会被选举为leader,可以通过hasLeadership方法查看LeaderLatch实例是否是leader:

leaderLatch.hasLeadership( ); //返回true说明当前实例是leader

就像JDK的CountDownLatch,LeaderLatch在请求成为leadership时会block(阻塞),一旦不再使用LeaderLatch了,必须调用close方法。
如果它是leader,会释放leadership,其它的参与者将会选举出一个新的leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException
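下面给出一个使用await的简单示意(非官方示例,假设leaderLatch为前文已创建并start()的实例,超时时间为演示用的假设值):

leaderLatch.await(); // 一直阻塞,直到本实例获得领导权,或被中断/关闭

// 带超时的版本:最多等待10秒,返回是否在超时前获得了领导权
boolean hasLeader = leaderLatch.await(10, TimeUnit.SECONDS);
if (!hasLeader) {
    System.out.println("10秒内没有获得领导权");
}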

异常处理:
LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。当 SUSPENDED 或 LOST 时,leader不再认为自己还是leader。当LOST后重新连接(RECONNECTED),LeaderLatch会删除先前的ZNode然后重新创建一个。LeaderLatch用户必须考虑导致leadership丢失的连接问题。
强烈推荐你使用ConnectionStateListener。

一个LeaderLatch的使用例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

可以添加curator-test模块的依赖方便进行测试,不需要启动真实的zookeeper服务端:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

首先我们创建了10个LeaderLatch,启动后它们中的一个会被选举为leader。
因为选举会花费一些时间,start后并不能马上就得到leader。
通过hasLeadership查看自己是否是leader,如果是的话返回true。
可以通过.getLeader().getId()获得当前leader的ID。
只能通过close释放当前的领导权。
await是一个阻塞方法,尝试获取leader地位,但是未必能上位。


创建数据节点

Zookeeper的节点创建模式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带序列号

**创建一个节点,初始内容为空**

client.create().forPath("path");

注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空

创建一个节点,附带初始化内容

client.create().forPath("path","init".getBytes());

创造1个节点,钦定创制格局(一时半刻节点),内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

创建一个节点,指定创建模式(临时节点),附带初始化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况下开发人员在创建一个子节点时必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。


Zookeeper客户端Curator使用详解


Curator食谱(高级特性)

提示:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

重要提示:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者超时,尽管后面所有的例子都没有使用到ConnectionStateListener。


多共享锁对象 —Multi Shared Lock

Multi Shared Lock是一个锁的容器。当调用acquire(),所有的锁都会被acquire(),如果请求失败,所有的锁都会被release。
同样调用release时所有的锁都被release(失败被忽略)。
基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

主要涉及两个类:

  • InterProcessMultiLock
  • InterProcessLock

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

新建一个InterProcessMultiLock,包含一个重入锁和一个非重入锁。
调用acquire()后可以看到线程同时拥有了这两个锁。
调用release()后可以看到这两个锁都被释放了。

最后再重复一次,强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,锁将会丢失。


数据节点操作


Path Cache

Path Cache用来监控一个ZNode的子节点。当一个子节点增加、更新、删除时,Path Cache会改变它的状态,会包含最新的子节点、子节点的数据和状态,而状态的变更将通过PathChildrenCacheListener通知。

实际使用时会涉及到四个类:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

通过下面的构造函数创建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必须调用它的start方法,使用完后调用close方法。
可以设置StartMode来实现启动的模式。

StartMode有下面几种:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()。
  3. POST_INITIALIZED_EVENT:当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件。

public void addListener(PathChildrenCacheListener listener)可以增加listener监听缓存的变化。

getCurrentData()方法返回一个List<ChildData>对象,可以遍历所有的子节点。

设置/更新、移除其实是使用client(CuratorFramework)来操作,不通过PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。

注意:示例中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁地触发事件!


优先级分布式队列—DistributedPriorityQueue

优先级队列对队列中的元素按照优先级进行排序。Priority越小,元素越靠前,越先被消费掉。它涉及下面几个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

通过builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。
当优先级队列得到元素增删消息时,它会暂停处理当前的元素队列,然后刷新队列。minItemsBeforeRefresh指定刷新前当前活动的队列的最小数量。
主要设置为你的程序可以容忍的不排序的最小值。

放入队列时索要钦命优先级:

queue.put(aMessage, priority);

例子:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

有时候你可能会有错觉,优先级设置并没有起效。那是因为优先级是对于队列积压的元素而言,如果消费速度过快,有可能出现在后一个元素入队操作之前,前一个元素已经被消费,这种情况下DistributedPriorityQueue会退化为DistributedQueue。


DistributedBarrier

DistributedBarrier类实现了栅栏的功能。它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

率先你须求安装栅栏,它将卡住在它上面等待的线程:

setBarrier();

然后需要阻塞的线程调用方法等待放行条件:

public void waitOnBarrier()

当条件满足时,移除栅栏,所有等待的线程将继续执行:

removeBarrier();

异常处理:DistributedBarrier会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

这个例子创建了controlBarrier来设置栅栏和移除栅栏。
我们创建了五个线程,在此Barrier上等待。
最后移除栅栏后所有的线程才继续执行。

如果你一开始不设置栅栏,所有的线程就不会阻塞住。


检查节点是否存在

client.checkExists().forPath("path");

注意:该方法返回一个Stat实例,用于检查ZNode是否存在。可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode。
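下面是一个简单的示意(非官方示例,假设client已创建并启动,"path"为演示用的路径),演示同步检查、附加Watcher以及后台回调三种用法:

// 同步检查,节点不存在时返回null
Stat stat = client.checkExists().forPath("path");
System.out.println("节点是否存在:" + (stat != null));

// 附加Watcher:节点创建/删除/数据变更时会收到一次性通知
client.checkExists().usingWatcher((CuratorWatcher) event ->
        System.out.println("收到事件:" + event.getType())).forPath("path");

// 后台(异步)检查,结果通过回调返回
client.checkExists().inBackground((c, event) ->
        System.out.println("后台结果码:" + event.getResultCode())).forPath("path");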


启动客户端

当创建会话成功,得到client的实例后就可以直接调用其start()方法:

client.start();


Tree Cache

Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:

  • TreeCache – Tree Cache实现类
  • TreeCacheListener – 监听器类
  • TreeCacheEvent – 触发的事件类
  • ChildData – 节点数据

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中没有使用Thread.sleep(10),但是事件触发次数也是正常的。

注意:TreeCache在初始化(调用start()方法)的时候会回调TreeCacheListener实例一个事件TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能导致空指针异常,这里应该主动处理并避免这种情况。
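针对上面提到的ChildData可能为null的情况,下面是一个简单的防御性写法示意(假设cache为前文创建的TreeCache实例):

TreeCacheListener safeListener = (client1, event) -> {
    ChildData data = event.getData();
    if (data == null) {
        // INITIALIZED等事件可能不携带节点数据,直接跳过,避免空指针异常
        System.out.println("事件类型:" + event.getType() + ",无节点数据");
        return;
    }
    System.out.println("事件类型:" + event.getType() + " | 路径:" + data.getPath());
};
cache.getListenable().addListener(safeListener);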


缓存

Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听节点的变化。


分布式延迟队列—DistributedDelayQueue

JDK中也有DelayQueue,不知道你是否熟悉。DistributedDelayQueue也提供了类似的功能,元素有个delay值,消费者隔一段时间才能接收到元素。涉及到下面四个类。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

通过下面的语句创建:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时可以指定delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是离现在的一个时间间隔,比如20毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10秒。
如果delayUntilEpoch的时间已经过去,消息会立刻被消费者接收。

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}


SimpleDistributedQueue

前方尽管实现了各个队列,可是你注意到没有,那几个队列并不曾兑现类似JDK一样的接口。
SimpleDistributedQueue提供了和JDK基本一致的接口(可是并未落到实处Queue接口)。
创制很简单:

public SimpleDistributedQueue(CuratorFramework client,String path)

增加元素:

public boolean offer(byte[] data) throws Exception

删除元素:

public byte[] take() throws Exception

另外还提供了其它方法:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

没有add方法, 多了take方法。

take方法在成功返回之前会被阻塞。
poll方法在队列为空时直接返回null。

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

但是实际发送了100条消息,消费完第一条之后,后面的消息不能消费,目前没找到原因。查看一下官方文档,推荐的demo使用下面几个Api:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

但是实际使用发现依然存在消费阻塞问题。
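一个可能的排查方向(这只是笔者的推测,并非官方结论):上面示例中Consumer的run()方法只调用了一次take(),线程消费一条消息后就退出了。可以尝试把消费逻辑改成循环调用take(),示意如下(不保证能彻底解决上述阻塞问题):

@Override
public void run() {
    try {
        // 循环消费:take()在队列为空时阻塞,直到取到新消息
        while (!Thread.currentThread().isInterrupted()) {
            byte[] datas = queue.take();
            System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}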


信号量—Shared Semaphore

一个计数的信号量类似JDK的Semaphore。JDK中Semaphore维护的是一组许可(permits),而Curator中称之为租约(Lease)。
有两种方式可以决定semaphore的最大租约数。第一种方式是用户给定path并且指定最大LeaseSize。第二种方式是用户给定path并且使用SharedCountReader类。如果不使用SharedCountReader,必须保证所有实例在多进程中使用相同的(最大)租约数量,否则有可能出现A进程中的实例持有最大租约数量为10,但是在B进程中持有的最大租约数量为20,此时租约的含义就失效了。

调用acquire()会返回一个租约对象。客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。但是,如果客户端session由于某种原因比如crash丢掉,那么这些客户端持有的租约会自动close,这样其它客户端可以继续使用这些租约。租约还可以通过下面的方式返还:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

注意你可以一次性请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。
同时还提供了超时的重载方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的主要类包括下面几个:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

首先我们先获得了5个租约,最后我们把它们还给了semaphore。
接着请求了一个租约,因为semaphore还剩下5个租约,所以请求可以满足,返回一个租约,还剩4个租约。
然后再请求5个租约,因为租约不够,阻塞到超时,还是没能满足,返回结果为null(租约不足会阻塞到超时,然后返回null,不会主动抛出异常;如果不设置超时时间,会一直阻塞)。

上面所讲的锁都是公平锁(fair)。从ZooKeeper的角度看,每个客户端都按照请求的顺序获得锁,不存在非公平的抢占的情况。


创建会话


获取某个节点的所有子节点路径

client.getChildren().forPath("path");

注意:该方法的返回值为List<String>,获得ZNode的子节点Path列表。可以调用额外的方法(监控、后台处理或者获取状态 watch, background or get stat)并在最后调用forPath()指定要操作的父ZNode。
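下面是一个简单的示意(非官方示例,假设client已启动,"path"为演示用的父节点路径),演示附加Watcher或后台回调获取子节点:

// 同步获取子节点列表
List<String> children = client.getChildren().forPath("path");
System.out.println("子节点:" + children);

// 附加Watcher:子节点变化时会收到一次性通知
client.getChildren().usingWatcher((CuratorWatcher) event ->
        System.out.println("子节点发生变化:" + event.getType())).forPath("path");

// 后台(异步)获取,结果通过回调中的event.getChildren()返回
client.getChildren().inBackground((c, event) ->
        System.out.println("后台获取到子节点:" + event.getChildren())).forPath("path");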


更新数据节点数据

更新一个节点的数据内容

client.setData().forPath("path","data".getBytes());

注意:该接口会返回一个Stat实例

更新一个节点的数据内容,强制指定版本进行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());
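下面是一个简单的示意(非官方示例,假设client已启动且"path"节点已存在),演示先读出节点当前版本,再用该版本做带校验的更新;如果期间节点被其它客户端修改,版本不匹配会抛出KeeperException.BadVersionException:

// 先取出节点的Stat,拿到当前数据版本
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

// 使用读到的版本号进行更新,实现"读取-校验-更新"的乐观并发控制
client.setData().withVersion(stat.getVersion()).forPath("path", "data".getBytes());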


Node Cache

Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:

  • NodeCache – Node Cache实现类
  • NodeCacheListener – 节点监听器
  • ChildData – 节点数据

注意:使用cache,依然要调用它的start()方法,使用完后调用close()方法。

getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁地触发事件!

注意:NodeCache只能监听一个节点的状态变化。


事务

CuratorFramework的实例包含inTransaction()接口方法,调用此方法开启一个ZooKeeper事务。可以复合create、setData、check、delete等操作,然后调用commit()作为一个原子操作提交。一个例子如下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();


分布式队列

使用Curator也可以简化Ephemeral Node(临时节点)的操作。Curator也提供ZK Recipe的分布式队列实现。利用ZK的PERSISTENT_SEQUENTIAL节点,可以保证放入到队列中的项目是按照顺序排队的。
如果单一的消费者从队列中取数据,那么它是先入先出的,这也是队列的特点。
如果你严格要求顺序,你就得使用单一的消费者,可以使用Leader选举只让Leader作为唯一的消费者。

但是,根据Netflix的Curator作者所说,ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4。

原因有五:

  1. ZK有1MB的传输限制。实践中ZNode必须相对较小,而队列包含众多的消息,非常大。
  2. 如果有很多节点,ZK启动时相当慢。而使用queue会导致很多ZNode,你需要显著增大initLimit和syncLimit。
  3. ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事。
  4. 当存在大量包含众多子节点的ZNode时,ZK的性能变得不好。
  5. ZK的数据库完全放在内存中。大量的Queue意味着会占用很多的内存空间。

尽管如此,Curator还是创建了各种Queue的实现。
如果Queue的数据量不太多、数据量不太大的情况下,酌情考虑,还是可以使用的。


可重入共享锁—Shared Reentrant Lock

Shared意味着锁是全局可见的,客户端都可以请求锁。Reentrant和JDK的ReentrantLock类似,即可重入,意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。
它是由类InterProcessMutex来实现。它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()获得锁,并提供超时机制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

通过release()方法释放锁。InterProcessMutex实例可以重用。

Revoking:ZooKeeper recipes wiki定义了可协商的撤销机制。为了撤销mutex,调用下面的方法:

public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的。当其它进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener

如果你请求撤销当前的锁,调用attemptRevoke()方法,注意锁释放时RevocationListener将会回调。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
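下面是一个简单的示意(非官方示例,假设lock为同一个path上的InterProcessMutex实例,client已启动,代码处于可以抛出Exception的上下文中),演示可撤销锁的基本配合方式:

// 持有锁的一方:把锁设置为可协商撤销,收到撤销请求时释放锁
lock.makeRevocable(forLock -> {
    System.out.println("收到撤销请求,准备释放锁");
    try {
        forLock.release();
    } catch (Exception e) {
        e.printStackTrace();
    }
});
lock.acquire();

// 另一方:对该锁的参与者节点发起撤销请求(协商式,是否真正释放取决于持有方)
for (String participantNode : lock.getParticipantNodes()) {
    Revoker.attemptRevoke(client, participantNode);
}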

再次提示:错误处理。
依然强烈推荐你使用ConnectionStateListener处理连接状态的变更。
当连接LOST时你不再持有锁。

首先让我们创建一个模拟的共享资源,这个资源期望只能被单线程访问,否则会有并发问题。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真实环境中我们会在这里访问/维护一个共享的资源
        //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
        //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

接下来创建一个InterProcessMutexDemo类,它负责请求锁、使用资源、释放锁这样一个完整的访问过程。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

代码很简单,生成5个client,每个client重复执行50次请求锁–访问资源–释放锁的过程。每个client都在单独的线程中。
结果可以看到,锁是随机地被各个实例排他性地使用。

既然是可重入的,你可以在一个线程中多次调用acquire(),在线程拥有锁时它总是返回true。

你不应该在多个线程中用同一个InterProcessMutex。
你可以在每个线程中都生成一个新的InterProcessMutex实例,它们的path都一样,这样它们可以共享同一个锁。


分布式计数器

顾名思义,计数器是用来计数的,利用ZooKeeper可以实现一个集群共享的计数器。
只要使用相同的path就可以得到最新的计数器值,这是由ZooKeeper的一致性保证的。Curator有两个计数器,一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。


分布式int计数器—SharedCount

这个类使用int类型来计数。主要涉及三个类。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表计数器,可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值,包括字面值和带版本信息的值VersionedValue。

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在这个例子中,我们使用baseCount来监听计数值(用addListener方法来添加SharedCountListener)。任意的SharedCount,只要使用相同的path,都可以得到这个计数值。
然后我们使用五个线程为计数值增加一个10以内的随机数。相同path的SharedCount对计数值进行变更,将会回调给baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

这里我们使用trySetCount去设置计数器。
第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值,你的更新可能不成功,但是此时你的client已经得到了最新的值,所以失败后你可以尝试再更新一次。
setCount是强制更新计数器的值。
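下面是一个基于乐观更新的重试写法示意(非官方示例,假设count为已start的SharedCount实例):

// trySetCount失败说明期间有其它client更新过,重新读取最新的VersionedValue再试
while (true) {
    VersionedValue<Integer> current = count.getVersionedValue();
    if (count.trySetCount(current, current.getValue() + 1)) {
        break; // 更新成功
    }
}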

注意计数器必须start,使用完之后必须调用close关闭它。

强烈推荐使用ConnectionStateListener。
在本例中SharedCountListener扩展了ConnectionStateListener。


3.创建包含隔离命名空间的会话

为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一棵子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

分布式队列—DistributedQueue

DistributedQueue是最普通的一种队列。它涉及以下四个类:

  • QueueBuilder – 创建队列使用QueueBuilder,它也是其它队列的创建类
  • QueueConsumer – 队列中的消息消费者接口
  • QueueSerializer – 队列消息序列化和反序列化接口,提供了对队列中的对象的序列化和反序列化
  • DistributedQueue – 队列实现类

QueueConsumer是消费者,它可以接收队列的数据。处理队列中的数据的代码逻辑可以放在QueueConsumer.consumeMessage()中。

正常情况下先将消息从队列中移除,再交给消费者消费。但这是两个步骤,不是原子的。可以调用Builder的lockPath()为消费者加锁,当消费者消费数据时持有锁,这样其它消费者不能消费此消息。如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。最好还是以单消费者模式使用队列。lockPath()的用法见下面的示意。
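下面是一个简单的示意(非官方示例,假设client、consumer、serializer与正文示例一致,"/example/queue/lock"为演示用的锁路径),演示通过lockPath()构建带消费者锁的队列:

QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, serializer, PATH);
// lockPath:消费前先获取锁,消费失败或进程死掉时消息可以交给其它消费者,代价是一定的性能损失
DistributedQueue<String> queue = builder.lockPath("/example/queue/lock").buildQueue();
queue.start();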

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// 等待消息消费完成
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * 队列消息序列化实现类
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * 定义队列消费者
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

例子中定义了两个分布式队列和两个消费者,因为PATH是相同的,会存在消费者抢占消费消息的情况。

LeaderSelector

LeaderSelector使用的时候主要涉及下面几个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

核心类是LeaderSelector,它的构造函数如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

类似LeaderLatch,LeaderSelector必须start:leaderSelector.start();
一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。而takeLeadership()方法只有在领导权被释放时才返回。
当你不再使用LeaderSelector实例时,应该调用它的close方法。

异常处理:
LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须小心连接状态的改变。如果实例成为leader,它应该响应SUSPENDED或LOST。当SUSPENDED状态出现时,实例必须假定在重新连接成功之前它可能不再是leader了。如果LOST状态出现,实例不再是leader,takeLeadership方法返回。

重要:推荐的处理方式是当接收到SUSPENDED或LOST时抛出CancelLeadershipException异常。这会导致LeaderSelector实例中断并取消执行takeLeadership方法。这一点非常重要,你必须考虑扩展LeaderSelectorListenerAdapter。LeaderSelectorListenerAdapter提供了推荐的处理逻辑。

下面的一个例子摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

你可以在takeLeadership中进行任务的分配等等,并且不要返回,如果你想要此实例一直是leader的话可以加一个死循环。调用leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。
在这里我们使用AtomicInteger来记录此client获得领导权的次数,它是"fair"的,每个client有相同的机会获得领导权。

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

对比可知,LeaderLatch必须调用close()方法才会释放领导权,而对于LeaderSelector,通过LeaderSelectorListener可以对领导权进行控制,在适当的时候释放领导权,这样每个节点都有可能获得领导权。因此,LeaderSelector具有更好的灵活性和可控性,建议有LeaderElection应用场景下优先使用LeaderSelector。

Leader选举

在分布式总结中, leader elections是很重庆大学的一个功能,
那几个大选进程是那样子的: 指派一个经过作为协会者,将任务分发给各节点。
在职分初步前,
哪个节点都不亮堂何人是leader(领导者)或然coordinator(协调者).
当选举算法发轫推行后, 各类节点最终会收获三个唯一的节点作为义务leader.
除此之外,
大选还日常会时有产生在leader意外宕机的动静下,新的leader要被公投出来。

在zookeeper集群中,leader负责写操作,然后通过Zab协议实现follower的同步,leader或者follower都可以处理读操作。

Curator有两种leader选举的recipe,分别是LeaderSelector和LeaderLatch。

前者是所有存活的客户端不间断地轮流做Leader。后者是一旦选举出Leader,除非有Leader挂掉重新触发选举,否则不会交出领导权。

简介

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrick Hunt(Zookeeper)以一句“Guava is to Java what Curator is to Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的来由是比较有趣的,下面的片段摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最早起源于雅虎研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型的系统需要依赖一个类似的系统进行分布式协调,但是这些系统往往存在分布式单点问题。所以雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到很多项目都是用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师希望给这个项目也取一个动物的名字。时任研究院的首席科学家Raghu Ramakrishnan开玩笑说:再这样下去,我们这儿就变成动物园了。此话一出,大家纷纷表示就叫动物园管理员吧——因为各个以动物命名的分布式组件放在一起,雅虎的整个分布式系统看上去就像一个大型的动物园了,而Zookeeper正好用来进行分布式环境的协调——于是,Zookeeper的名字由此诞生了。

Curator无疑是Zookeeper客户端中的瑞士军刀,它译作“馆长”或者“管理者”,不知道是不是开发小组有意而为之,笔者猜测有可能这样命名的原因是表明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

分布式屏障—Barrier

分布式Barrier是这样一个类:它会阻塞所有节点上的等待进程,直到某一个条件被满足,然后所有的节点继续进行。

比如赛马比赛中,等赛马陆续来到起跑线前。一声令下,所有的赛马都飞奔而出。

双栅栏—DistributedDoubleBarrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程进入到双栅栏时,进程开始计算,当计算完成时,离开栅栏。双栅栏类是DistributedDoubleBarrier。
构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成员数量,当enter()方法被调用时,成员被阻塞,直到所有的成员都调用了enter()。
当leave()方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave()。
就像百米赛跑比赛,发令枪响,所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

DistributedDoubleBarrier会监控连接状态,当连接断掉时enter()和leave()方法会抛出异常。

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

参考资料:
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》
《跟着实例学习ZooKeeper的用法》博客系列

项目仓库:
https://github.com/zjcscut/curator-seed
(其实本文的MD是带导航目录[toc]的,比较方便导航到各个章节,只是简书不支持,本文的MD原文放在项目的/resources/md目录下,有爱自取,文章用Typora编写,建议用Typora打开)

End on 2017-5-13 13:10.
Help yourselves!
我是throwable,在广州奋斗,白天上班,晚上和双休不定时加班,晚上空闲时坚持写下博客。
希望我的文章能够给你带来收获,共勉。

带Id的分布式队列—DistributedIdQueue

DistributedIdQueue和上面的队列类似,但是可以为队列中的每一个元素设置一个ID。
可以通过ID把队列中任意的元素移除。它涉及几个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

通过下面方法创建:

builder.buildIdQueue()

放入元素时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

在这个例子中,有些元素还没有被消费者消费前就移除了,这样消费者不会收到删除的消息。

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

1.使用静态工厂方法创建客户端

一个例子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

newClient静态工厂方法包含几个主要参数:

参数名 说明
connectionString 服务器列表,格式host1:port1,host2:port2,…
retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs 会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs 连接创建超时时间,单位毫秒,默认60000ms

分布式锁

提醒:

1.推荐使用ConnectionStateListener监控连接的状态,因为当连接LOST时你不再持有锁

2.分布式的锁全局同步,这意味着任何一个时间点不会有两个客户端同时拥有相同的锁。

不可重入共享锁—Shared Lock

这个锁和上面的InterProcessMutex对比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。这个类是InterProcessSemaphoreMutex,使用方式和InterProcessMutex类似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 获取锁几次 释放锁也要几次
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

When you run it, you will find that exactly one client succeeds in acquiring the first lock (its first acquire() call returns true); that client then blocks on its second acquire() call and times out waiting for the second lock, while all of the other clients block on their first acquire() call, time out, and throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Distributed long counter — DistributedAtomicLong

Next, a counter of type Long. Besides having a larger range than SharedCount,
it first tries to set the counter using an optimistic approach;
if that fails (for example because the counter was updated by another client in the meantime),
it falls back to an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

The counter supports a series of operations:

  • get(): get the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a specific amount
  • subtract(): subtract a specific amount
  • trySet(): try to set the value
  • forceSet(): forcibly set the value

You must check succeeded() on the returned result; it indicates whether the operation succeeded.
If it did, preValue() is the value before the operation and
postValue() is the value after it.
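Before the full demo below, here is a hedged fragment (the path, the values, and the retry policy are illustrative only, and client is assumed to be a started CuratorFramework) showing a few of the other operations; every call except forceSet() returns an AtomicValue whose succeeded() should be checked.

DistributedAtomicLong counter =
        new DistributedAtomicLong(client, "/examples/counter", new RetryNTimes(10, 10));

AtomicValue<Long> added = counter.add(5L);      // add a specific amount
if (added.succeeded()) {
    System.out.println("add: " + added.preValue() + " -> " + added.postValue());
}

AtomicValue<Long> set = counter.trySet(100L);   // optimistic set; may fail under contention
System.out.println("trySet succeeded: " + set.succeeded());

counter.forceSet(0L);                           // unconditional set, returns nothing to check
System.out.println("current value: " + counter.get().postValue());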

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }
    }
}

Basic Curator APIs

Reentrant read-write lock — Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks: one for read operations and one for write operations. The read lock can be held concurrently by multiple processes as long as the write lock is free, while holding the write lock blocks all readers.

The lock is reentrant. A thread that holds the write lock may also acquire the read lock, but not the other way around. This also means the write lock can be downgraded to a read lock:
acquire the write lock -> acquire the read lock -> release the write lock
-> finally release the read lock.
Upgrading from a read lock to a write lock is not allowed.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed; both locks are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: you must acquire the write lock first and then the read lock, not the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " acquired the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " acquired the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Asynchronous interface

The create, delete, update, and read methods covered above are all synchronous. Curator also provides an asynchronous interface, introducing the BackgroundCallback interface to handle the result returned by the server after an asynchronous call. The key callback value in BackgroundCallback is a CuratorEvent, which contains the event type, the response code, and detailed information about the node (a small sketch follows the event-type table below).

CuratorEventType

Event type Corresponding method on the CuratorFramework instance
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #usingWatcher(Watcher)
CLOSING #close()
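As an illustrative sketch (not from the original text; the path is a placeholder), the event type reported to the callback matches the table above. For example, an asynchronous checkExists() produces an EXISTS event:

client.checkExists()
      .inBackground((curatorFramework, curatorEvent) -> {
          // For checkExists() the event type is EXISTS and getStat() carries the node's Stat (or null).
          System.out.println("type=" + curatorEvent.getType() + ", stat=" + curatorEvent.getStat());
      })
      .forPath("path");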

Response codes (#getResultCode())

Code Meaning
0 OK, the call succeeded
-4 ConnectionLoss, the client lost its connection to the server
-110 NodeExists, the node already exists
-112 SessionExpired, the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          System.out.println(String.format("eventType:%s,resultCode:%s",
                  curatorEvent.getType(), curatorEvent.getResultCode()));
      }, executor)
      .forPath("path");

Note: if #inBackground() is used without specifying an executor, Curator will by default use its EventThread for the asynchronous processing.
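Building on the example above, the callback can also branch on the result code. The comparison against ZooKeeper's KeeperException.Code below is just one possible convention, not something the original text prescribes:

client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {
          // 0 (KeeperException.Code.OK) means the server executed the create successfully
          if (curatorEvent.getResultCode() == KeeperException.Code.OK.intValue()) {
              System.out.println("created " + curatorEvent.getPath());
          } else {
              System.out.println("create failed, resultCode=" + curatorEvent.getResultCode());
          }
      }, executor)
      .forPath("path");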

Deleting data nodes

Delete a node

client.delete().forPath("path");

Note that this call can only delete leaf nodes; otherwise an exception is thrown.

Delete a node and recursively delete all of its children

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing a specific version to be deleted

client.delete().withVersion(10086).forPath("path");

Delete a node with guaranteed deletion

client.delete().guaranteed().forPath("path");

guaranteed() is a safeguard: as long as the client session remains valid, Curator will keep retrying the delete operation in the background until the node is successfully deleted.

Note: the fluent calls above can be freely combined, for example:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Reading a data node's data

Read the data content of a node

client.getData().forPath("path");

Note that the return value of this call is a byte[].
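A small follow-up sketch, assuming (the original does not say so) that the node's payload was stored as UTF-8 text:

byte[] raw = client.getData().forPath("path");
String text = new String(raw, java.nio.charset.StandardCharsets.UTF_8); // decode only if the data really is text
System.out.println("data: " + text);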

Read the data content of a node and obtain the node's Stat at the same time

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
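
The populated Stat can then be used for an optimistic update. In the hedged sketch below (path and payload are placeholders), stat.getVersion() is passed to setData().withVersion(), so the update fails with a BadVersion error if the node changed in the meantime:

Stat stat = new Stat();
byte[] current = client.getData().storingStatIn(stat).forPath("path");
client.setData()
      .withVersion(stat.getVersion())           // update only if the node has not changed since the read
      .forPath("path", "newValue".getBytes());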