1 /*
2  * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
3  *
4  * Copyright (C) 2019 HuntLabs
5  *
6  * Website: https://www.msgtrans.org
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module msgtrans.channel.websocket.WebSocketClientChannel;
13 
14 import msgtrans.DefaultSessionManager;
15 import msgtrans.executor;
16 import msgtrans.MessageBuffer;
17 import msgtrans.MessageTransport;
18 import msgtrans.Packet;
19 import msgtrans.TransportContext;
20 import msgtrans.channel.ClientChannel;
21 import msgtrans.channel.TransportSession;
22 import msgtrans.channel.websocket.WebSocketChannel;
23 import msgtrans.channel.websocket.WebSocketTransportSession;
24 
25 import hunt.io.ByteBuffer;
26 import hunt.Exceptions;
27 import hunt.http.client;
28 import hunt.logging.ConsoleLogger;
29 import hunt.net;
30 
31 import hunt.concurrency.FuturePromise;
32 
33 import core.sync.condition;
34 import core.sync.mutex;
35 
36 import std.format;
37 import std.range;
38 
/**
 * Client-side WebSocket channel.
 *
 * Opens a WebSocket connection to a single URL through an `HttpClient`,
 * sends `MessageBuffer`s encoded with `Packet.encode`, and hands every decoded
 * incoming message to the executor registered for its id on the
 * `MessageTransport` supplied via `set`.
 */
class WebSocketClientChannel : WebSocketChannel, ClientChannel {
    /// Target endpoint of the WebSocket connection.
    private HttpURI _url;

    /// HTTP client used to open the WebSocket; created in the constructor.
    private HttpClient _client;
    /// Connection returned by `newWebSocket`; `null` until `connect()` has run.
    private WebSocketConnection _connection;
    /// Transport used to look up executors; `null` until `set()` is called.
    private MessageTransport _messageTransport;


    /**
     * Builds the endpoint as `ws://host:port` followed by `path`.
     *
     * Params:
     *   host = server host name or address
     *   port = server port
     *   path = request path; must be non-empty and start with '/'
     *
     * Throws: Exception if `path` is empty or does not start with '/'.
     */
    this(string host, ushort port, string path) {
        if(path.empty() || path[0] != '/')
            throw new Exception("Wrong path: " ~ path);
        string url = format("ws://%s:%d%s", host, port, path);
        this(new HttpURI(url));
    }

    /**
     * Params:
     *   url = full endpoint URL; see the `HttpURI` overload for the scheme check
     */
    this(string url) {
        this(new HttpURI(url));
    }

    /**
     * Params:
     *   url = endpoint; its scheme is expected to be "http" or "ws"
     */
    this(HttpURI url) {
        // NOTE(review): this check is an assert, so it is compiled out of
        // release builds and other schemes are accepted there.
        assert(url.getScheme() == "http" || url.getScheme() == "ws", "Only http or ws supported");
        _url = url;
        _client = new HttpClient();
    }

    /**
     * Sets the transport whose executors receive incoming messages.
     * Until this is called, incoming messages are dropped with a warning.
     */
    void set(MessageTransport transport) {
        _messageTransport = transport;
    }

    /**
     * Opens the WebSocket connection and installs the message handler.
     * Does nothing if a connection object already exists.
     */
    void connect() {

        // NOTE(review): only checks for an existing connection object, not
        // whether it is still connected; a broken connection is never replaced.
        if(_connection !is null) {
            return;
        }

        Request request = new RequestBuilder()
            .url(_url)
            .build();


        _connection = _client.newWebSocket(request, new class AbstractWebSocketMessageHandler {
            override void onOpen(WebSocketConnection connection) {
                version(HUNT_DEBUG) infof("New connection from: %s", connection.getRemoteAddress());
            }

            // Text frames are only logged (debug builds); they are not dispatched.
            override void onText(WebSocketConnection connection, string text) {
                version(HUNT_DEBUG) tracef("received (from %s): %s", connection.getRemoteAddress(), text);
            }

            override void onBinary(WebSocketConnection connection, ByteBuffer buffer)  {
                byte[] data = buffer.getRemaining();
                version(HUNT_DEBUG) {
                    tracef("received (from %s): %s", connection.getRemoteAddress(), buffer.toString());
                    // Dump at most the first 64 bytes as hex.
                    if(data.length<=64)
                        infof("%(%02X %)", data[0 .. $]);
                    else
                        infof("%(%02X %) ...(%d bytes)", data[0 .. 64], data.length);
                }

                // Empty frames are ignored. `decode` is not defined in this class;
                // presumably inherited from WebSocketChannel and expected to call
                // dispatchMessage for each complete message - TODO confirm.
                if(data.length > 0) {
                    decode(connection, buffer);
                }
            }
        });

    }


    /**
     * NOTE(review): the handler is currently discarded and never invoked;
     * callers are not notified when the connection closes.
     */
    void onClose(CloseHandler handler)
    {

    }

    /**
     * Returns: `true` when a connection object exists and its underlying TCP
     * connection reports itself as connected.
     */
    bool isConnected() {
        return _connection !is null && _connection.getTcpConnection().isConnected();
    }

    /**
     * Encodes `message` with `Packet.encode` and sends each resulting buffer
     * over the WebSocket connection, in order.
     *
     * Throws: IOException if the channel is not connected.
     */
    void send(MessageBuffer message) {
        if(!isConnected()) {
            throw new IOException("Connection broken!");
        }

        ubyte[][] buffers = Packet.encode(message);
        foreach(ubyte[] data; buffers) {
            _connection.sendData(cast(byte[])data);
        }
    }

    /**
     * Closes the underlying HTTP client.
     *
     * NOTE(review): the WebSocket connection itself is not closed explicitly
     * and `_connection` is left set, so a later `connect()` returns without
     * reconnecting.
     */
    void close() {
        if(_client !is null) {
            _client.close();
        }
    }

    /**
     * Routes a decoded message to the executor registered for its id.
     *
     * A transport session is created on first use and cached on the connection
     * under the "ChannelSession" attribute, so later messages on the same
     * connection reuse it.
     *
     * Params:
     *   connection = connection the message arrived on
     *   message    = decoded message to dispatch
     */
    override protected void dispatchMessage(WebSocketConnection connection, MessageBuffer message ) {
        version(HUNT_DEBUG) {
            // Pass the payload as an argument, never as the format string:
            // a '%' inside the message would otherwise be parsed as a specifier.
            tracef("data received: %s", message.toString());
        }

        // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64
        // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64

        uint messageId = message.id;

        // A message can arrive before set() was called; dropping it with a
        // warning avoids a null dereference inside the receive callback.
        if(_messageTransport is null) {
            warning("No MessageTransport set, message dropped. id: ", messageId);
            return;
        }

        ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId);
        if(executorInfo == ExecutorInfo.init) {
            warning("No Executor found for id: ", messageId);
        } else {
            enum string ChannelSession = "ChannelSession";
            WebsocketTransportSession session = cast(WebsocketTransportSession)connection.getAttribute(ChannelSession);
            if(session is null ){
                session = new WebsocketTransportSession(nextClientSessionId(), connection);
                connection.setAttribute(ChannelSession, session);
            }
            TransportContext context = TransportContext(null, session);
            executorInfo.execute(context, message);
        }
    }
}