Dynamic Fraud Detection Engine
Built on top of core Flink functionality, it implements three powerful features:
- Dynamic data partitioning (shuffle) at runtime, controlled by a set of broadcast rules instead of a hard-coded `KeySelector`. This provides the ability to change how events are distributed and grouped by Flink at runtime. Such functionality often becomes a natural requirement when building jobs with dynamically reconfigurable application logic.
- Dynamic updates of fraud detection logic based on modifiable rules. This allows Flink jobs to change at runtime, without the downtime of stopping and resubmitting the code.
- Low-latency alerting based on custom windowing logic (without using the window API). This utilizes the low-level process function API to implement low-latency alerting on windows and to limit state growth with timers.
These features expand the possibilities of what is achievable with statically defined data flows and provide the building blocks to fulfill complex business requirements. When used together, they can eliminate the need to recompile the code and redeploy Flink jobs for a wide range of modifications of the business logic.
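As a rough illustration of the first feature, the sketch below is a minimal, simplified Scala example of broadcast-rule-driven partitioning: the grouping key is derived at runtime from each rule's `groupingKeys` instead of a hard-coded `KeySelector`. The type names, field names, and sources here are illustrative assumptions, not this project's actual classes.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

object DynamicPartitioningSketch {
  type Event = Map[String, String]                     // flattened event fields (assumption)
  case class Rule(id: Int, groupingKeys: Seq[String])  // heavily simplified rule
  case class Keyed(key: String, ruleId: Int, event: Event)

  // Broadcast state holding the currently active rules.
  val rulesDescriptor = new MapStateDescriptor[Int, Rule](
    "rules", createTypeInformation[Int], createTypeInformation[Rule])

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder sources; in practice these would be Kafka or socket sources.
    val events: DataStream[Event] = env.fromElements(
      Map("payeeId" -> "25", "beneficiaryId" -> "12", "payment.amount" -> "15.72"))
    val rules: DataStream[Rule] = env.fromElements(
      Rule(1, Seq("payeeId", "beneficiaryId")))

    val keyed: DataStream[Keyed] = events
      .connect(rules.broadcast(rulesDescriptor))
      .process(new BroadcastProcessFunction[Event, Rule, Keyed] {
        override def processElement(
            event: Event,
            ctx: BroadcastProcessFunction[Event, Rule, Keyed]#ReadOnlyContext,
            out: Collector[Keyed]): Unit =
          // Emit one keyed copy of the event per active rule.
          ctx.getBroadcastState(rulesDescriptor).immutableEntries().asScala.foreach { e =>
            val rule = e.getValue
            val key  = rule.groupingKeys.map(k => event.getOrElse(k, "")).mkString("&")
            out.collect(Keyed(key, rule.id, event))
          }

        override def processBroadcastElement(
            rule: Rule,
            ctx: BroadcastProcessFunction[Event, Rule, Keyed]#Context,
            out: Collector[Keyed]): Unit =
          ctx.getBroadcastState(rulesDescriptor).put(rule.id, rule) // register/update the rule
      })

    // The physical shuffle is now driven by the broadcast rules, not compiled-in keys.
    keyed.keyBy(k => (k.key, k.ruleId)).print()
    env.execute("dynamic-partitioning-sketch")
  }
}
```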
In order to fulfill complex fraud detection requirements, rules are expected to change on a frequent basis without requiring recompilation of the job for each rule modification.
Rule definitions follow the JSON schema below. At this point, it is important to understand that `groupingKeys` determine the actual physical grouping of events: all events with the same values of the specified parameters (e.g. payer #25 -> beneficiary #12) have to be aggregated in the same physical instance of the evaluating operator.
{
"id": 1,
"state": "ACTIVE",
"groupingKeys": [
"key1",
"key2"
],
"aggs": [
{
"field": "key1",
"name": "amount",
"func": "SUM"
}
],
"limit": "\"amount\" > 50",
"filter": "",
"prune": {
"enabled": true,
"reserved": [
"timestamp",
"guid"
]
},
"windowSize": 1000,
"command": "BROADCAST_RULE"
}
- `state` has values of `ACTIVE`, `PAUSE`, `DELETE`, and `CONTROL`; `CONTROL` marks a rule that acts as a command.
- `groupingKeys` is used for dynamic partitioning/shuffling of events.
- `aggs` is an array of aggregations with the following JSON schema: `{ "field": "field_name", "name": "amount", "func": "SUM" }`
  - `field` specifies which field will be aggregated.
  - `name` names this aggregation; it is a variable name that may be used in the `limit` DSL expression. It defaults to the field name when not explicitly specified.
  - `func` is the aggregation function, with values of `SUM`, `AVG`, `MIN`, `MAX`, `GROUP`, and maybe others in the future. The aggregated result is used for evaluating the `limit` DSL expression.
- `limit` is a DSL expression for evaluating aggregations and low-latency alerting.
- `filter` is a DSL expression for filtering out certain events.
- `prune` configures whether event pruning is enabled and which fields are `reserved` (kept) when pruning is enabled; pruning is enabled by default.
- `windowSize` is in milliseconds and controls the alerting latency.
- `command` has values of `BROADCAST_RULE`, `CLEAR_STATE_ALL`, `CLEAR_STATE_ALL_STOP`, `DELETE_RULES_ALL`, and `EXPORT_RULES_CURRENT`; the default value is `BROADCAST_RULE`.
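For reference, one way the rule schema above could be modeled in Scala is sketched below. The class and field names are illustrative assumptions; they are not necessarily the classes used in this repository.

```scala
// Illustrative data model for the rule JSON above (names are assumptions).
case class Aggregation(
  field: String,            // field to aggregate, e.g. "payment.amount"
  name: Option[String],     // variable name usable in the limit DSL; defaults to field
  func: String              // SUM | AVG | MIN | MAX | GROUP
)

case class Prune(
  enabled: Boolean = true,           // pruning is enabled by default
  reserved: Seq[String] = Seq.empty  // fields kept when pruning is enabled
)

case class Rule(
  id: Int,
  state: String,                         // ACTIVE | PAUSE | DELETE | CONTROL
  groupingKeys: Seq[String] = Seq.empty, // drives dynamic partitioning
  aggs: Seq[Aggregation] = Seq.empty,
  limit: String = "",                    // DSL expression over aggregation results
  filter: String = "",                   // DSL expression over raw events
  prune: Prune = Prune(),
  windowSize: Long = 0L,                 // window length in milliseconds
  command: String = "BROADCAST_RULE"
)
```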
It is easy to imagine adding extensions, such as a domain specific language (DSL) for more sophisticated rule definitions, including filtering of certain events, logical rule chaining, and other more advanced functionality.
To investigate the core concepts of the Dynamic Rule Engine and the underlying DSL, let us start by articulating a realistic sample rule definition for our fraud detection system in the form of a financial domain requirement:
If the sum of payments from the same payer to the same beneficiary within a 4-hour period at nighttime (00:00:00 ~ 06:00:00) is greater than $200 - trigger an alert.
Let's assume the transaction logs or events have the following JSON schema:
{
"id": 8940923213314405583,
"payeeId": 11,
"beneficiaryId": 6,
"payment": {
"amount": 15.72,
"currency": "USD"
},
"timestamp": 1620389265534
}
The financial domain requirement can be described using our rule definition:
{
"id": 1,
"state": "ACTIVE",
"groupingKeys": [
"payeeId",
"beneficiaryId"
],
"aggs": [
{
"field": "payment.amount",
"name": "amt",
"func": "SUM"
}
],
"limit": "\"amt\" > 200",
"filter": "time(\"timestamp\") >= \"00:00:00\" && time(\"timestamp\") <= \"06:00:00\"",
"windowSize": 14400000,
"command": "BROADCAST_RULE"
}
The limit expression `"amt" > 200` and the filter expression `time("timestamp") >= "00:00:00" && time("timestamp") <= "06:00:00"` are written in our custom DSL; the syntax is intentionally intuitive.
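To make the evaluation step concrete, here is a deliberately tiny, hypothetical evaluator in Scala (not this project's actual DSL engine) that checks a single comparison-style limit expression against already computed aggregation results:

```scala
// A toy, hypothetical evaluator (NOT the project's DSL parser) for simple
// comparison-style limit expressions such as "\"amt\" > 200".
object ToyLimitEvaluator {
  // "field" <op> number, where <op> is one of the DSL comparison operators.
  private val Comparison = """"([^"]+)"\s*(>=|>|<=|<|===)\s*([-\d.]+)""".r

  def eval(expr: String, aggregates: Map[String, Double]): Boolean =
    expr.trim match {
      case Comparison(name, op, rhs) =>
        val lhs = aggregates.getOrElse(name, 0.0)
        val r   = rhs.toDouble
        op match {
          case ">"   => lhs > r
          case ">="  => lhs >= r
          case "<"   => lhs < r
          case "<="  => lhs <= r
          case "===" => lhs == r
        }
      case _ => false // anything beyond a single comparison is out of scope here
    }

  def main(args: Array[String]): Unit = {
    // The aggregation step would have produced amt = SUM(payment.amount).
    println(eval(""""amt" > 200""", Map("amt" -> 215.5))) // true  -> trigger alert
    println(eval(""""amt" > 200""", Map("amt" -> 120.0))) // false -> no alert
  }
}
```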
The supported DSL operators and operands are:
- Logical operators `&&`, `||`, `!`, or the equivalent words `and`, `or`, `not`.
- Comparison operators `>`, `>=`, `<`, `<=`, `===`, `=!=`. NOTE: we do not use `==` and `!=` for equality comparison; we use `===` and `=!=`.
- Arithmetic operators `+`, `-`, `*`, `/`, `%`.
- Field extractor/operand `""` (double quotes), with support for nested fields, e.g. `"timestamp"`, `"amt"`, `"payment.amount"`. NOTE: only a `""` on the lhs (left-hand side) is interpreted as a field extractor.
- Field-existence operand `exist("field")`.
- Time interpretation operands `time()`, `date()`, `datetime()`, e.g. `time("timestamp")`, `date("timestamp")`, `datetime("timestamp")`. The value of the field must be one of the following:
  - a Unix timestamp
  - a string time value with format `HH:mm:ss` or `HH:mm:ss.SSS`
  - a string date value with format `yyyy-MM-dd`
  - a string datetime value with format `yyyy-MM-dd HH:mm:ss` or `yyyy-MM-dd'T'HH:mm:ss.SSS+0800`
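As a quick worked check of the Unix-timestamp case (assuming the engine interprets epoch milliseconds in UTC, which is an assumption rather than something the rule format specifies), the sample event's `timestamp` of 1620389265534 corresponds to 12:07:45.534 UTC and therefore falls outside the 00:00:00 ~ 06:00:00 nighttime filter used earlier:

```scala
import java.time.{Instant, LocalTime, ZoneOffset}

object TimeOperandCheck {
  def main(args: Array[String]): Unit = {
    val ts    = 1620389265534L // sample event timestamp
    val local = Instant.ofEpochMilli(ts).atZone(ZoneOffset.UTC).toLocalTime
    println(local)             // 12:07:45.534

    // Equivalent of: time("timestamp") >= "00:00:00" && time("timestamp") <= "06:00:00"
    val inNightWindow = !local.isBefore(LocalTime.parse("00:00:00")) &&
                        !local.isAfter(LocalTime.parse("06:00:00"))
    println(inNightWindow)     // false -> event is filtered out
  }
}
```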
Because we use `""` (double quotes) as the field extractor, we cannot write an expression like `"field_name" <= "const_value"`.
To allow string constants to be represented with double quotes, the following special operands are supported:
- `field("field_name")` is an explicit field extractor.
- `const(literal_value)`, e.g. `const(5)`, `const(0.5)`, `const("string_constant")`.
NOTE: if we want to add a number to the value of a field, as in "field" + 5 <= 100, we SHOULD write it as `field("field") + 5 <= 100`.
DSL expressions support time, date, and datetime constants; they SHOULD be written with the following formats:
- time format `HH:mm:ss`
- date format `yyyy-MM-dd`
- datetime format `yyyy-MM-dd HH:mm:ss`
Special operands for string-valued fields (assuming some fields hold string values):
- `"field" #== "prefix"` tests whether the string field starts with the given prefix.
- `"field" =@= "infix"` tests whether the string field contains the given infix.
- `"field" ==# "suffix"` tests whether the string field ends with the given suffix.
- `"field" =#= "regex_pattern"` tests whether the string field matches the given regex pattern.
- Determine membership in a simple collection:
  - `"field" in (v1,v2,v3)` tests whether the value of `field` is in the collection `[v1, v2, v3]`; the values can be any numerical value or string.
  - `"field" =:= (v1,v2,v3)` tests whether the value of `field` is in the collection `[v1, v2, v3]`; the values can be any numerical value or string.
- Determine non-membership in a simple collection:
  - `"field" not in (v1,v2,v3)` tests whether the value of `field` is not in the collection `(v1,v2,v3)`; the values can be any numerical value or string.
  - `!("field" =:= (v1,v2,v3))` tests whether the value of `field` is not in the collection `(v1,v2,v3)`; the values can be any numerical value or string.
- Higher-order conditions over complex collections. For example, given an event shaped like:

  {
    "a": {
      "b": {
        "key1": ["1", "2", "3"],
        "key2": [1, 2, 3, 4],
        "key3": [
          { "key": "k1", "val": "v1" },
          { "key": "k2", "val": "v2" },
          { "key": "k3", "val": "v3" },
          { "key": "k4", "val": "v4" }
        ],
        "c": { "k1": 10, "k2": 3, "k3": 5, "k5": 12 }
      },
      "key1": 10
    },
    "key1": 100
  }

  - the DSL supports the `have` expression:
    `"a" have "key1" === 10`
    `"a.b.key1" have "3"`
    `"a.b.key2" have 3`
  - the DSL supports `matches` expressions:
    `"a.b.key3" none matches ("key" === "k5" && "val" === "v5")`
    `"a.b.key3" all matches (("key" #== "k") && ("val" #== "v"))`
    `"a.b.key3" any matches ("key" ==# "3") && ("key" =#= "^k\\d$")`
  - the DSL supports matching on unknown key names, for a requirement like "the total number of payers whose details were accessed is greater than 100, or the total COUNT of accesses to the same payer's details is greater than 10 - trigger an alert":
    `("a.b.c" value have any matches ( ? > 10)) || ("a.b.c" have size >= 100)`
- We provide a DSL formatter and preview feature: `val expr = 1 / 2 * (x + 1)` will be formatted as
1
- * (x + 1)
2
To run the job locally:
- Start `netcat`: `nc -lk 9999`
- Run the main method of `com.fsa.job.FlinkJob`
- Submit a `Rule` to netcat in single-line JSON format:
{"id":1,"state":"ACTIVE","groupingKeys":["payeeId","beneficiaryId"],"aggs":[{"field":"payment.amount","name":"amt","func":"SUM"}],"limit":"\"amt\" > 20","filter":"time(\"timestamp\") >= \"07:30:00\" && time(\"timestamp\") <= \"22:00:00\""}
{"id":1,"state":"DELETE"}
{"id":0,"state":"CONTROL","command":"DELETE_RULES_ALL"}
{"id":0,"state":"CONTROL","command":"EXPORT_RULES_CURRENT"}
{"id":0,"state":"CONTROL","command":"CLEAR_STATE_ALL"}
- Command-line arguments for using Kafka as the data source, rules source, alerts sink, and rules-export sink: `--data-source kafka --rules-source kafka --alerts-sink kafka --rules-export-sink kafka`
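For example, if the packaged job is submitted through the Flink CLI, these flags could be passed as program arguments (the jar path is a placeholder and the exact invocation depends on how the project is packaged): `flink run -c com.fsa.job.FlinkJob <path-to-job-jar> --data-source kafka --rules-source kafka --alerts-sink kafka --rules-export-sink kafka`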
- Use `COUNT` as the aggregate field for counting events.
- Use `COUNT_WITH_RESET` as the aggregate field for counting events and clearing all accumulated events after counting.
{"id":1,"state":"ACTIVE","groupingKeys":["payeeId","beneficiaryId"],"aggs":[{"field":"COUNT","name":"count","func":"SUM"}],"limit":"\"count\" > 50","filter":"time(\"timestamp\") >= \"00:00:00\" && time(\"timestamp\") <= \"06:00:00\""}
{"id":1,"state":"ACTIVE","groupingKeys":["payeeId","beneficiaryId"],"aggs":[{"field":"COUNT_WITH_RESET","name":"count","func":"SUM"}],"limit":"\"count\" > 50","filter":"time(\"timestamp\") >= \"00:00:00\" && time(\"timestamp\") <= \"06:00:00\""}