264 lines
11 KiB
Java
264 lines
11 KiB
Java
package com.zhgd.xmgl.task;
|
||
|
||
import cn.hutool.core.collection.CollUtil;
|
||
import cn.hutool.core.date.DateUtil;
|
||
import cn.hutool.core.util.NumberUtil;
|
||
import cn.hutool.core.util.StrUtil;
|
||
import com.alibaba.fastjson.JSONArray;
|
||
import com.alibaba.fastjson.JSONObject;
|
||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||
import com.zhgd.redis.lock.RedisRepository;
|
||
import com.zhgd.xmgl.modules.project.entity.Project;
|
||
import com.zhgd.xmgl.modules.project.mapper.ProjectMapper;
|
||
import com.zhgd.xmgl.modules.sewage.entity.SewageAlarm;
|
||
import com.zhgd.xmgl.modules.sewage.entity.SewageData;
|
||
import com.zhgd.xmgl.modules.sewage.entity.SewageDev;
|
||
import com.zhgd.xmgl.modules.sewage.service.ISewageAlarmService;
|
||
import com.zhgd.xmgl.modules.sewage.service.ISewageDataService;
|
||
import com.zhgd.xmgl.modules.sewage.service.ISewageDevService;
|
||
import com.zhgd.xmgl.util.RenZhiUtil;
|
||
import lombok.extern.slf4j.Slf4j;
|
||
import net.javacrumbs.shedlock.core.SchedulerLock;
|
||
import org.jetbrains.annotations.Nullable;
|
||
import org.springframework.beans.factory.annotation.Autowired;
|
||
import org.springframework.context.annotation.Lazy;
|
||
import org.springframework.scheduling.annotation.Scheduled;
|
||
import org.springframework.web.bind.annotation.RequestMapping;
|
||
import org.springframework.web.bind.annotation.RestController;
|
||
|
||
import javax.annotation.Resource;
|
||
import java.util.*;
|
||
import java.util.function.Function;
|
||
import java.util.stream.Collectors;
|
||
|
||
@Slf4j
|
||
@RestController
|
||
@RequestMapping("xmgl/task")
|
||
public class SewageTask {
|
||
public static final String REDIS_RAIN_ALARM_TASK_START_TIME = "getSewageAlarmTaskStartTime";
|
||
public static final String DATA_VALUE = "dataValue";
|
||
public static final String RECORD_TIME = "recordTime";
|
||
@Resource
|
||
@Lazy
|
||
private ISewageDataService sewageDataService;
|
||
@Resource
|
||
@Lazy
|
||
private ISewageAlarmService sewageAlarmService;
|
||
@Resource
|
||
@Lazy
|
||
private ISewageDevService sewageDevService;
|
||
@Lazy
|
||
@Autowired
|
||
private ProjectMapper projectMapper;
|
||
@Lazy
|
||
@Autowired
|
||
private RedisRepository redisRepository;
|
||
|
||
/**
|
||
* 获取实时数据
|
||
*/
|
||
@Scheduled(cron = "0 */2 * * * ?")
|
||
@SchedulerLock(name = "getSewageRecordTask", lockAtMostFor = 1000 * 60, lockAtLeastFor = 1000 * 60)
|
||
@RequestMapping("getSewageRecordTask")
|
||
public void getSewageRecordTask() {
|
||
List<Project> projectList = projectMapper.selectList(new LambdaQueryWrapper<Project>()
|
||
.ne(Project::getJnrzckAccount, "")
|
||
.ne(Project::getJnrzckPw, "")
|
||
);
|
||
for (Project project : projectList) {
|
||
try {
|
||
saveAllRecord(project);
|
||
} catch (Exception e) {
|
||
log.error("获取污水实时数据错误", e);
|
||
}
|
||
}
|
||
}
|
||
|
||
private void saveAllRecord(Project project) {
|
||
List<SewageDev> devs = sewageDevService.list(new LambdaQueryWrapper<SewageDev>().eq(SewageDev::getProjectSn, project.getProjectSn()));
|
||
if (CollUtil.isEmpty(devs)) {
|
||
return;
|
||
}
|
||
Map<String, SewageDev> devSnMap = devs.stream().collect(Collectors.toMap(SewageDev::getDevSn, Function.identity()));
|
||
JSONArray datas = RenZhiUtil.getRealTimeDataByDeviceAddr(StrUtil.join(",", devs.stream().map(SewageDev::getDevSn).collect(Collectors.toList())), project.getJnrzckAccount(), project.getJnrzckPw());
|
||
if (CollUtil.isEmpty(datas)) {
|
||
log.info("污水获取实时数据为空,项目名称:{}", project.getProjectName());
|
||
return;
|
||
}
|
||
List<SewageData> records = new ArrayList<>();
|
||
for (int i = 0; i < datas.size(); i++) {
|
||
JSONObject dataJo = datas.getJSONObject(i);
|
||
JSONArray dataItemJa = dataJo.getJSONArray("dataItem");
|
||
String deviceAddr = dataJo.getString("deviceAddr");
|
||
//normal:正常
|
||
//alarming:报警
|
||
//preAlarming:预警
|
||
//offline:离线
|
||
String deviceStatus = dataJo.getString("deviceStatus");
|
||
SewageData record = new SewageData();
|
||
SewageDev dev = devSnMap.get(deviceAddr);
|
||
if (CollUtil.isNotEmpty(dataItemJa)) {
|
||
for (int j = 0; j < dataItemJa.size(); j++) {
|
||
JSONObject itemJo = dataItemJa.getJSONObject(j);
|
||
Integer nodeId = itemJo.getInteger("nodeId");
|
||
JSONArray registerItemJa = itemJo.getJSONArray("registerItem");
|
||
if (nodeId == 7) {
|
||
//工业 PH、工业 EC
|
||
record.setPhValue(getDouble(registerItemJa, 0));
|
||
record.setConductivity(getDouble(registerItemJa, 1));
|
||
} else if (nodeId == 8) {
|
||
//水质浊度、水质溶解氧
|
||
record.setTurbidityValue(getDouble(registerItemJa, 0));
|
||
record.setDissolvedOxygen(getDouble(registerItemJa, 1));
|
||
} else if (nodeId == 29) {
|
||
//实时流量、雷达流量计液位高
|
||
record.setWaterLevel(Optional.ofNullable(getDouble(registerItemJa, 1)).map(m -> NumberUtil.div(m.doubleValue(), 1000D, 5)).orElse(null));
|
||
} else if (nodeId == 31) {
|
||
//累计水量、流速
|
||
record.setFlowVelocity(Optional.ofNullable(getDouble(registerItemJa, 0)).map(m -> NumberUtil.div(m.doubleValue(), 100D, 5)).orElse(null));
|
||
}
|
||
// record.setWaterTemperature(getDouble(registerItemJa, 0));
|
||
}
|
||
record.setCreateDate(new Date(dataJo.getLong("timeStamp")));
|
||
record.setDevSn(deviceAddr);
|
||
record.setProjectSn(dev.getProjectSn());
|
||
records.add(record);
|
||
}
|
||
}
|
||
if (CollUtil.isNotEmpty(records)) {
|
||
for (SewageData record : records) {
|
||
sewageDataService.add(record);
|
||
}
|
||
}
|
||
}
|
||
|
||
private String getString(JSONArray registerItemJa, int index) {
|
||
return Optional.ofNullable(registerItemJa.getJSONObject(index)).map(o -> o.getString("data")).orElse(null);
|
||
}
|
||
|
||
@Nullable
|
||
private Double getDouble(JSONArray registerItemJa, int index) {
|
||
return Optional.ofNullable(registerItemJa.getJSONObject(index)).map(o -> o.getDouble("data")).orElse(null);
|
||
}
|
||
|
||
/**
|
||
* 获取报警数据
|
||
*/
|
||
@Scheduled(cron = "0 */6 * * * ?")
|
||
@SchedulerLock(name = "getSewageAlarmTask", lockAtMostFor = 1000 * 60 * 5, lockAtLeastFor = 1000 * 60 * 3)
|
||
@RequestMapping("getSewageAlarmTask")
|
||
public void getRainAlarmTask() {
|
||
List<Project> projectList = projectMapper.selectList(new LambdaQueryWrapper<Project>()
|
||
.ne(Project::getJnrzckAccount, "")
|
||
.ne(Project::getJnrzckPw, "")
|
||
);
|
||
String start;
|
||
Object o = redisRepository.get(REDIS_RAIN_ALARM_TASK_START_TIME);
|
||
if (o != null) {
|
||
start = o.toString();
|
||
} else {
|
||
start = DateUtil.format(DateUtil.offsetMinute(new Date(), -5), "yyyy-MM-dd HH:mm");
|
||
}
|
||
String end = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm");
|
||
redisRepository.set(REDIS_RAIN_ALARM_TASK_START_TIME, end);
|
||
for (Project project : projectList) {
|
||
try {
|
||
saveAllAlarm(project, start, end);
|
||
} catch (Exception e) {
|
||
log.error("获取污水报警数据错误", e);
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
private void saveAllAlarm(Project project, String start, String end) throws InterruptedException {
|
||
List<SewageDev> devs = sewageDevService.list(new LambdaQueryWrapper<SewageDev>().eq(SewageDev::getProjectSn, project.getProjectSn()));
|
||
if (CollUtil.isEmpty(devs)) {
|
||
return;
|
||
}
|
||
ArrayList<SewageAlarm> alarms = new ArrayList<>();
|
||
String token = RenZhiUtil.getToken(project.getJnrzckAccount(), project.getJnrzckPw());
|
||
for (SewageDev dev : devs) {
|
||
JSONArray dataJa = RenZhiUtil.getAlarmRecordList(dev.getDevSn(), token, -1, start, end);
|
||
Thread.sleep(200);
|
||
if (CollUtil.isEmpty(dataJa)) {
|
||
continue;
|
||
}
|
||
for (int j = 0; j < dataJa.size(); j++) {
|
||
JSONObject jo = dataJa.getJSONObject(j);
|
||
String recordId = jo.getString("recordId");
|
||
SewageAlarm alarm = new SewageAlarm();
|
||
alarm.setDevSn(jo.getString("deviceAddr"));
|
||
alarm.setMonitorParam(jo.getString("factorName"));
|
||
alarm.setMonitorValue(jo.getDouble("dataValue"));
|
||
alarm.setAlarmDetail(getAlarmContent(jo));
|
||
alarm.setAlarmTime(new Date(jo.getLong("recordTime")));
|
||
alarm.setProjectSn(project.getProjectSn());
|
||
alarm.setAlarmType(getAlarmType(jo));
|
||
// alarm.setSewageDataId(0L);
|
||
// alarm.setSewageType(0);
|
||
alarms.add(alarm);
|
||
}
|
||
}
|
||
sewageAlarmService.saveBatch(alarms);
|
||
}
|
||
|
||
private Integer getAlarmType(JSONObject jo) {
|
||
if (!Objects.equals(jo.get("alarmLevel"), 2) && !Objects.equals(jo.get("alarmLevel"), 3)) {
|
||
return 2;
|
||
} else {
|
||
return 1;
|
||
}
|
||
}
|
||
|
||
private String getAlarmContent(JSONObject jo) {
|
||
/*
|
||
alarmLevel int 报警级别:
|
||
25
|
||
1: 报警(超报警上限)
|
||
2: 预警(超预警上限)
|
||
3: 预警(超预警下限)
|
||
4:报警(超报警下限)
|
||
-1: 离线报警
|
||
-2:遥调(开关量)报警
|
||
dataValue Double 报警值
|
||
alarmRange String 报警限值
|
||
*/
|
||
String factorName = jo.getString("factorName");
|
||
Integer alarmLevel = jo.getInteger("alarmLevel");
|
||
if (Objects.equals(alarmLevel, 1) || Objects.equals(alarmLevel, 2) || Objects.equals(alarmLevel, 3) || Objects.equals(alarmLevel, 4)) {
|
||
return StrUtil.format("{} {},报警值:{},报警限值:{}", factorName, getAlarmLevelStr(alarmLevel), jo.getDouble(DATA_VALUE), jo.getString("alarmRange"));
|
||
} else {
|
||
return StrUtil.format("{} {},报警值:{}", factorName, getAlarmLevelStr(alarmLevel), jo.getString(DATA_VALUE));
|
||
}
|
||
}
|
||
|
||
private String getAlarmLevelStr(Integer alarmLevel) {
|
||
String s = "";
|
||
switch (alarmLevel) {
|
||
case 1:
|
||
s = "报警(超报警上限)";
|
||
break;
|
||
case 2:
|
||
s = "预警(超预警上限)";
|
||
break;
|
||
case 3:
|
||
s = "预警(超预警下限)";
|
||
break;
|
||
case 4:
|
||
s = "报警(超报警下限)";
|
||
break;
|
||
case -1:
|
||
s = "离线报警";
|
||
break;
|
||
case -2:
|
||
s = "遥调(开关量)报警";
|
||
break;
|
||
default:
|
||
}
|
||
return s;
|
||
}
|
||
|
||
|
||
}
|