package com.zhgd.xmgl.task; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpRequest; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.google.common.util.concurrent.RateLimiter; import com.zhgd.annotation.OperLog; import com.zhgd.jeecg.common.api.vo.Result; import com.zhgd.xmgl.config.SafetyHatWSClient; import com.zhgd.xmgl.modules.project.entity.Project; import com.zhgd.xmgl.modules.project.entity.vo.ProjectInfoExtVo; import com.zhgd.xmgl.modules.project.service.IProjectService; import com.zhgd.xmgl.modules.safetyhat.entity.SafetyHatData; import com.zhgd.xmgl.modules.safetyhat.entity.SafetyHatDev; import com.zhgd.xmgl.modules.safetyhat.mapper.SafetyHatDataMapper; import com.zhgd.xmgl.modules.safetyhat.mapper.SafetyHatDevMapper; import com.zhgd.xmgl.modules.safetyhat.service.ISafetyHatDataService; import com.zhgd.xmgl.util.RundeSafeyHatUtils; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; import net.javacrumbs.shedlock.core.SchedulerLock; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import springfox.documentation.annotations.ApiIgnore; import javax.websocket.DeploymentException; import javax.websocket.WebSocketContainer; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; /** * 智能安全帽task */ @Slf4j @RestController @RequestMapping("xmgl/task") @Transactional(rollbackFor = Exception.class) public class SafetyHatTask { public static final String SESSION_ID = "session_id"; @Autowired IProjectService projectService; @Autowired SafetyHatDevMapper safetyHatDevMapper; @Autowired SafetyHatDataMapper safetyHatDataMapper; @Autowired ISafetyHatDataService safetyHatDataService; @Autowired WebSocketContainer webSocketContainer; /** * 获取安全帽心跳 */ @Scheduled(cron = "*/20 * * * * ?") @SchedulerLock(name = "updateHelmetStatus", lockAtMostFor = 1000 * 60 * 2, lockAtLeastFor = 1000 * 10) @RequestMapping("updateHelmetStatus") public void updateHelmetStatus() { try { List projectList = projectService.list(Wrappers.lambdaQuery().ne(Project::getHelmetUser, "").ne(Project::getHelmetPassword, "")); if (CollUtil.isNotEmpty(projectList)) { log.info("定时获取安全帽心跳"); for (Project project : projectList) { log.info("安全帽user:{}", project.getHelmetUser()); SafetyHatWSClient client = SafetyHatWSClient.clientMap.get(project.getHelmetUser()); CompletableFuture.runAsync(() -> { try { if (client == null) { log.info("首次连接安全帽:user:{}", project.getHelmetUser()); connect(project); } else { log.info("获取安全帽心跳,user:{}", project.getHelmetUser()); client.send("{\"act\":\"ma_get_active_devices\"}"); } } catch (IllegalStateException e) { log.info("异常重连:{}", e.getMessage()); connect(project); } }).exceptionally(throwable -> { log.error("err", throwable); return null; }); } } } catch (Exception e) { log.error("err:", e); } } // 设置每秒最大请求数 private static final double MAX_REQUESTS_PER_SECOND = 10.0; // 创建一个RateLimiter实例 private static final RateLimiter rateLimiter = RateLimiter.create(MAX_REQUESTS_PER_SECOND); /** * 定时2分钟获取安全帽数据 */ @Scheduled(cron = "0 */2 * * * ?") @SchedulerLock(name = "getHelmetData", lockAtMostFor = 1000 * 60 * 2, lockAtLeastFor = 1000 * 10) @RequestMapping("getHelmetData") public void getHelmetData() { List projectList = projectService.list(Wrappers.lambdaQuery().ne(Project::getHelmetUser, "").ne(Project::getHelmetPassword, "")); if (CollUtil.isNotEmpty(projectList)) { log.info("定时2分钟获取安全帽数据任务开始"); for (Project project : projectList) { List devList = safetyHatDevMapper.selectList(new LambdaQueryWrapper() .eq(SafetyHatDev::getProjectSn, project.getProjectSn())); for (SafetyHatDev dev : devList) { // 等待从RateLimiter获取权限 // rateLimiter.acquire(); if (StrUtil.isBlank(dev.getExtUserId())) { log.info("定时2分钟获取安全帽数据任务,安全帽外部user_id没有设置,devSn:{}", dev.getDevSn()); continue; } SafetyHatData lastData = safetyHatDataMapper.selectOne(new LambdaQueryWrapper() .eq(SafetyHatData::getDevSn, dev.getDevSn()).orderByDesc(SafetyHatData::getUploadTime).last("limit 1")); String start; if (lastData != null) { start = lastData.getUploadTime().getTime() / 1000L + ""; } else { start = DateUtil.offsetHour(new Date(), -12).getTime() / 1000L + ""; } //轨迹回放 String url = "https://caps.runde.pro/api/index.php?ctl=location&act=get_user_path_web"; JSONObject pJo = new JSONObject(); pJo.put("admin_id", project.getHelmetUser()); pJo.put("user_id", dev.getExtUserId()); String end = System.currentTimeMillis() / 1000L + ""; pJo.put("start", start); pJo.put("end", end); String json = pJo.toJSONString(); log.info("定时2分钟获取安全帽数据任务开始,devSn:{},url:{},json:{}", dev.getDevSn(), url, json); String rs = HttpRequest.post(url) .body(json) .timeout(20000)//超时,毫秒 .execute().body(); log.info("定时2分钟获取安全帽数据任务开始rs,devSn:{},rs:{}", dev.getDevSn(), rs); JSONObject rsJo = JSON.parseObject(rs); if (rsJo.getBoolean("status")) { JSONArray dataJa = rsJo.getJSONArray("data"); if (CollUtil.isEmpty(dataJa)) { continue; } for (int i = 0; i < dataJa.size(); i++) { JSONObject dataJo = dataJa.getJSONObject(i); Double xPoint = dataJo.getDouble("x_point"); Double yPoint = dataJo.getDouble("y_point"); Long time = dataJo.getLong("time"); SafetyHatData data = new SafetyHatData(); data.setWorkerInfoId(dev.getWorkerInfoId()); data.setDevSn(dev.getDevSn()); data.setLatitude(xPoint); data.setLongitude(yPoint); data.setUploadTime(new Date(time * 1000L)); data.setProjectSn(dev.getProjectSn()); data.setIsPlatformData(1); data.setType(dev.getType()); safetyHatDataService.add(data); } } else { log.error("定时2分钟获取安全帽数据任务失败:devSn:{}", dev.getDevSn()); } } } } } // /** // * 定时设置昨天的轨迹到数据库 // */ // @Scheduled(cron = "0 0 3 * * ?") // @SchedulerLock(name = "setYesterdayHelmetData", lockAtMostFor = 1000 * 60 * 2, lockAtLeastFor = 1000 * 10) // @RequestMapping("setYesterdayHelmetData") // public void setYesterdayHelmetData() { // log.info("定时设置昨天的轨迹到数据库任务开始"); // List projectList = projectService.list(Wrappers.lambdaQuery().ne(Project::getHelmetUser, "").ne(Project::getHelmetPassword, "")); // if (CollUtil.isNotEmpty(projectList)) { // for (Project project : projectList) { // List devList = safetyHatDevMapper.selectList(new LambdaQueryWrapper() // .eq(SafetyHatDev::getProjectSn, project.getProjectSn())); // for (SafetyHatDev dev : devList) { // if (StrUtil.isBlank(dev.getExtUserId())) { // log.info("定时设置昨天的轨迹到数据库任务,安全帽外部user_id没有设置,devSn:{}", dev.getDevSn()); // continue; // } // DateTime yB = DateUtil.beginOfDay(DateUtil.offsetDay(new Date(), -1)); // DateTime yE = DateUtil.endOfDay(DateUtil.offsetDay(new Date(), -1)); // String start = yB.getTime() / 1000L + ""; // String end = yE.getTime() / 1000L + ""; // String url = "https://caps.runde.pro/api/index.php?ctl=location&act=get_user_path_web"; // JSONObject pJo = new JSONObject(); // pJo.put("admin_id", project.getHelmetUser()); // pJo.put("user_id", dev.getExtUserId()); // pJo.put("start", start); // pJo.put("end", end); // String json = pJo.toJSONString(); // log.info("定时设置昨天的轨迹到数据库任务开始,devSn:{},url:{},json:{}", dev.getDevSn(), url, json); // String rs = HttpRequest.post(url) // .body(json) // .timeout(20000)//超时,毫秒 // .execute().body(); // log.info("定时设置昨天的轨迹到数据库任务开始rs,devSn:{},rs:{}", dev.getDevSn(), rs); // JSONObject rsJo = JSON.parseObject(rs); // if (rsJo.getBoolean("status")) { // JSONArray dataJa = rsJo.getJSONArray("data"); // //删除昨日的数据 // safetyHatDataMapper.delete(new LambdaQueryWrapper() // .eq(SafetyHatData::getDevSn, dev.getDevSn()) // .eq(SafetyHatData::getIsPlatformData, 1) // .ge(SafetyHatData::getUploadTime, yB) // .le(SafetyHatData::getUploadTime, yE) // ); // if (CollUtil.isEmpty(dataJa)) { // continue; // } // for (int i = 0; i < dataJa.size(); i++) { // JSONObject dataJo = dataJa.getJSONObject(i); // Double xPoint = dataJo.getDouble("x_point"); // Double yPoint = dataJo.getDouble("y_point"); // Long time = dataJo.getLong("time"); // SafetyHatData data = new SafetyHatData(); // data.setWorkerInfoId(dev.getWorkerInfoId()); // data.setDevSn(dev.getDevSn()); // data.setLatitude(xPoint); // data.setLongitude(yPoint); // data.setUploadTime(new Date(time * 1000L)); // data.setProjectSn(dev.getProjectSn()); // data.setIsPlatformData(1); // safetyHatDataMapper.insert(data); // } // } else { // log.error("定时设置昨天的轨迹到数据库任务失败:devSn:{}", dev.getDevSn()); // } // } // // } // } // } /** * 测试发生安全帽数据 */ @RequestMapping("testGetHelmetData") public void testGetHelmetData() { SafetyHatWSClient client = new SafetyHatWSClient("jkbdz"); try { webSocketContainer.connectToServer(client, new URI("wss://caps.runde.pro/wss")); } catch (DeploymentException e) { log.error("error:", e); } catch (IOException e) { log.error("error:", e); } catch (URISyntaxException e) { log.error("error:", e); } //SafetyHatWSClient.clientMap1.put("jkbdz", client); String message = "{\"act\":\"ma_login\",\"user_name\":\"jkbdz\",\"access_token\":\"5c26d999d97009fc4b5d3347f6463c17\"}"; client.send(message); } /** * 连接ws和登录 * * @param project * @throws DeploymentException * @throws IOException * @throws URISyntaxException */ private SafetyHatWSClient connect(Project project) { SafetyHatWSClient client = new SafetyHatWSClient(project.getHelmetUser()); try { webSocketContainer.connectToServer(client, new URI("wss://caps.runde.pro/wss")); } catch (Exception e) { log.error("error:", e); } JSONObject token = RundeSafeyHatUtils.getToken(project.getHelmetUser(), project.getHelmetPassword()); if (token != null && CharSequenceUtil.isNotBlank(token.getString(SESSION_ID))) { String sessionId = token.getString(SESSION_ID); project.setRundeToken(sessionId); String message = "{\"act\":\"ma_login\",\"user_name\":\"" + project.getHelmetUser() + "\",\"access_token\":\"" + sessionId + "\"}"; client.send(message); log.info("登录安全帽user:{},ms:{}", project.getHelmetUser(), message); SafetyHatWSClient.clientMap.put(project.getHelmetUser(), client); } return client; } @OperLog(operModul = "智能安全帽设备管理", operType = "查询", operDesc = "文字转语音") @ApiOperation(value = "文字转语音并发送", notes = "文字转语音", httpMethod = "POST") @ApiImplicitParams({ @ApiImplicitParam(name = "projectSn", value = "项目sn", paramType = "query", required = true, dataType = "String"), @ApiImplicitParam(name = "content", value = "文字", paramType = "query", required = true, dataType = "String") }) @PostMapping(value = "/textToAudio") public void textToAudio(@ApiIgnore @RequestBody Map param) { ProjectInfoExtVo project = projectService.getProjectInfoBySn(MapUtils.getString(param, "projectSn")); JSONObject token = RundeSafeyHatUtils.getToken(project.getHelmetUser(), project.getHelmetPassword()); String content = MapUtils.getString(param, "content"); JSONObject pJo = new JSONObject(); pJo.put("token", token.getString("token")); pJo.put("content", content); String json = pJo.toJSONString(); String rs = HttpRequest.post("https://caps.runde.pro/api/index.php?ctl=aibroadcast&act=send_broadcast_to_cat") .body(json) .timeout(20000)//超时,毫秒 .execute().body(); String id = null; if (StringUtils.isNotBlank(rs)) { JSONObject jsonObject = JSONObject.parseObject(rs); id = jsonObject.getJSONObject("data").getString("message"); } SafetyHatWSClient client = SafetyHatWSClient.clientMap.get(project.getHelmetUser()); String userId = MapUtils.getString(param, "userId"); String message = "{\"act\":\"ma_sending_message\",\"message\":\"" + id + "\",\"user_id\":\"" + userId + "\"}"; client.send(message); log.info("安全帽发送消息:{},content:{}", userId, content); } @OperLog(operModul = "智能安全帽设备管理", operType = "查询", operDesc = "文字转语音") @ApiOperation(value = "查询历史记录", notes = "查询历史记录", httpMethod = "POST") @ApiImplicitParams({ @ApiImplicitParam(name = "projectSn", value = "项目sn", paramType = "query", required = true, dataType = "String"), @ApiImplicitParam(name = "userId", value = "用户ID", paramType = "query", required = true, dataType = "String") }) @PostMapping(value = "/messageRecord") public Result messageRecord(@ApiIgnore @RequestBody Map param) { ProjectInfoExtVo project = projectService.getProjectInfoBySn(MapUtils.getString(param, "projectSn")); JSONObject token = RundeSafeyHatUtils.getToken(project.getHelmetUser(), project.getHelmetPassword()); String userId = MapUtils.getString(param, "userId"); JSONObject pJo = new JSONObject(); pJo.put("admin_id", "13997"); pJo.put("user_id", userId); pJo.put("act", "get_user_msg_list"); pJo.put("ctl", "msg"); String json = pJo.toJSONString(); String rs = HttpRequest.post("https://caps.runde.pro/api/index.php") .header("authentication", token.getString(SESSION_ID)) .body(json) .timeout(20000)//超时,毫秒 .execute().body(); JSONArray jsonArray = new JSONArray(); if (StringUtils.isNotBlank(rs)) { JSONObject jsonObject = JSONObject.parseObject(rs); jsonArray = jsonObject.getJSONObject("data").getJSONArray("list"); } return Result.success(jsonArray); } }