"""kafka-spark-python
This is a skeleton application for processing stream data from Apache
Kafka with Apache Spark. It reads messages from an input topic and
simply echoes them to an output topic.

This application uses Spark's _Structured Streaming_ interface; for
more information, see
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
"""
import argparse
import importlib.machinery as machinery
import logging
import os
import types as pytypes
import urllib.request as request

import pyspark.sql as sql
import pyspark.sql.types as types
import pyspark.sql.functions as functions


def main(args):
    """Configure and start the Kafka stream processor"""
    # acquire a SparkSession object
    spark = (
        sql
        .SparkSession
        .builder
        .appName('kafka-spark-python')
        .getOrCreate()
    )

    # if a user function is specified, download it and import it;
    # user_function stays None when no function was given or the
    # import failed, so the stream can be built without it
    user_function = None
    if args.userfunction is not None:
        try:
            logging.info('downloading user function')
            logging.info(args.userfunction)
            dl = request.urlretrieve(args.userfunction)[0]
            loader = machinery.SourceFileLoader('userfunction', dl)
            userfunction = pytypes.ModuleType(loader.name)
            loader.exec_module(userfunction)
            user_function = functions.udf(
                userfunction.user_defined_function, types.StringType())
            logging.info('user function loaded')
        except Exception as e:
            logging.error('failed to import user function file')
            logging.error(e)
            user_function = None

    # configure the operations to read the input topic
    records = (
        spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', args.brokers)
        .option('subscribe', args.intopic)
        .load()
        .select(
            functions.column('value').cast(types.StringType()).alias('value'))
        # add your data operations here, the raw message is passed along
        # under the alias `value`.
        #
        # for example, to process the message as json and create the
        # corresponding objects you could do the following:
        #
        # .select(
        #     functions.from_json(
        #         functions.column('value'), msg_struct).alias('json'))
        #
        # the following operations would then access the object and its
        # properties using the name `json`.
    )
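
    # the `msg_struct` referenced in the example comment above is not
    # defined in this skeleton; a hypothetical schema for JSON messages
    # carrying `id` and `body` string fields could be built with
    # pyspark.sql.types like so:
    #
    #     msg_struct = types.StructType([
    #         types.StructField('id', types.StringType()),
    #         types.StructField('body', types.StringType()),
    #     ])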

    # if it exists, add the user function to the stream pipeline
    if user_function is not None:
        records = (
            records
            .select(user_function(functions.column('value')).alias('value'))
            .where('value is not null')
        )

    # configure the output stream
    writer = (
        records
        .writeStream
        .format('kafka')
        .option('kafka.bootstrap.servers', args.brokers)
        .option('topic', args.outtopic)
        .option('checkpointLocation', '/tmp')
        .start()
    )
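    # note: '/tmp' is used as the checkpoint location for convenience
    # only; in a real deployment it should point at durable storage so
    # that the stream can recover its progress after a restart.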

    # begin processing the input and output topics
    writer.awaitTermination()


def get_arg(env, default):
    """Return an environment variable's value, or the given default"""
    return os.getenv(env) if os.getenv(env, '') != '' else default


def parse_args(parser):
    """Parse the command line, letting environment variables override"""
    args = parser.parse_args()
    args.brokers = get_arg('KAFKA_BROKERS', args.brokers)
    args.intopic = get_arg('KAFKA_IN_TOPIC', args.intopic)
    args.outtopic = get_arg('KAFKA_OUT_TOPIC', args.outtopic)
    args.userfunction = get_arg('USER_FUNCTION_URI', args.userfunction)
    return args


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    logging.info('starting kafka-spark-python')
    parser = argparse.ArgumentParser(
        description='copy messages from one stream to another')
    parser.add_argument(
        '--brokers',
        help='The bootstrap servers, env variable KAFKA_BROKERS',
        default='localhost:9092')
    parser.add_argument(
        '--in-topic',
        dest='intopic',
        help='Topic to read messages from, env variable KAFKA_IN_TOPIC',
        default='topic1')
    parser.add_argument(
        '--out-topic',
        dest='outtopic',
        help='Topic to publish messages to, env variable KAFKA_OUT_TOPIC',
        default='topic2')
    parser.add_argument(
        '--user-function',
        dest='userfunction',
        help='URI to a user function .py file, env variable '
             'USER_FUNCTION_URI')
    args = parse_args(parser)
    main(args)
    logging.info('exiting')
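
# Typical invocations (values are illustrative); note that the
# environment variables take precedence over the command line flags:
#
#     python app.py --brokers localhost:9092 \
#         --in-topic topic1 --out-topic topic2
#
#     KAFKA_BROKERS=broker:9092 USER_FUNCTION_URI=http://example.com/fn.py \
#         python app.py
#
# when submitting to a Spark cluster, the Kafka source additionally
# requires the spark-sql-kafka package, e.g. via
# `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version>`.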