Skip to content

Commit

Permalink
Handle close_notify in ConnectionPoolHandler (#1841)
Browse files Browse the repository at this point in the history
* Close on completion close event

* Add handling to OriginResponseReceiver

* Use a new status category

* Fix index

* Add unit test for ids being unique

* 504 is probably better than 500

* Switch to debug logging

* Use 502 for close_notify
  • Loading branch information
jguerra authored Oct 30, 2024
1 parent 7df5afc commit 5e2bdf1
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public enum OutboundErrorType implements ErrorType {
ERROR_TYPE_ORIGIN_RESET_CONN_STATUS.get(),
ZuulStatusCategory.FAILURE_ORIGIN_RESET_CONNECTION,
ClientException.ErrorType.CONNECT_EXCEPTION),
CLOSE_NOTIFY_CONNECTION(
502,
ZuulStatusCategory.FAILURE_ORIGIN_CLOSE_NOTIFY_CONNECTION,
ClientException.ErrorType.CONNECT_EXCEPTION),
CANCELLED(400, ZuulStatusCategory.FAILURE_CLIENT_CANCELLED, ClientException.ErrorType.SOCKET_TIMEOUT_EXCEPTION),
ORIGIN_CONCURRENCY_EXCEEDED(
ERROR_TYPE_ORIGIN_CONCURRENCY_EXCEEDED_STATUS.get(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +49,7 @@ public class ConnectionPoolHandler extends ChannelDuplexHandler {
private final Counter inactiveCounter;
private final Counter errorCounter;
private final Counter headerCloseCounter;
private final Counter sslCloseCompletionCounter;

public ConnectionPoolHandler(OriginName originName) {
if (originName == null) {
Expand All @@ -58,6 +60,7 @@ public ConnectionPoolHandler(OriginName originName) {
this.inactiveCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_inactive", originName.getMetricId());
this.errorCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_error", originName.getMetricId());
this.headerCloseCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_headerClose", originName.getMetricId());
this.sslCloseCompletionCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_sslClose", originName.getMetricId());
}

@Override
Expand All @@ -71,12 +74,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
final String msg = "Origin channel for origin - " + originName + " - idle timeout has fired. "
+ ChannelUtils.channelInfoForLogging(ctx.channel());
closeConnection(ctx, msg);
} else if (evt instanceof CompleteEvent) {
} else if (evt instanceof CompleteEvent completeEvt) {
// The HttpLifecycleChannelHandler instance will fire this event when either a response has finished being
// written, or
// the channel is no longer active or disconnected.
// Return the connection to pool.
CompleteEvent completeEvt = (CompleteEvent) evt;
final CompleteReason reason = completeEvt.getReason();
if (reason == CompleteReason.SESSION_COMPLETE) {
final PooledConnection conn = PooledConnection.getFromChannel(ctx.channel());
Expand All @@ -97,6 +99,11 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
+ reason.name() + ", " + ChannelUtils.channelInfoForLogging(ctx.channel());
closeConnection(ctx, msg);
}
} else if(evt instanceof SslCloseCompletionEvent event) {
sslCloseCompletionCounter.increment();
String msg = "Origin channel for origin - " + originName + " - received SslCloseCompletionEvent " + event + ". "
+ ChannelUtils.channelInfoForLogging(ctx.channel());
closeConnection(ctx, msg);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
Expand All @@ -59,6 +60,8 @@ public class OriginResponseReceiver extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(OriginResponseReceiver.class);
private static final AttributeKey<Throwable> SSL_HANDSHAKE_UNSUCCESS_FROM_ORIGIN_THROWABLE =
AttributeKey.newInstance("_ssl_handshake_from_origin_throwable");
private static final AttributeKey<Boolean> SSL_CLOSE_NOTIFY_SEEN =
AttributeKey.newInstance("_ssl_close_notify_seen");
public static final String CHANNEL_HANDLER_NAME = "_origin_response_receiver";

public OriginResponseReceiver(final ProxyEndpoint edgeProxy) {
Expand Down Expand Up @@ -106,15 +109,20 @@ protected void channelReadInternal(final ChannelHandlerContext ctx, Object msg)

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof CompleteEvent) {
final CompleteReason reason = ((CompleteEvent) evt).getReason();
if (evt instanceof CompleteEvent completeEvent) {
final CompleteReason reason = completeEvent.getReason();
if ((reason != CompleteReason.SESSION_COMPLETE) && (edgeProxy != null)) {
logger.error(
"Origin request completed with reason other than COMPLETE: {}, {}",
reason.name(),
ChannelUtils.channelInfoForLogging(ctx.channel()));
final ZuulException ze = new ZuulException("CompleteEvent", reason.name(), true);
edgeProxy.errorFromOrigin(ze);
if(reason == CompleteReason.CLOSE && Boolean.TRUE.equals(ctx.channel().attr(SSL_CLOSE_NOTIFY_SEEN).get())) {
logger.warn("Origin request completed with close, after getting a SslCloseCompletionEvent event: {}", ChannelUtils.channelInfoForLogging(ctx.channel()));
edgeProxy.errorFromOrigin(new OriginConnectException("Origin connection close_notify", OutboundErrorType.CLOSE_NOTIFY_CONNECTION));
} else {
logger.error(
"Origin request completed with reason other than COMPLETE: {}, {}",
reason.name(),
ChannelUtils.channelInfoForLogging(ctx.channel()));
final ZuulException ze = new ZuulException("CompleteEvent", reason.name(), true);
edgeProxy.errorFromOrigin(ze);
}
}

// First let this event propagate along the pipeline, before cleaning vars from the channel.
Expand All @@ -135,6 +143,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
new OutboundException(OutboundErrorType.READ_TIMEOUT, edgeProxy.getRequestAttempts()));
}
super.userEventTriggered(ctx, evt);
} else if(evt instanceof SslCloseCompletionEvent) {
logger.debug("Received SslCloseCompletionEvent on {}", ChannelUtils.channelInfoForLogging(ctx.channel()));
ctx.channel().attr(SSL_CLOSE_NOTIFY_SEEN).set(true);
super.userEventTriggered(ctx, evt);
} else {
super.userEventTriggered(ctx, evt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public enum ZuulStatusCategory implements StatusCategory {
FAILURE_ORIGIN_THROTTLED(ZuulStatusCategoryGroup.FAILURE, 6, "Throttled by origin returning 503 status"),
FAILURE_ORIGIN_NO_SERVERS(ZuulStatusCategoryGroup.FAILURE, 14, "No UP origin servers available in Discovery"),
FAILURE_ORIGIN_RESET_CONNECTION(
ZuulStatusCategoryGroup.FAILURE, 15, "Connection reset on an established origin connection");
ZuulStatusCategoryGroup.FAILURE, 15, "Connection reset on an established origin connection"),
FAILURE_ORIGIN_CLOSE_NOTIFY_CONNECTION(ZuulStatusCategoryGroup.FAILURE, 16, "Connection TLS session shutdown");

private final StatusCategoryGroup group;
private final String id;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.zuul.stats.status;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;

/**
* @author Justin Guerra
* @since 10/29/24
*/
public class ZuulStatusCategoryTest {

@Test
public void categoriesUseUniqueIds() {
ZuulStatusCategory[] values = ZuulStatusCategory.values();
Set<String> ids = Arrays.stream(values).map(ZuulStatusCategory::getId).collect(Collectors.toSet());
assertEquals(values.length, ids.size());
}
}

0 comments on commit 5e2bdf1

Please sign in to comment.