1 /*
2  * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
3  *
4  * Copyright (C) 2019 HuntLabs
5  *
6  * Website: https://www.msgtrans.org
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module msgtrans.channel.websocket.WebSocketClientChannel;
13 
14 import msgtrans.DefaultSessionManager;
15 import msgtrans.executor;
16 import msgtrans.MessageBuffer;
17 import msgtrans.MessageHandler;
18 import msgtrans.MessageTransport;
19 import msgtrans.Packet;
20 import msgtrans.TransportContext;
21 import msgtrans.channel.ClientChannel;
22 import msgtrans.channel.TransportSession;
23 import msgtrans.channel.websocket.WebSocketChannel;
24 import msgtrans.channel.websocket.WebSocketTransportSession;
25 
26 import hunt.io.ByteBuffer;
27 import hunt.Exceptions;
28 import hunt.http.client;
29 import hunt.logging.ConsoleLogger;
30 import hunt.net;
31 
32 import hunt.concurrency.FuturePromise;
33 
34 import core.sync.condition;
35 import core.sync.mutex;
36 
37 import std.format;
38 import std.range;
39 
/**
 * Client-side WebSocket channel.
 *
 * Opens a WebSocket connection to the configured URL through an HttpClient,
 * sends messages as the buffers produced by Packet.encode, and hands each
 * decoded incoming message to the executor that the attached MessageTransport
 * returns for the message id.
 */
class WebSocketClientChannel : WebSocketChannel, ClientChannel {
    /// Attribute key under which the per-connection session is stored.
    private enum string ChannelSession = "ChannelSession";

    private HttpURI _url;

    private HttpClient _client;
    private WebSocketConnection _connection;
    private MessageTransport _messageTransport;

    /**
     * Builds the channel for `ws://host:port/path`.
     *
     * Params:
     *   host = remote host name or address
     *   port = remote port
     *   path = request path; must be non-empty and start with '/'
     *
     * Throws: Exception if `path` is empty or does not start with '/'.
     */
    this(string host, ushort port, string path) {
        if(path.empty() || path[0] != '/')
            throw new Exception("Wrong path: " ~ path);
        string url = format("ws://%s:%d%s", host, port, path);
        this(new HttpURI(url));
    }

    /**
     * Builds the channel from a URL string.
     *
     * Params:
     *   url = full URL; parsed into an HttpURI
     */
    this(string url) {
        this(new HttpURI(url));
    }

    /**
     * Builds the channel from an already parsed URL and creates the
     * HttpClient used later by connect().
     *
     * Params:
     *   url = target URL; its scheme is expected to be "http" or "ws"
     */
    this(HttpURI url) {
        // NOTE(review): this check is an assert, so it is compiled out in
        // -release builds and an unsupported scheme is then not rejected here.
        assert(url.getScheme() == "http" || url.getScheme() == "ws", "Only http or ws supported");
        _url = url;
        _client = new HttpClient();
    }

    /**
     * Attaches the transport whose executors handle incoming messages.
     *
     * Params:
     *   transport = transport queried by dispatchMessage() for executors
     */
    void set(MessageTransport transport) {
        _messageTransport = transport;
    }

    /**
     * Opens the WebSocket connection to the configured URL.
     *
     * Does nothing when a connection object already exists. Text frames are
     * only traced (in HUNT_DEBUG builds); non-empty binary frames are passed
     * to decode().
     */
    void connect() {

        if(_connection !is null) {
            return;
        }

        Request request = new RequestBuilder()
            .url(_url)
            // .authorization(AuthenticationScheme.Basic, "cHV0YW86MjAxOQ==")
            .build();


        _connection = _client.newWebSocket(request, new class AbstractWebSocketMessageHandler {
            override void onOpen(WebSocketConnection connection) {
                version(HUNT_DEBUG) infof("New connection from: %s", connection.getRemoteAddress());
            }

            override void onText(WebSocketConnection connection, string text) {
                version(HUNT_DEBUG) tracef("received (from %s): %s", connection.getRemoteAddress(), text);
            }

            override void onBinary(WebSocketConnection connection, ByteBuffer buffer)  {
                // Peeked only to check the length and, in debug builds, to dump
                // the bytes; the buffer itself is what gets decoded.
                byte[] data = buffer.peekRemaining();
                version(HUNT_DEBUG) {
                    tracef("received (from %s): %s", connection.getRemoteAddress(), buffer.toString());
                    if(data.length<=64)
                        infof("%(%02X %)", data[0 .. $]);
                    else
                        infof("%(%02X %) ...(%d bytes)", data[0 .. 64], data.length);
                }

                if(data.length > 0) {
                    // decode() is not defined in this class; presumably inherited
                    // from WebSocketChannel and ending in dispatchMessage().
                    decode(connection, buffer);
                }
            }
        });

    }

    /**
     * Registers a close handler.
     *
     * Params:
     *   handler = close callback
     *
     * TODO: not implemented; the handler is currently ignored and never invoked.
     */
    void onClose(CloseHandler handler)
    {

    }

    /**
     * Returns: true when a connection object exists and its underlying TCP
     * connection reports being connected.
     */
    bool isConnected() {
        return _connection !is null && _connection.getTcpConnection().isConnected();
    }

    /**
     * Encodes a message with Packet.encode and sends every resulting buffer
     * over the WebSocket connection, in order.
     *
     * Params:
     *   message = message to send
     *
     * Throws: IOException if the channel is not connected.
     */
    void send(MessageBuffer message) {
        if(!isConnected()) {
            throw new IOException("Connection broken!");
        }

        ubyte[][] buffers = Packet.encode(message);
        foreach(ubyte[] data; buffers) {
            _connection.sendData(cast(byte[])data);
        }
    }

    /**
     * Closes the underlying HttpClient, if any.
     *
     * NOTE(review): `_connection` is not reset here, so a later connect()
     * returns early because the old connection object is still set.
     */
    void close() {
        // if(_connection !is null) {
        //     _connection.close();
        // }

        if(_client !is null) {
            _client.close();
        }
    }

    /**
     * Routes a decoded message to the executor registered for its id.
     *
     * A WebsocketTransportSession is created on first use and cached as an
     * attribute of the connection. Messages without a matching executor, or
     * received before a transport was attached via set(), are dropped with a
     * warning.
     *
     * Params:
     *   connection = connection the message arrived on
     *   message    = decoded message
     */
    override protected void dispatchMessage(WebSocketConnection connection, MessageBuffer message ) {
        version(HUNT_DEBUG) {
            // Pass the text as an argument, not as the format string, so that a
            // '%' in the message is never interpreted as a format specifier.
            tracef("data received: %s", message.toString());
        }

        // Sample frames:
        // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64
        // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64

        uint messageId = message.id;

        // set() may not have been called yet; avoid dereferencing null.
        if(_messageTransport is null) {
            warning("No MessageTransport set; dropping message with id: ", messageId);
            return;
        }

        ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId);
        if(executorInfo == ExecutorInfo.init) {
            warning("No Executor found for id: ", messageId);
            return;
        }

        WebsocketTransportSession session = cast(WebsocketTransportSession)connection.getAttribute(ChannelSession);
        if(session is null ){
            session = new WebsocketTransportSession(nextClientSessionId(), connection);
            connection.setAttribute(ChannelSession, session);
        }
        TransportContext context = TransportContext(null, session);
        executorInfo.execute(context, message);
    }
}