[WIP] Spark Exasol Connector

Please note that this is an open source project which is not officially supported by Exasol. We will try to help you as much as possible, but can't guarantee anything since this is not an official Exasol product.

Overview

This is a connector library that supports an integration between Exasol and Apache Spark. Using this connector, users can create Spark dataframes from Exasol queries and save Spark dataframes as Exasol tables.

The implementation is based on the Spark DataSources API and Exasol Sub Connections.

Quick Start

Here are short code snippets showing how to use the connector in your Spark / Scala applications.

Reading data from Exasol as a Spark dataframe:

// An Exasol sql syntax query string
val exasolQueryString =
  """
    SELECT SALES_DATE, MARKET_ID, PRICE
    FROM RETAIL.SALES
    WHERE MARKET_ID IN (661, 534, 667)
  """

val df = sparkSession
     .read
     .format("exasol")
     .option("host", "10.0.0.11")
     .option("port", "8563")
     .option("username", "sys")
     .option("password", "exaPass")
     .option("query", exasolQueryString)
     .load()

Saving a Spark dataframe as an Exasol table:

// `df` is an existing Spark dataframe, for example the one read above
df
     .write
     .mode("append")
     .option("host", "10.0.0.11")
     .option("port", "8563")
     .option("username", "sys")
     .option("password", "exaPass")
     .option("table", "RETAIL.ADJUSTED_SALES")
     .format("exasol")
     .save()

Additionally, you can set the parameters on SparkConf:

// Configure spark session
val sparkConf = new SparkConf()
  .setMaster("local[*]")
  .set("spark.exasol.host", "localhost")
  .set("spark.exasol.port", "8563")
  .set("spark.exasol.username", "sys")
  .set("spark.exasol.password", "exasol")
  .set("spark.exasol.max_nodes", "200")

val sparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()

val queryStr = "SELECT * FROM MY_SCHEMA.MY_TABLE"

val df = sparkSession
     .read
     .format("exasol")
     .option("query", queryStr)
     .load()

Please note that parameter values set in the Spark configuration take priority over the same parameters passed as options.
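
The precedence rule can be pictured as a map merge. The sketch below is only an illustration in plain Scala, not the connector's actual code: the names `ExasolOptionsSketch` and `mergeParameters` are hypothetical, and it assumes that Spark configuration keys carry the `spark.exasol.` prefix shown above.

```scala
object ExasolOptionsSketch {

  // Prefix used by the Spark configuration keys in the examples above.
  private val Prefix = "spark.exasol."

  /** Merges per-read options with Spark configuration entries.
    * Entries from the Spark configuration win on conflict (hypothetical helper).
    */
  def mergeParameters(
      readerOptions: Map[String, String],
      sparkConf: Map[String, String]
  ): Map[String, String] = {
    // Keep only the Exasol-related keys and drop the prefix: spark.exasol.host -> host
    val fromConf = sparkConf.collect {
      case (key, value) if key.startsWith(Prefix) => key.stripPrefix(Prefix) -> value
    }
    // With `++`, entries of the right-hand map override those of the left-hand map
    readerOptions ++ fromConf
  }
}
```

With this sketch, a `host` passed via `.option(...)` would be replaced by `spark.exasol.host` when both are present, while options that appear only on the reader (such as `query`) are kept unchanged.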

For an example walkthrough please check docs/example-walkthrough.

Usage

The latest release version (Maven Central) is compiled against Scala 2.11 and Spark 2.1+.

To use the connector in your Java or Scala applications, include it as a dependency of your project by adding the artifact information to your build.sbt or pom.xml file.

build.sbt

resolvers ++= Seq("Exasol Releases" at "https://maven.exasol.com/artifactory/exasol-releases")

libraryDependencies += "com.exasol" % "spark-connector_2.11" % "$LATEST_VERSION"

pom.xml

<repository>
    <id>maven.exasol.com</id>
    <url>https://maven.exasol.com/artifactory/exasol-releases</url>
</repository>

<dependency>
    <groupId>com.exasol</groupId>
    <artifactId>spark-connector_2.11</artifactId>
    <version>$LATEST_VERSION</version>
</dependency>

Alternative Option

As an alternative, you can provide --repositories and --packages artifact coordinates to the spark-submit, spark-shell or pyspark commands.

For example:

spark-shell \
    --repositories https://maven.exasol.com/artifactory/exasol-releases \
    --packages com.exasol:spark-connector_2.11:$LATEST_VERSION

Deployment

Similarly, you can submit a packaged application to the Spark cluster.

Using spark-submit:

spark-submit \
    --master spark://spark-master-url:7077 \
    --repositories https://maven.exasol.com/artifactory/exasol-releases \
    --packages com.exasol:spark-connector_2.11:$LATEST_VERSION \
    --class com.myorg.SparkExasolConnectorApp \
    --conf spark.exasol.password=exaTru3P@ss \
    path/to/project/folder/target/scala-2.11/sparkexasolconnectorapp_2.11-5.3.1.jar

This deployment example also shows that you can configure the Exasol parameters at startup using --conf spark.exasol.keyName=value syntax.

Please replace $LATEST_VERSION with the latest artifact version number.

Configuration

The following configuration parameters can be provided, mainly to facilitate a connection to the Exasol cluster.

| Spark Configuration       | Configuration | Default   | Description                                                                          |
|---------------------------|---------------|-----------|--------------------------------------------------------------------------------------|
|                           | query         |           | A query string to send to Exasol                                                     |
|                           | table         |           | A table name (with schema, e.g. my_schema.my_table) to save dataframe                |
| spark.exasol.host         | host          | localhost | A host ip address to the first Exasol node (e.g. 10.0.0.11)                          |
| spark.exasol.port         | port          | 8888      | A port number to connect to Exasol nodes (e.g. 8563)                                 |
| spark.exasol.username     | username      | sys       | An Exasol username for logging in                                                    |
| spark.exasol.password     | password      | exasol    | An Exasol password for logging in                                                    |
| spark.exasol.max_nodes    | max_nodes     | 200       | The number of data nodes in Exasol cluster                                           |
| spark.exasol.batch_size   | batch_size    | 1000      | The number of records batched before running execute statement when saving dataframe |
| spark.exasol.create_table | create_table  | false     | A permission to create table if it does not exist in Exasol when saving dataframe    |
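
To show how the save-related parameters fit together, here is a minimal sketch in plain Scala that collects them into an options map. The names `ExasolWriteOptionsSketch` and `writeOptions` are hypothetical and not part of the connector. The default values mirror the list above.

```scala
object ExasolWriteOptionsSketch {

  /** Builds the options used when saving a dataframe (hypothetical helper).
    * Defaults follow the documented ones: batch_size = 1000, create_table = false.
    */
  def writeOptions(
      table: String,
      batchSize: Int = 1000,
      createTable: Boolean = false
  ): Map[String, String] = {
    require(table.nonEmpty, "table must not be empty")
    require(batchSize > 0, "batch_size must be positive")
    Map(
      "table" -> table,
      "batch_size" -> batchSize.toString,
      "create_table" -> createTable.toString
    )
  }
}

// Usage, assuming an existing dataframe `df` and connection parameters
// (host, port, username, password) set on the Spark configuration:
//
//   df.write
//     .mode("append")
//     .format("exasol")
//     .options(ExasolWriteOptionsSketch.writeOptions("RETAIL.ADJUSTED_SALES", createTable = true))
//     .save()
```

Keeping the options in one map makes it easier to reuse the same settings across several writes.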

Building and Testing

Clone the repository,

git clone https://github.com/exasol/spark-exasol-connector

cd spark-exasol-connector/

Compile,

./sbtx compile

Run unit tests,

./sbtx test

To run integration tests, a separate docker network should be created first,

docker network create -d bridge --subnet 192.168.0.0/24 --gateway 192.168.0.1 dockernet

then run,

./sbtx it:test

The integration tests require docker, exasol/docker-db, testcontainers and spark-testing-base.

In order to create a bundled jar,

./sbtx assembly

This creates a jar file under the target/ folder. The jar file can be used with spark-submit, spark-shell or pyspark commands. For example,

spark-shell --jars /path/to/spark-exasol-connector-assembly-*.jar

FAQ

  • Getting a "Connection was lost and could not be reestablished" error

    For example:

    [error] Caused by: com.exasol.jdbc.ConnectFailed: Connection was lost and could not be reestablished.  (SessionID: 1615669509094853970)
    [error]         at com.exasol.jdbc.AbstractEXAConnection.reconnect(AbstractEXAConnection.java:3505)
    [error]         at com.exasol.jdbc.ServerCommunication.handle(ServerCommunication.java:98)
    [error]         at com.exasol.jdbc.AbstractEXAConnection.communication(AbstractEXAConnection.java:2537)
    [error]         at com.exasol.jdbc.AbstractEXAConnection.communication_resultset(AbstractEXAConnection.java:2257)
    [error]         at com.exasol.jdbc.AbstractEXAStatement.execute(AbstractEXAStatement.java:456)
    [error]         at com.exasol.jdbc.EXAStatement.execute(EXAStatement.java:278)
    [error]         at com.exasol.jdbc.AbstractEXAStatement.executeQuery(AbstractEXAStatement.java:601)
    [error]         at com.exasol.spark.rdd.ExasolRDD.compute(ExasolRDD.scala:125)
    [error]         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    [error]         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    

    This is a known issue. It happens when Spark schedules fewer parallel tasks than there are Exasol sub connections. It can be mitigated by submitting the Spark application with enough resources so that it can start at least as many parallel tasks as there are parallel Exasol connections.

    Additionally, you can limit the number of parallel Exasol connections using the max_nodes parameter. However, limiting this value is not advised in production environments.
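
As a rough illustration of the max_nodes workaround, the sketch below caps the value at the number of tasks Spark can run in parallel. It is plain Scala with a hypothetical helper (`MaxNodesSketch.cappedMaxNodes`), not connector code. How you determine the number of parallel task slots is an assumption left to you; `sparkContext.defaultParallelism` is one possible approximation.

```scala
object MaxNodesSketch {

  /** Returns a max_nodes value that does not exceed the number of tasks
    * Spark can run at the same time (hypothetical helper).
    */
  def cappedMaxNodes(parallelTaskSlots: Int, exasolDataNodes: Int): Int = {
    require(parallelTaskSlots > 0, "parallelTaskSlots must be positive")
    require(exasolDataNodes > 0, "exasolDataNodes must be positive")
    math.min(parallelTaskSlots, exasolDataNodes)
  }
}

// Usage, assuming an existing `sparkSession` and a cluster with 8 data nodes:
//
//   val slots = sparkSession.sparkContext.defaultParallelism
//   val df = sparkSession.read
//     .format("exasol")
//     .option("max_nodes", MaxNodesSketch.cappedMaxNodes(slots, 8).toString)
//     .option("query", "SELECT * FROM MY_SCHEMA.MY_TABLE")
//     .load()
```

This only avoids the error in resource-constrained setups such as local testing; giving the Spark application enough resources remains the preferred fix.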