1 /* 2 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol. 3 * 4 * Copyright (C) 2019 HuntLabs 5 * 6 * Website: https://www.msgtrans.org 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module msgtrans.channel.tcp.TcpServerChannel; 13 14 import msgtrans.MessageTransport; 15 import msgtrans.SessionManager; 16 import msgtrans.TransportContext; 17 import msgtrans.channel.ServerChannel; 18 import msgtrans.channel.TransportSession; 19 import msgtrans.channel.tcp.TcpCodec; 20 import msgtrans.channel.tcp.TcpTransportSession; 21 import msgtrans.MessageTransportServer; 22 import msgtrans.MessageBuffer; 23 import msgtrans.executor; 24 import msgtrans.ee2e.message.MsgDefine; 25 import msgtrans.ee2e.crypto; 26 import hunt.logging.ConsoleLogger; 27 import hunt.net; 28 import hunt.net.codec.Codec; 29 import hunt.io.channel.Common; 30 31 import google.protobuf; 32 import std.array; 33 import msgtrans.ee2e.common; 34 import std.format; 35 import std.uuid; 36 import std.base64; 37 import core.time; 38 39 /** 40 * 41 */ 42 class TcpServerChannel : ServerChannel { 43 private NetServer _server; 44 private MessageTransport _messageTransport; 45 private SessionManager _sessionManager; 46 private msgtrans.TransportContext.AcceptHandler _acceptHandler; 47 private CloseHandler _closeHandler; 48 private string _name = typeof(this).stringof; 49 50 enum string ChannelSession = "ChannelSession"; 51 52 private { 53 string _host; 54 ushort _port; 55 56 NetServerOptions _options = null; 57 } 58 59 this(ushort port) { 60 this("0.0.0.0", port); 61 } 62 63 this(string host, ushort port) { 64 auto option = new NetServerOptions(); 65 option.setTcpKeepAlive(true); 66 option.setKeepaliveWaitTime(60.seconds); 67 this(host, port, option); 68 } 69 70 this(string host, ushort port, NetServerOptions options) { 71 _host = host; 72 _port = port; 73 _options = options; 74 // _name = randomUUID().toString(); 75 } 76 77 string name() { 78 return _name; 79 } 80 81 ushort port() { 82 return _port; 83 } 84 85 string host() { 86 return _host; 87 } 88 89 void set(MessageTransport transport) { 90 _messageTransport = transport; 91 _sessionManager = transport.sessionManager(); 92 } 93 94 void start() { 95 initialize(); 96 _server.listen(host, port); 97 } 98 99 void stop() { 100 if (_server !is null) 101 _server.close(); 102 } 103 104 void onAccept(msgtrans.TransportContext.AcceptHandler handler) { 105 _acceptHandler = handler; 106 } 107 108 void onClose(CloseHandler handler) 109 { 110 _closeHandler = handler; 111 } 112 113 private void initialize() { 114 // dfmt off 115 _server = NetUtil.createNetServer!(ThreadMode.Single)(_options); 116 117 _server.setCodec(new TcpCodec()); 118 119 _server.setHandler(new class NetConnectionHandler { 120 121 override void connectionOpened(Connection connection) { 122 version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress()); 123 TcpTransportSession session = new TcpTransportSession(_sessionManager.generateId(), connection); 124 connection.setAttribute(ChannelSession, session); 125 _sessionManager.add(session); 126 TransportContext context = TransportContext(_sessionManager, session); 127 if(_acceptHandler !is null) { 128 _acceptHandler(context); 129 } 130 } 131 132 override void connectionClosed(Connection connection) { 133 version(HUNT_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress()); 134 TransportSession session = cast(TransportSession)connection.getAttribute(ChannelSession); 135 if(session !is null ) { 136 _sessionManager.remove(session); 137 } 138 if (_closeHandler !is null) 139 { 140 TransportContext context = TransportContext(_sessionManager, session); 141 _closeHandler(context); 142 } 143 } 144 145 override DataHandleStatus messageReceived(Connection connection, Object message) { 146 MessageBuffer buffer = cast(MessageBuffer) message; 147 if(buffer is null) { 148 warningf("expected type: MessageBuffer, message type: %s", typeid(message).name); 149 } else { 150 dispatchMessage(connection, buffer); 151 } 152 153 return DataHandleStatus.Done; 154 } 155 156 override void exceptionCaught(Connection connection, Throwable t) { 157 debug warning(t.msg); 158 } 159 160 override void failedOpeningConnection(int connectionId, Throwable t) { 161 debug warning(t.msg); 162 } 163 164 override void failedAcceptingConnection(int connectionId, Throwable t) { 165 debug warning(t.msg); 166 } 167 }); 168 169 // dfmt on 170 } 171 172 private void dispatchMessage(Connection connection, MessageBuffer message) { 173 version (HUNT_DEBUG) { 174 string str = format("data received: %s", message.toString()); 175 tracef(str); 176 } 177 178 // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64 179 // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 180 181 uint messageId = message.id; 182 if (messageId == MESSAGE.INITIATE || messageId == MESSAGE.FINALIZE) 183 { 184 keyExchangeRequest(message,connection); 185 return; 186 } 187 188 // string str = format("data received: %s", message.toString()); 189 ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId); 190 if (executorInfo == ExecutorInfo.init) { 191 warning("No Executor found for id: ", messageId); 192 } else { 193 TcpTransportSession session = cast(TcpTransportSession) connection.getAttribute( 194 ChannelSession); 195 if (session is null) { 196 session = new TcpTransportSession(_sessionManager.generateId(), connection); 197 connection.setAttribute(ChannelSession, session); 198 _sessionManager.add(session); 199 } 200 TransportContext context = TransportContext(_sessionManager, session); 201 if (MessageTransportServer.isEE2E) 202 { 203 peerkey_s peerkeys = cast(peerkey_s)(context.session().getAttribute("EE2E")); 204 if(peerkeys !is null) 205 { 206 message = common.encrypted_decode(message,peerkeys); 207 if(message is null) 208 { 209 connection.close(); 210 return; 211 } 212 }else 213 { 214 logError("peerkeys is null"); 215 } 216 } 217 executorInfo.execute(context, message); 218 } 219 } 220 221 private void keyExchangeRequest(MessageBuffer message, Connection connection) 222 { 223 TcpTransportSession session = cast(TcpTransportSession) connection.getAttribute( 224 ChannelSession); 225 if (session is null) { 226 session = new TcpTransportSession(_sessionManager.generateId(), connection); 227 connection.setAttribute(ChannelSession, session); 228 _sessionManager.add(session); 229 } 230 TransportContext context = TransportContext(_sessionManager, session); 231 232 switch(message.id) 233 { 234 case MESSAGE.INITIATE : 235 { 236 KeyExchangeRequest keyExchangeRes = new KeyExchangeRequest; 237 message.data.fromProtobuf!KeyExchangeRequest(keyExchangeRes); 238 239 //logInfo("%s",keyExchangeRes.key_info.ec_public_key_65bytes); 240 241 peerkey_s peerkeys = new peerkey_s; 242 peerkeys.ec_pub_key = Base64.decode(keyExchangeRes.key_info.ec_public_key_65bytes); 243 peerkeys.salt = Base64.decode(keyExchangeRes.key_info.salt_32bytes); 244 context.session().setAttribute("EE2E",peerkeys); 245 logInfo("client pub : %s" ,peerkeys.ec_pub_key ); 246 logInfo("client salt : %s" , peerkeys.salt); 247 248 249 KeyExchangeRequest res = new KeyExchangeRequest; 250 KeyInfo info = new KeyInfo; 251 info.ec_public_key_65bytes = Base64.encode(MessageTransportServer.s_server_key.ec_pub_key); 252 info.salt_32bytes = Base64.encode(MessageTransportServer.s_server_key.salt); 253 res.key_info = info; 254 logInfo("server pub : %s" ,res.key_info.ec_public_key_65bytes ); 255 logInfo("server salt : %s" , res.key_info.salt_32bytes); 256 257 context.session().send(new MessageBuffer(MESSAGE.INITIATE, res.toProtobuf.array)); 258 break; 259 } 260 case MESSAGE.FINALIZE : 261 { 262 peerkey_s peerkeys = cast(peerkey_s)(context.session().getAttribute("EE2E")); 263 if (peerkeys !is null && common.keyCalculate(MessageTransportServer.s_server_key, peerkeys)) 264 { 265 context.session().send(new MessageBuffer(cast(uint)MESSAGE.FINALIZE, cast(ubyte[])null)); 266 }else 267 { 268 logError("peerkeys is null"); 269 } 270 break; 271 } 272 default: break; 273 } 274 275 276 } 277 }