2023-02-16 15:28:15 +08:00
|
|
|
|
package com.zhgd.mqtt.server;
|
|
|
|
|
|
|
2025-07-21 10:44:43 +08:00
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
2024-07-09 16:42:03 +08:00
|
|
|
|
import com.zhgd.xmgl.constant.Cts;
|
2025-07-21 10:44:43 +08:00
|
|
|
|
import com.zhgd.xmgl.modules.worker.service.IWorkerInfoService;
|
|
|
|
|
|
import com.zhgd.xmgl.modules.xz.service.IXzHikvisionSyncService;
|
2023-02-16 15:28:15 +08:00
|
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
|
|
|
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
|
|
|
import org.springframework.context.annotation.Configuration;
|
|
|
|
|
|
import org.springframework.integration.annotation.ServiceActivator;
|
|
|
|
|
|
import org.springframework.integration.channel.DirectChannel;
|
2025-07-21 10:44:43 +08:00
|
|
|
|
import org.springframework.integration.core.MessageProducer;
|
2023-02-16 15:28:15 +08:00
|
|
|
|
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
|
|
|
|
|
|
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
|
2025-07-21 10:44:43 +08:00
|
|
|
|
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
|
2023-02-16 15:28:15 +08:00
|
|
|
|
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
|
2025-07-21 10:44:43 +08:00
|
|
|
|
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
|
2023-02-16 15:28:15 +08:00
|
|
|
|
import org.springframework.messaging.MessageChannel;
|
|
|
|
|
|
import org.springframework.messaging.MessageHandler;
|
|
|
|
|
|
|
2025-07-21 10:44:43 +08:00
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
|
|
|
2023-02-16 15:28:15 +08:00
|
|
|
|
/**
|
|
|
|
|
|
* @program: itbgp
|
|
|
|
|
|
* @description:
|
|
|
|
|
|
* @author: Mr.Peng
|
|
|
|
|
|
* @create: 2019-10-16 16:21
|
|
|
|
|
|
**/
|
|
|
|
|
|
@Configuration
|
|
|
|
|
|
public class MqttConfig {
|
|
|
|
|
|
private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfig.class);
|
|
|
|
|
|
|
|
|
|
|
|
private static final byte[] WILL_DATA;
|
|
|
|
|
|
|
|
|
|
|
|
static {
|
|
|
|
|
|
WILL_DATA = "offline".getBytes();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 订阅的bean名称
|
|
|
|
|
|
*/
|
|
|
|
|
|
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
|
|
|
|
|
|
/**
|
|
|
|
|
|
* 发布的bean名称
|
|
|
|
|
|
*/
|
|
|
|
|
|
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.username}")
|
|
|
|
|
|
private String username;
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.password}")
|
|
|
|
|
|
private String password;
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.url}")
|
|
|
|
|
|
private String url;
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.producer.clientId}")
|
|
|
|
|
|
private String producerClientId;
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.producer.defaultTopic}")
|
|
|
|
|
|
private String producerDefaultTopic;
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.consumer.clientId}")
|
|
|
|
|
|
private String consumerClientId;
|
|
|
|
|
|
|
|
|
|
|
|
@Value("${mqtt.consumer.defaultTopic}")
|
|
|
|
|
|
private String consumerDefaultTopic;
|
|
|
|
|
|
|
2025-07-21 10:44:43 +08:00
|
|
|
|
@Resource
|
|
|
|
|
|
private IXzHikvisionSyncService xzHikvisionSyncService;
|
2023-02-16 15:28:15 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT连接器选项
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link MqttConnectOptions}
|
|
|
|
|
|
*/
|
|
|
|
|
|
@Bean
|
|
|
|
|
|
public MqttConnectOptions getMqttConnectOptions() {
|
|
|
|
|
|
MqttConnectOptions options = new MqttConnectOptions();
|
|
|
|
|
|
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
|
|
|
|
|
|
// 这里设置为true表示每次连接到服务器都以新的身份连接
|
|
|
|
|
|
options.setCleanSession(true);
|
|
|
|
|
|
// 设置连接的用户名
|
|
|
|
|
|
options.setUserName(username);
|
|
|
|
|
|
// 设置连接的密码
|
|
|
|
|
|
options.setPassword(password.toCharArray());
|
2024-07-09 16:42:03 +08:00
|
|
|
|
options.setServerURIs(StringUtils.split(url, Cts.COMMA));
|
2023-02-16 15:28:15 +08:00
|
|
|
|
// 设置超时时间 单位为秒
|
|
|
|
|
|
options.setConnectionTimeout(20);
|
|
|
|
|
|
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
|
|
|
|
|
|
options.setKeepAliveInterval(20);
|
|
|
|
|
|
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
|
|
|
|
|
|
//options.setWill("willTopic", WILL_DATA, 2, false);
|
|
|
|
|
|
return options;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT客户端
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
|
|
|
|
|
|
*/
|
|
|
|
|
|
@Bean
|
|
|
|
|
|
public MqttPahoClientFactory mqttClientFactory() {
|
|
|
|
|
|
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
|
|
|
|
|
|
factory.setConnectionOptions(getMqttConnectOptions());
|
|
|
|
|
|
return factory;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT信息通道(生产者)
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link org.springframework.messaging.MessageChannel}
|
|
|
|
|
|
*/
|
|
|
|
|
|
@Bean(name = CHANNEL_NAME_OUT)
|
|
|
|
|
|
public MessageChannel mqttOutboundChannel() {
|
|
|
|
|
|
return new DirectChannel();
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT消息处理器(生产者)
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link org.springframework.messaging.MessageHandler}
|
|
|
|
|
|
*/
|
|
|
|
|
|
@Bean
|
|
|
|
|
|
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
|
|
|
|
|
|
public MessageHandler mqttOutbound() {
|
|
|
|
|
|
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
|
|
|
|
|
|
producerClientId + System.currentTimeMillis(),
|
|
|
|
|
|
mqttClientFactory());
|
|
|
|
|
|
messageHandler.setAsync(true);
|
|
|
|
|
|
messageHandler.setDefaultTopic(producerDefaultTopic);
|
|
|
|
|
|
return messageHandler;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT消息订阅绑定(消费者)
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link org.springframework.integration.core.MessageProducer}
|
2025-07-21 10:44:43 +08:00
|
|
|
|
*/
|
2023-02-16 15:28:15 +08:00
|
|
|
|
@Bean
|
|
|
|
|
|
public MessageProducer inbound() {
|
|
|
|
|
|
// 可以同时消费(订阅)多个Topic
|
|
|
|
|
|
MqttPahoMessageDrivenChannelAdapter adapter =
|
|
|
|
|
|
new MqttPahoMessageDrivenChannelAdapter(
|
|
|
|
|
|
consumerClientId, mqttClientFactory(),
|
2024-07-09 16:42:03 +08:00
|
|
|
|
StringUtils.split(consumerDefaultTopic, Cts.COMMA));
|
2023-02-16 15:28:15 +08:00
|
|
|
|
adapter.setCompletionTimeout(5000);
|
|
|
|
|
|
adapter.setConverter(new DefaultPahoMessageConverter());
|
|
|
|
|
|
adapter.setQos(1);
|
|
|
|
|
|
// 设置订阅通道
|
|
|
|
|
|
adapter.setOutputChannel(mqttInboundChannel());
|
|
|
|
|
|
return adapter;
|
2025-07-21 10:44:43 +08:00
|
|
|
|
}
|
2023-02-16 15:28:15 +08:00
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT信息通道(消费者)
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link org.springframework.messaging.MessageChannel}
|
2025-07-21 10:44:43 +08:00
|
|
|
|
*/
|
2023-02-16 15:28:15 +08:00
|
|
|
|
@Bean(name = CHANNEL_NAME_IN)
|
|
|
|
|
|
public MessageChannel mqttInboundChannel() {
|
|
|
|
|
|
return new DirectChannel();
|
|
|
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
|
|
|
* MQTT消息处理器(消费者)
|
|
|
|
|
|
*
|
|
|
|
|
|
* @return {@link org.springframework.messaging.MessageHandler}
|
|
|
|
|
|
*/
|
2025-07-21 10:44:43 +08:00
|
|
|
|
@Bean
|
2023-02-16 15:28:15 +08:00
|
|
|
|
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
|
|
|
|
|
|
public MessageHandler handler() {
|
|
|
|
|
|
return message -> {
|
|
|
|
|
|
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
|
|
|
|
|
|
LOGGER.info("消息主题:{}", topic);
|
|
|
|
|
|
Object payLoad = message.getPayload();
|
|
|
|
|
|
LOGGER.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"),payLoad);
|
|
|
|
|
|
LOGGER.error("===================={}============", payLoad);
|
2025-07-21 10:44:43 +08:00
|
|
|
|
// if (topic.contains("mqtt/face/")) {
|
|
|
|
|
|
// JSONObject jsonObject = JSONObject.parseObject(payLoad.toString());
|
|
|
|
|
|
// if (jsonObject.getString("code").equals("200")) {
|
|
|
|
|
|
// JSONArray jsonArray = jsonObject.getJSONObject("info").getJSONArray("AddSucInfo");
|
|
|
|
|
|
// xzHikvisionSyncService.mqttAck(topic.split("/")[2], jsonArray, 1);
|
|
|
|
|
|
// JSONArray errorArray = jsonObject.getJSONObject("info").getJSONArray("AddErrInfo");
|
|
|
|
|
|
// xzHikvisionSyncService.mqttAck(topic.split("/")[2], errorArray, 0);
|
|
|
|
|
|
// }
|
|
|
|
|
|
// }
|
2023-02-16 15:28:15 +08:00
|
|
|
|
};
|
2025-07-21 10:44:43 +08:00
|
|
|
|
}
|
2023-02-16 15:28:15 +08:00
|
|
|
|
}
|