From b1b6077840b21d774cc287030e5d0a52673b63da Mon Sep 17 00:00:00 2001 From: guo Date: Mon, 26 Feb 2024 10:34:27 +0800 Subject: [PATCH] =?UTF-8?q?bug=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zhgd/netty/tcp/location/BCD.java | 170 ++++++++++++++++++ .../zhgd/netty/tcp/location/BaseHandler.java | 67 +++++++ .../netty/tcp/location/CommonResponse.java | 52 ++++++ .../com/zhgd/netty/tcp/location/Const.java | 123 +++++++++++++ .../zhgd/netty/tcp/location/DataPacket.java | 113 ++++++++++++ .../tcp/location/EventLoopGroupConfig.java | 58 ++++++ .../zhgd/netty/tcp/location/JT808Const.java | 34 ++++ .../com/zhgd/netty/tcp/location/Location.java | 25 +++ .../netty/tcp/location/LocationMessage.java | 55 ++++++ .../tcp/location/LocationMessageHandler.java | 65 +++++++ .../netty/tcp/location/MessageDecoder.java | 104 +++++++++++ .../netty/tcp/location/MessageEncoder.java | 82 +++++++++ .../tcp/location/TcpNettyServerJT808.java | 79 ++++++++ ...iAnalyseHardWareAlarmRecordController.java | 1 + .../AiAnalyseHardWareAlarmRecordMapper.xml | 11 +- 15 files changed, 1037 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/zhgd/netty/tcp/location/BCD.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/BaseHandler.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/CommonResponse.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/Const.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/DataPacket.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/EventLoopGroupConfig.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/JT808Const.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/Location.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/LocationMessage.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/LocationMessageHandler.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/MessageDecoder.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/MessageEncoder.java create mode 100644 src/main/java/com/zhgd/netty/tcp/location/TcpNettyServerJT808.java diff --git a/src/main/java/com/zhgd/netty/tcp/location/BCD.java b/src/main/java/com/zhgd/netty/tcp/location/BCD.java new file mode 100644 index 000000000..5f5d56097 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/BCD.java @@ -0,0 +1,170 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.buffer.ByteBuf; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class BCD { + + public static long toLong(byte[] value) { + long result = 0; + int len = value.length; + int temp; + for (int i = 0; i < len; i++) { + temp = (len - 1 - i) * 8; + if (temp == 0) { + result += (value[i] & 0x0ff); + } else { + result += (value[i] & 0x0ff) << temp; + } + } + return result; + } + + public static byte[] longToBytes(long value, int len) { + byte[] result = new byte[len]; + int temp; + for (int i = 0; i < len; i++) { + temp = (len - 1 - i) * 8; + if (temp == 0) { + result[i] += (value & 0x0ff); + } else { + result[i] += (value >>> temp) & 0x0ff; + } + } + return result; + } + + public static byte[] DecimalToBCD(long num) { + int digits = 0; + long temp = num; + while (temp != 0) { + digits++; + temp /= 10; + } + int byteLen = digits % 2 == 0 ? digits / 2 : (digits + 1) / 2; + byte bcd[] = new byte[byteLen]; + for (int i = 0; i < digits; i++) { + byte tmp = (byte) (num % 10); + if (i % 2 == 0) { + bcd[i / 2] = tmp; + } else { + bcd[i / 2] |= (byte) (tmp << 4); + } + num /= 10; + } + for (int i = 0; i < byteLen / 2; i++) { + byte tmp = bcd[i]; + bcd[i] = bcd[byteLen - i - 1]; + bcd[byteLen - i - 1] = tmp; + } + return bcd; + } + + public static long toDecimal(byte[] bcd) { + return Long.valueOf(BCD.toString(bcd)); + } + + public static int toInteger(byte[] bcd) { + return Integer.parseInt(BCD.toString(bcd)); + } + + public static String toString(byte bcd) { + StringBuffer sb = new StringBuffer(); + byte high = (byte) (bcd & 0xf0); + high >>>= (byte) 4; + high = (byte) (high & 0x0f); + byte low = (byte) (bcd & 0x0f); + sb.append(high); + sb.append(low); + return sb.toString(); + } + + public static String toString(byte[] bcd) { + StringBuffer sb = new StringBuffer(); + + for (int i = 0; i < bcd.length; i++) { + sb.append(toString(bcd[i])); + } + + return sb.toString(); + } + + private static final String HEX = "0123456789ABCDEF"; + + private static byte toByte(char c) { + byte b = (byte) HEX.indexOf(c); + return b; + } + + public static byte[] toBcdBytes(String hex) { + int len = (hex.length() / 2); + byte[] result = new byte[len]; + char[] achar = hex.toCharArray(); + for (int i = 0; i < len; i++) { + int pos = i * 2; + result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1])); + } + return result; + } + + public static String toBcdDateString(byte[] bs) { + if (bs.length != 3 && bs.length != 4) { + log.error("无效BCD日期"); + return "0000-00-00"; + } + StringBuffer sb = new StringBuffer(); + int i = 0; + if (bs.length == 3) { + sb.append("20"); + } else { + sb.append(BCD.toString(bs[i++])); + } + sb.append(BCD.toString(bs[i++])); + sb.append("-").append(BCD.toString(bs[i++])); + sb.append("-").append(BCD.toString(bs[i++])); + return sb.toString(); + } + + public static String toBcdTimeString(byte[] bs) { + if (bs.length != 6 && bs.length != 7) { + log.error("无效BCD时间"); + return "0000-00-00 00:00:00"; + } + StringBuffer sb = new StringBuffer(); + int i = 0; + if (bs.length == 6) { + sb.append("20"); + } else { + sb.append(BCD.toString(bs[i++])); + } + sb.append(BCD.toString(bs[i++])); + sb.append("-").append(BCD.toString(bs[i++])); + sb.append("-").append(BCD.toString(bs[i++])); + sb.append(" ").append(BCD.toString(bs[i++])); + sb.append(":").append(BCD.toString(bs[i++])); + sb.append(":").append(BCD.toString(bs[i])); + return sb.toString(); + } + + /** + * 根据byteBuf的readerIndex和writerIndex计算校验码 + * 校验码规则:从消息头开始,同后一字节异或,直到校验码前一个字节,占用 1 个字节 + * + * @param byteBuf + * @return + */ + public static byte XorSumBytes(ByteBuf byteBuf) { + byte sum = byteBuf.getByte(byteBuf.readerIndex()); + for (int i = byteBuf.readerIndex() + 1; i < byteBuf.writerIndex(); i++) { + sum = (byte) (sum ^ byteBuf.getByte(i)); + } + return sum; + } + + //取num字节的第几位 + public static int getBit(int num, int i) { + return ((num & (1 << i)) != 0) ? 1 : 0;//true 表示第i位为1,否则为0 + + } +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/BaseHandler.java b/src/main/java/com/zhgd/netty/tcp/location/BaseHandler.java new file mode 100644 index 000000000..4156485de --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/BaseHandler.java @@ -0,0 +1,67 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class BaseHandler extends SimpleChannelInboundHandler { + + //消息流水号 + private static final AttributeKey SERIAL_NUMBER = AttributeKey.newInstance("serialNumber"); + + /** + * 递增获取流水号 + * + * @return + */ + public short getSerialNumber(Channel channel) { + Attribute flowIdAttr = channel.attr(SERIAL_NUMBER); + Short flowId = flowIdAttr.get(); + if (flowId == null) { + flowId = 0; + } else { + flowId++; + } + flowIdAttr.set(flowId); + return flowId; + } + + public void write(ChannelHandlerContext ctx, DataPacket msg) { + ctx.writeAndFlush(msg).addListener(future -> { + if (!future.isSuccess()) { + log.error("发送失败", future.cause()); + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("exceptionCaught", cause); + ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + //此实例项目只设置了读取超时时间,可以通过state分别做处理,一般服务端在这里关闭连接节省资源,客户端发送心跳维持连接 + IdleState state = ((IdleStateEvent) evt).state(); + if (state == IdleState.READER_IDLE) { + log.warn("客户端{}读取超时,关闭连接", ctx.channel().remoteAddress()); + ctx.close(); + } else if (state == IdleState.WRITER_IDLE) { + log.warn("客户端{}写入超时", ctx.channel().remoteAddress()); + } else if (state == IdleState.ALL_IDLE) { + log.warn("客户端{}读取写入超时", ctx.channel().remoteAddress()); + } + } else { + super.userEventTriggered(ctx, evt); + } + } + +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/CommonResponse.java b/src/main/java/com/zhgd/netty/tcp/location/CommonResponse.java new file mode 100644 index 000000000..9841a161f --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/CommonResponse.java @@ -0,0 +1,52 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.buffer.ByteBuf; +import lombok.Data; + +@Data +public class CommonResponse extends DataPacket { + + public static final byte SUCCESS = 0;//成功/确认 + public static final byte FAILURE = 1;//失败 + public static final byte MSG_ERROR = 2;//消息有误 + public static final byte UNSUPPORTED = 3;//不支持 + public static final byte ALARM_PROCESS_ACK = 4;//报警处理确认 + + private short replyFlowId; //应答流水号 2字节 + private short replyId; //应答 ID 2字节 + private byte result; //结果 1字节 + + public CommonResponse() { + this.getHeader().setMsgId(Const.SERVER_RESP_COMMON); + } + + @Override + public ByteBuf toByteBufMsg() { + ByteBuf bb = super.toByteBufMsg(); + bb.writeShort(replyFlowId); + bb.writeShort(replyId); + bb.writeByte(result); + return bb; + } + + public static CommonResponse success(DataPacket msg, short flowId) { + CommonResponse resp = new CommonResponse(); + resp.getHeader().setTerminalPhone(msg.getHeader().getTerminalPhone()); + resp.getHeader().setFlowId(flowId); + resp.setReplyFlowId(msg.getHeader().getFlowId()); + resp.setReplyId(msg.getHeader().getMsgId()); + resp.setResult(SUCCESS); + return resp; + } + + public static CommonResponse success(DataPacket msg, short flowId, byte result) { + CommonResponse resp = new CommonResponse(); + resp.getHeader().setTerminalPhone(msg.getHeader().getTerminalPhone()); + resp.getHeader().setFlowId(flowId); + resp.setReplyFlowId(msg.getHeader().getFlowId()); + resp.setReplyId(msg.getHeader().getMsgId()); + resp.setResult(result); + return resp; + } + +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/Const.java b/src/main/java/com/zhgd/netty/tcp/location/Const.java new file mode 100644 index 000000000..055c26695 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/Const.java @@ -0,0 +1,123 @@ +package com.zhgd.netty.tcp.location; + +import java.nio.charset.Charset; + +public class Const { + + //默认字符集为GBK + public static final Charset DEFAULT_CHARSET = Charset.forName("GBK"); + + //消息分隔符 + public static final byte PKG_DELIMITER = 0x7e; + + // 终端应答 + public static final short TERNIMAL_RESP_COMMON_ = 0x0001; //通用应答 + + // 终端消息分类 + public static final short TERNIMAL_MSG_HEARTBEAT = 0x0002; //心跳 + public static final short TERNIMAL_MSG_REGISTER = 0x0100; //注册 + public static final short TERNIMAL_MSG_LOGOUT = 0x0003;//注销 + public static final short TERNIMAL_MSG_AUTH = 0x0102;//鉴权 + public static final short TERNIMAL_MSG_LOCATION = 0x0200;//位置 + public static final short TERNIMAL_MSG_LOCATION_BATCH = 0x0704;//批量位置上报 + public static final short TERNIMAL_MSG_STATUS = 0x0900;//位置数据上行透传 + + //报警命令ID + public static final short ALARM_COMMAND_INFORMED = 0xFA;// 报警命令ID及描述附表 + public static final short ALARM_COMMAND_EXTEND = 0xEB;// 轿车扩展数据流 + public static final short ALARM_COMMAND_BASICS = 0xEA;// 基础数据流附表 + public static final short ALARM_COMMAND_TRUCK = 0xEC;// 火车扩展数据流 + + //数据上行透传消息体 + public static final short STATUS_MSG_FLAMEOUT = 0xF1;// 驾驶行程数据(熄火发送) 驾驶行程数据包 + public static final short STATUS_MSG_FAULT = 0xF2;// 故障码数据(状态改变发送) 故障码数据包 + public static final short STATUS_MSG_DORMANCY = 0xF3;// 休眠进入(进入休眠模式发送) 休眠进入数据包 + public static final short STATUS_MSG_AWAKEN = 0xF4;// 休眠唤醒(退出休眠模式发送) 休眠唤醒数据包 + //public static final short STATUS_MSG_DATA =0xF5;// 车辆GPS精简数据包(货车版) 暂时未加入 + //public static final short STATUS_MSG_FLAMEOUT =0xF6;// MCU升级状态反馈包 MCU升级状态反馈包 + public static final short STATUS_MSG_COLLISION = 0xF7;// 疑似碰撞报警描述包 疑似碰撞报警描述包 + + //FA报警信息 + public static final short ALARM_COMMAND_SPEEDING = 0x0107;//超速报警 + public static final short ALARM_COMMAND_IDLING = 0x0106;//怠速报警 + public static final short ALARM_COMMAND_IGNITION = 0x0001;//点火上报 + public static final short ALARM_COMMAND_FLAMEOUT = 0x0002;//熄火上报 + public static final short ALARM_COMMAND_START = 0x0007;//系统启动 + public static final short ALARM_COMMAND_ABNORMAL_VIBRATION = 0x0115;//异常振动报警,类似于点火操作 + + + //服务器应答 + public static final short SERVER_RESP_COMMON = (short) 0x8001;//通用应答 + public static final short SERVER_RESP_REGISTER = (short) 0x8100;//注册应答 + + //readerIdleTime + public static final int IDLESTATE_HANDLER_READTIMEOUT = 15; + //包头最大长度16+包体最大长度1023+分隔符2+转义字符最大姑且算60 = 1100 + public static final int MAX_FRAME_LENGTH = 2200; + + + //_基础数据流 需要哪些数据,switch添加即可 + public static final short ALARM_COMMAND_0X0003 = 0x0003;//总里程数据 米 + public static final short ALARM_COMMAND_0X0004 = 0x0004;//总油耗数据 毫升 + public static final short ALARM_COMMAND_0X0005 = 0x0005;//总运行时长 秒 + public static final short ALARM_COMMAND_0X0006 = 0x0006;//总熄火时长 秒 + public static final short ALARM_COMMAND_0X0007 = 0x0007;//总怠速时长 秒 + public static final short ALARM_COMMAND_0X00010 = 0x0010;//加速度表 + public static final short ALARM_COMMAND_0X00011 = 0x0011;//车辆状态表 + public static final short ALARM_COMMAND_0X00012 = 0x0012;//车辆电压 0.1V + public static final short ALARM_COMMAND_0X00013 = 0x0013;//终端内置电池电压 0.1V + public static final short ALARM_COMMAND_0X00014 = 0x0014;// CSQ值 网络信号强度 + + + //轿车扩展数据流 + public static final short ALARM_COMMAND_0x60C0 = 0x60C0;//转速 精度:1偏移:0范围:0 ~ 8000 + public static final short ALARM_COMMAND_0x60D0 = 0x60D0;//车速 精度:1偏移:0范围:0 ~ 240 + public static final short ALARM_COMMAND_0x62F0 = 0x62F0;// 剩余油量 % L 剩余油量,单位L或% Bit15 ==0百分比% OBD都为百分比==1单位L + public static final short ALARM_COMMAND_0x6050 = 0x6050;//冷却液温度 + public static final short ALARM_COMMAND_0x60F0 = 0x60F0;//进气口温度 ℃ + public static final short ALARM_COMMAND_0x60B0 = 0x60B0;//进气(岐管绝对)压力 kPa + public static final short ALARM_COMMAND_0x6330 = 0x6330;//大气压力 kPa + public static final short ALARM_COMMAND_0x6460 = 0x6460;//环境温度 ℃ + public static final short ALARM_COMMAND_0x6490 = 0x6490;//加速踏板位置 + public static final short ALARM_COMMAND_0x60A0 = 0x60A0;//燃油压力 + public static final short ALARM_COMMAND_0x6014 = 0x6014;//故障码状态 + public static final short ALARM_COMMAND_0X6010 = 0X6010;//故障码个数 + public static final short ALARM_COMMAND_0x6100 = 0x6100;//空气流量 + public static final short ALARM_COMMAND_0x6110 = 0x6110;//绝对节气门位置 + public static final short ALARM_COMMAND_0x61F0 = 0x61F0;//自发动机起动的时间 + public static final short ALARM_COMMAND_0x6210 = 0x6210;//故障行驶里程 + public static final short ALARM_COMMAND_0x6040 = 0x6040;//计算负荷值 + public static final short ALARM_COMMAND_0x6070 = 0x6070;//长期燃油修正(气缸列1和3) + public static final short ALARM_COMMAND_0x60E0 = 0x60E0;//第一缸点火正时提前角 + public static final short ALARM_COMMAND_0x6901 = 0x6901;//前刹车片磨损 0 正常/否则 显示对应数据,单位:级 + public static final short ALARM_COMMAND_0x6902 = 0x6902;//后刹车片磨损 0 正常/否则 显示对应数据,单位:级 + public static final short ALARM_COMMAND_0x6903 = 0x6903;//制动液液位 + public static final short ALARM_COMMAND_0x6904 = 0x6904;//机油液位 mL 显示值为上传值/1000 单位 毫米 + public static final short ALARM_COMMAND_0x6905 = 0x6905;//胎压报警 0:当前无警告 1:存在胎压失压 + public static final short ALARM_COMMAND_0x6906 = 0x6906;//冷却液液位 + public static final short ALARM_COMMAND_0x6907 = 0x6907;//续航里程 + + + //货车扩展数据流 6部分与轿车类似 + public static final short ALARM_COMMAND_0x5001 = 0x5001;//离合器开关 0x00/0x01 关/开 + public static final short ALARM_COMMAND_0x5002 = 0x5002;//制动刹车开关 0x00/0x01 关/开 + public static final short ALARM_COMMAND_0x5003 = 0x6110;//驻车刹车开关 0x00/0x01 关/开 + public static final short ALARM_COMMAND_0x5004 = 0x61F0;//节流阀位置 精度:1偏移:0范围:0% ~ 100% + public static final short ALARM_COMMAND_0x5005 = 0x5005;//油料使用率 精度:0.05L/h偏移:0取值范围:0 ~ 3212.75L/h 单位 L/h + public static final short ALARM_COMMAND_0x5006 = 0x5006;//燃油温度 精度:0.03125℃偏移:-273.0℃范围:-273.0℃ ~ +1734.96875℃ 单位 ℃ + public static final short ALARM_COMMAND_0x5007 = 0x5007;//机油温度 精度:0.03125℃偏移:-273.0℃范围:-273.0℃ ~ +1734.96875℃ + public static final short ALARM_COMMAND_0x5008 = 0x5008;//OBD发动机润滑油压力 精度:4偏移:0范围:0 ~ 1000kpa + public static final short ALARM_COMMAND_0x5009 = 0x6901;//OBD制动器踏板位置 精度:1偏移:0范围:0% ~ 100% + public static final short ALARM_COMMAND_0x500A = 0x6902;//OBD 空气流量 精度:0.1偏移:0取值范围:0~6553.5 + public static final short ALARM_COMMAND_0x62f0 = 0x62f0;//剩余油量,单位L或%Bit15 ==0百分比% OBD都为百分 ==1单位L 显示值为上传值/10 + //能读取到以下数据 + public static final short ALARM_COMMAND_0x5105 = 0x5105;//反应剂余量 % 精度:0.4偏移:0范围:0% ~ 100% + public static final short ALARM_COMMAND_0x5101 = 0x5101;//发动机净输出扭矩 % 精度:1偏移:-125取值范围:-125% ~+125% + public static final short ALARM_COMMAND_0x5102 = 0x5102;// 摩擦扭矩 % 精度:1偏移:-125取值范围:-125% ~+125% + public static final short ALARM_COMMAND_0x510A = 0x510A;//发动机扭矩模式 0:超速失效1:转速控制2:扭矩控制3:转速/扭矩控制9:正常 + public static final short ALARM_COMMAND_0x510C = 0x510C;//尿素箱温度 ℃ 精度:1℃偏移:-40.0℃范围:-40.0℃ ~ +210℃ + + //新能源 + + +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/DataPacket.java b/src/main/java/com/zhgd/netty/tcp/location/DataPacket.java new file mode 100644 index 000000000..3f4f76295 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/DataPacket.java @@ -0,0 +1,113 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import lombok.Data; +import org.apache.commons.lang3.StringUtils; + +@Data +public class DataPacket { + + protected Header header = new Header(); //消息头 + protected ByteBuf body; //消息体 + + public DataPacket() { + } + + public DataPacket(ByteBuf body) { + this.body = body; + } + + public void parse() { + try { + this.parseHead(); + //验证包体长度 + if (this.header.getMsgBodyLength() != this.body.readableBytes()) { + throw new RuntimeException("包体长度有误"); + } + this.parseBody(); + } finally { + //ReferenceCountUtil.safeRelease(this.body); + } + } + + public void parseHead() { + header.setMsgId(body.readShort()); + header.setMsgBodyProps(body.readShort()); + header.setTerminalPhone(BCD.toString(readBytes(6))); + header.setFlowId(body.readShort()); + if (header.hasSubPackage()) { + //TODO 处理分包 + body.readInt(); + } + } + + /** + * 请求报文重写 + */ + protected void parseBody() { + + } + + /** + * 响应报文重写 并调用父类 + * + * @return + */ + public ByteBuf toByteBufMsg() { + ByteBuf bb = ByteBufAllocator.DEFAULT.heapBuffer();//在JT808Encoder escape()方法处回收 + bb.writeInt(0);//先占4字节用来写msgId和msgBodyProps,JT808Encoder中覆盖回来 + bb.writeBytes(BCD.toBcdBytes(StringUtils.leftPad(this.header.getTerminalPhone(), 12, "0"))); + bb.writeShort(this.header.getFlowId()); + //TODO 处理分包 + return bb; + } + + /** + * 从ByteBuf中read固定长度的数组,相当于ByteBuf.readBytes(byte[] dst)的简单封装 + * + * @param length + * @return + */ + public byte[] readBytes(int length) { + byte[] bytes = new byte[length]; + this.body.readBytes(bytes); + return bytes; + } + + /** + * 从ByteBuf中读出固定长度的数组 ,根据808默认字符集构建字符串 + * + * @param length + * @return + */ + public String readString(int length) { + return new String(readBytes(length), Const.DEFAULT_CHARSET); + } + + /** + * 消息头对象 + */ + @Data + public static class Header { + private short msgId;// 功能ID 2字节 + private short msgBodyProps;//消息属性 2字节 + private String terminalPhone; // 终端手机号 6字节 + private short flowId;// 流水号 2字节 + + //获取包体长度 + public short getMsgBodyLength() { + return (short) (msgBodyProps & 0x3ff); + } + + //获取加密类型 3bits + public byte getEncryptionType() { + return (byte) ((msgBodyProps & 0x1c00) >> 10); + } + + //是否分包 + public boolean hasSubPackage() { + return ((msgBodyProps & 0x2000) >> 13) == 1; + } + } +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/EventLoopGroupConfig.java b/src/main/java/com/zhgd/netty/tcp/location/EventLoopGroupConfig.java new file mode 100644 index 000000000..199d1491f --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/EventLoopGroupConfig.java @@ -0,0 +1,58 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.EventExecutorGroup; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @Author: Zpsw + * @Date: 2019-05-16 + * @Description: + * @Version: 1.0 + */ +@Configuration +public class EventLoopGroupConfig { + + @Value("${netty.threads.boss:1}") + private int bossThreadsNum; + + @Value("${netty.threads.worker:6}") + private int workerThreadsNum; + + @Value("${netty.threads.business:6}") + private int businessThreadsNum; + + /** + * 负责TCP连接建立操作 绝对不能阻塞 + * + * @return + */ + @Bean(name = "bossGroup") + public NioEventLoopGroup bossGroup() { + return new NioEventLoopGroup(bossThreadsNum); + } + + /** + * 负责Socket读写操作 绝对不能阻塞 + * + * @return + */ + @Bean(name = "workerGroup") + public NioEventLoopGroup workerGroup() { + return new NioEventLoopGroup(workerThreadsNum); + } + + /** + * Handler中出现IO操作(如数据库操作,网络操作)使用这个 + * + * @return + */ + @Bean(name = "businessGroup") + public EventExecutorGroup businessGroup() { + return new DefaultEventExecutorGroup(businessThreadsNum); + } + +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/JT808Const.java b/src/main/java/com/zhgd/netty/tcp/location/JT808Const.java new file mode 100644 index 000000000..f92fec29a --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/JT808Const.java @@ -0,0 +1,34 @@ +package com.zhgd.netty.tcp.location; + +import java.nio.charset.Charset; + +/** + * @Author: Zpsw + * @Date: 2019-05-15 + * @Description: JT808协议参数集合 + * @Version: 1.0 + */ +public class JT808Const { + + //默认字符集为GBK + public static final Charset DEFAULT_CHARSET = Charset.forName("GBK"); + + //消息分隔符 + public static final byte PKG_DELIMITER = 0x7e; + + // 终端应答 + public static final short TERNIMAL_RESP_COMMON_ = 0x0001; //通用应答 + + // 终端消息分类 + public static final short TERNIMAL_MSG_HEARTBEAT = 0x0002; //心跳 + public static final short TERNIMAL_MSG_REGISTER = 0x0100; //注册 + public static final short TERNIMAL_MSG_LOGOUT = 0x0003;//注销 + public static final short TERNIMAL_MSG_AUTH = 0x0102;//鉴权 + public static final short TERNIMAL_MSG_LOCATION = 0x0200;//位置 + + + //服务器应答 + public static final short SERVER_RESP_COMMON = (short) 0x8001;//通用应答 + public static final short SERVER_RESP_REGISTER = (short) 0x8100;//注册应答 + +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/Location.java b/src/main/java/com/zhgd/netty/tcp/location/Location.java new file mode 100644 index 000000000..191becaed --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/Location.java @@ -0,0 +1,25 @@ +package com.zhgd.netty.tcp.location; + +import lombok.Data; +import org.springframework.beans.BeanUtils; + +@Data +public class Location { + + private String phone; + private Integer alarm; + private Integer statusField; + private Float latitude; + private Float longitude; + private Short elevation; + private Short speed; + private Short direction; + private String time; + + public static Location parseFromLocationMsg(LocationMessage msg) { + Location location = new Location(); + location.setPhone(msg.getHeader().getTerminalPhone()); + BeanUtils.copyProperties(msg, location); + return location; + } +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/LocationMessage.java b/src/main/java/com/zhgd/netty/tcp/location/LocationMessage.java new file mode 100644 index 000000000..9b2878536 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/LocationMessage.java @@ -0,0 +1,55 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.buffer.ByteBuf; +import lombok.Data; + +import java.util.Objects; + +/** + * + */ +@Data +public class LocationMessage extends DataPacket { + + private int alarm; //告警信息 4字节 + private int statusField;//状态 4字节 + private float latitude;//纬度 4字节 + private float longitude;//经度 4字节 + private short elevation;//海拔高度 2字节 + private short speed; //速度 2字节 + private short direction; //方向 2字节 + private String time; //时间 6字节BCD + private float mileage; //里程 + private float mainFuelTankHeight; //主油箱高度 + + public LocationMessage(ByteBuf byteBuf) { + super(byteBuf); + } + + @Override + public void parseBody() { + //解码器中的MessageDecoder.parse方法会把body消息体传到这里,在这里根据自己的业务进行解析数据 + ByteBuf bb = this.body; + this.setAlarm(bb.readInt()); + this.setStatusField(bb.readInt()); + this.setLatitude(bb.readUnsignedInt() * 1.0F / 1000000); + this.setLongitude(bb.readUnsignedInt() * 1.0F / 1000000); + this.setElevation(bb.readShort()); + this.setSpeed(bb.readShort()); + this.setDirection(bb.readShort()); + this.setTime(BCD.toBcdTimeString(readBytes(6))); + while (bb.isReadable()) { + byte type = bb.readByte(); + byte len = bb.readByte(); + if (Objects.equals(type, '1')) { + if (Objects.equals(len, '4')) { + this.setMileage(bb.readFloat()); + } + } else if (Objects.equals(type, '2')) { + if (Objects.equals(len, '2')) { + this.setMileage(bb.readShort()); + } + } + } + } +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/LocationMessageHandler.java b/src/main/java/com/zhgd/netty/tcp/location/LocationMessageHandler.java new file mode 100644 index 000000000..b0f4bc11b --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/LocationMessageHandler.java @@ -0,0 +1,65 @@ +package com.zhgd.netty.tcp.location; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.sun.org.slf4j.internal.Logger; +import com.sun.org.slf4j.internal.LoggerFactory; +import com.zhgd.xmgl.modules.vehicleposition.entity.VehiclePositionData; +import com.zhgd.xmgl.modules.vehicleposition.entity.VehiclePositionDev; +import com.zhgd.xmgl.modules.vehicleposition.mapper.VehiclePositionDataMapper; +import com.zhgd.xmgl.modules.vehicleposition.mapper.VehiclePositionDevMapper; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +/** + * 具体处理类 + */ +@Component +@ChannelHandler.Sharable +@Slf4j +public class LocationMessageHandler extends BaseHandler { + @Autowired + VehiclePositionDataMapper vehiclePositionDataMapper; + @Autowired + VehiclePositionDevMapper vehiclePositionDevMapper; + @Autowired + @Qualifier("workerGroup") + private NioEventLoopGroup workerGroup; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, LocationMessage message) { + try { + //官方建议效验码判断通过后,应立刻给出应答,防止重复请求服务器 + CommonResponse response = CommonResponse.success(message, getSerialNumber(ctx.channel()), CommonResponse.SUCCESS); + workerGroup.execute(() -> write(ctx, response)); + Location location = Location.parseFromLocationMsg(message); + log.info("设备号/手机号:{}", location.getPhone()); + VehiclePositionDev dev = vehiclePositionDevMapper.selectOne(new LambdaQueryWrapper() + .eq(VehiclePositionDev::getDevSn, location.getPhone())); + if (dev == null) { + return; + } + VehiclePositionData entity = new VehiclePositionData(); + entity.setDevSn(dev.getDevSn()); + //entity.setBatteryPercentage(); + //entity.setTotalFuelConsumptionDay(); + //entity.setTotalSleepTimeDay(); + //entity.setCumulativeOperatingFuelConsumptionDay(); + //entity.setTotalWorkTimeDay(); + entity.setLongitude(Double.valueOf(location.getLongitude())); + entity.setLatitude(Double.valueOf(location.getLatitude())); + entity.setProjectSn(dev.getProjectSn()); + entity.setSpeed(Double.valueOf(location.getSpeed())); + vehiclePositionDataMapper.insert(entity); + } catch (Exception e) { + log.error("LocationMessageHandler 解析报文信息发生错误", e); + } finally { + ReferenceCountUtil.release(message.getBody()); + } + } +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/MessageDecoder.java b/src/main/java/com/zhgd/netty/tcp/location/MessageDecoder.java new file mode 100644 index 000000000..8bfb93a72 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/MessageDecoder.java @@ -0,0 +1,104 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.List; + +@Slf4j +@Component +public class MessageDecoder extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { + log.info("<<<<< ip:{},hex:{}", ctx.channel().remoteAddress(), ByteBufUtil.hexDump(in).toLowerCase()); + DataPacket msg = null; + msg = decode(in); + if (msg != null) { + out.add(msg); + } + } + + private DataPacket decode(ByteBuf in) { + if (in.readableBytes() < 12) { //包头最小长度 + return null; + } + //第一步 转义 + byte[] raw = new byte[in.readableBytes()]; + in.readBytes(raw); + ByteBuf escape = revert(raw); + //第二步 校验 + byte pkgCheckSum = escape.getByte(escape.writerIndex() - 1); + escape.writerIndex(escape.writerIndex() - 1);//排除校验码 + byte calCheckSum = BCD.XorSumBytes(escape); + if (pkgCheckSum != calCheckSum) { + log.warn("校验码错误,pkgCheckSum:{},calCheckSum:{}", pkgCheckSum, calCheckSum); + ReferenceCountUtil.safeRelease(escape); + return null; + } + //第三步:解码 + return parse(escape); + } + + /** + * 将接收到的原始转义数据还原 + * + * @param raw + * @return + */ + public ByteBuf revert(byte[] raw) { + int len = raw.length; + ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(len);//DataPacket parse方法回收 + for (int i = 0; i < len; i++) { + //这里如果最后一位是0x7d会导致index溢出,说明原始报文转义有误 + if (raw[i] == 0x7d && raw[i + 1] == 0x01) { + buf.writeByte(0x7d); + i++; + } else if (raw[i] == 0x7d && raw[i + 1] == 0x02) { + buf.writeByte(0x7e); + i++; + } else { + buf.writeByte(raw[i]); + } + } + return buf; + } + + public DataPacket parse(ByteBuf bb) { + DataPacket packet = null; + short msgId = bb.getShort(bb.readerIndex()); + switch (msgId) { + case Const.TERNIMAL_MSG_HEARTBEAT: + //packet = new HeartBeatMessage(bb); + break; + case Const.TERNIMAL_MSG_LOCATION://0200 + packet = new LocationMessage(bb); + break; + case Const.TERNIMAL_MSG_LOCATION_BATCH://0704 + //packet = new LocationBatchMessage(bb); + break; + case Const.TERNIMAL_MSG_REGISTER: + //packet = new RegisterMessage(bb); + break; + case Const.TERNIMAL_MSG_AUTH: + //packet = new AuthMessage(bb); + break; +// case Const.TERNIMAL_MSG_STATUS: 0900数据 +// packet = new StatusMessage(bb); +// break; + default: + packet = new DataPacket(bb); + break; + } + packet.parse(); + return packet; + } + + +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/MessageEncoder.java b/src/main/java/com/zhgd/netty/tcp/location/MessageEncoder.java new file mode 100644 index 000000000..f6016df45 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/MessageEncoder.java @@ -0,0 +1,82 @@ +package com.zhgd.netty.tcp.location; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.ReferenceCountUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@ChannelHandler.Sharable +public class MessageEncoder extends MessageToByteEncoder { + + @Override + protected void encode(ChannelHandlerContext ctx, DataPacket msg, ByteBuf out) throws Exception { + log.debug(msg.toString()); + //第一步:转换 + ByteBuf bb = msg.toByteBufMsg(); + bb.markWriterIndex();//标记一下,先到前面去写覆盖的,然后回到标记写校验码 + short bodyLen = (short) (bb.readableBytes() - 12);//包体长度=总长度-头部长度 + short bodyProps = createDefaultMsgBodyProperty(bodyLen); + //覆盖占用的4字节 + bb.writerIndex(0); + bb.writeShort(msg.getHeader().getMsgId()); + bb.writeShort(bodyProps); + bb.resetWriterIndex(); + bb.writeByte(BCD.XorSumBytes(bb)); + //log.info(">>>>> ip:{},hex:{}\n", ctx.channel().remoteAddress(), ByteBufUtil.hexDump(bb).toLowerCase()); + //第二步:转义 + ByteBuf escape = escape(bb); + out.writeBytes(escape); + ReferenceCountUtil.safeRelease(escape); + } + + /** + * 转义待发送数据 + * + * @param raw + * @return + */ + public ByteBuf escape(ByteBuf raw) { + int len = raw.readableBytes(); + ByteBuf buf = ByteBufAllocator.DEFAULT.directBuffer(len + 12); + buf.writeByte(Const.PKG_DELIMITER); + while (len > 0) { + byte b = raw.readByte(); + if (b == 0x7e) { + buf.writeByte(0x7d); + buf.writeByte(0x02); + } else if (b == 0x7d) { + buf.writeByte(0x7d); + buf.writeByte(0x01); + } else { + buf.writeByte(b); + } + len--; + } + ReferenceCountUtil.safeRelease(raw); + buf.writeByte(Const.PKG_DELIMITER); + return buf; + } + + /** + * 生成header中的消息体属性 + * + * @param bodyLen + * @return + */ + public static short createDefaultMsgBodyProperty(short bodyLen) { + return createMsgBodyProperty(bodyLen, (byte) 0, false, (byte) 0); + } + + public static short createMsgBodyProperty(short bodyLen, byte encType, boolean isSubPackage, byte reversed) { + int subPkg = isSubPackage ? 1 : 0; + int ret = (bodyLen & 0x3FF) | ((encType << 10) & 0x1C00) | ((subPkg << 13) & 0x2000) + | ((reversed << 14) & 0xC000); + return (short) (ret & 0xffff); + } +} diff --git a/src/main/java/com/zhgd/netty/tcp/location/TcpNettyServerJT808.java b/src/main/java/com/zhgd/netty/tcp/location/TcpNettyServerJT808.java new file mode 100644 index 000000000..f4050e955 --- /dev/null +++ b/src/main/java/com/zhgd/netty/tcp/location/TcpNettyServerJT808.java @@ -0,0 +1,79 @@ +package com.zhgd.netty.tcp.location; + +import com.zhgd.netty.tcp.listener.BindListener; +import com.zhgd.netty.tcp.listener.CloseListener; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * @author jt808协议解析-人员车辆定位 + * @date 2020/03/17 15:45:35 + */ +@Component +@Slf4j +public class TcpNettyServerJT808 { + @Autowired + private LocationMessageHandler locationMessageHandler; + @Autowired + private MessageDecoder messageDecoder; + @Autowired + private MessageEncoder messageEncoder; + @Autowired + @Qualifier("bossGroup") + private NioEventLoopGroup bossGroup; + + @Autowired + @Qualifier("workerGroup") + private NioEventLoopGroup workerGroup; + + @Value("${jt808.port:15003}") + int port; + + @PostConstruct + private void startTcpServer() { + try { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + //添加自定义处理器 + socketChannel.pipeline() + .addLast(new DelimiterBasedFrameDecoder(1100, Unpooled.copiedBuffer(new byte[]{JT808Const.PKG_DELIMITER}), + Unpooled.copiedBuffer(new byte[]{JT808Const.PKG_DELIMITER, JT808Const.PKG_DELIMITER}))) + .addLast(messageDecoder) + .addLast(messageEncoder) + .addLast(locationMessageHandler); + } + }); + //监听器,当服务绑定成功后执行 + ChannelFuture channelFuture = serverBootstrap.bind(port).addListener(new BindListener()); + //监听器,当停止服务后执行。 + channelFuture.channel().closeFuture().addListener(new CloseListener()); + } catch (Exception e) { + log.error("err:", e); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + +} diff --git a/src/main/java/com/zhgd/xmgl/modules/video/controller/AiAnalyseHardWareAlarmRecordController.java b/src/main/java/com/zhgd/xmgl/modules/video/controller/AiAnalyseHardWareAlarmRecordController.java index 41058a096..1164b9ed4 100644 --- a/src/main/java/com/zhgd/xmgl/modules/video/controller/AiAnalyseHardWareAlarmRecordController.java +++ b/src/main/java/com/zhgd/xmgl/modules/video/controller/AiAnalyseHardWareAlarmRecordController.java @@ -256,6 +256,7 @@ public class AiAnalyseHardWareAlarmRecordController { @ApiOperation(value = "严重违章人员排序", notes = "严重违章人员排序", httpMethod = "POST") @ApiImplicitParams({ @ApiImplicitParam(name = "projectSn", value = "项目sn", paramType = "body", required = true, dataType = "String"), + @ApiImplicitParam(name = "handleResult", value = "处置结果(1:已处置2误报忽略)", paramType = "body", required = false, dataType = "Integer"), }) @PostMapping(value = "/violatorListSort") public Result> violatorListSort(@RequestBody @ApiIgnore Map map) { diff --git a/src/main/java/com/zhgd/xmgl/modules/video/mapper/xml/AiAnalyseHardWareAlarmRecordMapper.xml b/src/main/java/com/zhgd/xmgl/modules/video/mapper/xml/AiAnalyseHardWareAlarmRecordMapper.xml index eacfb652d..a74c56012 100644 --- a/src/main/java/com/zhgd/xmgl/modules/video/mapper/xml/AiAnalyseHardWareAlarmRecordMapper.xml +++ b/src/main/java/com/zhgd/xmgl/modules/video/mapper/xml/AiAnalyseHardWareAlarmRecordMapper.xml @@ -473,9 +473,16 @@