马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有帐号?立即注册
x
前言设计我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue和【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。 DistributedQueue//继承AbstractQueue类并实现Queue接口public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> { private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class); //ZooKeeper客户端,进行ZooKeeper操作 private ZooKeeper zooKeeper; //根节点名称 private String dir; //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式 private String node; //ZooKeeper鉴权信息 private List<ACL> acls; /** * Constructor. * * @param zooKeeper the zoo keeper * @param dir the dir * @param node the node * @param acls the acls */ public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) { this.zooKeeper = zooKeeper; this.dir = dir; this.node = node; this.acls = acls; init(); } private void init() { //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。 try { Stat stat = zooKeeper.exists(dir, false); if (stat == null) { zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("[DistributedQueue#init] error : " + e.toString(), e); } }}offer/** * Offer boolean. * * @param o the o * @return the boolean */@Overridepublic boolean offer(E o) { //构建要插入的节点名称 String fullPath = dir.concat("/").concat(node); try { //创建子节点成功则返回入队成功 zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL); return true; } catch (Exception e) { logger.error("[DistributedQueue#offer] error : " + e.toString(), e); } return false;}poll/** * Poll e. * * @return the e */@Overridepublic E poll() { try { //获取根节点所有子节点信息。 List<String> children = zooKeeper.getChildren(dir, null); //如果队列是空的则返回null if (children == null || children.iSEMpty()) { return null; } //将子节点名称排序 Collections.sort(children); for (String child : children) { //拼接子节点的具体名称 String fullPath = dir.concat("/").concat(child); try { //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点 byte[] bytes = zooKeeper.getData(fullPath, false, null); E data = (E) bytesToObject(bytes); zooKeeper.delete(fullPath, -1); return data; } catch (Exception e) { logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e); } } } catch (Exception e) { logger.error("[DistributedQueue#peek] poll : " + e.toString(), e); } return null;}peek/** * Peek e. * * @return the e */@Overridepublic E peek() { try { //获取根节点所有子节点信息。 List<String> children = zooKeeper.getChildren(dir, null); //如果队列是空的则返回null if (children == null || children.isEmpty()) { return null; } //将子节点名称排序 Collections.sort(children); for (String child : children) { //拼接子节点的具体名称 String fullPath = dir.concat("/").concat(child); try { //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点 byte[] bytes = zooKeeper.getData(fullPath, false, null); E data = (E) bytesToObject(bytes); return data; } catch (Exception e) { logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e); } } } catch (Exception e) { logger.error("[DistributedQueue#peek] warn : " + e.toString(), e); } return null;}size/** * Size int. * * @return the int */@Overridepublic int size() { try { //获取根节点的子节点名称 List<String> children = zooKeeper.getChildren(dir, null); //返回子结点信息数量 return children.size(); } catch (Exception e) { logger.error("[DistributedQueue#offer] size : " + e.toString(), e); } return 0;}总结上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。
|