/*
 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
 *
 * Copyright (C) 2019 HuntLabs
 *
 * Website: https://www.msgtrans.org
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module msgtrans.PacketParser;

import hunt.io.ByteBuffer;
import hunt.io.BufferUtils;
import hunt.collection.List;
import hunt.collection.ArrayList;

import hunt.logging.ConsoleLogger;
import hunt.serialization.JsonSerializer;

import msgtrans.MessageBuffer;
import msgtrans.PacketHeader;
import std.bitmanip;
import std.algorithm : max, canFind;


/**
 * Splits an incoming byte stream into MessageBuffer frames.
 *
 * A frame, as sliced by parse(), is laid out as:
 *   [ header : PACKET_HEADER_LENGTH bytes ]
 *   [ extension : header.extendLength() bytes, may be absent ]
 *   [ payload : header.messageLength bytes ]
 *
 * Bytes that do not yet form a complete frame are kept in an internal
 * buffer and are merged with the data handed to the next parse() call.
 */
class PacketParser {
    /// Data received so far that has not been turned into messages yet; null when nothing is pending.
    private ByteBuffer _receivedPacketBuf;

    /// Lower bound for the size of the buffer allocated to hold leftover bytes.
    private size_t _defaultBufferSize = 8*1024;

    /**
     * Params:
     *   bufferSize = minimum size, in bytes, of the buffer parse() allocates
     *                when it has to keep leftover data between calls.
     */
    this(size_t bufferSize = 8*1024) {
        _defaultBufferSize = bufferSize;
    }

    /**
     * Makes `_receivedPacketBuf` hold the previously pending bytes followed by
     * the bytes of `now`.
     *
     * When nothing is pending, `now` itself is stored (no copy), so after this
     * call `_receivedPacketBuf` may be the caller's buffer; parse() accounts
     * for that before it keeps any leftover data.
     *
     * Params:
     *   now = the newly received data, positioned for reading.
     */
    private void mergeBuffer(ByteBuffer now) {
        if (_receivedPacketBuf is null) {
            version(HUNT_MESSAGE_DEBUG) tracef("buffering data: %d, bytes", now.remaining());
            // Nothing pending: adopt the incoming buffer as-is.
            _receivedPacketBuf = now;
        } else {
            if (_receivedPacketBuf.hasRemaining()) {
                version(HUNT_MESSAGE_DEBUG) {
                    tracef("merge buffer -> %s, %s", _receivedPacketBuf.remaining(), now.remaining());
                }
                // Pending bytes exist: concatenate pending + new into a fresh, exactly-sized buffer.
                ByteBuffer ret = newBuffer(_receivedPacketBuf.remaining() + now.remaining());
                ret.put(_receivedPacketBuf).put(now).flip();
                _receivedPacketBuf = ret;
            } else {
                version(HUNT_MESSAGE_DEBUG) {
                    tracef("buffering data: %s, bytes, current buffer: %s",
                        now.toString(), _receivedPacketBuf.toString());
                }

                // NOTE(review): this branch is only entered when the held buffer has
                // nothing remaining, so `_receivedPacketBuf.remaining()` is 0 here and
                // the in-place reuse below only triggers for an empty `now`.
                // The comparison was presumably meant to use capacity(); it is left
                // unchanged because the held buffer may still be a buffer that was
                // handed in by the caller (see the first branch) - confirm ownership
                // before switching to capacity().
                if(now.remaining() <= _receivedPacketBuf.remaining()) {
                    _receivedPacketBuf.clear();
                    _receivedPacketBuf.put(now).flip();
                } else {
                    ByteBuffer ret = newBuffer(now.remaining());
                    ret.put(now).flip();
                    _receivedPacketBuf = ret;
                }
            }
        }
        version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
    }

    /**
     * Allocates a buffer of `size` bytes via BufferUtils.allocate.
     *
     * Used by mergeBuffer(); being protected, a subclass may substitute its
     * own allocation. Note that parse() allocates its leftover buffer through
     * BufferUtils.allocate directly and does not go through this method.
     */
    protected ByteBuffer newBuffer(int size) {
        return BufferUtils.allocate(size);
    }

    /**
     * Consumes `buffer` together with any previously pending bytes and returns
     * every complete frame found.
     *
     * Params:
     *   buffer = newly received data, positioned for reading.
     *
     * Returns:
     *   The messages decoded from complete frames, in stream order. The array
     *   is empty when more data is needed to complete a frame. `null` is
     *   returned when a header cannot be parsed or carries a message id that
     *   is not in `AvaliableMessageIds` (when that list is non-empty); in that
     *   case all buffered data is dropped, and messages already decoded during
     *   this call are not returned either.
     */
    MessageBuffer[] parse(ByteBuffer buffer) {
        // TODO: Tasks pending completion -@zhangxueping at 2019-11-13T10:07:54+08:00
        // To handle big size frame

        version(HUNT_DEBUG) tracef("incoming buffer: %s", buffer);
        mergeBuffer(buffer);

        MessageBuffer[] resultBuffers;

        while (_receivedPacketBuf.remaining() >= PACKET_HEADER_LENGTH) {
            // View of everything not yet consumed; the read position is advanced
            // explicitly with nextGetIndex() once a whole frame has been taken.
            // NOTE(review): the slices handed to MessageBuffer below presumably alias
            // the buffer's storage (the leftover handling further down dups `data`
            // before writing back into the same buffer) - confirm that MessageBuffer
            // copies its input, otherwise compaction may overwrite returned payloads.
            ubyte[] data = cast(ubyte[])_receivedPacketBuf.peekRemaining();
            PacketHeader header = PacketHeader.parse(data);
            if(header is null) {
                warning("corrupted data");
                version(HUNT_DEBUG) {
                    if(data.length<=64)
                        infof("%(%02X %)", data[0 .. $]);
                    else
                        infof("%(%02X %) ...", data[0 .. 64]);
                }
                _receivedPacketBuf.clear().flip(); // All buffered data will be dropped.
                version(HUNT_MESSAGE_DEBUG) {
                    tracef("_receivedPacketBuf: %d, %s",
                        _receivedPacketBuf.remaining(), _receivedPacketBuf.toString());
                }
                return null;

            } else if(AvaliableMessageIds.length>0 && !AvaliableMessageIds.canFind(header.messageId())) {
                // An id filter is configured and this id is not part of it.
                warningf("Unrecognized packet: %s", header.toString());
                _receivedPacketBuf.clear().flip(); // All buffered data will be dropped.
                return null;
            }

            version(HUNT_DEBUG) infof("packet header, %s", header.toString());

            size_t currentFrameSize = header.messageLength + header.extendLength() + PACKET_HEADER_LENGTH;
            if (data.length < currentFrameSize) {
                // No enough data for a full frame, so save the remaining
                break;
            }
            // Disabled upper bound on the frame size (see the TODO above):
            //else if(data.length > MAX_PACKET_SIZE) {
            //    warningf("Out of packet size (<= %d): %d", MAX_PACKET_SIZE, currentFrameSize);
            //    return null;
            //}

            if (header.extendLength() > 0) {
                // Extension bytes sit between the header and the payload.
                ubyte[] d = data[PACKET_HEADER_LENGTH .. PACKET_HEADER_LENGTH + cast(int)(header.extendLength())];
                resultBuffers ~= new MessageBuffer(header.messageId(),
                    data[PACKET_HEADER_LENGTH + header.extendLength() ..currentFrameSize] , d);
            } else {
                resultBuffers ~= new MessageBuffer(header.messageId(), data[PACKET_HEADER_LENGTH..currentFrameSize]);
            }

            // Step over the frame that was just consumed.
            version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
            _receivedPacketBuf.nextGetIndex(cast(int)currentFrameSize);
            version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
        }

        int remaining = _receivedPacketBuf.remaining();
        version(HUNT_MESSAGE_DEBUG) {
            tracef("remaining: %d, buffer: %s", remaining, _receivedPacketBuf.toString());
        }

        if(remaining > 0) {
            // An incomplete frame is left over: keep it for the next call.
            byte[] data = cast(byte[])_receivedPacketBuf.peekRemaining();
            size_t newLength = max(remaining, _defaultBufferSize);
            if(_receivedPacketBuf is buffer || newLength > _receivedPacketBuf.capacity()) {
                // Either the held buffer is the caller's own buffer, or it is
                // smaller than the target size: copy the leftover into a new buffer.
                version(HUNT_DEBUG) infof("reset buffer's size to %d bytes", newLength);
                _receivedPacketBuf = BufferUtils.allocate(cast(int)newLength);
                _receivedPacketBuf.put(data.dup).flip(); // buffer the remaining
                version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
            } else if(resultBuffers.length > 0) {
                // At least one frame was consumed: move the leftover to the front of
                // the existing buffer. `data` is duplicated first because source and
                // destination are the same buffer.
                version(HUNT_MESSAGE_DEBUG) warningf("buffer the remaining: %s", _receivedPacketBuf.toString());
                _receivedPacketBuf.rewind();
                _receivedPacketBuf.put(data.dup).flip(); // buffer the remaining
                version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
            } else {
                // Nothing was consumed; the buffer already holds exactly the pending bytes.
                version(HUNT_MESSAGE_DEBUG) warning("do nothing");
            }
        } else {
            // Everything was consumed: release the buffer.
            _receivedPacketBuf = null;
        }

        return resultBuffers;
    }
}