Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZMQ input plugin #1729

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open

Conversation

gavin-shr
Copy link

Added a plugin which can received the message to be sent out
over ZMQ channels. For each ZMQ input plugin there must be an
"Endpoint" configured, which contains the name of the ZMQ socket
which will be accessed to receive the messages (for example:
"ipc:///var/run/zmq-channel-1"). The ZMQ socket must be of style
PULL/PUSH - the ZMQ input plugin will PULL messages from the socket and
the producer of the information must PUSH them to the socket. Note that
the producer must to the "bind" to the ZMQ socket, as the ZMQ plugin
will do the "connect". There is also optional config variable "hwm"
which can be configured for the plug-in. This will cause the receive
and send high-water-marks for the ZMQ socket to be set to the value. The
value is in messages, and defaults to unlimited.

As normal with internal MSGPACK messages, there is an outer envelope
array of two, with the first element being the timestamp, and the second
one being a map of keys and their values. The ZMQ message received
must contain three parts. The first part is encoded as a string named
"topic" in the MAP part of the MSGPACK. The second part is encoded as
a string named "key" in the MAP part. The third part is encoded as a
binary data with a name "payload" in the MAP part.

Note: this plugin can currently only be compiled for Linux. It can be
enabled on the cmake line with "-DFLB_IN_ZMQ=Yes".

Note that to compile this plugin the libczmq-dev package (and its
dependencies) need installed.

Signed-off-by: Gavin Shearer [email protected]

Added a plugin which can received the message to be sent out
over ZMQ channels. For each ZMQ input plugin there must be an
"Endpoint" configured, which contains the name of the ZMQ socket
which will be accessed to receive the messages (for example:
"ipc:///var/run/zmq-channel-1"). The ZMQ socket must be of style
PULL/PUSH - the ZMQ input plugin will PULL messages from the socket and
the producer of the information must PUSH them to the socket. Note that
the producer must to the "bind" to the ZMQ socket, as the ZMQ plugin
will do the "connect". There is also optional config variable "hwm"
which can be configured for the plug-in. This will cause the receive
and send high-water-marks for the ZMQ socket to be set to the value. The
value is in messages, and defaults to unlimited.

As normal with internal MSGPACK messages, there is an outer envelope
array of two, with the first element being the timestamp, and the second
one being a map of keys and their values.  The ZMQ message received
must contain three parts. The first part is encoded as a string named
"topic" in the MAP part of the MSGPACK. The second part is encoded as
a string named "key" in the MAP part.  The third part is encoded as a
binary data with a name "payload" in the MAP part.

Note: this plugin can currently only be compiled for Linux. It can be
enabled on the cmake line with "-DFLB_IN_ZMQ=Yes".

Note that to compile this plugin the libczmq-dev package (and its
dependencies) need installed.

Signed-off-by: Gavin Shearer <[email protected]>
@edsiper
Copy link
Member

edsiper commented May 5, 2020

Thanks for your contribution.

Based on community needs, we will evaluate the inclusion of this plugin.

note: this plugin depends on external system library libczmq

@edsiper
Copy link
Member

edsiper commented Nov 27, 2020

assigned @fujimotos for review

@fujimotos
Copy link
Member

Note: I have just started to take a look at this patch.

size_t num_frames;
zframe_t *frame;

if ((zevents & ZMQ_POLLIN) == 0) /* nothing to read */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our coding standard does not omit{ on a single line if.

struct flb_in_zmq_ctx *ctx = data;
flb_debug("[in_zmq] resuming endpoint %s on fd %d", ctx->zmq_endpoint,
ctx->ul_fd);
flb_input_collector_resume(ctx->ul_fd, ctx->i_ins);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not collect. flb_input_collector_* expects the identifier
returned by flb_input_set_collector_event() below.

@fujimotos
Copy link
Member

I confirmed that this plugin works as described.

Attached is the test script I have used, and its execution result.

Test Client In Python

import time
import zmq
import msgpack

ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind("tcp://127.0.0.1:5555")

while True:
    sock.send_multipart([b"topic", b"key", b"payload"], track=False)
    time.sleep(1)

Fluent Bit log

$ fluent-bit -i zmq -p endpoint=tcp://127.0.0.1:5555 -o stdout
...
[0] zmq.0: [1606894429.236495720, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[0] zmq.0: [1606894430.237640321, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[1] zmq.0: [1606894431.238933348, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[2] zmq.0: [1606894432.239989316, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[3] zmq.0: [1606894433.241117389, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]
[4] zmq.0: [1606894434.242347335, {"topic"=>"topic", "key"=>"key", "payload"=>"payload"}]

``

@fujimotos
Copy link
Member

That said, there a few things I'm not sure about this patch:

  1. Is the message format correct?

    • I see Fluentd's ZMQ plugin allows users to send an event in a single frame.
    • That seems be more intuitive to me than sending one in 3 frames (as the current patch does).
  2. Shouldn't "topic" be emitted as Fluent Bit's tag (rather than a field in a record?)

    a.k.a. shouldn't the record format be:

    tag.from.zeromq: [1606894434.242347335, {"key"=>"key", "payload"=>"payload"}]
    

    instead of the following?

     zmq.0: [1606894434.242347335, {"topic"=>"tag.from.zeromq", "key"=>"key", "payload"=>"payload"}]
    

But I'm no expart on ZMQ. If there is a good rationale for this design,
I'm not against mergint this patch.

#include "in_zmq.h"

#ifdef __GNUC__
#define likely(x) __builtin_expect(!!(x), 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to define this macro. We already have
flb_likely in fluent-bit/flb_macros.h.

const char *hwm_str;

/* Get input properties */
ctx->zmq_endpoint = flb_input_get_property("endpoint", i_ins);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably you should use standard parameter names (Listen and
port for TCP and Path for UNIX sock) instead of endpoint.

See: https://docs.fluentbit.io/manual/pipeline/inputs/syslog

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants