`

ActiveMQ之三 -- 使用ActiveMQ来传送文件

 
阅读更多

这个方法还有待研究,目前还有如下几个疑点:
1. ActiveMQ 报出这样的信息:

INFO | Usage Manager memory limit (1048576) reached for topic://EXCHANGE.FILE. Producers will be throttled to the rate at which messages are removed from this
destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info

 

2. 这种以异步方式传送资料,能保证客户端能以正确的顺序接收到文件段麽?

 

使用ActiveMQ传送文件,发送端必须将文件拆成一段一段,每段封装在独立的Message中,逐次发送到客户端。例如下面的例子,Producer通过发送命令,告诉文件传送的开始,发送中,结束。客户端接收到这些命令之后,就知道如何接收资料了。

客户端收到内容后,根据命令将内容合并到一个文件中。  

 

package org.apache.activemq.exchange.file;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

    /**
     * @param args
     */
    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("EXCHANGE.FILE");

        MessageConsumer consumer = session.createConsumer(destination);

        boolean appended = false;
        try {
            while (true) {
                Message message = consumer.receive(5000);
                if (message == null) {
                    continue;
                }

                if (message instanceof StreamMessage) {
                    StreamMessage streamMessage = (StreamMessage) message;
                    String command = streamMessage.getStringProperty("COMMAND");
                    
                    if ("start".equals(command)) {
                        appended = false;
                        continue;
                    }

                    if ("sending".equals(command)) {
                        byte[] content = new byte[4096];
                        String file_name = message.getStringProperty("FILE_NAME");
                        BufferedOutputStream bos = null;
                        bos = new BufferedOutputStream(new FileOutputStream("c:/" + file_name, appended));
                        if (!appended) {
                            appended = true;
                        }
                        while (streamMessage.readBytes(content) > 0) {
                            bos.write(content);
                        }
                        bos.close();
                        continue;
                    }

                    if ("end".equals(command)) {
                        appended = false;
                        continue;
                    }
                }
            }
        } catch (JMSException e) {
            throw e;
        } finally {
            if (connection != null) {
                connection.close();
            }
        }

    }

}

 

发送端将文件分包,逐次发送到客户端  

package org.apache.activemq.exchange.file;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publisher {

    public static String FILE_NAME = "01.mp3";
    
    public static void main(String[] args) throws JMSException, IOException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("EXCHANGE.FILE");        
        MessageProducer producer = session.createProducer(destination);
        long time = System.currentTimeMillis();
        
        //通知客户端开始接受文件
        StreamMessage message = session.createStreamMessage();
        message.setStringProperty("COMMAND", "start");
        producer.send(message);
        
        //开始发送文件
        byte[] content = new byte[4096];
        InputStream ins = Publisher.class.getResourceAsStream(FILE_NAME);
        BufferedInputStream bins = new BufferedInputStream(ins);
        while (bins.read(content) > 0) {
            //
            message = session.createStreamMessage();
            message.setStringProperty("FILE_NAME", FILE_NAME);
            message.setStringProperty("COMMAND", "sending");
            message.clearBody();
            message.writeBytes(content);
            producer.send(message);
        }
        bins.close();
        ins.close();
        
        //通知客户端发送完毕
        message = session.createStreamMessage();
        message.setStringProperty("COMMAND", "end");
        producer.send(message);
        
        connection.close();
        
        System.out.println("Total Time costed : " + (System.currentTimeMillis() - time) + " mili seconds");
    }
}
 

 

分享到:
评论
2 楼 luotianwen456123 2015-01-12  
文件大小有限制吗
1 楼 hotbain 2013-01-13  
谢谢分享,值得收藏。又学了好多啊! 

相关推荐

    JMS 使用 ActiveMQ 传送文件

    NULL 博文链接:https://itjiehun.iteye.com/blog/1321969

    liuwei1989#study-guide#ActiveMQ传输文件的几种方式原理1

    对于比较小的文件,简单的处理方式是先读取所有的文件成byte[],然后使用ByteMessage,把文件数据发送到broker,像正常的message一样处理。

    docker-activemq:Apache ActiveMQ 的 Docker 镜像

    ActiveMQ Web 前端 - 8161 OpenWire 传输侦听器 - 61616 AMPQ 传输侦听器 - 5672 STOMP 传输侦听器 - 61613 MQTT 传输侦听器 - 1883 WS 传输侦听器 - 61614 在默认配置目录中创建一个卷,以便用户可以挂载他/她自己...

    ACTIVEMQ C#下的例子

    activemq 传送数据流发送文件,仅供参考

    activemq的windowns编译库、centos7编译库和mac编译库(含头文件和库文件)

    Spring支持,以便ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置 专为高性能集群,客户端 - 服务器,基于对等的通信而设计 CXF和Axis支持,以便ActiveMQ可以轻松地放入这些Web服务...

    实战ActiveMQ集群与应用视频教程.zip

    网盘文件永久链接 1:ActiveMQ入门和消息中间件 2:JMS基本概念和模型 ...7:ActiveMQ支持的传输协议 8:ActiveMQ消息存储持久化 9:ActiveMQ的静态网络链接 10:多线程consumer访问集群 ..........

    ActiveMQ.rar

    基于ActiveMQ的多人聊天系统(c#,winform),实现功能:发送文本信息,发送图片,传输文件,好友列表,群聊私聊

    simple-camel

    如果文件具有xml扩展名->使用jaxb解析文件并将其发送到xmlQueue嵌入式ActiveMQ代理; +/-(带有CD标签的嵌套元素解析不正确,未收集到集合中) 如果文件具有任何其他扩展名->引发异常,然后将该文件写入errorQueue...

    Java思维导图xmind文件+导出图片

    CDN静态文件访问 分布式存储 分布式搜索引擎 应用发布与监控 应用容灾及机房规划 系统动态扩容 分布式架构策略-分而治之 从简到难,从网络通信探究分布式通信原理 基于消息方式的系统间通信 理解通信协议...

    JAVA上百实例源码以及开源项目

    (1)提高文件的共享性(计算机程序和/或数据),(2)鼓励间接地(通过程序)使用远程计算机,(3)保护用户因主机之间的文件存储系统导致的变化,(4)为了可靠和高效地传输,虽然用户可以在终端上直接地使用它,...

    JAVA上百实例源码以及开源项目源代码

    EJB中JNDI的使用源码例子 1个目标文件,JNDI的使用例子,有源代码,可以下载参考,JNDI的使用,初始化Context,它是连接JNDI树的起始点,查找你要的对象,打印找到的对象,关闭Context…… ftp文件传输 2个目标文件...

    springcloud入门

    springcloud-zipkin:链路跟踪工具,监控并就持久化微服务集群中调用链路的通畅情况,采用rabbitmq异步传输、elasticsearch负责持久化的方式集成。 #### 软件架构 1、JDK:jdk-8u181-windows-x64。 2、MAVEN:...

    java开源包1

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

    Transport.Transport

    用于 ActiveMQ 传输的完整示例。 还包含本机 Java ActiveMQ。 可以运行 NServiceBus 到 NServiceBus 或 NServiceBus 到 Native Java ActiveMQ。 Intro2NServiceBus NServiceBus 演示简介的骨架解决方案。 ...

    java开源包10

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

    java开源包8

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

    java开源包11

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

    java开源包2

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

    java开源包3

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

Global site tag (gtag.js) - Google Analytics