`
iyuan
  • 浏览: 463308 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

zeroMQ初体验-28.可靠性-主从模式

    博客分类:
  • MQ
阅读更多
虽然"硬盘模式"看起来已经非常靠谱了,不过,还记得前段时间"亚马逊云拓机"么,异地灾备似乎才能真正当的上'高可用',暂且抛开物理、成本上的问题,zeromq也为此提供了靠谱的支持~

官方声明:
1.这是一个直接的高可用的方案
2.足够简单,让人易于理解和使用
3.在需要时可以提供可靠的转移

比较经典也比较靠谱的模式简图:


迁移:


作为一个高可用的框架,至少得做到如下几点:
  • 为灾难性事故做准备,例如主机房失火了、地震了等。
  • 切换应该在尽可能短的时间内完成(60秒内,最好是10s内)。
  • 由主切换到从可以是自动的,不过恢复时最好手动完成。
  • 客户端应该明了这种切换机制,最好在api中给出自动方案。
  • 有明确的标识来明确主从。
  • 主从间不能有启动顺序的依赖。
  • 服务端的切换不会导致客户端崩溃(或许会重新连接)。
  • 主从状态必须可监控。
  • 主从间的连接必须靠谱,高速。


基于如此结构的一些假设:
  • 一主一从结构已经足够保险,不再需要多层次的备份。
  • 主和从都能单独负载整个需求,而不是平衡性的承载整个符合。
  • 要有足够的预算供起这么一个日常空转的从。


这些并没有被涉及到:
  • 从并非用来负载均衡。
  • 信息不持久化。
  • 主从间不会自动探测对方。
  • 主从间不会同步服务器的状态、信息。


如果要配置一对主从,需要在各自的配置中设定好对方及同步时间。

匹配的,作为客户端需要做的:
1.知道主从的地址
2.先连主,失败则试试从
3.检测失效的连接(心跳)
4.重连时遵守2
5.重新在请求的服务器上创建状态
6.当主从切换时,可以重新传递请求

当然,这些都会由客户端的api完成。这里注意:作者一再强调,主从同时只能有一个在提供服务!

服务器端:
//
//  Binary Star server
//
#include "czmq.h"

//  We send state information every this often
//  If peer doesn't respond in two heartbeats, it is 'dead'
#define HEARTBEAT 1000          //  In msecs

//  States we can be in at any point in time
typedef enum {
    STATE_PRIMARY = 1,          //  Primary, waiting for peer to connect
    STATE_BACKUP = 2,           //  Backup, waiting for peer to connect
    STATE_ACTIVE = 3,           //  Active - accepting connections
    STATE_PASSIVE = 4           //  Passive - not accepting connections
} state_t;

//  Events, which start with the states our peer can be in
typedef enum {
    PEER_PRIMARY = 1,           //  HA peer is pending primary
    PEER_BACKUP = 2,            //  HA peer is pending backup
    PEER_ACTIVE = 3,            //  HA peer is active
    PEER_PASSIVE = 4,           //  HA peer is passive
    CLIENT_REQUEST = 5          //  Client makes request
} event_t;

//  Our finite state machine
typedef struct {
    state_t state;              //  Current state
    event_t event;              //  Current event
    int64_t peer_expiry;        //  When peer is considered 'dead'
} bstar_t;

//  Execute finite state machine (apply event to state)
//  Returns TRUE if there was an exception

static Bool
s_state_machine (bstar_t *fsm)
{
    Bool exception = FALSE;
    //  Primary server is waiting for peer to connect
    //  Accepts CLIENT_REQUEST events in this state
    if (fsm->state == STATE_PRIMARY) {
        if (fsm->event == PEER_BACKUP) {
            printf ("I: connected to backup (slave), ready as master\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_ACTIVE) {
            printf ("I: connected to backup (master), ready as slave\n");
            fsm->state = STATE_PASSIVE;
        }
    }
    else
    //  Backup server is waiting for peer to connect
    //  Rejects CLIENT_REQUEST events in this state
    if (fsm->state == STATE_BACKUP) {
        if (fsm->event == PEER_ACTIVE) {
            printf ("I: connected to primary (master), ready as slave\n");
            fsm->state = STATE_PASSIVE;
        }
        else
        if (fsm->event == CLIENT_REQUEST)
            exception = TRUE;
    }
    else
    //  Server is active
    //  Accepts CLIENT_REQUEST events in this state
    if (fsm->state == STATE_ACTIVE) {
        if (fsm->event == PEER_ACTIVE) {
            //  Two masters would mean split-brain
            printf ("E: fatal error - dual masters, aborting\n");
            exception = TRUE;
        }
    }
    else
    //  Server is passive
    //  CLIENT_REQUEST events can trigger failover if peer looks dead
    if (fsm->state == STATE_PASSIVE) {
        if (fsm->event == PEER_PRIMARY) {
            //  Peer is restarting - become active, peer will go passive
            printf ("I: primary (slave) is restarting, ready as master\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_BACKUP) {
            //  Peer is restarting - become active, peer will go passive
            printf ("I: backup (slave) is restarting, ready as master\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_PASSIVE) {
            //  Two passives would mean cluster would be non-responsive
            printf ("E: fatal error - dual slaves, aborting\n");
            exception = TRUE;
        }
        else
        if (fsm->event == CLIENT_REQUEST) {
            //  Peer becomes master if timeout has passed
            //  It's the client request that triggers the failover
            assert (fsm->peer_expiry > 0);
            if (zclock_time () >= fsm->peer_expiry) {
                //  If peer is dead, switch to the active state
                printf ("I: failover successful, ready as master\n");
                fsm->state = STATE_ACTIVE;
            }
            else
                //  If peer is alive, reject connections
                exception = TRUE;
        }
    }
    return exception;
}

int main (int argc, char *argv [])
{
    //  Arguments can be either of:
    //      -p  primary server, at tcp://localhost:5001
    //      -b  backup server, at tcp://localhost:5002
    zctx_t *ctx = zctx_new ();
    void *statepub = zsocket_new (ctx, ZMQ_PUB);
    void *statesub = zsocket_new (ctx, ZMQ_SUB);
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    bstar_t fsm = { 0 };

    if (argc == 2 && streq (argv [1], "-p")) {
        printf ("I: Primary master, waiting for backup (slave)\n");
        zsocket_bind (frontend, "tcp://*:5001");
        zsocket_bind (statepub, "tcp://*:5003");
        zsocket_connect (statesub, "tcp://localhost:5004");
        fsm.state = STATE_PRIMARY;
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {
        printf ("I: Backup slave, waiting for primary (master)\n");
        zsocket_bind (frontend, "tcp://*:5002");
        zsocket_bind (statepub, "tcp://*:5004");
        zsocket_connect (statesub, "tcp://localhost:5003");
        fsm.state = STATE_BACKUP;
    }
    else {
        printf ("Usage: bstarsrv { -p | -b }\n");
        zctx_destroy (&ctx);
        exit (0);
    }
    //  Set timer for next outgoing state message
    int64_t send_state_at = zclock_time () + HEARTBEAT;

    while (!zctx_interrupted) {
        zmq_pollitem_t items [] = {
            { frontend, 0, ZMQ_POLLIN, 0 },
            { statesub, 0, ZMQ_POLLIN, 0 }
        };
        int time_left = (int) ((send_state_at - zclock_time ()));
        if (time_left < 0)
            time_left = 0;
        int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Context has been shut down

        if (items [0].revents & ZMQ_POLLIN) {
            //  Have a client request
            zmsg_t *msg = zmsg_recv (frontend);
            fsm.event = CLIENT_REQUEST;
            if (s_state_machine (&fsm) == FALSE)
                //  Answer client by echoing request back
                zmsg_send (&msg, frontend);
            else
                zmsg_destroy (&msg);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  Have state from our peer, execute as event
            char *message = zstr_recv (statesub);
            fsm.event = atoi (message);
            free (message);
            if (s_state_machine (&fsm))
                break;          //  Error, so exit
            fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
        }
        //  If we timed-out, send state to peer
        if (zclock_time () >= send_state_at) {
            char message [2];
            sprintf (message, "%d", fsm.state);
            zstr_send (statepub, message);
            send_state_at = zclock_time () + HEARTBEAT;
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupted\n");

    //  Shutdown sockets and context
    zctx_destroy (&ctx);
    return 0;
}


客户端:
//
//  Binary Star client
//
#include "czmq.h"

#define REQUEST_TIMEOUT     1000    //  msecs
#define SETTLE_DELAY        2000    //  Before failing over

int main (void)
{
    zctx_t *ctx = zctx_new ();

    char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
    uint server_nbr = 0;

    printf ("I: connecting to server at %s…\n", server [server_nbr]);
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, server [server_nbr]);

    int sequence = 0;
    while (!zctx_interrupted) {
        //  We send a request, then we work to get a reply
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);

        int expect_reply = 1;
        while (expect_reply) {
            //  Poll socket for a reply, with timeout
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  Interrupted

            //  If we got a reply, process it
            if (items [0].revents & ZMQ_POLLIN) {
                //  We got a reply from the server, must match sequence
                char *reply = zstr_recv (client);
                if (atoi (reply) == sequence) {
                    printf ("I: server replied OK (%s)\n", reply);
                    expect_reply = 0;
                    sleep (1);  //  One request per second
                }
                else {
                    printf ("E: malformed reply from server: %s\n",
                        reply);
                }
                free (reply);
            }
            else {
                printf ("W: no response from server, failing over\n");
                //  Old socket is confused; close it and open a new one
                zsocket_destroy (ctx, client);
                server_nbr = (server_nbr + 1) % 2;
                zclock_sleep (SETTLE_DELAY);
                printf ("I: connecting to server at %s…\n",
                        server [server_nbr]);
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, server [server_nbr]);

                //  Send request again, on new socket
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}


主从间的状态图:


(未完待续)
0
0
分享到:
评论

相关推荐

    zeromq-2.1.7.tar.gz

    zeromq-2.1.7.tar.gz 的早期的一个版本,本人已安装成功 放心使用

    zeromq-4.0.3.tar.gz.zip

    zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz zeromq-4.0.3.tar.gz

    zeromq-4.3.2.tar.gz

    zeromq-4.3.2.tar.gz,可在linux下编译安装,能够使用zeromq进行sorket开发,多线程,提升性能,效率,可以配合msgpack进行使用,是个好的扩展插件

    zeromq-4.1.3.tar.gz

    zeromq-4.1.3.tar.gz,最新的zeromq的开发工具包,希望对开发者有用

    zeromq-4.0.5-4.el7.x86_64.rpm

    官方离线安装包,测试可用。使用rpm -ivh [rpm完整包名] 进行安装

    zeromq-4.1.8.tar.gz

    zeromq-4.1.8.tar.gz 有问题请联系

    zeromq-2.1.9.tar.gz

    zeromq-2.1.9.tar.gz 这是zeromq linux 官方原版 请放心下载

    zeromq-4.2.3.tar.gz

    zeromq-4.2.3.tar.gz 一个稳定的版本,可以安装使用!

    zeromq-4.3.4.zip

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-3.2.5.tar.gz、jzmq.tar.gz、Python-2.6.6.tar.bz2、storm-0.8.0.zip下载

    storm搭建所需资源

    zeromq-4.2.0.tar.gz源码包

    在官网下载zeromq太慢了,网速极不稳定,特意下载放在这里供大家下载,当然象征性地赚点 资源分

    zeromq-4.1.2.tar.gz

    ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。

    zeromq-3.2.5.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.1.4.tar.gz

    ZeroMQ是一个网络通讯库,其主要用来为分布式应用程序开发提供进程间通信(此处的进程既可以是同一台机器上的两个进程也可以是不同机器上的两个进程)。ZeroMQ的特点在于灵活的通信手段和丰富的连接模型,并且它可以...

    zeromq-4.3.4.tar.gz

    0MQ version 4.3.4 stable, released on 2021/01/17

    zeromq-4.2.1.tar.gz (包括安装包和zeromq\jzmq教程)

    ZeroMQ(也说明 ØMQ,0MQ 或 ZMQ)是一个高性能的异步消息库,旨在使用分布式或并行应用程序。它提供了一个消息队列,但 不同于面向消息的中间件,一个 ZeroMQ 系统可以在没有专用运行消息代理。jzmq安装包,本人也...

    zeromq-4.2.3.zip

    zeroMQ 4.2.3版本 zeromq-4.2.3.tar.gz 欢迎关注我的CSDN博客:https://mp.csdn.net/console/home 免积分下载

    zeromq-4.2.5.tar.gz

    zeromq-4.2.5.tar.gz

    Win64-ZeroMQ-JZMQ-CZMQ.zip

    VS2015 在Widows 10 上编译的 ZeroMQ 4.3.2,JZMQ 3.1 CZMQ 4.2,可以在 JDK 1.8 下运行。DLL 都是 64位,包含了编译及运行相关信息。分享一下,也给自己留个备份

Global site tag (gtag.js) - Google Analytics