1 /*
2  * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
3  *
4  * Copyright (C) 2019 HuntLabs
5  *
6  * Website: https://www.msgtrans.org
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module msgtrans.channel.tcp.TcpServerChannel;
13 
14 import msgtrans.MessageTransport;
15 import msgtrans.SessionManager;
16 import msgtrans.TransportContext;
17 import msgtrans.channel.ServerChannel;
18 import msgtrans.channel.TransportSession;
19 import msgtrans.channel.tcp.TcpCodec;
20 import msgtrans.channel.tcp.TcpTransportSession;
21 import msgtrans.MessageTransportServer;
22 import msgtrans.MessageBuffer;
23 import msgtrans.executor;
24 import msgtrans.ee2e.message.MsgDefine;
25 import msgtrans.ee2e.crypto;
26 import hunt.logging.ConsoleLogger;
27 import hunt.net;
28 import hunt.net.codec.Codec;
29 import hunt.io.channel.Common;
30 
31 import google.protobuf;
32 import std.array;
33 import msgtrans.ee2e.common;
34 import std.format;
35 import std.uuid;
36 import std.base64;
37 import core.time;
38 
/**
 * TCP implementation of ServerChannel.
 *
 * The channel listens on a host/port pair through a hunt NetServer. Every
 * opened connection is wrapped in a TcpTransportSession that is stored on the
 * connection under the ChannelSession attribute and registered with the
 * SessionManager obtained from the MessageTransport passed to set().
 * Received MessageBuffer objects are routed either to the key-exchange handler
 * (ids MESSAGE.INITIATE / MESSAGE.FINALIZE) or to the executor that the
 * MessageTransport returns for the message id.
 */
class TcpServerChannel : ServerChannel {
    private NetServer _server;
    private MessageTransport _messageTransport;
    private SessionManager _sessionManager;
    private msgtrans.TransportContext.AcceptHandler _acceptHandler;
    private CloseHandler _closeHandler;
    private string _name = typeof(this).stringof;

    /// Connection attribute key under which the channel stores its session.
    enum string ChannelSession = "ChannelSession";

    private {
        string _host;
        ushort _port;

        NetServerOptions _options = null;
    }

    /**
     * Creates a channel bound to "0.0.0.0" on the given port.
     *
     * Params:
     *   port = TCP port to listen on.
     */
    this(ushort port) {
        this("0.0.0.0", port);
    }

    /**
     * Creates a channel for the given host and port using default options:
     * TCP keep-alive enabled with a keep-alive wait time of 60 seconds.
     *
     * Params:
     *   host = address to listen on.
     *   port = TCP port to listen on.
     */
    this(string host, ushort port) {
        auto option = new NetServerOptions();
        option.setTcpKeepAlive(true);
        option.setKeepaliveWaitTime(60.seconds);
        this(host, port, option);
    }

    /**
     * Creates a channel for the given host and port with caller-supplied
     * server options. Nothing is opened until start() is called.
     *
     * Params:
     *   host    = address to listen on.
     *   port    = TCP port to listen on.
     *   options = options handed to the NetServer created in start().
     */
    this(string host, ushort port, NetServerOptions options) {
        _host = host;
        _port = port;
        _options = options;
        // _name = randomUUID().toString();
    }

    /// Returns: the channel name (the class name by default).
    string name() {
        return _name;
    }

    /// Returns: the port given at construction.
    ushort port() {
        return _port;
    }

    /// Returns: the host given at construction.
    string host() {
        return _host;
    }

    /**
     * Attaches the channel to a MessageTransport and caches its session
     * manager. The connection handlers use that session manager, so this is
     * expected to be called before start().
     *
     * Params:
     *   transport = transport that supplies the session manager and executors.
     */
    void set(MessageTransport transport) {
        _messageTransport = transport;
        _sessionManager = transport.sessionManager();
    }

    /// Builds the NetServer (codec + handler) and starts listening on host:port.
    void start() {
        initialize();
        _server.listen(host, port);
    }

    /// Closes the NetServer if one has been created; otherwise does nothing.
    void stop() {
        if (_server !is null)
            _server.close();
    }

    /**
     * Registers the callback invoked after a new connection has been given a
     * session.
     *
     * Params:
     *   handler = callback receiving the TransportContext of the new session.
     */
    void onAccept(msgtrans.TransportContext.AcceptHandler handler) {
        _acceptHandler = handler;
    }

    /**
     * Registers the callback invoked when a connection is closed.
     *
     * Params:
     *   handler = callback receiving the TransportContext of the closed session.
     */
    void onClose(CloseHandler handler)
    {
        _closeHandler = handler;
    }

    /**
     * Creates the single-threaded NetServer from the stored options, installs
     * the TcpCodec and the connection handler that manages sessions and
     * forwards received messages to dispatchMessage().
     */
    private void initialize() {
        // dfmt off
        _server = NetUtil.createNetServer!(ThreadMode.Single)(_options);

        _server.setCodec(new TcpCodec());

        _server.setHandler(new class NetConnectionHandler {

            // Create and register a session for the new connection, then notify
            // the accept handler if one is set.
            override void connectionOpened(Connection connection) {
                version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress());
                TcpTransportSession session = new TcpTransportSession(_sessionManager.generateId(), connection);
                connection.setAttribute(ChannelSession, session);
                _sessionManager.add(session);
                TransportContext context = TransportContext(_sessionManager, session);
                if(_acceptHandler !is null) {
                    _acceptHandler(context);
                }
            }

            // Unregister the connection's session (when present) and notify the
            // close handler. The handler is invoked even if no session was
            // attached, in which case the context carries a null session.
            override void connectionClosed(Connection connection) {
                version(HUNT_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress());
                TransportSession session = cast(TransportSession)connection.getAttribute(ChannelSession);
                if(session !is null ) {
                    _sessionManager.remove(session);
                }
                if (_closeHandler !is null)
                {
                    TransportContext context = TransportContext(_sessionManager, session);
                    _closeHandler(context);
                }
            }

            // Only MessageBuffer objects are dispatched; anything else is
            // logged and ignored.
            override DataHandleStatus messageReceived(Connection connection, Object message) {
                MessageBuffer buffer = cast(MessageBuffer) message;
                if(buffer is null) {
                    warningf("expected type: MessageBuffer, message type: %s", typeid(message).name);
                } else {
                    dispatchMessage(connection, buffer);
                }

                return DataHandleStatus.Done;
            }

            // The three failure callbacks only log, and only in debug builds.
            override void exceptionCaught(Connection connection, Throwable t) {
                debug warning(t.msg);
            }

            override void failedOpeningConnection(int connectionId, Throwable t) {
                debug warning(t.msg);
            }

            override void failedAcceptingConnection(int connectionId, Throwable t) {
                debug warning(t.msg);
            }
        });

        // dfmt on
    }

    /**
     * Returns the TcpTransportSession stored on the connection under
     * ChannelSession. When the attribute is missing (or is not a
     * TcpTransportSession) a new session is created, attached to the
     * connection and added to the session manager.
     *
     * Params:
     *   connection = connection whose session is wanted.
     */
    private TcpTransportSession ensureSession(Connection connection) {
        TcpTransportSession session = cast(TcpTransportSession) connection.getAttribute(
                ChannelSession);
        if (session is null) {
            session = new TcpTransportSession(_sessionManager.generateId(), connection);
            connection.setAttribute(ChannelSession, session);
            _sessionManager.add(session);
        }
        return session;
    }

    /**
     * Routes one received message.
     *
     * Key-exchange ids go to keyExchangeRequest(). For any other id the
     * executor registered with the MessageTransport is looked up; if none is
     * found a warning is logged and the message is dropped. When
     * MessageTransportServer.isEE2E is set and the session holds peer keys,
     * the message is decoded with them first, and the connection is closed if
     * decoding yields null.
     *
     * Params:
     *   connection = connection the message arrived on.
     *   message    = decoded frame to dispatch.
     */
    private void dispatchMessage(Connection connection, MessageBuffer message) {
        version (HUNT_DEBUG) {
            // Pass the payload as an argument: it must never be used as the
            // format string itself, since it may contain '%'.
            tracef("data received: %s", message.toString());
        }

        // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64
        // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64

        uint messageId = message.id;
        if (messageId == MESSAGE.INITIATE || messageId == MESSAGE.FINALIZE)
        {
            keyExchangeRequest(message,connection);
            return;
        }

        ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId);
        if (executorInfo == ExecutorInfo.init) {
            warning("No Executor found for id: ", messageId);
            return;
        }

        TcpTransportSession session = ensureSession(connection);
        TransportContext context = TransportContext(_sessionManager, session);
        if (MessageTransportServer.isEE2E)
        {
            peerkey_s peerkeys = cast(peerkey_s)(context.session().getAttribute("EE2E"));
            if(peerkeys !is null)
            {
                message = common.encrypted_decode(message,peerkeys);
                if(message is null)
                {
                    connection.close();
                    return;
                }
            }else
            {
                // NOTE(review): with EE2E enabled but no keys stored for this
                // session, the message is still executed as received after
                // this log line - confirm that this fallback is intended.
                logError("peerkeys is null");
            }
        }
        executorInfo.execute(context, message);
    }

    /**
     * Handles the two key-exchange messages.
     *
     * MESSAGE.INITIATE: decodes the client's KeyExchangeRequest, stores the
     * Base64-decoded public key and salt on the session under "EE2E", and
     * replies with the server's Base64-encoded public key and salt.
     *
     * MESSAGE.FINALIZE: runs common.keyCalculate with the server key and the
     * stored peer keys and, on success, replies with an empty FINALIZE
     * message. Failures are logged and no reply is sent.
     *
     * Params:
     *   message    = received key-exchange frame.
     *   connection = connection the frame arrived on.
     */
    private void keyExchangeRequest(MessageBuffer message, Connection connection)
    {
        TcpTransportSession session = ensureSession(connection);
        TransportContext context = TransportContext(_sessionManager, session);

        switch(message.id)
        {
            case MESSAGE.INITIATE :
            {
                KeyExchangeRequest keyExchangeRes = new KeyExchangeRequest;
                message.data.fromProtobuf!KeyExchangeRequest(keyExchangeRes);

                // Guard against a request that carries no key_info:
                // dereferencing a null KeyInfo below would crash the server.
                if (keyExchangeRes.key_info is null)
                {
                    logError("key exchange request has no key_info");
                    break;
                }

                //logInfo("%s",keyExchangeRes.key_info.ec_public_key_65bytes);

                peerkey_s peerkeys = new peerkey_s;
                peerkeys.ec_pub_key = Base64.decode(keyExchangeRes.key_info.ec_public_key_65bytes);
                peerkeys.salt = Base64.decode(keyExchangeRes.key_info.salt_32bytes);
                context.session().setAttribute("EE2E",peerkeys);
                logInfo("client pub : %s" ,peerkeys.ec_pub_key );
                logInfo("client salt : %s" , peerkeys.salt);

                // Answer with the server's own public key and salt.
                KeyExchangeRequest res = new KeyExchangeRequest;
                KeyInfo info = new KeyInfo;
                info.ec_public_key_65bytes = Base64.encode(MessageTransportServer.s_server_key.ec_pub_key);
                info.salt_32bytes = Base64.encode(MessageTransportServer.s_server_key.salt);
                res.key_info = info;
                logInfo("server pub : %s" ,res.key_info.ec_public_key_65bytes );
                logInfo("server salt : %s" , res.key_info.salt_32bytes);

                context.session().send(new MessageBuffer(MESSAGE.INITIATE, res.toProtobuf.array));
                break;
            }
            case MESSAGE.FINALIZE :
            {
                peerkey_s peerkeys  =  cast(peerkey_s)(context.session().getAttribute("EE2E"));
                if (peerkeys is null)
                {
                    // FINALIZE arrived without a preceding successful INITIATE.
                    logError("peerkeys is null");
                }
                else if (common.keyCalculate(MessageTransportServer.s_server_key, peerkeys))
                {
                    context.session().send(new MessageBuffer(cast(uint)MESSAGE.FINALIZE, cast(ubyte[])null));
                }
                else
                {
                    logError("key calculation failed");
                }
                break;
            }
            default: break;
        }
    }
}