1 /*
2  * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
3  *
4  * Copyright (C) 2019 HuntLabs
5  *
6  * Website: https://www.msgtrans.org
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module msgtrans.PacketParser;
13 
14 import hunt.io.ByteBuffer;
15 import hunt.io.BufferUtils;
16 import hunt.collection.List;
17 import hunt.collection.ArrayList;
18 
19 import hunt.logging.ConsoleLogger;
20 import hunt.serialization.JsonSerializer;
21 
22 import msgtrans.MessageBuffer;
23 import msgtrans.PacketHeader;
24 import std.bitmanip;
25 import std.algorithm : max, canFind;
26 
27 
28 /**
29  *
30  */
31 class PacketParser {
32     private ByteBuffer _receivedPacketBuf;
33     private size_t _defaultBufferSize = 8*1024;
34 
35     this(size_t bufferSize = 8*1024) {
36         _defaultBufferSize = bufferSize;
37     }
38 
39     private void mergeBuffer(ByteBuffer now) {
40         if (_receivedPacketBuf is null) {
41             version(HUNT_MESSAGE_DEBUG) tracef("buffering data: %d, bytes", now.remaining());
42             // ByteBuffer ret = newBuffer(now.remaining());
43             // ret.put(now).flip();
44             // _receivedPacketBuf = ret;
45             _receivedPacketBuf = now;
46         } else {
47             if (_receivedPacketBuf.hasRemaining()) {
48                 version(HUNT_MESSAGE_DEBUG) {
49                     tracef("merge buffer -> %s, %s", _receivedPacketBuf.remaining(), now.remaining());
50                 }
51                 ByteBuffer ret = newBuffer(_receivedPacketBuf.remaining() + now.remaining());
52                 ret.put(_receivedPacketBuf).put(now).flip();
53                 _receivedPacketBuf = ret;
54             } else {
55                 version(HUNT_MESSAGE_DEBUG) {
56                     tracef("buffering data: %s, bytes, current buffer: %s",
57                         now.toString(), _receivedPacketBuf.toString());
58                 }
59 
60                 if(now.remaining() <= _receivedPacketBuf.remaining()) {
61                     _receivedPacketBuf.clear();
62                     _receivedPacketBuf.put(now).flip();
63                 } else {
64                     ByteBuffer ret = newBuffer(now.remaining());
65                     ret.put(now).flip();
66                     _receivedPacketBuf = ret;
67                 }
68             }
69         }
70         version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
71     }
72 
73     protected ByteBuffer newBuffer(int size) {
74         return BufferUtils.allocate(size);
75     }
76 
77     MessageBuffer[] parse(ByteBuffer buffer) {
78         // TODO: Tasks pending completion -@zhangxueping at 2019-11-13T10:07:54+08:00
79         // To handle big size frame
80 
81         version(HUNT_DEBUG) tracef("incoming buffer: %s", buffer);
82         mergeBuffer(buffer);
83 
84         MessageBuffer[] resultBuffers;
85         size_t dataStart = 0;
86 
87         while (_receivedPacketBuf.remaining() >= PACKET_HEADER_LENGTH) {
88             ubyte[] data = cast(ubyte[])_receivedPacketBuf.peekRemaining();
89             //infof("rev buff: %s ----- %d",data, data.length);
90             PacketHeader header = PacketHeader.parse(data);
91             if(header is null) {
92                 warning("corrupted data");
93                 version(HUNT_DEBUG) {
94                     if(data.length<=64)
95                         infof("%(%02X %)", data[0 .. $]);
96                     else
97                         infof("%(%02X %) ...", data[0 .. 64]);
98                 }
99                 _receivedPacketBuf.clear().flip(); // All buffered data will be dropped.
100                 version(HUNT_MESSAGE_DEBUG) {
101                     tracef("_receivedPacketBuf: %d, %s",
102                      _receivedPacketBuf.remaining(), _receivedPacketBuf.toString());
103                 }
104                 return null;
105 
106             } else if(AvaliableMessageIds.length>0 && !AvaliableMessageIds.canFind(header.messageId())) {
107                 warningf("Unrecognized packet: %s", header.toString());
108                 _receivedPacketBuf.clear().flip();
109                 return null;
110             }
111 
112             version(HUNT_DEBUG) infof("packet header, %s", header.toString());
113 
114             size_t currentFrameSize = header.messageLength + header.extendLength() + PACKET_HEADER_LENGTH;
115             if (data.length < currentFrameSize) {
116                 // No enough data for a full frame, so save the remaining
117                 break;
118             }
119             //else if(data.length > MAX_PACKET_SIZE) {
120             //    warningf("Out of packet size (<= %d): %d", MAX_PACKET_SIZE, currentFrameSize);
121             //    return null;
122             //}
123             if (header.extendLength() > 0)
124             {
125                 //if(header.extendLength() == uint.sizeof)
126                 //{
127                   ubyte[] d = data[PACKET_HEADER_LENGTH .. PACKET_HEADER_LENGTH + cast(int)(header.extendLength())];
128                   //ubyte[Extend.sizeof] d
129                   //uint tagId = bigEndianToNative!uint(d);
130                   //Extend extend  = cast(Extend)d;
131                   //logInfof("Extend : %s ---  %s -----%s" , header.extendLength(),extend.tagId, extend.userId);
132                   resultBuffers ~= new MessageBuffer(header.messageId(), data[PACKET_HEADER_LENGTH + header.extendLength() ..currentFrameSize] , d);
133                 //}
134                 //else
135                 //{
136                 //  ubyte[EXTENSION_FIELD_LENGTH] d1 = data[PACKET_HEADER_LENGTH .. PACKET_HEADER_LENGTH + uint.sizeof];
137                 //  uint tagId = bigEndianToNative!uint(d1);
138                 //  ubyte[EXTENSION_FIELD_LENGTH] d2 = data[PACKET_HEADER_LENGTH + uint.sizeof .. PACKET_HEADER_LENGTH + uint.sizeof + uint.sizeof];
139                 //  uint clientId = bigEndianToNative!uint(d2);
140                 //  logInfof("tagId : %s" , tagId);
141                 //  logInfof("clientId : %s" , clientId);
142                 //  resultBuffers ~= new MessageBuffer(header.messageId(), data[PACKET_HEADER_LENGTH + header.extendLength() ..currentFrameSize] , tagId ,clientId);
143                 //}
144             }else
145             {
146                 resultBuffers ~= new MessageBuffer(header.messageId(), data[PACKET_HEADER_LENGTH..currentFrameSize]);
147             }
148 
149             version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
150             _receivedPacketBuf.nextGetIndex(cast(int)currentFrameSize);
151             version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
152         }
153 
154         int remaining = _receivedPacketBuf.remaining();
155         version(HUNT_MESSAGE_DEBUG) {
156             tracef("remaining: %d, buffer: %s", remaining, _receivedPacketBuf.toString());
157         }
158 
159         if(remaining > 0) {
160             byte[] data = cast(byte[])_receivedPacketBuf.peekRemaining();
161             size_t newLength = max(remaining, _defaultBufferSize);
162             if(_receivedPacketBuf is buffer || newLength > _receivedPacketBuf.capacity()) {
163                 version(HUNT_DEBUG) infof("reset buffer's size to %d bytes", newLength);
164                 _receivedPacketBuf = BufferUtils.allocate(cast(int)newLength);
165                 _receivedPacketBuf.put(data.dup).flip(); // buffer the remaining
166                 version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
167             } else if(resultBuffers.length > 0) {
168                 version(HUNT_MESSAGE_DEBUG) warning("buffer the remaining: %s", _receivedPacketBuf.toString());
169                 _receivedPacketBuf.rewind();
170                 _receivedPacketBuf.put(data.dup).flip(); // buffer the remaining
171                 version(HUNT_MESSAGE_DEBUG) trace(_receivedPacketBuf.toString());
172             } else {
173                 version(HUNT_MESSAGE_DEBUG) warning("do nothing");
174             }
175         } else {
176             _receivedPacketBuf = null;
177         }
178 
179         return resultBuffers;
180     }
181 }