Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_kafka: add hash option #2268

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

novegit
Copy link
Contributor

@novegit novegit commented Jun 16, 2020

this PR enhances plugin out_kafka with 'hash' option. See fluent/fluent-bit-docs#321

If option 'hash' is set, an uniqe hash is added to each message

configuration example:

  Hash                 On
  Hash_Key             _myid

the commit was tested in docker/openshift environment

messages in kafka with hash field '_myid':

{"@timestamp":1592327982.411502,"cpu_p":1.1,"user_p":0.3666666666666666,"system_p":0.7333333333333333,"cpu0.p_cpu":1.4,"cpu0.p_user":0.4,"cpu0.p_system":1,"cpu1.p_cpu":1.2,"cpu1.p_user":0.4,"cpu1.p_system":0.8,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":1,"cpu3.p_user":0.2,"cpu3.p_system":0.8,"cpu4.p_cpu":0.6,"cpu4.p_user":0,"cpu4.p_system":0.6,"cpu5.p_cpu":0.8,"cpu5.p_user":0.2,"cpu5.p_system":0.6,"_myid":"D839ED0D670D7000"}
{"@timestamp":1592327987.411792,"cpu_p":0.9666666666666666,"user_p":0.2666666666666667,"system_p":0.7,"cpu0.p_cpu":1.2,"cpu0.p_user":0.4,"cpu0.p_system":0.8,"cpu1.p_cpu":0.8,"cpu1.p_user":0.2,"cpu1.p_system":0.6,"cpu2.p_cpu":1.2,"cpu2.p_user":0.4,"cpu2.p_system":0.8,"cpu3.p_cpu":0.8,"cpu3.p_user":0.2,"cpu3.p_system":0.6,"cpu4.p_cpu":0.8,"cpu4.p_user":0.2,"cpu4.p_system":0.6,"cpu5.p_cpu":1,"cpu5.p_user":0.2,"cpu5.p_system":0.8,"_myid":"788F98C1E1632000"}
{"@timestamp":1592327992.377262,"cpu_p":0.9666666666666666,"user_p":0.3666666666666666,"system_p":0.6,"cpu0.p_cpu":1.2,"cpu0.p_user":0.4,"cpu0.p_system":0.8,"cpu1.p_cpu":0.6,"cpu1.p_user":0.2,"cpu1.p_system":0.4,"cpu2.p_cpu":1.2,"cpu2.p_user":0.6,"cpu2.p_system":0.6,"cpu3.p_cpu":1,"cpu3.p_user":0.4,"cpu3.p_system":0.6,"cpu4.p_cpu":0.4,"cpu4.p_user":0.2,"cpu4.p_system":0.2,"cpu5.p_cpu":1.2,"cpu5.p_user":0.4,"cpu5.p_system":0.8,"_myid":"4321FF0183AC4000"}
{"@timestamp":1592327997.378218,"cpu_p":1.1,"user_p":0.3666666666666666,"system_p":0.7333333333333333,"cpu0.p_cpu":1.6,"cpu0.p_user":0.4,"cpu0.p_system":1.2,"cpu1.p_cpu":0.8,"cpu1.p_user":0.4,"cpu1.p_system":0.4,"cpu2.p_cpu":1,"cpu2.p_user":0.4,"cpu2.p_system":0.6,"cpu3.p_cpu":1.2,"cpu3.p_user":0.4,"cpu3.p_system":0.8,"cpu4.p_cpu":0.8,"cpu4.p_user":0.2,"cpu4.p_system":0.6,"cpu5.p_cpu":1.4,"cpu5.p_user":0.4,"cpu5.p_system":1,"_myid":"207661B77DBA7A00"}

Signed-off-by: Michael Voelker [email protected]


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • Documentation required for this feature

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

@novegit
Copy link
Contributor Author

novegit commented Jun 16, 2020

fix for #2139

@novegit
Copy link
Contributor Author

novegit commented Jun 16, 2020

Configuration file:

# fluentbit conf
[SERVICE]
    # Flush
    # =====
    # Set an interval of seconds before to flush records to a destination
    Flush        5

    # Daemon
    # ======
    # Instruct Fluent Bit to run in foreground or background mode.
    Daemon       Off

    # Log_Level
    # =========
    # Set the verbosity level of the service, values can be:
    #
    # - error
    # - warning
    # - info
    # - debug
    # - trace
    #
    # By default 'info' is set, that means it includes 'error' and 'warning'.
    Log_Level    info

    # Parsers_File
    # ============
    # Specify an optional 'Parsers' configuration file
    Parsers_File parsers.conf
    Plugins_File plugins.conf

    # HTTP Server
    # ===========
    # Enable/Disable the built-in HTTP Server for metrics
    HTTP_Server  On
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name cpu
    Tag  cpu.local
    # Interval Sec
    # ====
    # Read interval (sec) Default: 1
    Interval_Sec 5

[OUTPUT]
    Name  stdout
    Match *

[OUTPUT]
    Name        kafka
    Match       *
    Brokers     172.16.1.3:9092
    #Brokers     localhost:9092

    Topics      testacl
    Hash        On
    Hash_Key    _myid

Logoutput of fluentbit:

Fluent Bit v1.5.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2020/06/16 17:19:32] [ info] [storage] version=1.0.4, initializing...
[2020/06/16 17:19:32] [ info] [storage] in-memory
[2020/06/16 17:19:32] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2020/06/16 17:19:32] [ info] [engine] started (pid=1)
[2020/06/16 17:19:32] [ info] [output:kafka:kafka.1] brokers='172.16.1.3:9092' topics='testacl'
[2020/06/16 17:19:32] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2020/06/16 17:19:32] [ info] [sp] stream processor started
[0] cpu.local: [1592327977.411774300, {"cpu_p"=>0.833333, "user_p"=>0.233333, "system_p"=>0.600000, "cpu0.p_cpu"=>0.800000, "cpu0.p_user"=>0.200000, "cpu0.p_system"=>0.600000, "cpu1.p_cpu"=>1.200000, "cpu1.p_user"=>0.400000, "cpu1.p_system"=>0.800000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>0.200000, "cpu2.p_system"=>0.800000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>0.400000, "cpu3.p_system"=>0.600000, "cpu4.p_cpu"=>0.200000, "cpu4.p_user"=>0.000000, "cpu4.p_system"=>0.200000, "cpu5.p_cpu"=>0.400000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.200000}]
[0] cpu.local: [1592327982.411501600, {"cpu_p"=>1.100000, "user_p"=>0.366667, "system_p"=>0.733333, "cpu0.p_cpu"=>1.400000, "cpu0.p_user"=>0.400000, "cpu0.p_system"=>1.000000, "cpu1.p_cpu"=>1.200000, "cpu1.p_user"=>0.400000, "cpu1.p_system"=>0.800000, "cpu2.p_cpu"=>2.000000, "cpu2.p_user"=>1.000000, "cpu2.p_system"=>1.000000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>0.200000, "cpu3.p_system"=>0.800000, "cpu4.p_cpu"=>0.600000, "cpu4.p_user"=>0.000000, "cpu4.p_system"=>0.600000, "cpu5.p_cpu"=>0.800000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.600000}]
[0] cpu.local: [1592327987.411791800, {"cpu_p"=>0.966667, "user_p"=>0.266667, "system_p"=>0.700000, "cpu0.p_cpu"=>1.200000, "cpu0.p_user"=>0.400000, "cpu0.p_system"=>0.800000, "cpu1.p_cpu"=>0.800000, "cpu1.p_user"=>0.200000, "cpu1.p_system"=>0.600000, "cpu2.p_cpu"=>1.200000, "cpu2.p_user"=>0.400000, "cpu2.p_system"=>0.800000, "cpu3.p_cpu"=>0.800000, "cpu3.p_user"=>0.200000, "cpu3.p_system"=>0.600000, "cpu4.p_cpu"=>0.800000, "cpu4.p_user"=>0.200000, "cpu4.p_system"=>0.600000, "cpu5.p_cpu"=>1.000000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.800000}]
[0] cpu.local: [1592327992.377261700, {"cpu_p"=>0.966667, "user_p"=>0.366667, "system_p"=>0.600000, "cpu0.p_cpu"=>1.200000, "cpu0.p_user"=>0.400000, "cpu0.p_system"=>0.800000, "cpu1.p_cpu"=>0.600000, "cpu1.p_user"=>0.200000, "cpu1.p_system"=>0.400000, "cpu2.p_cpu"=>1.200000, "cpu2.p_user"=>0.600000, "cpu2.p_system"=>0.600000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>0.400000, "cpu3.p_system"=>0.600000, "cpu4.p_cpu"=>0.400000, "cpu4.p_user"=>0.200000, "cpu4.p_system"=>0.200000, "cpu5.p_cpu"=>1.200000, "cpu5.p_user"=>0.400000, "cpu5.p_system"=>0.800000}]
[0] cpu.local: [1592327997.378218000, {"cpu_p"=>1.100000, "user_p"=>0.366667, "system_p"=>0.733333, "cpu0.p_cpu"=>1.600000, "cpu0.p_user"=>0.400000, "cpu0.p_system"=>1.200000, "cpu1.p_cpu"=>0.800000, "cpu1.p_user"=>0.400000, "cpu1.p_system"=>0.400000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>0.400000, "cpu2.p_system"=>0.600000, "cpu3.p_cpu"=>1.200000, "cpu3.p_user"=>0.400000, "cpu3.p_system"=>0.800000, "cpu4.p_cpu"=>0.800000, "cpu4.p_user"=>0.200000, "cpu4.p_system"=>0.600000, "cpu5.p_cpu"=>1.400000, "cpu5.p_user"=>0.400000, "cpu5.p_system"=>1.000000}]
[0] cpu.local: [1592328002.377657700, {"cpu_p"=>0.966667, "user_p"=>0.333333, "system_p"=>0.633333, "cpu0.p_cpu"=>1.000000, "cpu0.p_user"=>0.400000, "cpu0.p_system"=>0.600000, "cpu1.p_cpu"=>0.800000, "cpu1.p_user"=>0.400000, "cpu1.p_system"=>0.400000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>0.400000, "cpu2.p_system"=>0.600000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>0.200000, "cpu3.p_system"=>0.800000, "cpu4.p_cpu"=>1.000000, "cpu4.p_user"=>0.200000, "cpu4.p_system"=>0.800000, "cpu5.p_cpu"=>0.600000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.400000}]
[0] cpu.local: [1592328007.377676300, {"cpu_p"=>1.166667, "user_p"=>0.400000, "system_p"=>0.766667, "cpu0.p_cpu"=>1.000000, "cpu0.p_user"=>0.400000, "cpu0.p_system"=>0.600000, "cpu1.p_cpu"=>1.000000, "cpu1.p_user"=>0.400000, "cpu1.p_system"=>0.600000, "cpu2.p_cpu"=>1.600000, "cpu2.p_user"=>0.600000, "cpu2.p_system"=>1.000000, "cpu3.p_cpu"=>0.600000, "cpu3.p_user"=>0.200000, "cpu3.p_system"=>0.400000, "cpu4.p_cpu"=>1.600000, "cpu4.p_user"=>0.600000, "cpu4.p_system"=>1.000000, "cpu5.p_cpu"=>1.400000, "cpu5.p_user"=>0.400000, "cpu5.p_system"=>1.000000}]

messages in kafka:

{"@timestamp":1592327977.411774,"cpu_p":0.8333333333333334,"user_p":0.2333333333333333,"system_p":0.6,"cpu0.p_cpu":0.8,"cpu0.p_user":0.2,"cpu0.p_system":0.6,"cpu1.p_cpu":1.2,"cpu1.p_user":0.4,"cpu1.p_system":0.8,"cpu2.p_cpu":1,"cpu2.p_user":0.2,"cpu2.p_system":0.8,"cpu3.p_cpu":1,"cpu3.p_user":0.4,"cpu3.p_system":0.6,"cpu4.p_cpu":0.2,"cpu4.p_user":0,"cpu4.p_system":0.2,"cpu5.p_cpu":0.4,"cpu5.p_user":0.2,"cpu5.p_system":0.2,"_myid":"6A101E76A6DD2C00"}
{"@timestamp":1592327982.411502,"cpu_p":1.1,"user_p":0.3666666666666666,"system_p":0.7333333333333333,"cpu0.p_cpu":1.4,"cpu0.p_user":0.4,"cpu0.p_system":1,"cpu1.p_cpu":1.2,"cpu1.p_user":0.4,"cpu1.p_system":0.8,"cpu2.p_cpu":2,"cpu2.p_user":1,"cpu2.p_system":1,"cpu3.p_cpu":1,"cpu3.p_user":0.2,"cpu3.p_system":0.8,"cpu4.p_cpu":0.6,"cpu4.p_user":0,"cpu4.p_system":0.6,"cpu5.p_cpu":0.8,"cpu5.p_user":0.2,"cpu5.p_system":0.6,"_myid":"D839ED0D670D7000"}
{"@timestamp":1592327987.411792,"cpu_p":0.9666666666666666,"user_p":0.2666666666666667,"system_p":0.7,"cpu0.p_cpu":1.2,"cpu0.p_user":0.4,"cpu0.p_system":0.8,"cpu1.p_cpu":0.8,"cpu1.p_user":0.2,"cpu1.p_system":0.6,"cpu2.p_cpu":1.2,"cpu2.p_user":0.4,"cpu2.p_system":0.8,"cpu3.p_cpu":0.8,"cpu3.p_user":0.2,"cpu3.p_system":0.6,"cpu4.p_cpu":0.8,"cpu4.p_user":0.2,"cpu4.p_system":0.6,"cpu5.p_cpu":1,"cpu5.p_user":0.2,"cpu5.p_system":0.8,"_myid":"788F98C1E1632000"}
{"@timestamp":1592327992.377262,"cpu_p":0.9666666666666666,"user_p":0.3666666666666666,"system_p":0.6,"cpu0.p_cpu":1.2,"cpu0.p_user":0.4,"cpu0.p_system":0.8,"cpu1.p_cpu":0.6,"cpu1.p_user":0.2,"cpu1.p_system":0.4,"cpu2.p_cpu":1.2,"cpu2.p_user":0.6,"cpu2.p_system":0.6,"cpu3.p_cpu":1,"cpu3.p_user":0.4,"cpu3.p_system":0.6,"cpu4.p_cpu":0.4,"cpu4.p_user":0.2,"cpu4.p_system":0.2,"cpu5.p_cpu":1.2,"cpu5.p_user":0.4,"cpu5.p_system":0.8,"_myid":"4321FF0183AC4000"}
{"@timestamp":1592327997.378218,"cpu_p":1.1,"user_p":0.3666666666666666,"system_p":0.7333333333333333,"cpu0.p_cpu":1.6,"cpu0.p_user":0.4,"cpu0.p_system":1.2,"cpu1.p_cpu":0.8,"cpu1.p_user":0.4,"cpu1.p_system":0.4,"cpu2.p_cpu":1,"cpu2.p_user":0.4,"cpu2.p_system":0.6,"cpu3.p_cpu":1.2,"cpu3.p_user":0.4,"cpu3.p_system":0.8,"cpu4.p_cpu":0.8,"cpu4.p_user":0.2,"cpu4.p_system":0.6,"cpu5.p_cpu":1.4,"cpu5.p_user":0.4,"cpu5.p_system":1,"_myid":"207661B77DBA7A00"}
{"@timestamp":1592328002.377658,"cpu_p":0.9666666666666666,"user_p":0.3333333333333334,"system_p":0.6333333333333333,"cpu0.p_cpu":1,"cpu0.p_user":0.4,"cpu0.p_system":0.6,"cpu1.p_cpu":0.8,"cpu1.p_user":0.4,"cpu1.p_system":0.4,"cpu2.p_cpu":1,"cpu2.p_user":0.4,"cpu2.p_system":0.6,"cpu3.p_cpu":1,"cpu3.p_user":0.2,"cpu3.p_system":0.8,"cpu4.p_cpu":1,"cpu4.p_user":0.2,"cpu4.p_system":0.8,"cpu5.p_cpu":0.6,"cpu5.p_user":0.2,"cpu5.p_system":0.4,"_myid":"B83FAE306E89C800"}
{"@timestamp":1592328007.377676,"cpu_p":1.166666666666667,"user_p":0.4,"system_p":0.7666666666666667,"cpu0.p_cpu":1,"cpu0.p_user":0.4,"cpu0.p_system":0.6,"cpu1.p_cpu":1,"cpu1.p_user":0.4,"cpu1.p_system":0.6,"cpu2.p_cpu":1.6,"cpu2.p_user":0.6,"cpu2.p_system":1,"cpu3.p_cpu":0.6,"cpu3.p_user":0.2,"cpu3.p_system":0.4,"cpu4.p_cpu":1.6,"cpu4.p_user":0.6,"cpu4.p_system":1,"cpu5.p_cpu":1.4,"cpu5.p_user":0.4,"cpu5.p_system":1,"_myid":"6FF28F3687C73800"}

@novegit novegit force-pushed the fb_out_kafka_messagehash branch from 465b262 to 5230af6 Compare June 17, 2020 12:43
@novegit
Copy link
Contributor Author

novegit commented Jun 17, 2020

Valgrind output:

valgrind --leak-check=yes --show-leak-kinds=definite /fluent-bit/bin/fluent-bit -c /fluent-bit/conf/fluentbit.conf
==7== Memcheck, a memory error detector
==7== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==7== Using Valgrind-3.14.0 and LibVEX; rerun with -h for copyright info
==7== Command: /fluent-bit/bin/fluent-bit -c /fluent-bit/conf/fluentbit.conf
==7==
Fluent Bit v1.5.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2020/06/17 12:40:56] [Warning] [config] I cannot open /fluent-bit/conf/parsers.conf file
[2020/06/17 12:40:56] [Warning] [config] I cannot open /fluent-bit/conf/plugins.conf file
[2020/06/17 12:40:56] [ info] [storage] version=1.0.4, initializing...
[2020/06/17 12:40:56] [ info] [storage] in-memory
[2020/06/17 12:40:56] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2020/06/17 12:40:56] [ info] [engine] started (pid=7)
[2020/06/17 12:40:56] [ info] [output:kafka:kafka.1] brokers='172.16.1.3:9092' topics='testacl'
[2020/06/17 12:40:56] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2020/06/17 12:40:56] [ info] [sp] stream processor started
[0] cpu.local: [1592397661.762052500, {"cpu_p"=>2.166667, "user_p"=>1.800000, "system_p"=>0.366667, "cpu0.p_cpu"=>0.600000, "cpu0.p_user"=>0.200000, "cpu0.p_system"=>0.400000, "cpu1.p_cpu"=>1.000000, "cpu1.p_user"=>0.800000, "cpu1.p_system"=>0.200000, "cpu2.p_cpu"=>0.400000, "cpu2.p_user"=>0.400000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>8.200000, "cpu3.p_user"=>7.600000, "cpu3.p_system"=>0.600000, "cpu4.p_cpu"=>2.200000, "cpu4.p_user"=>2.000000, "cpu4.p_system"=>0.200000, "cpu5.p_cpu"=>0.600000, "cpu5.p_user"=>0.000000, "cpu5.p_system"=>0.600000}]
[0] cpu.local: [1592397666.780864100, {"cpu_p"=>0.733333, "user_p"=>0.466667, "system_p"=>0.266667, "cpu0.p_cpu"=>0.400000, "cpu0.p_user"=>0.000000, "cpu0.p_system"=>0.400000, "cpu1.p_cpu"=>0.200000, "cpu1.p_user"=>0.000000, "cpu1.p_system"=>0.200000, "cpu2.p_cpu"=>0.400000, "cpu2.p_user"=>0.200000, "cpu2.p_system"=>0.200000, "cpu3.p_cpu"=>2.400000, "cpu3.p_user"=>2.200000, "cpu3.p_system"=>0.200000, "cpu4.p_cpu"=>0.400000, "cpu4.p_user"=>0.000000, "cpu4.p_system"=>0.400000, "cpu5.p_cpu"=>0.400000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.200000}]
[0] cpu.local: [1592397671.724692500, {"cpu_p"=>0.466667, "user_p"=>0.133333, "system_p"=>0.333333, "cpu0.p_cpu"=>0.200000, "cpu0.p_user"=>0.000000, "cpu0.p_system"=>0.200000, "cpu1.p_cpu"=>0.200000, "cpu1.p_user"=>0.000000, "cpu1.p_system"=>0.200000, "cpu2.p_cpu"=>0.600000, "cpu2.p_user"=>0.200000, "cpu2.p_system"=>0.400000, "cpu3.p_cpu"=>0.600000, "cpu3.p_user"=>0.200000, "cpu3.p_system"=>0.400000, "cpu4.p_cpu"=>1.000000, "cpu4.p_user"=>0.200000, "cpu4.p_system"=>0.800000, "cpu5.p_cpu"=>0.400000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.200000}]
[0] cpu.local: [1592397676.726628200, {"cpu_p"=>0.466667, "user_p"=>0.133333, "system_p"=>0.333333, "cpu0.p_cpu"=>0.800000, "cpu0.p_user"=>0.200000, "cpu0.p_system"=>0.600000, "cpu1.p_cpu"=>0.400000, "cpu1.p_user"=>0.200000, "cpu1.p_system"=>0.200000, "cpu2.p_cpu"=>0.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>0.400000, "cpu3.p_user"=>0.200000, "cpu3.p_system"=>0.200000, "cpu4.p_cpu"=>1.000000, "cpu4.p_user"=>0.400000, "cpu4.p_system"=>0.600000, "cpu5.p_cpu"=>0.600000, "cpu5.p_user"=>0.200000, "cpu5.p_system"=>0.400000}]
^C[engine] caught signal (SIGINT)
[2020/06/17 12:41:23] [ info] [input] pausing cpu.0
==7==
==7== HEAP SUMMARY:
==7==     in use at exit: 97,312 bytes in 679 blocks
==7==   total heap usage: 2,369 allocs, 1,690 frees, 3,114,578 bytes allocated
==7==
==7== 556 (16 direct, 540 indirect) bytes in 1 blocks are definitely lost in loss record 38 of 44
==7==    at 0x483577F: malloc (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==7==    by 0x2B674A: flb_malloc (flb_mem.h:62)
==7==    by 0x2B6C24: cb_mq_metrics (metrics.c:110)
==7==    by 0x625858: mk_fifo_worker_read (mk_fifo.c:398)
==7==    by 0x6351F0: mk_server_worker_loop (mk_server.c:525)
==7==    by 0x62BACE: mk_sched_launch_worker_loop (mk_scheduler.c:412)
==7==    by 0x4850FA2: start_thread (pthread_create.c:486)
==7==    by 0x51B54CE: clone (clone.S:95)
==7==
==7== LEAK SUMMARY:
==7==    definitely lost: 16 bytes in 1 blocks
==7==    indirectly lost: 540 bytes in 3 blocks
==7==      possibly lost: 0 bytes in 0 blocks
==7==    still reachable: 96,756 bytes in 675 blocks
==7==         suppressed: 0 bytes in 0 blocks
==7== Reachable blocks (those to which a pointer was found) are not shown.
==7== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==7==
==7== For counts of detected and suppressed errors, rerun with: -v
==7== ERROR SUMMARY: 1 errors from 
1 contexts (suppressed: 0 from 0)

as it looks is the leak in src/http_server/api/v1/metrics.c (prometheus metrics were enabled in config)

@simonasr
Copy link

@edsiper could You please review this one 🙏

@github-actions
Copy link
Contributor

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

@edsiper
Copy link
Member

edsiper commented Dec 13, 2021

assigned to @nokute78 for review

@erikvanbrakel
Copy link

@edsiper @nokute78 Is this something that's going to happen at a certain point? I'm currently running into this as well, and would love to know if and when I can expect this to land in fluentbit. Thank you!

Copy link
Collaborator

@nokute78 nokute78 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for late reply.

First, could you fix conflicting ?
I also added some review comments.

plugins/out_kafka/kafka_config.c Outdated Show resolved Hide resolved
plugins/out_kafka/kafka.c Outdated Show resolved Hide resolved
plugins/out_kafka/kafka.c Outdated Show resolved Hide resolved
@lecaros lecaros added the waiting-for-user Waiting for more information, tests or requested changes label Mar 14, 2022
@novegit novegit force-pushed the fb_out_kafka_messagehash branch from 5230af6 to 89ff32a Compare March 28, 2022 19:23
@novegit
Copy link
Contributor Author

novegit commented Mar 28, 2022

rebased master and switched to mbedtls_sha256

valgrind:

valgrind --leak-check=full /fluent-bit/bin/fluent-bit -c fb_mh2.conf
...
==60== HEAP SUMMARY:
==60==     in use at exit: 208,252 bytes in 4,385 blocks
==60==   total heap usage: 12,482 allocs, 8,098 frees, 18,938,141 bytes allocated
==60==
==60== LEAK SUMMARY:
==60==    definitely lost: 0 bytes in 0 blocks
==60==    indirectly lost: 0 bytes in 0 blocks
==60==      possibly lost: 0 bytes in 0 blocks
==60==    still reachable: 208,252 bytes in 4,385 blocks
==60==         suppressed: 0 bytes in 0 blocks
==60== Reachable blocks (those to which a pointer was found) are not shown.
==60== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==60==
==60== For lists of detected and suppressed errors, rerun with: -s
==60== ERROR SUMMARY: 2 errors from 2 contexts (suppressed: 0 from 0)

used config:

[SERVICE]
    flush        5.5
    grace        1
    daemon       Off
    log_level    info
    parsers_file /fluent-bit/etc/parsers.conf
    plugins_file plugins.conf

    # HTTP Server
    # ===========
    # Enable/Disable the built-in HTTP Server for metrics
    http_server  On
    http_listen  0.0.0.0
    http_port    2020

[INPUT]
    name cpu
    tag  generic
    interval_sec 5

[INPUT]
    name dummy
    tag dummy.log
    dummy {"message":"dummy1", "kubernetes":{"labels":{"app":"fluentbit"}}}

[INPUT]
    name dummy
    tag dummy.log
    dummy {"message":"dummy2", "kubernetes":{"labels":{"app":"fluentbit"},"annotations":{"topic":"none"}}}

[OUTPUT]
    name  stdout
    match *
[OUTPUT]
    name kafka
    match *
    brokers   kafka:9092
    topics    test1
    Hash on
    #Hash_key  sha256
    rdkafka.request.required.acks         1
    rdkafka.queue.buffering.max.messages  100
    rdkafka.queue.buffering.max.kbytes    8000
    rdkafka.linger.ms                     150m
    rdkafka.security.protocol             plaintext
generated messages in kafka:
{"@timestamp":1648494655.627689,"cpu_p":0.6333333333333333,"user_p":0.4333333333333333,"system_p":0.2,"cpu0.p_cpu":0.2,"cpu0.p_user":0.0,"cpu0.p_system":0.2,"cpu1.p_cpu":0.4,"cpu1.p_user":0.2,"cpu1.p_system":0.2,"cpu2.p_cpu":0.8,"cpu2.p_user":0.6,"cpu2.p_system":0.2,"cpu3.p_cpu":0.4,"cpu3.p_user":0.2,"cpu3.p_system":0.2,"cpu4.p_cpu":1.8,"cpu4.p_user":1.6,"cpu4.p_system":0.2,"cpu5.p_cpu":0.6,"cpu5.p_user":0.4,"cpu5.p_system":0.2,"_id":"e2c12919c821de836df84d8316882058acc85314077aca945c299a07dad69834"}
{"@timestamp":1648494652.630272,"message":"dummy1","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"0d34ec8ec61656ef70a5d2a317b757cfd209f928d71799d1e0f3d12a0a4c78f7"}
{"@timestamp":1648494653.626873,"message":"dummy1","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"de05cf6a31d72bbdeef2593f2dd65f09439cd66ef60af25559bf228986c259bd"}
{"@timestamp":1648494654.627484,"message":"dummy1","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"0d7d817ae5396040da821a8626e28770fe740e92b35462eaf5776e9efe4444c9"}
{"@timestamp":1648494655.627736,"message":"dummy1","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"ff53d73c1b8d27597a0726b6445fc963ba84a89b26b2b47afa070ba6e1c167af"}
{"@timestamp":1648494656.630089,"message":"dummy1","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"96ecd663ee837dfd6cf91d9161ae1a2d7dae9df8b811a1ec1c505279f4d114ba"}
{"@timestamp":1648494652.630357,"message":"dummy2","kubernetes":{"labels":{"app":"fluentbit"},"annotations":{"topic":"none"}},"_id":"0b44be5deac91a86f6f79578338b21c363507bbd40e02390eef62455f660b770"}
{"@timestamp":1648494653.627026,"message":"dummy2","kubernetes":{"labels":{"app":"fluentbit"},"annotations":{"topic":"none"}},"_id":"da9fffd53d2773743c3f8ae10d89cea2a01803ed433034cf50721a28b4ade7ac"}
{"@timestamp":1648494654.627525,"message":"dummy2","kubernetes":{"labels":{"app":"fluentbit"},"annotations":{"topic":"none"}},"_id":"f0982e745f27f4c199e40143c098952fb398c803923124d840b859b308fb188a"}
{"@timestamp":1648494655.627746,"message":"dummy2","kubernetes":{"labels":{"app":"fluentbit"},"annotations":{"topic":"none"}},"_id":"64c65a20de6f9455c8dbb3592a6be044372f543db9eb825eda61815ab114aa16"}
{"@timestamp":1648494656.63013,"message":"dummy2","kubernetes":{"labels":{"app":"fluentbit"},"annotations":{"topic":"none"}},"_id":"8448b27a4a5b7a94c1231cbb56d47c9a5439f04cb33f0d084190c1f1867dcc9d"}

@novegit novegit closed this Mar 28, 2022
@novegit novegit reopened this Mar 28, 2022
this PR enhances plugin out_kafka with 'hash' option. See fluent/fluent-bit-docs#321

If option 'hash' is set, an uniqe hash is added to each message

configuration example:
```
  Hash                 On
  Hash_Key             _myid
```

the commit was tested in docker/openshift environment

messages in kafka with hash field '_myid':
```
{"@timestamp":1648491317.038613,"cpu_p":1.266666666666667,"user_p":0.9,"system_p":0.3666666666666666,"cpu0.p_cpu":0.8,"cpu0.p_user":0.4,"cpu0.p_system":0.4,"cpu1.p_cpu":1.2,"cpu1.p_user":0.8,"cpu1.p_system":0.4,"cpu2.p_cpu":0.8,"cpu2.p_user":0.4,"cpu2.p_system":0.4,"cpu3.p_cpu":1.4,"cpu3.p_user":1.0,"cpu3.p_system":0.4,"cpu4.p_cpu":1.2,"cpu4.p_user":0.8,"cpu4.p_system":0.4,"cpu5.p_cpu":1.8,"cpu5.p_user":1.6,"cpu5.p_system":0.2,"_id":"8a726e3841ebe225017073cd79d6a34ece3c3f9b20ae155d48610f3f99d49ef6"}
{"@timestamp":1648491313.039214,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"52e3e91e42d99eebd02cbfd863f9df979a8147615dbe979cbaf7c1fc8be8c107"}
{"@timestamp":1648491314.041181,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"800535ad134c2dea06580233358665ca769698c95f7fb5ead34523de420df450"}
{"@timestamp":1648491315.041067,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"e6cd998b20207c71d65e7037e998aeefe36029739cb5fd1879b10252deaf5295"}
{"@timestamp":1648491316.038793,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"6cc2cb91766c333999842665c2467e03af6e502c9a75a8a22f6b0afe720bf31c"}
{"@timestamp":1648491317.038671,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"149c5b304277efcf94a420425dfe074346c73f4b0ad809d6609aec5fda02386a"}
{"@timestamp":1648491318.038539,"message":"dummy","kubernetes":{"labels":{"app":"fluentbit"}},"_id":"19b05475c35bcad1880737ea4c060b31480927ee8066f238c9cf13db839d192b"}
```

Signed-off-by: Michael Voelker <[email protected]>
@novegit novegit force-pushed the fb_out_kafka_messagehash branch from 89ff32a to fff078f Compare March 28, 2022 19:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
waiting-for-user Waiting for more information, tests or requested changes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants