`
顽石
  • 浏览: 164164 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

etcd事件监听

 
阅读更多

    etcd是CoreOS开发的一个高可用的键值存储系统,主要用于共享配置和服务发现。它使用Go语言编写,并通过Raft一致性算法处理日志复制以保证强一致性。etcd目前的最新版本是2.2.0。

    和zookeeper的二进制接口不同,它提供了HTTP/JSON的rest api接口,所以对使用它的客户端来说是很友好的,几乎每种编程语言都有较成熟的http client开发包,基于这些开发包很容易编写etcd客户端。在java中已经有个比较好的客户端etcd4j(https://github.com/jurmous/etcd4j),类似zookeeper有好用的curator库。

   

   etcd事件

    etcd中和数据变化(包括目录&值key、value变化)相关的事件类型有:set, delete, update, create, compareAndSwap、compareAndDelete。在etcd http response (json格式)action属性就是事件类型。

     etcd的事件watch监听接口也是使用http访问,它提供两种监听模式,一种是一次性监听,类似zookeeper的事件watch,监听到一次事件后,需要客户端重新发起监听,比较繁琐。另一种是持久监听(stream),当有事件时,会连续触发,不需要客户端重新发起监听。

     和zookeeper一样也存在客户端丢失监听事件的可能,它不保证每个事件客户端都能监听到,不过我们在实际使用过程中,通常是在应用启动时,在开始监听之后或之前先查询etcd,将需要的数据全量加载到内存中,后续才是根据监听事件来增量更新内存中的数据或全量刷新数据。另外目前版本的etcd最多只保存1千条事件历史,满1千条后,如果有新的事件产生,事件历史库中最老的事件将被丟弃,这个可以理解为etcd服务端事件丢失。由于有1千条的限制,在有事件浪涌时(在单位时间内例如1秒内产生2千条事件),事件监听处理较慢的时候或未监听时也会发生客户端事件丢失。   

 

 

    一次性watch监听

   可用curl发送http请求来进行演示,假设在本机安装有etcd,客户端访问端口为2379(旧版本监听端口为4001),监听key为/application。一次性监听命令如下:

    curl "http://127.0.0.1:2379/v2/keys/application?wait=true"

    查询字符串中的wait=true表示监听,还可加查询参数recursive=true表示附加监听/application的子目录和后辈目录,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true" 。

    启动监听后etcd如有变化事件发生,则返回json格式的事件,结果类似如下:

    HTTP/1.1 200 OK

    Content-Type: application/json

    X-Etcd-Cluster-Id: e88d54f6225f06ad

    X-Etcd-Index: 271

    X-Raft-Index: 872202

    X-Raft-Term: 5

    Date: Sat, 26 Sep 2015 08:43:17 GMT

    Transfer-Encoding: chunked

 

    {"action":"set","node":{"key":"/application/store","value":"        {...}","modifiedIndex":271,"createdIndex":271},"prevNode":{...}}

   上面的curl返回结果中显示了http头,是加了curl的-i选项。

   从结果可看出action属性为set事件,modifiedIndex是该事件的index,为271。X-Etcd-Index头表示启动监听时etcd的当前index。etcd的index是单调递增的正整数,该整数取值空间是etcd中的所有key共享的。

    由于是一次性监听,所以curl会退出,继续监听要重新运行curl。

    和zookeeper事件监听一样,上面的http请求只能监听到命令发出之后产生的事件(更精确的说是在etcd服务器收到监听请求之后),之前发生的事件是监听不到的。另外在前一个监听得到触发开始(中间包括事件处理耗时)到启动下一次监听之间(并且下一次监听是etcd服务器收到监听请求才真正开始生效,这中间还有网络延)肯定存在时间间隔,如果在这个时间间隔(区间)内有事件发生,客户端是监听不到这些事件的,这个也和zookeeper类似。也就是如果两个事件并行发生或在发生时间上相距很近(例如update一个key后立即delete),后一事件的发生刚好落在这个间隔内,那这个事件在采用一次性监听模式的客户端中会丢失,这些事件对客户端监听不可见。

    

 

    那是不是就一定不能获取先前的事件了?这个要看情况,只要事件还在etcd的事件历史中,并且知道事件的index或起始index,还是可以取到单个事件的。在查询字符串中加waitIndex参数,其值是该事件的index或起始index。例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&waitIndex=269" ,表示查询/application目录或后辈目录中index等于或大于269的事件,如果有则返回一个最接近269的事件,否则会挂起curl,直到有满足条件的事件发生。这个其实是查询事件,已经不是正规的事件监听了。所以如果知道起始index,可以每次index加1来遍历查询事件,遍历查询时可以根据返回事件中的index来调整后续查询条件中使用的index。 

    如果waitIndex对应的事件已被etcd丢弃(见前面提到的etcd事件历史库),此时如果获取该事件,etcd将返回http状态码400( Bad Request),http body类似如下:

    {"errorCode":401,"message":"The event in requested index is outdated and cleared","cause":"the requested history has been cleared [3305/1]","index":4304} 

    另外可以监听根目录(根key),例如curl "http://127.0.0.1:2379/v2/keys?wait=true&recursive=true" 。

 

    持久监听(流式watch)

    要在查询字符串中加stream=true,例如curl "http://127.0.0.1:2379/v2/keys/application?wait=true&recursive=true&stream=true"

    发出上述命令后,会和etcd服务器之间建立一个http长连接(此处的长连接不是指http keep alive,虽然也需要keep alive),其应答http头中的Transfer-Encoding为chunked,类似server push和comet中的multipart/x-mixed-replace,其http response body不结束,后续的每个事件作为http body的一部分,在该长连接中以一个或多个body chunk块的形式推送给客户端。可以看到在流式持久监听中curl即使收到事件也不会退出,它一直在等待后续将要发生的事件。一次性监听http头中也是chunked,但收到一个事件后,etcd会发送一个结束chunk(大小为0,表示http response结束),因此收到该chunk后curl会退出。

     相比一次性监听,除了更简便,持久监听也比一次性监听可靠性高,不会出现上面提到的在时间间隔内监听不到事件的情况。

      持久监听也是监听从命令发出之后发生的事件,先前的事件是监听不到的,不像JMS的持久订阅可以收到启动之前的jms消息。

      如果在持久监听中加waitIndex参数,分两种情况:一种是waitIndex的值小于或等于启动监听时etcd的当前index(这个值在X-Etcd-Index http头中可看到),则curl接收到满足条件的事件后挂起(不退出),后续再也收不到其他事件,即最多只能收到一个事件。第二种是waitIndex的值大于启动监听时etcd的当前index,则waitIndex参数无效,持久监听的行为同没有waitIndex参数一样。

 

 

   使用etcd4j进行事件监听

   在使用etcd4j进行事件监听时,有个注意事项:如果在同一个jvm虚拟机中既有修改查询etcd数据的操作,也有监听etcd事件,监听事件使用的EtcdClient实例和修改&查询操作使用的EtcdClient实例应分开,不能共用同一个实例,否则目前的etcd4j(2.7.0版本)会出现问题。多个key可以共用一个EtcdClient实例来进行监听,也可每个key使用一个单独的实例来进行监听。

  • 一次性监听

  //创建etcd客户端实例

  EtcdClient   etcdClient = new EtcdClient (new URI[]{new URI("127.0.0.1:2379")});

  /*setRetryPolicy设置重试策略

  * recursive()监听/application目录和其后辈目录中的事件,设置查询字符串recursive=true.

  *  waitForChange()监听,设置查询字符串wait=true.

 * -1表示永不超时,send()发送http请求进行监听

  */

  EtcdResponsePromise<EtcdKeysResponse> promise = etcd.get("/application").setRetryPolicy(

                        new RetryNTimes(300,Integer.MAX_VALUE)).recursive().waitForChange().

                        timeout(-1, TimeUnit.SECONDS).send();

    //增加listener

    promise.addListener(new IsSimplePromiseResponseHandler<EtcdKeysResponse>()

    {

         @Override

         public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {

                   //从promise中取响应,检查是否有异常

                   EtcdKeysResponse response = promise.getNow();

                   Throwable  exception = promise.getException();

                    ... 处理事件

                    //再次发起http请求进行监听

                    ...

         }   

     }

   );

 

     还可使用waitForChange(long waitIndex)来监听指定的事件。

 

  • 持久监听

     etcd4j的当前版本2.7.0不支持持久监听,可以使用

Java异步async httpclient(https://github.com/AsyncHttpClient/async-http-client)来实现持久监听,最挫最原始的持久监听实验代码如下:

     AsyncHttpClient asyncHttpClient = ...;

      //发送http get异步请求,不超时,且提供一个匿名的异步请求完成handler

      asyncHttpClient.prepareGet("http://127.0.0.1:2379/v2/keys/application?   wait=true&recursive=true&stream=true").setRequestTimeout(-1).execute(

       new AsyncHandler<String>() {

           //这个异常回调方法可类比为zookeeper的会话过期

           @Override

           public void onThrowable(Throwable t) {

                  // TODO Auto-generated method stub

                  //todo:异常,重新重试发起监听

            }

             

             //在会话期,有事件时会回调onBodyPartReceived方法

           @Override

           public State  onBodyPartReceived(HttpResponseBodyPart  bodyPart)

throws Exception {

                 // TODO Auto-generated method stub

                // bodyPart对应一个http chunk块,一个事件的数据可能由多个chunk块组成

                System.out.println("event="+new String(bodyPart.getBodyByteBuffer().array(),"utf-8"));

                //todo:将chunk中的数据加入缓冲区,待收到完整的事件数据后将json格式

                //的数据解析成java对象

               ...

               return State.CONTINUE;

         }

  

        @Override

        public  State onStatusReceived(HttpResponseStatus  responseStatus)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

         }

            

            //一个会话期只调用一次,这个回调方法可类比为zookeeper会话建立

         @Override

         public  State onHeadersReceived(HttpResponseHeaders headers)

throws Exception {

               // TODO Auto-generated method stub

               return State.CONTINUE;

          }

 

         @Override

          public String onCompleted() throws Exception {

                    // TODO Auto-generated method stub

                   return "endWatch";

          }

   

    });

   

     

    

 

    

    

    

   

    

 

 

    

  • 大小: 23.6 KB
分享到:
评论

相关推荐

    etcdWatcher:ETCD密钥监听器实现

    etcdWatcher ETCD密钥监听器实现

    etcd-v3.3.10-linux-amd64.tar.zip

    在分布式系统中,如何管理节点间的状态一直是一个难题,etcd像是专门为集群环境的服务发现和注册而设计,它提供了数据TTL失效、数据改变监视、多值、目录监听、分布式锁原子操作等功能,可以方便的跟踪并管理集群...

    Go 版本 Etcd 客户端操作 Etcd

    本文诣在使用 Go 客户端操作 Etcd,并实现元数据的写入(单条写、批量写)、读取(单挑读、前缀读)、监听(watch)、更新(update)!

    Go 学习、Go 进阶、Go 实用工具类、Go DDD 项目落地、Go-kit 、Go-Micro 、Go 推送平台、微服务实践

    - 通过Etcd watch 监听服务(APP),通过变化更新链接 + 负载均衡 (客户端发起请求(APP)) - 负载均衡选择合适的服务(APP HTTP2长链接) - 发起调用 ``` ├── discovery │ ├── customize_balancer.go │ ├...

    docker-etcd-register:将暴露的 docker 容器端口注册到 Etcd

    它旨在由作为全局服务进行管理,并在集群中运行。 该项目的灵感来自的和 Jason Wilder 的博客文章。...使用实时监听容器生命周期事件 用法 运行它: $ git clone https://github.com/mimperatore/do

    goservice:使用go和etcd发现和注册工具

    goserviceDiscovery and register components implements with go and etcd.基于 go 和 etcd 实现的服务注册、发现组件,可应用于...一般而言,只有内网地址服务本地端口:服务监听的端口etcd endpoints:etcd 服务的

    SecKill秒杀项目.zip

    2. 监听Etcd中的数据变化,实时加载数据到内存中。 3. 从Redis中加载黑名单数据到内存当中。 4. 设置白名单。 5. 对用户请求进行黑名单限制。 6. 对用户请求进行流量限制、秒级限制、分级限制。 7. 将用户数据进行...

    coreos_etcd_4.X.proxy:从本地 mashine 到核心 etcd 集群的代理请求

    docker run -d --net host --name etcdproxy -e ETCD1=172.17.8.100:4001 -e ETCD2=172.17.8.101:4001 -e ETCD3=172.17.8.102:4001 quay.io/pkircher/etcdproxy 将开始监听 *:4001 admin on *:1936 username = pwd &gt;...

    consul 中文开发指南

    支持多数据中心,内外网的服务采用不同的端口进行监听。 多数据中心集群可以避免单数据中心的单点故障,而其部署则需要考虑网络延迟, 分片等情况等. zookeeper 和 etcd 均不提供多数据中心功能的支持. 支持健康检查. ...

    viper配置框架的介绍支持zookeeper的读取和监听

    监听配置文件变化,实时读取读取配置文件内容 读取环境变量值 读取远程配置系统 (etcd Consul) 和监控配置变化 读取命令 Flag 值 读取 buffer 值 读取确切值 乍一看,未免有相见恨晚之感,可仔细一想,不免...

    prometheus监控haproxy规则

    prometheus监控规则大全 node规则,redis监控,es监控,vmware监控,ipmi监控,ceph监控,etcd监控,k8s监控,mysql监控,openstack监控,os监控,交换机监控,windows监控,cdh监控,calico监控规则监控

    SecKill:秒杀系统;

    监听Etcd中的数据变化,实时加载数据到内存中。 从Redis中加载黑名单数据到内存当中。 设置白名单。 对用户请求进行黑名单限制。 对用户请求进行流量限制,秒级限制,分级限制。 将用户数据进行签名验证,检验参数的...

    SecKill:这是基于Go语言的一个秒杀系统

    监听Etcd中的数据变化,实时加载数据到内存中。从Redis中加载黑名单数据到内存当中。设置白名单。对用户请求进行黑名单限制。对用户请求进行流量限制,秒级限制,分级限制。将用户数据进行签名验证,检验参数的合法...

    Docker服务注册项目Registrator.zip

    Registrator(原名Docksul)是一个为Docker而设计的服务注册项目,它监听跨主机运行的容器的启动和停止,检查并向Consul或者Etcd注册它们(容器)。 标签:Registrator

    docker-devdns:DNS 服务器根据 docker 容器名称解析 .dev 域

    监听容器创建事件并维护一个 name&lt;-&gt;ip 缓存。 对特殊顶级域(默认 .dev)的请求尝试匹配任何具有相同名称的正在运行的 docker 容器。 对其他域的请求由操作系统解析器解析,这样我们就可以与安装在您的开发...

    Rancher部署Traefik实现微服务的快速发现

    它非常快无需安装其他依赖,通过Go语言编写的单一可执行文件支持RestAPI多种后台支持:Rancher、Docker、Swarm、Kubernetes、Marathon、Mesos、Consul、Etcd,并且还会更多后台监控,可以监听后台变化进而自动化应用...

    谷歌开源的容器集群管理系统Kubernetes.zip

    sockets的proxy,每创建一种Service,Proxy主要从etcd获取Services和Endpoints的配置信息,或者也可以从file获取,然后根据配置信息在Minion上启动一个Proxy的进程并监听相应的服务端口,当外部请求发生时,Proxy会...

    kube-linstor:用于Kubernetes的集装箱式LINSTOR SDS,准备投入生产

    PostgeSQL数据库/ etcd或任何其他后备存储,以实现冗余。 (可选) QuckStart Kube-Linstor由几个组件组成: Linstor-controller-控制器是Linstor的主要控制点。 它为客户端提供API,并与卫星进行通信以创建和...

    kkbinlog:支持mysql,mongodb数据变更订阅分配

    1概述mysql,MongoDB数据移动监听分发本项目意在简化监听mysql,MongoDB数据库的不同表的各种数据移动项目依赖项redis,mysql使用场景:刷新,替代系统...2使用方式从bin-log-distributor-app到客户端数据分发方式的...

Global site tag (gtag.js) - Google Analytics