Skip to content

Commit

Permalink
feat: Merge pull request #12 from tropnikovvl/main
Browse files Browse the repository at this point in the history
Added cloudwatch decompression and data message extraction
  • Loading branch information
fdmsantos authored Feb 28, 2024
2 parents 34af9cd + c9e39e5 commit 7686e0e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 3 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ Supports all destinations and all Kinesis Firehose Features.
* [Version 3.1.0](#version-310)
* [License](#license)


## Module versioning rule

| Module version | AWS Provider version |
Expand Down Expand Up @@ -95,6 +94,7 @@ Supports all destinations and all Kinesis Firehose Features.
- Original Data Backup in S3
- Logging and Encryption
- Application Role to Direct Put Sources
- Turn on/off cloudwatch logs decompressing and data message extraction
- Permissions
- IAM Roles
- Opensearch / Opensearch Serverless Service Role
Expand Down Expand Up @@ -981,6 +981,8 @@ No modules.
| <a name="input_elasticsearch_index_rotation_period"></a> [elasticsearch\_index\_rotation\_period](#input\_elasticsearch\_index\_rotation\_period) | The Elasticsearch index rotation period. Index rotation appends a timestamp to the IndexName to facilitate expiration of old data | `string` | `"OneDay"` | no |
| <a name="input_elasticsearch_retry_duration"></a> [elasticsearch\_retry\_duration](#input\_elasticsearch\_retry\_duration) | The length of time during which Firehose retries delivery after a failure, starting from the initial request and including the first attempt | `string` | `300` | no |
| <a name="input_elasticsearch_type_name"></a> [elasticsearch\_type\_name](#input\_elasticsearch\_type\_name) | The Elasticsearch type name with maximum length of 100 characters | `string` | `null` | no |
| <a name="input_enable_cloudwatch_logs_data_message_extraction"></a> [enable\_cloudwatch\_logs\_data\_message\_extraction](#input\_enable\_cloudwatch\_logs\_data\_message\_extraction) | Cloudwatch Logs data message extraction | `bool` | `false` | no |
| <a name="input_enable_cloudwatch_logs_decompression"></a> [enable\_cloudwatch\_logs\_decompression](#input\_enable\_cloudwatch\_logs\_decompression) | Enables or disables Cloudwatch Logs decompression | `bool` | `false` | no |
| <a name="input_enable_data_format_conversion"></a> [enable\_data\_format\_conversion](#input\_enable\_data\_format\_conversion) | Set it to true if you want to disable format conversion. | `bool` | `false` | no |
| <a name="input_enable_destination_log"></a> [enable\_destination\_log](#input\_enable\_destination\_log) | The CloudWatch Logging Options for the delivery stream | `bool` | `true` | no |
| <a name="input_enable_dynamic_partitioning"></a> [enable\_dynamic\_partitioning](#input\_enable\_dynamic\_partitioning) | Enables or disables dynamic partitioning | `bool` | `false` | no |
Expand Down
24 changes: 22 additions & 2 deletions locals.tf
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ locals {
is_search_destination = contains(["elasticsearch", "opensearch", "opensearchserverless"], local.destination) ? true : false

# Data Transformation
enable_processing = var.enable_lambda_transform || var.enable_dynamic_partitioning
enable_processing = var.enable_lambda_transform || var.enable_dynamic_partitioning || var.enable_cloudwatch_logs_decompression
lambda_processor = var.enable_lambda_transform ? {
type = "Lambda"
parameters = [
Expand Down Expand Up @@ -97,11 +97,31 @@ locals {
record_deaggregation_processor = (var.enable_dynamic_partitioning && var.dynamic_partition_enable_record_deaggregation ?
(var.dynamic_partition_record_deaggregation_type == "JSON" ? local.record_deaggregation_processor_json : local.record_deaggregation_processor_delimiter)
: null)
cloudwatch_logs_decompression_processor = var.enable_cloudwatch_logs_decompression ? {
type = "Decompression"
parameters = [
{
name = "CompressionFormat"
value = "GZIP"
}
]
} : null
cloudwatch_logs_data_message_extraction_processor = var.enable_cloudwatch_logs_decompression && var.enable_cloudwatch_logs_data_message_extraction ? {
type = "CloudWatchLogProcessing"
parameters = [
{
name = "DataMessageExtraction"
value = tostring(var.enable_cloudwatch_logs_data_message_extraction)
},
]
} : null
processors = [for each in [
local.lambda_processor,
local.metadata_extractor_processor,
local.append_delimiter_processor,
local.record_deaggregation_processor
local.record_deaggregation_processor,
local.cloudwatch_logs_decompression_processor,
local.cloudwatch_logs_data_message_extraction_processor
] : each if local.enable_processing && each != null]

# Data Format conversion
Expand Down
12 changes: 12 additions & 0 deletions variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,18 @@ variable "msk_source_connectivity_type" {
######
# S3 Destination Configurations
######
variable "enable_cloudwatch_logs_decompression" {
description = "Enables or disables Cloudwatch Logs decompression"
type = bool
default = false
}

variable "enable_cloudwatch_logs_data_message_extraction" {
description = "Cloudwatch Logs data message extraction"
type = bool
default = false
}

variable "enable_dynamic_partitioning" {
description = "Enables or disables dynamic partitioning"
type = bool
Expand Down

0 comments on commit 7686e0e

Please sign in to comment.