spark-cassandra-sink is a Spark Structured Streaming Sink for Cassandra. It consumes a streaming Dataset/DataFrame and inserts its rows into a Cassandra table.
Add the library to your `build.sbt`:

```scala
libraryDependencies += "com.github.bdoepf" %% "spark-cassandra-sink" % "2.4.0"
```
The version should match the Spark version in use. Only Spark > 2.3.0 is supported. Please check Maven Central for available versions.
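To keep the two versions in sync, the sink version can be derived from a single value in `build.sbt`. A minimal sketch, assuming a Spark 2.4.0 project (the `sparkVersion` value and the spark-sql dependency are illustrative, not required by the sink):

```scala
// Shared version value so the sink always matches the Spark version in use.
val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"            % sparkVersion % "provided",
  "com.github.bdoepf" %% "spark-cassandra-sink" % sparkVersion
)
```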
Configure Spark so that it can connect to Cassandra:

```properties
spark.cassandra.connection.host=localhost
spark.cassandra.connection.port=9042
```
Spark supports several ways of providing configuration. Please check Spark's configuration documentation for more details. For the full list of possible Cassandra configurations in Spark, please check the docs of the internally used spark-cassandra-connector.
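As one of those options, the same settings can be set programmatically on a `SparkConf` before building the session; a minimal sketch (host and port values are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Programmatic alternative to spark-defaults.conf or --conf flags on spark-submit.
val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "localhost")
  .set("spark.cassandra.connection.port", "9042")

val spark = SparkSession.builder().config(conf).getOrCreate()
```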
Use the Spark Cassandra Sink by passing the format `cassandra-streaming`:
```scala
val source = ... // streaming Dataset or DataFrame

val query = source
  .writeStream
  .format("cassandra-streaming")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("keyspace", "my_keyspace")
  .option("table", "my_table")
  .queryName("socket-cassandra-streaming")
  .start()
```
The Spark Cassandra Sink inserts the DataFrame/Dataset's rows into a Cassandra table.

When using a streaming Dataset, the field names and types of the case class used must match the schema of the Cassandra table. The same applies to the schema of a streaming DataFrame. For the mapping between Scala and Cassandra types, please check the spark-cassandra-connector documentation.

A `checkpointLocation` is mandatory, as it is for all Spark Structured Streaming sinks except the memory sink.
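For illustration, a streaming Dataset targeting the `my_keyspace.my_table` table created further below could be typed with a matching case class; the `Record` name is hypothetical:

```scala
// Hypothetical case class whose field names and types mirror
// the Cassandra table my_keyspace.my_table (id int, description text).
case class Record(id: Int, description: String)
```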
```scala
import org.apache.spark.sql.SparkSession

// Configure Spark.
val spark = SparkSession
  .builder()
  .config("spark.cassandra.connection.host", "localhost")
  .config("spark.cassandra.connection.port", 9042)
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Run `nc -lk 9999` in a separate terminal before starting the Spark Structured Streaming job.
// Create the streaming source.
val socket = spark
  .readStream
  .format("socket")
  .options(Map("host" -> "localhost", "port" -> "9999"))
  .load()
  .as[String]

// Apply transformations: parse "id,description" lines into typed columns.
val transformed = socket.map { s =>
  val records = s.split(",")
  assert(records.length >= 2)
  (records(0).toInt, records(1))
}.selectExpr("_1 as id", "_2 as description")

// Use the Spark Cassandra Sink.
val query = transformed
  .writeStream
  .format("cassandra-streaming")
  .option("checkpointLocation", "/tmp/demo-checkpoint")
  .option("keyspace", "my_keyspace")
  .option("table", "my_table")
  .queryName("socket-cassandra-streaming")
  .start()

query.awaitTermination()
```
The example above requires a running Cassandra instance on localhost. Start Cassandra via Docker:

```bash
docker run --name cassandra -d -p 9042:9042 cassandra:latest
```
Save the following CQL commands to a file named `create.cql`:

```bash
cat <<EOF > create.cql
CREATE KEYSPACE IF NOT EXISTS my_keyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
CREATE TABLE IF NOT EXISTS my_keyspace.my_table (
    id int,
    description text,
    PRIMARY KEY (id));
EOF
```
Copy the `create.cql` file into the Docker container and apply it:

```bash
docker cp create.cql cassandra:/tmp/
docker exec cassandra cqlsh --file /tmp/create.cql
```
- Start the netcat server:

  ```bash
  nc -lk 9999
  ```
- Start the Spark job from above.
- Type lines of the following format into the nc terminal:

  ```
  1,hello
  2,world
  ```
- Check in another terminal whether the rows have been written to Cassandra. Two rows should have been inserted:

  ```
  docker exec -it cassandra cqlsh
  cqlsh> SELECT * FROM my_keyspace.my_table;

   id | description
  ----+-------------
    1 |       hello
    2 |       world
  ```