The following principles drive decision-making with respect to the scope of `neo4j-arrow`:
- Performance -- consumers or producers of data should be able to read or update the graph as fast as the network can transmit bytes.
- Simplicity -- how easily can a Data Scientist/Engineer get the data they need from the graph?
- Interoperability -- can the data be easily used/stored/retransmitted by the tools Data Scientists/Engineers already use?
The code is broken up into multiple projects.
- The root project contains the core Arrow Flight implementation
- This foundational code should be effectively "Neo4j free", i.e. it should not require any Neo4j dependencies
- The core interfaces you should study are contained here!
- The subprojects each provide something specific:
This implementation builds atop the Apache Arrow Flight framework to provide high-performance, remote, stream-based access to Neo4j graphs. Knowledge of Flight is beneficial. On top of the core Flight concepts of `FlightServer`s and `FlightProducer`s, the `neo4j-arrow` project adds the concept of `Job`s and `ActionHandler`s along with `RowBasedRecord`s.
A quick cheat-sheet:

- The `Producer` brokers `Action`s to `ActionHandler`s and requests to consume streams of data.
  - If you want to customize the core streaming logic, look here.
- `ActionHandler`s interpret RPC calls, turning their `Action` bodies/payloads into `Message`s and using them to create `Job`s.
  - If you want to create new `Action`s, build an `ActionHandler` and a new type of `Job`. It's up to you how you deal with messages/payloads.
- If you want to create new `Job`s, …

There's no formal interface or abstract class for `Message`s at the moment; unofficially, consider them POJOs.
Jobs encapsulate the lifecycle of getting data in/out of the backend, which in this case is Neo4j. Jobs have a very simplified state machine at their core:
- `INITIALIZING`: the Job is starting and its state is in flux
- `PENDING`: the Job is submitted and pending results
- `PRODUCING`: the Job is producing results, i.e. the first record has been received
- `COMPLETE`: the Job is no longer producing results
- `ERROR`: something bad happened :-(
Jobs provide an asynchronous way for clients to request the creation of a stream, for example via a Cypher-based transaction, and a way to reference the stream in a future client request.
Each Job that produces a stream gets "ticketed." Using an Arrow Flight Ticket provides a way to uniquely reference the job (and stream). Currently, this is handled via just a UUID that gets serialized and passed back to the client.
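Since a Flight Ticket body is just opaque bytes, the UUID round trip can be sketched in a few lines. The function names here are illustrative, not from the project; on the client side, pyarrow's `flight.Ticket` would simply wrap these bytes:

```python
import uuid

def ticket_for_job() -> bytes:
    # Serialize a fresh job UUID as the opaque Ticket body.
    return str(uuid.uuid4()).encode("utf-8")

def job_id_from_ticket(ticket_body: bytes) -> uuid.UUID:
    # Recover the job's UUID from a Ticket presented by the client.
    return uuid.UUID(ticket_body.decode("utf-8"))
```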
Clients built with the open-source Apache Arrow and Arrow Flight projects utilize the Arrow Flight RPC framework, specifically the `do_action` or similar methods. This means any Arrow client (well, at least v5.0.0) in any language can interact with `neo4j-arrow`!
Clients configure Jobs via Arrow Flight Actions, which contain an Action Type (just a string) and an arbitrary "body" payload. Different Jobs expect different messages/payloads to parameterize inputs for the Job.
A Cypher Job encapsulates running a "traditional" Neo4j Cypher-based transaction against the database. Since it's Cypher-based, the actual logic is pretty similar between the Java Driver based version (see `AsyncDriverJob` in the server project) and the Transaction API based version (see `Neo4jTransactionApiJob` in the plugin project).
Cypher jobs use the `CypherMessage` (defined in the `common` project) to communicate:
- the Cypher to execute
- the database (by name) to execute against
- any parameters for the Cypher
It currently uses the following serialized message format (which should probably be simplified to just JSON):

```
[start - end byte]    [description]
----------------------------------------------------
[0 - 3]               length of cypher UTF-8 string (C)
[4 - C+3]             cypher UTF-8 string
[C+4 - C+7]           length of UTF-8 database name (D)
[C+8 - C+D+7]         database name UTF-8 string
[C+D+8 - C+D+11]      length of params (P) as UTF-8 JSON
[C+D+12 - C+D+P+11]   params serialized as UTF-8 JSON
```
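That framing can be sketched in Python with the standard `struct` module. This is illustrative only: it assumes 4-byte big-endian length prefixes (Java's `ByteBuffer` default), which the table above doesn't actually specify:

```python
import json
import struct

def pack_cypher_message(cypher: str, db: str, params: dict) -> bytes:
    # Each field is a 4-byte length prefix followed by UTF-8 bytes
    # (big-endian lengths assumed here; the real format may differ).
    out = bytearray()
    for field in (cypher, db, json.dumps(params)):
        data = field.encode("utf-8")
        out += struct.pack(">I", len(data)) + data
    return bytes(out)

def unpack_cypher_message(buf: bytes):
    # Walk the buffer, reading three length-prefixed UTF-8 fields.
    fields, pos = [], 0
    for _ in range(3):
        (n,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        fields.append(buf[pos:pos + n].decode("utf-8"))
        pos += n
    cypher, db, params_json = fields
    return cypher, db, json.loads(params_json)
```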
GDS Jobs provide direct read (& soon write) access to in-memory Graph projections.
It currently uses the following message format, a simple serialized JSON document (in UTF-8):

```
{
  "db": "<the database name>",
  "graph": "<name of the in-memory graph>",
  "type": "<type of job: 'node' or 'relationships' or 'khop'>",
  "filters": ["<list of label or relationship filters>"],
  "parameters": ["<list of node/relationship parameters>"]
}
```
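Building that payload client-side is straightforward with the standard `json` module. The helper name and field values below are illustrative, not part of the project's API:

```python
import json

def gds_job_message(db: str, graph: str, job_type: str,
                    filters: list, parameters: list) -> bytes:
    # Serialize a GDS Job message as UTF-8 JSON, per the format above.
    msg = {
        "db": db,
        "graph": graph,
        "type": job_type,      # 'node', 'relationships', or 'khop'
        "filters": filters,
        "parameters": parameters,
    }
    return json.dumps(msg).encode("utf-8")
```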
Action Handlers define new Arrow Flight RPC action types available, provide descriptions of the action offered, and perform any servicing of the actions when called by clients.
The way `neo4j-arrow` uses actions is primarily around Job control: submitting new stream-producing jobs and checking on their status.
Any new action handler gets registered with the App (or Producer).
This is part of the "secret sauce" of adapting Arrow to Neo4j. For now, the short description is: this is where any mapping of native "types" (from Neo4j Driver Records, GDS values, etc.) to a generalized type occurs.
Other than the core `Producer` logic dealing with putting bytes on the wire, this is one of the hotter code paths and is ripe for optimization. The `Value` interface is designed to help translate the raw underlying scalar or array value (no map support yet) into the appropriate Arrow `FieldVector`.
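The kind of scalar/array dispatch the `Value` interface performs can be illustrated with a small, hypothetical Python analogue that picks an Arrow type name for a raw value. The real Java code targets `FieldVector` subclasses rather than type-name strings:

```python
def arrow_type_for(value):
    # Map a raw scalar or homogeneous array to an Arrow type name.
    # Maps are unsupported, mirroring the "no map support yet" note above.
    if isinstance(value, bool):   # check bool before int: bool subclasses int
        return "bool"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "float64"
    if isinstance(value, str):
        return "utf8"
    if isinstance(value, (list, tuple)):
        if not value:
            raise ValueError("cannot infer element type of an empty list")
        inner = {arrow_type_for(v) for v in value}
        if len(inner) != 1:
            raise ValueError("mixed-type arrays are not supported")
        return f"list<{inner.pop()}>"
    raise TypeError(f"unsupported value type: {type(value).__name__}")
```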
Some high level TODOs not in code comments:
- General Stuff
  - Figure out how to properly interrupt/kill streams on exceptions
  - Write support
    - Basic GDS writing (except GDS rel properties)
  - Dockerize the standalone server app
- GDS Native Support
  - Multiple node property support for GDS Jobs
  - Relationship properties!
  - Property filters (label-based, rel-type based)
    - node labels
    - [?] rel types
  - Pivot away from RPC actions and just expose Graphs as discoverable flights?
last updated 11 Nov 2021 via todo.sh
- TODO: make an auth handler that isn't this silly HorribleBasicAuthValidator.java
- TODO: abort WorkBuffer.java
- TODO: should we allocate a single byte array and not have to reallocate? WorkBuffer.java
- TODO: check isReady(), yield if not (near `listener.putNext();`) Producer.java
- TODO: validate root.Schema Producer.java
- TODO!!! (near `job.onComplete(arrowBatches);`) Producer.java
- TODO: we need to wait until the post-processing completes, need a callback here Producer.java
- TODO: validate schema option? ArrowBatch.java
- XXX TODO (near `return (int) rowCount;`) ArrowBatch.java
- TODO handle writejob close??? WriteJob.java
- TODO: standardize on matching logic? case sensitive/insensitive? StatusHandler.java
- TODO: use all 4 bits Edge.java
- TODO: new Exception class CypherRecord.java
- TODO: clean this Double to Float mess :-( CypherRecord.java
- TODO: object copy might be slow, check on this CypherRecord.java
- TODO: ShortArray CypherRecord.java
- TODO: error handling for graph store retrieval GdsReadJob.java
- TODO: support both rel type and node label filtering GdsReadJob.java
- TODO: nested for-loop is ugly GdsReadJob.java
- TODO: GDS lets us batch access to lists of nodes...future opportunity? GdsReadJob.java
- TODO: should it be nodeCount - 1? We advanced the iterator...maybe? GdsReadJob.java
- TODO: clean up GdsWriteJob.java
- TODO: add in filtered label support GdsWriteJob.java
- TODO: implement cloning for id maps GdsWriteJob.java
- TODO: what the heck is this Cases stuff?! GdsWriteJob.java
- TODO: relationship properties! GdsWriteJob.java
- TODO: XXX when we implement rel props, do NOT close this here! GdsWriteJob.java
- TODO: wire in maps GdsWriteJob.java
- TODO: pull in reference to LoginContext and use it in the Transaction TransactionApiJob.java
- TODO: for now wait up to a minute for the first result...needs future work TransactionApiJob.java
- TODO: wrap function should take our assumed schema to optimize TransactionApiJob.java
- TODO: INT? Does it exist? GdsNodeRecord.java
- TODO: INT_ARRAY? GdsNodeRecord.java
- TODO: String? Object? What should we do? GdsNodeRecord.java
- TODO: rename property keys to read/write forms GdsActionHandler.java
- TODO: fallback to raw bytes? GdsActionHandler.java
- TODO: "type" is pretty vague...needs a better name GdsMessage.java
- TODO: validation / constraints of values? GdsMessage.java
- TODO: assert our minimum schema? GdsMessage.java
- TODO: assert our minimum schema? GdsWriteRelsMessage.java
- TODO: "type" is pretty vague...needs a better name GdsWriteNodeMessage.java
- TODO: assert our minimum schema? GdsWriteNodeMessage.java
- TODO: better mapping support for generic Cypher values? CypherActionHandler.java
- TODO: fallback to raw bytes? CypherActionHandler.java