Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sslsubject regression #159

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,22 @@ def close
end

def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address,
proxy_port, tbuf, socket)
proxy_port, tbuf, ssl_subject)
codec.decode(tbuf) do |event|
if @proxy_protocol
event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD)
event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD)
end
enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
if ssl_subject
event.set(SSLSUBJECT_FIELD, ssl_subject)
end
enqueue_decorated(event, client_ip_address, client_address, client_port)
end
end

def flush_codec(codec, client_ip_address, client_address, client_port, socket)
def flush_codec(codec, client_ip_address, client_address, client_port)
codec.flush do |event|
enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
enqueue_decorated(event, client_ip_address, client_address, client_port)
end
end

Expand All @@ -218,10 +221,14 @@ def run_client()
client_socket.close rescue nil
end

# only called in client mode
def handle_socket(socket)
client_address = socket.peeraddr[3]
client_ip_address = socket.peeraddr[2]
client_port = socket.peeraddr[1]

# Client mode sslsubject extraction, server mode happens in DecoderImpl#decode
ssl_subject = socket.peer_cert.subject.to_s if @ssl_enable && @ssl_verify
peer = "#{client_address}:#{client_port}"
first_read = true
codec = @codec.clone
Expand All @@ -245,7 +252,7 @@ def handle_socket(socket)
end
end
decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address,
proxy_port, tbuf, socket)
proxy_port, tbuf, ssl_subject)
end
rescue EOFError
@logger.debug? && @logger.debug("Connection closed", :client => peer)
Expand All @@ -261,14 +268,13 @@ def handle_socket(socket)
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil
flush_codec(codec, client_ip_address, client_address, client_port, socket)
flush_codec(codec, client_ip_address, client_address, client_port)
end

def enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
def enqueue_decorated(event, client_ip_address, client_address, client_port)
event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD)
event.set(HOST_IP_FIELD, client_ip_address) unless event.get(HOST_IP_FIELD)
event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD)
event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil?
rwaweber marked this conversation as resolved.
Show resolved Hide resolved
decorate(event)
@output_queue << event
end
Expand Down
16 changes: 12 additions & 4 deletions lib/logstash/inputs/tcp/decoder_impl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,32 @@ def initialize(codec, tcp)
@first_read = true
end

def decode(channel_addr, data)
def decode(ctx, data)
channel_addr = ctx.channel().remoteAddress()
bytes = Java::byte[data.readableBytes].new
data.getBytes(0, bytes)
data.release
tbuf = String.from_java_bytes bytes, "ASCII-8BIT"
if @first_read
tbuf = init_first_read(channel_addr, tbuf)
end
@tcp.decode_buffer(@ip_address, @address, @port, @codec,
@proxy_address, @proxy_port, tbuf, nil)
if @tcp.ssl_enable && @tcp.ssl_verify
session = ctx.channel().pipeline().get("ssl-handler").engine().getSession()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty long chain, and it is unclear why we stop here to bind a local variable session, which we use only once to continue the chain.

  • are all methods in the chain documented to never return nil?
  • should we extract this complexity to a helper method that has appropriate rescue clauses?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are all methods in the chain documented to never return nil?

It seems that way, aside from the ChannelPipeline#get we invoke, though since we explicitly name our channelhandler here, we should be safe in that respect.

should we extract this complexity to a helper method that has appropriate rescue clauses?

I think thats a great idea, though I sort of think it might be worth tucking in somewhere like the logstash/base/inputs package, since we'd be repeating the same logic in this plugin, the beats input plugin, and potentially other plugins where netty is used and TLS metadata extraction is possible. We would need to be careful about formatting though, as different input plugins seem to serialize TLS metadata differently. (it gets cut to 3 fields in the beats plugin and one field here).

Also, as an update on my last message, looking more closely, I'm not sure that rescue clause I cite would really be adding much in terms of safety, but would be more for debug logging insight.

sslsubject = session.getPeerPrincipal().getName()
@tcp.decode_buffer(@ip_address, @address, @port, @codec,
@proxy_address, @proxy_port, tbuf, sslsubject)
else
@tcp.decode_buffer(@ip_address, @address, @port, @codec,
@proxy_address, @proxy_port, tbuf, nil)
end
end

def copy
DecoderImpl.new(@codec.clone, @tcp)
end

def flush
@tcp.flush_codec(@codec, @ip_address, @address, @port, nil)
@tcp.flush_codec(@codec, @ip_address, @address, @port)
end

private
Expand Down
25 changes: 25 additions & 0 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,31 @@ def get_port
expect(result.first.get("message")).to eq(message)
end
end
context "with a regular TLS setup" do
let(:config) do
{
"host" => "127.0.0.1",
"port" => port,
"ssl_enable" => true,
"ssl_cert" => chain_of_certificates[:b_cert].path,
"ssl_key" => chain_of_certificates[:b_key].path,
"ssl_extra_chain_certs" => [ chain_of_certificates[:a_cert].path ],
"ssl_certificate_authorities" => [ chain_of_certificates[:root_ca].path ],
"ssl_verify" => true
}
end
it "should be able to extract the sslsubject from connections" do
result = TcpHelpers.pipelineless_input(subject, 1) do
sslsocket.connect
sslsocket.write("#{message}\n")
tcp.flush
sslsocket.close
tcp.close
end
expect(result.size).to eq(1)
expect(result.first.get("sslsubject")).to eq("CN=RubyAA_Cert,DC=ruby-lang,DC=org")
end
end
end

context "with a poorly-behaving client" do
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/logstash/tcp/Decoder.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.logstash.tcp;

import io.netty.buffer.ByteBuf;
import java.net.SocketAddress;
import io.netty.channel.ChannelHandlerContext;

/**
* Decoder bridge to implement in JRuby.
Expand All @@ -13,7 +13,7 @@ public interface Decoder {
* @param key {@link SocketAddress}
* @param message Data {@link ByteBuf} for this address
*/
void decode(SocketAddress key, ByteBuf message);
void decode(ChannelHandlerContext context, ByteBuf message);

/**
* Creates a copy of this decoder, that has all internal meta data cleared.
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void close() {
* {@link Decoder}.
*/
private static final class InputHandler extends ChannelInitializer<SocketChannel> {
private final String SSL_HANDLER = "ssl-handler";

/**
* {@link Decoder} supplied by JRuby.
Expand Down Expand Up @@ -140,7 +141,7 @@ protected void initChannel(final SocketChannel channel) throws Exception {

// if SSL is enabled, the SSL handler must be added to the pipeline first
if (sslContext != null) {
channel.pipeline().addLast(sslContext.newHandler(channel.alloc()));
channel.pipeline().addLast(SSL_HANDLER, sslContext.newHandler(channel.alloc()));
}

channel.pipeline().addLast(new DecoderAdapter(localCopy, logger));
Expand Down Expand Up @@ -199,9 +200,11 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter {
this.decoder = decoder;
}

// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field
// corresponding interface updated
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
decoder.decode(ctx.channel().remoteAddress(), (ByteBuf) msg);
decoder.decode(ctx, (ByteBuf) msg);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.0.6
6.0.7