`

ConfigServer(一)

 
阅读更多

是什么?

俗称:配置中心,是一个基于“发布-订阅”模型的分布式通信框架

常用应用场景

  • 场景1. HSF服务 。服务发布者发布服务,服务调用者订阅服务,ConfigServer向订阅服务的调用者推送可访问的服务IP地址列表,服务调用者从中选一个发起远程调用。

screenshot

  • 场景2. notify 。消息发送和接收,都要通过notifyserver来完成。notifyclient端也是通过configserver来获取notifyserver的地址列表的。

screenshot

特点

主动推

客户端(不管是发布者还是订阅者)和Server之间维持着一个TCP长连接,server通过连接往里面写数据,相当于主动的通知了客户端。

数据聚合

ConfigServer会自动把注册相同dataId的多个客户端聚合在一起

适合非持久型数据

数据的生命周期和发布者的TCP连接的生命周期相同,一旦发布者连接断开后,发布的数据就失效了。

动态感知发布者断连

两种情况

  • 发布者 正常断开连接 (TCP连接):通过连接事件,TCP连接断开时,应用层ConfigServer是可以感知到的,然后ConfigServer把断开的连接对应的数据删除。
  • 发布者 异常断开连接 ,比如直接拔网线。这样应用层是无法直接感知到的。所以需要一种通知机制: 心跳机制 。发布者每隔3秒向ConfigServer发送一个心跳包,用以维持与ConfigServer的连接。ConfigServer定时查看当前时间与上次更新时间的间隔,判断连接是否失效。如果失效,就删除这个连接对应的数据,同时通知订阅者。

Server端

  • Server端是分布式部署的,有多个节点,无主仆之分,是对等关系,同时工作。
  • 每个Server数据是一致的: 每个Server节点的数据都是 全量数据 ,Server两两之间进行数据同步。同步过程中通过更新数据的版本号,保证数据一致性。

发布数据的表示

screenshot

客户端连接的表示

ClientConnection类

主要功能:

  • 注册发布者 addPublisher(GroupId group,String clientId)
  • 注册订阅者 addSubscriber(GroupId group,String clientId)
  • 发布数据 publishData(..) 发布的数据封装在PubInfo类中
  • 删除发布者 removePublisher(..)
  • 延续指定client的生命周期 通过renew() 方法更新lastRenewTime为当前时间

通过两个ConcurrentHashMap保存连接上注册的发布者和订阅者:

 // 并发访问,client线程和推送线程
    private final ConcurrentHashMap<GroupId, String> subscribers = new ConcurrentHashMap<GroupId, String>(
            16, 0.75f, 1);
    // 并发访问,client线程和推送线程
    private final ConcurrentHashMap<GroupId, PubInfo> publishers = new ConcurrentHashMap<GroupId, PubInfo>(
            16, 0.75f, 1);

连接管理

ClientConnectionWorker类

此类是用于管理客户端连接的线程。

执行以下任务:

  • 处理连接上的请求
  • 向连接推送数据 通过对连接的哈希值对线程个数取模,对连接归组,每个cconnectionWorker管理一组连接
private final Map<Connection, ClientConnection> nativeClients;
private final Map<String, ClientConnection> clusterClients;

clusterClients:集群客户, 保存集群中其他服务器同步来的客户端
nativeClients :原生客户端,保存直连到本服务器的客户端

ClientConnectionWorkers 类

客户端连接线程池,向ClientConnectionWorker分派任务。

static private final ClientConnectionWorker[] clientWorkers;

有两种分派方式:

/**
     * 由originalClientIp找到client线程,并向其分派执行任务。请求线程不需要知道结果。
     */
    static public void dispatch(String clientIp, Runnable task) {
        if (clientIp.contains(":")) {
            new Exception("illegal clientIp, " + clientIp).printStackTrace();
            System.exit(0);
        }

        ClientConnectionWorker worker = getWorker(clientIp);
        worker.execute(task);
    }
/**
     * 向所有ConnectionWorker线程分派执行任务。请求线程通过返回的List<Future>来获得执行结果。
     */
    static public <V> List<Future<V>> dispatchToAllWorkers(Callable<V> task) {
        List<Future<V>> futures = new ArrayList<Future<V>>(clientWorkers.length);
        for (ClientConnectionWorker worker : clientWorkers) {
            futures.add(worker.execute(task));
        }
        return futures;
    }

数据推送

DefaultPushService类(向客户端推送)

监听组数据的变化和订阅者新增事件,并向客户端推送数据。
继承了EventListener的时间监听机制
onEvent(Event event)方法:根据不同的事件,做出不同处理

组数据变化事件:全推(获取该组所有发布者的数据,向该组所有订阅者所在的连接发送数据)
新增订阅者事件:单推(获取该订阅者所在组中所有发布者的数据,向该订阅者所属的连接发送数据)

@Override
    public void onEvent(Event event) {
        if (event instanceof GroupDataChangedEvent) {
            GroupDataChangedEvent gdce = (GroupDataChangedEvent) event;
            scheduleAllPush(gdce.group);
        }
        else if (event instanceof SubscriberAddedEvent) {
            SubscriberAddedEvent sae = (SubscriberAddedEvent) event;
            scheduleSinglePush(sae.group, sae.client);
        }
    }

@Override
    public void scheduleSinglePush(GroupId group, ClientConnection dest) {
        if (dest.getConnection().isConnected()) {
            PushDelayTask task = new PushDelayTask(group, dest);
            pushTaskManager.addTask(task);
        }
    }

    @Override
    public void scheduleAllPush(GroupId group) {
        PushDelayTask task = new PushDelayTask(group);
        pushTaskManager.addTask(task);
    }

上面代码里的PushDelayTask是延期推送任务类,由TaskManager线程执行,到期后立即推送。
推送执行任务类是PushExecuteTask,实现了Runnable接口,其run方法主要工作就是数据压缩、打包、发送。

推送部分源码:

// Send
Connection connection = client.getConnection();
RequestControl control = new RequestControlImpl(connection.lastRequestProtocol(),
                    RESP_TIMEOUT);
ResponseCallback callback = new PushDataCallback(client);
//回调Client的invokeWithCallback()方法,把打包好的数据protocolPackage作为参数推送到客户端。
connection.getClient().invokeWithCallback(protocolPackage, callback, control);

DefaultConfigClusterService类(集群间推送)

监听:数据发布、发布者注销、客户端断连。
事件处理:通知其他Server

数据发布事件包括:原生发布、集群间发布

连接断开事件的触发条件包括:
1. 原生连接(直接TCP连接到本服务器)断开
2. 收到其他Server的通知
3. 长时间收不到renew包

DefaultConfigClusterService类会启动一个周期性任务,向连接线程池中的所有线程委派心跳任务 SyncRenewTask

// 定时发送renew包
timerService.scheduleWithFixedDelay(new SyncRenewSchedulerTask(),SyncRenewSchedulerTask.RENEW_PERIOD);

SyncRenewTask表示心跳任务,被委派给ConnectionWorker线程去执行。
1. 对管理的native client,向其他server发送心跳包
2. 对管理的cluster client,计算最后一次renew到当前的时间,如果超过特定时长(过期时长是3分钟),视为断开连接,将其删除。

Client端原理

发布者和订阅者

screenshot

发布者和订阅者的注册

类结构如下:

screenshot

基本注册信息:

screenshot

心跳

使用的是remoting的ConnectionHeartBeat类(心跳包)和HeartBeatProcessor类(心跳处理类)
内部只有一个String 类型的clientUrl属性
也就是说客户端发送心跳数据包,内容很少,只有客户端连接的url

分享到:
评论

相关推荐

    ConfigServer:用于命名服务的配置中心

    Config Server的使用步骤1 从github下载config server源码。 cd "你的目录" git clone 步骤 2 安装 MySQL 服务器并初始化配置服务器 sql 脚本 a) 如何安装 MySQL 服务器,请参阅 Internet 的更多详细信息。 b) ...

    cas-configserver-overlay:通用CAS Spring Cloud Configuration Server WAR叠加

    版本号&lt; cas&gt;6.0.x要求JDK 11建造要查看构建脚本可用的命令,请运行: ./build.sh help 要打包最终的Web应用程序,请运行: ./build.sh package 要更新SNAPSHOT版本,请运行: ./build.sh package -U配置创建一个src...

    spring-cloud-config-server-example:如何设置基本的Spring Cloud Config Server的示例

    Config Server客户端设置在本地运行将存储库克隆或下载到本地计算机。 在命令行中执行./gradlew clean build 。 在命令行中键入gradle bootRun ,您应该看到服务器在端口8080上启动。瞧,您的配置服务器正在本地运行...

    Tomcat的配置详解中文版

    Tomcat的配置 增加一个虚拟目录 配置JSP及Servlet 配置服务器的端口 web.xml文件的设置 web.xml文件中安全性的设置 tomcat-users.xml 设置 配置日志

    cloud-config-server:用于啤酒厂微服务的Cloud Config Server

    云配置服务器 用于啤酒厂微服务的Cloud Config Server

    configServer:Spring Cloud 配置服务器教程

    #Spring Cloud Config Server & Bus Tutorial ##Microservices 在本教程中: ###ConfigLocal 这个 µService 从本地文件系统加载它的配置。 这是最简单的情况,但违反了 12 因素,因为道具在本地捆绑在: /src/main...

    mongodb分片

    启动mongos节点:mongos –f config/xx.conf –configserver configserver的id:port i) 添加分片过程 i. 步骤一.链接到mongos ii. Add shards 1. 单个数据库实例: -{addShard:”&lt;host&gt;&lt;:port&gt;”,maxSize:,name:””}...

    node-config-server:提供动态RESTful API的集中式配置服务器,允许检索整个文件的内容或其解析的属性

    npm npm install --global @kobionic/node-config-server码头工人使用普通的Docker命令docker run -d --name node-config-server kobionic/node-config-server 该图像基本上公开了端口20490,因此容器链接将使其可...

    mongodb(分片+副本)集群部署文档.docx

    mongos,数据库集群请求的入口,所有的请求都通过mongos进行协调,不需要在应用程序添加一个路由选择器,mongos自己就是一个请求分发中心...mongos第一次启动或者关掉重启就会从 config server 加载配置信息,以后....

    vue - vue.config.js中devServer配置方式

    今天小编就为大家分享一篇vue - vue.config.js中devServer配置方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

    config_FR2servercode_

    这是一款2013年的流行于网上的PHP网完整源码,有喜欢的可以下载来看看

    server-config:一个文件库来设置我的本地服务器

    服务器配置一个文件库,用于设置运行本地服务器。 要实现类似的设置,您必须安装 ,并建议使用Docker Compose。 这里包含了脚本,但是它可能不会一直运行-每个单独运行的命令都可以解决问题。 希望将来会有所改善。 ...

    spring-cloud-config-server-issue

    春天的云配置服务器问题在一个航站楼 cd configserver./mvnw spring-boot:run在另一个航站楼 cd democlient./mvnw spring-boot:run客户端将打印: 2018-05-15 13:28:45.460 INFO 23725 --- [ main] c.e.democlient....

    SpringColud1简易分布式

    ● cloud-config-server:配置服务器-(通过git获取配置) ● cloud-eureka-server:eureka注册服务器 ● cloud-simple-service:一个使用mybatis的数据库应用,服务端 ● cloud-simple-ui:webui客户端

    spring-config.rar

    Spring Cloud Config项目是一个解决分布式系统的配置管理方案。它包含了Client和Server两个部分,server提供配置文件的存储、以接口的形式将配置文件的内容提供出去,client通过接口获取数据、并依据此数据初始化...

    CloudSecurity:使用 Spring Cloud Config Server 和 Vault 的云安全项目

    您必须提供一个名为jasypt.encryptor.password的环境变量,其值为sample-password以在应用程序启动期间解密数据库密码。 启动后, http://localhost:8080显示基本的应用程序信息。 春云配置 所有客户端应用程序都...

    Mongodb配置Sharding详细过程

    Shard节点负责存储实际数据,Config Server负责存储集群的元数据,Mongos Server作为路由器,负责将客户端的请求路由到正确的Shard节点上。在本例中,我们将配置三个节点:192.168.100.247和192.168.100.213作为...

    SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心

    SOFARegistry 最早源自于淘宝的 ConfigServer,十年来,随着蚂蚁金服的业务发展,注册中心架构已经演进至第六代。目前 SOFARegistry 不仅全面服务于蚂蚁金服的自有业务,还随着蚂蚁金融科技服务众多合作伙伴,同时也...

    CHKen FTP Server 2.0

    CHKen FTP Server 2.0 并不是一个用户连接上就建立一个线程的,而是多个连接共用一个线程,这个可以自行设定,可以控制总共的线程数量,这样以来,当然连接数再多也不会占用太多的CPU!  CHKen FTP Server 2.0 和 ...

    doraemon-config-server:从 code.google.compdoraemon-config-server 自动导出

    doraemon-config-serverAutomatically exported from code.google.com/p/doraemon-config-server一套监控、服务降级、统一配置集成系统.背景:一方面,在SOA服务化过程中,我们将业务封装成业务接口的方式,以...

Global site tag (gtag.js) - Google Analytics