图表源于网络

[TOC]

君干什么连不喜?

Zookeeper客户端Curator使用详解

肯定在条件比较原先还好,幸福感也十分少,总以为好太苦,幸福在别处。

简介

Curator是Netflix公司开源之等同法zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的底细开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck
Hunt(Zookeeper)以平等句子“Guava is to Java that Curator to
Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的原因是比好玩的,下面的有的摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最早起源于雅虎的研究院的一个研究小组。在就,研究人口发现,在雅虎内部多重型的体系要靠一个看似之网开展分布式协调,但是这些系统往往存在分布式单点问题。所以雅虎的开发人员就计开一个通用的管单点问题的分布式协调框架。在立项初期,考虑到无数色还是因此动物的讳来定名的(例如知名的Pig项目),雅虎的工程师希望让这路也得一个动物的讳。时任研究院的首席科学家Raghu
Ramakrishnan开玩笑说:再这么下去,我们这就改为动物园了。此话一闹,大家纷纷表示就给动物园管理员吧——因为各个以动物命名的分布式组件放在同,雅虎的所有分布式系统看上去就比如一个大型的动物园了,而Zookeeper正好用来进行分布式环境之和谐——于是,Zookeeper的名字由此诞生了。

Curator无疑是Zookeeper客户端中的瑞士军刀,它译作”馆长”或者”管理者”,不知底是休是开发小组有意要为之,笔者猜测有或这样命名的原因是证明Curator就是Zookeeper的馆长(脑洞有硌很:Curator就是动物园的园长)。
Curator包含了几乎独保险:
curator-framework:对zookeeper的平底api的片装进
curator-client:供部分客户端的操作,例如重试策略等
curator-recipes:装进了有的尖端特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版:2.12.0,对许Zookeeper的版本为:3.4.x,设若超过版以会生兼容性问题,很有或造成节点操作失败):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

丁当公私,无论你身处学校还是谁铺,老板同事同学,总有那一两只非协调音;人当门,更年期长辈的督促,屡教不移的情人,不吃人便的儿女,总遇到吃丁无开玩笑的从事。

Curator的基本Api

抱怨责怪怨天尤人,抑或幻想出种植神奇之魔力,让你突然转换得得心应手,倒不如抛却不切实际的胡思乱想,坦然接受生活的确存在在无完善。

缔造会话

无异于是早晨上班,有人开顶老之打算早出半小时,留起路上耽搁堵车之岁月;有人掐着点还不小心晚出来几分钟,渴望路上发生不可思议的顺畅。

1.采取静态工程措施创建客户端

一个事例如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

newClient静态工厂方法包含四只主要参数:

参数名 说明
connectionString 服务器列表,格式host1:port1,host2:port2,…
retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口
sessionTimeoutMs 会话超时时间,单位毫秒,默认60000ms
connectionTimeoutMs 连接创建超时时间,单位毫秒,默认60000ms

结果,时间在各国一样立的遍布,不是按自己的计划进行,从翘首期盼到死心塌地理解自己一贯迟到了,是着急谩骂到干净绝望的长河。而留出堵车的岁月,反倒一路绿灯通行。

2.下Fluent风格的Api创建会话

核心参数变为流式设置,一个列子如下:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

“金无足赤,人无完人”,我们本身就是匪周到的,那怎么对自己好吃懒做不学习不前进,却严苛要求他人到也?你继承当追求完美的途中一路狂奔,不回头,还是受生活当然就是是风雨阳光交叠的非周全,并且脚踏实地学习怎么样去当。

3.创造包含隔离命名空间的对话

为实现不同之Zookeeper业务之间的割裂,需要吗每个工作分配一个独的命名空间(NameSpace),即指定一个Zookeeper的根本路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了单独命名空间吧“/base”,那么该客户端对Zookeeper上之数码节点的操作都是冲该目录进行的。通过设置Chroot可以用客户端应用及Zookeeper服务端的同等课子树相对应,在差不多个利用共用一个Zookeeper集群的光景下,这对贯彻不同采取内的并行隔离十分有含义。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

作者海蓝博士有了多年医学从业经历,她做的当下仍开《不圆满,才得意》得到了杨澜俞敏洪苏岑等五十五号学者心理专家作家一致推举。

启动客户端

当创建会说话成功,得到client的实例然后可以一直调用其start( )方法:

client.start();

回溯从眼前,每个人犹来不便的随时,也不可避免在已的流年里养遗憾。痛苦和泪水,希望以及震撼,现在跟前程得我们之所以相同粒真心去追随。如何让祥和变得不同让过去,让空虚吞噬啃咬了之胸,变成宁静缤纷的园,让时光见证自己之成才。

数节点操作

每当这仍《不全面,才得意》书被,她用报告我们怎么和负面情绪友好相处,如何用是的法疏导负面情绪,生活着不可避免地撞不乐意之事,自己该如何进行情绪梳理,书被还提供了简单好学易操作的七单步骤。

创立数量节点

Zookeeper的节点创建模式:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带动序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时又带动序列号

**创造一个节点,初始内容为空 **

client.create().forPath("path");

顾:如果无装节点性,节点创建模式默认为持久化节点,内容默认为空

开创一个节点,附带初始化内容

client.create().forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),内容吗空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

创造一个节点,指定创建模式(临时节点),附带初始化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

始建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

斯creatingParentContainersIfNeeded()接口非常有因此,因为一般情形开发人员在开立一个子节点必须认清它们的父节点是否在,如果未在直接创造会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自行递归创建所有所需的父节点。

这些,都是本着那些偶尔情绪不尽如人意的食指,已给诊断患有上担忧抑郁等精神疾病的人数,也无须太难过,抑郁症就同感冒一样
,并无是洪水猛兽,也未欠对它们抱有偏见。

去除数据节点

除去一个节点

client.delete().forPath("path");

小心,此方法就能够去叶子节点,否则会丢掉来怪。

删除一个节点,并且递归删除其有着的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

除去一个节点,强制指定版本进行去

client.delete().withVersion(10086).forPath("path");

抹一个节点,强制保险删除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一个保持办法,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。

注意:点的基本上单流式接口是可以自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

每当今差不多正的社会,竞争激烈,不仅人遇到任何的问题,我当街上还时不时遇到不服管教有个性的小儿,不好听时就失声尖叫。面对这么的宝宝,家长顿感颜面尽失大声斥责,孩子呢会偷瞄一眼四周人群,随即用大声啼哭来来抵抗。

读取数据节点数据

读取一个节点的数目内容

client.getData().forPath("path");

瞩目,此方式返的返回值是byte[ ];

读取一个节点的多少内容,同时获取到该节点的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

实则,小小的人儿也发自尊心。作者以书写中独立设了一个章节《如何避免孩子的焦虑》,把关心焦虑症的理念引往了儿女,别小看小小的他们,对于家长是否幸福,心境发生了转,幼小之心灵都能感知到。

履新数据节点数据

更新一个节点的数码内容

client.setData().forPath("path","data".getBytes());

留神:该接口会回一个Stat实例

更新一个节点的数据内容,强制指定版本进行创新

client.setData().withVersion(10086).forPath("path","data".getBytes());

成长,不是圈清矣成千上万业,而是看轻了很多业。

自我批评节点是否留存

client.checkExists().forPath("path");

留神:该办法返回一个Stat实例,用于检查ZNode是否有的操作.
可以调用额外的点子(监控或后台处理)并在终极调用forPath(
)指定要操作的ZNode

过去之无健全,后悔、遗憾也还早就改成过去,重要之是面本及明天莫完美时,如何找到同样栽积极的能量。

取得有节点的所有子节点路径

client.getChildren().forPath("path");

注意:该办法的返回值为List<String>,获得ZNode的子节点Path列表。
可以调用额外的点子(监控、后台处理还是取得状态watch, background or get
stat) 并以终极调用forPath()指定要操作的父ZNode

即是咱生存的重点,也是本书的助益,静观自我关怀是中地化解自我痛苦之计。

事务

CuratorFramework的实例包含inTransaction(
)接口方法,调用此方式被一个ZooKeeper事务. 可以复合create, setData,
check, and/or delete
等操作然后调用commit()作为一个原子操作提交。一个事例如下:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

诚左右我们心情的非是境遇中的总人口同转业,而是我们本着种种境遇的体味和解读。

异步接口

面提到的始建、删除、更新、读取等方式还是联名的,Curator提供异步接口,引入了BackgroundCallback接口用于拍卖异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个根本之回调值吗CuratorEvent,里面含有事件类、响应也和节点的详细信息。

CuratorEventType

事件类型 对应CuratorFramework实例的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

响应码(#getResultCode())

响应码 意义
0 OK,即调用成功
-4 ConnectionLoss,即客户端与服务端断开连接
-110 NodeExists,即节点已经存在
-112 SessionExpired,即会话过期

一个异步创建节点的事例如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去开展异步处理。

心态作为身体的“隐形杀手”,负面情绪郁积太多易则引起家庭矛盾,重则抓住焦虑抑郁,而聊焦虑和抑郁相伴相随的,长期担忧还跟糖尿病呼吸系统疾病皮肤病有涉及。

Curator食谱(高级特性)

提示:首先你必补偿加curator-recipes依赖,下文仅仅针对recipes一些风味的动进行分解与举例,不打算进行源码级别的探索

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

主要提示:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下之所有Api将会晤失灵或者逾期,尽管后面有的例子都尚未应用及ConnectionStateListener。

拟在培养积极情绪,面对生遭之误解及旁人的无承认,爱惜着经历这些痛苦之团结,在每个风雨来袭的天天,找到战胜困难的能力。

缓存

Zookeeper原生支持通过注册Watcher来拓展事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包,可以当作是指向事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三栽Watcher(Cache)来监听结点的转移。

咱经历过烟缭绕嘈杂吵闹的郁闷,才会深切体会到平静生活的得意,它要淡而无味的白开水,却不觉乏味。在一个个平常生活里,最会滋养你的身心,变得不可或缺。

Path Cache

Path Cache用来监督一个ZNode的子节点. 当一个子节点增加, 更新,删除时,
Path Cache会改变它们的状态, 会包含最新的子节点,
子节点的数与状态,而状态的更变将通过PathChildrenCacheListener通知。

实际应用时见面干到四单近乎:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

透过下面的构造函数创建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

怀念以cache,必须调用它的start方式,使用完后调用close方法。
可以安装StartMode来实现启动之模式,

StartMode有下面几乎栽:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在调用start()事先见面调用rebuild()
  3. POST_INITIALIZED_EVENT:
    当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)好多listener监听缓存的变更。

getCurrentData()计返回一个List<ChildData>目标,可以遍历所有的子节点。

安/更新、移除其实是采用client (CuratorFramework)来操作,
不经PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:假设new PathChildrenCache(client, PATH,
true)中之参数cacheData值设置也false,则示例中的event.getData().getData()、data.getData()将赶回null,cache将非会见缓存节点数据。

注意:以身作则中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不净,这或者和PathCache的兑现原理有关,不能够太过累的触及事件!

将过去更的非完美的从,需要升级以及改进之可行性多靠为和睦,而未是将拥有的吹拂以及事还同道脑推为他人。也惟有这么,过去的苦楚才见面演变成为将来底财物。

Node Cache

Node Cache与Path Cache类似,Node
Cache只是监听某一个一定的节点。它涉及到下的老三单类似:

  • NodeCache – Node Cache实现类
  • NodeCacheListener – 节点监听器
  • ChildData – 节点数据

注意:动用cache,依然要调用它的start()方法,使用完后调用close()方法。

getCurrentData()将得到节点当前的状态,通过其的状态好博时之价。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示范中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不都,这或者和NodeCache的实现原理有关,不克最好过多次的触及事件!

注意:NodeCache只能监听一个节点的状态变化。

若果无亮好哪些地方需要改善,丝毫未尝试改变,那就算从来未在去年霉运连连,转过年就是比如变了一个丁,突然内全时来运作。

Tree Cache

Tree
Cache可以监督整个树上的拥有节点,类似于PathCache和NodeCache的构成,主要涉嫌到下四个像样:

  • TreeCache – Tree Cache实现类
  • TreeCacheListener – 监听器类
  • TreeCacheEvent – 触发的风波类
  • ChildData – 节点数据

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:当此示例中尚无采取Thread.sleep(10),但是事件触发次数也是例行的。

注意:TreeCache在初始化(调用start()艺术)的时刻会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()异常有或导致空指针异常,这里当主动处理并避免这种状况。

具有的败和磨,都是人生之历炼,它或早或晚总会出现。爱是平等种能力,幸福吧是平栽力量。

Leader选举

每当分布式计算中, leader elections大凡老大重点之一个力量,
这个选举过程是这样子的: 指派一个经过作为组织者,将任务分发给每节点。
在职责开始前,
哪个节点都非知道哪个是leader(领导者)或者coordinator(协调者).
当选举算法开始实行后, 每个节点最终见面取得一个唯一的节点作为任务leader.
除此之外,
选举还不时会面生出在leader意外宕机的景下,新的leader要被选举出来。

于zookeeper集群中,leader负责写操作,然后通过Zab商落实follower的共同,leader或者follower都得以处理读操作。

Curator
有两种leader选举的recipe,分别是LeaderSelectorLeaderLatch

前端是享有存活的客户端不暂停的更替开Leader,大同社会。后者是使选举出Leader,除非有客户端挂掉重触发选举,否则不会见交出领导权。某党?


LeaderLatch

LeaderLatch有些许单构造函数:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的启动:

leaderLatch.start( );

假设启动,LeaderLatch会和另使用相同latch
path的另外LeaderLatch交涉,然后里面一个最终见面被推举为leader,可以经hasLeadership方法查看LeaderLatch实例是否leader:

leaderLatch.hasLeadership( ); //返回true说明时实例是leader

好像JDK的CountDownLatch,
LeaderLatch在呼吁成为leadership会block(阻塞),一旦不利用LeaderLatch了,必须调用close道。
如果它是leader,会自由leadership, 其它的参与者将会选举一个leader。

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

特别处理:
LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当
SUSPENDED 或 LOST 时,
leader不再认为好或者leader。当LOST后连续重连后RECONNECTED,LeaderLatch会删除先前底ZNode然后重新创设一个。LeaderLatch用户须考虑导致leadership丢失的连天问题。
强烈推荐你采取ConnectionStateListener。

一个LeaderLatch的用例子:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

足添加test module的依赖方便进行测试,不需启动真实的zookeeper服务端:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

先是我们创建了10单LeaderLatch,启动后它吃之一个见面于推选为leader。
因为选举会花费一些时日,start后并无能够立就是赢得leader。
通过hasLeadership翻开自己是否是leader, 如果是的言辞返回true。
可以通过.getLeader().getId()得得时之leader的ID。
无非能够通过close刑满释放当前的领导权。
await举凡一个封堵方法, 尝试获取leader地位,但是未必能够上位。

任由防护极限挑战训练营写作训练

LeaderSelector

LeaderSelector用的下根本涉及下面几乎个像样:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

核心类是LeaderSelector,它的构造函数如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

类似LeaderLatch,LeaderSelector必须start: leaderSelector.start();
一旦启动,当实例取得领导权时您的listener的takeLeadership()方被调用。而takeLeadership()方法就生领导权被假释时才回去。
当你不再行使LeaderSelector实例时,应该调用它的close方法。

雅处理
LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须小心连接状态的变更。如果实例成为leader,
它应有响应SUSPENDED 或 LOST。 当 SUSPENDED 状态出现时,
实例必使在还连接成功之前它恐怕不再是leader了。 如果LOST状态出现,
实例不再是leader, takeLeadership方法返回。

重要: 推荐处理方式是当收到SUSPENDED 或
LOST时抛来CancelLeadershipException异常.。这会促成LeaderSelector实例中断并销执行takeLeadership方法的异常.。这生主要,
你必须考虑扩大LeaderSelectorListenerAdapter.
LeaderSelectorListenerAdapter提供了推荐的拍卖逻辑。

下面的一个例证摘抄自官方:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

卿可以当takeLeadership进行任务的分红等等,并且不要回,如果您想如果如之实例一直是leader的话语可加一个死循环。调用
leaderSelector.autoRequeue();管在这实例释放领导权之后还可能得领导权。
在此我们以AtomicInteger来记录此client获得领导权的次数, 它是”fair”,
每个client有平等的会取得领导权。

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

对照能够,LeaderLatch必须调用close()措施才会放出领导权,而对此LeaderSelector,通过LeaderSelectorListener得本着领导权进行支配,
在适宜的上释放领导权,这样每个节点都出或得领导权。从而,LeaderSelector具有双重好的油滑和可控性,建议有LeaderElection应用场景下优先利用LeaderSelector。

日还第七十天

分布式锁

提醒:

1.引进下ConnectionStateListener监控连接的状态,因为当连接LOST时若不再具有锁

2.分布式的锁全局同步,
这代表任何一个时日接触未会见生出零星个客户端都拥有相同的缉。

可又称并享锁—Shared Reentrant Lock

Shared意味着锁是大局可见的, 客户端都好要锁。
Reentrant和JDK的ReentrantLock类似,即可重入,
意味着同一个客户端在拥有锁的以,可以频繁获取,不会见叫堵塞。
它是出于接近InterProcessMutex来兑现。 它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()获取锁,并提供过机制:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

通过release()计释放锁。 InterProcessMutex 实例可以选用。

Revoking ZooKeeper recipes wiki定义了可协商的取消机制。
为了撤销mutex, 调用底的不二法门:

public void makeRevocable(RevocationListener<T> listener)
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
Parameters:
listener - the listener

如若您要撤销当前的缉,
调用attemptRevoke()方,注意锁释放时RevocationListener拿会回调。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()

亚赖提醒:错误处理
还是强烈推荐你下ConnectionStateListener处理连接状态的反。
当连接LOST时你不再具备锁。

首先被我们创建一个拟的共享资源,
这个资源要只能单线程的顾,否则会发出起问题。

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // 真实环境中我们会在这里访问/维护一个共享的资源
        //这个例子在使用锁的情况下不会非法并发异常IllegalStateException
        //但是在无锁的情况由于sleep了一段时间,很容易抛出异常
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

下一场创建一个InterProcessMutexDemo好像, 它承受请求锁,
使用资源,释放锁这样一个整的造访过程。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

代码也死简单,生成10个client, 每个client重复执行10破
请求锁–访问资源–释放锁的过程。每个client都于独立的线程中。
结果可以望,锁是随机的为每个实例排他性的用。

既是是不过选用的,你可当一个线程中多次调用acquire(),在线程拥有锁时它连接回到true。

乃切莫应有于差不多只线程中之所以和一个InterProcessMutex
你可在每个线程中还生成一个初的InterProcessMutex实例,它们的path都同一,这样她可齐享同一个锁。

不可重入共享锁—Shared Lock

斯锁与地方的InterProcessMutex对照,就是少了Reentrant的效用,也不怕表示它不克于同一个线程中重入。这个仿佛是InterProcessSemaphoreMutex,使用方法及InterProcessMutex类似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 获取锁几次 释放锁也要几次
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

运行后意识,有还只有发一个client成功博得第一只锁(第一独acquire()法返回true),然后其自己过不去在第二单acquire()计,获取第二只锁超时;其他有的客户端都阻塞在首先个acquire()艺术超时并且抛来特别。

如此也尽管说明了InterProcessSemaphoreMutex心想事成的吊是不可重入的。

可是还可读写锁—Shared Reentrant Read Write Lock

类似JDK的ReentrantReadWriteLock。一个朗诵写锁管理均等针对性相关的缉。一个担负读操作,另外一个担当写操作。读操作以写锁没让采用时只是又由于多单过程使,而写锁在利用时无容许读(阻塞)。

此锁是不过重入的。一个享写锁的线程可重入读锁,但是读锁却休能够上写锁。这为意味形容锁得降成读锁,
比如请求写锁 —>请求读锁—>释放读锁
—->释放写锁
。从读锁升级化写锁是非常的。

可是又称读写锁主要出于片独八九不离十实现:InterProcessReadWriteLockInterProcessMutex。使用时首先创建一个InterProcessReadWriteLock实例,然后再度依据你的需得到读锁或者写锁,读写锁之路是InterProcessMutex

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // 注意只能先得到写锁再得到读锁,不能反过来!!!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到写锁");
        }
        System.out.println(clientName + " 已得到写锁");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到读锁");
        }
        System.out.println(clientName + " 已得到读锁");
        try {
            resource.use(); // 使用资源
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " 释放读写锁");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

信号量—Shared Semaphore

一个计数的信号量类似JDK的Semaphore。
JDK中Semaphore维护的均等组认可(permits),而Curator中称之为租约(Lease)。
有零星栽办法可决定semaphore的最为老租约数。第一种方式是用户为定path并且指定最酷LeaseSize。第二栽方法用户被定path并且采用SharedCountReader类。倘不使SharedCountReader,
必须确保拥有实例在差不多进程中使同一的(最深)租约数量,否则有或出现A进程被的实例持有最特别租约数量也10,但是于B进程面临颇具的不过要命租约数量也20,此时租约的含义就失效了。

这次调用acquire()会见回去一个租约对象。
客户端必须在finally中close这些租约对象,否则这些租约会丢失不见。 但是,
但是,如果客户端session由于某种原因比如crash丢掉,
那么这些客户端有的租约会自动close,
这样任何客户端可继承采用这些租约。 租约还可由此下的计赶回还:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

小心你可以一次性请求多独租约,如果Semaphore当前之租约不够,则请线程会为卡住。
同时还提供了晚点的重载方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的根本类包括下面几乎个:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

率先我们先抱了5独租约, 最后咱们拿它们还为了semaphore。
接着要了一个租约,因为semaphore还时有发生5只租约,所以要可以满足,返回一个租约,还留4个租约。
然后再要一个租约,因为租约不够,卡住到过,还是没能够满足,返回结果吗null(租约不足会阻塞到过,然后回来null,不会见再接再厉弃来大;如果无设置过时间,会同样阻塞)。

地方说讲的吊都是正义锁(fair)。 总ZooKeeper的角度看,
每个客户端都按照请求的一一获得锁,不存在非公平的抢占的情。

大抵同享锁对象 —Multi Shared Lock

Multi Shared Lock是一个吊的容器。 当调用acquire()
所有的吊都见面让acquire(),如果要失败,所有的缉都见面于release。
同样调用release时怀有的吊都吃release(未果给忽略)。
基本上,它就是组锁的表示,在其上面的恳求释放操作都见面传送给它们富含的具有的缉。

重大涉嫌个别只八九不离十:

  • InterProcessMultiLock
  • InterProcessLock

它们的构造函数需要包含的缉的聚众,或者同一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

用法和Shared Lock相同。

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

新建一个InterProcessMultiLock, 包含一个更入锁和一个非重入锁。
调用acquire()继可看看线程同时具备了立即点儿只锁。
调用release()见状就片个锁都为假释了。

末了重复反复一赖,
强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态也LOST,锁将见面丢掉。

分布式计数器

顾名思义,计数器是为此来计数的,
利用ZooKeeper可以实现一个集群共享的计数器。
只要利用同一的path就得落时的计数器值,
这是由ZooKeeper的一致性保证的。Curator有少独计数器,
一个凡是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

分布式int计数器—SharedCount

此近乎以int类型来计数。 主要涉及三只类似。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表计数器,
可以吗其多一个SharedCountListener,当计数器改变时这个Listener可以监听到反之风波,而SharedCountReader足读取到新型的价值,
包括字面值和带动版本信息的值VersionedValue。

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

当此例子中,我们利用baseCount来监听计数值(addListener法来填补加SharedCountListener
)。 任意的SharedCount, 只要动同样的path,都可获取此计数值。
然后我们采取5单线程为计算数值长一个10因为内之随机数。相同之path的SharedCount对计数值进行改动,将会回调给baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

此间我们采用trySetCount去装计数器。
先是个参数提供当前之VersionedValue,如果中间其它client更新了此计数值,
你的翻新可能无成功,
但是这你的client更新了流行的值,所以失败了你可以品尝再度重新一不善。
setCount大凡劫持更新计数器的值

专注计数器必须start,使用完以后必须调用close关闭它。

强烈推荐使用ConnectionStateListener
在本例中SharedCountListener扩展ConnectionStateListener

分布式long计数器—DistributedAtomicLong

再度拘留一个Long类型的计数器。 除了计数的限比较SharedCount生了外面,
它首先尝试下乐观锁的法子设置计数器,
如果不成事(比如中计数器已经被其他client更新了),
它应用InterProcessMutex方法来更新计数值。

可以由其的内贯彻DistributedAtomicValue.trySet()中看出:

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此计数器有同名目繁多的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增加一定的价值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

必须反省返回结果的succeeded(), 它表示这个操作是否成功。
如果操作成, preValue()代表操作前的值,
postValue()表示操作后底价值。

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

分布式队列

运用Curator也堪简化Ephemeral Node
(即节点)的操作。Curator也供ZK Recipe的分布式队列实现。 利用ZK的
PERSISTENTS_EQUENTIAL节点,
可以确保放入到队中的型是随顺序排队的。
如果纯粹的客自队列中取数据, 那么其是先期称先有的,这吗是排的特性。
如果你严格要求顺序,你不怕的采用单一的客,可以动用Leader选举只给Leader作为唯一的消费者。

而是, 根据Netflix的Curator作者所说,
ZooKeeper真心不合乎做Queue,或者说ZK没有落实一个好之Queue,详细内容可以看
Tech Note
4,
原因产生五:

  1. ZK有1MB 的传输限制。
    实践着ZNode必须相对较小,而班包含多的音信,非常的不行。
  2. 若有无数节点,ZK启动时一定之款。 而使用queue会导致群ZNode.
    你待明确增大 initLimit 和 syncLimit.
  3. ZNode很要命之上特别为难清理。Netflix不得不创建了一个专程的先后召开这行。
  4. 当好大气之涵盖众多的子节点的ZNode时, ZK的特性变得不好
  5. ZK的数据库完全在内存中。 大量底Queue意味着会占用多底内存空间。

虽说, Curator还是创造了各种Queue的贯彻。
如果Queue的数据量不极端多,数据量不顶非常的景象下,酌情考虑,还是得以行使的。

分布式队列—DistributedQueue

DistributedQueue是无与伦比平常的相同栽阵。 它设计以下四只类似:

  • QueueBuilder – 创建行使用QueueBuilder,它也是任何队列的创立类
  • QueueConsumer – 队列中之音讯消费者接口
  • QueueSerializer –
    队排消息序列化和倒序列化接口,提供了针对班中的目标的序列化和倒序列化
  • DistributedQueue – 队列实现类

QueueConsumer是消费者,它可接收队列的数。处理队列中的数目的代码逻辑可以在QueueConsumer.consumeMessage()中。

好端端情形下优先用信息于队列中移除,再交付消费者花。但就是零星个步骤,不是原子的。可以调用Builder的lockPath()消费者加锁,当消费者花数量常常怀有锁,这样任何消费者莫可知消费之音。如果消费失败或者经过死掉,消息可以交其他进程。这会带来或多或少特性的损失。最好要仅仅消费者模式下队列。

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// 等待消息消费完成
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * 队列消息序列化实现类
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * 定义队列消费者
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

事例中定义了有限独分布式队列和一定量个买主,因为PATH是平等的,会有消费者抢占消费信息之情。

带来Id的分布式队列—DistributedIdQueue

DistributedIdQueue和方面的队类似,唯独得吧队列中之各一个元素设置一个ID
可以由此ID把班中随意的因素移除。 它事关几只类似:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

经下方法创建:

builder.buildIdQueue()

放入元素时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

在这个事例中,
有些元素还并未受消费者花前便更换除了,这样顾客不见面接到删除的音讯。

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

先期级分布式队列—DistributedPriorityQueue

先行级列对队列中的因素以预级进行排序。 Priority越小,
元素越靠前, 越先被消费掉
。 它事关下面几乎只类似:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

通过builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。
当优先级列得到元素增删消息不时,它见面搁浅处理时的素队列,然后刷新队列。minItemsBeforeRefresh指定刷新前时倒之行的顶小数目。
主要安装你的次第可以容忍的非排序的绝小价。

放入队列时欲指定优先级:

queue.put(aMessage, priority);

例子:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

突发性你可能会见产生错觉,优先级设置并从未起效。那是因事先级是对此队列积压的因素而言,如果消费速度了不久有或出现在继一个元素入队操作前前一个素都让消费,这种情况下DistributedPriorityQueue会退化为DistributedQueue。

分布式延迟队列—DistributedDelayQueue

JDK中也发生DelayQueue,不晓你是不是熟悉。
DistributedDelayQueue也提供了近乎之功效, 元素有个delay值,
消费者隔一段时间才能够接受元素。 涉及到下四个近乎。

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

透过下的言语创建:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时可以指定delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch切莫是离现在的一个日间隔,
比如20毫秒,而是未来之一个时戳,如 System.currentTimeMillis() + 10秒。
如果delayUntilEpoch的时光已仙逝,消息会就叫消费者收到。

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

SimpleDistributedQueue

眼前则实现了各种队列,但是你注意到没有,这些队列并没有落实类似JDK一样的接口。
SimpleDistributedQueue供了跟JDK基本一致的接口(但是并未落实Queue接口)。
创建好简单:

public SimpleDistributedQueue(CuratorFramework client,String path)

长元素:

public boolean offer(byte[] data) throws Exception

去元素:

public byte[] take() throws Exception

除此以外还提供了另外方法:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

没有add方法, 多了take方法。

take方以成返回之前会于堵塞。
poll术在批列为空时直接回到null。

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

但实际发送了100修消息,消费完毕第一修后,后面的音讯无法消费,目前未曾找到原因。查看转法定文档推荐的demo使用下几乎独Api:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

不过事实上运用发现尚是存在消费阻塞问题。

分布式屏障—Barrier

分布式Barrier是如此一个类似:
它会阻塞所有节点上的待过程,直到某一个于满足,
然后具有的节点继续展开。

遵赛马比赛被, 等赛马陆续到起跑线前。
一名气让下,所有的赛马都飞奔而出。

DistributedBarrier

DistributedBarrier类实现了栅栏的作用。 它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

第一你待安装栅栏,它以卡住在它上面等待的线程:

setBarrier();

然后用阻塞的线程调用方法等放行条件:

public void waitOnBarrier()

当规则满足时,移除栅栏,所有等待的线程将继续执行:

removeBarrier();

老大处理 DistributedBarrier
会监控连接状态,当连接断掉时waitOnBarrier()方法会抛来很。

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

其一事例创建了controlBarrier来安栅栏及移除栅栏。
我们创建了5单线程,在此Barrier上等待。
最后移除栅栏后有着的线程才继续执行。

若是您从头免设置栅栏,所有的线程就不见面阻塞住。

双栅栏—DistributedDoubleBarrier

双栅栏允许客户端在测算的起跟终结时并。当足够的经过在到双栅栏时,进程始起计,
当计算好时,离开栅栏。 双栅栏类是DistributedDoubleBarrier
构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty凡是成员数,当enter()主意为调用时,成员为打断,直到所有的分子都调用了enter()
leave()术被调用时,它吗打断调用线程,直到有的分子都调用了leave()
就如百米赛跑比赛, 发令枪响,
所有的健儿开跑,等具有的运动员跑了极端线,比赛才收。

DistributedDoubleBarrier会监控连接状态,当连接断掉时enter()leave()方法会抛来很。

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

参考资料:
《从PAXOS到ZOOKEEPER分布式一致性原理和履行》
《 跟着实例学习ZooKeeper的用法》博客系列

路仓库:
https://github.com/zjcscut/curator-seed
(其实本文的MD是带动导航目录[toc]的,比较便利导航及每个章节,只是简书不支持,本文的MD原文放在项目之/resources/md目录下,有易自取,文章之所以Typora编写,建议用Typora打开)

End on 2017-5-13 13:10.
Help yourselves!
本身是throwable,在广州加油,白天上班,晚上跟双休不定时加班,晚上有空坚持写下博客。
欲我之稿子能为你带收获,共勉。