package com.zhgd.xmgl.async; import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.zhgd.jeecg.common.mybatis.EntityMap; import com.zhgd.mqtt.bean.PushPayload; import com.zhgd.mqtt.server.IMqttSender; import com.zhgd.xmgl.modules.basicdata.entity.CompanyConfig; import com.zhgd.xmgl.modules.basicdata.entity.SystemUser; import com.zhgd.xmgl.modules.basicdata.enums.SystemUserAccountTypeEnum; import com.zhgd.xmgl.modules.basicdata.mapper.SystemUserMapper; import com.zhgd.xmgl.modules.basicdata.service.ICompanyConfigService; import com.zhgd.xmgl.modules.basicdata.service.INoticeService; import com.zhgd.xmgl.modules.project.entity.ProjectVideoConfig; import com.zhgd.xmgl.modules.project.service.IProjectService; import com.zhgd.xmgl.push.service.UniPushService; import com.zhgd.xmgl.util.HikVideoUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.stream.Collectors; /** * @author 邱平毅 * @ClassName AsyncCommon * @date 2022/9/30 10:09 * @Version 1.0 */ @Slf4j @Component public class AsyncCommon { @Autowired private INoticeService noticeService; @Autowired private SystemUserMapper systemUserMapper; @Lazy @Autowired AsyncAiAnalyse asyncAiAnalyse; @Autowired IProjectService projectService; @Value("${mqtt-scope}") private String scope; @Autowired private IMqttSender mqttPushClient; @Autowired private UniPushService uniPushService; @Autowired private ICompanyConfigService companyConfigService; /*** * 通过项目sn发送信息到相关用户下的MQ以及app * @param title 标题 * @param msg 消息 * @param itemType * @param projectSn 项目sn * @param payload 跳转页面 */ @Async("devExcavationExecutor") public synchronized void sendMqAndApp(String title, String msg, String itemType, String projectSn, String payload) { try { List systemUserList = systemUserMapper.selectProjectSystemUserList(projectSn); //向项目管理员和子账号推送通知 if (CollUtil.isNotEmpty(systemUserList)) { sendMq(title, msg, itemType, systemUserList); asyncAiAnalyse.sendAppNotice(projectSn, title, msg, systemUserList, payload); } } catch (Exception e) { log.error("error:", e); } } /** * 通过项目sn的列表发送信息到相关用户下的MQ以及app * * @param title * @param msg * @param itemType * @param projectSnSet * @param payload */ @Async("devExcavationExecutor") public synchronized void sendMqAndAppSnSet(String title, String msg, String itemType, Set projectSnSet, String payload) { try { //向项目管理员和子账号推送通知 List systemUserList = systemUserMapper.selectProjectSystemUserListBySnSet(projectSnSet); if (CollUtil.isNotEmpty(systemUserList)) { sendMq(title, msg, itemType, systemUserList); Map> snUserMap = systemUserList.stream().filter(user -> user.getClientId() != null).collect(Collectors.groupingBy(SystemUser::getSn)); for (Map.Entry> snUserListEntry : snUserMap.entrySet()) { asyncAiAnalyse.sendAppNotice(snUserListEntry.getKey(), title, msg, snUserListEntry.getValue(), payload); } } } catch (Exception e) { log.error("error:", e); } } /** * 发给mq信息给相关的用户 * * @param title * @param msg * @param itemType * @param systemUserList 用户列表 */ public void sendMq(String title, String msg, String itemType, List systemUserList) { for (SystemUser systemUser : systemUserList) { PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(systemUser.getUserId().toString()) .setTitle(title) .setContent(msg) .setType("8") .setItemType(itemType) .bulid(); String kdTopic = scope + systemUser.getUserId(); mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString()); } } /** * 发送信息给相关的用户列表(不同用户通知相同消息) * * @param title * @param msg 用户对应消息Map * @param itemType * @param payload * @param systemUserList 用户列表 */ public void sendAppAndMq(String title, String msg, String itemType, String payload, List systemUserList) { systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> { PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(systemUser.getUserId().toString()) .setTitle(title) .setContent(msg) .setType("8") .setItemType(itemType) .bulid(); String kdTopic = scope + systemUser.getUserId(); mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString()); return true; })); Map> snUserListMap = systemUserList.stream().collect(Collectors.groupingBy(SystemUser::getSn)); snUserListMap.forEach((sn, userList) -> sendAppNotice(sn, title, msg, userList, payload)); } /** * 发送信息给相关的用户列表(不同用户通知不同信息) * * @param title * @param userMsgMap 用户对应消息Map * @param itemType * @param payload * @param systemUserList 用户列表 */ public void sendAppAndMq(String title, Map userMsgMap, String itemType, String payload, List systemUserList) { systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> { PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(systemUser.getUserId().toString()) .setTitle(title) .setContent(userMsgMap.get(systemUser.getUserId())) .setType("8") .setItemType(itemType) .bulid(); String kdTopic = scope + systemUser.getUserId(); mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString()); return true; })); Map> snUserListMap = systemUserList.stream().collect(Collectors.groupingBy(SystemUser::getSn)); snUserListMap.forEach((sn, userList) -> sendAppNotice(sn, title, userMsgMap, userList, payload)); } /** * 发送信息给相关的用户id(不同用户通知不同信息) * * @param title * @param userMsgMap 用户对应消息Map * @param itemType * @param payload * @param userIdList 用户id列表 */ @Async("asyncExecutor") public void sendAppAndMqByUserId(String title, Map userMsgMap, String itemType, String payload, List userIdList) { List userList = systemUserMapper.selectList(Wrappers.lambdaQuery().in(SystemUser::getUserId, userIdList)); sendAppAndMq(title, userMsgMap, itemType, payload, userList); } /** * 向手机端推送通知(不同用户不同信息) * * @param projectSn * @param title * @param userMsgMap * @param systemUserList */ public void sendAppNotice(String projectSn, String title, Map userMsgMap, List systemUserList, String payload) { try { CompanyConfig companyConfig = companyConfigService.getCompanyConfigByProjectSn(projectSn); log.info("companyConfig-------------发送开关" + (companyConfig == null ? "" : companyConfig.getAppNoticeType())); if (companyConfig != null && companyConfig.getAppNoticeType() != null && companyConfig.getAppNoticeType() == 1) { systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> { if (Objects.equals(systemUser.getAccountType(), SystemUserAccountTypeEnum.PROJECT_ACCOUNT.getValue()) || Objects.equals(systemUser.getAccountType(), SystemUserAccountTypeEnum.PROJECT_SUB_ACCOUNT.getValue())) { String msg = userMsgMap.get(systemUser.getUserId()); log.info("发送-----" + systemUser.getAccount() + "消息:" + msg); uniPushService.pushToSingleByAlias(systemUser.getClientId(), title, msg, payload); } return true; })); } } catch (Exception e) { log.error("error:", e); } } /** * 向手机端推送通知(不同用户相同信息) * * @param projectSn * @param title * @param msg * @param systemUserList */ public void sendAppNotice(String projectSn, String title, String msg, List systemUserList, String payload) { try { CompanyConfig companyConfig = companyConfigService.getCompanyConfigByProjectSn(projectSn); log.info("companyConfig-------------发送开关" + (companyConfig == null ? "" : companyConfig.getAppNoticeType())); if (companyConfig != null && companyConfig.getAppNoticeType() != null && companyConfig.getAppNoticeType() == 1) { systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> { if (Objects.equals(systemUser.getAccountType(), SystemUserAccountTypeEnum.PROJECT_ACCOUNT.getValue()) || Objects.equals(systemUser.getAccountType(), SystemUserAccountTypeEnum.PROJECT_SUB_ACCOUNT.getValue())) { log.info("发送-----" + systemUser.getAccount() + "消息:" + msg); uniPushService.pushToSingleByAlias(systemUser.getClientId(), title, msg, payload); } return true; })); } } catch (Exception e) { log.error("error:", e); } } /** * 发送信息给相关的用户id(不同用户通知不同信息),用于处理一个用户多信息的情况 * * @param title * @param customKeyMsgMap 用户对应消息Map<自定义主键, 对应msg> 自定义主键:userId + "_" + customKey * @param itemType * @param payload * @param userList 用户列表 */ @Async("asyncExecutor") public void sendAppAndMqCustomKey(String title, Map customKeyMsgMap, String itemType, String payload, List userList) { if (CollUtil.isEmpty(userList)) { log.warn("没有相关联的用户!"); return; } // 根据不同的项目分组 Map> snUserListMap = userList.stream().collect(Collectors.groupingBy(SystemUser::getSn)); // 根据用户id分组自定义消息内容 Map>> userMsgListMap = customKeyMsgMap.entrySet().stream().collect(Collectors.groupingBy(entry -> entry.getKey().substring(0, entry.getKey().indexOf("_")))); snUserListMap.forEach((sn, snUserList) -> { // 异步通知不同项目下的人员 CompletableFuture.supplyAsync(() -> { // 查看企业是否开启通知 CompanyConfig companyConfig = companyConfigService.getCompanyConfigByProjectSn(sn); log.info("companyConfig-------------发送开关" + (companyConfig == null ? "" : companyConfig.getAppNoticeType())); // 是否发送app通知 boolean isSendApp = companyConfig != null && companyConfig.getAppNoticeType() != null && companyConfig.getAppNoticeType() == 1; snUserList.forEach(systemUser -> { String userId = String.valueOf(systemUser.getUserId()); // 用户对应的消息列表 List> msgList = userMsgListMap.get(userId); // 循环消息列表发送通知 msgList.forEach(msgMap -> { // 消息 String msg = msgMap.getValue(); // 是否发送信息到app if (isSendApp) { log.info("发送-----" + systemUser.getAccount() + "消息:" + msg); uniPushService.pushToSingleByAlias(systemUser.getClientId(), title, msg, payload); } // 发送消息到mq sendMq(title, itemType, msg, userId); }); }); return true; }); }); } @Async("asyncExecutor") public void sendMq(String title, String itemType, String msg, String userId) { PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(userId) .setTitle(title) .setContent(msg) .setType("8") .setItemType(itemType) .bulid(); String kdTopic = scope + userId; mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString()); } @Async("asyncExecutor") public Future> getPlayUrlAsync(ProjectVideoConfig videoConfig, EntityMap entityMap) { try { String serialNumber = MapUtils.getString(entityMap, "serialNumber"); if (StringUtils.isNotEmpty(serialNumber)) { String url = HikVideoUtil.callPostApiGetPreviewURL(serialNumber, "hls", null, videoConfig.getAccount(), videoConfig.getPassword(), videoConfig.getAppId(), videoConfig.getAppSecret()); if (StringUtils.isNotEmpty(url)) { HashMap map = new HashMap<>(16); map.put("url", url); map.putAll(entityMap); Future> future = new AsyncResult<>(map); return future; } } } catch (Exception e) { log.error("err:", e); } return null; } }