Skip to content

Commit

Permalink
Fix SSL Subject regression in server mode (#199)
Browse files Browse the repository at this point in the history
* Fix SSL Subject regression in server mode

Alters the TCP#decode_buffer signature to accept the _ssl subject_ instead of
expecting a ruby socket, so that it can be interoperable between the ruby-based
client mode and the netty-powered server mode.

In server mode, the SSL subject is extracted _once_ when initializing the
connection IFF SSL is enabled and verification is turned on.

Co-authored-by: Will Weber <[email protected]>
Closes: #159

* include docker jdk17 env

Co-authored-by: Will Weber <[email protected]>
  • Loading branch information
yaauie and rwaweber authored Oct 12, 2022
1 parent bca64db commit 3663173
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ import:

env:
jobs:
- ELASTIC_STACK_VERSION=8.x
- SNAPSHOT=true ELASTIC_STACK_VERSION=8.x
- ELASTIC_STACK_VERSION=8.x DOCKER_ENV=dockerjdk17.env
- SNAPSHOT=true ELASTIC_STACK_VERSION=8.x DOCKER_ENV=dockerjdk17.env
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 6.3.1
- Fixes a regression in which the ssl_subject was missing for SSL-secured connections in server mode [#199](https://github.com/logstash-plugins/logstash-input-tcp/pull/199)

## 6.3.0
- Feat: ssl_supported_protocols (TLSv1.3) + ssl_cipher_suites [#198](https://github.com/logstash-plugins/logstash-input-tcp/pull/198)

Expand Down
20 changes: 12 additions & 8 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,19 @@ def close
end

def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address,
proxy_port, tbuf, socket)
proxy_port, tbuf, ssl_subject)
codec.decode(tbuf) do |event|
if @proxy_protocol
event.set(@field_proxy_host, proxy_address) unless event.get(@field_proxy_host)
event.set(@field_proxy_port, proxy_port) unless event.get(@field_proxy_port)
end
enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject)
end
end

def flush_codec(codec, client_ip_address, client_address, client_port, socket)
def flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject)
codec.flush do |event|
enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject)
end
end

Expand All @@ -222,10 +222,14 @@ def run_client()
client_socket.close rescue nil
end

# only called in client mode
def handle_socket(socket)
client_address = socket.peeraddr[3]
client_ip_address = socket.peeraddr[2]
client_port = socket.peeraddr[1]

# Client mode sslsubject extraction, server mode happens in DecoderImpl#decode
ssl_subject = socket.peer_cert.subject.to_s if @ssl_enable && @ssl_verify
peer = "#{client_address}:#{client_port}"
first_read = true
codec = @codec.clone
Expand All @@ -249,7 +253,7 @@ def handle_socket(socket)
end
end
decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address,
proxy_port, tbuf, socket)
proxy_port, tbuf, ssl_subject)
end
rescue EOFError
@logger.debug? && @logger.debug("Connection closed", :client => peer)
Expand All @@ -263,14 +267,14 @@ def handle_socket(socket)
ensure
# catch all rescue nil on close to discard any close errors or invalid socket
socket.close rescue nil
flush_codec(codec, client_ip_address, client_address, client_port, socket)
flush_codec(codec, client_ip_address, client_address, client_port, ssl_subject)
end

def enqueue_decorated(event, client_ip_address, client_address, client_port, socket)
def enqueue_decorated(event, client_ip_address, client_address, client_port, ssl_subject)
event.set(@field_host, client_address) unless event.get(@field_host)
event.set(@field_host_ip, client_ip_address) unless event.get(@field_host_ip)
event.set(@field_port, client_port) unless event.get(@field_port)
event.set(@field_sslsubject, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(@field_sslsubject).nil?
event.set(@field_sslsubject, ssl_subject) unless ssl_subject.nil? || event.get(@field_sslsubject)
decorate(event)
@output_queue << event
end
Expand Down
22 changes: 17 additions & 5 deletions lib/logstash/inputs/tcp/decoder_impl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,30 @@ def initialize(codec, tcp)
@first_read = true
end

def decode(channel_addr, data)
def decode(ctx, data)
channel = ctx.channel()
bytes = Java::byte[data.readableBytes].new
data.getBytes(0, bytes)
data.release
tbuf = String.from_java_bytes bytes, "ASCII-8BIT"
if @first_read
tbuf = init_first_read(channel_addr, tbuf)
tbuf = init_first_read(channel, tbuf)
end
@tcp.decode_buffer(@ip_address, @address, @port, @codec,
@proxy_address, @proxy_port, tbuf, nil)
@proxy_address, @proxy_port, tbuf, @sslsubject)
end

def copy
self.class.new(@codec.clone, @tcp)
end

def flush
@tcp.flush_codec(@codec, @ip_address, @address, @port, nil)
@tcp.flush_codec(@codec, @ip_address, @address, @port, @sslsubject)
end

private
def init_first_read(channel_addr, received)
def init_first_read(channel, received)
channel_addr = channel.remoteAddress()
if @tcp.proxy_protocol
pp_hdr, filtered = received.split("\r\n", 2)
pp_info = pp_hdr.split(/\s/)
Expand All @@ -53,10 +55,20 @@ def init_first_read(channel_addr, received)
@address = extract_host_name(channel_addr) # name _or_ address of sender
@port = channel_addr.get_port # outgoing port of sender (probably random)
end
@sslsubject = extract_sslsubject(channel)
@first_read = false
filtered
end

private
def extract_sslsubject(channel)
return nil unless @tcp.ssl_enable && @tcp.ssl_verify

channel.pipeline().get("ssl-handler").engine().getSession().getPeerPrincipal().getName()
rescue Exception => e
nil
end

private
def extract_host_name(channel_addr)
channel_addr = java.net.InetSocketAddress.new(channel_addr, 0) if channel_addr.kind_of?(String)
Expand Down
39 changes: 37 additions & 2 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -541,15 +541,18 @@ def get_port
end
end

describe "#receive" do
describe "#receive", :ecs_compatibility_support do
shared_examples "receiving events" do
# TODO(sissel): Implement normal event-receipt tests as as a shared example
end

context "when ssl_enable is true" do
let(:input) { subject }
let(:queue) { Queue.new }
before(:each) { subject.register }
before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) if defined?(ecs_compatibility)
subject.register
end

context "when using a certificate chain" do
chain_of_certificates = TcpHelpers.new.chain_of_certificates
Expand Down Expand Up @@ -651,6 +654,38 @@ def get_port
end
end

context "with a regular TLS setup" do
let(:config) do
{
"host" => "127.0.0.1",
"port" => port,
"ssl_enable" => true,
"ssl_cert" => chain_of_certificates[:b_cert].path,
"ssl_key" => chain_of_certificates[:b_key].path,
"ssl_extra_chain_certs" => [ chain_of_certificates[:a_cert].path ],
"ssl_certificate_authorities" => [ chain_of_certificates[:root_ca].path ],
"ssl_verify" => true
}
end

ecs_compatibility_matrix(:disabled,:v1, :v8 => :v1) do |ecs_select|
it "extracts the TLS subject from connections" do
result = TcpHelpers.pipelineless_input(subject, 1) do
sslsocket.connect
sslsocket.write("#{message}\n")
tcp.flush
sslsocket.close
tcp.close
end
expect(result.size).to eq(1)
event = result.first

ssl_subject_field = ecs_select[disabled: 'sslsubject', v1:'[@metadata][input][tcp][tls][client][subject]']
expect(event.get(ssl_subject_field)).to eq("CN=RubyAA_Cert,DC=ruby-lang,DC=org")
end
end
end

context "with enforced protocol version" do
let(:config) do
base_config.merge 'ssl_supported_protocols' => [ tls_version ]
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/logstash/tcp/Decoder.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.logstash.tcp;

import io.netty.buffer.ByteBuf;
import java.net.SocketAddress;
import io.netty.channel.ChannelHandlerContext;

/**
* Decoder bridge to implement in JRuby.
Expand All @@ -13,7 +13,7 @@ public interface Decoder {
* @param key {@link SocketAddress}
* @param message Data {@link ByteBuf} for this address
*/
void decode(SocketAddress key, ByteBuf message);
void decode(ChannelHandlerContext context, ByteBuf message);

/**
* Creates a copy of this decoder, that has all internal meta data cleared.
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/logstash/tcp/InputLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void close() {
* {@link Decoder}.
*/
private static final class InputHandler extends ChannelInitializer<SocketChannel> {
private final String SSL_HANDLER = "ssl-handler";

/**
* {@link Decoder} supplied by JRuby.
Expand All @@ -133,7 +134,7 @@ protected void initChannel(final SocketChannel channel) throws Exception {

// if SSL is enabled, the SSL handler must be added to the pipeline first
if (sslContext != null) {
channel.pipeline().addLast(sslContext.newHandler(channel.alloc()));
channel.pipeline().addLast(SSL_HANDLER, sslContext.newHandler(channel.alloc()));
}

channel.pipeline().addLast(new DecoderAdapter(localCopy, logger));
Expand Down Expand Up @@ -196,9 +197,11 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter {
this.decoder = decoder;
}

// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field
// corresponding interface updated
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
decoder.decode(ctx.channel().remoteAddress(), (ByteBuf) msg);
decoder.decode(ctx, (ByteBuf) msg);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
6.3.0
6.3.1

0 comments on commit 3663173

Please sign in to comment.