1 /*
2  * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
3  *
4  * Copyright (C) 2019 HuntLabs
5  *
6  * Website: https://www.msgtrans.org
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module msgtrans.channel.tcp.TcpServerChannel;
13 
14 import msgtrans.MessageTransport;
15 import msgtrans.SessionManager;
16 import msgtrans.TransportContext;
17 import msgtrans.channel.ServerChannel;
18 import msgtrans.channel.TransportSession;
19 import msgtrans.channel.tcp.TcpCodec;
20 import msgtrans.channel.tcp.TcpTransportSession;
21 import msgtrans.MessageTransportServer;
22 import msgtrans.MessageBuffer;
23 import msgtrans.executor;
24 import msgtrans.ee2e.message.MsgDefine;
25 import msgtrans.ee2e.crypto;
26 import hunt.logging.ConsoleLogger;
27 import hunt.net;
28 import hunt.net.codec.Codec;
29 import google.protobuf;
30 import std.array;
31 import msgtrans.ee2e.common;
32 import std.format;
33 import std.uuid;
34 import std.base64;
35 import core.time;
36 
/**
 * Server-side TCP channel.
 *
 * Creates a hunt `NetServer` with a `TcpCodec`, attaches one
 * `TcpTransportSession` to every accepted connection, and routes each
 * received `MessageBuffer` either to the key-exchange handler
 * (`MESSAGE.INITIATE` / `MESSAGE.FINALIZE`) or to the executor registered
 * for the message id on the associated `MessageTransport`.
 */
class TcpServerChannel : ServerChannel {
    private NetServer _server;
    private MessageTransport _messageTransport;
    private SessionManager _sessionManager;
    private AcceptHandler _acceptHandler;
    private CloseHandler _closeHandler;
    private string _name = typeof(this).stringof;

    /// Connection attribute key under which the channel stores its session.
    enum string ChannelSession = "ChannelSession";

    private {
        string _host;
        ushort _port;

        NetServerOptions _options = null;
    }

    /**
     * Creates a channel bound to "0.0.0.0" on the given port.
     *
     * Params:
     *   port = port to listen on
     */
    this(ushort port) {
        this("0.0.0.0", port);
    }

    /**
     * Creates a channel with TCP keep-alive switched on and a keep-alive
     * wait time of 60 seconds.
     *
     * Params:
     *   host = address to listen on
     *   port = port to listen on
     */
    this(string host, ushort port) {
        auto option = new NetServerOptions();
        option.setTcpKeepAlive(true);
        option.setKeepaliveWaitTime(60.seconds);
        this(host, port, option);
    }

    /**
     * Creates a channel using caller-supplied server options.
     *
     * Params:
     *   host    = address to listen on
     *   port    = port to listen on
     *   options = options handed to the `NetServer` when it is created
     */
    this(string host, ushort port, NetServerOptions options) {
        _host = host;
        _port = port;
        _options = options;
        // _name = randomUUID().toString();
    }

    /// Returns: the channel name (the class name by default).
    string name() {
        return _name;
    }

    /// Returns: the port this channel listens on.
    ushort port() {
        return _port;
    }

    /// Returns: the address this channel listens on.
    string host() {
        return _host;
    }

    /**
     * Associates the channel with a transport; the transport's session
     * manager is used for all sessions created by this channel.
     *
     * Params:
     *   transport = transport that owns the executors and the session manager
     */
    void set(MessageTransport transport) {
        _messageTransport = transport;
        _sessionManager = transport.sessionManager();
    }

    /// Builds the underlying server and starts listening on host/port.
    void start() {
        initialize();
        _server.listen(host, port);
    }

    /// Closes the underlying server if it has been created.
    void stop() {
        if (_server !is null)
            _server.close();
    }

    /// Registers the handler invoked after a connection has been opened.
    void onAccept(AcceptHandler handler) {
        _acceptHandler = handler;
    }

    /// Registers the handler invoked after a connection has been closed.
    void onClose(CloseHandler handler)
    {
        _closeHandler = handler;
    }

    /// Creates the NetServer, installs the codec and the connection handler.
    private void initialize() {
        // dfmt off
        _server = NetUtil.createNetServer!(ThreadMode.Single)(_options);

        _server.setCodec(new TcpCodec());

        _server.setHandler(new class NetConnectionHandler {

            // A fresh session is created for every opened connection.
            override void connectionOpened(Connection connection) {
                version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress());
                TcpTransportSession session = new TcpTransportSession(_sessionManager.generateId(), connection);
                connection.setAttribute(ChannelSession, session);
                _sessionManager.add(session);
                TransportContext context = TransportContext(_sessionManager, session);
                if(_acceptHandler !is null) {
                    _acceptHandler(context);
                }
            }

            override void connectionClosed(Connection connection) {
                version(HUNT_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress());
                TransportSession session = cast(TransportSession)connection.getAttribute(ChannelSession);
                if(session !is null ) {
                    _sessionManager.remove(session);
                }
                // The close handler is notified even when no session was
                // attached; in that case the context carries a null session.
                if (_closeHandler !is null)
                {
                  TransportContext context = TransportContext(_sessionManager, session);
                  _closeHandler(context);
                }
            }

            override void messageReceived(Connection connection, Object message) {
                MessageBuffer buffer = cast(MessageBuffer) message;
                if(buffer is null) {
                    warningf("expected type: MessageBuffer, message type: %s", typeid(message).name);
                } else {
                    dispatchMessage(connection, buffer);
                }
            }

            // The three failure callbacks only log, and only in debug builds.
            override void exceptionCaught(Connection connection, Throwable t) {
                debug warning(t.msg);
            }

            override void failedOpeningConnection(int connectionId, Throwable t) {
                debug warning(t.msg);
            }

            override void failedAcceptingConnection(int connectionId, Throwable t) {
                debug warning(t.msg);
            }
        });

        // dfmt on
    }

    /**
     * Returns the session attached to the connection, creating, attaching
     * and registering a new one with the session manager when none exists.
     *
     * Params:
     *   connection = connection whose session is wanted
     * Returns: the existing or newly created session
     */
    private TcpTransportSession ensureSession(Connection connection) {
        TcpTransportSession session = cast(TcpTransportSession) connection.getAttribute(
                ChannelSession);
        if (session is null) {
            session = new TcpTransportSession(_sessionManager.generateId(), connection);
            connection.setAttribute(ChannelSession, session);
            _sessionManager.add(session);
        }
        return session;
    }

    /**
     * Routes a received message.
     *
     * Key-exchange messages go to `keyExchangeRequest`. Any other id is
     * looked up on the transport; when an executor exists the message is
     * (optionally) decoded with the session's peer keys and executed.
     *
     * Params:
     *   connection = connection the message arrived on
     *   message    = decoded frame
     */
    private void dispatchMessage(Connection connection, MessageBuffer message) {
        version (HUNT_DEBUG) {
            // Pass the text as an argument: message content must never be
            // interpreted as a format string.
            tracef("data received: %s", message.toString());
        }

        // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64
        // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64

        uint messageId = message.id;
        if (messageId == MESSAGE.INITIATE || messageId == MESSAGE.FINALIZE)
        {
            keyExchangeRequest(message,connection);
            return;
        }

        ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId);
        if (executorInfo == ExecutorInfo.init) {
            warning("No Executor found for id: ", messageId);
            return;
        }

        TcpTransportSession session = ensureSession(connection);
        TransportContext context = TransportContext(_sessionManager, session);
        if (MessageTransportServer.isEE2E)
        {
            peerkey_s peerkeys = cast(peerkey_s)(context.session().getAttribute("EE2E"));
            if(peerkeys !is null)
            {
                message = common.encrypted_decode(message,peerkeys);
                if(message is null)
                {
                    // Undecodable payload: drop the connection.
                    connection.close();
                    return;
                }
            }else
            {
                // NOTE(review): with EE2E enabled, a peer that never
                // completed the key exchange still has its message executed
                // undecoded below. Confirm this fall-through is intended.
                logError("peerkeys is null");
            }
        }
        executorInfo.execute(context, message);
    }

    /**
     * Handles the two key-exchange messages.
     *
     * `MESSAGE.INITIATE`: stores the peer's public key and salt on the
     * session under "EE2E" and replies with the server's public key and salt.
     * `MESSAGE.FINALIZE`: runs `common.keyCalculate` with the stored peer
     * keys and acknowledges with an empty `MESSAGE.FINALIZE` on success.
     *
     * Params:
     *   message    = key-exchange frame
     *   connection = connection the frame arrived on
     */
    private void keyExchangeRequest(MessageBuffer message, Connection connection)
    {
        TcpTransportSession session = ensureSession(connection);
        TransportContext context = TransportContext(_sessionManager, session);

        switch(message.id)
        {
            case MESSAGE.INITIATE :
            {
                KeyExchangeRequest request = new KeyExchangeRequest;
                message.data.fromProtobuf!KeyExchangeRequest(request);

                // The payload comes from the peer. A request without
                // key_info would otherwise be dereferenced as null below
                // (assumes the decoder leaves absent message fields null).
                if (request.key_info is null)
                {
                    logError("key exchange request has no key_info");
                    connection.close();
                    break;
                }

                peerkey_s peerkeys = new peerkey_s;
                peerkeys.ec_pub_key = Base64.decode(request.key_info.ec_public_key_65bytes);
                peerkeys.salt = Base64.decode(request.key_info.salt_32bytes);
                context.session().setAttribute("EE2E",peerkeys);
                infof("client pub : %s" ,peerkeys.ec_pub_key );
                infof("client salt : %s" , peerkeys.salt);

                KeyExchangeRequest res = new KeyExchangeRequest;
                KeyInfo info = new KeyInfo;
                info.ec_public_key_65bytes = Base64.encode(MessageTransportServer.s_server_key.ec_pub_key);
                info.salt_32bytes = Base64.encode(MessageTransportServer.s_server_key.salt);
                res.key_info = info;
                infof("server pub : %s" ,res.key_info.ec_public_key_65bytes );
                infof("server salt : %s" , res.key_info.salt_32bytes);

                context.session().send(new MessageBuffer(MESSAGE.INITIATE, res.toProtobuf.array));
                break;
            }
            case MESSAGE.FINALIZE :
            {
                peerkey_s peerkeys  =  cast(peerkey_s)(context.session().getAttribute("EE2E"));
                if (peerkeys is null)
                {
                    // FINALIZE arrived without a preceding INITIATE.
                    logError("peerkeys is null");
                }
                else if (common.keyCalculate(MessageTransportServer.s_server_key, peerkeys))
                {
                    context.session().send(new MessageBuffer(MESSAGE.FINALIZE, []));
                }
                else
                {
                    logError("key calculation failed");
                }
                break;
            }
            default: break;
        }
    }
}