Skip to content

Commit

Permalink
encoding: add flb_encoding support to in_tail-plugin
Browse files Browse the repository at this point in the history
* Adds new option:  "encoding" to in_tail.
* If encoding fails. message is skipped.

Signed-off-by: Jukka Pihl <[email protected]>
  • Loading branch information
bluebike committed Aug 4, 2020
1 parent d4986e8 commit 145814e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 1 deletion.
8 changes: 8 additions & 0 deletions plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ static struct flb_config_map config_map[] = {
},
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
{
FLB_CONFIG_MAP_STR, "encoding", NULL,
0, FLB_FALSE, 0,
"specify the charset encoder to decode message",
},
#endif

/* Multiline Options */
#ifdef FLB_HAVE_PARSER
{
Expand Down
21 changes: 21 additions & 0 deletions plugins/in_tail/tail_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
#ifdef FLB_HAVE_SQLDB
ctx->db_sync = -1;
#endif
#ifdef FLB_HAVE_UTF8_ENCODER
ctx->encoding = NULL;
#endif

/* Load the config map */
ret = flb_input_config_map_set(ins, (void *) ctx);
Expand Down Expand Up @@ -151,6 +154,18 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
}
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
tmp = flb_input_get_property("encoding", ins);
if (tmp) {
ctx->encoding = flb_encoding_open(tmp);
if (!ctx->encoding) {
flb_plg_error(ctx->ins,"illegal encoding: %s", tmp);
flb_tail_config_destroy(ctx);
return NULL;
}
}
#endif

/* Config: Docker mode */
if(ctx->docker_mode == FLB_TRUE) {
ret = flb_tail_dmode_create(ctx, ins, config);
Expand Down Expand Up @@ -290,6 +305,12 @@ int flb_tail_config_destroy(struct flb_tail_config *config)
}
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
if(config->encoding) {
flb_encoding_close(config->encoding);
}
#endif

flb_free(config);
return 0;
}
7 changes: 7 additions & 0 deletions plugins/in_tail/tail_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
#ifdef FLB_HAVE_REGEX
#include <fluent-bit/flb_regex.h>
#endif
#ifdef FLB_HAVE_UTF8_ENCODER
#include <fluent-bit/flb_encoding.h>
#endif

/* Metrics */
#ifdef FLB_HAVE_METRICS
Expand Down Expand Up @@ -90,6 +93,10 @@ struct flb_tail_config {
sqlite3_stmt *stmt_offset;
#endif

#ifdef FLB_HAVE_UTF8_ENCODER
struct flb_encoding *encoding;
#endif

/* Parser / Format */
struct flb_parser *parser;

Expand Down
27 changes: 26 additions & 1 deletion plugins/in_tail/tail_file.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
msgpack_sbuffer *out_sbuf;
msgpack_packer *out_pck;
struct flb_tail_config *ctx = file->config;
#ifdef FLB_HAVE_UTF8_ENCODER
char *decoded;
size_t decoded_len;
#endif

/* Create a temporal msgpack buffer */
msgpack_sbuffer_init(&mp_sbuf);
Expand Down Expand Up @@ -238,6 +242,20 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
line_len = len - crlf;
repl_line = NULL;


#ifdef FLB_HAVE_UTF8_ENCODER
decoded = NULL;
if(ctx->encoding) {
ret = flb_encoding_decode(ctx->encoding, line, line_len, &decoded, &decoded_len);
if (ret != FLB_ENCODING_SUCCESS) {
flb_plg_error(ctx->ins, "encoding failed '%.*s'", line_len, line);
goto go_next;
}
line = decoded;
line_len = decoded_len;
}
#endif

if (ctx->docker_mode) {
ret = flb_tail_dmode_process_content(now, line, line_len,
&repl_line, &repl_line_len,
Expand Down Expand Up @@ -289,7 +307,7 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
/* Parser failed, pack raw text */
flb_time_get(&out_time);
flb_tail_file_pack_line(out_sbuf, out_pck, &out_time,
data, len, file);
line, len, file);
}
}
else if (ctx->multiline == FLB_TRUE) {
Expand Down Expand Up @@ -332,6 +350,12 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
processed_bytes += len + 1;
file->parsed = 0;
lines++;
#ifdef FLB_HAVE_UTF8_ENCODER
if(decoded) {
flb_free(decoded);
decoded = NULL;
}
#endif
}
file->parsed = file->buf_len;
*bytes = processed_bytes;
Expand All @@ -344,6 +368,7 @@ static int process_content(struct flb_tail_file *file, off_t *bytes)
out_sbuf->size);

msgpack_sbuffer_destroy(out_sbuf);

return lines;
}

Expand Down

0 comments on commit 145814e

Please sign in to comment.