Plug-and-play implementation of an Apache Spark custom data source for AWS DynamoDB.
We published a small article about the project, check it out here: https://www.audienceproject.com/blog/tech/sparkdynamodb-using-aws-dynamodb-data-source-apache-spark/
- 2021-01-28: Added the option `inferSchema=false`, which is useful when writing to a table with many columns.
- 2020-07-23: Releasing version 1.1.0 which supports Spark 3.0.0 and Scala 2.12. Future releases will no longer be compatible with Scala 2.11 and Spark 2.x.x.
- 2020-04-28: Releasing version 1.0.4. Includes support for assuming AWS roles through custom STS endpoint (credits @jhulten).
- 2020-04-09: We are releasing version 1.0.3 of the Spark+DynamoDB connector. Added option to delete records (thank you @rhelmstetter). Fixes (thank you @juanyunism for #46).
- 2019-11-25: We are releasing version 1.0.0 of the Spark+DynamoDB connector, which is based on the Spark Data Source V2 API. Out-of-the-box throughput calculations, parallelism and partition planning should now be more reliable. We have also pulled out the external dependency on Guava, which was causing a lot of compatibility issues.
- Distributed, parallel scan with lazy evaluation
- Throughput control by rate limiting on target fraction of provisioned table/index capacity
- Schema discovery to suit your needs
    - Dynamic inference
    - Static analysis of case class
- Column and filter pushdown
- Global secondary index support
- Write support
The library is available from Maven Central. Add the SBT dependency `"com.audienceproject" %% "spark-dynamodb" % "latest"`, as in the sketch below.
Spark is declared as a "provided" dependency, which means Spark has to be installed separately on the environment where the application runs, as is the case on AWS EMR.
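A minimal `build.sbt` sketch, assuming a Spark 3.x / Scala 2.12 setup as described in the release notes above; substitute a concrete release for `latest`, and treat the Spark version below as illustrative:

```scala
// build.sbt (sketch) -- replace "latest" with the concrete version you want to use
libraryDependencies += "com.audienceproject" %% "spark-dynamodb" % "latest"

// Spark itself is "provided": compiled against, but supplied at runtime
// by the environment (e.g. AWS EMR) rather than bundled with the application.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" % "provided"
```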
import com.audienceproject.spark.dynamodb.implicits._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
val dynamoDf = spark.read.dynamodb("SomeTableName") // <-- DataFrame of Row objects with inferred schema.
// Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)
// Write to some other table, overwriting existing items with the same keys.
dynamoDf.write.dynamodb("SomeOtherTable")
// Case class representing the items in our table.
import com.audienceproject.spark.dynamodb.attribute
case class Vegetable (name: String, color: String, @attribute("weight_kg") weightKg: Double)
// Load a Dataset[Vegetable]. Notice the @attribute annotation on the case class - we imagine the weight attribute is named with an underscore in DynamoDB.
import org.apache.spark.sql.functions._
import spark.implicits._
val vegetableDs = spark.read.dynamodbAs[Vegetable]("VegeTable")
val avgWeightByColor = vegetableDs.groupBy($"color").agg(avg($"weightKg")) // The column is called 'weightKg' in the Dataset.
The same quick start in Python:
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
                     .format("dynamodb") \
                     .load() # <-- DataFrame of Row objects with inferred schema.
# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)
# Write to some other table, overwriting existing items with the same keys.
dynamoDf.write.option("tableName", "SomeOtherTable") \
              .format("dynamodb") \
              .save()
Note: When running from the pyspark shell, you can add the library as:
pyspark --packages com.audienceproject:spark-dynamodb_<spark-scala-version>:<version>
The following parameters can be set as options on both the Spark reader and writer objects before loading/saving (a usage sketch follows the list).
- `region` sets the region where the DynamoDB table resides. Default is environment specific.
- `roleArn` sets an IAM role to assume. This allows access to a DynamoDB table in a different account than the Spark cluster. Defaults to the standard role configuration.
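A minimal Scala sketch of setting these common options on the reader, assuming the `spark` session from the quick start above; the region, role ARN and table name are illustrative placeholders:

```scala
import com.audienceproject.spark.dynamodb.implicits._

// Read a table owned by another account by assuming a role.
// The region, role ARN and table name below are placeholders.
val crossAccountDf = spark.read
    .option("region", "eu-west-1")
    .option("roleArn", "arn:aws:iam::123456789012:role/SomeDynamoDBReadRole")
    .dynamodb("SomeTableName")
```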
The following parameters can be set as options on the Spark reader object before loading (a usage sketch follows the list).
- `readPartitions` number of partitions to split the initial RDD into when loading the data into Spark. Defaults to the size of the DynamoDB table divided into chunks of `maxPartitionBytes`.
- `maxPartitionBytes` the maximum size of a single input partition. Default 128 MB.
- `defaultParallelism` the number of input partitions that can be read from DynamoDB simultaneously. Defaults to `sparkContext.defaultParallelism`.
- `targetCapacity` fraction of provisioned read capacity on the table (or index) to consume for reading. Default 1 (i.e. 100% capacity).
- `stronglyConsistentReads` whether or not to use strongly consistent reads. Default false.
- `bytesPerRCU` number of bytes that can be read per second with a single Read Capacity Unit. Default 4000 (4 KB). This value is multiplied by two when `stronglyConsistentReads=false`.
- `filterPushdown` whether or not to use filter pushdown to DynamoDB on scan requests. Default true.
- `throughput` the desired read throughput to use. It overrides the throughput calculated by the package and is intended for use with on-demand tables. Defaults to 100 for on-demand.
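A minimal Scala sketch combining a few of the reader options above; the table name and option values are illustrative only:

```scala
import com.audienceproject.spark.dynamodb.implicits._

// Throttled, parallel scan: consume at most half of the table's provisioned
// read capacity and split the work across 32 input partitions.
val throttledDf = spark.read
    .option("targetCapacity", "0.5")        // use 50% of provisioned read capacity
    .option("readPartitions", "32")
    .option("stronglyConsistentReads", "true")
    .dynamodb("SomeTableName")
```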
The following parameters can be set as options on the Spark writer object before saving (a usage sketch follows the list).
- `writeBatchSize` number of items to send per call to DynamoDB BatchWriteItem. Default 25.
- `targetCapacity` fraction of provisioned write capacity on the table to consume for writing or updating. Default 1 (i.e. 100% capacity).
- `update` if true, items will be written using UpdateItem on keys rather than BatchWriteItem. Default false.
- `throughput` the desired write throughput to use. It overrides the throughput calculated by the package and is intended for use with on-demand tables. Defaults to 100 for on-demand.
- `inferSchema` if false, the schema will not be inferred automatically, which is useful when writing to a table with many columns.
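A minimal Scala sketch of a rate-limited write using a few of the writer options above, reusing `dynamoDf` from the quick start; the table name and values are placeholders:

```scala
import com.audienceproject.spark.dynamodb.implicits._

// Write as updates (UpdateItem on keys) instead of full overwrites,
// capped at 80% of the table's provisioned write capacity.
dynamoDf.write
    .option("update", "true")
    .option("targetCapacity", "0.8")
    .option("writeBatchSize", "25")
    .dynamodb("SomeOtherTable")
```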
The following Java system properties are available for configuration (a usage sketch follows the list).
- `aws.profile` IAM profile to use for the default credentials provider.
- `aws.dynamodb.region` region in which to access the AWS APIs.
- `aws.dynamodb.endpoint` endpoint to use for accessing the DynamoDB API.
- `aws.sts.endpoint` endpoint to use for accessing the STS API when assuming the role indicated by the `roleArn` parameter.
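One way to use these, sketched below, is to set the properties on the driver JVM before building the SparkSession, for example to target a local DynamoDB endpoint during development; on a cluster you would typically pass them as `-D...` JVM options to the driver and executors instead. The profile name and endpoint URL are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: configure the connector through Java system properties (placeholder values).
// On a cluster, pass these as -D JVM options to the driver and executors instead.
System.setProperty("aws.profile", "my-dev-profile")
System.setProperty("aws.dynamodb.endpoint", "http://localhost:8000") // e.g. DynamoDB Local

val spark = SparkSession.builder()
    .master("local[*]")
    .getOrCreate()
```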
Usage of parallel scan and the rate limiter is inspired by work in https://github.com/traviscrawford/spark-dynamodb