1 /* 2 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol. 3 * 4 * Copyright (C) 2019 HuntLabs 5 * 6 * Website: https://www.msgtrans.org 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module msgtrans.channel.websocket.WebSocketClientChannel; 13 14 import msgtrans.DefaultSessionManager; 15 import msgtrans.executor; 16 import msgtrans.MessageBuffer; 17 import msgtrans.MessageTransport; 18 import msgtrans.Packet; 19 import msgtrans.TransportContext; 20 import msgtrans.channel.ClientChannel; 21 import msgtrans.channel.TransportSession; 22 import msgtrans.channel.websocket.WebSocketChannel; 23 import msgtrans.channel.websocket.WebSocketTransportSession; 24 25 import hunt.io.ByteBuffer; 26 import hunt.Exceptions; 27 import hunt.http.client; 28 import hunt.logging.ConsoleLogger; 29 import hunt.net; 30 31 import hunt.concurrency.FuturePromise; 32 33 import core.sync.condition; 34 import core.sync.mutex; 35 36 import std.format; 37 import std.range; 38 39 /** 40 * 41 */ 42 class WebSocketClientChannel : WebSocketChannel, ClientChannel { 43 private HttpURI _url; 44 45 private HttpClient _client; 46 private WebSocketConnection _connection; 47 private MessageTransport _messageTransport; 48 49 50 this(string host, ushort port, string path) { 51 if(path.empty() || path[0] != '/') 52 throw new Exception("Wrong path: " ~ path); 53 string url = format("ws://%s:%d%s", host, port, path); 54 this(new HttpURI(url)); 55 } 56 57 this(string url) { 58 this(new HttpURI(url)); 59 } 60 61 this(HttpURI url) { 62 assert(url.getScheme() == "http" || url.getScheme() == "ws", "Only http or ws supported"); 63 _url = url; 64 _client = new HttpClient(); 65 } 66 67 void set(MessageTransport transport) { 68 _messageTransport = transport; 69 } 70 71 void connect() { 72 73 if(_connection !is null) { 74 return; 75 } 76 77 Request request = new RequestBuilder() 78 .url(_url) 79 // .authorization(AuthenticationScheme.Basic, "cHV0YW86MjAxOQ==") 80 .build(); 81 82 83 _connection = _client.newWebSocket(request, new class AbstractWebSocketMessageHandler { 84 override void onOpen(WebSocketConnection connection) { 85 version(HUNT_DEBUG) infof("New connection from: %s", connection.getRemoteAddress()); 86 } 87 88 override void onText(WebSocketConnection connection, string text) { 89 version(HUNT_DEBUG) tracef("received (from %s): %s", connection.getRemoteAddress(), text); 90 } 91 92 override void onBinary(WebSocketConnection connection, ByteBuffer buffer) { 93 byte[] data = buffer.getRemaining(); 94 version(HUNT_DEBUG) { 95 tracef("received (from %s): %s", connection.getRemoteAddress(), buffer.toString()); 96 if(data.length<=64) 97 infof("%(%02X %)", data[0 .. $]); 98 else 99 infof("%(%02X %) ...(%d bytes)", data[0 .. 64], data.length); 100 } 101 102 if(data.length > 0) { 103 decode(connection, buffer); 104 } 105 } 106 }); 107 108 } 109 110 111 void onClose(CloseHandler handler) 112 { 113 114 } 115 116 bool isConnected() { 117 return _connection !is null && _connection.getTcpConnection().isConnected(); 118 } 119 120 void send(MessageBuffer message) { 121 if(!isConnected()) { 122 throw new IOException("Connection broken!"); 123 } 124 125 ubyte[][] buffers = Packet.encode(message); 126 foreach(ubyte[] data; buffers) { 127 _connection.sendData(cast(byte[])data); 128 } 129 } 130 131 void close() { 132 // if(_connection !is null) { 133 // _connection.close(); 134 // } 135 136 if(_client !is null) { 137 _client.close(); 138 } 139 } 140 141 override protected void dispatchMessage(WebSocketConnection connection, MessageBuffer message ) { 142 version(HUNT_DEBUG) { 143 string str = format("data received: %s", message.toString()); 144 tracef(str); 145 } 146 147 // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64 148 // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 149 150 uint messageId = message.id; 151 ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId); 152 if(executorInfo == ExecutorInfo.init) { 153 warning("No Executor found for id: ", messageId); 154 } else { 155 enum string ChannelSession = "ChannelSession"; 156 WebsocketTransportSession session = cast(WebsocketTransportSession)connection.getAttribute(ChannelSession); 157 if(session is null ){ 158 session = new WebsocketTransportSession(nextClientSessionId(), connection); 159 connection.setAttribute(ChannelSession, session); 160 } 161 TransportContext context = TransportContext(null, session); 162 executorInfo.execute(context, message); 163 } 164 } 165 }