Skip to content

Commit

Permalink
in_syslog: add expansion for ident and msg-id in tag
Browse files Browse the repository at this point in the history
Allow "*" in tag, e.g. "syslog/*/*/". Wildcard chars are to be expanded
to <ident> and <msg-id> fields.

One and only "*" is expanded to <ident>, and an optional, second "*" is
expanded to <msg-id>.

A test runtime/in_syslog has been added to verify the tag expansion
functionality - it does not involve the real syslog service.

Signed-off-by: Ji-Ping Shen <[email protected]>
  • Loading branch information
jiping-s committed Nov 8, 2019
1 parent e1c7c2d commit 1e36019
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 1 deletion.
1 change: 1 addition & 0 deletions plugins/in_syslog/syslog.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct flb_syslog {

/* Configuration */
struct flb_parser *parser;
int dynamic_tag;

/* List for connections and event loop */
struct mk_list connections;
Expand Down
7 changes: 7 additions & 0 deletions plugins/in_syslog/syslog_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ struct flb_syslog *syslog_conf_create(struct flb_input_instance *i_ins,
return NULL;
}

/* Tag settings */
if (strchr(i_ins->tag, '*') != NULL) {
ctx->dynamic_tag = FLB_TRUE;
} else {
ctx->dynamic_tag = FLB_FALSE;
}

return ctx;
}

Expand Down
84 changes: 83 additions & 1 deletion plugins/in_syslog/syslog_prot.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
* limitations under the License.
*/

#define _GNU_SOURCE

#include <string.h>

#include <fluent-bit/flb_info.h>
Expand All @@ -32,6 +34,80 @@ static inline void consume_bytes(char *buf, int bytes, int length)
memmove(buf, buf + bytes, length - bytes);
}

#define DYNAMIC_TAG_MAX 128

// find and fill the value by msgpack format, e.g. \xA5ident\xA7my-appX...
// https://github.com/msgpack/msgpack/blob/master/spec.md#str-format-family
static int fill_field_value(char const *data, size_t const data_size, char const *name, char *out_val, char const *out_limit) {
size_t name_len = strlen(name);
unsigned char name_buf[1 + 31];
name_buf[0] = 0xA0 + name_len;
memcpy(name_buf + 1, name, name_len);
unsigned char *key_start = memmem(data, data_size, name_buf, name_len + 1);
if (key_start == NULL) {
flb_warn("[in_syslog] field '%s' missing in syslog parser definition", name);
return -1;
}
unsigned char *val_header_ptr = key_start + 1 + name_len;
unsigned char val_header = *val_header_ptr;
int val_len;
unsigned char *val_ptr;
if ((val_header & 0xA0) == 0xA0) {
val_len = val_header & ~0xA0;
val_ptr = key_start + 1 + name_len + 1;
} else if (val_header == 0xD9) {
val_len = *((unsigned char *) val_header_ptr + 1);
val_ptr = key_start + 2 + name_len + 1;
} else {
flb_warn("[in_syslog] field '%s' has invalid value header: 0x%02X", name, val_header);
return -1;
}
if (val_len > out_limit - out_val) {
val_len = out_limit - out_val;
}
memcpy(out_val, val_ptr, val_len);
return val_len;
}

static int tag_compose(char *tag, char *data, size_t data_size, char *out_tag, size_t out_tag_max) {
enum { FIELD_IDENT, FIELD_MSGID }; // first * is <ident>, second * is <msgid>
char *in = tag;
char *in_end = tag + strlen(tag);
char *out = out_tag;
char *out_limit = out_tag + out_tag_max - 1;
int next_field = FIELD_IDENT;
while (in < in_end && out < out_limit) {
char *e = strchr(in, '*');
if (e == NULL) {
e = in_end;
}
int len = e - in;
if (len > 0) {
memcpy(out, in, len);
in += len;
out += len;
}
if (*in == '*') {
int field_len = 0;
switch (next_field) {
case FIELD_IDENT:
field_len = fill_field_value(data, data_size, "ident", out, out_limit);
break;
case FIELD_MSGID:
field_len = fill_field_value(data, data_size, "msgid", out, out_limit);
break;
}
if (field_len > 0) {
out += field_len;
}
next_field++;
in++;
}
}
*out = '\0';
return out - out_tag;
}

static inline int pack_line(struct flb_syslog *ctx,
struct flb_time *time, char *data, size_t data_size)
{
Expand All @@ -46,7 +122,13 @@ static inline int pack_line(struct flb_syslog *ctx,
flb_time_append_to_msgpack(time, &mp_pck, 0);
msgpack_sbuffer_write(&mp_sbuf, data, data_size);

flb_input_chunk_append_raw(ctx->i_ins, NULL, 0, mp_sbuf.data, mp_sbuf.size);
if (ctx->dynamic_tag) {
char tag[DYNAMIC_TAG_MAX];
int tag_len = tag_compose(ctx->i_ins->tag, mp_sbuf.data, mp_sbuf.size, tag, DYNAMIC_TAG_MAX);
flb_input_chunk_append_raw(ctx->i_ins, tag, tag_len, mp_sbuf.data, mp_sbuf.size);
} else {
flb_input_chunk_append_raw(ctx->i_ins, NULL, 0, mp_sbuf.data, mp_sbuf.size);
}
msgpack_sbuffer_destroy(&mp_sbuf);

return 0;
Expand Down
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ if(FLB_OUT_LIB)
FLB_RT_TEST(FLB_IN_HEAD "in_head.c")
FLB_RT_TEST(FLB_IN_DUMMY "in_dummy.c")
FLB_RT_TEST(FLB_IN_RANDOM "in_random.c")
FLB_RT_TEST(FLB_IN_SYSLOG "in_syslog.c")
endif()

# Filter Plugins
Expand Down
147 changes: 147 additions & 0 deletions tests/runtime/in_syslog.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

#include <fluent-bit.h>
#include <fluent-bit/flb_parser.h>
#include <semaphore.h>
#include "flb_tests_runtime.h"

extern struct flb_output_plugin out_stdout_plugin;

void init_test(void) __attribute__ ((constructor));

static sem_t sem_last_tag;
static char last_tag[256] = { 0 };
static void (*old_cb_stdout_flush)(const void *data, size_t bytes, const char *tag, int tag_len, struct flb_input_instance *i_ins, void *out_context, struct flb_config *config);
static void cb_stdout_flush(const void *data, size_t bytes, const char *tag, int tag_len, struct flb_input_instance *i_ins, void *out_context, struct flb_config *config);

/* Test data */
const char SYSLOG_DATA[] = "<162>1 2019-08-15T15:50:46.866915+03:00 localhost MyApp 5001 Type1 - No chain 'foo' found in org.springframework...\n";
const char SYSLOG_LONG_DATA[] = "<162>1 2019-08-15T15:50:46.866915+03:00 localhost 123456789012345678901234567890123456789X 5001 ABCDEFGHIJKLMNOPQRSTUVWXYZ9876543210 - No chain 'foo' found in org.springframework...\n";

/* Test functions */
void flb_test_syslog(const char* log_data, const char *tag_def, const char *result_tag);
void flb_test_syslog_tagging_static(void);
void flb_test_syslog_tagging_with_ident(void);
void flb_test_syslog_tagging_with_ident_n_msgid(void);
void flb_test_syslog_tagging_minimal(void);
void flb_test_syslog_tagging_oversized(void);
void flb_test_syslog_tagging_long_fields(void);

/* Test list */
TEST_LIST = {
{"tagging_static", flb_test_syslog_tagging_static },
{"tagging_with_ident", flb_test_syslog_tagging_with_ident },
{"tagging_with_ident_n_msgid", flb_test_syslog_tagging_with_ident_n_msgid},
{"tagging_minimal", flb_test_syslog_tagging_minimal},
{"tagging_oversized", flb_test_syslog_tagging_oversized},
{"tagging_long_fields", flb_test_syslog_tagging_long_fields},
{NULL, NULL}
};


void init_test(void)
{
sem_init(&sem_last_tag, 0, 0);
old_cb_stdout_flush = out_stdout_plugin.cb_flush;
out_stdout_plugin.cb_flush = cb_stdout_flush;
}

static void cb_stdout_flush(const void *data, size_t bytes,
const char *tag, int tag_len,
struct flb_input_instance *i_ins,
void *out_context,
struct flb_config *config)
{
strncpy(last_tag, tag, tag_len);
last_tag[tag_len] = '\0';
sem_post(&sem_last_tag);
old_cb_stdout_flush(data, bytes, tag, tag_len, i_ins, out_context, config);
}

void flb_test_syslog_tagging_static(void)
{
flb_test_syslog(SYSLOG_DATA, "Syslog", "Syslog");
}

void flb_test_syslog_tagging_with_ident(void)
{
flb_test_syslog(SYSLOG_DATA, "Syslog_*X", "Syslog_MyAppX");
}

void flb_test_syslog_tagging_with_ident_n_msgid(void)
{
flb_test_syslog(SYSLOG_DATA, "Syslog_*_*X", "Syslog_MyApp_Type1X");
}

void flb_test_syslog_tagging_minimal(void)
{
flb_test_syslog(SYSLOG_DATA, "**", "MyAppType1");
}

void flb_test_syslog_tagging_oversized(void)
{
char input_tag[128];
char output_tag[128];
int i;
for (i = 0; i < 120; i++) {
input_tag[i] = output_tag[i] = '_';
}
input_tag[120] = output_tag[120] = '\0';
strcat(input_tag, "*_*");
strcat(output_tag, "MyApp_T");
flb_test_syslog(SYSLOG_DATA, input_tag, output_tag);
}

// for msgpack string between 32-255 bytes
void flb_test_syslog_tagging_long_fields(void)
{
flb_test_syslog(SYSLOG_LONG_DATA, "*-*", "123456789012345678901234567890123456789X-ABCDEFGHIJKLMNOPQRSTUVWXYZ9876543210");
}

void flb_test_syslog(const char *log_data, const char *tag_def, const char *result_tag)
{
flb_ctx_t *ctx = flb_create();
flb_service_set(ctx, "Flush", "1", "Grace", "1", "Log_Level", "error", NULL);

struct flb_parser *parser = flb_parser_create(
"syslog-rfc5424", "regex", "^\\<(?<pri>[0-9]{1,5})\\>1 (?<time>[^ ]+) (?<host>[^ ]+) (?<ident>[^ ]+) (?<pid>[-0-9]+) (?<msgid>[^ ]+) (?<extradata>(\\[(.*)\\]|-)) (?<message>.+)$",
"%Y-%m-%dT%H:%M:%S.%L", "time", NULL, MK_TRUE, NULL, 0,
NULL, ctx->config);
TEST_CHECK(parser != NULL);

int in_ffd = flb_input(ctx, (char *) "syslog", NULL);
TEST_CHECK(in_ffd >= 0);
flb_input_set(ctx, in_ffd, "mode", "tcp", NULL);
flb_input_set(ctx, in_ffd, "tag", tag_def, NULL);

int out_ffd = flb_output(ctx, (char *) "stdout", NULL);
TEST_CHECK(out_ffd >= 0);
flb_output_set(ctx, out_ffd, "match", "*", NULL);

int flb_ret = flb_start(ctx);
TEST_CHECK(flb_ret == 0);

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
TEST_CHECK(sockfd >= 0);

struct sockaddr_in serv_addr = {
.sin_family = AF_INET,
.sin_port = htons(5140),
.sin_addr.s_addr = htonl(INADDR_LOOPBACK),
.sin_zero = { 0 }
};
TEST_CHECK(connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) >= 0);

int data_len = strlen(log_data);
TEST_CHECK(write(sockfd, log_data, data_len) == data_len);
close(sockfd);

sem_wait(&sem_last_tag);

flb_info("[test_in_syslog] received tag: [%s]", last_tag);
TEST_CHECK(strcmp(last_tag, result_tag) == 0);

flb_stop(ctx);
flb_destroy(ctx);
}

0 comments on commit 1e36019

Please sign in to comment.