`
code727
  • 浏览: 65900 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

JDK7新特性:MulticastChannel实现非阻塞式组播通信

阅读更多


         一般情况下,我们可以结合利用java.net.MulticastSocket和java.net.DatagramPacket对象来实现组播通信功能。但这在要求满足实时通信的情况下时,则显然有问题。主要体现在:如果没有数据报达到时,MulticastSocke对象调用receive()和send()方法进行收发数据报时,将一直处于阻塞状态,严重影响了后续操作。
        在此之前,解决上述问题的一个方案是利用多线程技术,将接收和发送操作放在不同的线程对象中进行,但这在高交互的场景下时会带来线程开销问题。
        不过利用java.nio.channels.MulticastChannel为我们解决了后顾之忧,这是在JDK1.7中提供的新特点,它可以使组播通信像单播UDP以及面向TCP连接的Socket通信那样,利用通道机制实现无阻塞式的交互环境。

        java.nio.channels.MulticastChannel 只是一个接口,它不具备任何实现细节。不过JDK1.7利用这个接口扩展了java.nio.channels.DatagramChannel 的职责范围,使这个抽象类保留了实现非阻塞式的单播UDP通信基础上,具备了非阻塞式组播UDP通信的能力。  
        下面的实例将模拟用户群聊天的场景。

 

/**
 * <p>非阻塞模式下的组播用户终端,模拟用户群聊天</p> 
 * @author  <a href="mailto:code727@gmail.com">Daniele</a>
 * @version 1.0.0, 2013-4-15
 * @see    
 * @since   AppDemo1.0.0
 */
public class MulticastUserTerminal {
   
    static CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
    static Charset charset = Charset.forName("UTF-8");
    static CharsetDecoder decoder = charset.newDecoder();
   
    public static void main(String[] args) {
       
        // 组播通道
        DatagramChannel channel = null;
        // 组播组
        InetAddress group = null;
       
        try {
           
            /*
             *  创建指定协议的组播通道,
             *  1.INET:IPV4
             *  2.INET6:IPV6
             */
            channel = DatagramChannel.open(StandardProtocolFamily.INET);
           
            /*
             *  setOption(StandardSocketOptions.SO_REUSEADDR, true)
             *  表示允许组播成员绑定到相同的端口上,它必须在绑定bind()前调用。
             *  由于bind()方法内的参数绑定的是当前组成员用于接收数据报的本地端口,
             *  因此如果此终端只用于发送,则将bind()方法的参数设置为null或直接去掉此方法。
             */
            channel.setOption(StandardSocketOptions.SO_REUSEADDR, true).bind(new InetSocketAddress(9527));
           
            // 允许接收自己发送出去的数据报
            channel.setOption(StandardSocketOptions.IP_MULTICAST_LOOP, true);
           
            // 关键点,设置组播通道为非阻塞模式
            channel.configureBlocking(false);
           
            /*
             *  如果组播通道是IPV4协议的,则这里创建的本地网络接口也应该具有此协议,否则为IPV6。
             *  如果通道协议与网络接口的协议不一致,则当通道加入组播组时就会抛出java.lang.IllegalArgumentException
             *  与MulticastSocket类似,在接收数据报之前要将创建的本地网络接口加入到组播组。
             *  如果只用于发送目的,则如下三行代码都不要
             */
            group = InetAddress.getByName("224.1.1.108");
            NetworkInterface networkInterface = NetworkInterface.getByName("net4");
            channel.join(group, networkInterface);
           
            ByteBuffer buffer = ByteBuffer.allocate(8192); 
            InetSocketAddress member = null;
            MulticastPacketSenderThread senderThread = null;
            while (true) {
                /*
                 *  由于前面已调用configureBlocking(false)方法将通道设置为非阻塞式的,
                 *  因此这里对需要对读进行判空。与MulticastSocket类似,
                 *  从组播通道的receive()方法的返回结果中,可以得到当前数据报是哪一个组播成员发送的。
                 */
                if ((member = (InetSocketAddress) channel.receive(buffer)) != null) {
                    buffer.flip();
                    String notice = DateUtils.dateToString(new Date(), DateUtils.DEFAULT_DATETIME_FORMAT)
                            + " - 来自 " + member.getHostName() + "[" + member.getAddress().getHostAddress()
                            + ":" + member.getPort() + "] 的消息:" ;
                    System.out.println(notice);
                    System.out.println(decoder.decode(buffer));
                    buffer.clear();
                }
               
                /*
                 *  由于发送数据来源于键盘输入(阻塞式),因此这里需要用线程来实现无阻塞发送。
                 *  如果数据不是来源于阻塞式的终端,则直接在下面判断send()方法执行后是否返回0即可。
                 */
                if (senderThread == null) {
                    senderThread = new MulticastPacketSenderThread(channel, group, 9527);
                    new Thread(senderThread).start();
                } else {
                   
                    // 当输入"exit"后,结束接收和发送数据报
                    if (senderThread.isStopSend())
                        break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (channel != null)
                try {
                    channel.close();
                    System.out.println("关闭组播通道");
                } catch (IOException e) {
                    e.printStackTrace();
                }
        }
    }
}  

 

/**
 * <p>数据报发送线程</p>
 * @author  <a href="mailto:code727@gmail.com">Daniele</a>
 * @version 1.0.0, 2013-4-15
 * @see    
 * @since   AppDemo1.0.0
 */
public class MulticastPacketSenderThread implements Runnable {
   
    private DatagramChannel channel;
   
    /** 发送目标组 */
    private InetAddress group;
   
    /** 目标组成员端口号 */
    private int groupPort;
   
    /** 标识是否结束发送操作 */
    private boolean stopSend;
   
    public MulticastPacketSenderThread(DatagramChannel channel, InetAddress group, int groupPort) {
        this.channel = channel;
        this.group = group;
        this.groupPort = groupPort;
    }
   
    public boolean isStopSend() {
        return stopSend;
    }

    public void run() {
        String inputMessage = "";
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        InetSocketAddress target = new InetSocketAddress(group, groupPort);
        while (!stopSend) {
            try {
                inputMessage = reader.readLine();
                if (!"exit".equalsIgnoreCase(inputMessage.trim()))
                    channel.send(ByteBuffer.wrap(inputMessage.getBytes()), target);
                else
                    stopSend = true;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

       在Eclipse等IDE环境中,同时运行多个MulticastUserTerminal 实例后,在控制台中输入发送数据后回车,查看各实例的控制台的输出结果即可


组播用户组


     图1 多播用户组    



图2 User1的控制台      



图3 User2的控制台

  • 大小: 26.7 KB
  • 大小: 19.6 KB
  • 大小: 18.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics