- 浏览: 7251205 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (1546)
- 企业中间件 (236)
- 企业应用面临的问题 (236)
- 小布Oracle学习笔记汇总 (36)
- Spring 开发应用 (54)
- IBatis开发应用 (16)
- Oracle基础学习 (23)
- struts2.0 (41)
- JVM&ClassLoader&GC (16)
- JQuery的开发应用 (17)
- WebService的开发应用 (21)
- Java&Socket (44)
- 开源组件的应用 (254)
- 常用Javascript的开发应用 (28)
- J2EE开发技术指南 (163)
- EJB3开发应用 (11)
- GIS&Mobile&MAP (36)
- SWT-GEF-RCP (52)
- 算法&数据结构 (6)
- Apache开源组件研究 (62)
- Hibernate 学习应用 (57)
- java并发编程 (59)
- MySQL&Mongodb&MS/SQL (15)
- Oracle数据库实验室 (55)
- 搜索引擎的开发应用 (34)
- 软件工程师笔试经典 (14)
- 其他杂项 (10)
- AndroidPn& MQTT&C2DM&推技术 (29)
- ActiveMQ学习和研究 (38)
- Google技术应用开发和API分析 (11)
- flex的学习总结 (59)
- 项目中一点总结 (20)
- java疑惑 java面向对象编程 (28)
- Android 开发学习 (133)
- linux和UNIX的总结 (37)
- Titanium学习总结 (20)
- JQueryMobile学习总结 (34)
- Phonegap学习总结 (32)
- HTML5学习总结 (41)
- JeeCMS研究和理解分析 (9)
最新评论
-
lgh1992314:
[u][i][b][flash=200,200][url][i ...
看看mybatis 源代码 -
尼古拉斯.fwp:
图片根本就不出来好吧。。。。。。
Android文件图片上传的详细讲解(一)HTTP multipart/form-data 上传报文格式实现手机端上传 -
ln94223:
第一个应该用排它网关吧 怎么是并行网关, 并行网关是所有exe ...
工作流Activiti的学习总结(八)Activiti自动执行的应用 -
ZY199266:
获取不到任何消息信息,请问这是什么原因呢?
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息 -
xiaoyao霄:
DestinationSourceMonitor 报错 应该导 ...
ActiveMQ 通过JMX监控Connection,Queue,Topic的信息
MQTT的学习研究(六) MQTT moquette 的 Blocking API 订阅消息客户端使用
参阅官方文档:
* 使用 Java 为 MQ Telemetry Transport 创建订户
* 在此任务中,您将遵循教程来创建订户应用程序。订户将针对主题创建预订并接收该预订的发布。
* 提供了一个示例订户应用程序 Subscribe。Subscribe 将创建预订主题 MQTT Examples,并等待获
* 得该预订的发布,等待时间为 30 秒。订户可以创建预订并等待获得发布。它还可以接收发送至先前
* 为同一客户机标识创建的预订的发布。
* MqttConnectionOptions.cleanSession Boolean 属性将控制是否接收到先前所发送的发布
*
*4.创建新的 MqttClient 实例。
* MqttClient client = new MqttClient(Example.TCPAddress, Example.clientId);为客户机提供服务器地址,稍后会将此地址用来连接至 WebSphere MQ。设置客户机标识以对客户机命名。
*
* ◦(可选)可以提供 MqttClientPersistence 接口的实现以替换缺省实现。缺省 MqttPersistence 实现会将正在等待传递的 QoS 1 和 QoS 2 消息作为文件来存储;请参阅MQTT 客户机中的消息持久性。
* ◦MQTT 的缺省 WebSphere MQ TCP/IP 端口为 1883。对于 SSL,缺省端口为 8883。在此示例中,缺省地址设置为 tcp://localhost:1883。
* ◦通常,能够使用客户机标识来标识特定物理客户机很重要。在与服务器相连的所有客户机中,客户机标识必须是唯一的;请参阅MQTT 客户机标识。如果与前一个实例使用同一个客户机标识,那么表示目前的实例是同一个客户机的实例。如果您在两个正在运行的客户机中重复使用同一个客户机标识,那么这两个客户机中都会抛出异常,并且一个客户机会终止。
* ◦客户机标识的长度不能超过 23 个字节。如果超过了此长度,就会抛出异常。客户机标识中必须只包含队列管理器名称中允许使用的字符;例如,不能包含连字符或空格。
* ◦在您调用 MqttClient.connect 方法之前,不会处理消息。
* 使用客户机对象来发布和预订主题以及恢复有关尚未传递的发布的信息。
*
*
*6.创建一个 MqttConnectOptions 对象,并设置其 cleanSession 属性。
* a.创建一个 MqttConnectOptions 对象。
* MqttConnectOptions conOptions = new MqttConnectOptions();conOptions 是 MqttClient 构造函数的一个选项参数。
*
* b.设置 clearSession 属性。
* conOptions.setCleanSession(Example.cleanSession);缺省情况下,Example.cleanSession 参数设置为 true,从而与 MqttConnectionOptions.cleanSession 的缺省设置相匹配。
*
* 如果您使用缺省 MqttConnectOptions,或者在连接客户机之前将 MqttConnectOptions.cleanSession 设置为 true,那么在客户机建立连接时,将除去客户机的任何旧预订。当客户机断开连接时,会除去客户机在会话期间创建的任何新预订。
*
* 如果您在连接之前将 MqttConnectOptions.cleanSession 设置为 false,那么客户机创建的任何预订都会被添加至客户机在连接之前就已存在的所有预订。当客户机断开连接时,所有预订仍保持活动状态。
*
* 要了解 cleanSession 属性影响预订的方式,另一种方法就是将它视作模态属性。在其缺省方式 cleanSession=true 下,客户机仅在会话的作用域内创建预订和接收发布。在另一种方式 cleanSession=false 下,预订是持久预订。客户机可以连接和断开连接,而其预订保持活动状态。当客户机重新连接时,它将接收任何未传递的发布。在它连接之后,它可以自己修改处于活动状态的预订集。
*
* 在连接之前,您必须设置 cleanSession 方式;在整个会话期间都将保持此方式。要更改此属性的设置,必须将客户机断开连接,然后再重新连接客户机。如果您将方式从使用 cleanSession=false 更改为 cleanSession=true,那么此客户机先前的所有预订以及尚未接收到的任何发布都将被废弃。
*
MQTT订阅实现类:
package com.etrip.wsmqtt.client; import com.ibm.micro.client.mqttv3.MqttClient; import com.ibm.micro.client.mqttv3.MqttConnectOptions; /** * * 使用 Java 为 MQ Telemetry Transport 创建订户 * 在此任务中,您将遵循教程来创建订户应用程序。订户将针对主题创建预订并接收该预订的发布。 * 提供了一个示例订户应用程序 Subscribe。Subscribe 将创建预订主题 MQTT Examples,并等待获 * 得该预订的发布,等待时间为 30 秒。订户可以创建预订并等待获得发布。它还可以接收发送至先前 * 为同一客户机标识创建的预订的发布。 * @author longgangbai */ public class WSMQTTClientSubscribe { public static void main(String[] args) { try { //创建MQTT客户端对象 MqttClient client = new MqttClient(WSMQTTClientConstants.TCPAddress, WSMQTTClientConstants.clientId); //创建客户端MQTT回调类 WSMQTTClientCallBack callback = new WSMQTTClientCallBack(WSMQTTClientConstants.clientId); //设置MQTT回调 client.setCallback(callback); //创建一个连接对象 MqttConnectOptions conOptions = new MqttConnectOptions(); //设置清除会话信息 conOptions.setCleanSession(WSMQTTClientConstants.cleanSession); //设置超时时间 conOptions.setConnectionTimeout(10000); //设置会话心跳时间 conOptions.setKeepAliveInterval(20000); //设置最终端口的通知消息 conOptions.setWill(client.getTopic("LastWillTopic"), "the client will stop !".getBytes(), 1, false); //连接broker client.connect(conOptions); System.out.println("Subscribing to topic \"" + WSMQTTClientConstants.topicString + "\" for client instance \"" + client.getClientId() + "\" using QoS " + WSMQTTClientConstants.QoS + ". Clean session is " + WSMQTTClientConstants.cleanSession); //订阅相关的主题信息 client.subscribe(WSMQTTClientConstants.topicString, WSMQTTClientConstants.QoS); System.out.println("Going to sleep for " + WSMQTTClientConstants.sleepTimeout / 1000 + " seconds"); Thread.sleep(100000000000000l); //关闭相关的MQTT连接 if(client.isConnected()){ client.disconnect(); } System.out.println("Finished"); } catch (Exception e) { e.printStackTrace(); } } }
MQTT订阅回调类:
package com.etrip.wsmqtt.client; import com.ibm.micro.client.mqttv3.*; /** * * 消息订阅相关的回调类使用 * * 必须实现MqttCallback的接口并实现对应的相关接口方法 * * @author longgangbai */ public class WSMQTTClientCallBack implements MqttCallback { private String instanceData = ""; public WSMQTTClientCallBack(String instance) { instanceData = instance; } public void messageArrived(MqttTopic topic, MqttMessage message) { try { System.out.println("Message arrived: \"" + message.toString() + "\" on topic \"" + topic.toString() + "\" for instance \"" + instanceData + "\""); } catch (Exception e) { e.printStackTrace(); } } public void connectionLost(Throwable cause) { System.out.println("Connection lost on instance \"" + instanceData + "\" with cause \"" + cause.getMessage() + "\" Reason code " + ((MqttException)cause).getReasonCode() + "\" Cause \"" + ((MqttException)cause).getCause() + "\""); cause.printStackTrace(); } public void deliveryComplete(MqttDeliveryToken token) { try { System.out.println("Delivery token \"" + token.hashCode() + "\" received by instance \"" + instanceData + "\""); } catch (Exception e) { e.printStackTrace(); } } }
常量类:
package com.etrip.wsmqtt.client; /** * * 消息订阅消息的常量字段 * * @author longgangbai */ public final class WSMQTTClientConstants { public static final String TCPAddress = System.getProperty("TCPAddress", "tcp://192.168.208.46:1883"); public static String clientId = String.format("%-23.23s",(System.getProperty("user.name") + "_" + (System.getProperty("clientId", "Subscribe."))).trim()).replace('-', '_'); public static final String topicString = System.getProperty("topicString", "china/beijing"); public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis())); public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000")); public static final int sleepTimeout =Integer.parseInt(System.getProperty("timeout", "10000000")); public static final boolean cleanSession = Boolean.parseBoolean(System.getProperty("cleanSession", "false")); public static final int QoS = Integer.parseInt(System.getProperty("QoS", "1")); public static final boolean retained = Boolean.parseBoolean(System.getProperty("retained", "false")); }
评论
这个是要做什么呢?客户端不显示是因为它啊。
最后声明,没有成功......也许成功是偶然,不成功是常态。
try {
System.out.println("Delivery token \"" + token.hashCode()
+ "\" received by instance \"" + instanceData + "\"");
} catch (Exception e) {
e.printStackTrace();
}
}
这里的token 是null,
客户端收不到 message,
从broker日志看,没有向client SENT。
343308 [NioProcessor-1] INFO SERVER LOG - CREATED
343308 [NioProcessor-1] INFO SERVER LOG - OPENED
343316 [NioProcessor-1] INFO SERVER LOG - RECEIVED: type: CONNECT, dup: false, QoS: 0, retain: false, remainingLen: 76
343317 [NioProcessor-1] INFO MQTTHandler - Received a message of type CONNECT
343321 [NioProcessor-1] INFO SERVER LOG - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
343321 [NioProcessor-1] INFO SERVER LOG - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
343324 [NioProcessor-1] INFO SERVER LOG - RECEIVED: type: SUBSCRIBE, dup: false, QoS: 1, retain: false, remainingLen: 18
343324 [NioProcessor-1] INFO MQTTHandler - Received a message of type SUBSCRIBE
343326 [pool-3-thread-1] INFO SimpleMessaging - replying with SubAct to MSG ID 1
343327 [NioProcessor-1] INFO SERVER LOG - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
353327 [NioProcessor-1] INFO SERVER LOG - IDLE
363289 [NioProcessor-2] INFO SERVER LOG - CREATED
363293 [NioProcessor-2] INFO SERVER LOG - OPENED
363295 [NioProcessor-2] INFO SERVER LOG - RECEIVED: type: CONNECT, dup: false, QoS: 0, retain: false, remainingLen: 37
363296 [NioProcessor-2] INFO MQTTHandler - Received a message of type CONNECT
363298 [NioProcessor-2] INFO SERVER LOG - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
363327 [NioProcessor-1] INFO SERVER LOG - IDLE
363351 [NioProcessor-2] INFO SERVER LOG - RECEIVED: type: PUBLISH, dup: false, QoS: 2, retain: false, remainingLen: 66
363351 [NioProcessor-2] INFO MQTTHandler - Received a message of type PUBLISH
363356 [NioProcessor-2] INFO SERVER LOG - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
363384 [NioProcessor-2] INFO SERVER LOG - RECEIVED: type: PUBREL, dup: false, QoS: 1, retain: false, remainingLen: 2
363384 [NioProcessor-2] INFO MQTTHandler - Received a message of type PUBREL
363389 [NioProcessor-2] INFO SERVER LOG - SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
373327 [NioProcessor-1] INFO SERVER LOG - IDLE
Subscribing to topic "china/beijing" for client instance "Administrator_Subscribe" using QoS 1. Clean session is false
at com.etrip.wsmqtt.client.WSMQTTClientCallBack.deliveryComplete(WSMQTTClientCallBack.java:34)
at com.ibm.micro.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:136)
at java.lang.Thread.run(Thread.java:619)
客户端没有定义回执,在回调中的token就没有值,这里怎么解决呢?
发表评论
-
TestNG简单的学习(十三)TestNG中Junit的实现
2013-12-04 09:00 3287TestNG和junit的整合 ... -
TestNG简单的学习(十二)TestNG运行
2013-12-03 09:08 51445文档来自官方地址: ... -
TestNG简单的学习(十一)TestNG学习总结
2013-12-03 09:08 13994最近一直在学习关于TestNG方面的知识,根 ... -
TestNG简单的学习(十)TestNG @Listeners 的使用
2013-12-03 09:07 8624TestNG官方网站: http://testng.or ... -
TestNG简单的学习(九)TestNG Method Interceptors 的使用
2013-12-03 09:07 2654TestNG官方网站: http://testng ... -
TestNG简单的学习(八)TestNG Annotation Transformers 的使用
2013-12-03 09:07 2757TestNG官方网站: http://testng.or ... -
TestNG简单的学习(七)TestNG编程方式运行
2013-12-02 09:22 2396TestNG官方网站: http://testng.or ... -
TestNG简单的学习(六)测试工厂注释的使用
2013-12-02 09:22 2713TestNG官方网站: http://testng.or ... -
TestNG简单的学习(五)参数化测试数据的定制
2013-12-02 09:22 2641TestNG官方网站: http://testng.or ... -
TestNG简单的学习(四)测试方法通过名称名称依赖实现
2013-12-02 09:21 2032TestNG官方网站: http://testng.or ... -
TestNG简单的学习(三)测试方法通过测试分组依赖实现
2013-12-02 09:21 2765TestNG官方网站: http://testng.or ... -
TestNG简单的学习(二)参数化测试并发且多方法测试方法判定
2013-11-29 15:35 3635TestNG官方网站: http://testng.or ... -
TestNG简单的学习(一)类和方法级别@Test的区别
2013-11-29 15:31 9380TestNG官方文档的地址: http://testng ... -
Feed4Junit的简单使用(七)Feed4TestNg
2013-11-29 13:35 6081在Feed4Junit主要针对junit实现的 ... -
Feed4Junit的简单使用(六)数据来特定格式文件
2013-11-29 12:29 2687Feed4Junit官方地址: http://da ... -
Feed4Junit的简单使用(五)数据来自动态约束数据
2013-11-29 12:29 2562Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(四)数据来自定义数据源
2013-11-28 14:09 3039Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(三)数据源来自数据库
2013-11-28 13:58 3097Feed4Junit官方地址: http://databe ... -
Feed4Junit的简单使用(二)数据源来自文件
2013-11-28 13:50 4521Feed4Junit官方地址: http://datab ... -
Feed4Junit的简单使用(一)
2013-11-28 13:47 2158Feed4Junit官方地址: http://databe ...
相关推荐
Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用MQTTnet实现MQTT的服务端和客户端之间的通信以及将订阅的消息保存到文件示例代码.rar Winform中使用...
MQTTnet在.net core(2.2版本)开发环境下的实例代码,包含服务端,两个客户端(订阅、发布),并在阿里云的mqtt服务测试过可用
编写环境VS2013,C# MQTT客户端订阅主题接收消息接口。
SpringBoot整合MQTT服务器实现消息的发送与订阅(推送消息与接收推送)客户端类与回调方法.rar 博客地址:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/112394731
springboot+idea+java+mqtt实现订阅者订阅消息 针对业务需要和硬件对接,使用mqtt来处理硬件的数据 java实现订阅者订阅消息,以及处理硬件的数据 是根据业务场景写的demo,包括数据库什么的就不再上传,大家可以根据...
最近在搞IOT方面的东西,接触到MQTT协议,由于需要模拟多个MQTT客户端进行消息订阅及消息推送功能,而现有的工具和网上的代码都满足不了现有需求,例如MQTT.fx只能模拟单个设备订阅或者消息推送、MQTT broker提供的...
MQTT发布/订阅消息机制
最近做MQTT发布和订阅的功能,在网上找了很久,都是C#或其它语言的,vb.net的基本找不到,所以用vb.net做了个MQTT客户端的发布和订阅功能,不包含服务器代码。 要测试使用该代码,需要有自己的MQTT服务器。
说明:工程分为两个。一个是Linux C语言编写的MQTT客户端,另一个是websocket编写的MQTT客户端,先运行Linux的,再运行websocket就出实验现象了。(发布的主要是温湿度数据、继电器控制状态、GPS定位系统等等)
利用C#编写MQTT客户端上位机,简单实用,自带MQTT使用库
mqtt开发C语言基于paho实现MQTT客户端实战案例
目录 一、简介 ...MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。 MQTT 可以被解释为一种低开销,低带宽占用
mqtt.fx1.7.1 mqtt客户端 mqtt.fx无需授权版。支持发布、订阅
微消息队列MQTT版最简单的使用场景即MQTT客户端消息的自发自收。如下图所示,您可以使用MQTT.fx作为MQTT客户端,在MQTT.fx客户端配置相关参数后接入微消息队列MQTT版实现消息的发送和接收。消息收发 微消息队列MQTT...
SpringBoot集成MQTT之消息订阅处理程序,含有AES加密算法工具类、BASE64的编码解码。
java实现mqtt的发送和订阅,代码中有详细的注释,是分服务端和客户端来测试的,并有断开重连的处理!
winForm中使用MQTT收发消息,不懂的可以联系qq:502701291
MQTT客户端(MQTT.fx)1.7.1
MQTT调试工具,用于MQTT协议联调联试,使用方便,画面简洁清晰,需要键入地址及端口号,ID/用户名及密码后连接,能够用来进行十六进制显示和ASCII显示,可以订阅MQTT topic!
客户端、服务端都包含,直接启动即可查看发布订阅消息推送功能