1 /*
2  * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol.
3  *
4  * Copyright (C) 2019 HuntLabs
5  *
6  * Website: https://www.msgtrans.org
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module msgtrans.channel.websocket.WebSocketServerChannel;
13 
14 import msgtrans.executor;
15 import msgtrans.PacketParser;
16 import msgtrans.MessageBuffer;
17 import msgtrans.MessageTransport;
18 import msgtrans.SessionManager;
19 import msgtrans.TransportContext;
20 import msgtrans.channel.ServerChannel;
21 import msgtrans.channel.TransportSession;
22 import msgtrans.channel.websocket.WebSocketTransportSession;
23 import msgtrans.channel.websocket.WebSocketChannel;
24 
25 import hunt.io.ByteBuffer;
26 import hunt.http.server;
27 import hunt.logging.ConsoleLogger;
28 import hunt.net;
29 import hunt.util.DateTime;
30 
31 import std.format;
32 import std.stdio;
33 
34 
/**
 * Server-side WebSocket channel.
 *
 * Builds a hunt `HttpServer` exposing a WebSocket endpoint on the configured
 * host, port and path. Every accepted connection is wrapped in a
 * `WebsocketTransportSession`, registered with the `SessionManager` obtained
 * from the `MessageTransport`, and stored on the connection under the
 * `ChannelSession` attribute. Incoming binary frames are handed to the
 * inherited `decode`; messages reaching `dispatchMessage` are routed to the
 * executor registered for their message id.
 */
class WebSocketServerChannel : WebSocketChannel, ServerChannel {
    private HttpServer _server;
    private SessionManager _sessionManager;
    private AcceptHandler _acceptHandler;
    private CloseHandler _closeHandler;
    private MessageTransport _messageTransport;

    private string _name = typeof(this).stringof;
    private string _host = "0.0.0.0";
    private ushort _port = 8080;
    private string _path = "/*";

    /// Connection attribute key under which the transport session is stored.
    enum string ChannelSession = "ChannelSession";


    /**
     * Creates a channel for the given endpoint.
     *
     * Params:
     *   port = port the HTTP/WebSocket server listens on
     *   path = URL path on which WebSocket upgrades are accepted
     *   host = address to bind to; defaults to all interfaces ("0.0.0.0"),
     *          which is what the channel always used before this parameter
     *          existed
     */
    this(ushort port , string path, string host = "0.0.0.0") {
        _port = port;
        _path = path;
        _host = host;
    }

    /// Returns the channel name (the class name).
    string name() {
        return _name;
    }

    /**
     * Attaches the channel to a transport.
     *
     * The transport supplies both the executor lookup used by
     * `dispatchMessage` and the session manager used to track connections,
     * so this has to be called before connections are served.
     */
    void set(MessageTransport transport) {
        _messageTransport = transport;
        _sessionManager = transport.sessionManager();
    }

    /// Registers the callback invoked after a new connection has been given a session.
    void onAccept(AcceptHandler handler) {
        _acceptHandler = handler;
    }

    /// Registers the callback invoked after a connection's session has been removed.
    void onClose(CloseHandler handler)
    {
        _closeHandler = handler;
    }

    /**
     * Builds the HTTP server and starts listening.
     *
     * The outcome is reported on stdout: the listening URL on success, the
     * failure message otherwise.
     */
    void start() {
        initialize();

        _server.onOpened(() {
                if(_server.isTLS())
                    writefln("listening on https://%s:%d", _server.getHost, _server.getPort);
                else
                    writefln("listening on http://%s:%d", _server.getHost, _server.getPort);
            })
            .onOpenFailed((e) {
                writefln("Failed to open a HttpServer, the reason: %s", e.msg);
            })
            .start();
    }

    /// Stops the server; does nothing when `start` has never been called.
    void stop() {
        if(_server !is null)
            _server.stop();
    }


    /// Creates `_server` and wires the WebSocket lifecycle callbacks for `_path`.
    private void initialize() {
        _server = HttpServer.builder()
            // .setTLS("cert/server.crt", "cert/server.key", "hunt2018", "hunt2018") //websocket
            .setListener(_port, _host)
            .websocket(_path, new class AbstractWebSocketMessageHandler {
            //.registerWebSocket(_path, new class AbstractWebSocketMessageHandler {

                // Creates and registers a session for the new connection,
                // then notifies the accept handler (if any).
                override void onOpen(WebSocketConnection connection) {
                    version(HUNT_DEBUG) infof("New connection from: %s", connection.getRemoteAddress());
                    WebsocketTransportSession session =
                        new WebsocketTransportSession(_sessionManager.generateId(), connection);
                    connection.setAttribute(ChannelSession, session);
                    _sessionManager.add(session);
                    TransportContext context = TransportContext(_sessionManager, session);
                    if(_acceptHandler !is null) {
                        _acceptHandler(context);
                    }
                }

                // Unregisters the connection's session and notifies the
                // close handler (if any).
                override void onClosed(WebSocketConnection connection)  {
                    version(HUNT_DEBUG) infof("closed with %s", connection.getRemoteAddress());
                    TransportSession session = cast(TransportSession)connection.getAttribute(ChannelSession);
                    if(session is null) {
                        // No session was ever attached to this connection, so
                        // there is nothing to remove and nothing meaningful to
                        // report: the close handler must not be handed a
                        // context built around a null session.
                        version(HUNT_DEBUG) infof("no session attached to %s", connection.getRemoteAddress());
                        return;
                    }

                    _sessionManager.remove(session);
                    if (_closeHandler !is null) {
                        TransportContext context = TransportContext(_sessionManager, session);
                        _closeHandler(context);
                    }
                }

                // Text frames are only logged (debug builds); nothing is dispatched for them.
                override void onText(WebSocketConnection connection, string text) {
                    version(HUNT_DEBUG) tracef("received (from %s): %s", connection.getRemoteAddress(), text);
                }

                // Binary frames carry the payload; non-empty ones are handed to decode().
                override void onBinary(WebSocketConnection connection, ByteBuffer buffer)  {
                    byte[] data = buffer.getRemaining();
                    version(HUNT_DEBUG) {
                        tracef("received (from %s): %s", connection.getRemoteAddress(), buffer.toString());
                        // Dump at most the first 64 bytes to keep the log readable.
                        if(data.length<=64)
                            infof("%(%02X %)", data[0 .. $]);
                        else
                            infof("%(%02X %) ...(%d bytes)", data[0 .. 64], data.length);
                    }

                    if(data.length > 0) {
                        decode(connection, buffer);
                    }
                }
            })
            .build();
    }

    /**
     * Routes a message to the executor registered for its id.
     *
     * When no executor is registered a warning is logged and the message is
     * dropped. If the connection carries no session yet, one is created and
     * registered before the executor runs.
     *
     * Params:
     *   connection = connection the message arrived on
     *   message    = the message to dispatch
     */
    override protected void dispatchMessage(WebSocketConnection connection, MessageBuffer message ) {
        version(HUNT_DEBUG) {
            // Pass the dump as an argument, never as the format string: a '%'
            // inside the message text would otherwise be parsed as a specifier.
            tracef("data received: %s", message.toString());
        }

        // Sample frames:
        // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64
        // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64

        uint messageId = message.id;
        ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId);
        if(executorInfo == ExecutorInfo.init) {
            warning("No Executor found for id: ", messageId);
        } else {
            WebsocketTransportSession session = cast(WebsocketTransportSession)connection.getAttribute(ChannelSession);
            if(session is null ){
                // Lazily attach a session when none was stored on the connection.
                session = new WebsocketTransportSession(_sessionManager.generateId(), connection);
                connection.setAttribute(ChannelSession, session);
                _sessionManager.add(session);
            }

            TransportContext context = TransportContext(_sessionManager, session);
            executorInfo.execute(context, message);
        }
    }
}