Socket往客户端推送数据

This commit is contained in:
cbs 2025-07-11 11:24:16 +08:00
parent 40a805f2f2
commit e7a79cf457
3 changed files with 32 additions and 18 deletions

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@ -34,10 +35,11 @@ public class RemoteControllerProcessor {
public final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public final Map<String,Socket> socketMap = new HashMap<>();
@Value("${remote.msg}")
private String defaultMsg;
@Value("${remote.cockpit-time-out}")
private int cockpitTimeOut;
@ -205,10 +207,11 @@ public class RemoteControllerProcessor {
for (Map.Entry<String, RemoteControllerSocketDTO> v : cache.entrySet()) {
RemoteControllerSocketDTO remoteControllerSocketDTO = v.getValue();
Socket socket = remoteControllerSocketDTO.getSocket();
Socket socket = socketMap.get("1");
if (socket == null || socket.isClosed()) {
socket = new Socket();
try {
log.info("socket建立新连接");
socket.connect(new InetSocketAddress(v.getKey(), remoteControllerSocketDTO.getControllerPort()), 1000);
remoteControllerSocketDTO.setSocket(socket);
} catch (IOException e) {
@ -227,7 +230,7 @@ public class RemoteControllerProcessor {
} finally {
if (ObjectUtil.isNotEmpty(os)) {
try {
os.close();
os.flush();
} catch (IOException e) {
}
}
@ -245,4 +248,8 @@ public class RemoteControllerProcessor {
// 在Bean销毁前关闭去啊
shutdown();
}
public void putSocket(Socket socket) {
socketMap.put("1",socket);
}
}

View File

@ -1,10 +1,12 @@
package cn.iocoder.yudao.module.remote.config.server;
import cn.iocoder.yudao.module.remote.api.robot.RemoteControllerProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -13,11 +15,14 @@ import java.net.Socket;
@Slf4j
@Component
public class TCPServerApplicationRunner implements ApplicationRunner {
public class TCPServerApplicationRunner implements ApplicationRunner {
@Resource
private RemoteControllerProcessor remoteControllerProcessor;
@Override
public void run(ApplicationArguments args) {
new Thread(new Runnable() {
public void run(ApplicationArguments args) {
new Thread(new Runnable() {
@Override
public void run() {
initServer();
@ -28,20 +33,21 @@ public class TCPServerApplicationRunner implements ApplicationRunner {
public void initServer() {
try {
log.info("开始初始化socket服务");
ServerSocket server=new ServerSocket(9000);
while (true){
ServerSocket server = new ServerSocket(9000);
while (true) {
// 监听客户端的请求,没有的话就会进行阻塞
Socket socket=server.accept();
Socket socket = server.accept();
remoteControllerProcessor.putSocket(socket);
//开启一个线程进行处理客户端的请求
new Thread(new Runnable() {
new Thread(new Runnable() {
@Override
public void run() {
try {
InputStream is=socket.getInputStream();
byte[] bytes=new byte[1024];
int len=is.read(bytes);
System.out.println(new String(bytes,0,len));
OutputStream os= socket.getOutputStream();
InputStream is = socket.getInputStream();
byte[] bytes = new byte[1024];
int len = is.read(bytes);
System.out.println(new String(bytes, 0, len));
OutputStream os = socket.getOutputStream();
//响应给客户端
os.write("server :".getBytes());
socket.close();

View File

@ -167,12 +167,13 @@ public class TaskDistributionConsumer {
}
public Long getSeconds(Object startObj, Object endObj) {
String startObjStr = startObj.toString();
String endObjStr = endObj.toString();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");
String startObjStr = startObj.toString().replaceAll("T"," ");
String endObjStr = endObj.toString().replaceAll("T"," ");
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
LocalDateTime startTime = LocalDateTime.parse(startObjStr, formatter);
LocalDateTime endTime = LocalDateTime.parse(endObjStr, formatter);
Duration duration = Duration.between(startTime, endTime);
log.info("开始时间 :{}, 结束时间 :{}, 间隔 :{}",startObj,endObj,duration.getSeconds());
return duration.getSeconds();
}