2023-02-16 15:28:15 +08:00

338 lines
14 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.zhgd.xmgl.async;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.zhgd.mqtt.bean.PushPayload;
import com.zhgd.mqtt.server.IMqttSender;
import com.zhgd.xmgl.modules.basicdata.entity.CompanyConfig;
import com.zhgd.xmgl.modules.basicdata.entity.SystemUser;
import com.zhgd.xmgl.modules.basicdata.mapper.SystemUserMapper;
import com.zhgd.xmgl.modules.basicdata.service.ICompanyConfigService;
import com.zhgd.xmgl.modules.project.service.IProjectService;
import com.zhgd.xmgl.push.service.UniPushService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* @author 邱平毅
* @ClassName AsyncCommon
* @date 2022/9/30 10:09
* @Version 1.0
*/
@Slf4j
@Component
public class AsyncCommon {
@Autowired
private SystemUserMapper systemUserMapper;
@Autowired
AsyncAiAnalyse asyncAiAnalyse;
@Autowired
IProjectService projectService;
@Value("${mqtt-scope}")
private String scope;
@Autowired
private IMqttSender mqttPushClient;
@Autowired
private UniPushService uniPushService;
@Autowired
private ICompanyConfigService companyConfigService;
/***
* 通过项目sn发送信息到相关用户下的MQ以及app
* @param title 标题
* @param msg 消息
* @param itemType
* @param projectSn 项目sn
* @param payload 跳转页面
*/
@Async("devExcavationExecutor")
public synchronized void sendMqAndApp(String title, String msg, String itemType, String projectSn, String payload) {
try {
List<SystemUser> systemUserList = systemUserMapper.selectProjectSystemUserList(projectSn);
//向项目管理员和子账号推送通知
if (CollUtil.isNotEmpty(systemUserList)) {
sendMq(title, msg, itemType, systemUserList);
asyncAiAnalyse.sendAppNotice(projectSn, title, msg, systemUserList, payload);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 通过项目sn的列表发送信息到相关用户下的MQ以及app
*
* @param title
* @param msg
* @param itemType
* @param projectSnSet
* @param payload
*/
@Async("devExcavationExecutor")
public synchronized void sendMqAndAppSnSet(String title, String msg, String itemType, Set<String> projectSnSet, String payload) {
try {
//向项目管理员和子账号推送通知
List<SystemUser> systemUserList = systemUserMapper.selectProjectSystemUserListBySnSet(projectSnSet);
if (CollUtil.isNotEmpty(systemUserList)) {
sendMq(title, msg, itemType, systemUserList);
Map<String, List<SystemUser>> snUserMap = systemUserList.stream().filter(user -> user.getClientId() != null).collect(Collectors.groupingBy(SystemUser::getSn));
for (Map.Entry<String, List<SystemUser>> snUserListEntry : snUserMap.entrySet()) {
asyncAiAnalyse.sendAppNotice(snUserListEntry.getKey(), title, msg, snUserListEntry.getValue(), payload);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 通过用户id的集合发送信息到相关用户下的MQ以及app
*
* @param title
* @param msg
* @param itemType
* @param userIdSet
* @param payload
*/
@Async("asyncExecutor")
public synchronized void sendMqAndAppUserIdColl(String title, String msg, String itemType, Collection<String> userIdSet, String payload) {
try {
//向项目管理员和子账号推送通知
List<SystemUser> systemUserList = systemUserMapper.selectList(Wrappers.lambdaQuery(SystemUser.class).in(SystemUser::getUserId, userIdSet));
if (CollUtil.isNotEmpty(systemUserList)) {
sendMq(title, msg, itemType, systemUserList);
Map<String, List<SystemUser>> snUserMap = systemUserList.stream().filter(user -> user.getClientId() != null).collect(Collectors.groupingBy(SystemUser::getSn));
for (Map.Entry<String, List<SystemUser>> snUserListEntry : snUserMap.entrySet()) {
asyncAiAnalyse.sendAppNotice(snUserListEntry.getKey(), title, msg, snUserListEntry.getValue(), payload);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发给mq信息给相关的用户
*
* @param title
* @param msg
* @param itemType
* @param systemUserList 用户列表
*/
private void sendMq(String title, String msg, String itemType, List<SystemUser> systemUserList) {
for (SystemUser systemUser : systemUserList) {
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(systemUser.getUserId().toString())
.setTitle(title)
.setContent(msg)
.setType("8")
.setItemType(itemType)
.bulid();
String kdTopic = scope + systemUser.getUserId();
mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString());
}
}
/**
* 发送信息给相关的用户列表(不同用户通知相同消息)
*
* @param title
* @param msg 用户对应消息Map
* @param itemType
* @param payload
* @param systemUserList 用户列表
*/
public void sendAppAndMq(String title, String msg, String itemType, String payload, List<SystemUser> systemUserList) {
systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> {
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(systemUser.getUserId().toString())
.setTitle(title)
.setContent(msg)
.setType("8")
.setItemType(itemType)
.bulid();
String kdTopic = scope + systemUser.getUserId();
mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString());
return true;
}));
Map<String, List<SystemUser>> snUserListMap = systemUserList.stream().collect(Collectors.groupingBy(SystemUser::getSn));
snUserListMap.forEach((sn, userList) -> sendAppNotice(sn, title, msg, userList, payload));
}
/**
* 发送信息给相关的用户列表(不同用户通知不同信息)
*
* @param title
* @param userMsgMap 用户对应消息Map
* @param itemType
* @param payload
* @param systemUserList 用户列表
*/
public void sendAppAndMq(String title, Map<Long, String> userMsgMap, String itemType, String payload, List<SystemUser> systemUserList) {
systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> {
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(systemUser.getUserId().toString())
.setTitle(title)
.setContent(userMsgMap.get(systemUser.getUserId()))
.setType("8")
.setItemType(itemType)
.bulid();
String kdTopic = scope + systemUser.getUserId();
mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString());
return true;
}));
Map<String, List<SystemUser>> snUserListMap = systemUserList.stream().collect(Collectors.groupingBy(SystemUser::getSn));
snUserListMap.forEach((sn, userList) -> sendAppNotice(sn, title, userMsgMap, userList, payload));
}
/**
* 发送信息给相关的用户id不同用户通知不同信息
*
* @param title
* @param userMsgMap 用户对应消息Map
* @param itemType
* @param payload
* @param userIdList 用户id列表
*/
@Async("asyncExecutor")
public void sendAppAndMqByUserId(String title, Map<Long, String> userMsgMap, String itemType, String payload, List<Long> userIdList) {
List<SystemUser> userList = systemUserMapper.selectList(Wrappers.<SystemUser>lambdaQuery().in(SystemUser::getUserId, userIdList));
sendAppAndMq(title, userMsgMap, itemType, payload, userList);
}
/**
* 向手机端推送通知(不同用户不同信息)
*
* @param projectSn
* @param title
* @param userMsgMap
* @param systemUserList
*/
public void sendAppNotice(String projectSn, String title, Map<Long, String> userMsgMap, List<SystemUser> systemUserList, String payload) {
try {
CompanyConfig companyConfig = companyConfigService.getCompanyConfigByProjectSn(projectSn);
log.info("companyConfig-------------发送开关" + (companyConfig == null ? "" : companyConfig.getAppNoticeType()));
if (companyConfig != null && companyConfig.getAppNoticeType() != null && companyConfig.getAppNoticeType() == 1) {
systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> {
if (systemUser.getAccountType() == 5 || systemUser.getAccountType() == 6) {
String msg = userMsgMap.get(systemUser.getUserId());
log.info("发送-----" + systemUser.getAccount() + "消息:" + msg);
uniPushService.pushToSingleByAlias(systemUser.getClientId(), title, msg, payload);
}
return true;
}));
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 向手机端推送通知(不同用户相同信息)
*
* @param projectSn
* @param title
* @param msg
* @param systemUserList
*/
public void sendAppNotice(String projectSn, String title, String msg, List<SystemUser> systemUserList, String payload) {
try {
CompanyConfig companyConfig = companyConfigService.getCompanyConfigByProjectSn(projectSn);
log.info("companyConfig-------------发送开关" + (companyConfig == null ? "" : companyConfig.getAppNoticeType()));
if (companyConfig != null && companyConfig.getAppNoticeType() != null && companyConfig.getAppNoticeType() == 1) {
systemUserList.forEach(systemUser -> CompletableFuture.supplyAsync(() -> {
if (systemUser.getAccountType() == 5 || systemUser.getAccountType() == 6) {
log.info("发送-----" + systemUser.getAccount() + "消息:" + msg);
uniPushService.pushToSingleByAlias(systemUser.getClientId(), title, msg, payload);
}
return true;
}));
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发送信息给相关的用户id不同用户通知不同信息用于处理一个用户多信息的情况
*
* @param title
* @param customKeyMsgMap 用户对应消息Map<自定义主键, 对应msg> 自定义主键userId + "_" + customKey
* @param itemType
* @param payload
* @param userList 用户列表
*/
@Async("asyncExecutor")
public void sendAppAndMqCustomKey(String title, Map<String, String> customKeyMsgMap, String itemType, String payload, List<SystemUser> userList) {
if (CollUtil.isEmpty(userList)) {
log.warn("没有相关联的用户!");
return;
}
// 根据不同的项目分组
Map<String, List<SystemUser>> snUserListMap = userList.stream().collect(Collectors.groupingBy(SystemUser::getSn));
// 根据用户id分组自定义消息内容
Map<String, List<Map.Entry<String, String>>> userMsgListMap = customKeyMsgMap.entrySet().stream().collect(Collectors.groupingBy(entry -> entry.getKey().substring(0, entry.getKey().indexOf("_"))));
snUserListMap.forEach((sn, snUserList) -> {
// 异步通知不同项目下的人员
CompletableFuture.supplyAsync(() -> {
// 查看企业是否开启通知
CompanyConfig companyConfig = companyConfigService.getCompanyConfigByProjectSn(sn);
log.info("companyConfig-------------发送开关" + (companyConfig == null ? "" : companyConfig.getAppNoticeType()));
// 是否发送app通知
boolean isSendApp = companyConfig != null && companyConfig.getAppNoticeType() != null && companyConfig.getAppNoticeType() == 1;
snUserList.forEach(systemUser -> {
String userId = String.valueOf(systemUser.getUserId());
// 用户对应的消息列表
List<Map.Entry<String, String>> msgList = userMsgListMap.get(userId);
// 循环消息列表发送通知
msgList.forEach(msgMap -> {
// 消息
String msg = msgMap.getValue();
// 是否发送信息到app
if (isSendApp) {
log.info("发送-----" + systemUser.getAccount() + "消息:" + msg);
uniPushService.pushToSingleByAlias(systemUser.getClientId(), title, msg, payload);
}
// 发送消息到mq
sendMq(title, itemType, msg, userId);
});
});
return true;
});
});
}
@Async("asyncExecutor")
public void sendMq(String title, String itemType, String msg, String userId) {
PushPayload pushMessage = PushPayload.getPushPayloadBuider().setAccountId(userId)
.setTitle(title)
.setContent(msg)
.setType("8")
.setItemType(itemType)
.bulid();
String kdTopic = scope + userId;
mqttPushClient.sendToMqtt(kdTopic, pushMessage.toString());
}
}