diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 81df5da..9e802d4 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -183,19 +183,19 @@ def close end def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket) + proxy_port, tbuf, ssl_subject) codec.decode(tbuf) do |event| if @proxy_protocol event.set(@field_proxy_host, proxy_address) unless event.get(@field_proxy_host) event.set(@field_proxy_port, proxy_port) unless event.get(@field_proxy_port) end - enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject) end end - def flush_codec(codec, client_ip_address, client_address, client_port, socket) + def flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject) codec.flush do |event| - enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject) end end @@ -215,10 +215,14 @@ def run_client() client_socket.close rescue nil end + # only called in client mode def handle_socket(socket) client_address = socket.peeraddr[3] client_ip_address = socket.peeraddr[2] client_port = socket.peeraddr[1] + + # Client mode sslsubject extraction, server mode happens in DecoderImpl#decode + ssl_subject = socket.peer_cert.subject.to_s if @ssl_enable && @ssl_verify peer = "#{client_address}:#{client_port}" first_read = true codec = @codec.clone @@ -242,7 +246,7 @@ def handle_socket(socket) end end decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket) + proxy_port, tbuf, ssl_subject) end rescue EOFError @logger.debug? && @logger.debug("Connection closed", :client => peer) @@ -256,14 +260,14 @@ def handle_socket(socket) ensure # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil - flush_codec(codec, client_ip_address, client_address, client_port, socket) + flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject) end - def enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + def enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject) event.set(@field_host, client_address) unless event.get(@field_host) event.set(@field_host_ip, client_ip_address) unless event.get(@field_host_ip) event.set(@field_port, client_port) unless event.get(@field_port) - event.set(@field_sslsubject, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(@field_sslsubject).nil? + event.set(@field_sslsubject, ssl_subject) unless ssl_subject.nil? || event.get(@field_sslsubject) decorate(event) @output_queue << event end diff --git a/lib/logstash/inputs/tcp/decoder_impl.rb b/lib/logstash/inputs/tcp/decoder_impl.rb index cbd9609..5c8d2e7 100644 --- a/lib/logstash/inputs/tcp/decoder_impl.rb +++ b/lib/logstash/inputs/tcp/decoder_impl.rb @@ -11,16 +11,17 @@ def initialize(codec, tcp) @first_read = true end - def decode(channel_addr, data) + def decode(ctx, data) + channel = ctx.channel() bytes = Java::byte[data.readableBytes].new data.getBytes(0, bytes) data.release tbuf = String.from_java_bytes bytes, "ASCII-8BIT" if @first_read - tbuf = init_first_read(channel_addr, tbuf) + tbuf = init_first_read(channel, tbuf) end @tcp.decode_buffer(@ip_address, @address, @port, @codec, - @proxy_address, @proxy_port, tbuf, nil) + @proxy_address, @proxy_port, tbuf, @sslsubject) end def copy @@ -28,11 +29,12 @@ def copy end def flush - @tcp.flush_codec(@codec, @ip_address, @address, @port, nil) + @tcp.flush_codec(@codec, @ip_address, @address, @port, @sslsubject) end private - def init_first_read(channel_addr, received) + def init_first_read(channel, received) + channel_addr = channel.remoteAddress() if @tcp.proxy_protocol pp_hdr, filtered = received.split("\r\n", 2) pp_info = pp_hdr.split(/\s/) @@ -53,10 +55,20 @@ def init_first_read(channel_addr, received) @address = extract_host_name(channel_addr) # name _or_ address of sender @port = channel_addr.get_port # outgoing port of sender (probably random) end + @sslsubject = extract_sslsubject(channel) @first_read = false filtered end + private + def extract_sslsubject(channel) + return nil unless @tcp.ssl_enable && @tcp.ssl_verify + + channel.pipeline().get("ssl-handler").engine().getSession().getPeerPrincipal().getName() + rescue Exception => e + nil + end + private def extract_host_name(channel_addr) channel_addr = java.net.InetSocketAddress.new(channel_addr, 0) if channel_addr.kind_of?(String) diff --git a/spec/inputs/tcp_spec.rb b/spec/inputs/tcp_spec.rb index 48e8320..9515476 100644 --- a/spec/inputs/tcp_spec.rb +++ b/spec/inputs/tcp_spec.rb @@ -541,7 +541,7 @@ def get_port end end - describe "#receive" do + describe "#receive", :ecs_compatibility_support do shared_examples "receiving events" do # TODO(sissel): Implement normal event-receipt tests as as a shared example end @@ -549,7 +549,10 @@ def get_port context "when ssl_enable is true" do let(:input) { subject } let(:queue) { Queue.new } - before(:each) { subject.register } + before(:each) do + allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) if defined?(ecs_compatibility) + subject.register + end context "when using a certificate chain" do chain_of_certificates = TcpHelpers.new.chain_of_certificates @@ -646,6 +649,37 @@ def get_port expect(result.first.get("message")).to eq(message) end end + context "with a regular TLS setup" do + let(:config) do + { + "host" => "127.0.0.1", + "port" => port, + "ssl_enable" => true, + "ssl_cert" => chain_of_certificates[:b_cert].path, + "ssl_key" => chain_of_certificates[:b_key].path, + "ssl_extra_chain_certs" => [ chain_of_certificates[:a_cert].path ], + "ssl_certificate_authorities" => [ chain_of_certificates[:root_ca].path ], + "ssl_verify" => true + } + end + + ecs_compatibility_matrix(:disabled,:v1, :v8 => :v1) do |ecs_select| + it "extracts the TLS subject from connections" do + result = TcpHelpers.pipelineless_input(subject, 1) do + sslsocket.connect + sslsocket.write("#{message}\n") + tcp.flush + sslsocket.close + tcp.close + end + expect(result.size).to eq(1) + event = result.first + + ssl_subject_field = ecs_select[disabled: 'sslsubject', v1:'[@metadata][input][tcp][tls][client][subject]'] + expect(event.get(ssl_subject_field)).to eq("CN=RubyAA_Cert,DC=ruby-lang,DC=org") + end + end + end end context "with a poorly-behaving client" do diff --git a/src/main/java/org/logstash/tcp/Decoder.java b/src/main/java/org/logstash/tcp/Decoder.java index 2d7e4c8..5d0a950 100644 --- a/src/main/java/org/logstash/tcp/Decoder.java +++ b/src/main/java/org/logstash/tcp/Decoder.java @@ -1,7 +1,7 @@ package org.logstash.tcp; import io.netty.buffer.ByteBuf; -import java.net.SocketAddress; +import io.netty.channel.ChannelHandlerContext; /** * Decoder bridge to implement in JRuby. @@ -13,7 +13,7 @@ public interface Decoder { * @param key {@link SocketAddress} * @param message Data {@link ByteBuf} for this address */ - void decode(SocketAddress key, ByteBuf message); + void decode(ChannelHandlerContext context, ByteBuf message); /** * Creates a copy of this decoder, that has all internal meta data cleared. diff --git a/src/main/java/org/logstash/tcp/InputLoop.java b/src/main/java/org/logstash/tcp/InputLoop.java index 07ca370..8229517 100644 --- a/src/main/java/org/logstash/tcp/InputLoop.java +++ b/src/main/java/org/logstash/tcp/InputLoop.java @@ -107,6 +107,7 @@ public void close() { * {@link Decoder}. */ private static final class InputHandler extends ChannelInitializer { + private final String SSL_HANDLER = "ssl-handler"; /** * {@link Decoder} supplied by JRuby. @@ -133,7 +134,7 @@ protected void initChannel(final SocketChannel channel) throws Exception { // if SSL is enabled, the SSL handler must be added to the pipeline first if (sslContext != null) { - channel.pipeline().addLast(sslContext.newHandler(channel.alloc())); + channel.pipeline().addLast(SSL_HANDLER, sslContext.newHandler(channel.alloc())); } channel.pipeline().addLast(new DecoderAdapter(localCopy, logger)); @@ -196,9 +197,11 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter { this.decoder = decoder; } + // 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field + // corresponding interface updated @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - decoder.decode(ctx.channel().remoteAddress(), (ByteBuf) msg); + decoder.decode(ctx, (ByteBuf) msg); } @Override diff --git a/version b/version index d938f42..2036a71 100644 --- a/version +++ b/version @@ -1 +1 @@ -6.2.7 +6.2.8