# Avro for Spark
Supports both the Confluent Schema Registry and the Apicurio Schema Registry.
## Installation

Add the dependency to your build:

```xml
<dependency>
    <groupId>com.adobe</groupId>
    <artifactId>spark-avro_2.12</artifactId>
    <version>1.0.0</version>
</dependency>
```

Or pull the package in directly when launching `spark-shell`:

```shell
spark-shell --packages com.adobe:spark-avro_2.12:1.0.0 --repositories https://packages.confluent.io/maven/
```
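If you build with sbt instead, the equivalent coordinates would look like this (a sketch assuming the artifact follows standard Scala cross-publishing for 2.12):

```scala
// Confluent's Maven repository hosts the registry client dependencies.
resolvers += "Confluent" at "https://packages.confluent.io/maven/"

libraryDependencies += "com.adobe" %% "spark-avro" % "1.0.0"
```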
In a full Maven project, also add the Confluent repository so the registry client dependencies resolve:

```xml
<project>
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>com.adobe</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>1.0.0</version>
        </dependency>
    </dependencies>
</project>
```
## Serialization

Serialize a column to Avro binary against a schema ID (here with the Apicurio registry client):

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
val serializedColumn = to_avro(col("my_data"), serConfig(schemaId, registryConfig, writeSchemaId = true, magicByteSize = 4), registryConfig)
```
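`to_avro` returns an ordinary Spark `Column`, so the result can be attached to a DataFrame like any other expression. A one-line sketch, assuming a DataFrame `df` with a `my_data` struct column:

```scala
val out = df.withColumn("value", serializedColumn)
```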
`to_avro_json` works the same way but produces the Avro JSON encoding instead of binary:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
// writeSchemaId has no effect for JSON
val serializedColumn = to_avro_json(col("my_data"), serConfig(schemaId, registryConfig, magicByteSize = 4), registryConfig)
```
Instead of a schema ID, you can resolve the schema by subject (here with the Confluent registry client):

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaSubject = "my-schema"
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
val serializedColumn = to_avro(col("my_data"), serConfigForSubject(schemaSubject, registryConfig, writeSchemaId = true, magicByteSize = 4), registryConfig)
```
And the JSON variant by subject:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.functions._

val schemaSubject = "my-schema"
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
// writeSchemaId has no effect for JSON
val serializedColumn = to_avro_json(col("my_data"), serConfigForSubject(schemaSubject, registryConfig, magicByteSize = 4), registryConfig)
```
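A typical use for any of the serialized columns above is producing registry-encoded records to Kafka via Spark's Kafka sink. A minimal sketch; the bootstrap servers and topic are placeholders, and `df` is assumed to hold the `my_data` column:

```scala
df.select(serializedColumn.as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("topic", "my-topic")                         // placeholder
  .save()
```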
## Deserialization

Deserialize an Avro binary column against a schema ID:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
val deserializerConfig = deSerConfig(schemaId, registryConfig,
  errOnEvolution = true, errHandler = FailFastExceptionHandler(), magicByteSize = 4) // these three parameters are optional
val deserializedColumn = from_avro(col("my_data"), deserializerConfig, registryConfig)
```
Or resolve the reader schema by subject (here with the Apicurio registry client):

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

val schemaSubject = "my-schema"
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)
val deserializerConfig = deSerConfigForSubject(schemaSubject, registryConfig,
  errOnEvolution = true, errHandler = FailFastExceptionHandler(), magicByteSize = 4) // these three parameters are optional
val deserializedColumn = from_avro(col("my_data"), deserializerConfig, registryConfig)
```
For Avro JSON, `from_avro_json` additionally takes a column carrying the writer schema ID:

```scala
import org.apache.spark.sql.functions.col
import com.adobe.spark.sql.avro._
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

val schemaId = 1L
val registryConfig = Map(
  "schema.registry.url" -> "mock://registry",
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)
val deserializerConfig = deSerConfig(schemaId, registryConfig,
  errOnEvolution = true, errHandler = FailFastExceptionHandler(), magicByteSize = 4) // these three parameters are optional
val deserializedColumn = from_avro_json(col("my_data"), col("writerSchemaId"), deserializerConfig, registryConfig)
```
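On the consuming side, the same configs decode records straight off a Kafka source. A minimal sketch reusing `deserializerConfig` and `registryConfig` from above; the bootstrap servers, topic, and `spark` session are assumed:

```scala
val decoded = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "my-topic")                     // placeholder
  .load()
  .select(from_avro(col("value"), deserializerConfig, registryConfig).as("my_data"))
```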
## SQL

The same functionality is available from Spark SQL once the functions are registered:

```scala
com.adobe.spark.sql.avro.functions.registerFunctions()
```

Deserialize Avro binary data against a schema ID:

```sql
SELECT
  from_avro_binary(
    data,
    map(
      'schemaId', 1,
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'default-value',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
Or against a subject:

```sql
SELECT
  from_avro_binary(
    data,
    map(
      'subject', 'deserialize-string-correctly',
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'default-value',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
`from_avro_binary_with_id` reads the writer schema ID from a column:

```sql
SELECT
  from_avro_binary_with_id(
    data,
    schemaId,
    map(
      'subject', 'deserialize-string-correctly',
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'HELLO',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
`from_avro_json` does the same for Avro JSON:

```sql
SELECT
  from_avro_json(
    data,
    schemaId,
    map(
      'subject', 'deserialize-string-correctly',
      'errOnEvolution', 'false',
      'errHandler', 'com.adobe.spark.sql.avro.errors.DefaultRecordExceptionHandler',
      'magicByteSize', '4'
    ),
    'default-value',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS value
FROM dataset
```
Serialization to Avro binary:

```sql
SELECT
  to_avro_binary(
    data,
    map(
      'subject', 'serialize-struct-correctly',
      'writeSchemaId', 'true',
      'magicByteSize', '4'
    ),
    'HELLO',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS record
FROM dataset
```
And to Avro JSON:

```sql
SELECT
  to_avro_json(
    record,
    map(
      'subject', 'serialize-struct-correctly',
      'writeSchemaId', 'true',
      'magicByteSize', '4'
    ),
    'HELLO',
    map('schema.registry.url', 'mock://registry', 'max.schemas.per.subject', '200')
  ) AS record
FROM dataset
```
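To run these statements from Scala, register the input as a temporary view and wrap the query in `spark.sql`. A minimal sketch, assuming a DataFrame `df` and that the optional error-handling keys can be omitted:

```scala
com.adobe.spark.sql.avro.functions.registerFunctions()
df.createOrReplaceTempView("dataset")
val decoded = spark.sql("""
  SELECT from_avro_binary(
    data,
    map('schemaId', 1),
    'default-value',
    map('schema.registry.url', 'mock://registry')
  ) AS value
  FROM dataset
""")
```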
  "schema.registry.url" -> "mock://registry", // Replace with your registry endpoint
  "max.schemas.per.subject" -> "200",
  "class" -> "com.adobe.spark.sql.avro.client.ApicurioRegistryClient"
)   map('schema.registry.url', 'mock://registry')  /* Replace with your registry endpoint*/   map('schema.registry.url', 'mock://registry', 'class', 'com.adobe.spark.sql.avro.client.ApicurioRegistryClient') /* Replace with your registry endpoint*/val registryConfig = Map(
    "schema.registry.url" -> "mock://registry", // Replace with your registry endpoint
    "max.schemas.per.subject" -> "200",
    "class" -> "com.adobe.spark.sql.avro.client.ConfluentRegistryClient"
)   map('schema.registry.url', 'mock://registry', 'class', 'com.adobe.spark.sql.avro.client.ConfluentRegistryClient') /* Replace with your registry endpoint*/You can handle errors when deserializing records using one of the DeSerExceptionHandler.
## Error handling

Errors raised while deserializing records can be handled with one of the `DeSerExceptionHandler` implementations, passed as the `errHandler` parameter shown in the samples above (see also the sketch after this list). The available behaviours are:

- Fail the job by throwing `new SparkException("Malformed record detected. Fail fast.", exception)` (`FailFastExceptionHandler`).
- Return a record matching the passed schema with all fields set to null.
- Return null.
- Return the default value as provided.
- Return the value obtained by deserializing the provided default value against the reader schema.
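The handler is selected when building the deserializer config. A minimal sketch reusing `schemaId` and `registryConfig` from the samples above; `DefaultRecordExceptionHandler` appears in the SQL examples by class name, and a no-argument constructor is assumed here:

```scala
import com.adobe.spark.sql.avro.errors._
import com.adobe.spark.sql.avro.functions._

// Strict: abort the job on the first malformed record.
val strictConfig = deSerConfig(schemaId, registryConfig, errHandler = FailFastExceptionHandler())

// Lenient: substitute a fallback record and keep processing
// (assumes DefaultRecordExceptionHandler can be constructed with no arguments).
val lenientConfig = deSerConfig(schemaId, registryConfig, errHandler = DefaultRecordExceptionHandler())
```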