消息通讯——SpringBoot集成MQTT

SpringBoot集成MQTT步骤

1. 引入pom依赖

在 pom.xml 的 dependencies 节点中加入以下依赖:
<!-- Spring Boot core starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot web container -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- FreeMarker template view support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- Spring Integration modules (stream + MQTT adapters) -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!-- Lombok is only needed at compile time. The version is left to the Spring Boot
     dependency management instead of the deprecated, non-reproducible RELEASE meta-version. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<!-- Eclipse Paho MQTT v3 client used directly by MyMQTTClient -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
<!-- JUnit belongs on the test classpath only -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>
<!-- json-lib 2.4 is published only with a JDK classifier; without it the artifact cannot be resolved -->
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
    <classifier>jdk15</classifier>
</dependency>

2. application.properties

application.properties 的配置内容如下:
# HTTP port the web application listens on
server.port=8080
# Classpath locations served as static resources
spring.resources.static-locations=classpath:/resources/,classpath:/static/,classpath:/templates/

### FreeMarker configuration
spring.freemarker.allow-request-override=false
# Template caching switch; it is turned off here (cache=false)
spring.freemarker.cache=false
spring.freemarker.check-template-location=true
spring.freemarker.charset=UTF-8
spring.freemarker.content-type=text/html
spring.freemarker.expose-request-attributes=false
spring.freemarker.expose-session-attributes=false
spring.freemarker.expose-spring-macro-helpers=false
# File suffix appended to template view names
spring.freemarker.suffix=.ftl

## MQTT settings, injected into MqttConfiguration through @Value
# Broker URI and the client id used by the backend connection
mqtt.host=tcp://127.0.0.1:1883
mqtt.clientId=mqttClient
# Broker credentials (plain-text demo values)
mqtt.username=admin
mqtt.password=123456
# Passed to MqttConnectOptions.setConnectionTimeout / setKeepAliveInterval.
# NOTE(review): Paho documents both values in seconds - confirm 1000 / 2000 are intended.
mqtt.timeout=1000
mqtt.keepalive=2000
# Topic the backend subscribes to at startup and publishes chat messages on
mqtt.topic1=ceshi

3. MqttConfiguration.java

MqttConfiguration.java 的完整代码如下:
package cn.kt.mqttdemo2.config;


import cn.kt.mqttdemo2.mqtt.MyMQTTClient;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Created by tao.
* Date: 2021/4/12 14:47
* 描述:
*/
@Configuration
@Slf4j
public class MqttConfiguration {

@Value("${mqtt.host}")
String host;
@Value("${mqtt.username}")
String username;
@Value("${mqtt.password}")
String password;
@Value("${mqtt.clientId}")
String clientId;
@Value("${mqtt.timeout}")
int timeOut;
@Value("${mqtt.keepalive}")
int keepAlive;
@Value("${mqtt.topic1}")
String topic1;

@Bean//注入spring
public MyMQTTClient myMQTTClient() {
MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);
for (int i = 0; i < 10; i++) {
try {
myMQTTClient.connect();
myMQTTClient.subscribe(topic1, 1);
return myMQTTClient;
} catch (MqttException e) {
log.error("MQTT connect exception,connect time = " + i);
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
return myMQTTClient;
}
}

4. MyMQTTClient.java

MyMQTTClient.java 的完整代码如下:
package cn.kt.mqttdemo2.mqtt;


import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thin wrapper around the Eclipse Paho {@link MqttClient} offering connect, publish,
 * subscribe and unsubscribe helpers.
 *
 * <p>The underlying Paho client is held in a static field, so every instance of this
 * class shares the same connection.
 *
 * Created by lcw.
 * Date: 2021/4/12 14:46
 */
public class MyMQTTClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTClient.class);

    /** Maximum time to wait for a publish to complete, in milliseconds. */
    private static final long PUBLISH_WAIT_MILLIS = 1000L;

    private static MqttClient client;

    /**
     * @return the shared Paho client, or {@code null} when none has been created yet
     */
    public static MqttClient getClient() {
        return client;
    }

    /**
     * Replaces the shared Paho client.
     *
     * @param client the client to use from now on
     */
    public static void setClient(MqttClient client) {
        MyMQTTClient.client = client;
    }

    private String host;
    private String username;
    private String password;
    private String clientId;
    private int timeout;
    private int keepalive;

    /**
     * Stores the connection settings; no network activity happens here.
     *
     * @param host      broker URI handed to the {@link MqttClient} constructor
     * @param username  broker user name
     * @param password  broker password
     * @param clientId  MQTT client id
     * @param timeOut   value passed to {@link MqttConnectOptions#setConnectionTimeout(int)}
     * @param keepAlive value passed to {@link MqttConnectOptions#setKeepAliveInterval(int)}
     */
    public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepalive = keepAlive;
    }

    /**
     * Builds the connect options; clean-session is disabled.
     *
     * @param username  broker user name
     * @param password  broker password
     * @param timeout   connection timeout
     * @param keepalive keep-alive interval
     * @return the populated options
     */
    public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setCleanSession(false);
        return options;
    }

    /**
     * Connects to the broker, creating the shared Paho client (with an in-memory
     * persistence store and a {@link MyMQTTCallback}) on first use. An already connected
     * client is disconnected and connected again.
     *
     * @throws MqttException when the client cannot be created or the connection fails
     */
    public void connect() throws MqttException {
        if (client == null) {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            client.setCallback(new MyMQTTCallback(MyMQTTClient.this));
        }
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);
        if (client.isConnected()) {
            client.disconnect();
        }
        client.connect(mqttConnectOptions);
        // No exception was thrown, so the connection is established.
        LOGGER.info("MQTT connect success");
    }

    /**
     * Publishes a non-retained message with QoS 0.
     *
     * @param pushMessage message text
     * @param topic       destination topic
     */
    public void publish(String pushMessage, String topic) {
        publish(pushMessage, topic, 0, false);
    }

    /**
     * Publishes a message and waits briefly for the delivery to complete. Failures are
     * logged and not propagated to the caller.
     *
     * @param pushMessage message text, sent as UTF-8 bytes
     * @param topic       destination topic
     * @param qos         MQTT quality-of-service level
     * @param retained    whether the broker should retain the message
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttClient current = MyMQTTClient.getClient();
        if (current == null) {
            // connect() has never succeeded in creating the client; nothing to publish with.
            LOGGER.error("MQTT client is not initialized, message for topic {} dropped", topic);
            return;
        }
        MqttMessage message = new MqttMessage();
        // Explicit charset: the platform default would garble non-ASCII text on some systems.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = current.getTopic(topic);
        if (null == mqttTopic) {
            LOGGER.error("topic is not exist");
            // Stop here: continuing would dereference the null topic.
            return;
        }
        // Publishing is serialized on purpose: the original author reports deadlocks when
        // several threads publish concurrently without this lock.
        synchronized (this) {
            try {
                // publish() only queues the message; the token signals the actual delivery.
                MqttDeliveryToken token = mqttTopic.publish(message);
                token.waitForCompletion(PUBLISH_WAIT_MILLIS);
            } catch (MqttException e) {
                // MqttPersistenceException is a subclass, so this catch covers both cases.
                LOGGER.error("Failed to publish message to topic {}", topic, e);
            }
        }
    }

    /**
     * Subscribes to a topic with QoS 0.
     *
     * @param topic topic filter
     */
    public void subscribe(String topic) {
        subscribe(topic, 0);
    }

    /**
     * Subscribes to a topic. Failures are logged and not propagated to the caller.
     *
     * @param topic topic filter
     * @param qos   maximum quality-of-service level for the subscription
     */
    public void subscribe(String topic, int qos) {
        MqttClient current = MyMQTTClient.getClient();
        if (current == null) {
            LOGGER.error("MQTT client is not initialized, cannot subscribe to topic {}", topic);
            return;
        }
        try {
            current.subscribe(topic, qos);
        } catch (MqttException e) {
            LOGGER.error("Failed to subscribe to topic {}", topic, e);
        }
    }


    /**
     * Unsubscribes from a topic; only attempted while the client is connected.
     *
     * @param topic topic name
     */
    public void cleanTopic(String topic) {
        if (client != null && client.isConnected()) {
            try {
                client.unsubscribe(topic);
            } catch (MqttException e) {
                LOGGER.error("Failed to unsubscribe from topic {}", topic, e);
            }
        } else {
            LOGGER.warn("取消订阅失败!");
        }
    }
}

5. MyMQTTCallback.java

MyMQTTCallback.java 的完整代码如下:
package cn.kt.mqttdemo2.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Paho callback that logs incoming messages and keeps trying to reconnect after the
 * connection to the broker is lost.
 *
 * Created by lcw.
 * Date: 2021/4/12 14:46
 */
public class MyMQTTCallback implements MqttCallback {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyMQTTCallback.class);

    /** Pause between two reconnect attempts, in milliseconds. */
    private static final long RECONNECT_INTERVAL_MILLIS = 1000L;

    private MyMQTTClient myMQTTClient;

    /**
     * @param myMQTTClient the wrapper that registered this callback
     */
    public MyMQTTCallback(MyMQTTClient myMQTTClient) {
        this.myMQTTClient = myMQTTClient;
    }

    /**
     * Handles a lost connection by retrying {@code reconnect()} once per second until the
     * shared client reports that it is connected again, or until the calling thread is
     * interrupted.
     *
     * @param throwable the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Log the cause as well; it is usually the only hint about why the link dropped.
        LOGGER.error("连接断开,下面做重连...", throwable);
        long reconnectTimes = 1;
        while (true) {
            try {
                if (MyMQTTClient.getClient().isConnected()) {
                    LOGGER.warn("mqtt reconnect success end");
                    return;
                }
                LOGGER.warn("mqtt reconnect times = {} try again...", reconnectTimes++);
                MyMQTTClient.getClient().reconnect();
            } catch (MqttException e) {
                LOGGER.error("mqtt reconnect failed", e);
            }
            try {
                Thread.sleep(RECONNECT_INTERVAL_MILLIS);
            } catch (InterruptedException interrupted) {
                // Honour the interruption: restore the flag and stop retrying, otherwise
                // this loop could never be shut down.
                Thread.currentThread().interrupt();
                LOGGER.warn("mqtt reconnect interrupted, giving up");
                return;
            }
        }
    }

    /**
     * Invoked for every message received on a subscribed topic; logs topic and payload.
     *
     * @param s           topic the message arrived on
     * @param mqttMessage the received message
     * @throws Exception declared by the callback interface; not thrown by this implementation
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String payload = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        LOGGER.info("接收消息主题 : {},接收消息内容 : {}", s, payload);
    }

    /**
     * Invoked after a published message has been delivered; intentionally does nothing.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Nothing to do once delivery is complete.
    }
}

6. MqttMsg.java

MqttMsg.java 的完整代码如下:
package cn.kt.mqttdemo2.domain;

/**
 * Simple mutable bean describing one chat message: sender name, text and a timestamp
 * string. All three fields start out as empty strings.
 *
 * Created by lcw.
 * Date: 2021/5/19 15:22
 */
public class MqttMsg {

    /** Name of the sender. */
    private String name = "";

    /** Message text. */
    private String content = "";

    /** Timestamp text. */
    private String time = "";

    /** @return the sender name */
    public String getName() {
        return this.name;
    }

    /** @param name the sender name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the message text */
    public String getContent() {
        return this.content;
    }

    /** @param content the message text */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the timestamp text */
    public String getTime() {
        return this.time;
    }

    /** @param time the timestamp text */
    public void setTime(String time) {
        this.time = time;
    }

    /** @return a debug representation listing all three fields */
    @Override
    public String toString() {
        return String.format("MqttMsg{name='%s', content='%s', time='%s'}", name, content, time);
    }
}

7. MqttController.java

MqttController.java 的完整代码如下:
package cn.kt.mqttdemo2.controller;

import cn.kt.mqttdemo2.domain.MqttMsg;
import cn.kt.mqttdemo2.mqtt.MyMQTTClient;
import net.sf.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
import java.util.UUID;

/**
 * Web controller of the demo: serves the test page and forwards posted chat messages to
 * the MQTT topic configured as {@code mqtt.topic1}.
 *
 * Created by tao.
 * Date: 2021/4/21 15:59
 */
@Controller
public class MqttController {

    @Autowired
    private MyMQTTClient myMQTTClient;

    @Value("${mqtt.topic1}")
    private String topic1;

    /** Outgoing messages; each one is queued and taken out again right before publishing. */
    Queue<String> msgQueue = new LinkedList<>();

    /** Running number appended to the content of every handled message, starting at 1. */
    int count = 1;

    /**
     * @return the view name of the test page
     */
    @RequestMapping("/mqtt")
    public String mqttClint() {
        return "test.html";
    }

    /**
     * Stamps the posted message with the server time and a running number, serializes it
     * to JSON and publishes it on {@code topic1}. The method is synchronized, so requests
     * are handled one at a time.
     *
     * @param mqttMsg message bound from the request parameters
     */
    @PostMapping("/getMsg")
    @ResponseBody
    public synchronized void mqttMsg(MqttMsg mqttMsg) {
        System.out.println("队列元素数量:" + msgQueue.size());
        System.out.println("***************" + mqttMsg.getName() + ":" + mqttMsg.getContent() + "****************");

        // Overwrite the time with the formatted server-side time.
        mqttMsg.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

        // Tag the content with the running number, then advance the counter.
        mqttMsg.setContent(mqttMsg.getContent() + "——后台编号:" + count++);

        // Serialize the bean to its JSON text.
        String payload = JSONObject.fromObject(mqttMsg).toString();
        System.out.println(payload);

        // Queue the payload and, when it was accepted, publish the head of the queue.
        if (msgQueue.offer(payload)) {
            myMQTTClient.publish(msgQueue.poll(), topic1);
            System.out.println("时间戳" + System.currentTimeMillis());
        }
        System.out.println("队列元素数量:" + msgQueue.size());
    }
}

8. mqttws31.js

可以下载:

链接:https://pan.baidu.com/s/1c9CfyhT4CSY2FEOa1OgxPw

提取码:siwg

也可以用对应的 cdn 地址

CDN 引用方式如下:
<!-- Option 1: the plain (unminified) Paho MQTT browser library, served from cdnjs. -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.js" type="text/javascript"></script>
<!-- Option 2: the minified build of the same library.
     NOTE(review): a page presumably needs only one of these two tags - confirm before copying both. -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>

9. test.html

test.html 的完整代码如下:
<!DOCTYPE html >
<html>

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <!-- The attribute name must be written without spaces to be recognised. -->
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title> Document </title>
    <!-- A single rel="stylesheet" is required; with a duplicated rel attribute only the
         first one counts, so the Bootstrap styles were never applied. -->
    <link href="https://cdn.bootcdn.net/ajax/libs/twitter-bootstrap/3.3.7/css/bootstrap.min.css" rel="stylesheet">
    <script src="https://cdn.bootcdn.net/ajax/libs/jquery/2.2.1/jquery.min.js"></script>
    <!-- Paho MQTT browser client, served by the application itself. -->
    <script src="/mqttws31.js" type="text/javascript"></script>
    <style>
        /* Let long words in a comment wrap instead of overflowing the list item. */
        #contentList li {
            word-break: break-all;
            word-wrap: break-word;
        }
    </style>
</head>

<body>
<!-- Page container: comment form on top, list of received comments below. -->
<div style="width: 900px;margin: 50px auto;">
<!-- Commenter name; read by send() through the id "user". -->
<div class="form-group">
<label>评论人:</label>
<input type="text" class="form-control" id="user">
</div>

<!-- Comment text; read by send() through the id "content". -->
<div class="form-group">
<label>评论内容:</label>
<textarea class="form-control" id="content" style="word-break:break-all;word-wrap:break-word;"></textarea>
</div>

<!-- Posts the form once via send(). -->
<div class="form-group">
<input type="button" value="发表评论" class="btn btn-primary" onclick="send()">
</div>
<!-- Fires a burst of generated messages via sendTest(). -->
<div class="form-group">
<input type="button" value="连发测试" class="btn btn-primary" onclick="sendTest()">
</div>

<!-- onMessageArrived() appends one list item here per received message. -->
<div>
<ul id="contentList" class="list-group">
<!-- Sketch of the markup of one list item (labels shown translated):
<li class="list-group-item">
<span class="badge">Commenter: {{ item.user }} Time: {{item.time}}</span> {{ item.content }}
</li> -->
</ul>
</div>
</div>

<script>
// Connection settings for the browser-side MQTT client.
// NOTE(review): the Paho JS client documents timeout and keepAliveInterval in seconds -
// confirm that 1000 / 2000 are intended.
var hostname = '127.0.0.1',
    port = 8083,
    clientId = 'client-' + generateUUID(), // a fresh id on every page load
    timeout = 1000,
    keepAlive = 2000,
    cleanSession = false,
    ssl = false,
    userName = 'Nick',
    password = '12356',
    topic = 'ceshi';

// Create the client instance. It is declared explicitly: the bare assignment used before
// created an implicit global, which throws a ReferenceError in strict mode.
var client = new Paho.MQTT.Client(hostname, port, clientId);

// Options handed to client.connect(); onConnect runs once the connection succeeds,
// failures are only written to the console.
var options = {
    invocationContext: {
        host: hostname,
        port: port,
        path: client.path,
        clientId: clientId
    },
    timeout: timeout,
    keepAliveInterval: keepAlive,
    cleanSession: cleanSession,
    useSSL: ssl,
    userName: userName,
    password: password,
    onSuccess: onConnect,
    onFailure: function (e) {
        console.log(e);
    }
};

// Connect to the broker and register the success handler through the options above.
client.connect(options);

/**
 * Success handler of client.connect(): subscribes to the configured topic.
 */
function onConnect() {
    console.log("onConnected");
    client.subscribe(topic);
}

/**
 * Logs the response object whenever the connection is lost and, for a non-zero error
 * code, the error message as well.
 *
 * @param responseObject object supplied by the Paho client; errorCode and errorMessage are read
 */
function onConnectionLost(responseObject) {
    console.log(responseObject);
    if (responseObject.errorCode === 0) {
        return;
    }
    console.log("onConnectionLost:" + responseObject.errorMessage);
    console.log("连接已断开");
}

// Register the connection-lost handler and the message handler on the client.
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

/**
 * Handles a message received from the broker: parses the JSON payload and appends it to
 * the comment list.
 *
 * The name, time and content fields come from other clients, so they are inserted as
 * text and never parsed as HTML. Payloads that are not a JSON object are logged and
 * skipped.
 *
 * @param message Paho message; its payloadString is expected to hold the JSON text
 */
function onMessageArrived(message) {
    var msg = message.payloadString;
    console.log("收到消息:" + msg);
    console.log("收到消息时间戳:" + new Date().getTime());

    var obj;
    try {
        obj = JSON.parse(msg);
    } catch (e) {
        // Anyone can publish to the topic, so a malformed payload must not break the handler.
        console.error("Ignoring message that is not valid JSON", e);
        return;
    }
    if (obj === null || typeof obj !== 'object') {
        console.error("Ignoring message that is not a JSON object");
        return;
    }

    // Build the list item from DOM nodes and set the untrusted values as text, so markup
    // inside a message cannot inject script into the page.
    var badge = $('<span class="badge"></span>').text('评论人:' + obj.name + ',时间:' + obj.time);
    var item = $('<li class="list-group-item"></li>')
        .append(badge)
        .append(document.createTextNode(String(obj.content)));
    $('#contentList').append(item);
}

/**
 * Click handler of the submit button: posts the commenter name, the comment text and the
 * formatted current time to /getMsg, then clears both inputs. Nothing is sent while the
 * name is empty.
 */
function send() {
    var userInput = document.getElementById("user");
    var contentInput = document.getElementById("content");
    var name = userInput.value;
    var content = contentInput.value;
    console.log('name :>> ', name);
    console.log('content :>> ', content);
    var time = new Date().Format("yyyy-MM-dd hh:mm:ss");
    console.log('time :>> ', time);
    console.log("发送前时间戳:" + new Date().getTime());

    // Guard clause: an empty name means there is nothing to submit.
    if (!name) {
        return;
    }

    var payload = {
        name: name,
        content: content,
        time: time
    };
    $.ajax({
        type: "post",
        url: "/getMsg",
        data: payload,
        dataType: "json"
    });

    // Reset the form for the next comment.
    contentInput.value = "";
    userInput.value = "";
}

/**
 * Builds a random identifier in the layout xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx, where
 * every x is a hex digit and y is one of 8, 9, a or b. The digits are derived from the
 * current time (plus performance.now() when available) mixed with Math.random().
 *
 * @returns {string} the generated identifier
 */
function generateUUID() {
    var seed = new Date().getTime();
    if (window.performance && typeof window.performance.now === "function") {
        // Add the high-resolution timer when the browser offers one.
        seed += performance.now();
    }
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (placeholder) {
        var nibble = (seed + Math.random() * 16) % 16 | 0;
        seed = Math.floor(seed / 16);
        // "x" takes the nibble as is; "y" is forced into the range 8..b.
        var digit = placeholder == 'x' ? nibble : (nibble & 0x3 | 0x8);
        return digit.toString(16);
    });
}

//date时间格式化
Date.prototype.Format = function (fmt) {
var o = {
"M+": this.getMonth() + 1, //月份
"d+": this.getDate(), //日
"h+": this.getHours(), //小时
"m+": this.getMinutes(), //分
"s+": this.getSeconds(), //秒
"q+": Math.floor((this.getMonth() + 3) / 3), //季度
"S": this.getMilliseconds() //毫秒
};
if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length));
for (var k in o)
if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length)));
return fmt;
}


/**
 * Burst test: posts 99 generated messages (numbered 1 to 99) to /getMsg in a row, each
 * carrying the current timestamp in milliseconds.
 */
function sendTest() {
    for (var i = 1; i < 100; i++) {
        $.ajax({
            type: "post",
            url: "/getMsg",
            data: {
                name: "ceshi" + i,
                content: "测试内容" + i,
                time: new Date().getTime()
            },
            dataType: "json"
        });
    }
}
</script>
</body>
</html>

集成后效果

客户端页面

后台处理

在其他的页面客户端也收到了订阅消息

demo源代码

链接:https://pan.baidu.com/s/1UtU_iAEI-DcSfsK8Z_rvxA
提取码:lmyq