From 363fa30fef002d76797681c9e6a074b927aadc45 Mon Sep 17 00:00:00 2001 From: cbs <18617195505@163.com> Date: Tue, 31 Dec 2024 14:47:29 +0800 Subject: [PATCH] =?UTF-8?q?MQTT=E6=9C=8D=E5=8A=A1=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 1 + .../module/grpc/GrpcServerApplication.java | 9 -- yudao-module-mqtt/pom.xml | 24 +++ .../yudao-module-mqtt-api/pom.xml | 47 ++++++ .../yudao-module-mqtt-biz/pom.xml | 96 ++++++++++++ .../module/mqtt/MqttServerApplication.java | 11 ++ .../module/mqtt/config/MqttBeforePoint.java | 30 ++++ .../module/mqtt/config/MqttCallBack.java | 54 +++++++ .../yudao/module/mqtt/config/MqttFactory.java | 142 ++++++++++++++++++ .../module/mqtt/config/MqttProperties.java | 35 +++++ .../mqtt/controller/MqttController.java | 34 +++++ .../module/mqtt/enums/DefineSubTopicEnum.java | 70 +++++++++ .../module/mqtt/service/MqttService.java | 5 + .../RobotExecptionMqttServiceImpl.java | 14 ++ .../service/RobotPointMqttServiceImpl.java | 13 ++ .../yudao/module/mqtt/util/MqttUtils.java | 73 +++++++++ .../src/main/resources/application-local.yaml | 42 ++++++ .../src/main/resources/application.yaml | 30 ++++ 18 files changed, 721 insertions(+), 9 deletions(-) create mode 100644 yudao-module-mqtt/pom.xml create mode 100644 yudao-module-mqtt/yudao-module-mqtt-api/pom.xml create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml create mode 100644 yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml diff --git a/pom.xml b/pom.xml index 97a132fd7..79e76254d 100644 --- a/pom.xml +++ b/pom.xml @@ -15,6 +15,7 @@ yudao-module-system yudao-module-infra yudao-module-grpc + yudao-module-mqtt diff --git a/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java b/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java index e382649f0..1129a5d47 100644 --- a/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java +++ b/yudao-module-grpc/yudao-module-grpc-biz/src/main/java/cn/iocoder/yudao/module/grpc/GrpcServerApplication.java @@ -8,15 +8,6 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient; @EnableDiscoveryClient public class GrpcServerApplication { public static void main(String[] args) { - // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章 - // 如果你碰到 启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章 - // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章 - SpringApplication.run(GrpcServerApplication.class, args); - System.out.println("启动了"); - - // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章 - // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章 - // 如果你碰到启动的问题,请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章 } } diff --git a/yudao-module-mqtt/pom.xml b/yudao-module-mqtt/pom.xml new file mode 100644 index 000000000..129a2ca31 --- /dev/null +++ b/yudao-module-mqtt/pom.xml @@ -0,0 +1,24 @@ + + + + cn.iocoder.cloud + yudao + ${revision} + + 4.0.0 + + yudao-module-mqtt-api + yudao-module-mqtt-biz + + yudao-module-mqtt + pom + + ${project.artifactId} + + mqtt 模块,主要提供能力: + 1. 与设备通信 + + + \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-api/pom.xml b/yudao-module-mqtt/yudao-module-mqtt-api/pom.xml new file mode 100644 index 000000000..536f63a80 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-api/pom.xml @@ -0,0 +1,47 @@ + + + + cn.iocoder.cloud + yudao-module-mqtt + ${revision} + + 4.0.0 + yudao-module-mqtt-api + jar + + ${project.artifactId} + + mqtt 模块 API,暴露给其它模块调用 + + + + + + + + org.springdoc + springdoc-openapi-ui + provided + + + + + org.springframework.boot + spring-boot-starter-validation + true + + + + + org.springframework.cloud + spring-cloud-starter-openfeign + true + + + + \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml b/yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml new file mode 100644 index 000000000..df59279b2 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/pom.xml @@ -0,0 +1,96 @@ + + + + cn.iocoder.cloud + yudao-module-mqtt + ${revision} + + 4.0.0 + yudao-module-mqtt-biz + jar + + ${project.artifactId} + + mqtt 模块,主要提供能力: + 1. 与设备通信 + + + + + + + cn.iocoder.cloud + yudao-spring-boot-starter-env + + + + + cn.iocoder.cloud + yudao-module-system-api + ${revision} + + + + cn.iocoder.cloud + yudao-module-mqtt-api + ${revision} + + + + org.springframework.integration + spring-integration-mqtt + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.5 + + + + io.swagger.core.v3 + swagger-annotations + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.junit.vintage + junit-vintage-engine + + + + + + + + ${project.artifactId} + + + + org.springframework.boot + spring-boot-maven-plugin + ${spring.boot.version} + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java new file mode 100644 index 000000000..fa00ee67c --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/MqttServerApplication.java @@ -0,0 +1,11 @@ +package cn.iododer.yudao.module.mqtt; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class MqttServerApplication { + public static void main(String[] args) { + SpringApplication.run(MqttServerApplication.class, args); + } +} diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java new file mode 100644 index 000000000..d59b33806 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttBeforePoint.java @@ -0,0 +1,30 @@ +package cn.iododer.yudao.module.mqtt.config; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +/** + * 初始化执行 + */ +@Slf4j +@Order(1) +@Component +public class MqttBeforePoint implements ApplicationRunner { + + @Override + public void run(ApplicationArguments args) throws Exception { + // 初始化Mqtt连接 + try { + MqttFactory.getInstance(); + log.info("[MQTT]初始化成功"); + } catch (MqttException e) { + log.info("[MQTT]初始化失败。考虑是否服务端掉线"); + MqttFactory.reconnect(); + } + } + +} \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java new file mode 100644 index 000000000..24c793f2b --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttCallBack.java @@ -0,0 +1,54 @@ +package cn.iododer.yudao.module.mqtt.config; + +import cn.iododer.yudao.module.mqtt.service.MqttService; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.IMqttAsyncClient; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.springframework.context.annotation.Configuration; + +/** + * MQTT回调 + */ +@Slf4j +@Configuration +public class MqttCallBack implements MqttCallback { + + /** + * 与服务器断开的回调 + */ + @Override + public void connectionLost(Throwable throwable) { + log.info("[MQTT]断开了与服务端的连接。考虑是否服务端掉线 or 回调参数解析报错 or 无默认sub"); + // 执行自动重连 + MqttFactory.reconnect(); + } + + /** + * 消息到达的回调 + * + * @param topic 话题 + * @param mqttMessage 消息内容 + */ + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + String msg = new String(mqttMessage.getPayload()); + log.info("[MQTT]已获取返回数据,当前数据为:{}", msg); + + /*MqttService mqttService = MqttFactory.getMqttFactory(topic); + mqttService.analysisMessage(msg);*/ + } + + /** + * 消息发布成功的回调 + * + * @param token token + */ + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + IMqttAsyncClient client = token.getClient(); + log.info("[MQTT]{}:消息发布成功!", client.getClientId()); + } + +} \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java new file mode 100644 index 000000000..be2f982c4 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttFactory.java @@ -0,0 +1,142 @@ +package cn.iododer.yudao.module.mqtt.config; + +import cn.iododer.yudao.module.mqtt.enums.DefineSubTopicEnum; +import cn.iododer.yudao.module.mqtt.service.MqttService; +import cn.iododer.yudao.module.mqtt.service.RobotExecptionMqttServiceImpl; +import cn.iododer.yudao.module.mqtt.service.RobotPointMqttServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.TimeUnit; + +/** + * Mqtt 工厂类 + */ +@Slf4j +@Component +public class MqttFactory { + + @Autowired + private MqttProperties config; + + private static MqttFactory factory; + + private static MqttClient client; + + + @PostConstruct + public void init() { + factory = this; + factory.config = this.config; + } + + + /** + * 获取客户端实例 + * 单例模式:存在即返回,不存在则初始化 + * + * @return client + * @throws MqttException 此处刻意抛出异常,否则无法执行断线重连 + */ + public static MqttClient getInstance() throws MqttException { + if (client == null) { + connect(); + } + return client; + } + + + /** + * 清空客户端实例 + * 当 mqtt 断开连接时,需清空 clientId,再执行断线重连 + */ + public static void clear() { + client = null; + } + + + /** + * 断线重连方法 + */ + public static void reconnect() { + int count = 0; + while (true) { + clear(); + ++ count; + + try { + log.info("----------------[MQTT]即将执行自动重连----------------"); + getInstance(); + log.info("----------------[MQTT]自动重连成功----------------"); + break; + } catch (MqttException e) { + log.error("----------------[MQTT]自动重连失败,当前为第 {} 次尝试----------------", count); + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException ex) { + log.error("----------------[MQTT]自动重连,休眠失败!----------------", e); + } + } + } + } + + /** + * 客户端连接服务端 + * + * @throws MqttException 此处刻意抛出异常,否则无法执行断线重连 + */ + private static void connect() throws MqttException { + // 创建MQTT客户端对象 + client = new MqttClient(factory.config.getHostUrl(), factory.config.getClientId(), new MemoryPersistence()); + // 连接设置 + MqttConnectOptions options = new MqttConnectOptions(); + // 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 + // 设置为true表示每次连接服务器都是以新的身份 + options.setCleanSession(true); + // 设置连接用户名 + options.setUserName(factory.config.getUsername()); + // 设置连接密码 + options.setPassword(factory.config.getPassword().toCharArray()); + // 设置超时时间,单位为秒 + options.setConnectionTimeout(100); + // 设置心跳时间 单位为秒,表示服务器每隔 20 秒的时间向客户端发送心跳判断客户端是否在线 + options.setKeepAliveInterval(20); + // 设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 + options.setWill("willTopic", (factory.config.getClientId() + "MQTT客户端与服务器断开连接").getBytes(), 0, false); + // 设置回调 + client.setCallback(new MqttCallBack()); + client.connect(options); + + // 设置默认订阅主题 + // 消息等级,与主题数组一一对应 + int[] qos = DefineSubTopicEnum.queryAllQos(); + // 主题 + String[] topics = DefineSubTopicEnum.queryAllTopic(); + // 订阅主题 + client.subscribe(topics, qos); + } + + + /** + * 根据topic去实现自己的业务 + * @param topic + * @return + */ + /*public static MqttService getMqttFactory(String topic){ + DefineSubTopicEnum defineSubTopicEnum = DefineSubTopicEnum.getDefineSubTopicEnumByTopic(topic); + switch (defineSubTopicEnum){ + case ROBOT_EXECPTION: + return new RobotExecptionMqttServiceImpl(); + default : + return new RobotPointMqttServiceImpl(); + } + }*/ + + +} \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java new file mode 100644 index 000000000..756e1d290 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/config/MqttProperties.java @@ -0,0 +1,35 @@ +package cn.iododer.yudao.module.mqtt.config; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Configuration; + +/** + * Mqtt Config + * * 备注:以下刻意将 defaultClientId、clientId 区分使用。 + * * 防止断线重连的时候,clientId 被重复拼接时间戳 + */ +@Getter +@Configuration +public class MqttProperties { + + @Value("${mqtt.host}") + private String hostUrl; + + @Value("${mqtt.username}") + private String username; + + @Value("${mqtt.password}") + private String password; + + @Value("${mqtt.clientId}") + private String clientId; + + @Value("${mqtt.qos}") + private int qos; + + public String getClientId() { + clientId = clientId + System.currentTimeMillis(); + return clientId; + } +} diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java new file mode 100644 index 000000000..cac384a88 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/controller/MqttController.java @@ -0,0 +1,34 @@ +package cn.iododer.yudao.module.mqtt.controller; + +import cn.iododer.yudao.module.mqtt.util.MqttUtils; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +@Tag(name = "MQTT通讯") +@RestController +@RequestMapping("/mqtt") +@Validated +@Slf4j +public class MqttController { + + @Autowired + private MqttUtils mqttUtils; + + + @PostMapping("/pub") + public String pub(String topic, String msg) { + try { + mqttUtils.pub( topic, msg); + return "发送成功!"; + } catch (Exception e) { + log.error("消息发送失败!", e); + } + return "发送成功!"; + } + + + +} diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java new file mode 100644 index 000000000..ee2a3219a --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/enums/DefineSubTopicEnum.java @@ -0,0 +1,70 @@ +package cn.iododer.yudao.module.mqtt.enums; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.Getter; + +import java.util.ArrayList; +import java.util.List; + +/** + * 默认订阅的话题 -- 枚举类 + */ +@Getter +@AllArgsConstructor +public enum DefineSubTopicEnum { + + //qos 0-至多1次、1-至少1次、2-正好一次 + ROBOT_EXECPTION("robot_execption", 0,"机器人异常"), + ROBOT_POINT("robot_point", 0,"机器人点位"); + + + private final String topic; + + private final int qos; + + private final String msg; + + /** + * 获取所有话题名 + * + * @return topicArr + */ + public static String[] queryAllTopic() { + List topicList = new ArrayList<>(); + for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) { + topicList.add(item.getTopic()); + } + String[] topicArr = new String[topicList.size()]; + topicArr = topicList.toArray(topicArr); + return topicArr; + } + + + /** + * 获取所有qos + * + * @return qosArr + */ + public static int[] queryAllQos() { + List qosList = new ArrayList<>(); + for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) { + qosList.add(item.getQos()); + } + + int[] qosArr = new int[qosList.size()]; + qosArr = qosList.stream().mapToInt(Integer::intValue).toArray(); + + return qosArr; + } + + /*public static DefineSubTopicEnum getDefineSubTopicEnumByTopic(String topic) { + for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) { + if (item.getTopic().equals(topic)) { + return item; + } + } + return null; + }*/ + +} \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java new file mode 100644 index 000000000..e4067d157 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/MqttService.java @@ -0,0 +1,5 @@ +package cn.iododer.yudao.module.mqtt.service; + +public interface MqttService { + void analysisMessage(String message); +} diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java new file mode 100644 index 000000000..e6f8e4f1c --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotExecptionMqttServiceImpl.java @@ -0,0 +1,14 @@ +package cn.iododer.yudao.module.mqtt.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class RobotExecptionMqttServiceImpl implements MqttService{ + + @Override + public void analysisMessage(String message) { + log.info("处理RobotExecptionMqttServiceImpl的消息 :{}",message); + } +} diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java new file mode 100644 index 000000000..cbfaab0fd --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/service/RobotPointMqttServiceImpl.java @@ -0,0 +1,13 @@ +package cn.iododer.yudao.module.mqtt.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Service +@Slf4j +public class RobotPointMqttServiceImpl implements MqttService { + @Override + public void analysisMessage(String message) { + log.info("处理RobotPointMqttServiceImpl的消息 :{}",message); + } +} diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java new file mode 100644 index 000000000..b86c0dbe5 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/java/cn/iododer/yudao/module/mqtt/util/MqttUtils.java @@ -0,0 +1,73 @@ +package cn.iododer.yudao.module.mqtt.util; + +import cn.iododer.yudao.module.mqtt.config.MqttFactory; +import cn.iododer.yudao.module.mqtt.config.MqttProperties; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +@Component +public class MqttUtils { + + @Autowired + private MqttProperties config; + + /** + * 发布消息 + * + * @param qos 0-至多1次、1-至少1次、2-一次 + * @param retained 是否保留:true-sub重新连接mqtt服务端时,总能拿到该主题的最新消息、false-sub重新连接mqtt服务端时,只能拿到连接后发布的消息 + * @param topic 话题 + * @param message 消息内容 + */ + public void pub(String topic, String message) throws MqttException { + // 获取客户端实例 + MqttClient client = MqttFactory.getInstance(); + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(config.getQos()); + mqttMessage.setRetained(false); + // 此处必须指明编码方式,否则会出现订阅端中文乱码的情况 + mqttMessage.setPayload(message.getBytes(StandardCharsets.UTF_8)); + // 主题的目的地,用于发布/订阅信息 + MqttTopic mqttTopic = client.getTopic(topic); + // 提供一种机制来跟踪消息的传递进度 + // 用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 + MqttDeliveryToken token; + try { + // 将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 + // 一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 + token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } catch (MqttException e) { + e.printStackTrace(); + } + } + + /** + * 订阅话题 + * + * @param topic 话题 + * @param qos 0-至多1次、1-至少1次、2-一次 + */ + public static void sub(String topic, int qos) throws MqttException { + // 获取客户端实例 + MqttClient client = MqttFactory.getInstance(); + + client.subscribe(topic, qos); + } + + /** + * 断开连接 + */ + public static void disConnect() { + try { + // 获取客户端实例 + MqttClient client = MqttFactory.getInstance(); + client.disconnect(); + } catch (MqttException e) { + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml new file mode 100644 index 000000000..b0f760524 --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application-local.yaml @@ -0,0 +1,42 @@ +spring: + cloud: + nacos: + server-addr: 127.0.0.1:8848 # Nacos 服务器地址 + username: # Nacos 账号 + password: # Nacos 密码 + discovery: # 【配置中心】配置项 + namespace: dev # 命名空间。这里使用 dev 开发环境 + group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP + metadata: + version: 1.0.0 # 服务实例的版本号,可用于灰度发布 + config: # 【注册中心】配置项 + namespace: dev # 命名空间。这里使用 dev 开发环境 + group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP + +# Lock4j 配置项 +lock4j: + acquire-timeout: 3000 # 获取分布式锁超时时间,默认为 3000 毫秒 + expire: 30000 # 分布式锁的超时时间,默认为 30 毫秒 + +--- #################### 监控相关配置 #################### + +# Actuator 监控端点的配置项 +management: + endpoints: + web: + base-path: /actuator # Actuator 提供的 API 接口的根目录。默认为 /actuator + exposure: + include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。 + + +# MQTT +mqtt: + host: tcp://127.0.0.1:1883 + username: admin + password: admin + qos: 1 + clientId: mqttx_b82345a52 + timeout: 10 + keepalive: 20 + + diff --git a/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml new file mode 100644 index 000000000..01c21f62a --- /dev/null +++ b/yudao-module-mqtt/yudao-module-mqtt-biz/src/main/resources/application.yaml @@ -0,0 +1,30 @@ + +spring: + application: + name: mqtt-server + main: + allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。 + allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Feign 等会存在重复定义的服务 + + profiles: + active: local + + mvc: + pathmatch: + matching-strategy: ANT_PATH_MATCHER # 解决 SpringFox 与 SpringBoot 2.6.x 不兼容的问题,参见 SpringFoxHandlerProviderBeanPostProcessor 类 + + # Jackson 配置项 + jackson: + serialization: + write-dates-as-timestamps: true # 设置 LocalDateTime 的格式,使用时间戳 + write-date-timestamps-as-nanoseconds: false # 设置不使用 nanoseconds 的格式。例如说 1611460870.401,而是直接 1611460870401 + write-durations-as-timestamps: true # 设置 Duration 的格式,使用时间戳 + fail-on-empty-beans: false # 允许序列化无属性的 Bean + +server: + port: 48083 + + +debug: false + +