`
regular
  • 浏览: 76119 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

关键应用服务的集群技术模拟

    博客分类:
  • Java
阅读更多
集群技术,也就是俗称的Cluster,是为了保证某种应用不间断的服务而采取的一种技术手段。

主旨

服务运行在A,B两台机器上,其中一台为主用机,一台为备用机。备用机不断检测主用机的心跳信息。当发现主用机宕机或不能提供服务的时候,会自动转变为主用机,继续提供服务。

实现细节包括主备用机之间的心跳协议,物理线缆连接方式,以及虚拟出来的服务接口。

其他都还好说,不过虚拟服务接口的确不是很容易实现的。本文对虚拟服务接口技术做了模拟。

虚拟接口,实际是指虚拟的IP地址以及在这个IP地址接口侦听的应用服务。

由于客户机并不会自动改变请求服务的IP地址和端口,因此集群必须提供一个唯一的对外服务接口,并实现主备之间的无缝转换。

方案

虚拟接口我以为有两种办法:
第一种,在计算机上虚拟一块网卡出来,然后把服务绑定到这块网卡上;

第二种,把指向虚拟IP的IP协议包以及ARP协议包转到主机实际存在的网卡上。

我这里是使用了JPCAP工具,实现了第二种方案。

具体要做两件事:
  • 实现ARP协议的ARP_REPLY;
  • 转发IP包。

实现的时候发现,如果使用JPCAP直接转发ICMP包的话会出现问题。

ECHO_REPLY不能被识别,造成Ping不通的现象。

之后研究了ICMP的头结构,发现是在REPLY的时候不能自动填充id和seq,修改之后运转正常。

/**
 * Network Utilities
 */
package cn.sh.huang;

import jpcap.NetworkInterface;
import jpcap.NetworkInterfaceAddress;

public final class NetworkUtilities
{
    private NetworkUtilities()
    {

    }

    public static byte[] getMacBytes(String mac)
    {
        byte[] bytes = new byte[6];
        String[] hex = mac.split("-");
        if (hex.length != 6)
        {
            throw new IllegalArgumentException("Invalid MAC address.");
        }
        try
        {
            for (int i = 0; i < 6; i++)
            {
                bytes[i] = (byte) Integer.parseInt(hex[i], 16);
            }
        }
        catch (NumberFormatException ex)
        {
            throw new IllegalArgumentException("Invalid hex digit in MAC address.");
        }
        return bytes;
    }

    public static String getMacString(byte[] mac)
    {
        StringBuffer sb = new StringBuffer();
        for (byte hex : mac) {
            sb.append('-').append(((int)hex) & 0xFF);
        }
        return sb.substring(1);
    }

    public static NetworkInterface fetchDevice(NetworkInterface[] devices, String ip)
    {
        for (NetworkInterface device : devices)
        {
            for (NetworkInterfaceAddress address : device.addresses)
            {
                if (ip.equals(address.address.getHostAddress()))
                {
                    return device;
                }
            }
        }
        return null;
    }

}

/**
 * ClusterDaemon
 */
package cn.sh.huang;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import jpcap.JpcapCaptor;
import jpcap.JpcapSender;
import jpcap.NetworkInterface;
import jpcap.packet.ARPPacket;
import jpcap.packet.EthernetPacket;
import jpcap.packet.ICMPPacket;
import jpcap.packet.IPPacket;
import jpcap.packet.Packet;
import jpcap.packet.TCPPacket;

public class ClusterDaemon
{
    private static final String      PRV_IP        = "192.168.56.1";
    private static final String      PUB_IP        = "10.61.97.207";
    private static final String      NAT_IP        = "10.61.96.7";
    private static final String      MAP_IP_PREFIX = "192.168.111.";
    private static final InetAddress PRV_IP_ADDR;
    private static final InetAddress NAT_IP_ADDR;
    static
    {
        PRV_IP_ADDR = getIpAddr(PRV_IP);
        NAT_IP_ADDR = getIpAddr(NAT_IP);
    }

    private static InetAddress getIpAddr(String ip)
    {
        InetAddress ipAddr = null;
        try
        {
            ipAddr = InetAddress.getByName(ip);
        }
        catch (UnknownHostException ex)
        {
            ex.printStackTrace();
        }
        return ipAddr;
    }

    private final NetworkInterface    pubInterf, prvInterf;
    private final JpcapSender         pubSender, prvSender;
    private final JpcapCaptor         pubCaptor, prvCaptor;

    private final Map<String, byte[]> macMap;

    private ClusterDaemon() throws IOException
    {
        NetworkInterface[] devices = JpcapCaptor.getDeviceList();
        pubInterf = NetworkUtilities.fetchDevice(devices, PUB_IP);
        pubCaptor = JpcapCaptor.openDevice(pubInterf, 65535, false, 20);
        pubSender = pubCaptor.getJpcapSenderInstance();

        prvInterf = NetworkUtilities.fetchDevice(devices, PRV_IP);
        prvCaptor = JpcapCaptor.openDevice(prvInterf, 65535, false, 20);
        prvSender = prvCaptor.getJpcapSenderInstance();
        macMap = new HashMap<String, byte[]>();

        Thread pubIfListener = new Thread(new Runnable() {

            @Override
            public void run()
            {
                Packet pack;
                while (true)
                {
                    pack = pubCaptor.getPacket();
                    if (pack instanceof ARPPacket)
                    {
                        ARPPacket arpPack = (ARPPacket) pack;
                        // 若arp的目的为请求newIpAddr的MAC地址,则回应
                        if (Arrays.equals(arpPack.target_protoaddr, NAT_IP_ADDR.getAddress())
                                && arpPack.operation == ARPPacket.ARP_REQUEST)
                        {
                            // if (!Arrays.equals(arpPack.sender_protoaddr, pubIpAddr.getAddress())) // 避免自问自答
                            // {
                            pubSender.sendPacket(createArpPack(pubInterf.mac_address, NAT_IP_ADDR.getAddress(),
                                    arpPack.sender_hardaddr, arpPack.sender_protoaddr, pubInterf.mac_address,
                                    arpPack.sender_hardaddr, ARPPacket.ARP_REPLY));
                            // }
                        }
                    }
                    else if (pack instanceof IPPacket)
                    {
                        IPPacket ipPack = (IPPacket) pack;
                        if (!(ipPack instanceof TCPPacket || ipPack instanceof ICMPPacket))
                        {
                            continue;
                        }
                        // else
                        // {
                        // System.out.println("PubIf " + ipPack);
                        // }
                        InetAddress dstIp = ipPack.dst_ip;
                        InetAddress srcIp = ipPack.src_ip;

                        if (dstIp.equals(NAT_IP_ADDR))
                        {
                            // 转发给prvIf
                            EthernetPacket eth = (EthernetPacket) ipPack.datalink;
                            String srcIpAddr = srcIp.getHostAddress();
                            macMap.put(srcIpAddr, eth.src_mac);
                            macMap.put(NetworkUtilities.getMacString(eth.src_mac), ipPack.src_ip.getAddress());
                            String mapIp = MAP_IP_PREFIX + srcIpAddr.substring(srcIpAddr.lastIndexOf('.') + 1);
                            macMap.put(mapIp, eth.src_mac);
                            try
                            {
                                ipPack.src_ip = InetAddress.getByName(mapIp);
                            }
                            catch (UnknownHostException ex)
                            {
                                ex.printStackTrace();
                            }
                            eth.dst_mac = prvInterf.mac_address;
                            ipPack.dst_ip = PRV_IP_ADDR;
                            if (ipPack instanceof ICMPPacket) // ICMP包要注意id和seq不要漏掉(大概是JPCAP的bug)
                            {
                                ICMPPacket icmp = (ICMPPacket) ipPack;
                                if (icmp.id == 0 && icmp.seq == 0)
                                {
                                    icmp.id = (short) (icmp.header[38] * 256 + icmp.header[39]);
                                    icmp.seq = (short) (icmp.header[40] * 256 + icmp.header[41]);
                                }
                            }
                            prvSender.sendPacket(ipPack);
                        }
                    }
                }
            }
        });
        pubIfListener.start();
        Thread prvIfListener = new Thread(new Runnable() {

            @Override
            public void run()
            {
                Packet pack;
                while (true)
                {
                    pack = prvCaptor.getPacket();
                    if (pack instanceof ARPPacket)
                    {
                        ARPPacket arpPack = (ARPPacket) pack;
                        // 若arp的目的为请求mapIp的MAC地址,则回应
                        String fakeAddr = null;
                        try
                        {
                            fakeAddr = InetAddress.getByAddress(arpPack.target_protoaddr).getHostAddress();
                        }
                        catch (UnknownHostException ex)
                        {
                            ex.printStackTrace();
                        }
                        if (fakeAddr.startsWith(MAP_IP_PREFIX) && arpPack.operation == ARPPacket.ARP_REQUEST)
                        {
                            byte[] fakeMAC = macMap.get(fakeAddr);
                            prvSender.sendPacket(createArpPack(fakeMAC, arpPack.target_protoaddr,
                                    arpPack.sender_hardaddr, arpPack.sender_protoaddr, fakeMAC,
                                    arpPack.sender_hardaddr, ARPPacket.ARP_REPLY));
                        }
                    }
                    else if (pack instanceof IPPacket)
                    {
                        IPPacket ipPack = (IPPacket) pack;
                        if (!(ipPack instanceof TCPPacket || ipPack instanceof ICMPPacket))
                        {
                            continue;
                        }
                        // else
                        // {
                        // System.out.println("PrvIf " + ipPack);
                        // }

                        if (ipPack.dst_ip.getHostAddress().startsWith(MAP_IP_PREFIX)) // mapIp
                        {
                            // 转发给pubIf
                            EthernetPacket eth = (EthernetPacket) ipPack.datalink;
                            eth.src_mac = pubInterf.mac_address;
                            ipPack.src_ip = NAT_IP_ADDR;
                            try
                            {
                                ipPack.dst_ip = InetAddress.getByAddress(macMap.get(NetworkUtilities
                                        .getMacString(eth.dst_mac)));
                            }
                            catch (UnknownHostException ex)
                            {
                                ex.printStackTrace();
                            }
                            if (ipPack instanceof ICMPPacket)
                            {
                                ICMPPacket icmp = (ICMPPacket) ipPack;
                                if (icmp.id == 0 && icmp.seq == 0)
                                {
                                    icmp.id = (short) (icmp.header[38] * 256 + icmp.header[39]);
                                    icmp.seq = (short) (icmp.header[40] * 256 + icmp.header[41]);
                                }
                            }
                            pubSender.sendPacket(ipPack);
                        }
                    }
                }
            }
        });
        prvIfListener.start();
    }

    public static void main(String[] args) throws IOException
    {

        /**
         * <pre>
         * Thread thread = new Thread(new Runnable() {
         * 
         *     &#064;Override
         *     public void run()
         *     {
         *         while (true)
         *         {
         *             if (state == ArpState.REQUEST)
         *             {
         *                 ARPPacket arp = createArpPack(pubIf.mac_address, pubIpAddr, new byte[] {
         *                         0, 0, 0, 0, 0, 0
         *                 }, newIpAddr, pubIf.mac_address, new byte[] {
         *                         -1, -1, -1, -1, -1, -1
         *                 }, ARPPacket.ARP_REQUEST);
         *                 pubSender.sendPacket(arp);
         *                 try
         *                 {
         *                     Thread.sleep(30000);
         *                 }
         *                 catch (InterruptedException ex)
         *                 {
         *                     ex.printStackTrace();
         *                 }
         *             }
         *         }
         *     }
         * });
         * thread.setDaemon(true);
         * thread.start();
         * </pre>
         */
        new ClusterDaemon();
    }

    private static ARPPacket createArpPack(byte[] sender_hardaddr, byte[] sender_protoaddr, byte[] target_hardaddr,
            byte[] target_protoaddr, byte[] eth_src_mac, byte[] eth_dst_mac, short operation)
    {
        ARPPacket arp = new ARPPacket();
        arp.hardtype = ARPPacket.HARDTYPE_ETHER;
        arp.prototype = ARPPacket.PROTOTYPE_IP;
        arp.operation = operation;
        arp.hlen = 6;
        arp.plen = 4;
        arp.sender_hardaddr = sender_hardaddr; // 发送方的MAC地址
        arp.sender_protoaddr = sender_protoaddr; // 发送方协议地址
        arp.target_hardaddr = target_hardaddr; // 目标MAC地址
        arp.target_protoaddr = target_protoaddr; // 目标协议地址
        EthernetPacket eth = new EthernetPacket(); // 创建以太网头
        eth.frametype = EthernetPacket.ETHERTYPE_ARP; // 以太包类型
        eth.src_mac = eth_src_mac; // 以太源 MAC地址
        eth.dst_mac = eth_dst_mac; // 以太汇 MAC地址
        arp.datalink = eth; // 将以太头放在ARP包前
        return arp;
    }
    /**
     * <pre>
     * private static void receivePack() throws UnknownHostException
     * {
     *     Packet pack = null;
     *     int ident = 10000;
     *     while (true)
     *     {
     *         pack = pubCaptor.getPacket();
     *         if (pack instanceof IPPacket)
     *         {
     *             IPPacket ipPack = (IPPacket) pack;
     *             byte[] srcIp = ipPack.src_ip.getAddress();
     *             byte[] dstIp = ipPack.dst_ip.getAddress();
     *             if (Arrays.equals(dstIp, newIpAddr))
     *             {
     * 
     *             }
     *             else if (!Arrays.equals(dstIp, pubIpAddr))
     *             {
     * 
     *             }
     *             if (Arrays.equals(dstIp, newIpAddr) || Arrays.equals(srcIp, newIpAddr))
     *             {
     *                 if (ipPack instanceof ICMPPacket)
     *                 {
     *                     // System.out.println(ipPack);
     *                     if (Arrays.equals(dstIp, newIpAddr))
     *                     {
     *                         ICMPPacket icmpPack = (ICMPPacket) ipPack;
     *                         ICMPPacket icmp = new ICMPPacket();
     *                         icmp.type = ICMPPacket.ICMP_ECHOREPLY;
     *                         icmp.setIPv4Parameter(0, false, false, false, 0, false, false, false, 0, ident++, 128,
     *                                 IPPacket.IPPROTO_ICMP, InetAddress.getByName(newIp), icmpPack.src_ip);
     *                         long millisec = Calendar.getInstance().getTimeInMillis();
     *                         icmp.sec = millisec / 1000000;
     *                         icmp.usec = millisec % 1000000;
     *                         icmp.data = icmpPack.data;
     *                         EthernetPacket echoEther = (EthernetPacket) icmpPack.datalink;
     *                         EthernetPacket ether = new EthernetPacket();
     *                         ether.frametype = EthernetPacket.ETHERTYPE_IP;
     *                         ether.dst_mac = echoEther.src_mac;
     *                         ether.src_mac = pubIf.mac_address;
     *                         icmp.datalink = ether;
     *                         icmp.id = icmpPack.id;
     *                         icmp.seq = icmpPack.seq;
     *                         if (icmp.id == 0 &amp;&amp; icmp.seq == 0)
     *                         {
     *                             icmp.id = (short) (icmpPack.header[38] * 256 + icmpPack.header[39]);
     *                             icmp.seq = (short) (icmpPack.header[40] * 256 + icmpPack.header[41]);
     *                         }
     *                         // System.out.println(icmpPack);
     *                         pubSender.sendPacket(icmp);
     *                     }
     *                 }
     *                 // System.out.println(&quot;IPPacket:&quot;);
     *                 // System.out.println(ipPack);
     *             }
     *         }
     *         else if (pack instanceof ARPPacket)
     *         {
     *             ARPPacket arpPack = (ARPPacket) pack;
     *             // if (Arrays.equals(arpPack.sender_protoaddr, newIpAddr)
     *             // || Arrays.equals(arpPack.target_protoaddr, newIpAddr))
     *             // {
     *             // System.out.println(arpPack);
     *             // }
     *             if (Arrays.equals(arpPack.target_protoaddr, newIpAddr))
     *             {
     *                 if (!Arrays.equals(arpPack.sender_protoaddr, pubIpAddr))
     *                 {
     *                     ARPPacket arp = createArpPack(pubIf.mac_address, newIpAddr, arpPack.sender_hardaddr,
     *                             arpPack.sender_protoaddr, pubIf.mac_address, arpPack.sender_hardaddr, ARPPacket.ARP_REPLY);
     *                     pubSender.sendPacket(arp);
     *                     state = ArpState.REPLY;
     *                 }
     *             }
     *         }
     *         else
     *         {
     *             if (pack == null)
     *                 continue;
     *             // System.out.println(&quot;Unknown:&quot;);
     *             // System.out.println(pack);
     *         }
     *     }
     * }
     * </pre>
     */
}
分享到:
评论

相关推荐

    CCF-顾明剑-高性能计算的遥感应用服务关键技术探讨-2019-10-17.pdf

    目标就是通过遥感数据、应用服务与云计算发展相结合,实现遥感大数据、云计算(包括超算)、科学算法和行业应用的协调发展,为教育、科研、气象数值模拟计算和面向更广阔的应用开展系统级的变革与服务。 报告针对几...

    服务器基础知识介绍(服务器全部组件).pdf

    二、服务器关键部件介绍 2.1 CPU类型和应用 2.2 内存类型和应用 2.3 硬盘类型和应用 2.4 RAID技术 2.5 PCIE接口及应用 2.6 BIOS的作用和发展 2.7 BMC和机框管理的作用和发展 2.8 NVMe CPU 中央处理器(Central ...

    智能设计关键技术.doc

    第六章 智能设计关键技术 6.1 智能制造系统的设计 智能制造系统是基于人工智能研究,为应对不断增长的设计信息和工艺信息,以及随 之带来的生产线和生产设备内部信息的增加而出现的先进制造系统。通过借助计算机模 ...

    智能设计关键技术(1).doc

    第六章 智能设计关键技术 6.1 智能制造系统的设计 智能制造系统是基于人工智能研究,为应对不断增长的设计信息和工艺信息,以及随之 带来的生产线和生产设备内部信息的增加而出现的先进制造系统。通过借助计算机模拟 ...

    Spring3.x企业应用开发实战(完整版) part1

    13.6.2 任务调度对应用程序集群的影响 13.6.3 任务调度云 13.6.4 Web应用程序中调度器的启动和关闭问题 13.7 小结 第14章 使用OXM进行对象XML映射 14.1 认识XML解析技术 14.1.1 什么是XML 14.1.2 XML的处理技术 14.2...

    Spring.3.x企业应用开发实战(完整版).part2

    13.6.2 任务调度对应用程序集群的影响 13.6.3 任务调度云 13.6.4 Web应用程序中调度器的启动和关闭问题 13.7 小结 第14章 使用OXM进行对象XML映射 14.1 认识XML解析技术 14.1.1 什么是XML 14.1.2 XML的处理技术 14.2...

    精品推荐-2024智慧城市智慧应急平台精品PPT方案合集(60+份).zip

    应急数字底盘构建中的关键技术 人工智能赋能智慧交通 应急管理综合应用平台 应急物资快速调配管理智能库房 应急模拟演练工作台 应急指挥信息系统 应急指挥中心典型案例 网络安全应急响应服务 互联网社区安全治理模式...

    (matlab程序)用于汽车应用的JPDA跟踪器的处理器在环验证.rar

    从跟踪器生成用于安全关键应用(如高速公路车道跟踪)的嵌入式代码时,通常不鼓励动态内存分配。这意味着分配给跟踪器的内存量必须在编译时知道。此外,生成的代码必须适合嵌入式设备提供的存储器。要在没有动态内存...

    Java测试新技术TestNG和高级概念.part2

    展示了高级技术:测试部分失败、工厂、依赖关系测试、远程调用、基于集群的测试服务器群等。 介绍了在Eclipse和IDE中安装TestNG插件。 包含了大量的代码示例。 无论您使用TestNG,JUnit或其他测试框架,本书提供的...

    Java测试新技术TestNG和高级概念.part1

    展示了高级技术:测试部分失败、工厂、依赖关系测试、远程调用、基于集群的测试服务器群等。 介绍了在Eclipse和IDE中安装TestNG插件。 包含了大量的代码示例。 无论您使用TestNG,JUnit或其他测试框架,本书提供的...

    building_storage_networks_chsSAN存储区域网络 .rar

    同时,存储价格的下降也加速了信息的数字化,而在过去,许多信息仅能以模拟形式或书写的形式存在,如出版物以及从各种视频和音频应用产生的输出。这些变化导致每年需要新增存储容量达到90%。既然所有这些信息都以...

    计算机理论第一章详细透彻 支持格式WORD

    第一代个人移动通信采用的是模拟技术,使用频段为800/900MHs,称之为模拟移动通信系统。随着数字传输,时分多址和码分多址(CDMA)等技术的采用,很快进入第二代,频段900到1800 MHs.目前我国广泛使用GSM,,CDMA,,...

    37篇经过消化的云计算论文

    本文讲了采用mapreduce技术来完成并行的生物分析的一个应用。本文的这个应用是在Hadoop下完成的,讲了两个算法,一个用于支持大规模数据集的流计算,一个用于计算小数据集的策略。 16、 Do Clouds Compute? A ...

    37篇经过消化云计算论文打包下载

    本文讲了采用mapreduce技术来完成并行的生物分析的一个应用。本文的这个应用是在Hadoop下完成的,讲了两个算法,一个用于支持大规模数据集的流计算,一个用于计算小数据集的策略。 16、 Do Clouds Compute? A ...

    分布式网络爬虫.zip

    使用scrapy,redis, mongodb,graphite实现的一个分布式网络爬虫,底层存储mongodb集群,分布式使用redis实现,爬虫状态显示使用graphite实现 爬虫(Web Crawler)是一种自动化程序,用于从互联网上收集信息。其主要功能...

    Quartz-Job-Scheduling-Framework-中文版-V0.9.1.zip

    集群中的 Quartz 应用是通过中心数据库来感知道其他节点的存在。 第十一章. Quartz 集群 (第三部分) 内容提要:如何配置使 Quartz 节点工作在集群环境中。 第十一章. Quartz 集群 (第四部分) 内容提要:运行 ...

    学习Kubernetes

    提供声明性的方式来定义集群的状态 关键集装箱的好处 消除应用程序冲突 环境一致性 加速开发和启动,以防env启动并快速运行 更快地发布软件 对于开发者 在本地模拟产品 从docker-compose移至kubernetes 创建e2e测试...

    maro:多代理资源优化(MARO)平台是针对现实资源优化问题的强化学习即服务(RaaS)的一个实例

    MARO的关键组件: 模拟工具包:它提供了一些预定义的方案,以及用于构建新方案的可重复使用的轮子。 RL工具包:它为RL提供了全栈抽象,例如代理管理器,代理,RL算法,学习者,参与者和各种成形者。 分布式工具箱:...

Global site tag (gtag.js) - Google Analytics