package com.zhgd.xmgl.async;

import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.zhgd.mqtt.bean.PushPayload;
import com.zhgd.mqtt.server.IMqttSender;
import com.zhgd.xmgl.modules.basicdata.entity.SystemUser;
import com.zhgd.xmgl.modules.basicdata.service.ISystemUserService;
import com.zhgd.xmgl.modules.foundation.entity.DeepExcavationSensor;
import com.zhgd.xmgl.modules.foundation.enums.DataStatusEnum;
import com.zhgd.xmgl.modules.project.service.IProjectService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.stream.Collectors;

/**
 * Asynchronous notifications for deep-excavation (foundation pit) sensor data.
 *
 * <p>Each public method is annotated to run on the {@code devExcavationExecutor} executor. For the
 * affected project(s) it looks up the matching users, pushes one MQTT message per user to the
 * topic {@code scope + userId}, and then hands the same title/message to
 * {@link AsyncAiAnalyse#sendAppNotice} for the app notification.
 *
 * @author 邱平毅
 * @date 2022/9/26 15:46
 * @Version 1.0
 */
@Slf4j
@Component
public class AsyncDevExcavation {

    /**
     * Account type of the users that get notified.
     * NOTE(review): presumably project administrators and their sub-accounts - confirm against
     * the account-type definitions.
     */
    private static final int NOTIFIED_ACCOUNT_TYPE = 5;

    /** Value passed to {@code PushPayload#setType} for every message built by this class. */
    private static final String PUSH_TYPE = "8";

    /** Page path handed to {@code sendAppNotice} together with every notice. */
    private static final String APP_NOTICE_PAGE = "/pages/projectEnd/projectIndex/projectIndex";

    /** Pause between two consecutive MQTT pushes of a dropped-sensor notice, in milliseconds. */
    private static final long PUSH_INTERVAL_MILLIS = 200L;

    @Autowired
    private ISystemUserService systemUserService;

    @Autowired
    AsyncAiAnalyse asyncAiAnalyse;

    @Autowired
    IProjectService projectService;

    /** Prefix of the per-user MQTT topic; injected from the {@code mqtt-scope} property. */
    @Value("${mqtt-scope}")
    private String scope;

    @Autowired
    private IMqttSender mqttPushClient;

    @Autowired
    private AsyncCommon asyncCommon;

    /**
     * Notifies the users of the sensor's project that a real-time reading is abnormal.
     *
     * <p>Any exception raised while looking up users or sending is logged and not rethrown.
     *
     * @param alarmState           status id, rendered into the message through
     *                             {@link DataStatusEnum#getStatusNameById}
     * @param deepExcavationSensor sensor that produced the abnormal reading; its sensor number and
     *                             project number are used
     */
    @Async("devExcavationExecutor")
    public void sendExceptionData(Integer alarmState, DeepExcavationSensor deepExcavationSensor) {
        try {
            String title = "深基坑数据异常";
            String msg = "传感器编号为:" + deepExcavationSensor.getSensorSn()
                    + ",当前处于" + DataStatusEnum.getStatusNameById(alarmState);

            List<SystemUser> systemUserList = systemUserService.list(
                    Wrappers.<SystemUser>lambdaQuery()
                            .eq(SystemUser::getSn, deepExcavationSensor.getProjectSn())
                            .eq(SystemUser::getAccountType, NOTIFIED_ACCOUNT_TYPE));
            if (CollUtil.isEmpty(systemUserList)) {
                return;
            }

            for (SystemUser systemUser : systemUserList) {
                pushMqttMessage(systemUser, title, msg, "传感器数据");
            }
            asyncAiAnalyse.sendAppNotice(deepExcavationSensor.getProjectSn(), title, msg,
                    systemUserList, APP_NOTICE_PAGE);
        } catch (Exception e) {
            log.error("error:", e);
        }
    }

    /**
     * Notifies users that the given sensors are abnormal, one combined message per project.
     *
     * <p>Sensors are grouped by project number; null entries and sensors without a project number
     * are ignored. Nothing is sent when no sensor carries a project number.
     *
     * @param sensorList sensors to report; may be {@code null} or empty, in which case nothing
     *                   happens
     */
    @Async("devExcavationExecutor")
    public void sendDroppedSensor(List<DeepExcavationSensor> sensorList) {
        if (CollUtil.isEmpty(sensorList)) {
            return;
        }
        String title = "深基坑传感器异常";

        Map<String, List<DeepExcavationSensor>> projectSnSensorListMap = sensorList.stream()
                .filter(Objects::nonNull)
                .filter(sensor -> sensor.getProjectSn() != null)
                .collect(Collectors.groupingBy(DeepExcavationSensor::getProjectSn));
        // An empty collection must not reach the IN condition below: there is nobody to look up,
        // and an empty IN list can yield an invalid SQL statement.
        if (projectSnSensorListMap.isEmpty()) {
            return;
        }

        Map<String, String> projectMsgMap = getProjectMsgMap(projectSnSensorListMap);
        List<SystemUser> allSystemUserList = systemUserService.list(
                Wrappers.<SystemUser>lambdaQuery()
                        .in(SystemUser::getSn, projectSnSensorListMap.keySet())
                        .eq(SystemUser::getAccountType, NOTIFIED_ACCOUNT_TYPE));
        Map<String, List<SystemUser>> projectSnUserListMap = allSystemUserList.stream()
                .collect(Collectors.groupingBy(SystemUser::getSn));

        sendUserDroppedSensorInfo(title, projectMsgMap, projectSnUserListMap);
    }

    /**
     * Sends each project's dropped-sensor message to that project's users.
     *
     * <p>A failure while handling one project is logged and the remaining projects are still
     * processed. If the thread is interrupted during the pause between pushes, the interrupt flag
     * is restored and sending stops.
     *
     * @param title                notification title
     * @param projectMsgMap        project number to message text
     * @param projectSnUserListMap project number to the users to notify
     */
    private void sendUserDroppedSensorInfo(String title, Map<String, String> projectMsgMap,
                                           Map<String, List<SystemUser>> projectSnUserListMap) {
        for (Map.Entry<String, List<SystemUser>> entry : projectSnUserListMap.entrySet()) {
            String projectSn = entry.getKey();
            List<SystemUser> systemUserList = entry.getValue();
            if (CollUtil.isEmpty(systemUserList)) {
                continue;
            }
            try {
                String msg = projectMsgMap.get(projectSn);
                for (SystemUser systemUser : systemUserList) {
                    pushMqttMessage(systemUser, title, msg, "传感器设备");
                    Thread.sleep(PUSH_INTERVAL_MILLIS);
                }
                asyncAiAnalyse.sendAppNotice(projectSn, title, msg, systemUserList, APP_NOTICE_PAGE);
            } catch (InterruptedException e) {
                // Restore the flag so the executor can observe the interruption, then stop.
                Thread.currentThread().interrupt();
                log.error("error:", e);
                return;
            } catch (Exception e) {
                log.error("error:", e);
            }
        }
    }

    /**
     * Builds the push payload for one user and publishes it to the topic {@code scope + userId}.
     *
     * @param systemUser recipient; its user id is used as account id and topic suffix
     * @param title      message title
     * @param msg        message content
     * @param itemType   value passed to {@code PushPayload#setItemType}
     */
    private void pushMqttMessage(SystemUser systemUser, String title, String msg, String itemType) {
        String userId = systemUser.getUserId().toString();
        PushPayload pushMessage = PushPayload.getPushPayloadBuider()
                .setAccountId(userId)
                .setTitle(title)
                .setContent(msg)
                .setType(PUSH_TYPE)
                .setItemType(itemType)
                .bulid();
        mqttPushClient.sendToMqtt(scope + userId, pushMessage.toString());
    }

    /**
     * Builds one message per project that lists the numbers of its abnormal sensors.
     *
     * @param projectSnSensorListMap project number to the sensors of that project
     * @return project number to message text; an empty map when the input is {@code null} or empty
     */
    private Map<String, String> getProjectMsgMap(
            Map<String, List<DeepExcavationSensor>> projectSnSensorListMap) {
        if (CollUtil.isEmpty(projectSnSensorListMap)) {
            return Collections.emptyMap();
        }
        Map<String, String> projectMsgMap = new HashMap<>(projectSnSensorListMap.size());
        for (Map.Entry<String, List<DeepExcavationSensor>> entry : projectSnSensorListMap.entrySet()) {
            // joining() puts the separator only between elements, so no trailing separator has to
            // be stripped afterwards and an empty sensor list cannot cause an index error.
            String sensorNumbers = entry.getValue().stream()
                    .map(sensor -> String.valueOf(sensor.getSensorSn()))
                    .collect(Collectors.joining("、"));
            projectMsgMap.put(entry.getKey(),
                    "编号为:" + sensorNumbers + "的传感器异常,请检查设备是否在线或是否连网");
        }
        return projectMsgMap;
    }
}