`

JXTA中基于管道监听方式通信

阅读更多

在上一篇中“基于Pipe管道通信”中很多朋友反应说只有代码没有理论,看起来不知道过程。本文将过程补上,而且提供基于Pipe的另种通信方式-监听方式的实现。

Pipe是Peer之间通信主要机制之一。JXTA中的Pipe和传统的管道存在着本质的区别。它不是内存中存在的数据,也不是硬盘上的文件,而只是一个XML广告文档。因此,其是抽象的概念。其创建过程在这里也称为绑定过程,可以在不同时刻动态地绑定到不同的“物理管道末端”。其具有两个明显的特点:

1、Pipe可以绑定到任何端点

2、Pipe是单向的、不可靠的、异步的

JXTA中的Pipe中存在三种类型:JxtaUnicast,JxtaUnicastSecure,JxtaPropagate。

JxtaUnicast,JxtaUnicastSecure类型管道是单向的,可以在一个输入管道和一个或多个输出管道之间创建,而JxtaPropagate类型的管道则支持多个输入端点。

下面给出其具体通信过程:

1、作为接收方的Peer需要具备一个PipeAdvertisement,然后根据这个PipeAdvertisement创建一个InputPipe,然后等待Messages的到达。

2、发送数据方的Peer需要使用相同的PipeAdvertisement,然后根据这个PipeAdvertisement来创建OutputPipe以发送数据。要创建OutputPipe,它要先发送一个Pipe Binding Query Message给所有它知道的Peers。

3、接收方收到这个Pipe Binding Query Message,看看自己缓存的Pipes中有没有匹配的PipeID。如果有,它就回复一个Pipe Binding Answer Message(里面包含了自己的PeerAdvertisement)给接收方。

4、发送方接收到Pipe Binding Answer Message后,将PeerAdvertisement从中抽取出来。然后使用PeerAdvertisement中的Endpoint信息来创建OutputPipe,这样发送方才可以发送数据。

 

以上就是JXTA中基于Pipe的通信过程,下面我们基于Pipe,利用信息监听接口实现通信。在给出代码之前,先给出下面两个类所用到的Pipe中的主要类的描述。

PipeService:管道服务实现的接口,用于创建输入,输出管道以及创建管道注册监听器

InputPipe:输入管道的接口,为注册监听器的时候也可以等待信息

OutputPipe:输出管道的接口,用来发送信息

PipeMsgListener(之前版本是InputPipeListener):由JXTA程序实现的的接口,处理输入管道消息达到事件

OutputPipeListener:由需要处理的输出管道事件的应用程序实现的接口,通常在管道绑定完成的时候发生。

PipeMsgEvent:注册到输入管道监听器的类,表示消息的到达。

OutputPipeEvent:注册到输出管道监听器的类,可以用来绑定或者解析成功后的管道的ID或者输出管道的实体。

写了这么,有些人应该看不下去了,呵呵。好,下面就给出代码,在这里我使用的PipeAdvertisement与以前的不同,以前是在程序中生成,在这里是读pipe.adv来返回管道广告。该文件直接在工程下面,注意:如果读不到该文件,应该是路径问题。
public class PipeServer implements PipeMsgListener {

 

    static PeerGroup netPg = null;
    transient NetworkConfigurator config;
    private PipeService pipeService;
    private PipeAdvertisement pipeAdv;
    private InputPipe inputPipe = null;
    public PipeServer() {
     config = null;
        try {
            config = new NetworkConfigurator();
            config.setPrincipal("Pipe1");
            config.setPassword("888888888");
            config.save();
            netPg = new NetPeerGroupFactory().getInterface();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        pipeService = netPg.getPipeService();
        pipeAdv = PipeClient.getPipeAdvertisement();
    }
    public static void main(String args[]) {
        PipeServer server = new PipeServer();
        server.start();
    }
    /**
     * 创建输入管道并监听信息
     */
    public void start() {

        try {
            System.out.println("服务端=创建输入管道");
            // 创建输入管道并注册等待信息的到达
            inputPipe = pipeService.createInputPipe(pipeAdv, this);
        } catch (IOException io) {
            io.printStackTrace();
            return;
        }
        if (inputPipe == null) {
            System.out.println("服务端=不能打开输入管道");
            System.exit(-1);
        }
        System.out.println("服务端=等待输出管道中的信息......");
    }

    /**
     * 关闭输入管道及退出程序
     */
    public void stop() {
        inputPipe.close();
        System.exit(-1);
    }

    /**
     * 要实现PipeMsgListener接口该方法,用来接收输出管道的中信息,并打印在控制台
     */
    public void pipeMsgEvent(PipeMsgEvent event) {

        Message msg;
        try {
            // PipeMsgEvent事件中包含信息
            msg = event.getMessage();
            if (msg == null) {
                System.out.println("服务端=空信息");
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // 取得所有信息元素
        Message.ElementIterator en = msg.getMessageElements();

        if (!en.hasNext()) {
            return;
        }
        // 取得content元素下的内容
        MessageElement msgElement = msg.getMessageElement(null, PipeClient.MESSAGE_NAME_SPACE);

        // 接收信息
        if (msgElement.toString() == null) {
            System.out.println("服务端=接到空的信息 ");
        } else {
            System.out.println("服务端=接收到的信息 :" + msgElement.toString());
        }
    }
}

首先运行的是上面这个类,如果运行顺序错了,会连接不到,而发送不了信息。

public class PipeClient implements OutputPipeListener {

    public final static String MESSAGE_NAME_SPACE = "content";
    private PipeService pipeService;
    private PipeAdvertisement pipeAdv;
    private OutputPipe outputPipe;
    private NetworkConfigurator config;
    private PeerGroup netPg = null;
    public PipeClient() {
     config = null;
        try {
            config = new NetworkConfigurator();
            config.setPrincipal("Pipe2");
            config.setPassword("888888888");
            config.save();
            netPg = new NetPeerGroupFactory().getInterface();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        // 获取管道服务
        pipeService = netPg.getPipeService();
        // 创建管道广告
        pipeAdv = getPipeAdvertisement();
    }
    public static void main(String args[]) {
        // 连接目标Peer
        PipeClient client = new PipeClient();
        client.start();
    }
    /**
     * 创建管道广告
     */
    public static PipeAdvertisement getPipeAdvertisement() {
     PipeAdvertisement advertisement=null;
        FileInputStream is;
  try {
   is = new FileInputStream("pipe.adv");//通过本地特定管道广告来创建管道
   advertisement=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(MimeMediaType.XMLUTF8, is);
   is.close();
  } catch (Exception e) {
   e.printStackTrace();
  }
        return advertisement;
    }

    /**
     * 创建输出管道
     */
    public synchronized void start() {
        try {
            // 一旦输出管道创建完成,outputPipeEvent()就被激活
            pipeService.createOutputPipe(pipeAdv, this);
        } catch (IOException e) {
            System.out.println("客户端=创建输出管道失败");
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * 要实现OutputPipeListener中的该方法
     * 当OutputPipe创建完成则调用该方法
     */
    public void outputPipeEvent(OutputPipeEvent event) {

        System.out.println("收到输出管道创建事件");
        // 获取输出管道对象
        outputPipe = event.getOutputPipe();

        Message msg;

        try {
            System.out.println("发送信息中......");
            // 实例化要发送的信息对象
            msg = new Message();
            // 字符串信息元素中添加要发送的内容“Testing Pipe"
            StringMessageElement sme = new StringMessageElement(MESSAGE_NAME_SPACE, "Testing Pipe", null);

            msg.addMessageElement(null, sme);
            // 发送信息
            outputPipe.send(msg);
            System.out.println("客户端:信息已经发送");
        } catch (IOException e) {
            System.out.println("客户端:发送信息失败");
            e.printStackTrace();
            System.exit(-1);
        }
        stop();
    }

 

 

    /**
     * 关闭输出管道和退出程序
     */
    public void stop() {
        outputPipe.close();
        System.exit(-1);
    }
}

终于完了,这是我写的最长一篇啦。今天就写到这了。以后会继续把我学到的JXTA P2P通信方面的知识共享给大家。

忘记提供pipe.adv,真不意思。pipe.adv文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<jxta:PipeAdvertisement>
<Name>Pipe tutorial</Name>
<Id>urn:jxta:uuid-59616261646162614E50472050325033C0C1DE89719B456691A596B983BA0E1004</Id>
<Type>JxtaUnicast</Type>
</jxta:PipeAdvertisement>

 

0
0
分享到:
评论
2 楼 yinboxian 2014-10-31  
我的peers第一次执行时可以得到正确的结果,以后就不行了。不知是为什么?
1 楼 lifeng_2009 2010-06-03  
牙哥太无私了,而且很专业。

相关推荐

Global site tag (gtag.js) - Google Analytics