/*
 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
 *
 * Copyright (C) 2019 HuntLabs
 *
 * Website: https://www.msgtrans.org
 *
 * Licensed under the Apache-2.0 License.
 *
 */

module msgtrans.MessageTransportClient;

import msgtrans.channel.ClientChannel;
import msgtrans.executor;
import msgtrans.MessageTransport;
import msgtrans.MessageBuffer;
import msgtrans.MessageHandler;
import msgtrans.ee2e.crypto;
import msgtrans.TransportContext;

import hunt.logging.ConsoleLogger;
import hunt.concurrency.FuturePromise;

import core.time;

/**
 * Client side of a message transport.
 *
 * The client owns a single `ClientChannel` (set through `channel`) and
 * forwards every send/call/close operation to it. End-to-end encryption
 * state (`isEE2E`, `client_key`, `server_key`) is `__gshared`, i.e. it is
 * shared by every client instance in the process.
 */
class MessageTransportClient : MessageTransport {
    /// Connection flag maintained by `connect`, `close` and `isClosed`.
    private bool _isConnected = false;
    /// Channel every operation is delegated to; null until `channel` is called.
    private ClientChannel _channel;

    /// True once a client has been constructed with `ee2e = true`.
    __gshared bool isEE2E;
    /// Local key material, filled in by the constructor when `ee2e` is requested.
    __gshared ownkey_s client_key;
    /// Peer key material; only allocated here, never written by this class.
    __gshared peerkey_s server_key;

    /// Handler passed to the channel's `onClose` during `connect`.
    private CloseHandler _closeHandler;

    /// Allocates the process-wide key holders and disables EE2E by default.
    shared static this()
    {
        client_key = new ownkey_s;
        server_key = new peerkey_s;
        isEE2E = false;
    }

    // private Duration _tickPeriod = 10.seconds;
    // private Duration _ackTimeout = 30.seconds;
    // private uint _missedAcks = 3;

    /**
     * Creates a client whose transport name is `CLIENT_NAME_PREFIX ~ name`.
     *
     * Params:
     *   name = client name; an empty name is accepted but reported with a warning.
     *   ee2e = when true, generates the ECDH key pair and the salt stored in
     *          `client_key` and sets the process-wide `isEE2E` flag.
     */
    this(string name, bool ee2e = false)
    {
        if (!name.length)
        {
            // Kept non-fatal for backward compatibility: existing callers may
            // rely on being able to pass an empty name.
            warning("MessageTransportClient created with an empty name.");
        }

        if (ee2e)
        {
            if (!generate_ecdh_keys(client_key.ec_pub_key, client_key.ec_priv_key))
            {
                logError("ECDH-KEY generation failed.");
            }
            logInfo("%s", client_key.ec_pub_key);

            /* Generate a random number that is called salt */
            if (!rand_salt(client_key.salt, CRYPTO_SALT_LEN))
            {
                logError("Random salt generation failed.");
            }

            // NOTE(review): EE2E is enabled even when one of the generation
            // steps above failed (original behaviour, preserved here) --
            // confirm whether a failure should abort construction instead.
            isEE2E = true;
        }

        super(CLIENT_NAME_PREFIX ~ name);
    }

    /**
     * Stores the handler that `connect` registers on the channel via `onClose`.
     *
     * Params:
     *   handler = close handler to register on the next `connect`.
     */
    void closer(CloseHandler handler) {
        _closeHandler = handler;
    }

    /**
     * Sets the channel used by this client.
     *
     * Params:
     *   channel = channel to use; must not be null.
     * Returns: this client, so calls can be chained.
     */
    MessageTransportClient channel(ClientChannel channel)
    {
        assert(channel !is null);
        _channel = channel;
        return this;
    }

    /// Returns the connection flag as last updated by `connect`, `close` or `isClosed`.
    bool isConnected()
    {
        return _isConnected;
    }

    /**
     * Binds this client to the channel, connects it and registers the close
     * handler.
     *
     * Returns: true on success; false when any step threw an `Exception`
     *          (the exception is logged as a warning).
     */
    bool connect()
    {
        assert(_channel !is null);

        try {
            _channel.set(this);
            _channel.connect();
            _channel.onClose(_closeHandler);
            _isConnected = true;
        } catch(Exception e) {
            warning(e);
            // A failed (re)connect must not leave a stale "connected" flag
            // behind from an earlier successful connect.
            _isConnected = false;
            return false;
        }

        return true;
    }

    /**
     * Sends a ready-made buffer through the channel.
     *
     * Params:
     *   buffer = message to send.
     */
    void send(MessageBuffer buffer)
    {
        assert(_channel !is null, "channel must be set before sending");
        _channel.send(buffer);
    }

    /**
     * Wraps `msg` in a new `MessageBuffer` with the given id and sends it.
     *
     * Params:
     *   id  = message id.
     *   msg = message payload.
     */
    void send(uint id, ubyte[] msg) {
        assert(_channel !is null, "channel must be set before sending");
        _channel.send(new MessageBuffer(id, msg));
    }

    /**
     * Sends a string payload; the string's bytes are reinterpreted as
     * `ubyte[]` without copying.
     *
     * Params:
     *   id  = message id.
     *   msg = message payload.
     */
    void send(uint id, string msg) {
        this.send(id, cast(ubyte[]) msg);
    }

    /**
     * Sends `buffer` and blocks until a message with the same id is handled
     * or `promise.get(timeout)` gives up.
     *
     * A temporary handler is registered for `buffer.id` and is always
     * deregistered on exit, whether the call returns or throws.
     *
     * Params:
     *   buffer  = request to send.
     *   timeout = maximum time passed to the promise's `get`.
     * Returns: the buffer delivered to the temporary handler.
     */
    MessageBuffer Call(MessageBuffer buffer, Duration timeout = 5.seconds) {
        assert(_channel !is null, "channel must be set before calling");

        FuturePromise!MessageBuffer promise = new FuturePromise!MessageBuffer();

        this.registerHandler(buffer.id, (ctx, buf) {
            promise.succeeded(buf);
        });

        scope(exit) {
            this.deregisterHandler(buffer.id);
        }

        _channel.send(buffer);

        MessageBuffer responseBuffer = promise.get(timeout);
        return responseBuffer;
    }

    /**
     * Registers `handler` for `buffer.id` and sends `buffer` without waiting.
     * The handler is not deregistered by this method.
     *
     * Params:
     *   buffer  = request to send.
     *   handler = handler registered for the request's id.
     */
    void AsyncCall(MessageBuffer buffer, MessageHandler handler) {
        assert(_channel !is null, "channel must be set before calling");

        this.registerHandler(buffer.id, handler);
        _channel.send(buffer);
    }

    /**
     * Closes the channel, if one has been set, and clears the connection flag
     * so `isConnected` no longer reports a stale `true`.
     */
    void close() {
        if (_channel !is null) {
            _channel.close();
        }
        _isConnected = false;
    }

    /**
     * Reports whether the client is closed, based on the channel's
     * `isConnected`. A client without a channel counts as closed. When
     * closed, the connection flag is cleared as a side effect.
     *
     * Returns: true when there is no channel or the channel is not connected.
     */
    bool isClosed()
    {
        bool c = _channel is null || !_channel.isConnected;
        if (c)
        {
            _isConnected = false;
        }
        return c;
    }
}