olib963 / avrocheck

Scalacheck integration with Avro

GitHub

Avrocheck

avrocheck
avrocheck

What?

A small library to generate random GenericRecords from a given Avro schema using ScalaCheck.

Why?

First of all, we want to make sure that our custom deserialisation code, using its reader schema, can deserialise any message that was written using the writer schema.

Example schema:

user-schema.avsc
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "favourite_number",
      "type": [
        "int",
        "null"
      ]
    }
  ]
}

An example test to check this using avro4s would be something like:

User.scala
import com.sksamuel.avro4s.{AvroName, AvroSchema, Decoder}
import org.apache.avro.Schema

/**
 * Reader model for the `User` record.
 *
 * It intentionally has no field for the writer schema's `age`, and it maps the snake_case Avro
 * field `favourite_number` onto the camelCase Scala field `favouriteNumber`.
 */
case class User(
  name: String,
  @AvroName("favourite_number") favouriteNumber: Option[Int]
)

/** Avro4s artefacts derived from the [[User]] case class. */
object User {

  /** Decoder derived by avro4s for [[User]]. */
  val decoder: Decoder[User] = Decoder[User]

  /** Reader schema derived by avro4s from the shape of [[User]]. */
  val schema: Schema = AvroSchema[User]
}
SerdeProperty.scala
import com.sksamuel.avro4s.{AvroSchema, Decoder, DefaultFieldMapper, Encoder}
import org.scalacheck.{Arbitrary, Gen, Properties}
import org.scalacheck.Prop.forAll
import io.github.olib963.avrocheck._
import io.github.olib963.avrocheck.Implicits._

import scala.util.{Success, Try}

/**
 * Serialisation properties for the [[User]] reader model.
 *
 * The first two properties show why a pure avro4s round trip is not enough: it only proves that a
 * case class agrees with its own derived schema. The last two generate records from the real writer
 * schema (`user-schema.avsc`) with avrocheck and decode them with the reader model.
 */
object SerdeProperty extends Properties("Serde") {

  // This test checks that Avro4s can deserialise a User case class from a generic record created by its own autogenerated schema
  property("avro4s round trip") = forAll(Gen.resultOf(User.apply(_: String, _: Option[Int]))) {
    user =>
      val encoded = Encoder[User].encode(user, User.schema, DefaultFieldMapper)
      val decoded = User.decoder.decode(encoded, User.schema, DefaultFieldMapper)
      decoded == user
  }

  // The problem with this can be shown by creating a case class with invalid typing that does not match our schema
  case class InvalidUser(name: Boolean, // Name has the wrong type
                         favouriteNumber: Option[Int] // favouriteNumber should be favourite_number
                        )

  // and seeing that the same test would pass
  property("avro4s invalid round trip") = forAll(Gen.resultOf(InvalidUser)) {
    user =>
      val encoded = Encoder[InvalidUser].encode(user, AvroSchema[InvalidUser], DefaultFieldMapper)
      val decoded = Decoder[InvalidUser].decode(encoded, AvroSchema[InvalidUser], DefaultFieldMapper)
      decoded == user
  }

  // Using avrocheck we can instead check that a record created using the full writer schema is compatible with our case class.
  private val schema = schemaFromResource("user-schema.avsc")
  property("deserialises user messages") = forAll(genFromSchema(schema)) { record =>
    Try(User.decoder.decode(record, User.schema, DefaultFieldMapper)).isSuccess
  }

  // Or if you want to be more precise:
  property("deserialises user messages with correct values") = {
    val generator = for {
      name <- Arbitrary.arbString.arbitrary
      favNum <- Arbitrary.arbOption[Int].arbitrary
      // The generic record holds the ["int", "null"] union as a boxed Integer or null,
      // so convert the Scala Option into that representation before overriding the field
      boxedFavNum = favNum.map(Int.box).orNull

      // Notice that here we do not override age (or in the later schema favourite_colour) because these
      // values are of no interest to our application code
      overrides = overrideFields("name" -> name, "favourite_number" -> boxedFavNum)
      record <- genFromSchema(schema, overrides = overrides)
    } yield (record, User(name, favNum))
    forAll(generator) { case (record, user) =>
      Try(User.decoder.decode(record, User.schema, DefaultFieldMapper)) == Success(user)
    }
  }

}

Secondly, it can be useful to write high-level system property tests in terms of messages in and out. As an example, we will use the above schema to write a very simple application that signs up users with their favourite number. If their favourite number is negative we give them a £10 sign-up bonus, and if it is between -2000 and -1001 (inclusive) we give them a double bonus instead.

import io.github.olib963.avrocheck._
import io.github.olib963.avrocheck.Implicits._
import org.apache.avro.Schema
import org.scalacheck.Prop.forAll
import org.scalacheck.{Gen, Properties}

import scala.util.Success

/**
 * System-level properties for the example sign-up application, written purely in terms of the
 * Avro message passed to `Application.processUser` and the result it returns.
 *
 * Favourite-number bands exercised below:
 *  - (-inf, -2001] and [-1000, -1]: persisted with a bonus of 10
 *  - [-2000, -1001]: persisted with a double bonus of 20
 *  - positive or absent (null): persisted with no bonus
 */
object ApplicationProperty extends Properties("My application") {

  val schema: Schema = schemaFromResource("user-schema.avsc")

  property("persists users with negative favourite numbers and gives them a bonus") = {
    val generator = for {
      name <- Gen.alphaNumStr
      // Any number in (-inf, -2001] or [-1000, -1].
      // Gen.negNum never yields 0 (its largest value is -1), so subtracting 2000 is what makes the
      // boundary -2001 reachable; subtracting 2001 would silently skip it.
      favNum <- Gen.oneOf(Gen.negNum[Int].map(_ - 2000), Gen.chooseNum[Int](-1000, -1))
      overrides = overrideFields("name" -> name, "favourite_number" -> favNum)
      message <- genFromSchema(schema, overrides = overrides)
    } yield (name, message)
    forAll(generator) { case (name, message) =>
      val result = Application.processUser(message)
      result == Success(PersistedWithBonus(name, 10))
    }
  }

  property("gives a double bonus if their favourite number is between -2000 and -1000") = {
    val generator = for {
      name <- Gen.alphaNumStr
      // Inclusive band [-2000, -1001]; -1000 belongs to the single-bonus band above
      favNum <- Gen.chooseNum[Int](-2000, -1001)
      overrides = overrideFields("name" -> name, "favourite_number" -> favNum)
      message <- genFromSchema(schema, overrides = overrides)
    } yield (name, message)
    forAll(generator) { case (name, message) =>
      val result = Application.processUser(message)
      result == Success(PersistedWithBonus(name, 20))
    }
  }

  property("persists users with a positive or no favourite number with no bonus") = {
    val generator = for {
      name <- Gen.alphaNumStr
      // null models the "null" branch of the favourite_number union, i.e. no favourite number
      favNum <- Gen.oneOf(Gen.const(null), Gen.posNum[Int])
      overrides = overrideFields("name" -> name, "favourite_number" -> favNum)
      message <- genFromSchema(schema, overrides = overrides)
    } yield (name, message)
    forAll(generator) { case (name, message) =>
      val result = Application.processUser(message)
      result == Success(Persisted(name))
    }
  }

}

Thanks to Avro's compatibility features, producers upstream of you should be able to make backwards-compatible changes without affecting your codebase. You can now verify this easily by updating the schema file, for example by adding the following field:

{
  "name": "favourite_colour",
  "type": [
    "null",
    "string"
  ],
  "default": null
}

to the fields of the above schema, all of the example tests still pass.

How?

Import io.github.olib963.avrocheck._ to get access to generation from Avro schemas. Configuration can be provided explicitly or implicitly (by importing io.github.olib963.avrocheck.Implicits._). There is a utility function, schemaFromResource, to read schemas from a resource file. The schema you pass must currently be either a RECORD or a UNION of RECORDs.

To change the default generators used by the Gen, you can either pass the configuration explicitly or, when using the implicit configuration, provide an implicit Arbitrary for the type you want to change.

package io.github.olib963.avrocheck.documentation

import io.github.olib963.avrocheck._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.scalacheck.Prop.forAll
import org.scalacheck.{Arbitrary, Gen, Properties}

/**
 * Documentation examples for generating random GenericRecords from `user-schema.avsc`, first with an
 * explicitly passed configuration and then with an implicitly resolved one.
 */
object RecordGeneration extends Properties("generating random values from schema") {

  private val schema: Schema = schemaFromResource("user-schema.avsc")

  // A copy of the default configuration whose Int generator only produces positive values
  private val positiveIntsConfig: Configuration = Configuration.Default.copy(intGen = Gen.posNum[Int])

  property("My explicit test") = forAll(genFromSchema(schema)) { record =>
    record.isInstanceOf[GenericRecord]
  }

  property("My explicit positive age test") = forAll(genFromSchema(schema, positiveIntsConfig)) { user =>
    val age = user.get("age").asInstanceOf[Int]
    age >= 0
  }

  // Implicit configuration
  import io.github.olib963.avrocheck.Implicits._

  property("My implicit test") = forAll(genFromSchemaImplicit(schema)) { record =>
    record.isInstanceOf[GenericRecord]
  }

  property("My implicit positive age test") = {
    // Picked up by the implicit configuration in place of the default Int generator
    implicit val onlyPositiveInts: Arbitrary[Int] = Arbitrary(Gen.posNum[Int])
    forAll(genFromSchemaImplicit(schema)) { user =>
      val age = user.get("age").asInstanceOf[Int]
      age >= 0
    }
  }

}

Logical Types

Logical types will automatically be generated using the types:

  • timestamp-millis → java.time.Instant

  • timestamp-micros → java.time.Instant

  • time-millis → java.time.LocalTime

  • time-micros → java.time.LocalTime

  • date → java.time.LocalDate

  • uuid → java.util.UUID

  • decimal → scala.math.BigDecimal

If you want to provide overrides or implicit Arbitrary instances for logical types, you must use these types.

If you don’t want to go through the hassle of adding logical type conversions to your serialiser, you can set the configuration option preserialiseLogicalTypes to true and the values will automatically be transformed into their underlying primitive types.

package io.github.olib963.avrocheck.documentation

import java.time.LocalDate

import io.github.olib963.avrocheck._
import org.apache.avro.Schema
import org.scalacheck.Prop.forAll
import org.scalacheck.{Arbitrary, Gen, Properties}

/**
 * Documentation examples showing that a logical type (here the `date` field) is generated from its
 * own generator (`Gen[LocalDate]`), not from the generator of its underlying primitive (`Gen[Int]`),
 * with both explicit and implicit configuration.
 */
object LogicalTypeConfiguration extends Properties("Logical type configuration"){

  // This schema has a field "date" with schema {"type": "int", "logicalType": "date"}
  val schema: Schema = schemaFromResource("record-with-logical-types.avsc")

  // Dates strictly after the epoch
  private val onlyDaysSinceEpoch: Gen[LocalDate] = Gen.posNum[Int].map(days => LocalDate.ofEpochDay(days))
  // Strictly negative ints: installed as the Int generator to show it is NOT used for "date"
  private val onlyDaysBeforeEpoch: Gen[Int] = Gen.negNum[Int].map(days => days - 1)

  private val overriddenConfig: Configuration = Configuration.Default.copy(
    localDateGen = onlyDaysSinceEpoch,
    intGen = onlyDaysBeforeEpoch
  )

  // Generates a local date not an int
  property("Explicitly override date type") = forAll(genFromSchema(schema, overriddenConfig)) { record =>
    record.get("date").isInstanceOf[LocalDate]
  }

  // Serialises the local date to an int for you, but is still using the Gen[LocalDate] not the Gen[Int] to create the value
  property("Explicitly override date type preserialised") = {
    val preserialisedConfig = overriddenConfig.copy(preserialiseLogicalTypes = true)
    forAll(genFromSchema(schema, preserialisedConfig)) { record =>
      record.get("date").asInstanceOf[Int] >= 0
    }
  }

  // Using implicit configuration
  import io.github.olib963.avrocheck.Implicits._
  implicit val onlyDaysSinceEpochArb: Arbitrary[LocalDate] = Arbitrary(onlyDaysSinceEpoch)
  implicit val onlyDaysBeforeEpochArb: Arbitrary[Int] = Arbitrary(onlyDaysBeforeEpoch)

  // Generates a local date not an int
  property("Implicitly override date type") = forAll(genFromSchemaImplicit(schema)) { record =>
    record.get("date").isInstanceOf[LocalDate]
  }

  // Serialises the local date to an int for you, but is still using the Gen[LocalDate] not the Gen[Int] to create the value
  property("Implicitly override date type preserialised") = {
    implicit val preserialise: PreserialiseLogicalTypes = true
    forAll(genFromSchemaImplicit(schema)) { record =>
      record.get("date").asInstanceOf[Int] >= 0
    }
  }

}

Overrides

If you want to customise the generation of your GenericRecord even more you can provide an explicit/implicit Overrides object.

package io.github.olib963.avrocheck.documentation

import io.github.olib963.avrocheck.CollectionConverters.toScala
import org.scalacheck.Prop.forAll
import org.scalacheck.{Gen, Properties}
import io.github.olib963.avrocheck._

/**
 * Documentation examples for customising generation with overrides: per-field overrides, selecting
 * a branch of a union of records, and overriding how arrays are generated.
 */
object OverrideConfiguration extends Properties("Overriding generation") {

  //****************************//
  //  General Record Overrides  //
  //****************************//

  // User schema we have used above in documentation
  private val userSchema = schemaFromResource("user-schema.avsc")

  property("Explicitly override primitive fields") = {
    val overrides = overrideFields( // Override fields in the record by name
      "name" -> constantOverride("oli"), // Always generate the string "oli" for "name"
      "favourite_number" -> generatorOverride(Gen.posNum[Int].map(_ + 1)) // Always generate a positive Int for "favourite_number"
    )
    forAll(genFromSchema(userSchema, overrides = overrides)) { record =>
      val namedOli = record.get("name") == "oli"
      val randomIntAge = record.get("age").isInstanceOf[Int]
      val positiveFavouriteNumber = record.get("favourite_number").asInstanceOf[Int] > 0
      namedOli && randomIntAge && positiveFavouriteNumber
    }
  }

  property("Implicitly override primitive fields") = {
    import io.github.olib963.avrocheck.Implicits._
    // Implicitly infer the override type for each field
    implicit val overrides: Overrides = overrideFields(
      "name" -> "oli",
      "favourite_number" -> Gen.posNum[Int].map(_ + 1)
    )
    forAll(genFromSchemaImplicit(userSchema)) { record =>
      val namedOli = record.get("name") == "oli"
      val randomIntAge = record.get("age").isInstanceOf[Int]
      val positiveFavouriteNumber = record.get("favourite_number").asInstanceOf[Int] > 0
      namedOli && randomIntAge && positiveFavouriteNumber
    }
  }

  //*******************//
  //  Union Overrides  //
  //*******************//

  // Schema of two records named "Foo" and "Bar". "Foo" has an "int" field of type "int".
  private val unionSchema = schemaFromResource("union-of-records.avsc")
  property("Explicitly select a branch") = {
    val fooOverrides = overrideFields("int" -> constantOverride(10))
    val overrides = selectNamedUnion(
      "Foo", // Selecting the specific "Foo" branch
      overrides = fooOverrides // Within the "Foo" branch we are setting overrides
    )
    forAll(genFromSchema(unionSchema, overrides = overrides)) { record =>
      val correctSchema = record.getSchema.getName == "Foo"
      val always10 = record.get("int") == 10
      correctSchema && always10
    }
  }

  //*******************//
  //  Array Overrides  //
  //*******************//

  // Contains a field called "longArray" with schema {"type": "array", "items": "long"}
  private val compositeSchema = schemaFromResource("record-with-composites.avsc")

  property("Override array generation") = {
    val fiveOrTenPositiveLongs = arrayGenerationOverride(sizeGenerator = Gen.oneOf(5, 10), generatorOverride(Gen.posNum[Long]))
    val overrides = overrideFields("longArray" -> fiveOrTenPositiveLongs)
    forAll(genFromSchema(compositeSchema, overrides = overrides)) { r =>
      val array = toScala(r.get("longArray").asInstanceOf[java.util.List[Long]])
      val elementAssertion = array.forall(_ >= 0) // Array should only contain non-negative longs
      val sizeAssertion = array.size == 5 || array.size == 10
      sizeAssertion && elementAssertion
    }
  }

  property("Explicitly override each element in an array") = {
    val positiveLongThenOne = arrayOverride(List(generatorOverride(Gen.posNum[Long]), constantOverride(1L)))
    val overrides = overrideFields("longArray" -> positiveLongThenOne)
    forAll(genFromSchema(compositeSchema, overrides = overrides)) { r =>
      val array = toScala(r.get("longArray").asInstanceOf[java.util.List[Long]])
      val firstElement = array.headOption
      val firstElementAssertion = firstElement.exists(_ >= 0) // First element of the array should be a non-negative long

      // drop(1) rather than tail: tail throws on an empty array, which would surface as an
      // exception instead of a plain property failure
      val secondElement = array.drop(1).headOption
      val secondElementAssertion = secondElement.contains(1L) // Second element of the array should be 1

      val sizeAssertion = array.size == 2
      sizeAssertion && firstElementAssertion && secondElementAssertion
    }
  }

}

Confluent Stack Warning

If you are using this library to run tests that integrate with Kafka and the confluent stack you should be aware of this:

Schema Registry with Unions

If you are generating messages that are a UNION of RECORDs at the top level and you are using Schema Registry, you will want the union schema to be registered for your topic. This means you cannot simply serialise the GenericRecord; instead, you need to wrap it in a NonRecordContainer together with the union schema, like this:

package io.github.olib963.avrocheck.documentation

import io.confluent.kafka.schemaregistry.client.{MockSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer, NonRecordContainer}
import io.github.olib963.avrocheck._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.scalacheck.Prop.forAll
import org.scalacheck.{Gen, Properties}

/**
 * Demonstrates that serialising a record from a top-level union of records directly registers the
 * schema of the chosen branch, while wrapping it in a `NonRecordContainer` registers the union schema.
 */
object SchemaRegistrySerialisation extends Properties("Confluent stack test") {

  // Schema of two records named "Foo" and "Bar"
  private val unionSchema = schemaFromResource("union-of-records.avsc")
  private val gen: Gen[GenericRecord] = genFromSchema(unionSchema)

  property("serialises with correct schema") = forAll(gen) { record =>
    // A fresh mock registry for every generated record
    val registry = new MockSchemaRegistryClient()
    val serialiserConfig = Map(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8080",
      AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS -> true
    )
    val serialiser = new KafkaAvroSerializer(registry, CollectionConverters.toJava(serialiserConfig))

    val wrongTopic = "wrong-topic"
    val rightTopic = "right-topic"

    // This is NOT what you want, this will post the schema for the specific branch of the union, not the union as a whole.
    serialiser.serialize(wrongTopic, record)

    // This is what you want, this will post the union schema for the topic
    serialiser.serialize(rightTopic, new NonRecordContainer(unionSchema, record))

    def registeredSchema(topic: String): Schema = registry.latestSchemaForTopic(topic)
    // && keeps the original short-circuit: the right topic is only queried if the wrong one passed
    registeredSchema(wrongTopic) != unionSchema && registeredSchema(rightTopic) == unionSchema
  }

  /** Looks up the latest value schema for a topic, using the "<topic>-value" subject name. */
  implicit class SchemaRegistryOps(schemaRegistryClient: SchemaRegistryClient) {
    def latestSchemaForTopic(topicName: String): Schema =
      new Schema.Parser().parse(schemaRegistryClient.getLatestSchemaMetadata(s"$topicName-value").getSchema)
  }

}