-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ZMQ input plugin #1729
base: master
Are you sure you want to change the base?
Add ZMQ input plugin #1729
Conversation
Added a plugin which can received the message to be sent out over ZMQ channels. For each ZMQ input plugin there must be an "Endpoint" configured, which contains the name of the ZMQ socket which will be accessed to receive the messages (for example: "ipc:///var/run/zmq-channel-1"). The ZMQ socket must be of style PULL/PUSH - the ZMQ input plugin will PULL messages from the socket and the producer of the information must PUSH them to the socket. Note that the producer must to the "bind" to the ZMQ socket, as the ZMQ plugin will do the "connect". There is also optional config variable "hwm" which can be configured for the plug-in. This will cause the receive and send high-water-marks for the ZMQ socket to be set to the value. The value is in messages, and defaults to unlimited. As normal with internal MSGPACK messages, there is an outer envelope array of two, with the first element being the timestamp, and the second one being a map of keys and their values. The ZMQ message received must contain three parts. The first part is encoded as a string named "topic" in the MAP part of the MSGPACK. The second part is encoded as a string named "key" in the MAP part. The third part is encoded as a binary data with a name "payload" in the MAP part. Note: this plugin can currently only be compiled for Linux. It can be enabled on the cmake line with "-DFLB_IN_ZMQ=Yes". Note that to compile this plugin the libczmq-dev package (and its dependencies) need installed. Signed-off-by: Gavin Shearer <[email protected]>
Thanks for your contribution. Based on community needs, we will evaluate the inclusion of this plugin. note: this plugin depends on external system library libczmq |
assigned @fujimotos for review |
Note: I have just started to take a look at this patch. |
size_t num_frames; | ||
zframe_t *frame; | ||
|
||
if ((zevents & ZMQ_POLLIN) == 0) /* nothing to read */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our coding standard does not omit{
on a single line if.
struct flb_in_zmq_ctx *ctx = data; | ||
flb_debug("[in_zmq] resuming endpoint %s on fd %d", ctx->zmq_endpoint, | ||
ctx->ul_fd); | ||
flb_input_collector_resume(ctx->ul_fd, ctx->i_ins); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not collect. flb_input_collector_*
expects the identifier
returned by flb_input_set_collector_event()
below.
I confirmed that this plugin works as described. Attached is the test script I have used, and its execution result. Test Client In Python import time
import zmq
import msgpack
ctx = zmq.Context()
sock = ctx.socket(zmq.PUSH)
sock.bind("tcp://127.0.0.1:5555")
while True:
sock.send_multipart([b"topic", b"key", b"payload"], track=False)
time.sleep(1) Fluent Bit log
|
That said, there a few things I'm not sure about this patch:
But I'm no expart on ZMQ. If there is a good rationale for this design, |
#include "in_zmq.h" | ||
|
||
#ifdef __GNUC__ | ||
#define likely(x) __builtin_expect(!!(x), 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to define this macro. We already have
flb_likely
in fluent-bit/flb_macros.h
.
const char *hwm_str; | ||
|
||
/* Get input properties */ | ||
ctx->zmq_endpoint = flb_input_get_property("endpoint", i_ins); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably you should use standard parameter names (Listen
and
port
for TCP and Path
for UNIX sock) instead of endpoint
.
See: https://docs.fluentbit.io/manual/pipeline/inputs/syslog
Added a plugin which can received the message to be sent out
over ZMQ channels. For each ZMQ input plugin there must be an
"Endpoint" configured, which contains the name of the ZMQ socket
which will be accessed to receive the messages (for example:
"ipc:///var/run/zmq-channel-1"). The ZMQ socket must be of style
PULL/PUSH - the ZMQ input plugin will PULL messages from the socket and
the producer of the information must PUSH them to the socket. Note that
the producer must to the "bind" to the ZMQ socket, as the ZMQ plugin
will do the "connect". There is also optional config variable "hwm"
which can be configured for the plug-in. This will cause the receive
and send high-water-marks for the ZMQ socket to be set to the value. The
value is in messages, and defaults to unlimited.
As normal with internal MSGPACK messages, there is an outer envelope
array of two, with the first element being the timestamp, and the second
one being a map of keys and their values. The ZMQ message received
must contain three parts. The first part is encoded as a string named
"topic" in the MAP part of the MSGPACK. The second part is encoded as
a string named "key" in the MAP part. The third part is encoded as a
binary data with a name "payload" in the MAP part.
Note: this plugin can currently only be compiled for Linux. It can be
enabled on the cmake line with "-DFLB_IN_ZMQ=Yes".
Note that to compile this plugin the libczmq-dev package (and its
dependencies) need installed.
Signed-off-by: Gavin Shearer [email protected]