From e17c8172a492105bbddab4073872f997f0d573b6 Mon Sep 17 00:00:00 2001 From: guoshengxiong <1923636941@qq.com> Date: Fri, 30 Aug 2024 15:48:48 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9E=8D=E9=92=A2=E5=A4=9A=E6=AC=A1=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E7=BD=91=E7=BB=9C=E5=BC=82=E5=B8=B8=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E7=9A=84=EF=BC=8C=E8=87=AA=E5=8A=A8=E9=A9=B3?= =?UTF-8?q?=E5=9B=9E=E5=AE=A1=E6=89=B9=EF=BC=8C=E5=81=9A=E5=A5=BD=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E6=8D=95=E8=8E=B7=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wflow/exception/BusinessException.java | 10 +- .../config/listener/GlobalTaskListener.java | 18 ++- .../workflow/execute/ListenerExecutor.java | 49 ++++--- .../service/impl/ProcessTaskServiceImpl.java | 124 ++++++++++-------- 4 files changed, 116 insertions(+), 85 deletions(-) diff --git a/src/main/java/com/wflow/exception/BusinessException.java b/src/main/java/com/wflow/exception/BusinessException.java index b9920a8..aa61bec 100644 --- a/src/main/java/com/wflow/exception/BusinessException.java +++ b/src/main/java/com/wflow/exception/BusinessException.java @@ -1,17 +1,23 @@ package com.wflow.exception; -import lombok.Getter; +import lombok.Data; /** * @author : willian fu * @date : 2022/6/27 */ -@Getter +@Data public class BusinessException extends RuntimeException { + private Integer code; public BusinessException(String message) { super(message); } + + public BusinessException(Integer code, String message) { + super(message); + this.code = code; + } } diff --git a/src/main/java/com/wflow/workflow/config/listener/GlobalTaskListener.java b/src/main/java/com/wflow/workflow/config/listener/GlobalTaskListener.java index 71a16e8..d8142b8 100644 --- a/src/main/java/com/wflow/workflow/config/listener/GlobalTaskListener.java +++ b/src/main/java/com/wflow/workflow/config/listener/GlobalTaskListener.java @@ -1,7 +1,6 @@ package com.wflow.workflow.config.listener; import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.wflow.exception.BusinessException; import com.wflow.workflow.bean.dto.NotifyDto; @@ -10,7 +9,6 @@ import com.wflow.workflow.service.NotifyService; import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.EndEvent; import org.flowable.common.engine.api.delegate.event.FlowableEngineEntityEvent; -import org.flowable.engine.HistoryService; import org.flowable.engine.RuntimeService; import org.flowable.engine.delegate.event.*; import org.flowable.engine.runtime.ProcessInstance; @@ -50,7 +48,7 @@ public class GlobalTaskListener extends AbstractFlowableEngineEventListener { @Override protected void activityStarted(FlowableActivityEvent event) { - log.info("流程[{}]进入ID[{}]的[{}]节点", event.getProcessInstanceId(), event.getActivityId(), event.getActivityName()); + log.info("流程[{}]进入ID[{}]的[{}]节点", event.getProcessInstanceId(), event.getActivityId(), event.getActivityName()); listenerExecutor.doProcessNodeChangeHandler("enter", event.getProcessInstanceId(), event.getProcessDefinitionId(), event.getActivityId(), event.getActivityType()); super.activityStarted(event); @@ -70,7 +68,7 @@ public class GlobalTaskListener extends AbstractFlowableEngineEventListener { @Override protected void multiInstanceActivityStarted(FlowableMultiInstanceActivityEvent event) { - log.info("流程[{}]进入ID[{}]的[{}]节点", event.getProcessInstanceId(), event.getActivityId(), event.getActivityName()); + log.info("流程[{}]进入ID[{}]的[{}]节点", event.getProcessInstanceId(), event.getActivityId(), event.getActivityName()); listenerExecutor.doProcessNodeChangeHandler("enter", event.getProcessInstanceId(), event.getProcessDefinitionId(), event.getActivityId(), event.getActivityType()); super.multiInstanceActivityStarted(event); @@ -88,7 +86,7 @@ public class GlobalTaskListener extends AbstractFlowableEngineEventListener { if (pass != null) { JSONObject obj = JSONObject.parseObject(pass.toString()); if (obj.getJSONObject("data").getInteger("code") != 200) { - throw new BusinessException(obj.getJSONObject("data").getString("message")); + throw new BusinessException(obj.getJSONObject("data").getInteger("code"), obj.getJSONObject("data").getString("message")); } } ProcessInstance instance = runtimeService.createProcessInstanceQuery() @@ -111,11 +109,11 @@ public class GlobalTaskListener extends AbstractFlowableEngineEventListener { @Override protected void processCompletedWithTerminateEnd(FlowableEngineEntityEvent event) { //通过判断流程实例的endActivityId = refuse-end / cancel-end 判断是撤销还是驳回 - if (event instanceof FlowableProcessTerminatedEvent){ + if (event instanceof FlowableProcessTerminatedEvent) { Object cause = ((FlowableProcessTerminatedEvent) event).getCause(); - if (cause instanceof EndEvent){ + if (cause instanceof EndEvent) { String endNode = ((EndEvent) cause).getId(); - if ("refuse-end".equals(endNode)){ + if ("refuse-end".equals(endNode)) { log.debug("监听到流程[{}]被驳回", event.getProcessInstanceId()); listenerExecutor.doProcessChangeHandler("refuse", event.getProcessInstanceId(), event.getProcessDefinitionId()); } else if ("cancel-end".equals(endNode)) { @@ -127,8 +125,8 @@ public class GlobalTaskListener extends AbstractFlowableEngineEventListener { super.processCompletedWithTerminateEnd(event); } - private void processLeaveNodeEventHandler(FlowableActivityEvent event){ - log.info("流程[{}]离开ID[{}]的[{}]节点", event.getProcessInstanceId(), event.getActivityId(), event.getActivityName()); + private void processLeaveNodeEventHandler(FlowableActivityEvent event) { + log.info("流程[{}]离开ID[{}]的[{}]节点", event.getProcessInstanceId(), event.getActivityId(), event.getActivityName()); listenerExecutor.doProcessNodeChangeHandler("leave", event.getProcessInstanceId(), event.getProcessDefinitionId(), event.getActivityId(), event.getActivityType()); } diff --git a/src/main/java/com/wflow/workflow/execute/ListenerExecutor.java b/src/main/java/com/wflow/workflow/execute/ListenerExecutor.java index 725dd9e..748c2e9 100644 --- a/src/main/java/com/wflow/workflow/execute/ListenerExecutor.java +++ b/src/main/java/com/wflow/workflow/execute/ListenerExecutor.java @@ -2,28 +2,24 @@ package com.wflow.workflow.execute; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpException; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.wflow.bean.entity.WflowModelHistorys; import com.wflow.mapper.WflowModelHistorysMapper; import com.wflow.workflow.UELTools; -import com.wflow.workflow.bean.dto.ProcessInstanceOwnerDto; import com.wflow.workflow.bean.process.HttpDefinition; import com.wflow.workflow.bean.process.props.ApprovalProps; import com.wflow.workflow.bean.process.props.RootProps; import com.wflow.workflow.config.WflowGlobalVarDef; import lombok.extern.slf4j.Slf4j; -import org.flowable.engine.RuntimeService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.HashSet; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.concurrent.ExecutorService; /** @@ -43,43 +39,60 @@ public class ListenerExecutor { @Autowired private ExecutorService executorService; - public Object doProcessChangeHandler(String event, String instanceId, String defId){ + public Object doProcessChangeHandler(String event, String instanceId, String defId) { try { final Object[] result = {null}; WflowModelHistorys selected = modelHistorysMapper.selectOne( new LambdaQueryWrapper() .select(WflowModelHistorys::getProcessConfig) .eq(WflowModelHistorys::getProcessDefId, defId)); - if (Objects.nonNull(selected) && StrUtil.isNotBlank(selected.getProcessConfig())){ + if (Objects.nonNull(selected) && StrUtil.isNotBlank(selected.getProcessConfig())) { Map contextVar = uelTools.getContextVar(instanceId, defId); JSONObject parsed = JSONObject.parseObject(selected.getProcessConfig()); JSONObject listener = parsed.getJSONObject("listener"); - if (Objects.nonNull(listener)){ + if (Objects.nonNull(listener)) { listener.getJSONArray(event).toJavaList(JSONObject.class).forEach(object -> { String actionType = object.getString("actionType"); - switch (actionType){ + switch (actionType) { case "JS": - new JsExecute(executorService).executeVoid("action", "function action(ctx){"+ object.getString("js") +"}" , contextVar); + new JsExecute(executorService).executeVoid("action", "function action(ctx){" + object.getString("js") + "}", contextVar); break; case "JAVA": new ElExecute().execute(object.getString("java"), contextVar, Object.class); break; case "HTTP": - result[0] = new HttpExecute().execute(object.getObject("http", HttpDefinition.class), executorService, Object.class, contextVar); + for (int i = 0; i < 3; i++) { + try { + result[0] = new HttpExecute().execute(object.getObject("http", HttpDefinition.class), executorService, Object.class, contextVar); + break; + } catch (HttpException e) { + if (i == 2) { + throw e; + } + } + } break; } }); } } return result[0]; + } catch (HttpException e) { + log.warn("流程实例[{}]的[{}]事件触发失败:{}", instanceId, event, e.getMessage()); + JSONObject jo = new JSONObject(); + JSONObject jo1 = new JSONObject(); + jo1.put("code", 5001); + jo1.put("message", "网络超时"); + jo.put("data", jo1); + return jo; } catch (Exception e) { log.warn("流程实例[{}]的[{}]事件触发失败:{}", instanceId, event, e.getMessage()); + return null; } - return null; } - public void doProcessNodeChangeHandler(String event, String instanceId, String defId, String nodeId, String actType){ - if (!"userTask".equals(actType)){ + public void doProcessNodeChangeHandler(String event, String instanceId, String defId, String nodeId, String actType) { + if (!"userTask".equals(actType)) { return; //过滤其他非必要节点 } try { @@ -87,17 +100,17 @@ public class ListenerExecutor { Map map = (Map) contextVar.get(WflowGlobalVarDef.WFLOW_NODE_PROPS); Object nodeProps = map.get(nodeId); Map nodeLis = null; - if (nodeProps instanceof ApprovalProps){ + if (nodeProps instanceof ApprovalProps) { nodeLis = ((ApprovalProps) nodeProps).getListeners(); } else if (nodeProps instanceof RootProps) { nodeLis = ((RootProps) nodeProps).getListeners(); } - if (Objects.nonNull(nodeLis) && CollectionUtil.isNotEmpty(nodeLis)){ + if (Objects.nonNull(nodeLis) && CollectionUtil.isNotEmpty(nodeLis)) { nodeLis.get(event).toJavaList(JSONObject.class).forEach(object -> { String actionType = object.getString("actionType"); - switch (actionType){ + switch (actionType) { case "JS": - new JsExecute(executorService).executeVoid("action","function action(ctx){"+ object.get("js") +"}", contextVar); + new JsExecute(executorService).executeVoid("action", "function action(ctx){" + object.get("js") + "}", contextVar); break; case "JAVA": new ElExecute().execute(object.getString("java"), contextVar, Object.class); diff --git a/src/main/java/com/wflow/workflow/service/impl/ProcessTaskServiceImpl.java b/src/main/java/com/wflow/workflow/service/impl/ProcessTaskServiceImpl.java index 986833e..518d664 100644 --- a/src/main/java/com/wflow/workflow/service/impl/ProcessTaskServiceImpl.java +++ b/src/main/java/com/wflow/workflow/service/impl/ProcessTaskServiceImpl.java @@ -1,4 +1,5 @@ package com.wflow.workflow.service.impl; +import java.util.Date; import cn.hutool.cache.CacheUtil; import cn.hutool.cache.impl.TimedCache; @@ -7,6 +8,8 @@ import cn.hutool.core.collection.ConcurrentHashSet; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; +import cn.hutool.http.HttpRequest; +import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -26,8 +29,10 @@ import com.wflow.workflow.bean.process.props.CcProps; import com.wflow.workflow.bean.vo.*; import com.wflow.workflow.config.WflowGlobalVarDef; import com.wflow.workflow.extension.cmd.RecallToHisApprovalNodeCmd; -import com.wflow.workflow.service.*; -import com.wflow.workflow.service.FormService; +import com.wflow.workflow.service.BusinessDataStorageService; +import com.wflow.workflow.service.NotifyService; +import com.wflow.workflow.service.ProcessTaskService; +import com.wflow.workflow.service.UserDeptOrLeaderService; import com.wflow.workflow.utils.FlowableUtils; import com.zhgd.xmgl.tenant.TenantContextHolder; import lombok.extern.slf4j.Slf4j; @@ -38,24 +43,21 @@ import org.flowable.common.engine.impl.identity.Authentication; import org.flowable.engine.*; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.history.HistoricProcessInstance; -import org.flowable.engine.history.HistoricProcessInstanceQuery; import org.flowable.engine.impl.persistence.entity.ExecutionEntity; import org.flowable.engine.impl.util.ExecutionGraphUtil; -import org.flowable.engine.repository.ProcessDefinition; import org.flowable.engine.runtime.ActivityInstance; import org.flowable.engine.runtime.Execution; import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.runtime.ProcessInstanceQuery; import org.flowable.task.api.Task; -import org.flowable.task.api.TaskInfo; import org.flowable.task.api.TaskQuery; import org.flowable.task.api.history.HistoricTaskInstance; import org.flowable.task.service.history.NativeHistoricTaskInstanceQuery; import org.flowable.variable.api.history.HistoricVariableInstance; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.interceptor.TransactionAspectSupport; import java.util.*; import java.util.stream.Collectors; @@ -67,43 +69,8 @@ import java.util.stream.Collectors; @Slf4j @Service("processTaskService") public class ProcessTaskServiceImpl implements ProcessTaskService { - - @Autowired - private TaskService taskService; - - @Autowired - private RuntimeService runtimeService; - - @Autowired - private NotifyService notifyService; - - @Autowired - private BusinessDataStorageService businessDataService; - - @Autowired - private UserDeptOrLeaderService userDeptOrLeaderService; - - @Autowired - private OrgRepositoryService orgRepositoryService; - - @Autowired - private HistoryService historyService; - - @Autowired - private RepositoryService repositoryService; - - @Autowired - private ManagementService managementService; - - @Autowired - private WflowModelHistorysMapper historysMapper; - - @Autowired - private WflowModelsMapper wflowModelsMapper; - //超时缓存,数据缓存20秒,用来存储审批人防止flowable高频调用 private static final TimedCache> taskCache = CacheUtil.newTimedCache(20000); - //用来存储正在处理的节点,防止并发处理 private static final Set HANDLER_NODE_LOCK = new ConcurrentHashSet<>(); @@ -111,6 +78,31 @@ public class ProcessTaskServiceImpl implements ProcessTaskService { taskCache.schedulePrune(10000); } + @Value("${server.port:}") + private String serverPort; + @Autowired + private TaskService taskService; + @Autowired + private RuntimeService runtimeService; + @Autowired + private NotifyService notifyService; + @Autowired + private BusinessDataStorageService businessDataService; + @Autowired + private UserDeptOrLeaderService userDeptOrLeaderService; + @Autowired + private OrgRepositoryService orgRepositoryService; + @Autowired + private HistoryService historyService; + @Autowired + private RepositoryService repositoryService; + @Autowired + private ManagementService managementService; + @Autowired + private WflowModelHistorysMapper historysMapper; + @Autowired + private WflowModelsMapper wflowModelsMapper; + @Override public Page getUserTodoList(Integer pageSize, Integer pageNo, String code, String[] startTimes, String startUser, String key) { String userId = UserUtil.getLoginUserId(); @@ -266,17 +258,17 @@ public class ProcessTaskServiceImpl implements ProcessTaskService { //批量获取所有任务的发起部门信息 Map startDept = FlowableUtils.getProcessVars(instanceMap.keySet(), "startDept"); List taskVos = taskInstances.stream().map(task -> { - HistoricProcessInstance instance = instanceMap.get(task.getProcessInstanceId()); - HistoricVariableInstance variableInstance = historyService.createHistoricVariableInstanceQuery() - .processInstanceId(instance.getId()).variableName("approve_" + task.getId()).singleResult(); - if (StringUtils.isNotBlank(result)) { - System.out.println("11111" + variableInstance.getValue()); - if (!variableInstance.getValue().toString().equals(result)) { - return null; - } + HistoricProcessInstance instance = instanceMap.get(task.getProcessInstanceId()); + HistoricVariableInstance variableInstance = historyService.createHistoricVariableInstanceQuery() + .processInstanceId(instance.getId()).variableName("approve_" + task.getId()).singleResult(); + if (StringUtils.isNotBlank(result)) { + System.out.println("11111" + variableInstance.getValue()); + if (!variableInstance.getValue().toString().equals(result)) { + return null; } - //构造用户id -> 部门id - staterUsers.add(instance.getStartUserId() + "_" + startDept.getOrDefault(instance.getId(), + } + //构造用户id -> 部门id + staterUsers.add(instance.getStartUserId() + "_" + startDept.getOrDefault(instance.getId(), //如果没有就从流程变量 owner 里取(之前是存owner变量) FlowableUtils.getOwnerDept(instance.getId(), false))); return ProcessTaskVo.builder() @@ -357,7 +349,7 @@ public class ProcessTaskServiceImpl implements ProcessTaskService { if (ProcessHandlerParamsVo.Action.cancel.equals(params.getAction())) { params.getComment().setText("撤销:" + params.getComment().getText()); } - if (!(isComment || putComment)){ + if (!(isComment || putComment)) { throw new BusinessException("任务不存在,请刷新数据"); } taskService.addComment(params.getTaskId(), params.getInstanceId(), JSONObject.toJSONString(params.getComment())); @@ -420,11 +412,11 @@ public class ProcessTaskServiceImpl implements ProcessTaskService { //throw new BusinessException("暂不支持单独评论"); break; } - }catch (Exception e) { + } catch (Exception e) { log.error("处理任务异常", e); // TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); throw new BusinessException(e.getMessage()); - }finally { + } finally { //移除锁 HANDLER_NODE_LOCK.remove(lockKey); } @@ -650,7 +642,29 @@ public class ProcessTaskServiceImpl implements ProcessTaskService { if (StrUtil.isNotBlank(params.getSignature())) { var.put("sign_" + task.getId(), params.getSignature()); } - taskService.complete(params.getTaskId(), var); + try { + taskService.complete(params.getTaskId(), var); + } catch (BusinessException e) { + if (Objects.equals(e.getCode(), 5001)) { + log.error("", e); + String url = "http://127.0.0.1:" + serverPort + "/xmgl/flowExceptionLog/flow/add"; + JSONObject jo = new JSONObject(); + jo.put("instanceId", task.getProcessInstanceId()); + String body = jo.toJSONString(); + log.info("工作流提交网络超时插入日志的url:{},body:{}",url,body); + String result2 = HttpRequest.post(url) + .body(body)//表单内容 + .timeout(20000)//超时,毫秒 + .execute().body(); + log.info("工作流提交网络超时插入日志的返回结果:{}", result2); + //网络超时自动驳回 + params.setAction(ProcessHandlerParamsVo.Action.refuse); + var.put("approve_" + task.getId(), params.getAction()); + taskService.complete(params.getTaskId(), var); + } else { + throw e; + } + } } private void doRecallTask(Task task, String userId, ProcessHandlerParamsVo params) {