站长论坛 - 站长交流社区- SEO交流论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 216|回复: 4

[其他] 【从入门到精通-ZooKeeper】ZooKeeper实战-分布式队列

[复制链接]
发表于 2019-9-16 10:14:30 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
前言
上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。
设计
我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue和【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。
附: 申请阿里云服务器等产品时,可以使用2000元阿里云代金券
DistributedQueue//继承AbstractQueue类并实现Queue接口public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);        //ZooKeeper客户端,进行ZooKeeper操作    private ZooKeeper zooKeeper;    //根节点名称    private String dir;    //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式    private String node;    //ZooKeeper鉴权信息    private List<ACL> acls;    /**     * Constructor.     *     * @param zooKeeper the zoo keeper     * @param dir       the dir     * @param node      the node     * @param acls      the acls     */    public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {        this.zooKeeper = zooKeeper;        this.dir = dir;        this.node = node;        this.acls = acls;        init();    }    private void init() {        //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。        try {            Stat stat = zooKeeper.exists(dir, false);            if (stat == null) {                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);            }        } catch (Exception e) {            logger.error("[DistributedQueue#init] error : " + e.toString(), e);        }    }}offer/** * Offer boolean. * * @param o the o * @return the boolean */@Overridepublic boolean offer(E o) {    //构建要插入的节点名称    String fullPath = dir.concat("/").concat(node);    try {        //创建子节点成功则返回入队成功        zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);        return true;    } catch (Exception e) {        logger.error("[DistributedQueue#offer] error : " + e.toString(), e);    }    return false;}poll/** * Poll e. * * @return the e */@Overridepublic E poll() {    try {        //获取根节点所有子节点信息。        List<String> children = zooKeeper.getChildren(dir, null);        //如果队列是空的则返回null        if (children == null || children.iSEMpty()) {            return null;        }        //将子节点名称排序        Collections.sort(children);        for (String child : children) {            //拼接子节点的具体名称            String fullPath = dir.concat("/").concat(child);            try {                //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点                byte[] bytes = zooKeeper.getData(fullPath, false, null);                E data = (E) bytesToObject(bytes);                zooKeeper.delete(fullPath, -1);                return data;            } catch (Exception e) {                logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);            }        }    } catch (Exception e) {        logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);    }    return null;}peek/** * Peek e. * * @return the e */@Overridepublic E peek() {       try {        //获取根节点所有子节点信息。        List<String> children = zooKeeper.getChildren(dir, null);        //如果队列是空的则返回null        if (children == null || children.isEmpty()) {            return null;        }        //将子节点名称排序        Collections.sort(children);                for (String child : children) {            //拼接子节点的具体名称            String fullPath = dir.concat("/").concat(child);            try {                //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点                byte[] bytes = zooKeeper.getData(fullPath, false, null);                E data = (E) bytesToObject(bytes);                return data;            } catch (Exception e) {                logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);            }        }    } catch (Exception e) {        logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);    }    return null;}size/** * Size int. * * @return the int */@Overridepublic int size() {    try {        //获取根节点的子节点名称        List<String> children = zooKeeper.getChildren(dir, null);        //返回子结点信息数量        return children.size();    } catch (Exception e) {        logger.error("[DistributedQueue#offer] size : " + e.toString(), e);    }    return 0;}总结
上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。

回复

使用道具 举报

 楼主| 发表于 2019-9-16 19:41:12 | 显示全部楼层
分享一下哪儿有阿里云备案服务号?

可以单独买个阿里云备案服务号,不用买主机就可以做正规的网站域名备案,需要你自己直接备案的,很安全。
   请加旺旺:gydtep  QQ在线咨询:394416076   
   阿里云备案服务号淘宝网址: https://item.taobao.com/item.htm?&id=533719730125
回复 支持 反对

使用道具 举报

 楼主| 发表于 2019-9-17 17:53:22 | 显示全部楼层
017年1月,“Gitlab误删库事件”引起业界对信息安全和重大风险的敏感神经。
回复 支持 反对

使用道具 举报

 楼主| 发表于 2019-9-20 19:27:26 | 显示全部楼层
并且团队的任务和评估成功的方法也是明确的
回复 支持 反对

使用道具 举报

 楼主| 发表于 2019-10-10 13:55:31 | 显示全部楼层
。申请阿里云服务器等产品时,可以使用2000元阿里云代金券,阿里云官网领取网址: https://promotion.aliyun.com/ntm ... l?userCode=2a7uv47d
     1 、仅限新用户领取,老用户可以新注册一个账号,注册新账号不能用以前用过的手机号、邮箱、身份信息等,否则还会被当做老用户。
     2 、首次购买时先将所有要买的业务都加入到购物车,最后一起付款,这样才可享受优惠。如果一个一个买,那就只有首单才能享受优惠,从第2单开始就不能享受优惠了。
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|小黑屋|易采站长站 ( 蒙ICP备14002389-1号 ) |

GMT+8, 2019-12-9 23:46

Powered by Discuz! X3.4

Copyright © 2001-2020, Tencent Cloud.

快速回复 返回顶部 返回列表