1 /* 2 * MsgTrans - Message Transport Framework for DLang. Based on TCP, WebSocket, UDP transmission protocol. 3 * 4 * Copyright (C) 2019 HuntLabs 5 * 6 * Website: https://www.msgtrans.org 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 12 module msgtrans.channel.websocket.WebSocketServerChannel; 13 14 import msgtrans.executor; 15 import msgtrans.PacketParser; 16 import msgtrans.MessageBuffer; 17 import msgtrans.MessageTransport; 18 import msgtrans.SessionManager; 19 import msgtrans.TransportContext; 20 import msgtrans.channel.ServerChannel; 21 import msgtrans.channel.TransportSession; 22 import msgtrans.channel.websocket.WebSocketTransportSession; 23 import msgtrans.channel.websocket.WebSocketChannel; 24 25 import hunt.io.ByteBuffer; 26 import hunt.http.server; 27 import hunt.logging.ConsoleLogger; 28 import hunt.net; 29 import hunt.util.DateTime; 30 31 import std.format; 32 import std.stdio; 33 34 35 /** 36 * 37 */ 38 class WebSocketServerChannel : WebSocketChannel, ServerChannel { 39 private HttpServer _server; 40 private SessionManager _sessionManager; 41 private AcceptHandler _acceptHandler; 42 private CloseHandler _closeHandler; 43 private MessageTransport _messageTransport; 44 45 private string _name = typeof(this).stringof; 46 private string _host = "0.0.0.0"; 47 private ushort _port = 8080; 48 private string _path = "/*"; 49 enum string ChannelSession = "ChannelSession"; 50 51 52 this(ushort port , string path) { 53 _port = port; 54 _path = path; 55 } 56 57 string name() { 58 return _name; 59 } 60 61 void set(MessageTransport transport) { 62 _messageTransport = transport; 63 _sessionManager = transport.sessionManager(); 64 } 65 66 // void setSessionManager(SessionManager manager) { 67 // _sessionManager = manager; 68 // } 69 70 void onAccept(AcceptHandler handler) { 71 _acceptHandler = handler; 72 } 73 74 void onClose(CloseHandler handler) 75 { 76 _closeHandler = handler; 77 } 78 79 void start() { 80 initialize(); 81 82 _server.onOpened(() { 83 if(_server.isTLS()) 84 writefln("listening on https://%s:%d", _server.getHost, _server.getPort); 85 else 86 writefln("listening on http://%s:%d", _server.getHost, _server.getPort); 87 }) 88 .onOpenFailed((e) { 89 writefln("Failed to open a HttpServer, the reason: %s", e.msg); 90 }) 91 .start(); 92 } 93 94 void stop() { 95 if(_server !is null) 96 _server.stop(); 97 } 98 99 100 private void initialize() { 101 _server = HttpServer.builder() 102 // .setTLS("cert/server.crt", "cert/server.key", "hunt2018", "hunt2018") //websocket 103 .setListener(_port, _host) 104 .websocket(_path, new class AbstractWebSocketMessageHandler { 105 //.registerWebSocket(_path, new class AbstractWebSocketMessageHandler { 106 107 override void onOpen(WebSocketConnection connection) { 108 version(HUNT_DEBUG) infof("New connection from: %s", connection.getRemoteAddress()); 109 WebsocketTransportSession session = 110 new WebsocketTransportSession(_sessionManager.generateId(), connection); 111 connection.setAttribute(ChannelSession, session); 112 _sessionManager.add(session); 113 TransportContext context = TransportContext(_sessionManager, session); 114 if(_acceptHandler !is null) { 115 _acceptHandler(context); 116 } 117 } 118 119 override void onClosed(WebSocketConnection connection) { 120 version(HUNT_DEBUG) infof("closed with %s", connection.getRemoteAddress()); 121 TransportSession session = cast(TransportSession)connection.getAttribute(ChannelSession); 122 if(session !is null ) { 123 _sessionManager.remove(session); 124 } 125 if (_closeHandler !is null) 126 { 127 TransportContext context = TransportContext(_sessionManager, session); 128 _closeHandler(context); 129 } 130 } 131 132 override void onText(WebSocketConnection connection, string text) { 133 version(HUNT_DEBUG) tracef("received (from %s): %s", connection.getRemoteAddress(), text); 134 } 135 136 override void onBinary(WebSocketConnection connection, ByteBuffer buffer) { 137 byte[] data = buffer.getRemaining(); 138 version(HUNT_DEBUG) { 139 tracef("received (from %s): %s", connection.getRemoteAddress(), buffer.toString()); 140 if(data.length<=64) 141 infof("%(%02X %)", data[0 .. $]); 142 else 143 infof("%(%02X %) ...(%d bytes)", data[0 .. 64], data.length); 144 } 145 146 if(data.length > 0) { 147 decode(connection, buffer); 148 } 149 } 150 }) 151 .build(); 152 } 153 154 override protected void dispatchMessage(WebSocketConnection connection, MessageBuffer message ) { 155 version(HUNT_DEBUG) { 156 string str = format("data received: %s", message.toString()); 157 tracef(str); 158 } 159 160 // rx: 00 00 27 11 00 00 00 05 00 00 00 00 00 00 00 00 57 6F 72 6C 64 161 // tx: 00 00 4E 21 00 00 00 0B 00 00 00 00 00 00 00 00 48 65 6C 6C 6F 20 57 6F 72 6C 64 162 163 uint messageId = message.id; 164 ExecutorInfo executorInfo = _messageTransport.getExecutor(messageId); 165 if(executorInfo == ExecutorInfo.init) { 166 warning("No Executor found for id: ", messageId); 167 } else { 168 WebsocketTransportSession session = cast(WebsocketTransportSession)connection.getAttribute(ChannelSession); 169 if(session is null ){ 170 session = new WebsocketTransportSession(_sessionManager.generateId(), connection); 171 connection.setAttribute(ChannelSession, session); 172 _sessionManager.add(session); 173 } 174 175 TransportContext context = TransportContext(_sessionManager, session); 176 executorInfo.execute(context, message); 177 } 178 } 179 }