1 /* 2 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol. 3 * 4 * Copyright (C) 2019 HuntLabs 5 * 6 * Website: https://www.msgtrans.org 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module msgtrans.channel.tcp.TcpServerChannel; 13 14 import msgtrans.MessageTransport; 15 import msgtrans.SessionManager; 16 import msgtrans.TransportContext; 17 import msgtrans.channel.ServerChannel; 18 import msgtrans.channel.TransportSession; 19 import msgtrans.channel.tcp.TcpCodec; 20 import msgtrans.channel.tcp.TcpTransportSession; 21 import msgtrans.MessageTransportServer; 22 import msgtrans.MessageBuffer; 23 import msgtrans.executor; 24 import msgtrans.ee2e.message.MsgDefine; 25 import msgtrans.ee2e.crypto; 26 import hunt.logging.ConsoleLogger; 27 import hunt.net; 28 import hunt.net.codec.Codec; 29 import google.protobuf; 30 import std.array; 31 import msgtrans.ee2e.common; 32 import std.format; 33 import std.uuid; 34 import std.base64; 35 import core.time; 36 37 /** 38 * 39 */ 40 class TcpServerChannel : ServerChannel { 41 private NetServer _server; 42 private MessageTransport _messageTransport; 43 private SessionManager _sessionManager; 44 private AcceptHandler _acceptHandler; 45 private CloseHandler _closeHandler; 46 private string _name = typeof(this).stringof; 47 48 enum string ChannelSession = "ChannelSession"; 49 50 private { 51 string _host; 52 ushort _port; 53 54 NetServerOptions _options = null; 55 } 56 57 this(ushort port) { 58 this("0.0.0.0", port); 59 } 60 61 this(string host, ushort port) { 62 auto option = new NetServerOptions(); 63 option.setTcpKeepAlive(true); 64 option.setKeepaliveWaitTime(60.seconds); 65 this(host, port, option); 66 } 67 68 this(string host, ushort port, NetServerOptions options) { 69 _host = host; 70 _port = port; 71 _options = options; 72 // _name = randomUUID().toString(); 73 } 74 75 string name() { 76 return _name; 77 } 78 79 ushort port() { 80 return _port; 81 } 82 83 string host() { 84 return _host; 85 } 86 87 void set(MessageTransport transport) { 88 _messageTransport = transport; 89 _sessionManager = transport.sessionManager(); 90 } 91 92 void start() { 93 initialize(); 94 _server.listen(host, port); 95 } 96 97 void stop() { 98 if (_server !is null) 99 _server.close(); 100 } 101 102 void onAccept(AcceptHandler handler) { 103 _acceptHandler = handler; 104 } 105 106 void onClose(CloseHandler handler) 107 { 108 _closeHandler = handler; 109 } 110 111 private void initialize() { 112 // dfmt off 113 _server = NetUtil.createNetServer!(ThreadMode.Single)(_options); 114 115 _server.setCodec(new TcpCodec()); 116 117 _server.setHandler(new class NetConnectionHandler { 118 119 override void connectionOpened(Connection connection) { 120 version(HUNT_DEBUG) infof("Connection created: %s", connection.getRemoteAddress()); 121 TcpTransportSession session = new TcpTransportSession(_sessionManager.generateId(), connection); 122 connection.setAttribute(ChannelSession, session); 123 _sessionManager.add(session); 124 TransportContext context = TransportContext(_sessionManager, session); 125 if(_acceptHandler !is null) { 126 _acceptHandler(context); 127 } 128 } 129 130 override void connectionClosed(Connection connection) { 131 version(HUNT_DEBUG) infof("Connection closed: %s", connection.getRemoteAddress()); 132 TransportSession session = cast(TransportSession)connection.getAttribute(ChannelSession); 133 if(session !is null ) { 134 _sessionManager.remove(session); 135 } 136 if (_closeHandler !is null) 137 { 138 TransportContext context = TransportContext(_sessionManager, session); 139 _closeHandler(context); 140 } 141 } 142 143 override void messageReceived(Connection connection, Object message) { 144 MessageBuffer buffer = cast(MessageBuffer) message; 145 if(buffer is null) { 146 warningf("expected type: MessageBuffer, message type: %s", typeid(message).name); 147 } else { 148 dispatchMessage(connection, buffer); 149 } 150 } 151 152 override void exceptionCaught(Connection connection, Throwable t) { 153 debug warning(t.msg); 154 } 155 156 override void failedOpeningConnection(int connectionId, Throwable t) { 157 debug warning(t.msg); 158 } 159 160 override void failedAcceptingConnection(int connectionId, Throwable t) { 161 debug warning(t.msg); 162 } 163 }); 164 165 // dfmt on 166 } 167 168 private void dispatchMessage(Connection connection, MessageBuffer message) { 169 version (HUNT_DEBUG) { 170 string str = format("data received: %s", message.toString()); 171 tracef(str); 172 } 173 174 // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64 175 // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 176 177 uint messageId = message.id; 178 if (messageId == MESSAGE.INITIATE || messageId == MESSAGE.FINALIZE) 179 { 180 keyExchangeRequest(message,connection); 181 return; 182 } 183 184 // string str = format("data received: %s", message.toString()); 185 ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId); 186 if (executorInfo == ExecutorInfo.init) { 187 warning("No Executor found for id: ", messageId); 188 } else { 189 TcpTransportSession session = cast(TcpTransportSession) connection.getAttribute( 190 ChannelSession); 191 if (session is null) { 192 session = new TcpTransportSession(_sessionManager.generateId(), connection); 193 connection.setAttribute(ChannelSession, session); 194 _sessionManager.add(session); 195 } 196 TransportContext context = TransportContext(_sessionManager, session); 197 if (MessageTransportServer.isEE2E) 198 { 199 peerkey_s peerkeys = cast(peerkey_s)(context.session().getAttribute("EE2E")); 200 if(peerkeys !is null) 201 { 202 message = common.encrypted_decode(message,peerkeys); 203 if(message is null) 204 { 205 connection.close(); 206 return; 207 } 208 }else 209 { 210 logError("peerkeys is null"); 211 } 212 } 213 executorInfo.execute(context, message); 214 } 215 } 216 217 private void keyExchangeRequest(MessageBuffer message, Connection connection) 218 { 219 TcpTransportSession session = cast(TcpTransportSession) connection.getAttribute( 220 ChannelSession); 221 if (session is null) { 222 session = new TcpTransportSession(_sessionManager.generateId(), connection); 223 connection.setAttribute(ChannelSession, session); 224 _sessionManager.add(session); 225 } 226 TransportContext context = TransportContext(_sessionManager, session); 227 228 switch(message.id) 229 { 230 case MESSAGE.INITIATE : 231 { 232 KeyExchangeRequest keyExchangeRes = new KeyExchangeRequest; 233 message.data.fromProtobuf!KeyExchangeRequest(keyExchangeRes); 234 235 //logInfo("%s",keyExchangeRes.key_info.ec_public_key_65bytes); 236 237 peerkey_s peerkeys = new peerkey_s; 238 peerkeys.ec_pub_key = Base64.decode(keyExchangeRes.key_info.ec_public_key_65bytes); 239 peerkeys.salt = Base64.decode(keyExchangeRes.key_info.salt_32bytes); 240 context.session().setAttribute("EE2E",peerkeys); 241 logInfo("client pub : %s" ,peerkeys.ec_pub_key ); 242 logInfo("client salt : %s" , peerkeys.salt); 243 244 245 KeyExchangeRequest res = new KeyExchangeRequest; 246 KeyInfo info = new KeyInfo; 247 info.ec_public_key_65bytes = Base64.encode(MessageTransportServer.s_server_key.ec_pub_key); 248 info.salt_32bytes = Base64.encode(MessageTransportServer.s_server_key.salt); 249 res.key_info = info; 250 logInfo("server pub : %s" ,res.key_info.ec_public_key_65bytes ); 251 logInfo("server salt : %s" , res.key_info.salt_32bytes); 252 253 context.session().send(new MessageBuffer(MESSAGE.INITIATE, res.toProtobuf.array)); 254 break; 255 } 256 case MESSAGE.FINALIZE : 257 { 258 peerkey_s peerkeys = cast(peerkey_s)(context.session().getAttribute("EE2E")); 259 if (peerkeys !is null && common.keyCalculate(MessageTransportServer.s_server_key, peerkeys)) 260 { 261 context.session().send(new MessageBuffer(MESSAGE.FINALIZE, [])); 262 }else 263 { 264 logError("peerkeys is null"); 265 } 266 break; 267 } 268 default: break; 269 } 270 271 272 } 273 }