Logstash-6.5.2 filter worker crash with Exception in pipelineworker #145

Open
rafaelma opened this issue Dec 10, 2018 · 3 comments

@rafaelma

rafaelma commented Dec 10, 2018

Hello

We have a problem with our Logstash installation and we think it is a bug.

Logstash version: 6.5.2
OS: CentOS 7

Description

One of the pipelines in our Logstash installation stops working with an "Exception in pipelineworker" error. The pipeline keeps accepting events through its input and writes them to the persisted queue, but the filter and output sections no longer process them, so the queue keeps growing.

Cause

We have identified the cause of the problem. The filter worker crashes when a geoip filter is used and the geoip source field contains an empty JSON array. This happens because the clients sometimes generate a log with an empty value for the attribute used as the geoip source.
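To illustrate the failure mode described above, here is a minimal Java sketch. It is not the plugin's code: the class and the `firstElement` helper are hypothetical and only mimic what the stack trace suggests, namely `ArrayList.get(0)` being called on an empty list.

```java
import java.util.ArrayList;
import java.util.List;

final class EmptyListFirstElement {
    // Hypothetical helper (not part of logstash-filter-geoip): blindly takes
    // the first element of a list-valued field, with no emptiness check.
    static String firstElement(List<String> values) {
        return values.get(0); // throws IndexOutOfBoundsException on an empty list
    }

    public static void main(String[] args) {
        // A list with one address, like "ipv4_address":["8.8.8.8"]
        List<String> populated = new ArrayList<>();
        populated.add("8.8.8.8");
        System.out.println(firstElement(populated));

        // An empty list, like "ipv4_address":[]
        try {
            firstElement(new ArrayList<>());
        } catch (IndexOutOfBoundsException e) {
            System.out.println("empty list: " + e.getClass().getSimpleName());
        }
    }
}
```

In the sketch the exception is caught. In the pipeline it apparently propagates up to the worker thread, which would explain why processing stops.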

Full error log

[2018-12-07T12:54:16,727][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"test", "exception"=>"Index: 0, Size: 0", "backtrace"=>["java.util.ArrayList.rangeCheck(java/util/ArrayList.java:657)", "java.util.ArrayList.get(java/util/ArrayList.java:433)", "org.logstash.filters.GeoIPFilter.handleEvent(org/logstash/filters/GeoIPFilter.java:120)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:453)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:314)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.filter(/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb:111)", "usr.share.logstash.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.RUBY$method$filter$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_3_dot_0/gems/logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java/lib/logstash/filters//usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.do_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:143)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash/filters//usr/share/logstash/logstash-core/lib/logstash/filters/base.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.block in 
multi_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:162)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1734)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.multi_filter(/usr/share/logstash/logstash-core/lib/logstash/filters/base.rb:159)", "usr.share.logstash.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash/filters//usr/share/logstash/logstash-core/lib/logstash/filters/base.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.filter_delegator.multi_filter(/usr/share/logstash/logstash-core/lib/logstash/filter_delegator.rb:44)", "RUBY.block in filter_func((eval):68)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.filter_batch(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:341)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.worker_loop(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:320)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.RUBY$method$worker_loop$0$__VARARGS__(usr/share/logstash/logstash_minus_core/lib/logstash//usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)", "usr.share.logstash.logstash_minus_core.lib.logstash.pipeline.block in start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:286)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)", "java.lang.Thread.run(java/lang/Thread.java:748)"], :thread=>"#<Thread:0x4390727b sleep>"}

Pipeline configuration

cat /etc/logstash/logstash.yml
#
# logstash.yml
#
path.data: /var/lib/logstash
pipeline.batch.size: 1024
config.reload.automatic: true
config.reload.interval: 60s
queue.type: persisted
queue.drain: true
queue.max_bytes: 4gb
path.logs: /var/log/logstash

cat /etc/logstash/pipelines.yml
- pipeline.id: test
  path.config: "/etc/logstash/conf.d/test.conf"
  pipeline.workers: 2

cat /etc/logstash/conf.d/test.conf
input {
  beats {
    port => 5044
    add_field => { "pipeline_receiver" => "beats-receiver-5044" }
  }
}

filter {
  json {
    source => "message"
  }
  geoip {
    source => "[MachineInfo][ipv4_address]"
  }
}

output {
  file {
    path => "/tmp/test_pipeline.txt"
  }
}

Sample Data

{"MachineInfo":{"ipv4_address":["8.8.8.8"],"json_test":"json test"},"comment":"Testing-1"}
{"MachineInfo":{"json_test":"json test"},"comment":"Testing-2"}
{"MachineInfo":{"ipv4_address":[],"json_test":"json test"},"comment":"Testing-3"}

Steps to Reproduce

  • Use Filebeat to send the sample data to a Logstash instance.
  • Append the sample data lines, one at a time, to the log file defined in the Filebeat input.
  • The first and second logs are processed and saved to the output file.
  • The third log crashes the filter worker with an "Exception in pipelineworker" error.
  • The pipeline continues accepting logs and saves them to the persisted queue, but they are not processed after this.

Ref: https://discuss.elastic.co/t/logstash-6-5-2-filter-worker-crash-with-exception-in-pipelineworker/159933

@jsvd
Member

jsvd commented Dec 10, 2018

This can be replicated with:

 /tmp/logstash-6.5.2 % echo '{"array": []}' | bin/logstash -e "input { stdin { codec => json } } filter { geoip { source => 'array' } }"
Sending Logstash logs to /tmp/logstash-6.5.2/logs which is now configured via log4j2.properties
[2018-12-10T13:32:33,059][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-12-10T13:32:33,078][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.5.2"}
[2018-12-10T13:32:35,959][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-12-10T13:32:35,989][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/tmp/logstash-6.5.2/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/vendor/GeoLite2-City.mmdb"}
[2018-12-10T13:32:36,067][INFO ][logstash.inputs.stdin    ] Automatically switching from json to json_lines codec {:plugin=>"stdin"}
[2018-12-10T13:32:36,105][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x2b9cd752 run>"}
[2018-12-10T13:32:36,151][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-12-10T13:32:36,355][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {:pipeline_id=>"main", "exception"=>"Index: 0, Size: 0", "backtrace"=>["java.util.ArrayList.rangeCheck(java/util/ArrayList.java:657)", "java.util.ArrayList.get(java/util/ArrayList.java:433)", "org.logstash.filters.GeoIPFilter.handleEvent(org/logstash/filters/GeoIPFilter.java:120)", "java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)", "org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:423)", "org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:290)", "tmp.logstash_minus_6_dot_5_dot_2.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.filter(/tmp/logstash-6.5.2/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb:111)", "tmp.logstash_minus_6_dot_5_dot_2.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.RUBY$method$filter$0$__VARARGS__(tmp/logstash_minus_6_dot_5_dot_2/vendor/bundle/jruby/$2_dot_3_dot_0/gems/logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java/lib/logstash/filters//tmp/logstash-6.5.2/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.filters.base.do_filter(/tmp/logstash-6.5.2/logstash-core/lib/logstash/filters/base.rb:143)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.filters.base.RUBY$method$do_filter$0$__VARARGS__(tmp/logstash_minus_6_dot_5_dot_2/logstash_minus_core/lib/logstash/filters//tmp/logstash-6.5.2/logstash-core/lib/logstash/filters/base.rb)", 
"tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.filters.base.block in multi_filter(/tmp/logstash-6.5.2/logstash-core/lib/logstash/filters/base.rb:162)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1734)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(org/jruby/RubyArray$INVOKER$i$0$0$each.gen)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.filters.base.multi_filter(/tmp/logstash-6.5.2/logstash-core/lib/logstash/filters/base.rb:159)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.filters.base.RUBY$method$multi_filter$0$__VARARGS__(tmp/logstash_minus_6_dot_5_dot_2/logstash_minus_core/lib/logstash/filters//tmp/logstash-6.5.2/logstash-core/lib/logstash/filters/base.rb)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.filter_delegator.multi_filter(/tmp/logstash-6.5.2/logstash-core/lib/logstash/filter_delegator.rb:44)", "RUBY.block in filter_func((eval):42)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.pipeline.filter_batch(/tmp/logstash-6.5.2/logstash-core/lib/logstash/pipeline.rb:341)", "RUBY.worker_loop(/tmp/logstash-6.5.2/logstash-core/lib/logstash/pipeline.rb:320)", "tmp.logstash_minus_6_dot_5_dot_2.logstash_minus_core.lib.logstash.pipeline.block in start_workers(/tmp/logstash-6.5.2/logstash-core/lib/logstash/pipeline.rb:286)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:289)", "org.jruby.RubyProc.call(org/jruby/RubyProc.java:246)", "java.lang.Thread.run(java/lang/Thread.java:748)"], :thread=>"#<Thread:0x2b9cd752 sleep>"}
Exception in thread "Ruby-0-Thread-6: :1" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(java/util/ArrayList.java:657)
	at java.util.ArrayList.get(java/util/ArrayList.java:433)
	at org.logstash.filters.GeoIPFilter.handleEvent(org/logstash/filters/GeoIPFilter.java:120)
	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
	at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:423)
	at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:290)
	at tmp.logstash_minus_6_dot_5_dot_2.vendor.bundle.jruby.$2_dot_3_dot_0.gems.logstash_minus_filter_minus_geoip_minus_5_dot_0_dot_3_minus_java.lib.logstash.filters.geoip.filter(/tmp/logstash-6.5.2/vendor/bundle/jruby/2.3.0/gems/logstash-filter-geoip-5.0.3-java/lib/logstash/filters/geoip.rb:111)

The issue seems to have been around for some time now.

@inammathe

inammathe commented Jan 10, 2019

Could this be avoided with something like this?

public boolean handleEvent(RubyEvent rubyEvent) {
    final Event event = rubyEvent.getEvent();
    Object input = event.getField(sourceField);
    if (input == null) {
      return false;
    }
    String ip;

    if (input instanceof List) {
+      if (((List) input).isEmpty()) {
+          return false;
+      }
      ip = (String) ((List) input).get(0);

in

public boolean handleEvent(RubyEvent rubyEvent) {
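A self-contained sketch of the guard proposed above, for anyone who wants to try the logic outside the plugin. Everything here is an assumption for illustration: `GeoIpSourceGuard` and `extractIp` are hypothetical names, not the plugin's API, and the real `handleEvent` returns a boolean rather than the extracted string.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class GeoIpSourceGuard {
    // Hypothetical helper: returns the IP string to look up, or null when the
    // source value is missing, an empty list, or not a string.
    static String extractIp(Object input) {
        if (input == null) {
            return null; // field absent from the event
        }
        if (input instanceof List) {
            List<?> values = (List<?>) input;
            if (values.isEmpty()) {
                return null; // the case that triggers the crash in this issue
            }
            Object first = values.get(0);
            return first instanceof String ? (String) first : null;
        }
        return input instanceof String ? (String) input : null;
    }

    public static void main(String[] args) {
        // The three shapes from the sample data: one address, missing field, empty array.
        System.out.println(extractIp(Arrays.asList("8.8.8.8")));
        System.out.println(extractIp(null));
        System.out.println(extractIp(Collections.emptyList()));
    }
}
```

Returning null for both the missing field and the empty list means the caller can treat the two cases the same way and skip the lookup.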

@inammathe

I think I am right about the above.
This workaround in my filter worked for me:

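ruby {
    # Remove the field when it is present but empty; a missing (nil) field is left alone.
    code => "value = event.get('[your_field]'); event.remove('[your_field]') if value.respond_to?(:empty?) && value.empty?"
}

if [your_field] {
    geoip { source => "your_field" }
}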
