A hands-on guide to using an Apache Flink application in Kinesis Data Analytics to aggregate time-series data with tumbling and sliding windows in real time.
A tumbling windows assigner assigns each element to a window of a specified window size. Tumbling windows have a fixed size and do not overlap. For example, if you specify a tumbling window with a size of 5 minutes, the current window will be evaluated and a new window will be started every five minutes as illustrated by the following figure.
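The window assignment described above can be sketched in a few lines of plain Python. This is only an illustration of the arithmetic, not Flink's implementation: the function names are hypothetical, windows are assumed to be aligned to the Unix epoch, and timestamps are assumed to be timezone-aware. A sliding variant is included for comparison, since one element can belong to several overlapping windows there.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, size):
    """Return the single [start, end) tumbling window that contains ts."""
    offset = (ts - EPOCH) % size  # time elapsed since the window opened
    start = ts - offset
    return start, start + size


def sliding_windows(ts, size, slide):
    """Return every [start, end) sliding window that contains ts, newest first."""
    start = ts - (ts - EPOCH) % slide  # latest window start at or before ts
    windows = []
    while start > ts - size:  # the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows
```

With a 5-minute tumbling window, an element stamped 09:22:25 falls into exactly one window, 09:20 to 09:25. With a 5-minute window sliding every minute, the same element falls into five windows.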
The input data is generated by send.py and sent to the Kinesis data stream.
[
{
"type": "Feature",
"properties": {
"RECEIVED_ON": "2020-09-14T09:20:22.385",
"N02_001": "14",
"N02_002": "5",
"N02_003": "北海道新幹線",
"N02_004": "西日本旅客鉄道",
"ID": "5_14",
"COUNT": 20
}
},
{
"type": "Feature",
"properties": {
"RECEIVED_ON": "2020-09-14T09:22:25.325",
"N02_001": "14",
"N02_002": "5",
"N02_003": "北海道新幹線",
"N02_004": "西日本旅客鉄道",
"ID": "5_14",
"COUNT": 30
}
}
...
]
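send.py itself is not reproduced here, so the following is only a rough sketch of how records shaped like the sample above might be produced and sent. The function names are hypothetical, the fixed property values are copied from the sample, and sending one Feature per Kinesis record with the ID as partition key is an assumption. The `client` argument is expected to be a boto3 Kinesis client.

```python
import json
from datetime import datetime


def make_feature(received_on, count):
    """Build one Feature shaped like the sample records above."""
    return {
        "type": "Feature",
        "properties": {
            # Millisecond precision without a timezone suffix, as in the sample.
            "RECEIVED_ON": received_on.isoformat(timespec="milliseconds"),
            "N02_001": "14",
            "N02_002": "5",
            "N02_003": "北海道新幹線",
            "N02_004": "西日本旅客鉄道",
            "ID": "5_14",
            "COUNT": count,
        },
    }


def send(features, client, stream_name="kda_geojson"):
    """Put each feature on the stream as its own JSON-encoded record.

    Assumption: one Feature per record, partitioned by its ID.
    """
    for feature in features:
        client.put_record(
            StreamName=stream_name,
            Data=json.dumps(feature, ensure_ascii=False).encode("utf-8"),
            PartitionKey=feature["properties"]["ID"],
        )
```

A call could look like `send([make_feature(datetime.utcnow(), 20)], boto3.client("kinesis"))`, given AWS credentials and an existing stream.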
RAILWAY_CLASS | RAILWAY_CLASS_COUNT | WINDOW_START | WINDOW_END |
---|---|---|---|
12 | 10 | 2020-09-14T09:22:00Z | 2020-09-14T09:23:00Z |
12 | 11 | 2020-09-14T09:23:00Z | 2020-09-14T09:24:00Z |
14 | 20 | 2020-09-14T09:22:00Z | 2020-09-14T09:23:00Z |
15 | 13 | 2020-09-14T09:23:00Z | 2020-09-14T09:24:00Z |
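To make the shape of this output concrete, here is a small offline sketch of the aggregation in plain Python. It is not the Flink job. It assumes that RAILWAY_CLASS is taken from N02_001, that RAILWAY_CLASS_COUNT is the number of records per class in each window, and that RECEIVED_ON is the event time; none of these mappings is stated explicitly in this guide, and the function name is hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta


def aggregate(features, size=timedelta(minutes=1)):
    """Count records per (railway class, tumbling window)."""
    counts = Counter()
    for feature in features:
        props = feature["properties"]
        ts = datetime.fromisoformat(props["RECEIVED_ON"])
        # Floor the timestamp to the start of its window.
        start = datetime.min + (ts - datetime.min) // size * size
        counts[(props["N02_001"], start)] += 1
    return [
        {
            "RAILWAY_CLASS": railway_class,
            "RAILWAY_CLASS_COUNT": n,
            "WINDOW_START": start,
            "WINDOW_END": start + size,
        }
        for (railway_class, start), n in sorted(counts.items())
    ]
```

Each returned dict corresponds to one row of the table, with one row per class and window.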
aws rds create-db-subnet-group \
--db-subnet-group-name database-vpc-subnet-group \
--db-subnet-group-description database_vpc_subnet_group \
--subnet-ids <subnet_id_1> <subnet_id_2> <subnet_id_3>
aws rds create-db-cluster --db-cluster-identifier sls-postgres \
--engine aurora-postgresql --engine-version 10.7 --engine-mode serverless \
--scaling-configuration MinCapacity=2,MaxCapacity=4 \
--enable-http-endpoint \
--master-username <username> --master-user-password <password> \
--db-subnet-group-name database-vpc-subnet-group \
--vpc-security-group-ids <security-group-id>
aws secretsmanager create-secret --name sls-postgres-secret --secret-string "file://creds-sls-postgres.json"
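The contents of creds-sls-postgres.json are not shown in this guide. A minimal sketch for generating the file follows, assuming the secret holds the cluster credentials under `username` and `password` keys; that shape and the helper name are assumptions, and the values should match the master user created above.

```python
import json
from pathlib import Path


def write_credentials(path, username, password):
    """Write a JSON credentials file (assumed shape: username/password keys)."""
    payload = {"username": username, "password": password}
    Path(path).write_text(json.dumps(payload), encoding="utf-8")
    return payload
```

For example, `write_credentials("creds-sls-postgres.json", "<username>", "<password>")` would write the file that the command above reads. Avoid committing it to version control.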
aws rds-data execute-statement --resource-arn "<cluster-arn>" --secret-arn "<secret-arn>" --sql "SELECT datname FROM pg_database" --database "postgres"
aws rds-data execute-statement --resource-arn "<cluster-arn>" --secret-arn "<secret-arn>" --sql "CREATE DATABASE monitoring" --database "postgres"
aws rds-data execute-statement --resource-arn "<cluster-arn>" --secret-arn "<secret-arn>" --sql "create table tumbling(RAILWAY_CLASS varchar(20), RAILWAY_CLASS_COUNT bigint, WINDOW_START timestamp, WINDOW_END timestamp, RECEIVED_ON timestamp, PRIMARY KEY(RAILWAY_CLASS, WINDOW_START, WINDOW_END))" --database "monitoring"
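The same Data API can also be used to write rows into the `tumbling` table. The application's sink code is not shown in this guide, so the following is only a sketch of what one such request might look like: it builds the keyword arguments for the boto3 `rds-data` client's `execute_statement` call. The upsert on the primary key, the parameter names, and the helper names are assumptions.

```python
UPSERT_SQL = (
    "INSERT INTO tumbling "
    "(RAILWAY_CLASS, RAILWAY_CLASS_COUNT, WINDOW_START, WINDOW_END, RECEIVED_ON) "
    "VALUES (:railway_class, :railway_class_count, :window_start, :window_end, :received_on) "
    "ON CONFLICT (RAILWAY_CLASS, WINDOW_START, WINDOW_END) "
    "DO UPDATE SET RAILWAY_CLASS_COUNT = EXCLUDED.RAILWAY_CLASS_COUNT, "
    "RECEIVED_ON = EXCLUDED.RECEIVED_ON"
)


def _timestamp(name, value):
    # Timestamps travel as 'YYYY-MM-DD HH:MM:SS' strings with a type hint.
    return {
        "name": name,
        "value": {"stringValue": value.strftime("%Y-%m-%d %H:%M:%S")},
        "typeHint": "TIMESTAMP",
    }


def upsert_request(row, resource_arn, secret_arn, database="monitoring"):
    """Build execute_statement keyword arguments for one aggregated row."""
    return {
        "resourceArn": resource_arn,
        "secretArn": secret_arn,
        "database": database,
        "sql": UPSERT_SQL,
        "parameters": [
            {"name": "railway_class", "value": {"stringValue": row["RAILWAY_CLASS"]}},
            {"name": "railway_class_count", "value": {"longValue": row["RAILWAY_CLASS_COUNT"]}},
            _timestamp("window_start", row["WINDOW_START"]),
            _timestamp("window_end", row["WINDOW_END"]),
            _timestamp("received_on", row["RECEIVED_ON"]),
        ],
    }
```

The request would then be sent with `boto3.client("rds-data").execute_statement(**upsert_request(row, "<cluster-arn>", "<secret-arn>"))`. Upserting keeps the write idempotent if a window result is emitted more than once.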
curl -LJO https://github.com/apache/flink/archive/release-1.8.2.zip && unzip flink-release-1.8.2.zip && cd flink-release-1.8.2
mvn clean install -Pinclude-kinesis -DskipTests -pl flink-connectors/flink-connector-kinesis
- The built shaded jar file is under the target folder.
mvn package -Dflink.version=1.8.2
Follow the official AWS documentation, "Create and Run the Application (Console)", and set the following runtime properties:
- Key: "JOB_CLASS_NAME", Value: "StreamJobSqlTumbling"
- Key: "AWS_REGION", Value: "< your-aws-region >"
- Key: "INPUT_STREAM_NAME", Value: "kda_geojson"
- Key: "STREAM_INITIAL_POSITION", Value: "LATEST"
- Key: "DATABASE", Value: The database name ('monitoring') from step 1-6
- Key: "RESOURCE_ARN", Value: The database cluster ARN from step 1-1
- Key: "SECRET_ARN", Value: The secret ARN from step 1-2
- Key: "THRESHOLD", Value: 4
- Key: "CHECKPOINT_INTERVAL", Value: 30000
- Key: "INTERVAL_AMOUNT", Value: 1
- Key: "INTERVAL_UOM", Value: "MINUTE(2)"
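INTERVAL_AMOUNT and INTERVAL_UOM together describe the window size. The query inside StreamJobSqlTumbling is not shown in this guide, so the following is only a sketch of how the two properties might be combined into a Flink SQL group window. The table name `railways`, the column names, and the time attribute `rowtime` are hypothetical.

```python
def tumbling_query(props, table="railways", time_attr="rowtime"):
    """Build a Flink SQL tumbling-window query from the runtime properties."""
    # e.g. INTERVAL_AMOUNT=1, INTERVAL_UOM="MINUTE(2)" -> INTERVAL '1' MINUTE(2)
    interval = "INTERVAL '{}' {}".format(
        int(props["INTERVAL_AMOUNT"]), props["INTERVAL_UOM"]
    )
    return (
        "SELECT railway_class, COUNT(*) AS railway_class_count, "
        "TUMBLE_START({t}, {i}) AS window_start, "
        "TUMBLE_END({t}, {i}) AS window_end "
        "FROM {table} "
        "GROUP BY TUMBLE({t}, {i}), railway_class"
    ).format(t=time_attr, i=interval, table=table)
```

With the values listed above, the generated query groups rows into one-minute tumbling windows.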
Default role name: kinesis-analytics-flink-analytics-monitoring-<your-region>
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ReadInputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "*"
}
]
}
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"rds-data:BatchExecuteStatement",
"rds-data:BeginTransaction",
"rds-data:CommitTransaction",
"rds-data:ExecuteStatement",
"rds-data:RollbackTransaction",
"rds-data:DeleteItems",
"rds-data:ExecuteSql",
"rds-data:GetItems",
"rds-data:InsertItems",
"rds-data:UpdateItems"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
select * from tumbling
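When this query is run through the Data API, each row comes back as a list of typed fields such as `{"stringValue": "14"}` or `{"longValue": 10}`. A minimal sketch for flattening such a response into plain dicts follows; the helper name is hypothetical and the column names are passed in by the caller.

```python
def rows_from_response(response, columns):
    """Flatten Data API `records` into dicts keyed by column name."""
    rows = []
    for record in response.get("records", []):
        values = []
        for field in record:
            if field.get("isNull"):
                values.append(None)
            else:
                # A non-null field holds one typed entry, e.g. {"longValue": 10}.
                values.append(next(iter(field.values())))
        rows.append(dict(zip(columns, values)))
    return rows
```

The response would come from a call like `boto3.client("rds-data").execute_statement(resourceArn="<cluster-arn>", secretArn="<secret-arn>", database="monitoring", sql="select * from tumbling")`.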