From ebff112256a9d8dd5f84dcc7d4271cc063dbcd40 Mon Sep 17 00:00:00 2001 From: yj <913944315@qq.com> Date: Wed, 31 Jul 2024 13:42:22 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=96=E5=86=99mqtt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- device_gather/pom.xml | 5 + .../xr/device/DeviceGatherApplication.java | 12 +- .../java/com/xr/device/client/MqttClient.java | 116 +++++++++++++++++ .../java/com/xr/device/client/NtpService.java | 58 +++++++++ .../common/configvalue/StaticProperties.java | 26 ++++ .../common/task/ApplicationTaskTool.java | 35 +++++ .../common/thread/ThreadPoolConfig.java | 23 ++++ .../device/common/utils/MqttClientUtil.java | 122 ++++++++++++++++++ .../device/common/utils/StaticPropUtil.java | 31 +++++ .../com/xr/device/model/entity/Heartbeat.java | 26 ++++ .../model/service/impl/MqttServiceImpl.java | 40 ++++++ .../xr/device/schedule/GetMeterSchedule.java | 12 ++ .../src/main/resources/application-jcDev.yml | 32 ++++- 13 files changed, 532 insertions(+), 6 deletions(-) create mode 100644 device_gather/src/main/java/com/xr/device/client/MqttClient.java create mode 100644 device_gather/src/main/java/com/xr/device/client/NtpService.java create mode 100644 device_gather/src/main/java/com/xr/device/common/task/ApplicationTaskTool.java create mode 100644 device_gather/src/main/java/com/xr/device/common/thread/ThreadPoolConfig.java create mode 100644 device_gather/src/main/java/com/xr/device/common/utils/MqttClientUtil.java create mode 100644 device_gather/src/main/java/com/xr/device/model/entity/Heartbeat.java create mode 100644 device_gather/src/main/java/com/xr/device/model/service/impl/MqttServiceImpl.java diff --git a/device_gather/pom.xml b/device_gather/pom.xml index 061ead1..79a3a26 100644 --- a/device_gather/pom.xml +++ b/device_gather/pom.xml @@ -146,6 +146,11 @@ org.springframework.boot spring-boot-starter-websocket + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + diff --git a/device_gather/src/main/java/com/xr/device/DeviceGatherApplication.java b/device_gather/src/main/java/com/xr/device/DeviceGatherApplication.java index b9b1667..6613018 100644 --- a/device_gather/src/main/java/com/xr/device/DeviceGatherApplication.java +++ b/device_gather/src/main/java/com/xr/device/DeviceGatherApplication.java @@ -1,5 +1,7 @@ package com.xr.device; +import com.xr.device.client.NtpService; +import com.xr.device.common.task.ApplicationTaskTool; import com.xr.device.netty.NettyServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; @@ -18,11 +20,11 @@ import org.springframework.scheduling.annotation.EnableScheduling; @EnableAsync//开启异步 public class DeviceGatherApplication implements CommandLineRunner { - private final NettyServer nettyServer; + private final ApplicationTaskTool taskTool; @Autowired - public DeviceGatherApplication(NettyServer nettyServer) { - this.nettyServer = nettyServer; + public DeviceGatherApplication(ApplicationTaskTool taskTool) { + this.taskTool = taskTool; } public static void main(String[] args) { @@ -32,7 +34,9 @@ public class DeviceGatherApplication implements CommandLineRunner { @Async @Override public void run(String... args) throws Exception { - nettyServer.start(); + taskTool.startNettyServer(); + taskTool.startNtpService(); + taskTool.startMqttClient(); } } diff --git a/device_gather/src/main/java/com/xr/device/client/MqttClient.java b/device_gather/src/main/java/com/xr/device/client/MqttClient.java new file mode 100644 index 0000000..23a6d6c --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/client/MqttClient.java @@ -0,0 +1,116 @@ +package com.xr.device.client; + +import com.xr.device.common.utils.StaticPropUtil; +import com.xr.device.model.service.impl.MqttServiceImpl; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +@Component +public class MqttClient implements Runnable{ + + private static org.eclipse.paho.client.mqttv3.MqttClient client; + private static MqttConnectOptions options; + // 开始连接 + public void start() { + new Thread(this).start(); + } + public void run() { + try { + if(client!=null){ + return; + } + // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 + client = new org.eclipse.paho.client.mqttv3.MqttClient(StaticPropUtil.host, StaticPropUtil.clientId, new MemoryPersistence()); + // MQTT的连接设置 + options = new MqttConnectOptions(); + // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 + options.setCleanSession(false); + // 设置连接的用户名 + options.setUserName(StaticPropUtil.clentName); + // 设置连接的密码 + options.setPassword(StaticPropUtil.clentPassword.toCharArray()); + // 设置超时时间 单位为秒 + options.setConnectionTimeout(StaticPropUtil.timeOut); + // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 + options.setKeepAliveInterval(StaticPropUtil.interval); + // 设置断开后重新连接 + //options.setAutomaticReconnect(true); + // 设置回调 + client.setCallback(new MqttServiceImpl()); + MqttTopic topic = client.getTopic(StaticPropUtil.TOPIC1); + // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 遗嘱 + options.setWill(topic, "close".getBytes(), 1, true); + client.connect(options); + // 订阅消息 + subList(StaticPropUtil.TOPIC1,2); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * 关闭MQTT连接 + */ + public void close() throws MqttException { + client.close(); + client.disconnect(); + } + + /** + * 向某个主题发布消息 默认qos:1 + * + * @param topic:发布的主题 + * @param msg:发布的消息 + */ + public void pub(String topic, String msg) throws MqttException { + MqttMessage mqttMessage = new MqttMessage(); + //mqttMessage.setQos(2); + mqttMessage.setPayload(msg.getBytes()); + MqttTopic mqttTopic = client.getTopic(topic); + MqttDeliveryToken token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } + + /** + * 向某个主题发布消息 + * + * @param topic: 发布的主题 + * @param msg: 发布的消息 + * @param qos: 消息质量 Qos:0、1、2 + */ + public void pub(String topic, String msg, int qos) throws MqttException { + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(qos); + mqttMessage.setPayload(msg.getBytes()); + MqttTopic mqttTopic = client.getTopic(topic); + MqttDeliveryToken token = mqttTopic.publish(mqttMessage); + token.waitForCompletion(); + } + + /** + * 订阅某一个主题 ,此方法默认的的Qos等级为:1 + * + * @param topic 主题 + */ + public void sub(String topic) throws MqttException { + client.subscribe(topic); + } + + /** + * 订阅某一个主题,可携带Qos + * + * @param topic 所要订阅的主题 + * @param qos 消息质量:0、1、2 + */ + public void subList(String topic, int qos) throws MqttException { + List list= Arrays.asList(topic.trim().split(",")); + for(String str:list){ + client.subscribe(str, qos); + } + } + +} diff --git a/device_gather/src/main/java/com/xr/device/client/NtpService.java b/device_gather/src/main/java/com/xr/device/client/NtpService.java new file mode 100644 index 0000000..b186c30 --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/client/NtpService.java @@ -0,0 +1,58 @@ +package com.xr.device.client; + +import com.xr.device.common.utils.StaticPropUtil; +import org.springframework.stereotype.Service; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.nio.ByteBuffer; +import java.util.Arrays; + +@Service +public class NtpService implements Runnable { + + + public void start() { + new Thread(this).start(); + } + + @Override + public void run() { + try (DatagramSocket socket = new DatagramSocket(StaticPropUtil.ntpPort)) { + byte[] buffer = new byte[48]; + while (true) { + DatagramPacket requestPacket = new DatagramPacket(buffer, buffer.length); + socket.receive(requestPacket); + + long currentTime = Long.parseLong(System.currentTimeMillis() / 1000 + StaticPropUtil.timestampOffset); + ByteBuffer responseBuffer = ByteBuffer.allocate(48); + Arrays.fill(responseBuffer.array(), (byte) 0); + + responseBuffer.put(0, (byte) 0x24); // LI, Version, Mode + responseBuffer.put(1, (byte) 1); // Stratum + responseBuffer.put(2, (byte) 0); // Poll + responseBuffer.put(3, (byte) -20); // Precision + + responseBuffer.putInt(4, 0); // Root Delay + responseBuffer.putInt(8, 0); // Root Dispersion + responseBuffer.putInt(12, 0x4C4F434C); // Reference Identifier ("LOCL") + + responseBuffer.putLong(16, currentTime << 32); // Reference Timestamp + responseBuffer.putLong(24, ByteBuffer.wrap(requestPacket.getData(), 40, 8).getLong()); // Originate Timestamp + responseBuffer.putLong(32, currentTime << 32); // Receive Timestamp + responseBuffer.putLong(40, currentTime << 32); // Transmit Timestamp + + DatagramPacket responsePacket = new DatagramPacket( + responseBuffer.array(), + responseBuffer.array().length, + requestPacket.getAddress(), + requestPacket.getPort() + ); + socket.send(responsePacket); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/device_gather/src/main/java/com/xr/device/common/configvalue/StaticProperties.java b/device_gather/src/main/java/com/xr/device/common/configvalue/StaticProperties.java index d45a945..7f5b421 100644 --- a/device_gather/src/main/java/com/xr/device/common/configvalue/StaticProperties.java +++ b/device_gather/src/main/java/com/xr/device/common/configvalue/StaticProperties.java @@ -23,4 +23,30 @@ public class StaticProperties { @Value("${station.id}") private Integer stationId; + @Value("${ntp.port}") + private Integer ntpPort; + + @Value("${ntp.timestampOffset}") + private String timestampOffset; + + @Value("${spring.mqtt.username}") + private String clentName; + + @Value("${spring.mqtt.password}") + private String clentPassword; + + @Value("${spring.mqtt.url}") + private String host; + + @Value("${spring.mqtt.client.id}") + private String clientId; + + @Value("${spring.mqtt.default.topic}") + private String TOPIC1; + + @Value("${mqtt.connection.timeout}") + private int timeOut; + + @Value("${mqtt.keep.alive.interval}") + private int interval; } diff --git a/device_gather/src/main/java/com/xr/device/common/task/ApplicationTaskTool.java b/device_gather/src/main/java/com/xr/device/common/task/ApplicationTaskTool.java new file mode 100644 index 0000000..ea385a7 --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/common/task/ApplicationTaskTool.java @@ -0,0 +1,35 @@ +package com.xr.device.common.task; + +import com.xr.device.client.MqttClient; +import com.xr.device.client.NtpService; +import com.xr.device.netty.NettyServer; +import lombok.RequiredArgsConstructor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class ApplicationTaskTool { + + private final NettyServer nettyServer; + + private final NtpService ntpService; + + private final MqttClient mqttClient; + + @Async("fixedTaskExecutor") + public void startNettyServer() throws Exception { + nettyServer.start(); + } + + @Async("fixedTaskExecutor") + public void startNtpService() throws Exception { + ntpService.start(); + } + + @Async("fixedTaskExecutor") + public void startMqttClient() throws Exception { + mqttClient.start(); + } +} diff --git a/device_gather/src/main/java/com/xr/device/common/thread/ThreadPoolConfig.java b/device_gather/src/main/java/com/xr/device/common/thread/ThreadPoolConfig.java new file mode 100644 index 0000000..1ee9089 --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/common/thread/ThreadPoolConfig.java @@ -0,0 +1,23 @@ +package com.xr.device.common.thread; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; + +@Configuration +public class ThreadPoolConfig { + + @Bean(name = "fixedTaskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + int poolSize = 10; // 固定线程池大小 + executor.setCorePoolSize(poolSize); + executor.setMaxPoolSize(poolSize); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("FixedAsync-"); + executor.initialize(); + return executor; + } +} diff --git a/device_gather/src/main/java/com/xr/device/common/utils/MqttClientUtil.java b/device_gather/src/main/java/com/xr/device/common/utils/MqttClientUtil.java new file mode 100644 index 0000000..b8b1e6b --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/common/utils/MqttClientUtil.java @@ -0,0 +1,122 @@ +package com.xr.device.common.utils; + +import com.xr.device.model.service.impl.MqttServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Objects; + +@Component +@Slf4j +public class MqttClientUtil { + + + private MqttClient mqttClient; + private MqttConnectOptions mqttConnectOptions; + + @PostConstruct + private void init(){ + connect(StaticPropUtil.host, StaticPropUtil.clientId); + } + + /** + * 链接mqtt + * @param host + * @param clientId + */ + private void connect(String host,String clientId){ + try{ + mqttClient = new MqttClient(host,clientId,new MemoryPersistence()); + mqttClient.setCallback(new MqttServiceImpl()); + mqttConnectOptions = getMqttConnectOptions(); + mqttClient.connect(mqttConnectOptions); + }catch (Exception e){ + log.error("mqtt服务链接异常!"); + e.printStackTrace(); + } + } + + /** + * 设置链接对象信息 + * setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息 + * @return + */ + private MqttConnectOptions getMqttConnectOptions(){ + MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); + mqttConnectOptions.setUserName(StaticPropUtil.clentName); + mqttConnectOptions.setPassword(StaticPropUtil.clentPassword.toCharArray()); + mqttConnectOptions.setServerURIs(new String[]{StaticPropUtil.host}); + mqttConnectOptions.setKeepAliveInterval(StaticPropUtil.interval); + mqttConnectOptions.setConnectionTimeout(StaticPropUtil.timeOut); + //mqttConnectOptions.setAutomaticReconnect(true); + mqttConnectOptions.setCleanSession(false); + return mqttConnectOptions; + } + + /** + *mqtt链接状态 + * @return + */ + private boolean isConnect(){ + if(Objects.isNull(this.mqttClient)){ + return false; + } + return mqttClient.isConnected(); + } + + /** + * 设置重连 + * @throws Exception + */ + private void reConnect() throws Exception{ + if(Objects.nonNull(this.mqttClient)){ + log.info("mqtt 服务已重新链接..."); + this.mqttClient.connect(this.mqttConnectOptions); + } + } + + /** + * 断开链接 + * @throws Exception + */ + private void closeConnect() throws Exception{ + if(Objects.nonNull(this.mqttClient)){ + log.info("mqtt 服务已断开链接..."); + this.mqttClient.disconnect(); + } + } + + /** + * 发布消息 + * @param topic + * @param message + * @param qos + * @throws Exception + */ + public void sendMessage(String topic,String message,int qos) throws Exception { + if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){ + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setPayload(message.getBytes()); + mqttMessage.setQos(qos); + + MqttTopic mqttTopic = mqttClient.getTopic(topic); + + if(Objects.nonNull(mqttTopic)){ + try{ + MqttDeliveryToken publish = mqttTopic.publish(mqttMessage); + if(publish.isComplete()){ + log.info("消息发送成功---->{}",message); + } + }catch(Exception e){ + log.error("消息发送异常",e); + } + } + }else{ + reConnect(); + sendMessage(topic,message,qos); + } + } +} diff --git a/device_gather/src/main/java/com/xr/device/common/utils/StaticPropUtil.java b/device_gather/src/main/java/com/xr/device/common/utils/StaticPropUtil.java index 0c746ce..d3aa1e8 100644 --- a/device_gather/src/main/java/com/xr/device/common/utils/StaticPropUtil.java +++ b/device_gather/src/main/java/com/xr/device/common/utils/StaticPropUtil.java @@ -15,12 +15,43 @@ public class StaticPropUtil { public static Integer stationId; + public static Integer ntpPort; + + public static String timestampOffset; + + public static String userName; + + public static String password; + + public static String clentName; + + public static String clentPassword; + + public static String host; + + public static String clientId; + + public static String TOPIC1; + + public static int timeOut; + + public static int interval; + public static void initDingDingProp(StaticProperties dingProperties){ imgUrl = dingProperties.getImgUrl(); imgPath = dingProperties.getImgPath(); pythonPath = dingProperties.getPythonPath(); modelPath = dingProperties.getModelPath(); stationId = dingProperties.getStationId(); + ntpPort = dingProperties.getNtpPort(); + timestampOffset = dingProperties.getTimestampOffset(); + clentName=dingProperties.getClentName(); + clentPassword=dingProperties.getClentPassword(); + host=dingProperties.getHost(); + clientId=dingProperties.getClientId(); + TOPIC1=dingProperties.getTOPIC1(); + timeOut=dingProperties.getTimeOut(); + interval=dingProperties.getInterval(); } } diff --git a/device_gather/src/main/java/com/xr/device/model/entity/Heartbeat.java b/device_gather/src/main/java/com/xr/device/model/entity/Heartbeat.java new file mode 100644 index 0000000..22272f1 --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/model/entity/Heartbeat.java @@ -0,0 +1,26 @@ +package com.xr.device.model.entity; + +import lombok.Data; + +@Data +public class Heartbeat { + + private String battery_level; + + private String created; + + private String id; + + private String ip_address; + + private String mac_address; + + private String name; + + private String serial_number; + + private String timestamp; + + private String updated; + +} diff --git a/device_gather/src/main/java/com/xr/device/model/service/impl/MqttServiceImpl.java b/device_gather/src/main/java/com/xr/device/model/service/impl/MqttServiceImpl.java new file mode 100644 index 0000000..5340769 --- /dev/null +++ b/device_gather/src/main/java/com/xr/device/model/service/impl/MqttServiceImpl.java @@ -0,0 +1,40 @@ +package com.xr.device.model.service.impl; + +import com.alibaba.fastjson.JSONObject; +import com.xr.device.model.entity.Heartbeat; +import com.xr.onvifhk.entity.DeviceInfo; +import com.xr.onvifhk.util.HkComUtil; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +public class MqttServiceImpl implements MqttCallback { + @Override + public void connectionLost(Throwable throwable) { + System.out.println("connectionLost---------连接断开,可以做重连"); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.out.println("接收消息主题 : " + topic); + System.out.println("接收消息Qos : " + message.getQos()); + if(topic.equals("camera/heartbeat")){ + //收到设备心跳后,调取onvif抓取照片 + String res = new String(message.getPayload()); + if(!res.equals("close")){ + Heartbeat mqttTop = JSONObject.parseObject(res, Heartbeat.class); + DeviceInfo info = new DeviceInfo(); + info.setPort(1002); + info.setIp(mqttTop.getIp_address()); + info.setPassword("1234"); + info.setAccount("admin"); + String url = HkComUtil.getBole(info,null); + } + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } +} diff --git a/device_gather/src/main/java/com/xr/device/schedule/GetMeterSchedule.java b/device_gather/src/main/java/com/xr/device/schedule/GetMeterSchedule.java index bfcf073..0a757c2 100644 --- a/device_gather/src/main/java/com/xr/device/schedule/GetMeterSchedule.java +++ b/device_gather/src/main/java/com/xr/device/schedule/GetMeterSchedule.java @@ -15,7 +15,9 @@ import com.xr.onvifhk.entity.DeviceInfo; import com.xr.onvifhk.util.HkComUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; @@ -45,6 +47,16 @@ public class GetMeterSchedule { private final MeterTypeService meterTypeService; + @Bean + public ThreadPoolTaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(10); + scheduler.setThreadNamePrefix("custom-scheduled-task-"); + scheduler.setWaitForTasksToCompleteOnShutdown(true); + scheduler.setAwaitTerminationSeconds(30); + return scheduler; + } + @Scheduled(cron = "0 0/3 * * * ?") public void getMeterSchedule(){ List configs = meterConfigService.getMeterList(StaticPropUtil.stationId); diff --git a/device_gather/src/main/resources/application-jcDev.yml b/device_gather/src/main/resources/application-jcDev.yml index 6532e94..ee00f68 100644 --- a/device_gather/src/main/resources/application-jcDev.yml +++ b/device_gather/src/main/resources/application-jcDev.yml @@ -1,7 +1,7 @@ server: port: 8173 servlet: - context-path: /cars-api + context-path: /get-api #context-path: / spring: @@ -57,6 +57,14 @@ spring: max-wait: -1ms max-idle: 10 min-idle: 5 + mqtt: + username: admin + password: admin123 + url: tcp://192.168.1.83:1883 + client: + id: publish_1524585547client + default: + topic: camera/heartbeat swagger: show: true analysis: @@ -79,4 +87,24 @@ eureka: service-url: defaultZone: http://localhost:8184/eureka station: - id: 2 \ No newline at end of file + id: 2 +ntp: + port: 1123 + timestampOffset: 2208988800L + + +mqtt: + connection: + timeout: 30000 + keep: + alive: + interval: 20 +work: + mqtt: + username: admin + password: public + clientId: publish_client + broker: tcp://localhost:1883 + topic: camera/heartbeat + timeout: 30000 + interval: 20 \ No newline at end of file