Skip to content

Commit

Permalink
in_syslog: add expansion for ident and msg-id in tag
Browse files Browse the repository at this point in the history
Allow * in tag, e.g. syslog/*/*/. Wildcard chars are to be expanded to <ident> and <msg-id> fields.

One and only * is expanded to <ident>, and an optional, second * is expanded to <msg-id>.

A test runtime/in_syslog.c has been added to verify the tag expansion functionality - it does not involve any real syslog service.

Signed-off-by: Sergej Gureev <[email protected]>
  • Loading branch information
Sergej Gureev committed May 28, 2020
1 parent 560c4e3 commit c17a8f5
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 1 deletion.
1 change: 1 addition & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct flb_syslog {

/* Configuration */
struct flb_parser *parser;
int dynamic_tag;

/* List for connections and event loop */
struct mk_list connections;
Expand Down
7 changes: 7 additions & 0 deletions plugins/in_syslog/syslog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *ins,
return NULL;
}

/* Tag settings */
if (strchr(ins->tag, '*') != NULL) {
ctx->dynamic_tag = FLB_TRUE;
} else {
ctx->dynamic_tag = FLB_FALSE;
}

return ctx;
}

Expand Down
109 changes: 108 additions & 1 deletion plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,121 @@
* limitations under the License.
*/

#define _GNU_SOURCE

#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_parser.h>
#include <fluent-bit/flb_time.h>

#include "syslog.h"
#include "syslog_conn.h"

#include <msgpack.h>
#include <string.h>

static inline void consume_bytes(char *buf, int bytes, int length)
{
memmove(buf, buf + bytes, length - bytes);
}

#define MIN(X, Y) (((X) < (Y)) ? (X) : (Y))
#define DYNAMIC_TAG_MAX 128

// find and fill the value by msgpack format, e.g. \xA5ident\xA7my-appX...
// https://github.com/msgpack/msgpack/blob/master/spec.md#str-format-family
static int fill_field_value(char const *data, size_t const data_size, char const *name, char *out_val, char const *out_limit) {
int i;
int val_len = -1;
int matched;
size_t off = 0;
msgpack_object k;
msgpack_object v;
msgpack_object map;
msgpack_unpacked result;

msgpack_unpacked_init(&result);
msgpack_unpack_next(&result, data, data_size, &off);

map = result.data;
if (map.type != MSGPACK_OBJECT_MAP) {
flb_warn("[in_syslog] Wrong object type of field '%s': was %d, expected map", name, map.type);
msgpack_unpacked_destroy(&result);
return -1;
}

matched = -1;
for (i = 0; i < map.via.map.size; i++) {
k = map.via.map.ptr[i].key;
if (k.type != MSGPACK_OBJECT_STR) {
continue;
}

if (strncmp(name, k.via.str.ptr, k.via.str.size) == 0) {
/* we have a match, stop the check */
matched = i;
break;
}
}

/* No matches, no need to continue */
if (matched == -1) {
flb_warn("[in_syslog] field '%s' missing in syslog parser definition", name);
msgpack_unpacked_destroy(&result);
return -1;
}

v = map.via.map.ptr[i].val;
if (v.type != MSGPACK_OBJECT_STR) {
flb_warn("[in_syslog] field '%s' is not a string", name);
msgpack_unpacked_destroy(&result);
return -1;
}
val_len = MIN(v.via.str.size, out_limit - out_val);
strncpy(out_val, v.via.str.ptr, val_len);

msgpack_unpacked_destroy(&result);
return val_len;
}

static int tag_compose(char *tag, char *data, size_t data_size, char *out_tag, size_t out_tag_max) {
enum { FIELD_IDENT, FIELD_MSGID }; // first * is <ident>, second * is <msgid>
char *in = tag;
char *in_end = tag + strlen(tag);
char *out = out_tag;
char *out_limit = out_tag + out_tag_max - 1;
int next_field = FIELD_IDENT;
while (in < in_end && out < out_limit) {
char *e = strchr(in, '*');
if (e == NULL) {
e = in_end;
}
int len = e - in;
if (len > 0) {
memcpy(out, in, len);
in += len;
out += len;
}
if (*in == '*') {
int field_len = 0;
switch (next_field) {
case FIELD_IDENT:
field_len = fill_field_value(data, data_size, "ident", out, out_limit);
break;
case FIELD_MSGID:
field_len = fill_field_value(data, data_size, "msgid", out, out_limit);
break;
}
if (field_len > 0) {
out += field_len;
}
next_field++;
in++;
}
}
*out = '\0';
return out - out_tag;
}

static inline int pack_line(struct flb_syslog *ctx,
struct flb_time *time, char *data, size_t data_size)
{
Expand All @@ -46,7 +147,13 @@ static inline int pack_line(struct flb_syslog *ctx,
flb_time_append_to_msgpack(time, &mp_pck, 0);
msgpack_sbuffer_write(&mp_sbuf, data, data_size);

flb_input_chunk_append_raw(ctx->ins, NULL, 0, mp_sbuf.data, mp_sbuf.size);
if (ctx->dynamic_tag) {
char tag[DYNAMIC_TAG_MAX];
int tag_len = tag_compose(ctx->ins->tag, data, data_size, tag, DYNAMIC_TAG_MAX);
flb_input_chunk_append_raw(ctx->ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size);
} else {
flb_input_chunk_append_raw(ctx->ins, NULL, 0, mp_sbuf.data, mp_sbuf.size);
}
msgpack_sbuffer_destroy(&mp_sbuf);

return 0;
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_HEAD "in_head.c")
FLB_RT_TEST(FLB_IN_DUMMY "in_dummy.c")
FLB_RT_TEST(FLB_IN_RANDOM "in_random.c")
FLB_RT_TEST(FLB_IN_SYSLOG "in_syslog.c")
endif()

# Filter Plugins
Expand Down
147 changes: 147 additions & 0 deletions tests/runtime/in_syslog.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <fluent-bit.h>
#include <fluent-bit/flb_parser.h>
#include <semaphore.h>
#include "flb_tests_runtime.h"

extern struct flb_output_plugin out_stdout_plugin;

void init_test(void) __attribute__ ((constructor));

static sem_t sem_last_tag;
static char last_tag[256] = { 0 };
static void (*old_cb_stdout_flush)(const void *data, size_t bytes, const char *tag, int tag_len, struct flb_input_instance *i_ins, void *out_context, struct flb_config *config);
static void cb_stdout_flush(const void *data, size_t bytes, const char *tag, int tag_len, struct flb_input_instance *i_ins, void *out_context, struct flb_config *config);

/* Test data */
const char SYSLOG_DATA[] = "<162>1 2019-08-15T15:50:46.866915+03:00 localhost MyApp 5001 Type1 - No chain 'foo' found in org.springframework...\n";
const char SYSLOG_LONG_DATA[] = "<162>1 2019-08-15T15:50:46.866915+03:00 localhost 123456789012345678901234567890123456789X 5001 ABCDEFGHIJKLMNOPQRSTUVWXYZ9876543210 - No chain 'foo' found in org.springframework...\n";

/* Test functions */
void flb_test_syslog(const char* log_data, const char *tag_def, const char *result_tag);
void flb_test_syslog_tagging_static(void);
void flb_test_syslog_tagging_with_ident(void);
void flb_test_syslog_tagging_with_ident_n_msgid(void);
void flb_test_syslog_tagging_minimal(void);
void flb_test_syslog_tagging_oversized(void);
void flb_test_syslog_tagging_long_fields(void);

/* Test list */
TEST_LIST = {
{"tagging_static", flb_test_syslog_tagging_static },
{"tagging_with_ident", flb_test_syslog_tagging_with_ident },
{"tagging_with_ident_n_msgid", flb_test_syslog_tagging_with_ident_n_msgid},
{"tagging_minimal", flb_test_syslog_tagging_minimal},
{"tagging_oversized", flb_test_syslog_tagging_oversized},
{"tagging_long_fields", flb_test_syslog_tagging_long_fields},
{NULL, NULL}
};


void init_test(void)
{
sem_init(&sem_last_tag, 0, 0);
old_cb_stdout_flush = out_stdout_plugin.cb_flush;
out_stdout_plugin.cb_flush = cb_stdout_flush;
}

static void cb_stdout_flush(const void *data, size_t bytes,
const char *tag, int tag_len,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
{
strncpy(last_tag, tag, tag_len);
last_tag[tag_len] = '\0';
sem_post(&sem_last_tag);
old_cb_stdout_flush(data, bytes, tag, tag_len, i_ins, out_context, config);
}

void flb_test_syslog_tagging_static(void)
{
flb_test_syslog(SYSLOG_DATA, "Syslog", "Syslog");
}

void flb_test_syslog_tagging_with_ident(void)
{
flb_test_syslog(SYSLOG_DATA, "Syslog_*X", "Syslog_MyAppX");
}

void flb_test_syslog_tagging_with_ident_n_msgid(void)
{
flb_test_syslog(SYSLOG_DATA, "Syslog_*_*X", "Syslog_MyApp_Type1X");
}

void flb_test_syslog_tagging_minimal(void)
{
flb_test_syslog(SYSLOG_DATA, "**", "MyAppType1");
}

void flb_test_syslog_tagging_oversized(void)
{
char input_tag[128];
char output_tag[128];
int i;
for (i = 0; i < 120; i++) {
input_tag[i] = output_tag[i] = '_';
}
input_tag[120] = output_tag[120] = '\0';
strcat(input_tag, "*_*");
strcat(output_tag, "MyApp_T");
flb_test_syslog(SYSLOG_DATA, input_tag, output_tag);
}

// for msgpack string between 32-255 bytes
void flb_test_syslog_tagging_long_fields(void)
{
flb_test_syslog(SYSLOG_LONG_DATA, "*-*", "123456789012345678901234567890123456789X-ABCDEFGHIJKLMNOPQRSTUVWXYZ9876543210");
}

void flb_test_syslog(const char *log_data, const char *tag_def, const char *result_tag)
{
flb_ctx_t *ctx = flb_create();
flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL);

struct flb_parser *parser = flb_parser_create(
"syslog-rfc5424", "regex", "^\\<(?<pri>[0-9]{1,5})\\>1 (?<time>[^ ]+) (?<host>[^ ]+) (?<ident>[^ ]+) (?<pid>[-0-9]+) (?<msgid>[^ ]+) (?<extradata>(\\[(.*)\\]|-)) (?<message>.+)$",
"%Y-%m-%dT%H:%M:%S.%L", "time", NULL, MK_TRUE, NULL, 0,
NULL, ctx->config);
TEST_CHECK(parser != NULL);

int in_ffd = flb_input(ctx, (char *) "syslog", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "mode", "tcp", NULL);
flb_input_set(ctx, in_ffd, "tag", tag_def, NULL);

int out_ffd = flb_output(ctx, (char *) "stdout", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "*", NULL);

int flb_ret = flb_start(ctx);
TEST_CHECK(flb_ret == 0);

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
TEST_CHECK(sockfd >= 0);

struct sockaddr_in serv_addr = {
.sin_family = AF_INET,
.sin_port = htons(5140),
.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
.sin_zero = { 0 }
};
TEST_CHECK(connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) >= 0);

int data_len = strlen(log_data);
TEST_CHECK(write(sockfd, log_data, data_len) == data_len);
close(sockfd);

sem_wait(&sem_last_tag);

flb_info("[test_in_syslog] received tag: [%s]", last_tag);
TEST_CHECK(strcmp(last_tag, result_tag) == 0);

flb_stop(ctx);
flb_destroy(ctx);
}

0 comments on commit c17a8f5

Please sign in to comment.