Browse Source

编写mqtt

dev-shibei
yj 1 year ago
parent
commit
ebff112256
  1. 5
      device_gather/pom.xml
  2. 12
      device_gather/src/main/java/com/xr/device/DeviceGatherApplication.java
  3. 116
      device_gather/src/main/java/com/xr/device/client/MqttClient.java
  4. 58
      device_gather/src/main/java/com/xr/device/client/NtpService.java
  5. 26
      device_gather/src/main/java/com/xr/device/common/configvalue/StaticProperties.java
  6. 35
      device_gather/src/main/java/com/xr/device/common/task/ApplicationTaskTool.java
  7. 23
      device_gather/src/main/java/com/xr/device/common/thread/ThreadPoolConfig.java
  8. 122
      device_gather/src/main/java/com/xr/device/common/utils/MqttClientUtil.java
  9. 31
      device_gather/src/main/java/com/xr/device/common/utils/StaticPropUtil.java
  10. 26
      device_gather/src/main/java/com/xr/device/model/entity/Heartbeat.java
  11. 40
      device_gather/src/main/java/com/xr/device/model/service/impl/MqttServiceImpl.java
  12. 12
      device_gather/src/main/java/com/xr/device/schedule/GetMeterSchedule.java
  13. 32
      device_gather/src/main/resources/application-jcDev.yml

5
device_gather/pom.xml

@ -146,6 +146,11 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId> <artifactId>spring-boot-starter-websocket</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

12
device_gather/src/main/java/com/xr/device/DeviceGatherApplication.java

@ -1,5 +1,7 @@
package com.xr.device; package com.xr.device;
import com.xr.device.client.NtpService;
import com.xr.device.common.task.ApplicationTaskTool;
import com.xr.device.netty.NettyServer; import com.xr.device.netty.NettyServer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
@ -18,11 +20,11 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@EnableAsync//开启异步 @EnableAsync//开启异步
public class DeviceGatherApplication implements CommandLineRunner { public class DeviceGatherApplication implements CommandLineRunner {
private final NettyServer nettyServer; private final ApplicationTaskTool taskTool;
@Autowired @Autowired
public DeviceGatherApplication(NettyServer nettyServer) { public DeviceGatherApplication(ApplicationTaskTool taskTool) {
this.nettyServer = nettyServer; this.taskTool = taskTool;
} }
public static void main(String[] args) { public static void main(String[] args) {
@ -32,7 +34,9 @@ public class DeviceGatherApplication implements CommandLineRunner {
@Async @Async
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
nettyServer.start(); taskTool.startNettyServer();
taskTool.startNtpService();
taskTool.startMqttClient();
} }
} }

116
device_gather/src/main/java/com/xr/device/client/MqttClient.java

@ -0,0 +1,116 @@
package com.xr.device.client;
import com.xr.device.common.utils.StaticPropUtil;
import com.xr.device.model.service.impl.MqttServiceImpl;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
@Component
public class MqttClient implements Runnable{
private static org.eclipse.paho.client.mqttv3.MqttClient client;
private static MqttConnectOptions options;
// 开始连接
public void start() {
new Thread(this).start();
}
public void run() {
try {
if(client!=null){
return;
}
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new org.eclipse.paho.client.mqttv3.MqttClient(StaticPropUtil.host, StaticPropUtil.clientId, new MemoryPersistence());
// MQTT的连接设置
options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
// 设置连接的用户名
options.setUserName(StaticPropUtil.clentName);
// 设置连接的密码
options.setPassword(StaticPropUtil.clentPassword.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(StaticPropUtil.timeOut);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(StaticPropUtil.interval);
// 设置断开后重新连接
//options.setAutomaticReconnect(true);
// 设置回调
client.setCallback(new MqttServiceImpl());
MqttTopic topic = client.getTopic(StaticPropUtil.TOPIC1);
// setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 遗嘱
options.setWill(topic, "close".getBytes(), 1, true);
client.connect(options);
// 订阅消息
subList(StaticPropUtil.TOPIC1,2);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
client.close();
client.disconnect();
}
/**
* 向某个主题发布消息 默认qos1
*
* @param topic:发布的主题
* @param msg发布的消息
*/
public void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
//mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = client.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 向某个主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos012
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = client.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅某一个主题 此方法默认的的Qos等级为1
*
* @param topic 主题
*/
public void sub(String topic) throws MqttException {
client.subscribe(topic);
}
/**
* 订阅某一个主题可携带Qos
*
* @param topic 所要订阅的主题
* @param qos 消息质量012
*/
public void subList(String topic, int qos) throws MqttException {
List<String> list= Arrays.asList(topic.trim().split(","));
for(String str:list){
client.subscribe(str, qos);
}
}
}

58
device_gather/src/main/java/com/xr/device/client/NtpService.java

@ -0,0 +1,58 @@
package com.xr.device.client;
import com.xr.device.common.utils.StaticPropUtil;
import org.springframework.stereotype.Service;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.util.Arrays;
@Service
public class NtpService implements Runnable {
public void start() {
new Thread(this).start();
}
@Override
public void run() {
try (DatagramSocket socket = new DatagramSocket(StaticPropUtil.ntpPort)) {
byte[] buffer = new byte[48];
while (true) {
DatagramPacket requestPacket = new DatagramPacket(buffer, buffer.length);
socket.receive(requestPacket);
long currentTime = Long.parseLong(System.currentTimeMillis() / 1000 + StaticPropUtil.timestampOffset);
ByteBuffer responseBuffer = ByteBuffer.allocate(48);
Arrays.fill(responseBuffer.array(), (byte) 0);
responseBuffer.put(0, (byte) 0x24); // LI, Version, Mode
responseBuffer.put(1, (byte) 1); // Stratum
responseBuffer.put(2, (byte) 0); // Poll
responseBuffer.put(3, (byte) -20); // Precision
responseBuffer.putInt(4, 0); // Root Delay
responseBuffer.putInt(8, 0); // Root Dispersion
responseBuffer.putInt(12, 0x4C4F434C); // Reference Identifier ("LOCL")
responseBuffer.putLong(16, currentTime << 32); // Reference Timestamp
responseBuffer.putLong(24, ByteBuffer.wrap(requestPacket.getData(), 40, 8).getLong()); // Originate Timestamp
responseBuffer.putLong(32, currentTime << 32); // Receive Timestamp
responseBuffer.putLong(40, currentTime << 32); // Transmit Timestamp
DatagramPacket responsePacket = new DatagramPacket(
responseBuffer.array(),
responseBuffer.array().length,
requestPacket.getAddress(),
requestPacket.getPort()
);
socket.send(responsePacket);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

26
device_gather/src/main/java/com/xr/device/common/configvalue/StaticProperties.java

@ -23,4 +23,30 @@ public class StaticProperties {
@Value("${station.id}") @Value("${station.id}")
private Integer stationId; private Integer stationId;
@Value("${ntp.port}")
private Integer ntpPort;
@Value("${ntp.timestampOffset}")
private String timestampOffset;
@Value("${spring.mqtt.username}")
private String clentName;
@Value("${spring.mqtt.password}")
private String clentPassword;
@Value("${spring.mqtt.url}")
private String host;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String TOPIC1;
@Value("${mqtt.connection.timeout}")
private int timeOut;
@Value("${mqtt.keep.alive.interval}")
private int interval;
} }

35
device_gather/src/main/java/com/xr/device/common/task/ApplicationTaskTool.java

@ -0,0 +1,35 @@
package com.xr.device.common.task;
import com.xr.device.client.MqttClient;
import com.xr.device.client.NtpService;
import com.xr.device.netty.NettyServer;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class ApplicationTaskTool {
private final NettyServer nettyServer;
private final NtpService ntpService;
private final MqttClient mqttClient;
@Async("fixedTaskExecutor")
public void startNettyServer() throws Exception {
nettyServer.start();
}
@Async("fixedTaskExecutor")
public void startNtpService() throws Exception {
ntpService.start();
}
@Async("fixedTaskExecutor")
public void startMqttClient() throws Exception {
mqttClient.start();
}
}

23
device_gather/src/main/java/com/xr/device/common/thread/ThreadPoolConfig.java

@ -0,0 +1,23 @@
package com.xr.device.common.thread;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class ThreadPoolConfig {
@Bean(name = "fixedTaskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
int poolSize = 10; // 固定线程池大小
executor.setCorePoolSize(poolSize);
executor.setMaxPoolSize(poolSize);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("FixedAsync-");
executor.initialize();
return executor;
}
}

122
device_gather/src/main/java/com/xr/device/common/utils/MqttClientUtil.java

@ -0,0 +1,122 @@
package com.xr.device.common.utils;
import com.xr.device.model.service.impl.MqttServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Objects;
@Component
@Slf4j
public class MqttClientUtil {
private MqttClient mqttClient;
private MqttConnectOptions mqttConnectOptions;
@PostConstruct
private void init(){
connect(StaticPropUtil.host, StaticPropUtil.clientId);
}
/**
* 链接mqtt
* @param host
* @param clientId
*/
private void connect(String host,String clientId){
try{
mqttClient = new MqttClient(host,clientId,new MemoryPersistence());
mqttClient.setCallback(new MqttServiceImpl());
mqttConnectOptions = getMqttConnectOptions();
mqttClient.connect(mqttConnectOptions);
}catch (Exception e){
log.error("mqtt服务链接异常!");
e.printStackTrace();
}
}
/**
* 设置链接对象信息
* setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息
* @return
*/
private MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setUserName(StaticPropUtil.clentName);
mqttConnectOptions.setPassword(StaticPropUtil.clentPassword.toCharArray());
mqttConnectOptions.setServerURIs(new String[]{StaticPropUtil.host});
mqttConnectOptions.setKeepAliveInterval(StaticPropUtil.interval);
mqttConnectOptions.setConnectionTimeout(StaticPropUtil.timeOut);
//mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setCleanSession(false);
return mqttConnectOptions;
}
/**
*mqtt链接状态
* @return
*/
private boolean isConnect(){
if(Objects.isNull(this.mqttClient)){
return false;
}
return mqttClient.isConnected();
}
/**
* 设置重连
* @throws Exception
*/
private void reConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已重新链接...");
this.mqttClient.connect(this.mqttConnectOptions);
}
}
/**
* 断开链接
* @throws Exception
*/
private void closeConnect() throws Exception{
if(Objects.nonNull(this.mqttClient)){
log.info("mqtt 服务已断开链接...");
this.mqttClient.disconnect();
}
}
/**
* 发布消息
* @param topic
* @param message
* @param qos
* @throws Exception
*/
public void sendMessage(String topic,String message,int qos) throws Exception {
if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(message.getBytes());
mqttMessage.setQos(qos);
MqttTopic mqttTopic = mqttClient.getTopic(topic);
if(Objects.nonNull(mqttTopic)){
try{
MqttDeliveryToken publish = mqttTopic.publish(mqttMessage);
if(publish.isComplete()){
log.info("消息发送成功---->{}",message);
}
}catch(Exception e){
log.error("消息发送异常",e);
}
}
}else{
reConnect();
sendMessage(topic,message,qos);
}
}
}

31
device_gather/src/main/java/com/xr/device/common/utils/StaticPropUtil.java

@ -15,12 +15,43 @@ public class StaticPropUtil {
public static Integer stationId; public static Integer stationId;
public static Integer ntpPort;
public static String timestampOffset;
public static String userName;
public static String password;
public static String clentName;
public static String clentPassword;
public static String host;
public static String clientId;
public static String TOPIC1;
public static int timeOut;
public static int interval;
public static void initDingDingProp(StaticProperties dingProperties){ public static void initDingDingProp(StaticProperties dingProperties){
imgUrl = dingProperties.getImgUrl(); imgUrl = dingProperties.getImgUrl();
imgPath = dingProperties.getImgPath(); imgPath = dingProperties.getImgPath();
pythonPath = dingProperties.getPythonPath(); pythonPath = dingProperties.getPythonPath();
modelPath = dingProperties.getModelPath(); modelPath = dingProperties.getModelPath();
stationId = dingProperties.getStationId(); stationId = dingProperties.getStationId();
ntpPort = dingProperties.getNtpPort();
timestampOffset = dingProperties.getTimestampOffset();
clentName=dingProperties.getClentName();
clentPassword=dingProperties.getClentPassword();
host=dingProperties.getHost();
clientId=dingProperties.getClientId();
TOPIC1=dingProperties.getTOPIC1();
timeOut=dingProperties.getTimeOut();
interval=dingProperties.getInterval();
} }
} }

26
device_gather/src/main/java/com/xr/device/model/entity/Heartbeat.java

@ -0,0 +1,26 @@
package com.xr.device.model.entity;
import lombok.Data;
@Data
public class Heartbeat {
private String battery_level;
private String created;
private String id;
private String ip_address;
private String mac_address;
private String name;
private String serial_number;
private String timestamp;
private String updated;
}

40
device_gather/src/main/java/com/xr/device/model/service/impl/MqttServiceImpl.java

@ -0,0 +1,40 @@
package com.xr.device.model.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.xr.device.model.entity.Heartbeat;
import com.xr.onvifhk.entity.DeviceInfo;
import com.xr.onvifhk.util.HkComUtil;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class MqttServiceImpl implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("connectionLost---------连接断开,可以做重连");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
if(topic.equals("camera/heartbeat")){
//收到设备心跳后,调取onvif抓取照片
String res = new String(message.getPayload());
if(!res.equals("close")){
Heartbeat mqttTop = JSONObject.parseObject(res, Heartbeat.class);
DeviceInfo info = new DeviceInfo();
info.setPort(1002);
info.setIp(mqttTop.getIp_address());
info.setPassword("1234");
info.setAccount("admin");
String url = HkComUtil.getBole(info,null);
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

12
device_gather/src/main/java/com/xr/device/schedule/GetMeterSchedule.java

@ -15,7 +15,9 @@ import com.xr.onvifhk.entity.DeviceInfo;
import com.xr.onvifhk.util.HkComUtil; import com.xr.onvifhk.util.HkComUtil;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -45,6 +47,16 @@ public class GetMeterSchedule {
private final MeterTypeService meterTypeService; private final MeterTypeService meterTypeService;
@Bean
public ThreadPoolTaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("custom-scheduled-task-");
scheduler.setWaitForTasksToCompleteOnShutdown(true);
scheduler.setAwaitTerminationSeconds(30);
return scheduler;
}
@Scheduled(cron = "0 0/3 * * * ?") @Scheduled(cron = "0 0/3 * * * ?")
public void getMeterSchedule(){ public void getMeterSchedule(){
List<MeterConfig> configs = meterConfigService.getMeterList(StaticPropUtil.stationId); List<MeterConfig> configs = meterConfigService.getMeterList(StaticPropUtil.stationId);

32
device_gather/src/main/resources/application-jcDev.yml

@ -1,7 +1,7 @@
server: server:
port: 8173 port: 8173
servlet: servlet:
context-path: /cars-api context-path: /get-api
#context-path: / #context-path: /
spring: spring:
@ -57,6 +57,14 @@ spring:
max-wait: -1ms max-wait: -1ms
max-idle: 10 max-idle: 10
min-idle: 5 min-idle: 5
mqtt:
username: admin
password: admin123
url: tcp://192.168.1.83:1883
client:
id: publish_1524585547client
default:
topic: camera/heartbeat
swagger: swagger:
show: true show: true
analysis: analysis:
@ -79,4 +87,24 @@ eureka:
service-url: service-url:
defaultZone: http://localhost:8184/eureka defaultZone: http://localhost:8184/eureka
station: station:
id: 2 id: 2
ntp:
port: 1123
timestampOffset: 2208988800L
mqtt:
connection:
timeout: 30000
keep:
alive:
interval: 20
work:
mqtt:
username: admin
password: public
clientId: publish_client
broker: tcp://localhost:1883
topic: camera/heartbeat
timeout: 30000
interval: 20
Loading…
Cancel
Save