
Logstash ingest pipeline: data from one pipeline going to another pipeline/index #437

parthdmaniar opened this issue Dec 12, 2021
Logstash information:


  1. Logstash version: 7.16.0 (running on Raspberry Pi 4 Model B Rev 1.1 with AArch64 [ARM64] using Ubuntu 20.04 LTS)
  2. Logstash installation source: APT
  3. How is Logstash being run: As a service using systemd
  4. How was the Logstash Plugin installed: Default plugin

JVM:
openjdk version "11.0.13" 2021-10-19
OpenJDK Runtime Environment Temurin-11.0.13+8 (build 11.0.13+8)
OpenJDK 64-Bit Server VM Temurin-11.0.13+8 (build 11.0.13+8, mixed mode)

OS version: Ubuntu 20.04 LTS (5.4.0-1047-raspi #52-Ubuntu SMP PREEMPT Wed Nov 24 08:16:38 UTC 2021 aarch64 aarch64 aarch64 GNU/Linux)

Description of the problem including expected versus actual behavior:
Expected: input data from each Beats input should go to its respective index.
Actual: data sent to one pipeline, which has its own index, is also being written to the other pipeline's index, so two copies of each event are created.

There are two different instances of Filebeat running on a single host, each installed and configured through a different mechanism.

Input is via Filebeat:

Installation and persistence:

  1. Filebeat installed via APT (running configuration: /etc/filebeat/filebeat.yml); referred to below as cowrie-*
  2. Filebeat unpacked from the tarball and made persistent via /etc/rc.local (running configuration: /home/user/filebeat2/filebeat.yml); referred to below as cowrie-firewall-* (a sketch of the rc.local entry is shown below)
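
For reference, a sketch of the /etc/rc.local entry used to persist the second instance. The exact file contents were not captured here; the command line matches the process listing under "Steps to reproduce":

#!/bin/sh -e
# Sketch only: start the tarball Filebeat with its own config at boot.
/home/user/filebeat2/filebeat -c /home/user/filebeat2/filebeat.yml &
exit 0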

Configuration:

  1. Both Filebeat instances have their own configuration and are set up to send logs to Logstash on different ports (5054 and 5055).

Logstash

  1. Logstash is running on a single host with a separate pipeline for each ingest.
  2. Logs sent to the "cowrie-*" pipeline on port 5054 (pipeline.id: honeypot_ingest) appear in the cowrie-logstash-* index, as expected.
  3. Logs sent to the "cowrie-firewall-*" pipeline on port 5055 (pipeline.id: cowrie_firewall_ingest) also appear in the cowrie-logstash-* index, instead of only in cowrie-firewall-logstash-*.
  4. The relevant configuration for each pipeline (input, filter, and output):

A. cowrie-*

input {
    # filebeats
    beats {
        port => 5054
        type => "cowrie"
        #id => "honeypot_ingest"
    }
}

filter {
    if [type] == "cowrie" {
        json {
            # (json filter options were truncated in the original paste)
        }
    }
}

output {
    if [type] == "cowrie" {
        elasticsearch {
            hosts => ["REDACTED","REDACTED"]
            #data_stream => true  # Causes errors; added while diagnosing the duplication, after reading
            # https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-data_stream
            index => "cowrie-logstash-%{+yyyy.MM.dd}"
            ssl => true
            user => 'REDACTED'
            password => 'REDACTED'
            cacert => '/etc/logstash/elasticsearch-ca.pem'
            ssl_certificate_verification => true
            ilm_enabled => auto
            ilm_rollover_alias => "cowrie-logstash"
        }
        #file {
        #    path => "/tmp/cowrie-logstash.log"
        #    codec => json
        #}
        #stdout {
        #    codec => rubydebug
        #}
    }
}

B. cowrie-firewall-*

input {
  beats {
    port => 5055
    type => "logs"
  }
}

filter {
  grok {
    patterns_dir => ["/etc/logstash/patterns/"]
    match => ["message", "%{HOSTNAME:hostname}.*?SRC=%{IPV4:source_ip} DST=%{IPV4:destination_ip} LEN=%{DATA:length} TOS=%{DATA:type_of_service} PREC=%{DATA:precedence} TTL=%{DATA:ttl} ID=%{DATA:unique_id} PROTO=%{DATA:protocol} SPT=%{DATA:source_port} DPT=%{DATA:destination_port} WINDOW=%{DATA:tcp_receive_size} RES=%{DATA:reserved_bits} %{DATA:tcp_flag} URGP=%{GREEDYDATA:urgent_flag}",
              "message", "%{HOSTNAME:hostname}.*?SRC=%{IPV4:source_ip} DST=%{IPV4:destination_ip} LEN=%{DATA:length} TOS=%{DATA:type_of_service} PREC=%{DATA:precedence} TTL=%{DATA:ttl} ID=%{DATA:unique_id} %{DATA:tcp_flag} PROTO=%{DATA:protocol} SPT=%{DATA:source_port} DPT=%{DATA:destination_port} WINDOW=%{DATA:tcp_receive_size} RES=%{DATA:reserved_bits} %{DATA:packet_flag} URGP=%{GREEDYDATA:urgent_flag}",
              "message", "%{HOSTNAME:hostname}.*?SRC=%{IPV4:source_ip} DST=%{IPV4:destination_ip} LEN=%{DATA:length} TOS=%{DATA:type_of_service} PREC=%{DATA:precedence} TTL=%{DATA:ttl} ID=%{NUMBER:unique_id} %{DATA:udp_flag} PROTO=%{DATA:protocol} SPT=%{DATA:source_port} DPT=%{DATA:destination_port} LEN=%{NUMBER:length}",
              "message", "%{HOSTNAME:hostname}.*?SRC=%{IPV4:source_ip} DST=%{IPV4:destination_ip} LEN=%{DATA:length} TOS=%{DATA:type_of_service} PREC=%{DATA:precedence} TTL=%{DATA:ttl} ID=%{NUMBER:unique_id} PROTO=%{DATA:protocol} SPT=%{DATA:source_port} DPT=%{DATA:destination_port} LEN=%{NUMBER:length}"]
  }

  geoip {
    source => "source_ip"
    target => "geoip"
    database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
  }
}

output {
  elasticsearch {
    hosts => ["REDACTED","REDACTED"]
    index => "cowrie-firewall-logstash-%{+yyyy.MM.dd}"
    ssl => true
    user => 'REDACTED'
    password => 'REDACTED'
    cacert => '/etc/logstash/elasticsearch-ca.pem'
    ssl_certificate_verification => true
    ilm_enabled => auto
    ilm_rollover_alias => "cowrie-firewall-logstash"
  }
}

pipelines.yml


- pipeline.id: honeypot_ingest
  path.config: "/etc/logstash/conf.d/cowrie.conf"

- pipeline.id: cowrie_firewall_ingest
  path.config: "/etc/logstash/conf.d/cowrie_firewall.conf"
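
To confirm that both pipelines are actually loaded as separate pipelines, and not concatenated into a single one (which would send every event through both outputs and produce exactly this kind of duplication), the node API can be queried. This is a diagnostic sketch; host and port assume the default API settings:

# Both pipeline IDs (honeypot_ingest, cowrie_firewall_ingest) should be listed.
curl -XGET 'http://localhost:9600/_node/pipelines?pretty'

As far as I understand, pipelines.yml is only honored when Logstash starts without -e/-f; if path.config is set globally instead, the files under conf.d are merged into one pipeline.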

Steps to reproduce:

  1. On the ISP router, forward inbound traffic from ANY on ports 5000-6000 to the Logstash host on ports 5000-6000.
  2. Install two distinct Filebeat instances: one via APT, the second by unpacking the tarball and starting it from /etc/rc.local.
  3. Verify that both filebeat instances are using their own configurations:

root         510       1  0 01:12 ?        00:00:18 /usr/share/filebeat/bin/filebeat --environment systemd -c /etc/filebeat/filebeat.yml --path.home /usr/share/filebeat --path.config /etc/filebeat --path.data /var/lib/filebeat --path.logs /var/log/filebeat
root         529       1  0 01:12 ?        00:00:37 /home/user/filebeat2/filebeat -c /home/user/filebeat2/filebeat.yml
root        9429    9113  0 08:57 pts/1    00:00:00 grep --color=auto filebeat
  4. Verify input configuration of each Filebeat instance:

A. cowrie-logstash-* (filepath: /etc/filebeat/filebeat.yml)

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    #- /var/log/*.log
    - /srv/cowrie/var/log/cowrie/*.json
    - /home/ubuntu/logs/*.json
    #- c:\programdata\elasticsearch\logs\*

B. cowrie-firewall-* (filepath: /home/user/filebeat2/filebeat.yml)

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    #- /var/log/*.log
    - /var/log/dshield.log
    #- /home/ubuntu/logs/*.json
    #- c:\programdata\elasticsearch\logs\*
  5. Verify output configuration of each Filebeat instance:

A. cowrie-logstash-* (filepath: /etc/filebeat/filebeat.yml)

output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]
  hosts: ["IP REDACTED:5054"]

B. cowrie-firewall-* (filepath: /home/user/filebeat2/filebeat.yml)

output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]
  hosts: ["IP REDACTED:5055"]

Provide logs (if relevant):
I can email the logs if need be.

Please refer to the thread: https://discuss.elastic.co/t/logstash-ingest-pipeline-data-from-one-pipeline-going-to-another/289131
