diff --git a/README.md b/README.md index c4f83aa10..040faff10 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ Subreddit: [r/red5](http://www.reddit.com/r/red5) Automatic builds (Courtesy of Apache [OpenMeetings](http://openmeetings.apache.org/)): -* [Red5](https://builds.apache.org/view/M-R/view/OpenMeetings/job/Red5-server/) -* [Windows Installer](https://builds.apache.org/view/M-R/view/OpenMeetings/job/red5-installer/) + * [Red5](https://ci-builds.apache.org/job/OpenMeetings/job/Red5-server/) + * [Windows Installer](https://ci-builds.apache.org/job/OpenMeetings/job/red5-installer/) __Note on Bootstrap__ The bootstrap and shutdown classes have been moved to the [red5-service](https://github.com/Red5/red5-service) project; the dependency has been added to this projects pom. diff --git a/io/src/main/java/org/red5/io/utils/TlsUtils.java b/io/src/main/java/org/red5/io/utils/TlsUtils.java index ab7c81875..89f14ba4b 100644 --- a/io/src/main/java/org/red5/io/utils/TlsUtils.java +++ b/io/src/main/java/org/red5/io/utils/TlsUtils.java @@ -23,10 +23,7 @@ public class TlsUtils { @SuppressWarnings("unused") - private static byte[] DOWNGRADE_TLS11 = Hex.decodeStrict("444F574E47524400"); - - @SuppressWarnings("unused") - private static byte[] DOWNGRADE_TLS12 = Hex.decodeStrict("444F574E47524401"); + private static byte[] DOWNGRADE_TLS11 = Hex.decodeStrict("444F574E47524400"), DOWNGRADE_TLS12 = Hex.decodeStrict("444F574E47524401"); public static final byte[] EMPTY_BYTES = new byte[0]; @@ -610,9 +607,7 @@ public static int[] readUint16Array(int count, InputStream input) throws IOExcep } public static ASN1Primitive readASN1Object(byte[] encoding) throws IOException { - ASN1InputStream asn1 = null; - try { - asn1 = new ASN1InputStream(encoding); + try (ASN1InputStream asn1 = new ASN1InputStream(encoding)) { ASN1Primitive result = asn1.readObject(); if (null == result) { throw new IOException("AlertDescription.decode_error"); @@ -621,10 +616,6 @@ public static ASN1Primitive readASN1Object(byte[] encoding) throws IOException { throw new IOException("AlertDescription.decode_error"); } return result; - } finally { - if (asn1 != null) { - asn1.close(); - } } } diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java index fd73a7a08..03852f5d5 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketConnection.java @@ -25,9 +25,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.stream.Stream; -import jakarta.websocket.Extension; -import jakarta.websocket.Session; - import org.apache.commons.lang3.StringUtils; import org.apache.tomcat.websocket.Constants; import org.apache.tomcat.websocket.WsSession; @@ -35,6 +32,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import jakarta.websocket.CloseReason; +import jakarta.websocket.CloseReason.CloseCode; +import jakarta.websocket.CloseReason.CloseCodes; +import jakarta.websocket.Extension; +import jakarta.websocket.Session; + /** * WebSocketConnection
* This class represents a WebSocket connection with a client (browser). @@ -63,8 +66,9 @@ public class WebSocketConnection extends AttributeStore implements Comparable wsSession; + private final WsSession wsSession; + // reference to the scope for manager access private WeakReference scope; // unique identifier for the session @@ -114,9 +118,9 @@ public WebSocketConnection(WebSocketScope scope, Session session) { log.debug("path: {}", path); } // cast ws session - this.wsSession = new WeakReference<>((WsSession) session); + this.wsSession = (WsSession) session; if (isDebug) { - log.debug("ws session: {}", wsSession.get()); + log.debug("ws session: {}", wsSession); } // the websocket session id will be used for hash code comparison, its the only usable value currently wsSessionId = session.getId(); @@ -163,6 +167,8 @@ public WebSocketConnection(WebSocketScope scope, Session session) { // add the timeouts to the user props userProps.put(Constants.READ_IDLE_TIMEOUT_MS, readTimeout); userProps.put(Constants.WRITE_IDLE_TIMEOUT_MS, sendTimeout); + // set the close timeout to 5 seconds + userProps.put(Constants.SESSION_CLOSE_TIMEOUT_PROPERTY, TimeUnit.SECONDS.toMillis(5)); if (isDebug) { log.debug("userProps: {}", userProps); } @@ -186,9 +192,8 @@ public void send(String data) throws UnsupportedEncodingException, IOException { } // process the incoming string if (StringUtils.isNotBlank(data)) { - final WsSession session = wsSession.get(); // attempt send only if the session is not closed - if (session != null && !session.isClosed()) { + if (!wsSession.isClosed()) { try { if (useAsync) { if (sendFuture != null && !sendFuture.isDone()) { @@ -197,7 +202,7 @@ public void send(String data) throws UnsupportedEncodingException, IOException { } catch (TimeoutException e) { log.warn("Send timed out {}", wsSessionId); // if the session is not open, cancel the future - if (!session.isOpen()) { + if (!wsSession.isOpen()) { sendFuture.cancel(true); return; } @@ -205,13 +210,13 @@ public void send(String data) throws UnsupportedEncodingException, IOException { } synchronized (wsSessionId) { int lengthToWrite = data.getBytes().length; - sendFuture = session.getAsyncRemote().sendText(data); + sendFuture = wsSession.getAsyncRemote().sendText(data); updateWriteBytes(lengthToWrite); } } else { synchronized (wsSessionId) { int lengthToWrite = data.getBytes().length; - session.getBasicRemote().sendText(data); + wsSession.getBasicRemote().sendText(data); updateWriteBytes(lengthToWrite); } } @@ -236,8 +241,7 @@ public void send(byte[] buf) throws IOException { if (isDebug) { log.debug("send binary: {}", Arrays.toString(buf)); } - WsSession session = wsSession.get(); - if (session != null && session.isOpen()) { + if (!wsSession.isClosed()) { try { // send the bytes if (useAsync) { @@ -253,12 +257,12 @@ public void send(byte[] buf) throws IOException { } } synchronized (wsSessionId) { - sendFuture = session.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf)); + sendFuture = wsSession.getAsyncRemote().sendBinary(ByteBuffer.wrap(buf)); updateWriteBytes(buf.length); } } else { synchronized (wsSessionId) { - session.getBasicRemote().sendBinary(ByteBuffer.wrap(buf)); + wsSession.getBasicRemote().sendBinary(ByteBuffer.wrap(buf)); updateWriteBytes(buf.length); } } @@ -281,11 +285,10 @@ public void sendPing(byte[] buf) throws IllegalArgumentException, IOException { if (isTrace) { log.trace("send ping: {}", buf); } - WsSession session = wsSession.get(); - if (session != null && session.isOpen()) { + if (!wsSession.isClosed()) { synchronized (wsSessionId) { // send the bytes - session.getBasicRemote().sendPing(ByteBuffer.wrap(buf)); + wsSession.getBasicRemote().sendPing(ByteBuffer.wrap(buf)); // update counter updateWriteBytes(buf.length); } @@ -305,11 +308,10 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException { if (isTrace) { log.trace("send pong: {}", buf); } - WsSession session = wsSession.get(); - if (session != null && session.isOpen()) { + if (!wsSession.isClosed()) { synchronized (wsSessionId) { // send the bytes - session.getBasicRemote().sendPong(ByteBuffer.wrap(buf)); + wsSession.getBasicRemote().sendPong(ByteBuffer.wrap(buf)); // update counter updateWriteBytes(buf.length); } @@ -319,20 +321,36 @@ public void sendPong(byte[] buf) throws IllegalArgumentException, IOException { } /** - * close Connection + * Close the connection. */ public void close() { + close(CloseCodes.NORMAL_CLOSURE, ""); + } + + /** + * Close the connection with a reason. + * + * @param code CloseCode + * @param reasonPhrase short reason for closing + */ + public void close(CloseCode code, String reasonPhrase) { if (connected.compareAndSet(true, false)) { - log.debug("close: {}", wsSessionId); - WsSession session = wsSession != null ? wsSession.get() : null; - // session has to be open, or user props cannot be retrieved - if (session != null && session.isOpen()) { - // trying to close the session nicely - try { - session.close(); - } catch (Exception e) { - log.debug("Exception closing session", e); + // no blank reasons + if (reasonPhrase == null) { + reasonPhrase = ""; + } + log.debug("close: {} code: {} reason: {}", wsSessionId, code, reasonPhrase); + try { + // close the session if open + if (wsSession.isOpen()) { + CloseReason reason = new CloseReason(code, reasonPhrase); + if (isDebug) { + log.debug("Closing session: {} with reason: {}", wsSessionId, reason); + } + wsSession.close(reason); } + } catch (Exception e) { + log.debug("Exception closing session", e); } // clean up our props attributes.clear(); @@ -347,40 +365,9 @@ public void close() { if (headers != null) { headers = null; } - if (scope.get() != null) { - // disconnect from scope - scope.get().removeConnection(this); - // clear weak refs - wsSession.clear(); - scope.clear(); - } } } - /* - WsSession uses these userProperties for checkExpiration along with maxIdleTimeout - - configuration for read idle timeout on WebSocket session - READ_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.READ_IDLE_TIMEOUT_MS"; - configuration for write idle timeout on WebSocket session - WRITE_IDLE_TIMEOUT_MS = "org.apache.tomcat.websocket.WRITE_IDLE_TIMEOUT_MS"; - */ - public void timeoutAsync(long now) { - // XXX(paul) only logging here as we should more than likely rely upon the container checking expiration - log.trace("timeoutAsync: {} on session id: {} read: {} written: {}", now, wsSessionId, readBytes, writtenBytes); - /* - WsSession session = wsSession.get(); - Map props = session.getUserProperties(); - log.debug("Session properties: {}", props); - long maxIdleTimeout = session.getMaxIdleTimeout(); - long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS); - long sendTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS); - log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, sendTimeout); - //long readDelta = (now - lastReadTime), writeDelta = (now - lastWriteTime); - //log.debug("timeoutAsync: {} on {} last read: {} last write: {}", now, wsSessionId, readDelta, writeDelta); - */ - } - /** * Async send is enabled in non-Windows based systems; this provides a means to override it. * @@ -453,7 +440,7 @@ public void setOrigin(String origin) { * @return true if secure and false if unsecure or unconnected */ public boolean isSecure() { - Optional opt = Optional.ofNullable(wsSession.get()); + Optional opt = Optional.ofNullable(wsSession); if (opt.isPresent()) { return (opt.get().isOpen() ? opt.get().isSecure() : false); } @@ -672,12 +659,12 @@ public Object getUserProperty(String key) { public void setWsSessionTimeout(long idleTimeout) { if (wsSession != null) { - wsSession.get().setMaxIdleTimeout(idleTimeout); + wsSession.setMaxIdleTimeout(idleTimeout); } } public WsSession getWsSession() { - return wsSession != null ? wsSession.get() : null; + return wsSession != null ? wsSession : null; } public long getReadBytes() { diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketScope.java b/server/src/main/java/org/red5/net/websocket/WebSocketScope.java index 767178402..bf58b6792 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketScope.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketScope.java @@ -24,6 +24,8 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import jakarta.websocket.CloseReason.CloseCodes; + /** * WebSocketScope contains an IScope and keeps track of WebSocketConnection and IWebSocketDataListener instances. * @@ -91,7 +93,7 @@ public void unregister() { // clean up the connections by first closing them conns.forEach(conn -> { if (conns.remove(conn)) { - conn.close(); + conn.close(CloseCodes.GOING_AWAY, "WebSocket scope removed"); } }); // clean up the listeners by first stopping them diff --git a/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java b/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java index 17918a420..a1a6e440f 100644 --- a/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java +++ b/server/src/main/java/org/red5/net/websocket/WebSocketScopeManager.java @@ -24,6 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import jakarta.websocket.CloseReason.CloseCodes; + /** * Manages websocket scopes and listeners. * @@ -172,15 +174,12 @@ public boolean addWebSocketScope(WebSocketScope webSocketScope) { wsConn.sendPing(PING_BYTES); } catch (Exception e) { log.debug("Exception pinging connection: {} connection will be closed", wsConn.getSessionId(), e); - // if the connection isn't connected, remove them - wsScope.removeConnection(wsConn); - // if the ping fails, consider them gone - wsConn.close(); + wsConn.close(CloseCodes.CLOSED_ABNORMALLY, e.getMessage()); } } else { log.debug("Removing unconnected connection: {} during ping loop", wsConn.getSessionId()); // if the connection isn't connected, remove them - wsScope.removeConnection(wsConn); + wsConn.close(CloseCodes.UNEXPECTED_CONDITION, "Connection not connected"); } } catch (Exception e) { log.warn("Exception in WS pinger", e); diff --git a/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java b/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java index 72d3aa741..5e3520f51 100644 --- a/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java +++ b/server/src/main/java/org/red5/net/websocket/server/DefaultWebSocketEndpoint.java @@ -11,7 +11,6 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.Map; import org.apache.mina.core.buffer.IoBuffer; import org.red5.net.websocket.WSConstants; @@ -57,15 +56,10 @@ public void onOpen(Session session, EndpointConfig config) { if (isDebug) { log.debug("Session opened: {}\n{}", session.getId(), session.getRequestParameterMap()); } - Map confUserProps = config.getUserProperties(); - Map sessionUserProps = session.getUserProperties(); - if (isTrace) { - log.trace("User conf props: {}\nsession props: {}", confUserProps, sessionUserProps); - } // get ws scope from user props - scope = (WebSocketScope) confUserProps.get(WSConstants.WS_SCOPE); + scope = (WebSocketScope) config.getUserProperties().get(WSConstants.WS_SCOPE); // get ws connection from session user props - WebSocketConnection conn = (WebSocketConnection) sessionUserProps.get(WSConstants.WS_CONNECTION); + WebSocketConnection conn = (WebSocketConnection) session.getUserProperties().get(WSConstants.WS_CONNECTION); if (conn == null) { log.warn("WebSocketConnection null at onOpen for {}", session.getId()); } @@ -76,24 +70,13 @@ public void onOpen(Session session, EndpointConfig config) { @Override public void onClose(Session session, CloseReason closeReason) { + final String sessionId = session.getId(); + log.debug("Session closed: {}", sessionId); WebSocketConnection conn = null; // getting the sessions user properties on a closed connection will throw an exception when it checks state try { - Map sessionUserProps = session.getUserProperties(); - if (isTrace) { - log.trace("User session props: {}", sessionUserProps); - } // ensure we grab the scope from the session if its null - if (scope == null) { - scope = (WebSocketScope) sessionUserProps.get(WSConstants.WS_SCOPE); - log.trace("Scope pulled from session: {}", scope); - } - String sessionId = session.getId(); - if (isDebug) { - log.debug("Session closed: {} on scope: {}", sessionId, scope); - } - // get ws connection from session user props - conn = (WebSocketConnection) sessionUserProps.get(WSConstants.WS_CONNECTION); + conn = (WebSocketConnection) session.getUserProperties().get(WSConstants.WS_CONNECTION); // if we don't get it from the session, try the scope lookup if (conn == null) { log.warn("Connection for id: {} was not found in the session onClose", sessionId); @@ -107,11 +90,9 @@ public void onClose(Session session, CloseReason closeReason) { } finally { if (conn != null) { // fire close, to be sure - conn.close(); + scope.removeConnection(conn); // force remove on exception - if (scope != null) { - scope.removeConnection(conn); - } + conn.close(closeReason.getCloseCode(), closeReason.getReasonPhrase()); } } } diff --git a/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java b/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java index b6710472d..0b4720a3e 100644 --- a/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java +++ b/server/src/main/java/org/red5/net/websocket/server/DefaultWsServerContainer.java @@ -287,7 +287,7 @@ protected void registerSession(Object endpoint, WsSession wsSession) { */ @Override protected void unregisterSession(Object endpoint, WsSession wsSession) { - if (wsSession.isOpen() && wsSession.getHttpSessionId() != null && wsSession.getUserPrincipal() != null) { + if (wsSession.getHttpSessionId() != null) { unregisterAuthenticatedSession(wsSession, wsSession.getHttpSessionId()); log.debug("unregisterSession - unregisterAuthenticatedSession: {}", wsSession.getId()); } diff --git a/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java b/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java index 3f58fbe8d..36976f2ed 100644 --- a/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java +++ b/server/src/main/java/org/red5/net/websocket/server/WsHttpUpgradeHandler.java @@ -11,6 +11,7 @@ import org.apache.tomcat.util.net.SocketEvent; import org.apache.tomcat.util.net.SocketWrapperBase; import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.websocket.Constants; import org.apache.tomcat.websocket.Transformation; import org.apache.tomcat.websocket.WsIOException; import org.apache.tomcat.websocket.WsSession; @@ -78,6 +79,10 @@ public class WsHttpUpgradeHandler implements InternalHttpUpgradeHandler { private WsSession wsSession; + private long lastTimeoutCheck = System.currentTimeMillis(); + + private long lastReadBytes, lastWrittenBytes; + public WsHttpUpgradeHandler() { applicationClassLoader = Thread.currentThread().getContextClassLoader(); } @@ -282,23 +287,66 @@ public void setSslSupport(SSLSupport sslSupport) { @Override public void timeoutAsync(long now) { log.trace("timeoutAsync: {} on session: {}", now, wsSession); - // session methods may not be called if the session is not open if (wsSession != null) { - if (wsSession.isOpen()) { - try { - // if we have a timeout, inform the ws connection - WebSocketConnection conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION); - if (conn != null) { - conn.timeoutAsync(now); + try { + final String wsSessionId = wsSession.getId(); + // get scope from endpoint config + WebSocketScope scope = (WebSocketScope) endpointConfig.getUserProperties().get(WSConstants.WS_SCOPE); + // do lookup by session id, skips need for session user props + WebSocketConnection conn = scope.getConnectionBySessionId(wsSessionId); + // if we don't get it from the scope, try the session lookup + if (conn == null && wsSession.isOpen()) { + // session methods may not be called if its not open + conn = (WebSocketConnection) wsSession.getUserProperties().get(WSConstants.WS_CONNECTION); + } + // last check, if we don't have a connection, log a warning + if (conn == null) { + log.warn("Connection for id: {} was not found in the scope or session: {}", wsSession.getId(), scope.getPath()); + return; + } + // negative now means always treat as expired + if (now > 0) { + long checkDelta = now - lastTimeoutCheck; + long readBytes = conn.getReadBytes(), writtenBytes = conn.getWrittenBytes(); + log.info("timeoutAsync: {}ms on session id: {} read: {} written: {}", checkDelta, wsSessionId, readBytes, writtenBytes); + Map props = wsSession.getUserProperties(); + log.debug("Session properties: {}", props); + long maxIdleTimeout = wsSession.getMaxIdleTimeout(); + long readTimeout = (long) props.get(Constants.READ_IDLE_TIMEOUT_MS); + long writeTimeout = (long) props.get(Constants.WRITE_IDLE_TIMEOUT_MS); + log.debug("Session timeouts - max: {} read: {} write: {}", maxIdleTimeout, readTimeout, writeTimeout); + if (maxIdleTimeout > 0) { + if (checkDelta > maxIdleTimeout && (readBytes == lastReadBytes || writtenBytes == lastWrittenBytes)) { + log.info("Max idle timeout: {}ms on session id: {}", checkDelta, wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Max idle timeout"); + } + } else { + if (readTimeout > 0) { + if (readBytes == lastReadBytes) { + if (checkDelta > readTimeout) { + log.info("Read timeout: {}ms on session id: {}", checkDelta, wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Read timeout"); + } + } + } + if (writeTimeout > 0) { + if (writtenBytes == lastWrittenBytes) { + if (checkDelta > writeTimeout) { + log.info("Write timeout: {}ms on session id: {}", checkDelta, wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Write timeout"); + } + } + } } - } catch (Throwable t) { - log.warn(sm.getString("wsHttpUpgradeHandler.timeoutAsyncFailed"), t); + lastReadBytes = readBytes; + lastWrittenBytes = writtenBytes; + lastTimeoutCheck = now; + } else { + log.warn("timeoutAsync: negative time on session id: {}", wsSessionId); + conn.close(CloseCodes.GOING_AWAY, "Timeout expired"); } - } else { - log.debug("timeoutAsync: {} session is not open for session id: {}", now, wsSession.getId()); - // we need the processor released from the async waitingProcessors list - // located in abstract protocol - //socketWrapper.close(); + } catch (Throwable t) { + log.warn(sm.getString("wsHttpUpgradeHandler.timeoutAsyncFailed"), t); } } } diff --git a/server/src/main/server/conf/red5-common.xml b/server/src/main/server/conf/red5-common.xml index 43950df9a..04dea85b3 100644 --- a/server/src/main/server/conf/red5-common.xml +++ b/server/src/main/server/conf/red5-common.xml @@ -29,11 +29,6 @@ - - -