These are the beginnings of a Connector from Neo4j to Apache Spark 2.1 using the new binary protocol for Neo4j, Bolt.
More information about the Bolt protocol, the available drivers, and their documentation can be found on the Neo4j website.
Please note that I still know very little about Apache Spark and might have done really dumb things. Please let me know by creating an issue or even better submitting a pull request to this repo.
This neo4j-spark-connector is Apache 2 licensed.
Build target/neo4j-spark-connector_2.11-full-2.4.5-M2.jar for Scala 2.11:
mvn clean package
spark-shell, pyspark, or spark-submit
$SPARK_HOME/bin/spark-shell --jars neo4j-spark-connector_2.11-full-2.4.5-M2.jar
$SPARK_HOME/bin/spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.4.5-M2
sbt
If you use the sbt-spark-package plugin, in your sbt build file, add:
spDependencies += "neo4j-contrib/neo4j-spark-connector:2.4.5-M2"
Otherwise,
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"
libraryDependencies += "neo4j-contrib" % "neo4j-spark-connector" % "2.4.5-M2"
maven
In your pom.xml, add:
<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>neo4j-contrib</groupId>
    <artifactId>neo4j-spark-connector</artifactId>
    <version>2.4.5-M2</version>
  </dependency>
</dependencies>
<repositories>
  <!-- list of other repositories -->
  <repository>
    <id>SparkPackagesRepo</id>
    <url>http://dl.bintray.com/spark-packages/maven</url>
  </repository>
</repositories>
If you're running Neo4j on localhost with the default ports, you only have to configure your password in spark.neo4j.password=<password>. Otherwise set spark.neo4j.url in your SparkConf, pointing e.g. to bolt://host:port. You can provide user and password as part of the URL (bolt://neo4j:<password>@localhost) or individually in spark.neo4j.user and spark.neo4j.password.
If you're running Neo4j with the Bolt connector and the option dbms.connector.tls_level in Neo4j is REQUIRED, you must set spark.neo4j.encryption.status to true in your SparkConf. Otherwise, you can either omit spark.neo4j.encryption or set it to false in your SparkConf.
Note: the old spark config prefix spark.neo4j.bolt is deprecated and will be removed in the next release.
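For instance, a minimal sketch of setting these options programmatically when building the context (the URL, user, and password values here are placeholders):
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("neo4j-spark-example")
  .set("spark.neo4j.url", "bolt://localhost:7687")  // default Bolt port
  .set("spark.neo4j.user", "neo4j")
  .set("spark.neo4j.password", "<password>")
  // only needed when dbms.connector.tls_level is REQUIRED:
  // .set("spark.neo4j.encryption.status", "true")
val sc = new SparkContext(conf)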
Starting with version 2.4.5-M2 you can use a fluent builder API to declare the queries or patterns you want to use, as well as partitions, total rows, and batch sizes, and then select which Apache Spark type to load.
This library supports:
- RDD[Row] and RDD[T]
- DataFrame
- GraphX Graph
- GraphFrame
The general usage is:
- create org.neo4j.spark.Neo4j(sc)
- set cypher(query,[params]), nodes(query,[params]), rels(query,[params]) as a direct query, or pattern("Label1",Seq("REL"),"Label2") or pattern(("Label1","prop1"),("REL","prop"),("Label2","prop2"))
- optionally define partitions(n), batch(size), rows(count) for parallelism
- choose which datatype to return:
  - loadRowRdd, loadNodeRdds, loadRelRdd, loadRdd[T]
  - loadDataFrame, loadDataFrame(schema)
  - loadGraph[VD,ED]
  - loadGraphFrame[VD,ED]
- execute Spark operations
- save the graph back: saveGraph(graph, [pattern], [nodeProp], [merge=false])
For example:
org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
To create some test data, run the following Cypher statement:
UNWIND range(1,100) as id
CREATE (p:Person {id:id}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p1.id - p2.id)}]->(p2)
Start the Spark-Shell with
$SPARK_HOME/bin/spark-shell --packages neo4j-contrib:neo4j-spark-connector:2.4.5-M2
import org.neo4j.spark._
val neo = Neo4j(sc)
val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id ").loadRowRdd
rdd.count
// inferred schema
rdd.first.schema.fieldNames
// => ["id"]
rdd.first.schema("id")
// => StructField(id,LongType,true)
neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
// => res30: Double = 236696.5
neo.cypher("MATCH (n:Person) WHERE n.id <= $maxId RETURN n.id").param("maxId", 10).loadRowRdd.count
// => res34: Long = 10
// provide partitions and batch-size
neo.nodes("MATCH (n:Person) RETURN id(n) SKIP $_skip LIMIT $_limit").partitions(4).batch(25).loadRowRdd.count
// => 100 == 4 * 25
// load via pattern
neo.pattern("Person",Seq("KNOWS"),"Person").rows(80).batch(21).loadNodeRdds.count
// => 80, because 80 total rows were requested
// load relationships via pattern
neo.pattern("Person",Seq("KNOWS"),"Person").partitions(12).batch(100).loadRelRdd.count
// => 1000
import org.neo4j.spark._
val neo = Neo4j(sc)
// load via Cypher query
neo.cypher("MATCH (n:Person) RETURN id(n) as id SKIP $_skip LIMIT $_limit").partitions(4).batch(25).loadDataFrame.count
// => res36: Long = 100
val df = neo.pattern("Person",Seq("KNOWS"),"Person").partitions(12).batch(100).loadDataFrame
// => org.apache.spark.sql.DataFrame = [id: bigint]
// TODO loadRelDataFrame
import org.neo4j.spark._
val neo = Neo4j(sc)
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
// load graph via Cypher query
val graphQuery = "MATCH (n:Person)-[r:KNOWS]->(m:Person) RETURN id(n) as source, id(m) as target, type(r) as value SKIP $_skip LIMIT $_limit"
val graph: Graph[Long, String] = neo.rels(graphQuery).partitions(7).batch(200).loadGraph
graph.vertices.count
// => 100
graph.edges.count
// => 1000
// load graph via pattern
// note ("Person","id") refers to Person.id and ("Person",null) refers to id(Person) in cypher
val graph = neo.pattern(("Person","id"),("KNOWS","since"),("Person","id")).partitions(7).batch(200).loadGraph[Long,Long]
val graph2 = PageRank.run(graph, 5)
// => graph2: org.apache.spark.graphx.Graph[Double,Double] =
graph2.vertices.sortBy(_._2, ascending = false).take(3)
// => res46: Array[(org.apache.spark.graphx.VertexId, Long)]
// => Array((236746,100), (236745,99), (236744,98))
// uses the pattern from above to save the data; the merge parameter is false by default, so only existing nodes are updated
neo.saveGraph(graph, "rank")
// uses the pattern from the parameter to save the data; merge = true also creates new nodes and relationships
neo.saveGraph(graph, "rank",Pattern(("Person","id"),("FRIEND","years"),("Person","id")), merge = true)
import org.neo4j.spark._
val neo = Neo4j(sc)
import org.graphframes._
val graphFrame = neo.pattern(("Person","id"),("KNOWS",null), ("Person","id")).partitions(3).rows(1000).loadGraphFrame
graphFrame.vertices.count
// => 100
graphFrame.edges.count
// => 1000
val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
val ranked = pageRankFrame.vertices
ranked.printSchema()
val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)
// => top3: Array[org.apache.spark.sql.Row]
// => Array([236716,70,0.62285...], [236653,7,0.62285...], [236658,12,0.62285])
// example loading a graph frame with two dedicated Cypher statements
val nodesQuery = "match (n:Person) RETURN id(n) as id, n.name as value UNION ALL MATCH (n:Company) return id(n) as id, n.name as value"
val relsQuery = "match (p:Person)-[r]->(c:Company) return id(p) as src, id(c) as dst, type(r) as value"
val graphFrame = Neo4j(sc).nodes(nodesQuery,Map.empty).rels(relsQuery,Map.empty).loadGraphFrame
NOTE: The APIs below were the previous APIs which still work, but I recommend that you use and provide feedback on the new builder API above.
There are a few different RDDs, all named Neo4jXxxRDD:
- Neo4jTupleRDD returns a Seq[(String,AnyRef)] per row
- Neo4jRowRDD returns a spark-sql Row per row
The Neo4jDataFrame is a Spark SQL DataFrame that you construct either with explicit type information about result names and types, or inferred from the first result row; it provides:
mergeEdgeList(sc: SparkContext, dataFrame: DataFrame, source: (label,Seq[prop]), relationship: (type,Seq[prop]), target: (label,Seq[prop]), partitions: Int = 1, unwindBatchSize: Int = 10000) to merge a DataFrame back into a Neo4j graph (a usage sketch follows this list):
- both nodes are merged by the first property in the sequence; all the others will be set on the entity
- relationships are merged between the two nodes, and all properties included in the sequence will be set on the relationship
- property names from the sequence are used as column names for the DataFrame; currently there is no name translation. For example: with a DataFrame containing a screen column describing the on-screen relationship between two actors, to get a screen property on the relationship we'd need to supply a relationship tuple of ("ACTED_WITH",Seq("screen")). If we also had a real column in the DataFrame representing a real-life relationship, we would need to add that property to the Seq too, e.g. ("ACTED_WITH",Seq("screen", "real")), and so on
- the results are sent in batches of 10000 to the graph
- the partitions parameter defines the number of partitions the DataFrame will be repartitioned into. It defines the level of parallelism, so consider that it could lead to transaction locks if the same node is managed concurrently
- the unwindBatchSize parameter defines, for each partition, the batch size sent to Neo4j
- the optional renamedColumns parameter can be used to create a relationship between nodes having the same properties. For example, (keanu:Person {name: 'Keanu'})-[:ACTED_WITH]->(laurence:Person {name: 'Laurence'}) would require a DataFrame with two name columns, which is not possible. To overcome this, one can create a DataFrame with src_node_name and dst_node_name columns and provide renamedColumns = Map("src_node_name" -> "name", "dst_node_name" -> "name")
- the optional nodeOperation parameter can have three values: merge (the default) merges the nodes, create creates the nodes, match skips node creation
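A minimal usage sketch of mergeEdgeList; the DataFrame, column names, and label/type choices are illustrative, and renamedColumns is passed as the optional named parameter described above (treat its exact position in the signature as an assumption):
import org.neo4j.spark.dataframe.Neo4jDataFrame

// Illustrative data: two actors and the number of on-screen collaborations.
val actedWith = sqlContext.createDataFrame(Seq(
  ("Keanu", "Laurence", 4L)
)).toDF("src_node_name", "dst_node_name", "screen")

// MERGE (:Person {name}) nodes on both sides and an ACTED_WITH relationship
// carrying the screen property; the two name columns are mapped back onto
// the shared name property via renamedColumns.
Neo4jDataFrame.mergeEdgeList(sc, actedWith,
  ("Person", Seq("src_node_name")),
  ("ACTED_WITH", Seq("screen")),
  ("Person", Seq("dst_node_name")),
  renamedColumns = Map("src_node_name" -> "name", "dst_node_name" -> "name"))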
createNodes(sc: SparkContext, dataFrame: DataFrame, nodes: (String,Seq[String]), partitions: Int = 1, unwindBatchSize: Int = 10000, merge: Boolean = false) to create nodes in the Neo4j graph (a usage sketch follows this list):
- nodes are created by the first property in the sequence; all the others will be set on the node
- the results are sent in batches of 10000 to the graph
- the optional renamedColumns parameter can be used to create a node property with a name different from the DataFrame's column name
- the partitions parameter defines the number of partitions the DataFrame will be repartitioned into. It defines the level of parallelism, so consider that it could lead to transaction locks if the same node is managed concurrently
- the unwindBatchSize parameter defines, for each partition, the batch size sent to Neo4j
- the merge parameter defines the ingestion strategy (MERGE/CREATE)
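A minimal sketch of createNodes; the column names and label here are illustrative:
import org.neo4j.spark.dataframe.Neo4jDataFrame

// Illustrative data: two people identified by name, with an extra age property.
val people = sqlContext.createDataFrame(Seq(
  ("Alice", 33L), ("Bob", 44L)
)).toDF("name", "age")

// CREATE (:Person {name, age}) nodes from the DataFrame;
// pass merge = true to switch the ingestion strategy to MERGE, per the parameter above.
Neo4jDataFrame.createNodes(sc, people, ("Person", Seq("name", "age")))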
- Neo4jGraph has methods to load and save a GraphX graph
- Neo4jGraph.execute runs a Cypher statement and returns a CypherResult with the keys and a rows iterator of Array[Any] (see the sketch after this list)
- Neo4jGraph.loadGraph(sc, label, rel-types, label2) loads a graph via the relationships between those labeled nodes
- Neo4jGraph.saveGraph(sc, graph, [nodeProp], [relTypeProp (type,prop)], [mainLabelId (label,prop)], [secondLabelId (label,prop)], merge=false) saves a graph object to Neo4j by updating the given node- and relationship-properties
- Neo4jGraph.loadGraphFromNodePairs(sc, stmt, params) loads a graph from pairs of node-ids
- Neo4jGraph.loadGraphFromRels(sc, stmt, params) loads a graph from pairs of start- and end-node-ids and an additional value per relationship
- Neo4jGraph.loadGraph(sc, (stmt,params), (stmt,params)) loads a graph with two dedicated statements, the first for nodes, the second for relationships
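A minimal sketch of Neo4jGraph.execute; the query is illustrative, and the parameter container follows the Seq convention used in the RDD examples below (treat the exact signature as an assumption):
import org.neo4j.spark._

// Run a raw Cypher statement; CypherResult exposes keys and a rows
// iterator of Array[Any], per the description above.
val result = Neo4jGraph.execute(sc, "MATCH (n:Person) RETURN n.name, n.age LIMIT 5", Seq.empty)
println(result.keys.mkString(", "))
result.rows.foreach(row => println(row.mkString(", ")))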
GraphFrames (Spark Packages) are a new Apache Spark API to process graph data. It is similar to, and based on, DataFrames; you can create GraphFrames from DataFrames and also from GraphX graphs.
- Neo4jGraphFrame(sqlContext, (srcNodeLabel,srcNodeProp), (relType,relProp), (dstNodeLabel,dstNodeProp)) loads a graph with the given source and destination nodes and the relationships in between; the relationship-property is optional and can be null
- Neo4jGraphFrame.fromGraphX(sc, label, Seq(rel-type), label) loads a graph with the given pattern
- Neo4jGraphFrame.fromEdges(sqlContext, srcNodeLabel, Seq(relType), dstNodeLabel) loads a graph from the given edge pattern (a usage sketch follows this list)
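A minimal sketch of fromEdges, reusing the Person/KNOWS data from the examples (its behavior is assumed to mirror the pattern-based loaders above):
import org.neo4j.spark.dataframe._

// Load a GraphFrame from the KNOWS edges between Person nodes.
val gdf = Neo4jGraphFrame.fromEdges(sqlContext, "Person", Seq("KNOWS"), "Person")
gdf.edges.count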
Download and install Apache Spark 2.1 from http://spark.apache.org/downloads.html
Download and install Neo4j 3.0.0 or later (e.g. from http://neo4j.com/download/)
For a simple dataset of connected people, run the following two Cypher statements, which create 1M people and 1M relationships in about a minute.
FOREACH (x in range(1,1000000) | CREATE (:Person {name:"name"+x, age: x%100}));
UNWIND range(1,1000000) as x
MATCH (n),(m) WHERE id(n) = x AND id(m)=toInt(rand()*1000000)
CREATE (n)-[:KNOWS]->(m);
You can also provide the dependencies to spark-shell or spark-submit via --packages and optionally --repositories.
$SPARK_HOME/bin/spark-shell \
--conf spark.neo4j.password=<neo4j-password> \
--packages neo4j-contrib:neo4j-spark-connector:2.4.5-M2
<!-- tag::example_rdd[] -->
import org.neo4j.spark._
import org.neo4j.spark.rdd._
Neo4jTupleRDD(sc,"MATCH (n) return id(n)",Seq.empty).count
// res46: Long = 1000000
Neo4jRowRDD(sc,"MATCH (n) where id(n) < $maxId return id(n)",Seq("maxId" -> 100000)).count
// res47: Long = 100000
<!-- end::example_rdd[] -->
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.password=<neo4j-password> \
--packages neo4j-contrib:neo4j-spark-connector:2.4.5-M2
import org.neo4j.spark._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.neo4j.spark.dataframe._
val df = Neo4jDataFrame.withDataType(sqlContext, "MATCH (n) return id(n) as id",Seq.empty, "id" -> LongType)
// df: org.apache.spark.sql.DataFrame = [id: bigint]
df.count
// res0: Long = 1000000
val query = "MATCH (n:Person) return n.age as age"
val df = Neo4jDataFrame.withDataType(sqlContext,query, Seq.empty, "age" -> LongType)
// df: org.apache.spark.sql.DataFrame = [age: bigint]
df.agg(sum(df.col("age"))).collect()
// res31: Array[org.apache.spark.sql.Row] = Array([49500000])
// val query = "MATCH (n:Person)-[:KNOWS]->(m:Person) where n.id = $x return m.age as age"
val query = "MATCH (n:Person) where n.id = $x return n.age as age"
val rdd = sc.makeRDD(1.to(1000000))
val ages = rdd.map( i => {
val df = Neo4jDataFrame.withDataType(sqlContext,query, Seq("x"->i.asInstanceOf[AnyRef]), "age" -> LongType)
df.agg(sum(df("age"))).first().getLong(0)
})
// TODO
ages.reduce(_ + _)
val df = Neo4jDataFrame(sqlContext, "MATCH (n) WHERE id(n) < $maxId return n.name as name",Seq("maxId" -> 100000),"name" -> "string")
df.count
// res0: Long = 100000
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.password=<neo4j-password> \
--packages neo4j-contrib:neo4j-spark-connector:2.4.5-M2
import org.neo4j.spark._
val g = Neo4jGraph.loadGraph(sc, "Person", Seq("KNOWS"), "Person")
// g: org.apache.spark.graphx.Graph[Any,Int] = org.apache.spark.graphx.impl.GraphImpl@574985d8
g.vertices.count
// res0: Long = 999937
g.edges.count
// res1: Long = 999906
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
val g2 = PageRank.run(g, 5)
val v = g2.vertices.take(5)
// v: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((185012,0.15), (612052,1.0153273593749998), (354796,0.15), (182316,0.15), (199516,0.38587499999999997))
Neo4jGraph.saveGraph(sc, g2, "rank")
// res2: (Long, Long) = (999937,0)
// full syntax example
Neo4jGraph.saveGraph(sc, graph, "rank",("LIKES","score"),Some(("Person","name")),Some(("Movie","title")), merge=true)
GraphFrames are a new Apache Spark API to process graph data. It is similar to, and based on, DataFrames; you can create GraphFrames from DataFrames and also from GraphX graphs.
There was a recent release (0.5.0) of GraphFrames for Spark 2.1 and Scala 2.11, which we use; it is available in the Maven repository for Apache Spark Packages.
Resources:
- API Docs
- Flights Example
- SparkSummit Video
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.password=<neo4j-password> \
--packages neo4j-contrib:neo4j-spark-connector:2.4.5-M2
<!-- tag::example_graphframes[] -->
import org.neo4j.spark._
import org.neo4j.spark.dataframe._
val gdf = Neo4jGraphFrame(sqlContext,"Person" -> "name",("KNOWS",null),"Person" -> "name")
// gdf: org.graphframes.GraphFrame = GraphFrame(v:[id: bigint, prop: string],
// e:[src: bigint, dst: bigint, prop: string])
val gdf = Neo4jGraphFrame.fromGraphX(sc,"Person",Seq("KNOWS"),"Person")
gdf.vertices.count // res0: Long = 1000000
gdf.edges.count // res1: Long = 999999
val results = gdf.pageRank.resetProbability(0.15).maxIter(5).run
// results: org.graphframes.GraphFrame = GraphFrame(
// v:[id: bigint, prop: string, pagerank: double],
// e:[src: bigint, dst: bigint, prop: string, weight: double])
results.vertices.take(5)
// res3: Array[org.apache.spark.sql.Row] = Array([31,name32,0.96820096875], [231,name232,0.15],
// [431,name432,0.15], [631,name632,1.1248028437499997], [831,name832,0.15])
// pattern matching
val results = gdf.find("(A)-[]->(B)").select("A","B").take(3)
// results: Array[org.apache.spark.sql.Row] = Array([[159148,name159149],[31,name32]],
// [[461182,name461183],[631,name632]], [[296686,name296687],[1031,name1032]])
<!-- end::example_graphframes[] -->
gdf.find("(A)-[]->(B);(B)-[]->(C); !(A)-[]->(C)")
// res8: org.apache.spark.sql.DataFrame = [A: struct<id:bigint,prop:string>, B: struct<id:bigint,prop:string>, C: struct<id:bigint,prop:string>]
gdf.find("(A)-[]->(B);(B)-[]->(C); !(A)-[]->(C)").take(3)
// res9: Array[org.apache.spark.sql.Row] = Array([[904749,name904750],[702750,name702751],[122280,name122281]], [[240723,name240724],[813112,name813113],[205438,name205439]], [[589543,name589544],[600245,name600246],[659932,name659933]])
// doesn't work yet ... complains about different table widths
val results = gdf.find("(A)-[]->(B); (B)-[]->(C); !(A)-[]->(C)").filter("A.id != C.id")
// Select recommendations for A to follow C
val recommendations = results.select("A", "C").take(3)
gdf.labelPropagation.maxIter(3).run().take(3)
The project uses the Java driver for Neo4j's Bolt protocol, in version org.neo4j.driver:neo4j-java-driver:1.7.5.
Testing is done using neo4j-harness, a test library for starting an in-process Neo4j server, which you can use either with a JUnit @Rule or directly.
I only start one server and one SparkContext per test class to avoid the lifecycle overhead.
Please note that running Neo4j as an in-process server pulls in Scala 2.11 for Cypher, so you need to run the tests with Scala 2.11. That's why I had to add two profiles for the different Scala versions.
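For illustration, a minimal sketch of a harness-based test, assuming the JUnit 4 Neo4jRule API from neo4j-harness (the class name and fixture data are hypothetical):
import org.junit.{Rule, Test}
import org.neo4j.harness.junit.Neo4jRule
import scala.annotation.meta.getter

class ConnectorSmokeTest {
  // JUnit requires the rule to be visible on a public getter,
  // hence the @getter meta-annotation on the Scala val.
  @(Rule @getter)
  val neo4j: Neo4jRule = new Neo4jRule()
    .withFixture("CREATE (:Person {name:'test'})")

  @Test
  def serverIsUp(): Unit = {
    // neo4j.boltURI() is the Bolt address of the in-process server;
    // point spark.neo4j.url at it when building a SparkConf for tests.
    assert(neo4j.boltURI() != null)
  }
}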