diff --git a/pom.xml b/pom.xml
index 97a132fd7..79e76254d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -15,6 +15,7 @@
yudao-module-system
yudao-module-infra
yudao-module-grpc
+ yudao-module-mqtt
diff --git a/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java b/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java
index e382649f0..1129a5d47 100644
--- a/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java
+++ b/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java
@@ -8,15 +8,6 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@EnableDiscoveryClient
public class GrpcServerApplication {
public static void main(String[] args) {
- // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
- // 如果你碰到 启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
- // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
-
SpringApplication.run(GrpcServerApplication.class, args);
- System.out.println("启动了");
-
- // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
- // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
- // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
}
}
diff --git a/yudao-module-mqtt/pom.xml b/yudao-module-mqtt/pom.xml
new file mode 100644
index 000000000..129a2ca31
--- /dev/null
+++ b/yudao-module-mqtt/pom.xml
@@ -0,0 +1,24 @@
+
+
+
+ cn.iocoder.cloud
+ yudao
+ ${revision}
+
+ 4.0.0
+
+ yudao-module-mqtt-api
+ yudao-module-mqtt-biz
+
+ yudao-module-mqtt
+ pom
+
+ ${project.artifactId}
+
+ mqtt 模块,主要提供能力:
+ 1. 与设备通信
+
+
+
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-api/pom.xml b/yudao-module-mqtt/yudao-module-mqtt-api/pom.xml
new file mode 100644
index 000000000..536f63a80
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-api/pom.xml
@@ -0,0 +1,47 @@
+
+
+
+ cn.iocoder.cloud
+ yudao-module-mqtt
+ ${revision}
+
+ 4.0.0
+ yudao-module-mqtt-api
+ jar
+
+ ${project.artifactId}
+
+ mqtt 模块 API,暴露给其它模块调用
+
+
+
+
+
+
+
+ org.springdoc
+ springdoc-openapi-ui
+ provided
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+ true
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-openfeign
+ true
+
+
+
+
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml b/yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml
new file mode 100644
index 000000000..df59279b2
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml
@@ -0,0 +1,96 @@
+
+
+
+ cn.iocoder.cloud
+ yudao-module-mqtt
+ ${revision}
+
+ 4.0.0
+ yudao-module-mqtt-biz
+ jar
+
+ ${project.artifactId}
+
+ mqtt 模块,主要提供能力:
+ 1. 与设备通信
+
+
+
+
+
+
+ cn.iocoder.cloud
+ yudao-spring-boot-starter-env
+
+
+
+
+ cn.iocoder.cloud
+ yudao-module-system-api
+ ${revision}
+
+
+
+ cn.iocoder.cloud
+ yudao-module-mqtt-api
+ ${revision}
+
+
+
+ org.springframework.integration
+ spring-integration-mqtt
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.5
+
+
+
+ io.swagger.core.v3
+ swagger-annotations
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.junit.vintage
+ junit-vintage-engine
+
+
+
+
+
+
+
+ ${project.artifactId}
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ ${spring.boot.version}
+
+
+
+ repackage
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java
new file mode 100644
index 000000000..fa00ee67c
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java
@@ -0,0 +1,11 @@
+package cn.iododer.yudao.module.mqtt;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class MqttServerApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(MqttServerApplication.class, args);
+ }
+}
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java
new file mode 100644
index 000000000..d59b33806
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java
@@ -0,0 +1,30 @@
+package cn.iododer.yudao.module.mqtt.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+/**
+ * 初始化执行
+ */
+@Slf4j
+@Order(1)
+@Component
+public class MqttBeforePoint implements ApplicationRunner {
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ // 初始化Mqtt连接
+ try {
+ MqttFactory.getInstance();
+ log.info("[MQTT]初始化成功");
+ } catch (MqttException e) {
+ log.info("[MQTT]初始化失败。考虑是否服务端掉线");
+ MqttFactory.reconnect();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java
new file mode 100644
index 000000000..24c793f2b
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java
@@ -0,0 +1,54 @@
+package cn.iododer.yudao.module.mqtt.config;
+
+import cn.iododer.yudao.module.mqtt.service.MqttService;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * MQTT回调
+ */
+@Slf4j
+@Configuration
+public class MqttCallBack implements MqttCallback {
+
+ /**
+ * 与服务器断开的回调
+ */
+ @Override
+ public void connectionLost(Throwable throwable) {
+ log.info("[MQTT]断开了与服务端的连接。考虑是否服务端掉线 or 回调参数解析报错 or 无默认sub");
+ // 执行自动重连
+ MqttFactory.reconnect();
+ }
+
+ /**
+ * 消息到达的回调
+ *
+ * @param topic 话题
+ * @param mqttMessage 消息内容
+ */
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
+ String msg = new String(mqttMessage.getPayload());
+ log.info("[MQTT]已获取返回数据,当前数据为:{}", msg);
+
+ /*MqttService mqttService = MqttFactory.getMqttFactory(topic);
+ mqttService.analysisMessage(msg);*/
+ }
+
+ /**
+ * 消息发布成功的回调
+ *
+ * @param token token
+ */
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ IMqttAsyncClient client = token.getClient();
+ log.info("[MQTT]{}:消息发布成功!", client.getClientId());
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java
new file mode 100644
index 000000000..be2f982c4
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java
@@ -0,0 +1,142 @@
+package cn.iododer.yudao.module.mqtt.config;
+
+import cn.iododer.yudao.module.mqtt.enums.DefineSubTopicEnum;
+import cn.iododer.yudao.module.mqtt.service.MqttService;
+import cn.iododer.yudao.module.mqtt.service.RobotExecptionMqttServiceImpl;
+import cn.iododer.yudao.module.mqtt.service.RobotPointMqttServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Mqtt 工厂类
+ */
+@Slf4j
+@Component
+public class MqttFactory {
+
+ @Autowired
+ private MqttProperties config;
+
+ private static MqttFactory factory;
+
+ private static MqttClient client;
+
+
+ @PostConstruct
+ public void init() {
+ factory = this;
+ factory.config = this.config;
+ }
+
+
+ /**
+ * 获取客户端实例
+ * 单例模式:存在即返回,不存在则初始化
+ *
+ * @return client
+ * @throws MqttException 此处刻意抛出异常,否则无法执行断线重连
+ */
+ public static MqttClient getInstance() throws MqttException {
+ if (client == null) {
+ connect();
+ }
+ return client;
+ }
+
+
+ /**
+ * 清空客户端实例
+ * 当 mqtt 断开连接时,需清空 clientId,再执行断线重连
+ */
+ public static void clear() {
+ client = null;
+ }
+
+
+ /**
+ * 断线重连方法
+ */
+ public static void reconnect() {
+ int count = 0;
+ while (true) {
+ clear();
+ ++ count;
+
+ try {
+ log.info("----------------[MQTT]即将执行自动重连----------------");
+ getInstance();
+ log.info("----------------[MQTT]自动重连成功----------------");
+ break;
+ } catch (MqttException e) {
+ log.error("----------------[MQTT]自动重连失败,当前为第 {} 次尝试----------------", count);
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException ex) {
+ log.error("----------------[MQTT]自动重连,休眠失败!----------------", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * 客户端连接服务端
+ *
+ * @throws MqttException 此处刻意抛出异常,否则无法执行断线重连
+ */
+ private static void connect() throws MqttException {
+ // 创建MQTT客户端对象
+ client = new MqttClient(factory.config.getHostUrl(), factory.config.getClientId(), new MemoryPersistence());
+ // 连接设置
+ MqttConnectOptions options = new MqttConnectOptions();
+ // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
+ // 设置为true表示每次连接服务器都是以新的身份
+ options.setCleanSession(true);
+ // 设置连接用户名
+ options.setUserName(factory.config.getUsername());
+ // 设置连接密码
+ options.setPassword(factory.config.getPassword().toCharArray());
+ // 设置超时时间,单位为秒
+ options.setConnectionTimeout(100);
+ // 设置心跳时间 单位为秒,表示服务器每隔 20 秒的时间向客户端发送心跳判断客户端是否在线
+ options.setKeepAliveInterval(20);
+ // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
+ options.setWill("willTopic", (factory.config.getClientId() + "MQTT客户端与服务器断开连接").getBytes(), 0, false);
+ // 设置回调
+ client.setCallback(new MqttCallBack());
+ client.connect(options);
+
+ // 设置默认订阅主题
+ // 消息等级,与主题数组一一对应
+ int[] qos = DefineSubTopicEnum.queryAllQos();
+ // 主题
+ String[] topics = DefineSubTopicEnum.queryAllTopic();
+ // 订阅主题
+ client.subscribe(topics, qos);
+ }
+
+
+ /**
+ * 根据topic去实现自己的业务
+ * @param topic
+ * @return
+ */
+ /*public static MqttService getMqttFactory(String topic){
+ DefineSubTopicEnum defineSubTopicEnum = DefineSubTopicEnum.getDefineSubTopicEnumByTopic(topic);
+ switch (defineSubTopicEnum){
+ case ROBOT_EXECPTION:
+ return new RobotExecptionMqttServiceImpl();
+ default :
+ return new RobotPointMqttServiceImpl();
+ }
+ }*/
+
+
+}
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java
new file mode 100644
index 000000000..756e1d290
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java
@@ -0,0 +1,35 @@
+package cn.iododer.yudao.module.mqtt.config;
+
+import lombok.Getter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Mqtt Config
+ * * 备注:以下刻意将 defaultClientId、clientId 区分使用。
+ * * 防止断线重连的时候,clientId 被重复拼接时间戳
+ */
+@Getter
+@Configuration
+public class MqttProperties {
+
+ @Value("${mqtt.host}")
+ private String hostUrl;
+
+ @Value("${mqtt.username}")
+ private String username;
+
+ @Value("${mqtt.password}")
+ private String password;
+
+ @Value("${mqtt.clientId}")
+ private String clientId;
+
+ @Value("${mqtt.qos}")
+ private int qos;
+
+ public String getClientId() {
+ clientId = clientId + System.currentTimeMillis();
+ return clientId;
+ }
+}
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java
new file mode 100644
index 000000000..cac384a88
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java
@@ -0,0 +1,34 @@
+package cn.iododer.yudao.module.mqtt.controller;
+
+import cn.iododer.yudao.module.mqtt.util.MqttUtils;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
+
+@Tag(name = "MQTT通讯")
+@RestController
+@RequestMapping("/mqtt")
+@Validated
+@Slf4j
+public class MqttController {
+
+ @Autowired
+ private MqttUtils mqttUtils;
+
+
+ @PostMapping("/pub")
+ public String pub(String topic, String msg) {
+ try {
+ mqttUtils.pub( topic, msg);
+ return "发送成功!";
+ } catch (Exception e) {
+ log.error("消息发送失败!", e);
+ }
+ return "发送成功!";
+ }
+
+
+
+}
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java
new file mode 100644
index 000000000..ee2a3219a
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java
@@ -0,0 +1,70 @@
+package cn.iododer.yudao.module.mqtt.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * 默认订阅的话题 -- 枚举类
+ */
+@Getter
+@AllArgsConstructor
+public enum DefineSubTopicEnum {
+
+ //qos 0-至多1次、1-至少1次、2-正好一次
+ ROBOT_EXECPTION("robot_execption", 0,"机器人异常"),
+ ROBOT_POINT("robot_point", 0,"机器人点位");
+
+
+ private final String topic;
+
+ private final int qos;
+
+ private final String msg;
+
+ /**
+ * 获取所有话题名
+ *
+ * @return topicArr
+ */
+ public static String[] queryAllTopic() {
+ List topicList = new ArrayList<>();
+ for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {
+ topicList.add(item.getTopic());
+ }
+ String[] topicArr = new String[topicList.size()];
+ topicArr = topicList.toArray(topicArr);
+ return topicArr;
+ }
+
+
+ /**
+ * 获取所有qos
+ *
+ * @return qosArr
+ */
+ public static int[] queryAllQos() {
+ List qosList = new ArrayList<>();
+ for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {
+ qosList.add(item.getQos());
+ }
+
+ int[] qosArr = new int[qosList.size()];
+ qosArr = qosList.stream().mapToInt(Integer::intValue).toArray();
+
+ return qosArr;
+ }
+
+ /*public static DefineSubTopicEnum getDefineSubTopicEnumByTopic(String topic) {
+ for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {
+ if (item.getTopic().equals(topic)) {
+ return item;
+ }
+ }
+ return null;
+ }*/
+
+}
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java
new file mode 100644
index 000000000..e4067d157
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java
@@ -0,0 +1,5 @@
+package cn.iododer.yudao.module.mqtt.service;
+
+public interface MqttService {
+ void analysisMessage(String message);
+}
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java
new file mode 100644
index 000000000..e6f8e4f1c
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java
@@ -0,0 +1,14 @@
+package cn.iododer.yudao.module.mqtt.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class RobotExecptionMqttServiceImpl implements MqttService{
+
+ @Override
+ public void analysisMessage(String message) {
+ log.info("处理RobotExecptionMqttServiceImpl的消息 :{}",message);
+ }
+}
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java
new file mode 100644
index 000000000..cbfaab0fd
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java
@@ -0,0 +1,13 @@
+package cn.iododer.yudao.module.mqtt.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+public class RobotPointMqttServiceImpl implements MqttService {
+ @Override
+ public void analysisMessage(String message) {
+ log.info("处理RobotPointMqttServiceImpl的消息 :{}",message);
+ }
+}
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java
new file mode 100644
index 000000000..b86c0dbe5
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java
@@ -0,0 +1,73 @@
+package cn.iododer.yudao.module.mqtt.util;
+
+import cn.iododer.yudao.module.mqtt.config.MqttFactory;
+import cn.iododer.yudao.module.mqtt.config.MqttProperties;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+
+@Component
+public class MqttUtils {
+
+ @Autowired
+ private MqttProperties config;
+
+ /**
+ * 发布消息
+ *
+ * @param qos 0-至多1次、1-至少1次、2-一次
+ * @param retained 是否保留:true-sub重新连接mqtt服务端时,总能拿到该主题的最新消息、false-sub重新连接mqtt服务端时,只能拿到连接后发布的消息
+ * @param topic 话题
+ * @param message 消息内容
+ */
+ public void pub(String topic, String message) throws MqttException {
+ // 获取客户端实例
+ MqttClient client = MqttFactory.getInstance();
+ MqttMessage mqttMessage = new MqttMessage();
+ mqttMessage.setQos(config.getQos());
+ mqttMessage.setRetained(false);
+ // 此处必须指明编码方式,否则会出现订阅端中文乱码的情况
+ mqttMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));
+ // 主题的目的地,用于发布/订阅信息
+ MqttTopic mqttTopic = client.getTopic(topic);
+ // 提供一种机制来跟踪消息的传递进度
+ // 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
+ MqttDeliveryToken token;
+ try {
+ // 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
+ // 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
+ token = mqttTopic.publish(mqttMessage);
+ token.waitForCompletion();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * 订阅话题
+ *
+ * @param topic 话题
+ * @param qos 0-至多1次、1-至少1次、2-一次
+ */
+ public static void sub(String topic, int qos) throws MqttException {
+ // 获取客户端实例
+ MqttClient client = MqttFactory.getInstance();
+
+ client.subscribe(topic, qos);
+ }
+
+ /**
+ * 断开连接
+ */
+ public static void disConnect() {
+ try {
+ // 获取客户端实例
+ MqttClient client = MqttFactory.getInstance();
+ client.disconnect();
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml
new file mode 100644
index 000000000..b0f760524
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml
@@ -0,0 +1,42 @@
+spring:
+ cloud:
+ nacos:
+ server-addr: 127.0.0.1:8848 # Nacos 服务器地址
+ username: # Nacos 账号
+ password: # Nacos 密码
+ discovery: # 【配置中心】配置项
+ namespace: dev # 命名空间。这里使用 dev 开发环境
+ group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
+ metadata:
+ version: 1.0.0 # 服务实例的版本号,可用于灰度发布
+ config: # 【注册中心】配置项
+ namespace: dev # 命名空间。这里使用 dev 开发环境
+ group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
+
+# Lock4j 配置项
+lock4j:
+ acquire-timeout: 3000 # 获取分布式锁超时时间,默认为 3000 毫秒
+ expire: 30000 # 分布式锁的超时时间,默认为 30 毫秒
+
+--- #################### 监控相关配置 ####################
+
+# Actuator 监控端点的配置项
+management:
+ endpoints:
+ web:
+ base-path: /actuator # Actuator 提供的 API 接口的根目录。默认为 /actuator
+ exposure:
+ include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
+
+
+# MQTT
+mqtt:
+ host: tcp://127.0.0.1:1883
+ username: admin
+ password: admin
+ qos: 1
+ clientId: mqttx_b82345a52
+ timeout: 10
+ keepalive: 20
+
+
diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml
new file mode 100644
index 000000000..01c21f62a
--- /dev/null
+++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml
@@ -0,0 +1,30 @@
+
+spring:
+ application:
+ name: mqtt-server
+ main:
+ allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
+ allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Feign 等会存在重复定义的服务
+
+ profiles:
+ active: local
+
+ mvc:
+ pathmatch:
+ matching-strategy: ANT_PATH_MATCHER # 解决 SpringFox 与 SpringBoot 2.6.x 不兼容的问题,参见 SpringFoxHandlerProviderBeanPostProcessor 类
+
+ # Jackson 配置项
+ jackson:
+ serialization:
+ write-dates-as-timestamps: true # 设置 LocalDateTime 的格式,使用时间戳
+ write-date-timestamps-as-nanoseconds: false # 设置不使用 nanoseconds 的格式。例如说 1611460870.401,而是直接 1611460870401
+ write-durations-as-timestamps: true # 设置 Duration 的格式,使用时间戳
+ fail-on-empty-beans: false # 允许序列化无属性的 Bean
+
+server:
+ port: 48083
+
+
+debug: false
+
+