
filter_rewrite_tag: add and-combination for rules #2399

Open · wants to merge 1 commit into master
Conversation

novegit (Contributor) commented Jul 28, 2020

Rules in the rewrite_tag filter are combined with OR. In some use cases an AND combination is helpful, for instance when log messages from customer namespaces in Kubernetes should be dropped if they haven't set a special annotation field. Without an AND combination, two filter sections are necessary to get this done.

Configuration example:
A fifth field with true|false can be added to each rule. 'true' means that this rule is "AND"-combined with the next rule; "false" means the default "OR" behaviour and can be omitted, so the change is fully compatible with old filter configurations.

```
[FILTER]
    Name          rewrite_tag
    Match         tail
    Rule          $log ^(1|3)$    newtag_or    false false
    Rule          $log ^(.*end)$  newtag_and_1 false true
    Rule          $log ^(1.*)$    newtag_and_2 false false
    Rule          $log ^(2.*)$    newtag_or    false
```
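The proposed evaluation order can be sketched as follows (a minimal Python sketch of the described semantics, not the C implementation; the rule tuples and function name are illustrative):

```python
import re

# Sketch (assumption, not Fluent Bit source) of the proposed rule
# evaluation: each rule is (regex, new_tag, keep, and_next).
# and_next=True chains a rule with the following one; every rule in a
# chain must match, and the tag of the chain's last rule wins.
RULES = [
    (r"^(1|3)$",   "newtag_or",    False, False),
    (r"^(.*end)$", "newtag_and_1", False, True),
    (r"^(1.*)$",   "newtag_and_2", False, False),
    (r"^(2.*)$",   "newtag_or",    False, False),
]

def rewrite(log, rules=RULES):
    i = 0
    while i < len(rules):
        matched = True
        while True:  # walk one OR alternative (a single rule or an AND chain)
            regex, tag, _keep, and_next = rules[i]
            if not re.match(regex, log):
                matched = False  # keep walking to find the end of the chain
            if not and_next:
                break
            i += 1
        if matched:
            return tag  # tag of the chain's last rule
        i += 1
    return None  # no rule matched: tag stays unchanged
```

With the rules above, `rewrite("1")` yields `newtag_or` via the first rule, while `rewrite("10end")` yields `newtag_and_2` because it satisfies both chained rules, matching the sample output further down.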

Signed-off-by: Michael Voelker [email protected]


Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

Documentation

  • Documentation required for this feature

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

novegit (Contributor, Author) commented Jul 28, 2020

Example configuration

fluentbit.conf:

```
[SERVICE]
    # Flush
    # =====
    # Set an interval of seconds before to flush records to a destination
    Flush        5

    # Daemon
    # ======
    # Instruct Fluent Bit to run in foreground or background mode.
    Daemon       Off
    Log_Level    debug

    # Parsers_File
    # ============
    # Specify an optional 'Parsers' configuration file
    Parsers_File parsers.conf
    Plugins_File plugins.conf

    # HTTP Server
    # ===========
    # Enable/Disable the built-in HTTP Server for metrics
    HTTP_Server  On
    HTTP_Listen  0.0.0.0
    HTTP_Port    2020

[INPUT]
    Name tail
    Tag tail
    Path /var/tmp/loginput.txt
    Docker_Mode On

[FILTER]
    Name          rewrite_tag
    Match         tail
    Rule          $log ^(1|3)$    newtag_or    false false
    Rule          $log ^(.*end)$  newtag_and_1 false true
    Rule          $log ^(1.*)$    newtag_and_2 false false
    Rule          $log ^(2.*)$    newtag_or    false

[OUTPUT]
    Name  stdout
    Match *
```

/var/tmp/loginput.txt:

```
1
2
3
4
5
6
7
8
9
10start
10test
10end
11no
12
```

rewrite_tag result:

```
[0] newtag_or: [1595957976.822152000, {"log"=>"1"}]
[1] newtag_or: [1595957976.825958100, {"log"=>"2"}]
[2] newtag_or: [1595957976.826111700, {"log"=>"3"}]
[0] newtag_and_2: [1595957976.826542900, {"log"=>"10end"}]
```

novegit (Contributor, Author) commented Jul 28, 2020

Debug log

root@79f207404693:/tmp/src/build# usr/bin/valgrind.bin /fluent-bit/bin/fluent-bit -c /fluent-bit/etc/fluent-bit.conf
==17== Memcheck, a memory error detector
==17== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==17== Using Valgrind-3.14.0 and LibVEX; rerun with -h for copyright info
==17== Command: /fluent-bit/bin/fluent-bit -c /fluent-bit/etc/fluent-bit.conf
==17==
Fluent Bit v1.6.0
* Copyright (C) 2019-2020 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2020/07/28 17:39:36] [ info] Configuration:
[2020/07/28 17:39:36] [ info]  flush time     | 5.000000 seconds
[2020/07/28 17:39:36] [ info]  grace          | 5 seconds
[2020/07/28 17:39:36] [ info]  daemon         | 0
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  inputs:
[2020/07/28 17:39:36] [ info]      tail
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  filters:
[2020/07/28 17:39:36] [ info]      rewrite_tag.0
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  outputs:
[2020/07/28 17:39:36] [ info]      stdout.0
[2020/07/28 17:39:36] [ info] ___________
[2020/07/28 17:39:36] [ info]  collectors:
[2020/07/28 17:39:36] [ info] [engine] started (pid=17)
[2020/07/28 17:39:36] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2020/07/28 17:39:36] [debug] [storage] [cio stream] new stream registered: tail.0
[2020/07/28 17:39:36] [ info] [storage] version=1.0.4, initializing...
[2020/07/28 17:39:36] [ info] [storage] in-memory
[2020/07/28 17:39:36] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] inotify watch fd=19
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] scanning path /var/tmp/loginput.txt
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] inode=15564311 appended as /var/tmp/loginput.txt
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] scan_glob add(): /var/tmp/loginput.txt, inode 15564311
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] 1 new files found on path '/var/tmp/loginput.txt'
[2020/07/28 17:39:36] [debug] [storage] [cio stream] new stream registered: emitter_for_rewrite_tag.0
[2020/07/28 17:39:36] [debug] [router] match rule tail.0:stdout.0
[2020/07/28 17:39:36] [debug] [router] match rule emitter.1:stdout.0
[2020/07/28 17:39:36] [ info] [http_server] listen iface=0.0.0.0 tcp_port=2020
[2020/07/28 17:39:36] [ info] [sp] stream processor started
[2020/07/28 17:39:36] [debug] [input:tail:tail.0] inode=15564311 file=/var/tmp/loginput.txt promote to TAIL_EVENT
[2020/07/28 17:39:36] [ info] inotify_fs_add(): inode=15564311 watch_fd=1 name=/var/tmp/loginput.txt
[0] tail: [1595957976.826163800, {"log"=>"4"}]
[1] tail: [1595957976.826225200, {"log"=>"5"}]
[2] tail: [1595957976.826260300, {"log"=>"6"}]
[3] tail: [1595957976.826295600, {"log"=>"7"}]
[4] tail: [1595957976.826329600, {"log"=>"8"}]
[5] tail: [1595957976.826414200, {"log"=>"9"}]
[6] tail: [1595957976.826452800, {"log"=>"10start"}]
[7] tail: [1595957976.826488900, {"log"=>"10test"}]
[8] tail: [1595957976.826579100, {"log"=>"11no"}]
[9] tail: [1595957976.826613300, {"log"=>"12"}]
[0] newtag_or: [1595957976.822152000, {"log"=>"1"}]
[1] newtag_or: [1595957976.825958100, {"log"=>"2"}]
[2] newtag_or: [1595957976.826111700, {"log"=>"3"}]
[2020/07/28 17:39:41] [debug] [task] created task=0x5fb1a60 id=0 OK
[0] newtag_and_2: [1595957976.826542900, {"log"=>"10end"}]
[2020/07/28 17:39:41] [debug] [task] created task=0x5fccc20 id=1 OK
[2020/07/28 17:39:41] [debug] [task] created task=0x5fccdb0 id=2 OK
[2020/07/28 17:39:41] [debug] [task] destroy task=0x5fb1a60 (task_id=0)
[2020/07/28 17:39:41] [debug] [task] destroy task=0x5fccc20 (task_id=1)
[2020/07/28 17:39:41] [debug] [task] destroy task=0x5fccdb0 (task_id=2)
[2020/07/28 17:40:36] [debug] [input:tail:tail.0] scanning path /var/tmp/loginput.txt
[2020/07/28 17:40:36] [debug] [input:tail:tail.0] scan_blog add(): dismissed: /var/tmp/loginput.txt, inode 15564311
[2020/07/28 17:40:36] [debug] [input:tail:tail.0] 0 new files found on path '/var/tmp/loginput.txt'
^C[engine] caught signal (SIGINT)
[2020/07/28 17:41:02] [ info] [input] pausing tail.0
[2020/07/28 17:41:02] [ info] [input] pausing emitter_for_rewrite_tag.0
[2020/07/28 17:41:02] [debug] [input:tail:tail.0] inode=15564311 removing file name /var/tmp/loginput.txt
[2020/07/28 17:41:02] [ info] inotify_fs_remove(): inode=15564311 watch_fd=1
==17== Invalid free() / delete / delete[] / realloc()
==17==    at 0x48369AB: free (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_memcheck-amd64-linux.so)
==17==    by 0x509CAB9: free_key_mem (dlerror.c:223)
==17==    by 0x509CAB9: __dlerror_main_freeres (dlerror.c:239)
==17==    by 0x5224B71: __libc_freeres (in /lib/x86_64-linux-gnu/libc-2.28.so)
==17==    by 0x482B19E: _vgnU_freeres (in /usr/lib/x86_64-linux-gnu/valgrind/vgpreload_core-amd64-linux.so)
==17==    by 0x1832AA: flb_signal_handler (fluent-bit.c:396)
==17==    by 0x50F383F: ??? (in /lib/x86_64-linux-gnu/libc-2.28.so)
==17==    by 0x51B57EE: epoll_wait (epoll_wait.c:30)
==17==    by 0x64B256: _mk_event_wait (mk_event_epoll.c:354)
==17==    by 0x64B546: mk_event_wait (mk_event.c:163)
==17==    by 0x211301: flb_engine_start (flb_engine.c:555)
==17==    by 0x1844F5: flb_main (fluent-bit.c:1035)
==17==    by 0x184543: main (fluent-bit.c:1048)
==17==  Address 0x5d9b1e0 is in a rw- anonymous segment
==17==
==17==
==17== HEAP SUMMARY:
==17==     in use at exit: 89,571 bytes in 663 blocks
==17==   total heap usage: 5,358 allocs, 4,696 frees, 6,115,075 bytes allocated
==17==
==17== LEAK SUMMARY:
==17==    definitely lost: 48 bytes in 2 blocks
==17==    indirectly lost: 882 bytes in 5 blocks
==17==      possibly lost: 0 bytes in 0 blocks
==17==    still reachable: 88,641 bytes in 656 blocks
==17==         suppressed: 0 bytes in 0 blocks
==17== Rerun with --leak-check=full to see details of leaked memory
==17==
==17== For counts of detected and suppressed errors, rerun with: -v
==17== ERROR SUMMARY: 1 errors from 1 contexts (suppressed: 0 from 0)

novegit added a commit to novegit/fluent-bit-docs that referenced this pull request Jul 29, 2020
description for and-combination for rewrite_tag filter rules
PR: fluent/fluent-bit#2399

Signed-off-by: Michael Voelker <[email protected]>
edsiper (Member) commented Jul 29, 2020

Thanks for opening this PR.

I think the main purpose of the rewrite_tag filter is not to discard records, but to rewrite tags and optionally discard the original matched ones.

If you need to "let pass" records that match a criterion, there are other ways to do it. E.g. consider the following test file with two records:

```
{"log": {"kubernetes": {"msg": "test1", "logme": "yes"}}}
{"log": {"kubernetes": {"msg": "test2"}}}
```

Using tail + the grep filter you can accomplish the same thing you need:

```
fluent-bit -R parsers.conf \
  -i tail -p path=test.log -p parser=json \
  -F grep -p "regex=\$log['kubernetes']['logme'] yes" -m '*' \
  -o stdout -p format=json_lines -f 1
```

output:

```
{"date":1596059959.811035,"log":{"kubernetes":{"msg":"test1","logme":"yes"}}}
```

Another way to accomplish the same is to use the stream processor: make the input plugin non-routable and create a new stream that matches an SQL criterion.
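The nested-field matching behind that grep rule can be pictured with a small sketch (plain Python, not Fluent Bit's actual code; `record_accessor` and `grep_keep` are made-up names):

```python
import re

# Hypothetical sketch of the grep filter's "regex" property: keep a
# record only when the value at a nested path matches the pattern.
# The record shape mirrors the two test records above.
def record_accessor(record, path):
    for key in path:  # e.g. ["log", "kubernetes", "logme"]
        if not isinstance(record, dict) or key not in record:
            return None
        record = record[key]
    return record

def grep_keep(record, path, pattern):
    value = record_accessor(record, path)
    return value is not None and re.search(pattern, str(value)) is not None

records = [
    {"log": {"kubernetes": {"msg": "test1", "logme": "yes"}}},
    {"log": {"kubernetes": {"msg": "test2"}}},
]
kept = [r for r in records if grep_keep(r, ["log", "kubernetes", "logme"], "yes")]
# only the record carrying logme=yes survives, matching the output above
```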

@edsiper edsiper self-assigned this Jul 29, 2020
@edsiper edsiper added the waiting-for-user Waiting for more information, tests or requested changes label Jul 29, 2020
novegit (Contributor, Author) commented Jul 30, 2020

Thanks for the reply. I didn't know that grep can handle nested fields with this regex syntax (the grep documentation only shows an example using filter/nest for this case).

But I still don't see how grep can solve my use case. The task is to drop messages that fulfill both conditions:
kubernetes['namespace_name'] matches ^.*-(acc|dev|prd|test)$
and
kubernetes['annotations']['loggingkafkatopic'] does not exist
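Written as a single predicate, the drop condition looks like this (a Python sketch under the assumed record shape; the `tst` spelling follows the namespaces in the sample input below):

```python
import re

# Sketch of the two-part drop condition (assumed record layout):
# drop when the namespace matches the customer pattern AND the
# loggingkafkatopic annotation is absent.
def should_drop(record):
    kube = record.get("log", {}).get("kubernetes", {})
    ns = kube.get("namespace_name", "")
    annotations = kube.get("annotations", {})
    customer_ns = re.match(r"^.*-(acc|dev|prd|tst)$", ns) is not None
    return customer_ns and "loggingkafkatopic" not in annotations
```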

input:

```
{"log": {"kubernetes": {"namespace_name": "namespace-customer1-tst", "labels": {"app": "prometheus"}, "annotations": {"loggingkafkatopic": "customer_kafka_topic" }}}}
{"log": {"kubernetes": {"namespace_name": "namespace-customer1-dev", "labels": {"app": "prometheus"}, "annotations": {"sample": "data" }}}}
{"log": {"kubernetes": {"namespace_name": "openshift-monitoring", "labels": {"app": "prometheus"}, "annotations": {"loggingkafkatopic": "customer_kafka_topic" }}}}
```

command:

```
fluent-bit -R parsers.conf \
 -i tail -p path=in -p parser=json \
 -F grep -p "regex=\$log['kubernetes']['name_space'] .*-(acc|dev|prd|tst)" -m '*' \
 -p "exclude=\$log['kubernetes']['annotations']['loggingkafkatopic'] .*" \
 -o stdout
```

result:

```
[0] tail.0: [1596138829.479714300, {"log"=>"{"log": {"kubernetes": {"name_space: "namespace-customer1-tst", "labels": {"app": "prometheus"}, "annotations": {"loggingkafkatopic": "customer_kafka_topic" }}}}"}]
[1] tail.0: [1596138829.479724300, {"log"=>"{"log": {"kubernetes": {"name_space: "namespace-customer1-dev", "labels": {"app": "prometheus"}, "annotations": {"sample: "data" }}}}"}]
```

I assume the exclude part isn't executed for the first result line, because once the first 'regex' rule matches, the grep filter is finished with that message.
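The first-match-wins behaviour suspected here can be sketched like this (an assumption about rule ordering, not the filter's actual source; the function and rule list are illustrative):

```python
import re

# Assumed first-match-wins evaluation: rules are checked in order and
# the first matching rule decides keep (regex) or drop (exclude);
# later rules are never consulted for that record.
def grep_first_match(value, rules):
    for kind, pattern in rules:  # kind is "regex" or "exclude"
        if re.search(pattern, value):
            return kind == "regex"  # keep on regex, drop on exclude
    return True  # nothing matched: record passes through

rules = [
    ("regex",   r"-(acc|dev|prd|tst)"),
    ("exclude", r"loggingkafkatopic"),
]
# The regex rule matches first, so the record is kept even though the
# exclude pattern would also match it.
kept = grep_first_match("namespace-customer1-tst loggingkafkatopic", rules)
```

Under this assumption, AND-combining a match with a negated match is exactly what a single grep section cannot express, which is the gap the PR's rewrite_tag change targets.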

A future requirement could be not to drop these messages but to route them to a separate output with a customer-pool Kafka topic, collecting all the 'customer logs' without a namespace-specific Kafka topic (set in the loggingkafkatopic annotation) to separate them from the cluster's default Kafka topic.

Labels
waiting-for-user Waiting for more information, tests or requested changes