-
Notifications
You must be signed in to change notification settings - Fork 0
/
push-to-neo4j.py
80 lines (72 loc) · 3.19 KB
/
push-to-neo4j.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("push-to-neo4j") \
.getOrCreate()
# load suppliers
suppliers = spark.read.jdbc(
"jdbc:postgresql://data-import-experiment-postgres.cgzrnuxjdpso.us-east-1.rds.amazonaws.com:5432/northwind",
"suppliers",
numPartitions=5,
properties={"user": "postgres", "password": "mypassword", "driver": "org.postgresql.Driver"})
suppliers.show()
# create suppliers nodes
suppliers \
.select("supplierid", "companyname", "address", "city", "region", "postalcode", "country") \
.withColumnsRenamed(
{"supplierid": "id", "companyname": "name", "address": "address", "city": "city", "region": "region",
"postalcode": "postalcode", "country": "country"}) \
.write \
.format("org.neo4j.spark.DataSource") \
.mode("overwrite") \
.option("url", "neo4j://my-neo4j-release:7687") \
.option("authentication.basic.username", "neo4j") \
.option("authentication.basic.password", "passw0rd") \
.option("labels", ":Supplier") \
.option("node.keys", "id") \
.option("schema.optimization.type", "NODE_CONSTRAINTS") \
.save()
# load products table
products = spark.read.jdbc(
"jdbc:postgresql://data-import-experiment-postgres.cgzrnuxjdpso.us-east-1.rds.amazonaws.com:5432/northwind",
"products",
numPartitions=5,
properties={"user": "postgres", "password": "mypassword", "driver": "org.postgresql.Driver"})
products.show()
# create products nodes
products.select("productid", "productname", "unitprice", "discontinued") \
.withColumnsRenamed(
{"productid": "id", "productname": "name", "unitprice": "price", "discontinued": "discontinued"}) \
.write \
.format("org.neo4j.spark.DataSource") \
.mode("overwrite") \
.option("url", "neo4j://my-neo4j-release:7687") \
.option("authentication.basic.username", "neo4j") \
.option("authentication.basic.password", "passw0rd") \
.option("labels", ":Product") \
.option("node.keys", "id") \
.option("schema.optimization.type", "NODE_CONSTRAINTS") \
.save()
# create Supplies relationship
products \
.select("productid", "supplierid") \
.repartition(1) \
.write \
.format("org.neo4j.spark.DataSource") \
.mode("overwrite") \
.option("url", "neo4j://my-neo4j-release:7687") \
.option("authentication.basic.username", "neo4j") \
.option("authentication.basic.password", "passw0rd") \
.option("relationship", "SUPPLIES") \
.option("relationship.save.strategy", "keys") \
.option("relationship.source.labels", ":Supplier") \
.option("relationship.source.save.mode", "match") \
.option("relationship.source.node.keys", "supplierid:id") \
.option("relationship.target.labels", ":Product") \
.option("relationship.target.save.mode", "match") \
.option("relationship.target.node.keys", "productid:id") \
.save()
spark.stop()