memsql / singlestore-spark-connector

A connector for MemSQL and Spark

SingleStore Spark Connector

Version: 4.0.0

Getting Started

You can find the latest version of the connector on Maven Central. The group is com.singlestore, and the artifact is singlestore-spark-connector_2.11 for Spark 2 and singlestore-spark-connector_2.12 for Spark 3.

You can add the connector to your Spark application using spark-shell, pyspark, or spark-submit:

$SPARK_HOME/bin/spark-shell --packages com.singlestore:singlestore-spark-connector_2.12:4.0.0-spark-3.1.0

We release two versions of the singlestore-spark-connector, one per Spark version. An example version number is 4.0.0-spark-3.1.0, which is the 4.0.0 version of the connector, compiled and tested against Spark 3.1.0. Make sure you are using the most recent version of the connector.

In addition to adding the singlestore-spark-connector, you will also need to have the SingleStore JDBC driver installed. This library is tested against the following SingleStore driver version:

"com.singlestore" % "singlestore-jdbc-client"  % "1.0.1"

Once you have everything installed, you're almost ready to run your first queries against SingleStore!
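If you manage dependencies with sbt, declaring the connector together with the driver might look like the following sketch (the %% operator appends the Scala version suffix to the connector artifact; adjust the versions to the latest release):

libraryDependencies ++= Seq(
  "com.singlestore" %% "singlestore-spark-connector" % "4.0.0-spark-3.1.0",
  "com.singlestore" % "singlestore-jdbc-client" % "1.0.1"
)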


The singlestore-spark-connector is configurable globally via Spark options and locally when constructing a DataFrame. The options are named the same; however, global options have the prefix spark.datasource.singlestore..

Option: Description

ddlEndpoint (required): Hostname or IP address of the SingleStore Master Aggregator in the format host[:port] (port is optional).
dmlEndpoints: Hostname or IP address of SingleStore Aggregator nodes to run queries against in the format host[:port],host[:port],... (port is optional, multiple hosts separated by comma). Ex. child-agg:3308,child-agg2 (default: ddlEndpoint)
user: SingleStore username (default: root)
password: SingleStore password (default: no password)
query: The query to run (mutually exclusive with dbtable)
dbtable: The table to query (mutually exclusive with query)
database: If set, all connections will default to using this database (default: empty)
disablePushdown: Disable SQL Pushdown when running queries (default: false)
enableParallelRead: Enable reading data in parallel for some query shapes; one of (disabled, automaticLite, automatic, forced) (default: automaticLite)
parallelRead.Features: Comma-separated list of parallel read features that will be tried. The order in which features are listed determines their priority. Supported features: ReadFromLeaves, ReadFromAggregators, ReadFromAggregatorsMaterialized. Ex. ReadFromLeaves,ReadFromAggregators (default: ReadFromAggregators)
parallelRead.tableCreationTimeoutMS: Number of milliseconds the reader will wait for the result table creation when the ReadFromAggregators feature is used; 0 means no timeout (default: 0)
parallelRead.tableCreationTimeoutMaterializedMS: Number of milliseconds the reader will wait for the result table creation when the ReadFromAggregatorsMaterialized feature is used; 0 means no timeout (default: 0)
parallelRead.repartition: Repartition data before reading it (default: false)
parallelRead.repartition.columns: Comma-separated list of column names that are used for repartitioning, if parallelRead.repartition is enabled. By default, repartitioning is done using an additional column with a RAND() value.
overwriteBehavior: Specify the behavior during Overwrite; one of dropAndCreate, truncate, merge (default: dropAndCreate)
truncate: ⚠️ Deprecated, use overwriteBehavior instead. Truncate instead of dropping an existing table during Overwrite (default: false)
loadDataCompression: Compress data on load; one of (GZip, LZ4, Skip) (default: GZip)
loadDataFormat: Serialize data on load; one of (Avro, CSV) (default: CSV)
tableKey: Specify additional keys to add to tables created by the connector (see below for more details)
onDuplicateKeySQL: If this option is specified and a row to be inserted would result in a duplicate value in a PRIMARY KEY or UNIQUE index, SingleStore will instead perform an UPDATE of the old row. See the examples below.
insertBatchSize: Size of the batch for row insertion (default: 10000)
maxErrors: The maximum number of errors in a single LOAD DATA request. When this limit is reached, the load fails. If this property is set to 0, there is no error limit (default: 0)
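For example, a minimal read sketch using the query option instead of dbtable (the endpoint, credentials, and SQL text are placeholders, and this assumes the query option is picked up when no table path is passed to load):

spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "admin")
    .option("query", "SELECT id, AVG(age) FROM users GROUP BY id")
    .load()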


Example of configuring the singlestore-spark-connector globally:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")

Example of configuring the singlestore-spark-connector using the read API:

val df = spark.read
    .format("singlestore")
    .option("ddlEndpoint", "singlestore-master.cluster.internal")
    .option("user", "admin")
    .load("foo") // in format: database.table

Example of configuring the singlestore-spark-connector using an external table in Spark SQL:

CREATE TABLE bar USING singlestore OPTIONS ('ddlEndpoint'='singlestore-master.cluster.internal','dbtable'='')

For Java/Python versions of some of these examples, visit the section "Java & Python Example"

Writing to SingleStore

The singlestore-spark-connector supports saving dataframes to SingleStore using the Spark write API. Here is a basic example of using this API:

df.write
    .format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .save("") // in format: database.table

If the target table ("foo" in the example above) does not exist in SingleStore, the singlestore-spark-connector will automatically attempt to create it. If you specify SaveMode.Overwrite and the target table already exists, it will be recreated or truncated before the load. Specify overwriteBehavior = truncate to truncate rather than re-create.
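For instance, a minimal sketch of an Overwrite that truncates instead of re-creating the table (the target table name is a placeholder):

import org.apache.spark.sql.SaveMode

df.write
    .format("singlestore")
    .option("overwriteBehavior", "truncate")
    .mode(SaveMode.Overwrite)
    .save("foo") // in format: database.table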

Retrieving the number of written rows from taskMetrics

It is possible to add a Spark listener and get the number of written rows from the task metrics.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

spark.sparkContext.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    println("Task id: " + taskEnd.taskInfo.id.toString)
    println("Records written: " + taskEnd.taskMetrics.outputMetrics.recordsWritten.toString)
  }
})


Specifying keys for tables created by the Spark Connector

When creating a table, the singlestore-spark-connector will read options prefixed with tableKey. These options must be formatted in a specific way in order to correctly specify the keys.

⚠️ The default table type is SingleStore Columnstore. If you want a RowStore table, you will need to specify a Primary Key using the tableKey option.

To explain we will refer to the following example:

df.write
    .format("singlestore")
    .option("tableKey.primary", "id")
    .option("tableKey.key.created_firstname", "created, firstName")
    .option("tableKey.unique", "username")
    .save("") // in format: database.table

In this example, we are creating three keys:

  1. A primary key on the id column
  2. A regular key on the columns created, firstName with the key name created_firstname
  3. A unique key on the username column

Note on (2): Any key can optionally specify a name, just put it after the key type. Key names must be unique.

To change the default ColumnStore sort key you can specify it explicitly:

    .option("tableKey.columnstore", "id")

You can also customize the shard key like so:

    .option("tableKey.shard", "id, timestamp")

Inserting rows into the table with ON DUPLICATE KEY UPDATE

When updating a rowstore table, it is possible to insert rows with the ON DUPLICATE KEY UPDATE option. See the SQL reference for more details.

df.write.format("singlestore")
    .option("onDuplicateKeySQL", "age = age + 1")
    .option("insertBatchSize", 300)
    .mode(SaveMode.Append).save("foo") // in format: database.table

As a result of the query above, all new rows are appended without changes. If a row with the same PRIMARY KEY or UNIQUE index already exists, the corresponding age value is increased.

When you use ON DUPLICATE KEY UPDATE, all rows of the DataFrame are split into batches, and every insert query contains no more rows than the specified insertBatchSize setting.

Save Modes

Save operations can optionally take a SaveMode that specifies how to handle existing data if present. It is important to realize that these save modes do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.

  1. SaveMode.Append means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
  2. SaveMode.Overwrite means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.

Overwrite mode depends on the overwriteBehavior option; for details, see the section "Merging on save".

  3. SaveMode.ErrorIfExists means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
  4. SaveMode.Ignore means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.

Example of SaveMode option
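A minimal sketch of passing a SaveMode to a SingleStore write (SaveMode.Ignore shown; the target table name is a placeholder):

import org.apache.spark.sql.SaveMode

df.write
    .format("singlestore")
    .mode(SaveMode.Ignore) // do nothing if the target already contains data
    .save("foo") // in format: database.table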


Merging on save

When saving dataframes or datasets to SingleStore, you can manage how SaveMode.Overwrite is interpreted by the connector via the option overwriteBehavior. This option can take one of the following values:

  1. dropAndCreate (default) - drop and create the table before writing new values.
  2. truncate - truncate the table before writing new values.
  3. merge - replace rows with new rows by matching on the primary key. (Use this option only if you need to fully rewrite existing rows with new ones. If you need to specify some rule for update, use onDuplicateKeySQL option instead.)

All these options are case-insensitive.

Example of merge option

Suppose you have the following table, and the Id column is the primary key.

SELECT * FROM <table>;

Id Name Age
1 Alice 20
2 Bob 25
3 Charlie 30

If you save the following dataframe with overwriteBehavior = merge (a full write sketch follows the table):

Id Name Age
2 Daniel 22
3 Eve 27
4 Franklin 35
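A write call for this case might look like the following sketch (the DataFrame name and target table are placeholders):

import org.apache.spark.sql.SaveMode

df.write
    .format("singlestore")
    .option("overwriteBehavior", "merge")
    .mode(SaveMode.Overwrite)
    .save("foo") // in format: database.table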

After the save is complete, the table will look like this:

note: rows with Id=2 and Id=3 were overwritten with new rows
note: the row with Id=1 was not touched and still exists in the result

SELECT * FROM <table>;

Id Name Age
1 Alice 20
2 Daniel 22
3 Eve 27
4 Franklin 35

SQL Pushdown

The singlestore-spark-connector has extensive support for rewriting Spark SQL query plans into standalone SingleStore queries. This allows most of the computation to be pushed into the SingleStore distributed system without any manual intervention. The SQL rewrites are enabled automatically, but can be disabled either globally or per-query using the disablePushdown option.

⚠️ SQL Pushdown is either enabled or disabled on the entire Spark Session. If you want to run multiple queries in parallel with different values of disablePushdown, make sure to run them on separate Spark Sessions.
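For example, a per-DataFrame override that disables pushdown for a single read might look like this sketch (the table name is a placeholder):

val dfNoPushdown = spark.read
    .format("singlestore")
    .option("disablePushdown", "true")
    .load("foo") // in format: database.table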

We currently support most of the primary Logical Plan nodes in Spark SQL including:

  • Project
  • Filter
  • Aggregate
  • Window
  • Join
  • Limit
  • Sort

We also support most Spark SQL expressions. A full list of supported operators/functions can be found in the file ExpressionGen.scala.

The best place to look for examples of fully supported queries is in the tests. Check out this file as a starting point: SQLPushdownTest.scala.

Debugging SQL Pushdown

If you encounter an issue with SQL Pushdown, the first step is to look at the explain output. You can do this easily from any DataFrame using the function df.explain(). If you pass the argument true, you will get a lot more output that includes pre- and post-optimization passes.

In addition, the singlestore-spark-connector outputs a lot of helpful information when the TRACE log level is enabled for the com.singlestore.spark package. You can do this in your log4j configuration by adding the following line:
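For a log4j 1.x properties configuration, the entry would look roughly like this (adapt it to whatever logging backend your Spark distribution uses):

log4j.logger.com.singlestore.spark=TRACE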

Make sure not to leave it in place since it generates a huge amount of tracing output.

Parallel Read Support

Parallel read can be enabled using the enableParallelRead option. This can drastically improve performance in some cases.

The enableParallelRead option can take one of the following values:

  • disabled - use non-parallel read.
  • automaticLite - check whether at least one parallel read feature specified in parallelRead.Features can be used; if so, use parallel read, otherwise use non-parallel read. With this option, if an outer sorting operation is pushed down, non-parallel read is used instead.
  • automatic - check whether at least one parallel read feature specified in parallelRead.Features can be used; if so, use parallel read, otherwise use non-parallel read. When parallel read is performed with this option, an outer sorting operation cannot be pushed down into SingleStore; final sorting is done on the Spark side.
  • forced - check whether at least one parallel read feature specified in parallelRead.Features can be used; if so, use parallel read, otherwise throw an error. When parallel read is performed with this option, an outer sorting operation cannot be pushed down into SingleStore; final sorting is done on the Spark side.

The default value is automaticLite.

Parallel read features

The singlestore-spark-connector supports three parallel read features:

  • readFromAggregators
  • readFromAggregatorsMaterialized
  • readFromLeaves

You can specify which features the connector should try to use by setting the parallelRead.Features option. By default, it is readFromAggregators. The connector will use the first feature specified in parallelRead.Features for which all requirements are satisfied.


readFromAggregators

When this feature is used, the singlestore-spark-connector uses SingleStore parallel read functionality. The number of partitions in the resulting DataFrame will be equal to the number of partitions in the SingleStore database. Each partition is read by a separate Spark task, and all tasks must start reading at the same time. So, in order to use this feature, the parallelism level in the Spark cluster must be high enough to start all of these tasks simultaneously, i.e., the sum of (spark.executor.cores/spark.task.cpus) over all executors should not be less than the number of partitions in your database (a sizing sketch follows the requirements list below).

You can set a timeout for result table creation using the parallelRead.tableCreationTimeoutMS option.


This feature can be used when the following requirements are met:

  • SingleStore version is 7.5 or above
  • database option is set, or database name is provided in load
  • Generated query is supported by SingleStore parallel read functionality
  • Parallelism level on Spark cluster is enough to start reading from all database partitions at the same time
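As a rough sizing sketch (all numbers below are illustrative assumptions, not defaults):

// Illustrative check that the cluster can start one read task per database partition at once.
val executors = 4            // number of Spark executors (assumed)
val coresPerExecutor = 8     // spark.executor.cores (assumed)
val cpusPerTask = 1          // spark.task.cpus (assumed)
val maxConcurrentTasks = executors * (coresPerExecutor / cpusPerTask) // 32
// readFromAggregators needs maxConcurrentTasks >= number of partitions in the SingleStore database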


readFromAggregatorsMaterialized

This feature is very similar to readFromAggregators. The only difference is that the result table is created using the MATERIALIZED option. With this feature, reading tasks do not need to start at the same time, so the parallelism level of the Spark cluster does not matter. On the other hand, the MATERIALIZED option may cause the query to fail if SingleStore does not have enough memory to materialize the result set.

You can set a timeout for materialized result table creation using the parallelRead.tableCreationTimeoutMaterializedMS option.


This feature can be used when the following requirements are met:

  • SingleStore version is 7.5 or above
  • database option is set, or database name is provided in load
  • Generated query is supported by SingleStore parallel read functionality


readFromLeaves

When this feature is used, the singlestore-spark-connector reads directly from partitions on the leaf nodes, which skips the entire transaction layer. This means that individual reads will see an independent version of the database's distributed state. Make sure to take this into account when using the readFromLeaves feature.

It supports only query-shapes which do not perform work on the Aggregator and thus can be pushed entirely down to the leaf nodes.

In order to use readFromLeaves feature, the username and password provided to the singlestore-spark-connector must be the same across all nodes in the cluster.


This feature can be used when the following requirements are met:

  • database option is set, or database name is provided in load
  • Consistent authentication and connectible leaf nodes
  • Generated query can be pushed entirely down to the leaf nodes

Parallel read repartitioning

For readFromAggregators and readFromAggregatorsMaterialized, you can repartition the result using the parallelRead.repartition option to ensure that all tasks read approximately the same amount of data. This option is very useful for queries with top-level limit clauses, because without repartitioning it is possible that all rows will belong to one partition.

Use the parallelRead.repartition.columns option to specify a comma-separated list of columns that will be used for repartitioning. Column names that contain leading or trailing whitespace or commas must be escaped as follows (a small escaping helper sketch appears below):

  • Column name must be enclosed in backticks
"a" -> "`a`"
  • Each backtick (`) in the column name must be replaced with two backticks (``)
"a`a``" -> "a``a````"

By default, repartitioning is done using an additional column with RAND() value.
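A hypothetical helper illustrating these escaping rules (the function name is ours, not part of the connector API):

def escapeColumnName(name: String): String =
  "`" + name.replace("`", "``") + "`" // double any inner backticks, then wrap the name in backticks

// escapeColumnName("a")   returns "`a`"
// escapeColumnName("a`b") returns "`a``b`"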

spark.read.format("singlestore")
.option("enableParallelRead", "automatic")
.option("parallelRead.Features", "readFromAggregators,readFromLeaves")
.option("parallelRead.repartition", "true")
.option("parallelRead.repartition.columns", "a, b")
.option("parallelRead.tableCreationTimeoutMS", "1000")
.load("foo") // in format: database.table

In the example above, the connector will check the requirements for readFromAggregators. If they are satisfied, it will use this feature. Otherwise, it will check the requirements for readFromLeaves; if those are satisfied, the connector will use that feature, and otherwise it will use non-parallel read. If the connector uses readFromAggregators, it will repartition the result on the SingleStore side before reading it, and it will fail if creation of the result table takes longer than 1000 milliseconds.

Running SQL queries

The methods executeSinglestoreQuery(query: String, variables: Any*) and executeSinglestoreQueryDB(db: String, query: String, variables: Any*) allow you to run SQL queries on a SingleStore database directly using an existing SparkSession object. The method executeSinglestoreQuery uses the database defined in the SparkSession configuration; executeSinglestoreQueryDB allows you to specify the database that will be used for querying. The following examples demonstrate their usage (assuming you already have an initialized SparkSession object named spark). The methods return an Iterator[org.apache.spark.sql.Row].

// this imports the implicit class QueryMethods which adds the methods
// executeSinglestoreQuery and executeSinglestoreQueryDB to SparkSession class
import com.singlestore.spark.SQLHelper.QueryMethods

// You can pass an empty database to executeSinglestoreQueryDB to connect to SingleStore without specifying a database.
// This allows you to create a database which is defined in the SparkSession config for example.
spark.executeSinglestoreQueryDB("", "CREATE DATABASE foo")
// the next query can be used if the database field has been specified in spark object
var s = spark.executeSinglestoreQuery("CREATE TABLE user(id INT, name VARCHAR(30), status BOOLEAN)")

// you can create another database
spark.executeSinglestoreQuery("CREATE DATABASE bar")
// the database specified as the first argument will override the database set in the SparkSession object
s = spark.executeSinglestoreQueryDB("bar", "CREATE TABLE user(id INT, name VARCHAR(30), status BOOLEAN)")

You can pass query parameters to the functions as arguments following query. The supported types for parameters are String, Int, Long, Short, Float, Double, Boolean, Byte, java.sql.Date, java.sql.Timestamp.

import com.singlestore.spark.SQLHelper.QueryMethods

var userRows = spark.executeSinglestoreQuery("SELECT id, name FROM USER WHERE id > ? AND status = ? AND name LIKE ?", 10, true, "%at%")

for (row <- userRows) {
  println(row.getInt(0), row.getString(1))
}

Alternatively, these functions can take a SparkSession object as the first argument, as in the example below:

import com.singlestore.spark.SQLHelper.{executeSinglestoreQuery, executeSinglestoreQueryDB}

executeSinglestoreQuery(spark, "CREATE DATABASE foo")
var s = executeSinglestoreQueryDB(spark, "foo", "SHOW TABLES")


Connecting with a Kerberos-authenticated User

You can use the SingleStore Spark Connector with a Kerberized user without any additional configuration. To use a Kerberized user, you need to configure the connector with the given SingleStore database user that is authenticated with Kerberos (via the user option). Please visit our documentation here to learn about how to configure SingleStore users with Kerberos.

Here is an example of configuring the Spark connector globally with a Kerberized SingleStore user called krb_user.

val spark = SparkSession.builder()
    .config("spark.datasource.singlestore.user", "krb_user")
    .getOrCreate()

You do not need to provide a password when configuring a Spark Connector user that is Kerberized. The connector driver (SingleStore JDBC driver) will be able to authenticate the Kerberos user from the cache by the provided username. Other than omitting a password with this configuration, using a Kerberized user with the Connector is no different than using a standard user. Note that if you do provide a password, it will be ignored.

SQL Permissions

SingleStore has a permission matrix which describes the permissions required to run each command.

To perform any SQL operations through the Spark connector, you need different permissions for different types of operations. The matrix below describes the minimum permissions required for each operation. As an alternative to the minimum required permissions, ALL PRIVILEGES allows you to perform any operation.

Operation Min. Permission Alternative Permission

For more information on GRANTING privileges, see this documentation

SSL Support

The SingleStore Spark Connector uses the SingleStore JDBC Driver under the hood and thus supports SSL configuration out of the box. In order to configure SSL, first ensure that your SingleStore cluster has SSL configured. Documentation on how to set this up can be found here:

Once you have set up SSL on your server, you can enable SSL by setting the following options:

spark.conf.set("spark.datasource.singlestore.useSSL", "true")
spark.conf.set("spark.datasource.singlestore.serverSslCert", "PATH/TO/CERT")

Note: the serverSslCert option may be the server's certificate in DER form, or the server's CA certificate. It can be used in one of three forms:

  • serverSslCert=/path/to/cert.pem (full path to certificate)
  • serverSslCert=classpath:relative/cert.pem (relative to current classpath)
  • or as a verbatim DER-encoded certificate string -----BEGIN CERTIFICATE-----...

You may also want to set these additional options depending on your SSL configuration:

spark.conf.set("spark.datasource.singlestore.trustServerCertificate", "true")
spark.conf.set("spark.datasource.singlestore.disableSslHostnameVerification", "true")

More information on the above parameters can be found in the SingleStore JDBC driver documentation here:

Filing issues

When filing issues please include as much information as possible as well as any reproduction steps. It's hard for us to reproduce issues if the problem depends on specific data in your SingleStore table for example. Whenever possible please try to construct a minimal reproduction of the problem and include the table definition and table contents in the issue.

If the issue is related to SQL Pushdown (or you aren't sure) make sure to include the TRACE output (from the com.singlestore.spark package) or the full explain of the plan. See the debugging SQL Pushdown section above for more information on how to do this.

Happy querying!

Setting up development environment

  • install Oracle JDK 8 from this url:
  • install the community edition of Intellij IDEA from
  • clone the repository
  • in Intellij IDEA choose Configure->Plugins and install Scala plugin
  • in Intellij IDEA run Import Project and select path to singlestore-spark-connector build.sbt file
  • choose import project from external model and sbt
  • in Project JDK select New...->JDK and choose the path to the installed JDK
  • Finish
  • it will overwrite some files and create build files (which are in .gitignore)
  • you may need to remove the .idea directory for IDEA to load the project properly
  • in Intellij IDEA choose File->Close Project
  • run git checkout . to revert all changes made by Intellij IDEA
  • in Intellij IDEA choose Open and select path to singlestore-spark-connector
  • run Test Spark 3.0 (it should succeed)

SQL Pushdown Incompatibilities

  • ToUnixTimestamp and UnixTimestamp handle only times earlier than 2038-01-19 03:14:08 if they get DateType or TimestampType as the first argument
  • FromUnixTime with the default format (yyyy-MM-dd HH:mm:ss) handles only times less than 2147483648 (2^31)
  • DecimalType is truncated on overflow (by default, Spark either throws an exception or returns null)
  • greatest and least return null if at least one argument is null (in Spark these functions skip nulls)
  • When a value cannot be converted to a numeric or fractional type, SingleStore returns 0 (Spark returns null)
  • Atanh(x), for x ∈ (-∞, -1] ∪ [1, ∞), returns null (Spark returns NaN)
  • When a string is cast to a numeric type, SingleStore takes the numeric prefix of it (Spark returns null if the whole string is not numeric)
  • When a numeric type is cast to a smaller one, SingleStore truncates it. For example, 500 cast to Byte will be 127. Note: the Spark optimizer can optimize casts of literals, and then their behaviour will match the Spark behaviour
  • When a fractional type is cast to an integral type, SingleStore rounds it to the closest value
  • Log returns null instead of NaN, Infinity, or -Infinity
  • Round rounds down if the number that should be rounded is followed by 5 and it is DOUBLE or FLOAT (DECIMAL will be rounded up)
  • Conv works differently if the number contains non-alphanumeric characters
  • ShiftLeft, ShiftRight, and ShiftRightUnsigned convert the value to UNSIGNED BIGINT and then perform the shift. In the case of overflow, they return 0 (1<<64 = 0 and 10>>20 = 0)

Major changes from the 2.0.0 connector

The SingleStore Spark Connector 4.0.0 has a number of key features and enhancements:

  • Introduces SQL Optimization & Rewrite for most query shapes and compatible expressions
  • Implemented as a native Spark SQL plugin
  • Supports both the DataSource and DataSourceV2 API for maximum support of current and future functionality
  • Contains deep integrations with the Catalyst query optimizer
  • Is compatible with Spark 3.0, 3.1 and 3.2
  • Leverages SingleStore LOAD DATA to accelerate ingest from Spark via compression, vectorized cpu instructions, and optimized segment sizes
  • Takes advantage of all the latest and greatest features in SingleStore 7.x

Java & Python Examples



Java

SparkConf conf = new SparkConf();
conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal");
conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307");
conf.set("spark.datasource.singlestore.user", "admin");
conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word");

Read Data

Dataset<Row> df = spark
  .read()
  .format("singlestore")
  .option("ddlEndpoint", "singlestore-master.cluster.internal")
  .option("user", "admin")
  .load("foo"); // in format: database.table

Write Data

df.write().format("singlestore")
    .option("loadDataCompression", "LZ4")
    .option("overwriteBehavior", "dropAndCreate")
    .save("foo"); // in format: database.table



Python

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", "singlestore-master.cluster.internal")
spark.conf.set("spark.datasource.singlestore.dmlEndpoints", "singlestore-master.cluster.internal,singlestore-child-1.cluster.internal:3307")
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", "s3cur3-pa$$word")

Read Data

df = spark \
  .read \
  .format("singlestore") \
  .option("ddlEndpoint", "singlestore-master.cluster.internal") \
  .option("user", "admin") \
  .load("foo")  # in format: database.table

Write Data

df.write \
    .format("singlestore") \
    .option("loadDataCompression", "LZ4") \
    .option("overwriteBehavior", "dropAndCreate") \
    .mode("overwrite") \
    .save("foo")  # in format: database.table