# Avro for Spark

Spark SQL functions for serializing and deserializing columns to and from Avro. Supports Confluent Schema Registry and Apicurio Registry.
## Installation

Add the dependency (Maven coordinates):

```xml
<dependency>
  <groupId>com.adobe</groupId>
  <artifactId>spark-avro_2.12</artifactId>
  <version>1.0.0</version>
</dependency>
```
Or load it directly into spark-shell:

```sh
spark-shell --packages com.adobe:spark-avro_2.12:1.0.0 --repositories https://packages.confluent.io/maven/
```
A complete pom.xml, including the Confluent Maven repository:

```xml
<project>
  <repositories>
    <repository>
      <id>confluent</id>
      <url>https://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>com.adobe</groupId>
      <artifactId>spark-avro_2.12</artifactId>
      <version>1.0.0</version>
    </dependency>
  </dependencies>
</project>
```
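For sbt users, the equivalent settings should look like the following (an untested sketch; `%%` resolves to the `_2.12` artifact above):

```scala
// build.sbt -- hypothetical sbt equivalent of the Maven setup above
resolvers += "confluent" at "https://packages.confluent.io/maven/"
libraryDependencies += "com.adobe" %% "spark-avro" % "1.0.0"
```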
## Serializing

Serialize a column to Avro binary using a known schema id (here with the Apicurio client):

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
val serializedColumn = to_avro(col("my_data"), serConfig(schemaId, registryConfig, writeSchemaId = true, magicByteSize = 4), registryConfig)
```
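As a usage sketch, the serialized column can go anywhere a binary column is accepted, for example as a Kafka message value (the DataFrame `df`, broker, and topic below are illustrative, not part of the library):

```scala
// Hypothetical usage: publish the Avro-encoded column as Kafka values.
df.select(serializedColumn.as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker
  .option("topic", "my-topic")                         // illustrative topic
  .save()
```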
The same column can be serialized as Avro JSON instead:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
// writeSchemaId has no effect for JSON
val serializedColumn = to_avro_json(col("my_data"), serConfig(schemaId, registryConfig, magicByteSize = 4), registryConfig)
```
Serialize using a subject name instead of a schema id (here with the Confluent client):

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaSubject = "my-schema"
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
val serializedColumn = to_avro(col("my_data"), serConfigForSubject(schemaSubject, registryConfig, writeSchemaId = true, magicByteSize = 4), registryConfig)
```
And as Avro JSON:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaSubject = "my-schema"
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
// writeSchemaId has no effect for JSON
val serializedColumn = to_avro_json(col("my_data"), serConfigForSubject(schemaSubject, registryConfig, magicByteSize = 4), registryConfig)
```
## Deserializing

Deserialize Avro binary against a known reader schema id:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
val deserializerConfig = deSerConfig(schemaId, registryConfig,
  errOnEvolution = true, errHandler = FailFastExceptionHandler(), magicByteSize = 4) // these three arguments are optional
val deserializedColumn = from_avro(col("my_data"), deserializerConfig, registryConfig)
```
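A typical consumption sketch, decoding Kafka message values (broker and topic are illustrative; the payloads are assumed to carry the schema-id framing written by `to_avro` above):

```scala
// Hypothetical usage: decode Kafka values with from_avro.
val decoded = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker
  .option("subscribe", "my-topic")                     // illustrative topic
  .load()
  .select(from_avro(col("value"), deserializerConfig, registryConfig).as("my_data"))
```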
Deserialize by subject instead of schema id:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

val schemaSubject = "my-schema"
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
val deserializerConfig = deSerConfigForSubject(schemaSubject, registryConfig,
  errOnEvolution = true, errHandler = FailFastExceptionHandler(), magicByteSize = 4) // these three arguments are optional
val deserializedColumn = from_avro(col("my_data"), deserializerConfig, registryConfig)
```
Deserialize Avro JSON; the writer schema id is supplied as a column, since `writeSchemaId` framing does not apply to JSON payloads:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
val deserializerConfig = deSerConfig(schemaId, registryConfig,
  errOnEvolution = true, errHandler = FailFastExceptionHandler(), magicByteSize = 4) // these three arguments are optional
val deserializedColumn = from_avro_json(col("my_data"), col("writerSchemaId"), deserializerConfig, registryConfig)
```
## SQL

The same functions are available from Spark SQL once registered:

```scala
com.adobe.spark.sql.avro.functions.registerFunctions()
```
Deserialize binary data by schema id:

```sql
SELECT
  from_avro_binary(
    data,
    map(
      'schemaId', 1,
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'default-value',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
""".stripMargin
Deserialize binary data by subject:

```sql
SELECT
  from_avro_binary(
    data,
    map(
      'subject', 'deserialize-string-correctly',
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'default-value',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
""".stripMargin
Deserialize binary data with the writer schema id taken from a column:

```sql
SELECT
  from_avro_binary_with_id(
    data,
    schemaId,
    map(
      'subject', 'deserialize-string-correctly',
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'HELLO',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
Deserialize Avro JSON:

```sql
SELECT
  from_avro_json(
    data,
    schemaId,
    map(
      'subject', 'deserialize-string-correctly',
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'default-value',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
""".stripMargin
Serialize to Avro binary:

```sql
SELECT
  to_avro_binary(
    data,
    map(
      'subject', 'serialize-struct-correctly',
      'writeSchemaId', 'true',
      'magicByteSize', '4'
    ),
    'HELLO',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS record
FROM dataset
```
Serialize to Avro JSON:

```sql
SELECT
  to_avro_json(
    record,
    map(
      'subject', 'serialize-struct-correctly',
      'writeSchemaId', 'true',
      'magicByteSize', '4'
    ),
    'HELLO',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS record
FROM dataset
```
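Putting it together from Scala: register the functions, expose a DataFrame as a view, and run the query. The view name `dataset` and DataFrame `df` are illustrative, and the trimmed-down option map assumes the optional keys can be omitted in SQL just as in Scala:

```scala
// Hypothetical end-to-end SQL usage.
com.adobe.spark.sql.avro.functions.registerFunctions()
df.createOrReplaceTempView("dataset")
val decoded = spark.sql(
  """SELECT
    |  from_avro_binary(
    |    data,
    |    map('schemaId', 1, 'magicByteSize', '4'),
    |    'default-value',
    |    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
    |  ) AS value
    |FROM dataset""".stripMargin)
```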
## Registry configuration

The registry config is a plain map; the `class` key selects the registry client implementation. A minimal SQL config map:

```sql
map('schema.registry.url', 'mock://registry') /* replace with your registry endpoint */
```

For Apicurio Registry:

```scala
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry", // replace with your registry endpoint
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
```

```sql
map('schema.registry.url', 'mock://registry', 'class', 'com.adobe.spark.sql.avro.client.ApicurioRegistryClient') /* replace with your registry endpoint */
```

For Confluent Schema Registry:

```scala
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry", // replace with your registry endpoint
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
```

```sql
map('schema.registry.url', 'mock://registry', 'class', 'com.adobe.spark.sql.avro.client.ConfluentRegistryClient') /* replace with your registry endpoint */
```
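If your registry requires authentication, the config map is presumably where client credentials go as well. For the Confluent client, the standard schema-registry client properties would look like the following; treating them as pass-through is an assumption, not something verified against this library:

```scala
// Assumed pass-through of standard Confluent schema-registry client auth settings.
val registryConfig = Map(
  "schema.registry.url" -> "https://registry.example.com", // illustrative endpoint
  "basic.auth.credentials.source" -> "USER_INFO",          // assumption: forwarded to the client
  "basic.auth.user.info" -> "api-key:api-secret",          // assumption: forwarded to the client
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
```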
## Error handling

Errors raised while deserializing records can be handled by one of the `DeSerExceptionHandler` implementations, passed via the `errHandler` argument (Scala) or the `errHandler` option (SQL) shown in the samples above. The available behaviors are:

- Fail the job by throwing `new SparkException("Malformed record detected. Fail fast.", exception)` (this is `FailFastExceptionHandler`)
- Return a record matching the passed schema with all fields set to null
- Return null
- Return the default value as provided
- Return the value obtained by deserializing the provided default value against the reader schema
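For example, to abort the job on the first malformed record, mirroring the deserialization samples above (only the handler changes; `schemaId` and `registryConfig` are as defined earlier):

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

// Strict decoding: any malformed record raises SparkException and fails the job.
val strictConfig = deSerConfig(schemaId, registryConfig, errHandler = FailFastExceptionHandler())
val decoded = from_avro(col("my_data"), strictConfig, registryConfig)
```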