This project provides an Apache Spark connector for Dgraph databases in Scala and Python. It comes with a Spark Data Source to read graphs from a Dgraph cluster directly into DataFrames, GraphX or GraphFrames. The connector supports filter pushdown, projection pushdown and partitioning by orthogonal dimensions predicates and nodes.
Example Scala code:
import org.apache.spark.sql.DataFrame
val target = "localhost:9080"
import org.apache.spark.graphx._
import uk.co.gresearch.spark.dgraph.graphx._
val graph: Graph[VertexProperty, EdgeProperty] = spark.read.dgraph.graphx(target)
val edges: RDD[Edge[EdgeProperty]] = spark.read.dgraph.edges(target)
val vertices: RDD[(VertexId, VertexProperty)] = spark.read.dgraph.vertices(target)
import org.graphframes.GraphFrame
import uk.co.gresearch.spark.dgraph.graphframes._
val graph: GraphFrame = spark.read.dgraph.graphframes(target)
val edges: DataFrame = spark.read.dgraph.edges(target)
val vertices: DataFrame = spark.read.dgraph.vertices(target)
import org.apache.spark.sql.DataFrame
import uk.co.gresearch.spark.dgraph.connector._
val triples: DataFrame = spark.read.dgraph.triples(target)
val edges: DataFrame = spark.read.dgraph.edges(target)
val nodes: DataFrame = spark.read.dgraph.nodes(target)
Example Python code (pyspark ≥3.0, see PySpark Shell and Python script):
from pyspark.sql import DataFrame
from gresearch.spark.dgraph.connector import *
triples: DataFrame = spark.read.dgraph.triples("localhost:9080")
edges: DataFrame = spark.read.dgraph.edges("localhost:9080")
nodes: DataFrame = spark.read.dgraph.nodes("localhost:9080")
The connector provides the following features:
- Scala, Java, and Python API: Supports Spark's Scala, Java, and Python Dataset API.
- Spark Graph API support: Supports Spark Graph APIs GraphX and GraphFrames.
- Various graph representations: Graph data are available as Spark Datasets in various formats: Triples, Nodes and Edge Datasets, fully typed, wide or long format.
- Filter Pushdown: Many filters applied to the Graph Datasets will be pushed down to Dgraph so that only the relevant sub-graph is read from the cluster.
- Projection Pushdown: Only actually used columns of the Graph Datasets will be read from the Dgraph cluster.
- Multi-language strings: Strings values of the same predicate can be stored in multiple languages.
- Spark metrics: The connector collects Spark metrics per partition providing insights in throughout and timing of the communication to the Dgraph cluster.
- Graph Partitioning and Streaming: Graph data are partitioned and streamed in small chunks from Dgraph into Spark. This guarantees that graphs of any size can be read into Spark.
The connector has the following known limitations:
- Read-only: The connector does not support mutating the graph (issue #8).
- Namespaces: The connector can only read the default namespace (issue #148).
- Authorization: No authorization against Dgraph supported (issue #149).
- Limited Lifetime of Transactions: The connector optionally reads all partitions within the same transaction, but concurrent mutations reduce the lifetime of that transaction.
- Language tags: The node source in wide mode cannot read string values with language tags. All other sources and modes can read language strings.
- Filtering on language string: The connector does not support filtering predicates with Dgraph
@lang
directives. - Facets: The connector cannot read facets.
The Spark Dgraph Connector is available for Spark 3.0, 3.1, 3.2, 3.3, 3.4 and 3.5, with Scala 2.12 and 2.13.
Use Maven artifact ID spark-dgraph-connector_2.12
or spark-dgraph-connector_2.12
. The Spark version is part of the package version,
i.e. 0.12.0-3.0, 0.12.0-3.1, 0.12.0-3.2, 0.12.0-3.3, 0.12.0-3.4 and 0.12.0-3.5, respectively.
Add this line to your build.sbt
file to use the latest version for Spark 3.5:
libraryDependencies += "uk.co.gresearch.spark" %% "spark-dgraph-connector" % "0.12.0-3.5"
Add this dependency to your pom.xml
file to use the latest version:
<dependency>
<groupId>uk.co.gresearch.spark</groupId>
<artifactId>spark-dgraph-connector_2.13</artifactId>
<version>0.12.0-3.5</version>
</dependency>
Launch the Scala Spark REPL (Spark ≥3.0.0) with the Spark Dgraph Connector dependency (version ≥0.5.0) as follows:
spark-shell --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.12.0-3.5
Launch the Python Spark REPL (pyspark ≥3.0.0) with the Spark Dgraph Connector dependency (version ≥0.5.0) as follows:
pyspark --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.12.0-3.5
Run your Python scripts that use PySpark (pyspark ≥3.0.0) and the Spark Dgraph Connector (version ≥0.5.0) via spark-submit
:
spark-submit --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.12.0-3.5 [script.py]
The following examples use a local Dgraph (≥20.03.3) instance setup as described in the
Dgraph Quickstart Guide.
Run Step 1 to start an instance,
a DROP_ALL
for Dgraph ≥20.07.0 only,
Step 2 to load example graph data, and
Step 3 to add a schema. These steps are
provided in the following scripts:
./dgraph-instance.start.sh
./dgraph-instance.drop-all.sh # for Dgraph ≥20.07.0 only
./dgraph-instance.schema.sh
./dgraph-instance.insert.sh
The Dgraph version can optionally be set via DGRAPH_TEST_CLUSTER_VERSION
environment variable.
The connection to Dgraph can be established via a target
, which is the hostname and gRPC port of a
Dgraph Alpha node in the form <hostname>:<port>
.
With our example instance started above, we can use localhost:9080
as the target.
The Dgraph UI Ratel can be used to query your local Dgraph instance. Open a browser and got to https://play.dgraph.io/?latest. Connect to your local Dgraph via http://localhost:8080.
You can load the entire Dgraph database into an Apache Spark GraphX graph. For example:
import uk.co.gresearch.spark.dgraph.graphx._
val graph = spark.read.dgraph.graphx("localhost:9080")
Example code to perform a PageRank computation on this graph to test that the connector is working:
val pageRank = graph.pageRank(0.0001)
pageRank.vertices.foreach(println)
You can load the entire Dgraph database into a GraphFrames graph. For example:
import uk.co.gresearch.spark.dgraph.graphframes._
val graph: GraphFrame = spark.read.dgraph.graphframes("localhost:9080")
Example code to perform a PageRank computation on this graph to test that the connector is working:
val pageRank = graph.pageRank.maxIter(10)
pageRank.run().triplets.show(false)
Note: Predicates get renamed when they are loaded from the Dgraph database. Any .
(dot) in the name
is replaced by a _
(underscore). To guarantee uniqueness of names, underscores in the original predicate
names are replaced by two underscores. For instance, predicates dgraph.type
and release_date
become dgraph_type
and release__date
, respectively.
Dgraph data can be loaded into Spark DataFrames in various forms:
- Triples
- fully typed values
- string values
- Nodes
- fully typed properties
- wide schema
- Edges
You can load the entire Dgraph database as triples into an Apache Spark DataFrame. For example:
import uk.co.gresearch.spark.dgraph.connector._
val triples = spark.read.dgraph.triples("localhost:9080")
The returned DataFrame
has the following schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectUid: long (nullable = true)
|-- objectString: string (nullable = true)
|-- objectLong: long (nullable = true)
|-- objectDouble: double (nullable = true)
|-- objectTimestamp: timestamp (nullable = true)
|-- objectBoolean: boolean (nullable = true)
|-- objectGeo: string (nullable = true)
|-- objectPassword: string (nullable = true)
|-- objectType: string (nullable = true)
The object value gets stored in exactly one of the object*
(except objectType
) columns, depending on the type of the value.
The objectType
column provides the type of the object. Here is an example:
subject | predicate | objectString | objectLong | objectDouble | objectTimestamp | objectBoolean | objectGeo | objectPassword | objectType |
---|---|---|---|---|---|---|---|---|---|
1 | dgraph.type | Person | null | null | null | null | null | null | string |
1 | name | Luke Skywalker | null | null | null | null | null | null | string |
2 | dgraph.type | Person | null | null | null | null | null | null | string |
2 | name | Princess Leia | null | null | null | null | null | null | string |
3 | dgraph.type | Film | null | null | null | null | null | null | string |
3 | name | Star Wars: Episode IV - A New Hope | null | null | null | null | null | null | string |
3 | release_date | null | null | null | 1977-05-25 00:00:00 | null | null | null | timestamp |
3 | revenue | null | null | 7.75E8 | null | null | null | null | double |
3 | running_time | null | 121 | null | null | null | null | null | long |
This model allows you to store the fully-typed triples in a DataFrame
.
The triples can also be loaded in an un-typed, narrow form:
import uk.co.gresearch.spark.dgraph.connector._
spark
.read
.option(TriplesModeOption, TriplesModeStringOption)
.dgraph.triples("localhost:9080")
.show
The resulting DataFrame
has the following schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectString: string (nullable = true)
|-- objectType: string (nullable = true)
The object value gets stored as a string in objectString
, and objectType
provides you
with the actual type of the object. Here is an example:
subject | predicate | objectString | objectType |
---|---|---|---|
1 | dgraph.type | Person | string |
1 | name | Luke Skywalker | string |
2 | dgraph.type | Person | string |
2 | name | Princess Leia | string |
3 | dgraph.type | Film | string |
3 | revenue | 7.75E8 | double |
3 | running_time | 121 | long |
3 | starring | 1 | uid |
3 | starring | 2 | uid |
3 | starring | 6 | uid |
3 | director | 7 | uid |
3 | name | Star Wars: Episode IV - A New Hope | string |
3 | release_date | 1977-05-25 00:00:00.0 | timestamp |
You can load all nodes into a DataFrame
in a fully-typed form. This contains all the nodes' properties but no edges to other nodes:
import uk.co.gresearch.spark.dgraph.connector._
spark.read.dgraph.nodes("localhost:9080")
The returned DataFrame
has the following schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectString: string (nullable = true)
|-- objectLong: long (nullable = true)
|-- objectDouble: double (nullable = true)
|-- objectTimestamp: timestamp (nullable = true)
|-- objectBoolean: boolean (nullable = true)
|-- objectGeo: string (nullable = true)
|-- objectPassword: string (nullable = true)
|-- objectType: string (nullable = true)
The schema of the returned DataFrame
is very similar to the typed triples schema, except that there is no objectUid
column linking to other nodes. Here is an example:
subject | predicate | objectString | objectLong | objectDouble | objectTimestamp | objectBoolean | objectGeo | objectPassword | objectType |
---|---|---|---|---|---|---|---|---|---|
1 | dgraph.type | Person | null | null | null | null | null | null | string |
1 | name | Luke Skywalker | null | null | null | null | null | null | string |
2 | dgraph.type | Person | null | null | null | null | null | null | string |
2 | name | Princess Leia | null | null | null | null | null | null | string |
3 | dgraph.type | Film | null | null | null | null | null | null | string |
3 | revenue | null | null | 7.75E8 | null | null | null | null | double |
3 | running_time | null | 121 | null | null | null | null | null | long |
3 | name | Star Wars: Episode IV - A New Hope | null | null | null | null | null | null | string |
3 | release_date | null | null | null | 1977-05-25 00:00:00 | null | null | null | timestamp |
Nodes can also be loaded in a wide, fully-typed format:
import uk.co.gresearch.spark.dgraph.connector._
spark
.read
.option(NodesModeOption, NodesModeWideOption)
.dgraph.nodes("localhost:9080")
The returned DataFrame
has the following schema format, which is dependent on the schema of the underlying Dgraph database.
Node properties are stored in typed columns and are ordered alphabetically (property columns start after the subject
column):
root
|-- subject: long (nullable = false)
|-- dgraph.graphql.schema: string (nullable = true)
|-- dgraph.type: string (nullable = true)
|-- name: string (nullable = true)
|-- release_date: timestamp (nullable = true)
|-- revenue: double (nullable = true)
|-- running_time: long (nullable = true)
Note: The graph schema could become very large and therefore the DataFrame
could become prohibitively wide.
subject | dgraph.graphql.schema | dgraph.type | name | release_date | revenue | running_time |
---|---|---|---|---|---|---|
1 | null | Person | Luke Skywalker | null | null | null |
2 | null | Person | Princess Leia | null | null | null |
3 | null | Film | Star Wars: Episode IV - A New Hope | 1977-05-25 00:00:00 | 7.75E8 | 121 |
4 | null | Film | Star Wars: Episode VI - Return of the Jedi | 1983-05-25 00:00:00 | 5.72E8 | 131 |
5 | null | Film | Star Trek: The Motion Picture | 1979-12-07 00:00:00 | 1.39E8 | 132 |
6 | null | Person | Han Solo | null | null | null |
7 | null | Person | George Lucas | null | null | null |
8 | null | Person | Irvin Kernshner | null | null | null |
9 | null | Person | Richard Marquand | null | null | null |
10 | null | Film | Star Wars: Episode V - The Empire Strikes Back | 1980-05-21 00:00:00 | 5.34E8 | 124 |
Note: The Wide Nodes source enforces the predicate partitioner to produce a single partition.
Edges can be loaded as follows:
import uk.co.gresearch.spark.dgraph.connector._
spark.read.dgraph.edges("localhost:9080")
The returned DataFrame
has the following simple schema:
root
|-- subject: long (nullable = false)
|-- predicate: string (nullable = true)
|-- objectUid: long (nullable = false)
Though there is only a single object
column for the destination node, it is called objectUid
to align with the DataFrame
schemata above.
subject | predicate | objectUid |
---|---|---|
3 | starring | 1 |
3 | starring | 2 |
3 | starring | 6 |
3 | director | 7 |
4 | starring | 1 |
4 | starring | 2 |
4 | starring | 6 |
4 | director | 9 |
10 | starring | 1 |
10 | starring | 2 |
10 | starring | 6 |
10 | director | 8 |
Predicates marked in the Dgraph schema with the @lang
directive can store string values in
multiple languages at a time:
{
set {
_:sw3 <title> "Star Wars: Episode VI - Return of the Jedi" .
_:sw3 <title@en> "Star Wars: Episode VI - Return of the Jedi" .
_:sw3 <title@zh> "星際大戰六部曲:絕地大反攻" .
_:sw3 <title@th> "สตาร์ วอร์ส เอพพิโซด 6: การกลับมาของเจได" .
_:sw3 <title@de> "Die Rückkehr der Jedi-Ritter" .
}
}
The connector reads all these languages. Each of the predicate name contains the language
in the form predicate@language
:
subject | predicate | objectString | objectType |
---|---|---|---|
6 | dgraph.type | Film | string |
6 | title | Star Wars: Episode VI - Return of the Jedi | string |
6 | title@en | Star Wars: Episode VI - Return of the Jedi | string |
6 | title@zh | 星際大戰六部曲:絕地大反攻 | string |
6 | title@th | สตาร์ วอร์ส เอพพิโซด 6: การกลับมาของเจได | string |
6 | title@de | Die Rückkehr der Jedi-Ritter | string |
Dgraph isolates reads from writes through transactions. Since the connector initiates multiple reads while fetching the entire graph (partitioning), writes called mutations should be isolated in order to get a consistent snapshot of the graph.
Setting the dgraph.transaction.mode
option to "read"
will cause the connector to read all partitions
within the same transaction. However, this will cause an exception on the Dgraph cluster when too many
mutations occur while reading partitions. With that option set to "none"
, no such exception will
occur but reads are not isolated from writes.
The connector supports filter pushdown to improve efficiency when reading only sub-graphs. This is supported only in conjunction with the predicate partitioner. Spark filters can only be pushed for some column and data source because columns may have different meaning. Columns can be of the following types:
Column Type | Description | Type | Columns | Sources |
---|---|---|---|---|
subject column | the subject of the row | long |
subject |
all DataFrame sources |
predicate column | the predicate of the row | string |
predicate |
all but Wide Nodes source |
predicate value column | the value of a specific predicate, column name is predicate name | any | one column for each predicate in the schema, e.g. dgraph.type |
Wide Nodes source |
object value columns | object value of the row | long string long double timestamp boolean geo password |
objectUid objectString objectLong objectDouble objectTimestamp objectBoolean objectGeo objectPassword |
all but Wide Nodes source the String Triples source has only objectString the Typed Nodes source lacks the objectUid the Edges source has only objectUid |
object type column | the type of the object | string |
objectType |
String Triples source, Typed Triples source and Typed Nodes source |
The following table lists all supported Spark filters:
Spark Filter | Supported Columns | Example |
---|---|---|
EqualTo |
|
|
In |
|
|
IsNotNull |
|
|
The connector supports projection pushdown to improve efficiency when reading only sub-graphs.
A projection in Spark terms is a select
operation that selects only a subset of a DataFrame's columns.
The Wide Nodes source supports projection pushdown on all predicate value columns.
The following query uses filter and projection pushdown. First we define a wide node DataFrame
:
val df =
spark.read
.options(Map(
NodesModeOption -> NodesModeWideOption,
PartitionerOption -> PredicatePartitionerOption
))
.dgraph.nodes("localhost:9080")
Then we select some columns (projection) and rows (filter):
df
.select($"subject", $"`dgraph.type`", $"revenue") // projection
.where($"revenue".isNotNull) // filter
.show()
This selects the columns subject
, dgraph.type
and revenue
for only those rows that actually have a value for revenue
.
The underlying query to Dgraph simplifies from (the full graph):
{
pred1 as var(func: has(<dgraph.graphql.schema>))
pred2 as var(func: has(<dgraph.graphql.xid>))
pred3 as var(func: has(<dgraph.type>))
pred4 as var(func: has(<name>))
pred5 as var(func: has(<release_date>))
pred6 as var(func: has(<revenue>))
pred7 as var(func: has(<running_time>))
result (func: uid(pred1,pred2,pred3,pred4,pred5,pred6,pred7)) {
uid
<dgraph.graphql.schema>
<dgraph.graphql.xid>
<dgraph.type>
<name>
<release_date>
<revenue>
<running_time>
}
}
to (selected predicates and nodes only):
{
pred1 as var(func: has(<revenue>))
result (func: uid(pred1)) {
uid
<dgraph.type>
<revenue>
}
}
The response is faster as only relevant data are transferred between Dgraph and Spark.
subject | dgraph.type | revenue |
---|---|---|
4 | Film | 7.75E8 |
5 | Film | 5.34E8 |
6 | Film | 5.72E8 |
9 | Film | 1.39E8 |
The connector (Spark ≥3.0 only) collects metrics per partition that provide insights in throughout and timing of the communication to the Dgraph cluster. For each request to Dgraph (a chunk), the number of received bytes, uids and retrieval time are recorded and summed per partition. The values can be seen on the Spark UI for the respective stages that performs the read:
The connector uses Spark Accumulators
to collect these metrics. They can be accessed by the Spark driver via a SparkListener
:
val handler = new SparkListener {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
stageCompleted.stageInfo.accumulables.values.foreach(println)
}
spark.sparkContext.addSparkListener(handler)
spark.read.dgraph.triples("localhost:9080").count()
The following metrics are available:
Metric | Description |
---|---|
Dgraph Bytes |
Size of JSON responses from the Dgraph cluster in Byte. |
Dgraph Chunks |
Number of requests sent to the Dgraph cluster. |
Dgraph Time |
Time waited for Dgraph to respond in Seconds. |
Dgraph Uids |
Number of Uids read. |
Partitioning your Dgraph graph is essential to be able to load large quantities of graph data into Spark. Spark splits data into partitions, where ideally all partitions have the same size and are of decent size. Partitions that are too large will kill your Spark executor as they won't fit into memory. When partitions are too small your Spark job becomes inefficient and slow, but will not fail.
Each partition connects to the Dgraph cluster and reads a specific sub-graph. Partitions are non-overlapping.
This connector provides various ways to partition your graph. When the default partitioning does not work for your specific use case, try a more appropriate partitioning scheme.
The following Partitioner
implementations are available:
Partitioner | partition by | Description | Use Case |
---|---|---|---|
Singleton | nothing | Provides a single partition for the entire graph. | Unit Tests and small graphs that fit into a single partition. Can be used for large graphs if combined with a "by uid" partitioner. |
Predicate | predicate | Provides multiple partitions with at most P predicates per partition where P defaults to 1000 . Picks multiple predicates from the same Dgraph group. |
Large graphs where each predicate fits into a partition, otherwise combine with Uid Range partitioner. Skewness of predicates reflects skewness of partitions. |
Uid Range | uids | Each partition has at most N uids where N defaults to 1000000 . |
Large graphs where single uid s fit into a partition. Can be combined with any predicate partitioner, otherwise induces internal Dgraph cluster communication across groups. |
Predicate + Uid Range (default) | predicates + uids | Partitions by predicate first (see Predicate Partitioner), then each partition gets partitioned by uid (see Uid Partitioner) | Graphs of any size. |
The Dgraph data can be partitioned by predicates. Each partition then contains a distinct set of predicates.
The number of predicates per partition can be configured via dgraph.partitioner.predicate.predicatesPerPartition
,
which defaults to 1000
.
Predicate partitions connect only to alpha nodes that contain those predicates. Hence, these reads are all locally to the alpha nodes and induce no Dgraph cluster internal communication.
A uid
represents a node or vertex in Dgraph terminology. An "Uid Range" partitioning splits
the graph by the subject of the graph triples. This can be combined with predicate partitioning,
which serves as an orthogonal partitioning. Without predicate partitioning, uid
partitioning
induces internal Dgraph cluster communication across the groups.
The uid partitioning always works on top of a predicate partitioner. If none is defined a singleton partitioner is used. The number of uids of each underlying partition has to be estimated. Once the number of uids is estimated, the partition is further split into ranges of that uid space.
The space of existing uids
is split into ranges of N
uids
per partition. The N
defaults to 1000000
and can be configured via dgraph.partitioner.uidRange.uidsPerPartition
. The uid
s are allocated to partitions in ascending order.
Such a split will not be done if more than dgraph.partitioner.uidRange.maxPartitions
partitions would be created. This defaults to 10000
.
If vertex size is skewed and a function of uid
, then partitions will be skewed as well.
Note: With uid partitioning, the chunk size configured via dgraph.chunkSize
should be at least a 10th of
the number of uids per partition configured via dgraph.partitioner.uidRange.uidsPerPartition
to avoid
inefficiency due to chunks overlapping with partition borders. When your result is sparse w.r.t. the uid space
set the chunk size to 100th or less.
The connector reads each partition from Dgraph in a streamed fashion. It splits up a partition into smaller chunks,
where each chunk contains 100000
uids. This chunk size can be configured via dgraph.chunkSize
.
Each chunk sends a single query to Dgraph. The chunk size limits the size of the result.
Due to the low memory footprint of the connector, Spark could read your entire graph via a single partition
(you would have to repartition
the read DataFrame to make Spark shuffle the data properly).
However, this would be slow, but it proves the connector can handle any size of graph with fixed executor memory requirement.
The connector uses Spark's Log4j standard logging framework. Add the following line to your log4j.properties
to set
the log level of the connector specifically:
log4j.logger.uk.co.gresearch.spark.dgraph.connector=DEBUG
See SPARK_HOME/conf/log4j.properties.template
for a template file.
Some unit tests require a Dgraph (≥20.03.3) cluster running at localhost:9080
. It has to be set up as
described in the Examples section. If that cluster is not running, the unit tests will
launch and set up such a cluster for you. This requires docker
to be installed on your machine
and will make the tests take longer. If you run those tests frequently it is recommended you run
the cluster setup yourself.
You can set the Dgraph version that is started automatically
by setting environment variable DGRAPH_TEST_CLUSTER_VERSION
.
The default version is defined in uk.co.gresearch.spark.dgraph.DgraphTestCluster.DgraphDefaultVersion
.
The Python code can be tested with pytest
:
PYTHONPATH="python:python/test" python -m pytest python/test