diff --git a/src/main/java/com/zhgd/mqtt/server/MqttConfig.java b/src/main/java/com/zhgd/mqtt/server/MqttConfig.java index a550d11ce..9206b30c2 100644 --- a/src/main/java/com/zhgd/mqtt/server/MqttConfig.java +++ b/src/main/java/com/zhgd/mqtt/server/MqttConfig.java @@ -1,6 +1,11 @@ package com.zhgd.mqtt.server; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.zhgd.xmgl.constant.Cts; +import com.zhgd.xmgl.modules.worker.service.IWorkerInfoService; +import com.zhgd.xmgl.modules.xz.service.IXzHikvisionSyncService; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.slf4j.Logger; @@ -10,12 +15,17 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import javax.annotation.Resource; + /** * @program: itbgp * @description: @@ -62,6 +72,8 @@ public class MqttConfig { @Value("${mqtt.consumer.defaultTopic}") private String consumerDefaultTopic; + @Resource + private IXzHikvisionSyncService xzHikvisionSyncService; /** * MQTT连接器选项 @@ -131,7 +143,7 @@ public class MqttConfig { * MQTT消息订阅绑定(消费者) * * @return {@link org.springframework.integration.core.MessageProducer} - *//* + */ @Bean public MessageProducer inbound() { // 可以同时消费(订阅)多个Topic @@ -145,24 +157,23 @@ public class MqttConfig { // 设置订阅通道 adapter.setOutputChannel(mqttInboundChannel()); return adapter; - }*/ + } /** * MQTT信息通道(消费者) * * @return {@link org.springframework.messaging.MessageChannel} - *//* + */ @Bean(name = CHANNEL_NAME_IN) public MessageChannel mqttInboundChannel() { return new DirectChannel(); } -*/ /** * MQTT消息处理器(消费者) * * @return {@link org.springframework.messaging.MessageHandler} */ - /*@Bean + @Bean @ServiceActivator(inputChannel = CHANNEL_NAME_IN) public MessageHandler handler() { return message -> { @@ -171,6 +182,15 @@ public class MqttConfig { Object payLoad = message.getPayload(); LOGGER.info("主题:{},消息接收到的数据:{}", message.getHeaders().get("mqtt_receivedTopic"),payLoad); LOGGER.error("===================={}============", payLoad); +// if (topic.contains("mqtt/face/")) { +// JSONObject jsonObject = JSONObject.parseObject(payLoad.toString()); +// if (jsonObject.getString("code").equals("200")) { +// JSONArray jsonArray = jsonObject.getJSONObject("info").getJSONArray("AddSucInfo"); +// xzHikvisionSyncService.mqttAck(topic.split("/")[2], jsonArray, 1); +// JSONArray errorArray = jsonObject.getJSONObject("info").getJSONArray("AddErrInfo"); +// xzHikvisionSyncService.mqttAck(topic.split("/")[2], errorArray, 0); +// } +// } }; - }*/ + } } diff --git a/src/main/java/com/zhgd/xmgl/modules/xz/service/IXzHikvisionSyncService.java b/src/main/java/com/zhgd/xmgl/modules/xz/service/IXzHikvisionSyncService.java index 0a30b4957..32af3d9e4 100644 --- a/src/main/java/com/zhgd/xmgl/modules/xz/service/IXzHikvisionSyncService.java +++ b/src/main/java/com/zhgd/xmgl/modules/xz/service/IXzHikvisionSyncService.java @@ -1,5 +1,6 @@ package com.zhgd.xmgl.modules.xz.service; +import com.alibaba.fastjson.JSONArray; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; import com.zhgd.xmgl.modules.xz.entity.XzHikvisionSync; @@ -67,4 +68,6 @@ public interface IXzHikvisionSyncService extends IService { * @throws Exception */ void retry(Map paramMap) throws Exception; + + void mqttAck(String deviceSn, JSONArray workerIds, Integer isSuccess); } diff --git a/src/main/java/com/zhgd/xmgl/modules/xz/service/impl/XzHikvisionSyncServiceImpl.java b/src/main/java/com/zhgd/xmgl/modules/xz/service/impl/XzHikvisionSyncServiceImpl.java index 27ad0890d..3c673a702 100644 --- a/src/main/java/com/zhgd/xmgl/modules/xz/service/impl/XzHikvisionSyncServiceImpl.java +++ b/src/main/java/com/zhgd/xmgl/modules/xz/service/impl/XzHikvisionSyncServiceImpl.java @@ -1,6 +1,9 @@ package com.zhgd.xmgl.modules.xz.service.impl; import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -26,6 +29,7 @@ import com.zhgd.xmgl.modules.xz.enums.XzHikvisionSyncOperateEnum; import com.zhgd.xmgl.modules.xz.enums.XzHikvisionSyncTypeEnum; import com.zhgd.xmgl.modules.xz.mapper.XzHikvisionSyncMapper; import com.zhgd.xmgl.modules.xz.service.IXzHikvisionSyncService; +import com.zhgd.xmgl.modules.xz.special.entity.XzGasAnalyze; import com.zhgd.xmgl.util.PageUtil; import com.zhgd.xmgl.util.RefUtil; import com.zhgd.xmgl.util.ThreadLocalUtil; @@ -37,6 +41,7 @@ import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; import java.util.*; +import java.util.stream.Collectors; /** * @Description: 星纵-海康同步数据 @@ -251,4 +256,35 @@ public class XzHikvisionSyncServiceImpl extends ServiceImpl syncList = new ArrayList<>(); + for (Object ack : ackInfo) { + JSONObject obj = JSONObject.parseObject(JSON.toJSONString(ack)); + String customId = obj.getString("customId"); + WorkerInfo workerInfo = workerInfoMapper.selectById(customId); + if (workerInfo != null) { + build(syncList, workerInfo, deviceSn, isSuccess); + } + } + if (syncList.size() > 0) { + this.saveBatch(syncList); + } + } + + private void build(List syncList, WorkerInfo workerInfo, String deviceSn, Integer isSuccess) { + XzHikvisionSync sync = new XzHikvisionSync(); + sync.setProjectSn(workerInfo.getProjectSn()); + sync.setType(1); + sync.setOperate(1); + sync.setDeviceSn(deviceSn); + sync.setWhoId(workerInfo.getId()); + sync.setIsSuccess(isSuccess); + sync.setCreateDate(new Date()); + sync.setUpdateDate(new Date()); + sync.setBigType(1); + syncList.add(sync); + syncList.add(sync); + } } diff --git a/src/main/java/com/zhgd/xmgl/task/WorkerTask.java b/src/main/java/com/zhgd/xmgl/task/WorkerTask.java index 5b3607371..3458ba102 100644 --- a/src/main/java/com/zhgd/xmgl/task/WorkerTask.java +++ b/src/main/java/com/zhgd/xmgl/task/WorkerTask.java @@ -30,6 +30,7 @@ import com.zhgd.xmgl.modules.project.entity.ProjectExternalSystemService; import com.zhgd.xmgl.modules.project.entity.ProjectUfaceConfig; import com.zhgd.xmgl.modules.project.mapper.ProjectExternalSystemServiceMapper; import com.zhgd.xmgl.modules.project.mapper.ProjectMapper; +import com.zhgd.xmgl.modules.project.mapper.ProjectUfaceConfigMapper; import com.zhgd.xmgl.modules.project.service.IProjectUfaceConfigService; import com.zhgd.xmgl.modules.quality.entity.QualityRegion; import com.zhgd.xmgl.modules.quality.service.IQualityRegionService; @@ -40,10 +41,7 @@ import com.zhgd.xmgl.modules.worker.mapper.WorkerInfoMapper; import com.zhgd.xmgl.modules.worker.service.*; import com.zhgd.xmgl.modules.worker.service.impl.WorkerInfoServiceImpl; import com.zhgd.xmgl.modules.worker.service.IDangongWorkerFaceStatusService; -import com.zhgd.xmgl.modules.xz.entity.XzCertificateExpireAlarmRecord; -import com.zhgd.xmgl.modules.xz.entity.XzWorkerSafeWatchAlarm; -import com.zhgd.xmgl.modules.xz.entity.XzWorkerSafeWatchConfig; -import com.zhgd.xmgl.modules.xz.entity.XzWorkerSafeWatchManager; +import com.zhgd.xmgl.modules.xz.entity.*; import com.zhgd.xmgl.modules.xz.security.entity.XzSecurityInspectTaskItemRecord; import com.zhgd.xmgl.modules.xz.security.entity.XzSecurityInspectTaskRecord; import com.zhgd.xmgl.modules.xz.security.entity.XzSecurityQualityInspectionRecord; @@ -52,14 +50,12 @@ import com.zhgd.xmgl.modules.xz.security.mapper.XzSecurityInspectTaskItemRecordM import com.zhgd.xmgl.modules.xz.security.mapper.XzSecurityInspectTaskRecordMapper; import com.zhgd.xmgl.modules.xz.security.mapper.XzSecurityQualityInspectionRecordMapper; import com.zhgd.xmgl.modules.xz.security.service.IXzSecurityQualityInspectionRecordService; +import com.zhgd.xmgl.modules.xz.service.IXzHikvisionSyncService; import com.zhgd.xmgl.modules.xz.service.IXzWorkerSafeWatchAlarmService; import com.zhgd.xmgl.modules.xz.service.IXzWorkerSafeWatchConfigService; import com.zhgd.xmgl.modules.xz.service.IXzWorkerSafeWatchManagerService; import com.zhgd.xmgl.modules.xz.service.impl.XzCertificateExpireAlarmRecordServiceImpl; -import com.zhgd.xmgl.util.Base64Util; -import com.zhgd.xmgl.util.ElecardUtil; -import com.zhgd.xmgl.util.MapBuilder; -import com.zhgd.xmgl.util.NumberUtils; +import com.zhgd.xmgl.util.*; import lombok.extern.slf4j.Slf4j; import net.javacrumbs.shedlock.core.SchedulerLock; import org.apache.commons.lang3.StringUtils; @@ -77,6 +73,8 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; +import static com.zhgd.xmgl.modules.project.enums.ProjectUfaceConfigSupplierTypeEnum.MQTT; + /** * @program: wisdomSite * @description: 劳务人员定时任务 @@ -149,6 +147,14 @@ public class WorkerTask { @Autowired private IDangongWorkerFaceStatusService dangongWorkerFaceStatusService; + @Lazy + @Autowired + private IXzHikvisionSyncService xzHikvisionSyncService; + + @Lazy + @Autowired + private ProjectUfaceConfigMapper projectUfaceConfigMapper; + /** * 定时修改用户码状态 */ @@ -942,10 +948,23 @@ public class WorkerTask { // /** // * MQTT同步下发人员信息 // */ -// @Scheduled(cron = "0 0/10 * * * ?") +// @Scheduled(cron = "0 0/15 * * * ?") // @SchedulerLock(name = "mqttIssuedWorkerInfo", lockAtMostFor = 1000 * 55, lockAtLeastFor = 1000 * 55) // @RequestMapping("mqttIssuedWorkerInfo") // public void mqttIssuedWorkerInfo() { -// +// List configList = projectUfaceConfigMapper.selectList(Wrappers.lambdaQuery() +// .eq(ProjectUfaceConfig::getSupplierType, MQTT.getCode()) +// .eq(ProjectUfaceConfig::getIssueDev, 1)); +// for (ProjectUfaceConfig projectUfaceConfig : configList) { +// List list = xzHikvisionSyncService.list(Wrappers.lambdaQuery() +// .eq(XzHikvisionSync::getProjectSn, projectUfaceConfig.getProjectSn()) +// .eq(XzHikvisionSync::getIsSuccess, 0) +// .eq(XzHikvisionSync::getType, 1) +// .eq(XzHikvisionSync::getBigType, 1)); +// for (XzHikvisionSync sync : list) { +// WorkerInfo workerInfo = workerInfoMapper.selectById(sync.getWhoId()); +// MqttFaceDevUtil.addOrUpdatePerson(workerInfo, sync.getDeviceSn()); +// } +// } // } } diff --git a/src/main/java/com/zhgd/xmgl/util/MqttFaceDevUtil.java b/src/main/java/com/zhgd/xmgl/util/MqttFaceDevUtil.java index 86d0e8006..d5d17ac74 100644 --- a/src/main/java/com/zhgd/xmgl/util/MqttFaceDevUtil.java +++ b/src/main/java/com/zhgd/xmgl/util/MqttFaceDevUtil.java @@ -11,9 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; /** * MQTT下发人员信息 @@ -49,6 +47,29 @@ public class MqttFaceDevUtil { } } + /** + * 添加修改人员 + * + * @param workerInfo + * @param devSn + */ + public static void addOrUpdatePersonList(List workerInfo, String devSn) { + if (StrUtil.isNotBlank(devSn)) { + String[] splitArr = StringUtils.split(devSn, ","); + for (int i = 0; i < splitArr.length; i++) { + String ds = splitArr[i]; + Map map = build(workerInfo); + String payload = JSON.toJSONString(map); + String topic = mqttTopic + ds; + log.info("发送主题信息:{},主题为:{}", payload, topic); + IMqttSender mqttSender = SpringContextUtils.getBean(IMqttSender.class); + mqttSender.sendToMqtt(topic, 2, payload); + } + } else { + log.info("未查询到设备sn"); + } + } + /** * 删除人员 * @@ -78,8 +99,11 @@ public class MqttFaceDevUtil { private static Map build(WorkerInfo workerInfo) { Map map = new HashMap<>(); - map.put("messageId", "ID:" + System.currentTimeMillis() + ":" + workerInfo.getPersonSn()); - map.put("operator", "EditPerson"); + map.put("messageId", "ID:" + System.currentTimeMillis() + ":" + workerInfo.getProjectSn()); + map.put("DataBegin", "BeginFlag"); + map.put("operator", "EditPersonsNew"); + map.put("PersonNum", 1); + List> infoList = new ArrayList<>(); Map info = new HashMap<>(); info.put("customId", workerInfo.getPersonSn()); info.put("name", workerInfo.getWorkerName()); @@ -94,7 +118,38 @@ public class MqttFaceDevUtil { info.put("personType", 0); info.put("cardType", 0); info.put("pic", basePath + "/" + workerInfo.getFieldAcquisitionUrl()); - map.put("info", info); + infoList.add(info); + map.put("info", infoList); + map.put("DataEnd", "EndFlag"); + return map; + } + + private static Map build(List workerInfos) { + Map map = new HashMap<>(); + map.put("messageId", "ID:" + System.currentTimeMillis() + ":" + workerInfos.get(0).getProjectSn()); + map.put("DataBegin", "BeginFlag"); + map.put("operator", "EditPersonsNew"); + map.put("PersonNum", 1); + List> infoList = new ArrayList<>(); + for (WorkerInfo workerInfo : workerInfos) { + Map info = new HashMap<>(); + info.put("customId", workerInfo.getPersonSn()); + info.put("name", workerInfo.getWorkerName()); + info.put("nation", 1); + info.put("gender", workerInfo.getSex() - 1); + info.put("birthday", workerInfo.getBirthday()); + info.put("address", workerInfo.getNowPlace()); + info.put("idCard", workerInfo.getIdCard()); + info.put("tempCardType", 0); + info.put("telnum1", workerInfo.getPhoneNumber()); + info.put("native", workerInfo.getNativePlace()); + info.put("personType", 0); + info.put("cardType", 0); + info.put("pic", basePath + "/" + workerInfo.getFieldAcquisitionUrl()); + infoList.add(info); + } + map.put("info", infoList); + map.put("DataEnd", "EndFlag"); return map; } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 7544112a3..2d6098885 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -71,7 +71,7 @@ mqtt.url=tcp://jxj.zhgdyun.com:1883 mqtt.producer.clientId=mqttProd mqtt.producer.defaultTopic=topic1 mqtt.consumer.clientId=mqttConsumer -mqtt.consumer.defaultTopic=topic1 +mqtt.consumer.defaultTopic=topic1,mqtt/face/+/Ack #mqtt.consumer.defaultTopic=/P114101/202203010084 #server.http2.enabled=true #server.ssl.key-store=classpath:www.cscec1b1.com.jks