1 /* 2 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol. 3 * 4 * Copyright (C) 2019 HuntLabs 5 * 6 * Website: https://www.msgtrans.org 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module msgtrans.channel.websocket.WebSocketClientChannel; 13 14 import msgtrans.DefaultSessionManager; 15 import msgtrans.executor; 16 import msgtrans.MessageBuffer; 17 import msgtrans.MessageHandler; 18 import msgtrans.MessageTransport; 19 import msgtrans.Packet; 20 import msgtrans.TransportContext; 21 import msgtrans.channel.ClientChannel; 22 import msgtrans.channel.TransportSession; 23 import msgtrans.channel.websocket.WebSocketChannel; 24 import msgtrans.channel.websocket.WebSocketTransportSession; 25 26 import hunt.io.ByteBuffer; 27 import hunt.Exceptions; 28 import hunt.http.client; 29 import hunt.logging.ConsoleLogger; 30 import hunt.net; 31 32 import hunt.concurrency.FuturePromise; 33 34 import core.sync.condition; 35 import core.sync.mutex; 36 37 import std.format; 38 import std.range; 39 40 /** 41 * 42 */ 43 class WebSocketClientChannel : WebSocketChannel, ClientChannel { 44 private HttpURI _url; 45 46 private HttpClient _client; 47 private WebSocketConnection _connection; 48 private MessageTransport _messageTransport; 49 50 51 this(string host, ushort port, string path) { 52 if(path.empty() || path[0] != '/') 53 throw new Exception("Wrong path: " ~ path); 54 string url = format("ws://%s:%d%s", host, port, path); 55 this(new HttpURI(url)); 56 } 57 58 this(string url) { 59 this(new HttpURI(url)); 60 } 61 62 this(HttpURI url) { 63 assert(url.getScheme() == "http" || url.getScheme() == "ws", "Only http or ws supported"); 64 _url = url; 65 _client = new HttpClient(); 66 } 67 68 void set(MessageTransport transport) { 69 _messageTransport = transport; 70 } 71 72 void connect() { 73 74 if(_connection !is null) { 75 return; 76 } 77 78 Request request = new RequestBuilder() 79 .url(_url) 80 // .authorization(AuthenticationScheme.Basic, "cHV0YW86MjAxOQ==") 81 .build(); 82 83 84 _connection = _client.newWebSocket(request, new class AbstractWebSocketMessageHandler { 85 override void onOpen(WebSocketConnection connection) { 86 version(HUNT_DEBUG) infof("New connection from: %s", connection.getRemoteAddress()); 87 } 88 89 override void onText(WebSocketConnection connection, string text) { 90 version(HUNT_DEBUG) tracef("received (from %s): %s", connection.getRemoteAddress(), text); 91 } 92 93 override void onBinary(WebSocketConnection connection, ByteBuffer buffer) { 94 byte[] data = buffer.peekRemaining(); 95 version(HUNT_DEBUG) { 96 tracef("received (from %s): %s", connection.getRemoteAddress(), buffer.toString()); 97 if(data.length<=64) 98 infof("%(%02X %)", data[0 .. $]); 99 else 100 infof("%(%02X %) ...(%d bytes)", data[0 .. 64], data.length); 101 } 102 103 if(data.length > 0) { 104 decode(connection, buffer); 105 } 106 } 107 }); 108 109 } 110 111 112 void onClose(CloseHandler handler) 113 { 114 115 } 116 117 bool isConnected() { 118 return _connection !is null && _connection.getTcpConnection().isConnected(); 119 } 120 121 void send(MessageBuffer message) { 122 if(!isConnected()) { 123 throw new IOException("Connection broken!"); 124 } 125 126 ubyte[][] buffers = Packet.encode(message); 127 foreach(ubyte[] data; buffers) { 128 _connection.sendData(cast(byte[])data); 129 } 130 } 131 132 void close() { 133 // if(_connection !is null) { 134 // _connection.close(); 135 // } 136 137 if(_client !is null) { 138 _client.close(); 139 } 140 } 141 142 override protected void dispatchMessage(WebSocketConnection connection, MessageBuffer message ) { 143 version(HUNT_DEBUG) { 144 string str = format("data received: %s", message.toString()); 145 tracef(str); 146 } 147 148 // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64 149 // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 150 151 uint messageId = message.id; 152 ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId); 153 if(executorInfo == ExecutorInfo.init) { 154 warning("No Executor found for id: ", messageId); 155 } else { 156 enum string ChannelSession = "ChannelSession"; 157 WebsocketTransportSession session = cast(WebsocketTransportSession)connection.getAttribute(ChannelSession); 158 if(session is null ){ 159 session = new WebsocketTransportSession(nextClientSessionId(), connection); 160 connection.setAttribute(ChannelSession, session); 161 } 162 TransportContext context = TransportContext(null, session); 163 executorInfo.execute(context, message); 164 } 165 } 166 }