MQTT服务发送

This commit is contained in:
cbs 2024-12-31 14:47:29 +08:00
parent 0c4ca50b3f
commit 363fa30fef
18 changed files with 721 additions and 9 deletions

View File

@ -15,6 +15,7 @@
<module>yudao-module-system</module>
<module>yudao-module-infra</module>
<module>yudao-module-grpc</module>
<module>yudao-module-mqtt</module>
<!-- <module>yudao-module-member</module>-->
<!-- <module>yudao-module-bpm</module>-->
<!-- <module>yudao-module-pay</module>-->

View File

@ -8,15 +8,6 @@ import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
@EnableDiscoveryClient
public class GrpcServerApplication {
public static void main(String[] args) {
// 如果你碰到启动的问题请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
// 如果你碰到 启动的问题请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
// 如果你碰到启动的问题请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
SpringApplication.run(GrpcServerApplication.class, args);
System.out.println("启动了");
// 如果你碰到启动的问题请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
// 如果你碰到启动的问题请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
// 如果你碰到启动的问题请认真阅读 https://cloud.iocoder.cn/quick-start/ 文章
}
}

24
yudao-module-mqtt/pom.xml Normal file
View File

@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<modules>
<module>yudao-module-mqtt-api</module>
<module>yudao-module-mqtt-biz</module>
</modules>
<artifactId>yudao-module-mqtt</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>
mqtt 模块,主要提供能力:
1. 与设备通信
</description>
</project>

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-module-mqtt</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-mqtt-api</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
mqtt 模块 API暴露给其它模块调用
</description>
<dependencies>
<!--<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-common</artifactId>
</dependency>-->
<!-- Web 相关 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-ui</artifactId>
<scope>provided</scope>
</dependency>
<!-- 参数校验 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
<!-- RPC 远程调用相关 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-module-mqtt</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-mqtt-biz</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
mqtt 模块,主要提供能力:
1. 与设备通信
</description>
<dependencies>
<!-- Spring Cloud 基础 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-env</artifactId>
</dependency>
<!-- 依赖服务 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-module-system-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-module-mqtt-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>io.swagger.core.v3</groupId>
<artifactId>swagger-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<!-- 设置构建的 jar 包名 -->
<finalName>${project.artifactId}</finalName>
<plugins>
<!-- 打包 -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal> <!-- 将引入的 jar 打入其中 -->
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,11 @@
package cn.iododer.yudao.module.mqtt;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class MqttServerApplication {
public static void main(String[] args) {
SpringApplication.run(MqttServerApplication.class, args);
}
}

View File

@ -0,0 +1,30 @@
package cn.iododer.yudao.module.mqtt.config;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
* 初始化执行
*/
@Slf4j
@Order(1)
@Component
public class MqttBeforePoint implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
// 初始化Mqtt连接
try {
MqttFactory.getInstance();
log.info("[MQTT]初始化成功");
} catch (MqttException e) {
log.info("[MQTT]初始化失败。考虑是否服务端掉线");
MqttFactory.reconnect();
}
}
}

View File

@ -0,0 +1,54 @@
package cn.iododer.yudao.module.mqtt.config;
import cn.iododer.yudao.module.mqtt.service.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.annotation.Configuration;
/**
* MQTT回调
*/
@Slf4j
@Configuration
public class MqttCallBack implements MqttCallback {
/**
* 与服务器断开的回调
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("[MQTT]断开了与服务端的连接。考虑是否服务端掉线 or 回调参数解析报错 or 无默认sub");
// 执行自动重连
MqttFactory.reconnect();
}
/**
* 消息到达的回调
*
* @param topic 话题
* @param mqttMessage 消息内容
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
String msg = new String(mqttMessage.getPayload());
log.info("[MQTT]已获取返回数据,当前数据为:{}", msg);
/*MqttService mqttService = MqttFactory.getMqttFactory(topic);
mqttService.analysisMessage(msg);*/
}
/**
* 消息发布成功的回调
*
* @param token token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
log.info("[MQTT]{}:消息发布成功!", client.getClientId());
}
}

View File

@ -0,0 +1,142 @@
package cn.iododer.yudao.module.mqtt.config;
import cn.iododer.yudao.module.mqtt.enums.DefineSubTopicEnum;
import cn.iododer.yudao.module.mqtt.service.MqttService;
import cn.iododer.yudao.module.mqtt.service.RobotExecptionMqttServiceImpl;
import cn.iododer.yudao.module.mqtt.service.RobotPointMqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
/**
* Mqtt 工厂类
*/
@Slf4j
@Component
public class MqttFactory {
@Autowired
private MqttProperties config;
private static MqttFactory factory;
private static MqttClient client;
@PostConstruct
public void init() {
factory = this;
factory.config = this.config;
}
/**
* 获取客户端实例
* 单例模式存在即返回不存在则初始化
*
* @return client
* @throws MqttException 此处刻意抛出异常否则无法执行断线重连
*/
public static MqttClient getInstance() throws MqttException {
if (client == null) {
connect();
}
return client;
}
/**
* 清空客户端实例
* mqtt 断开连接时需清空 clientId再执行断线重连
*/
public static void clear() {
client = null;
}
/**
* 断线重连方法
*/
public static void reconnect() {
int count = 0;
while (true) {
clear();
++ count;
try {
log.info("----------------[MQTT]即将执行自动重连----------------");
getInstance();
log.info("----------------[MQTT]自动重连成功----------------");
break;
} catch (MqttException e) {
log.error("----------------[MQTT]自动重连失败,当前为第 {} 次尝试----------------", count);
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ex) {
log.error("----------------[MQTT]自动重连,休眠失败!----------------", e);
}
}
}
}
/**
* 客户端连接服务端
*
* @throws MqttException 此处刻意抛出异常否则无法执行断线重连
*/
private static void connect() throws MqttException {
// 创建MQTT客户端对象
client = new MqttClient(factory.config.getHostUrl(), factory.config.getClientId(), new MemoryPersistence());
// 连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 是否清空session设置false表示服务器会保留客户端的连接记录订阅主题qos,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
// 设置为true表示每次连接服务器都是以新的身份
options.setCleanSession(true);
// 设置连接用户名
options.setUserName(factory.config.getUsername());
// 设置连接密码
options.setPassword(factory.config.getPassword().toCharArray());
// 设置超时时间单位为秒
options.setConnectionTimeout(100);
// 设置心跳时间 单位为秒表示服务器每隔 20 秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
// 设置遗嘱消息的话题若客户端和服务器之间的连接意外断开服务器将发布客户端的遗嘱信息
options.setWill("willTopic", (factory.config.getClientId() + "MQTT客户端与服务器断开连接").getBytes(), 0, false);
// 设置回调
client.setCallback(new MqttCallBack());
client.connect(options);
// 设置默认订阅主题
// 消息等级与主题数组一一对应
int[] qos = DefineSubTopicEnum.queryAllQos();
// 主题
String[] topics = DefineSubTopicEnum.queryAllTopic();
// 订阅主题
client.subscribe(topics, qos);
}
/**
* 根据topic去实现自己的业务
* @param topic
* @return
*/
/*public static MqttService getMqttFactory(String topic){
DefineSubTopicEnum defineSubTopicEnum = DefineSubTopicEnum.getDefineSubTopicEnumByTopic(topic);
switch (defineSubTopicEnum){
case ROBOT_EXECPTION:
return new RobotExecptionMqttServiceImpl();
default :
return new RobotPointMqttServiceImpl();
}
}*/
}

View File

@ -0,0 +1,35 @@
package cn.iododer.yudao.module.mqtt.config;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
/**
* Mqtt Config
* * 备注以下刻意将 defaultClientIdclientId 区分使用
* * 防止断线重连的时候clientId 被重复拼接时间戳
*/
@Getter
@Configuration
public class MqttProperties {
@Value("${mqtt.host}")
private String hostUrl;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password}")
private String password;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.qos}")
private int qos;
public String getClientId() {
clientId = clientId + System.currentTimeMillis();
return clientId;
}
}

View File

@ -0,0 +1,34 @@
package cn.iododer.yudao.module.mqtt.controller;
import cn.iododer.yudao.module.mqtt.util.MqttUtils;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
@Tag(name = "MQTT通讯")
@RestController
@RequestMapping("/mqtt")
@Validated
@Slf4j
public class MqttController {
@Autowired
private MqttUtils mqttUtils;
@PostMapping("/pub")
public String pub(String topic, String msg) {
try {
mqttUtils.pub( topic, msg);
return "发送成功!";
} catch (Exception e) {
log.error("消息发送失败!", e);
}
return "发送成功!";
}
}

View File

@ -0,0 +1,70 @@
package cn.iododer.yudao.module.mqtt.enums;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
/**
* 默认订阅的话题 -- 枚举类
*/
@Getter
@AllArgsConstructor
public enum DefineSubTopicEnum {
//qos 0-至多1次1-至少1次2-正好一次
ROBOT_EXECPTION("robot_execption", 0,"机器人异常"),
ROBOT_POINT("robot_point", 0,"机器人点位");
private final String topic;
private final int qos;
private final String msg;
/**
* 获取所有话题名
*
* @return topicArr
*/
public static String[] queryAllTopic() {
List<String> topicList = new ArrayList<>();
for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {
topicList.add(item.getTopic());
}
String[] topicArr = new String[topicList.size()];
topicArr = topicList.toArray(topicArr);
return topicArr;
}
/**
* 获取所有qos
*
* @return qosArr
*/
public static int[] queryAllQos() {
List<Integer> qosList = new ArrayList<>();
for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {
qosList.add(item.getQos());
}
int[] qosArr = new int[qosList.size()];
qosArr = qosList.stream().mapToInt(Integer::intValue).toArray();
return qosArr;
}
/*public static DefineSubTopicEnum getDefineSubTopicEnumByTopic(String topic) {
for (DefineSubTopicEnum item : DefineSubTopicEnum.values()) {
if (item.getTopic().equals(topic)) {
return item;
}
}
return null;
}*/
}

View File

@ -0,0 +1,5 @@
package cn.iododer.yudao.module.mqtt.service;
public interface MqttService {
void analysisMessage(String message);
}

View File

@ -0,0 +1,14 @@
package cn.iododer.yudao.module.mqtt.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class RobotExecptionMqttServiceImpl implements MqttService{
@Override
public void analysisMessage(String message) {
log.info("处理RobotExecptionMqttServiceImpl的消息 :{}",message);
}
}

View File

@ -0,0 +1,13 @@
package cn.iododer.yudao.module.mqtt.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class RobotPointMqttServiceImpl implements MqttService {
@Override
public void analysisMessage(String message) {
log.info("处理RobotPointMqttServiceImpl的消息 :{}",message);
}
}

View File

@ -0,0 +1,73 @@
package cn.iododer.yudao.module.mqtt.util;
import cn.iododer.yudao.module.mqtt.config.MqttFactory;
import cn.iododer.yudao.module.mqtt.config.MqttProperties;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class MqttUtils {
@Autowired
private MqttProperties config;
/**
* 发布消息
*
* @param qos 0-至多1次1-至少1次2-一次
* @param retained 是否保留true-sub重新连接mqtt服务端时总能拿到该主题的最新消息false-sub重新连接mqtt服务端时只能拿到连接后发布的消息
* @param topic 话题
* @param message 消息内容
*/
public void pub(String topic, String message) throws MqttException {
// 获取客户端实例
MqttClient client = MqttFactory.getInstance();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(config.getQos());
mqttMessage.setRetained(false);
// 此处必须指明编码方式否则会出现订阅端中文乱码的情况
mqttMessage.setPayload(message.getBytes(StandardCharsets.UTF_8));
// 主题的目的地用于发布/订阅信息
MqttTopic mqttTopic = client.getTopic(topic);
// 提供一种机制来跟踪消息的传递进度
// 用于在以非阻塞方式在后台运行执行发布是跟踪消息的传递进度
MqttDeliveryToken token;
try {
// 将指定消息发布到主题但不等待消息传递完成返回的token可用于跟踪消息的传递状态
// 一旦此方法干净地返回消息就已被客户端接受发布当连接可用将在后台完成消息传递
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅话题
*
* @param topic 话题
* @param qos 0-至多1次1-至少1次2-一次
*/
public static void sub(String topic, int qos) throws MqttException {
// 获取客户端实例
MqttClient client = MqttFactory.getInstance();
client.subscribe(topic, qos);
}
/**
* 断开连接
*/
public static void disConnect() {
try {
// 获取客户端实例
MqttClient client = MqttFactory.getInstance();
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,42 @@
spring:
cloud:
nacos:
server-addr: 127.0.0.1:8848 # Nacos 服务器地址
username: # Nacos 账号
password: # Nacos 密码
discovery: # 【配置中心】配置项
namespace: dev # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
metadata:
version: 1.0.0 # 服务实例的版本号,可用于灰度发布
config: # 【注册中心】配置项
namespace: dev # 命名空间。这里使用 dev 开发环境
group: DEFAULT_GROUP # 使用的 Nacos 配置分组,默认为 DEFAULT_GROUP
# Lock4j 配置项
lock4j:
acquire-timeout: 3000 # 获取分布式锁超时时间,默认为 3000 毫秒
expire: 30000 # 分布式锁的超时时间,默认为 30 毫秒
--- #################### 监控相关配置 ####################
# Actuator 监控端点的配置项
management:
endpoints:
web:
base-path: /actuator # Actuator 提供的 API 接口的根目录。默认为 /actuator
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
# MQTT
mqtt:
host: tcp://127.0.0.1:1883
username: admin
password: admin
qos: 1
clientId: mqttx_b82345a52
timeout: 10
keepalive: 20

View File

@ -0,0 +1,30 @@
spring:
application:
name: mqtt-server
main:
allow-circular-references: true # 允许循环依赖,因为项目是三层架构,无法避免这个情况。
allow-bean-definition-overriding: true # 允许 Bean 覆盖,例如说 Feign 等会存在重复定义的服务
profiles:
active: local
mvc:
pathmatch:
matching-strategy: ANT_PATH_MATCHER # 解决 SpringFox 与 SpringBoot 2.6.x 不兼容的问题,参见 SpringFoxHandlerProviderBeanPostProcessor 类
# Jackson 配置项
jackson:
serialization:
write-dates-as-timestamps: true # 设置 LocalDateTime 的格式,使用时间戳
write-date-timestamps-as-nanoseconds: false # 设置不使用 nanoseconds 的格式。例如说 1611460870.401,而是直接 1611460870401
write-durations-as-timestamps: true # 设置 Duration 的格式,使用时间戳
fail-on-empty-beans: false # 允许序列化无属性的 Bean
server:
port: 48083
debug: false