agolovenko / avro-tools   0.8.0

Apache License 2.0 GitHub

Tools for JSON to Avro conversion

Scala versions: 2.13 2.12

avro-tools

A set of tools for converting various text formats (JSON, CSV, XML) to Avro. Available for Scala 2.11, 2.12 and 2.13.

Some notable features:

  • Supported input formats: json, csv, xml
  • Pluggable StringParsers
  • Pluggable validations
  • Renaming of fields whose names do not follow the Avro naming convention ([A-Za-z0-9_])
  • Descriptive errors that include path of origin

Sub-projects (look inside for more documentation)

  • Core - Base common utilities
  • JSON - Set of tools for JSON to Avro conversions
  • XML - Set of tools for XML to Avro conversions
  • CSV - Set of tools for CSV to Avro conversions

Some sample code to get a taste

libraryDependencies += "io.github.agolovenko" %% "avro-tools-json" % "0.8.0"
import io.github.agolovenko.avro.StringParsers._
import io.github.agolovenko.avro._
import io.github.agolovenko.avro.json.JsonParser
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.generic.GenericData
import play.api.libs.json.Json

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Target Avro schema: record "sch_rec" with four fields:
//   - f_record: nested record "sch_f_record" holding nf_string (string) and nf_int (int)
//   - f_string: string
//   - f_long:   long
//   - f_date:   int carrying the "date" logical type
val schema = new Schema.Parser().parse("""
    |{
    |  "type": "record",
    |  "name": "sch_rec",
    |  "fields": [
    |    {
    |      "name": "f_record",
    |      "type": {
    |        "name": "sch_f_record",
    |        "type": "record",
    |        "fields": [
    |          {
    |            "name": "nf_string",
    |            "type": "string"
    |          },
    |          {
    |            "name": "nf_int",
    |            "type": "int"
    |          }
    |        ]
    |      }
    |    },
    |    {
    |      "name": "f_string",
    |      "type": "string"
    |    },
    |    {
    |      "name": "f_long",
    |      "type": "long"
    |    },
    |    {
    |      "name": "f_date",
    |      "type": {
    |        "type": "int",
    |        "logicalType": "date"
    |      }
    |    }
    |  ]
    |}""".stripMargin)

// String parsers, tried in order: the ISO-date parser first, then the
// primitive parsers for every context the date parser is not defined at.
val parsers: PartialFunction[ParserContext, Any] =
  dateParser(DateTimeFormatter.ISO_DATE).orElse(primitiveParsers)

// Validation rules, checked top to bottom; the first matching rule decides.
// A rule rejects a value by throwing IllegalArgumentException.
val validations: PartialFunction[ValidationContext, Unit] = {
  // Path of the nested string, spelled with the input (pre-rename) field names.
  val checkedStringPath = Path("f-record", "nf-string")

  // The nested string at `checkedStringPath` must not be empty.
  def isEmptyNestedString(ctx: ValidationContext): Boolean =
    (ctx.path =~= checkedStringPath) && ctx.value.asInstanceOf[String].isEmpty

  // Any value of a LONG-typed schema must be non-negative.
  def isNegativeLong(ctx: ValidationContext): Boolean =
    (ctx.schema.getType == Schema.Type.LONG) && (ctx.value.asInstanceOf[Long] < 0L)

  // Selects values whose schema carries the "date" logical type.
  def isDate(ctx: ValidationContext): Boolean =
    ctx.schema.getLogicalType == LogicalTypes.date()

  {
    case ctx if isEmptyNestedString(ctx) =>
      throw new IllegalArgumentException("empty string")
    case ctx if isNegativeLong(ctx) =>
      throw new IllegalArgumentException("negative value")
    case ctx if isDate(ctx) =>
      // The value is read as an Int epoch-day count; only dates in 2022 are accepted.
      val epochDay = ctx.value.asInstanceOf[Int].toLong
      if (LocalDate.ofEpochDay(epochDay).getYear != 2022)
        throw new IllegalArgumentException("invalid year")
  }
}

// Rename rules mapping input field names that contain '-' to the
// corresponding field names declared in the Avro schema:
//   /f-record           -> f_record
//   /f-record/nf-string -> nf_string
val renameRules = new RenameRules(
  RenameRule(Path("f-record"), avroName = "f_record"),
  RenameRule(Path("f-record", "nf-string"), avroName = "nf_string")
)

// JSON parser wired with the schema, string parsers, validations and rename rules defined above.
val parser = new JsonParser(schema, parsers, validations, renameRules)

// Valid input: "nf-string" is non-empty, "f_long" is non-negative and "f_date" falls in 2022.
// Note that "nf_int" is supplied as the string "1" and "f_date" as an ISO date string
// (presumably converted by the string parsers configured above).
val input = Json.parse("""
     |{
     |  "f-record": {
     |     "nf-string": "non-empty",
     |     "nf_int": "1"
     |  },
     |  "f_string": "",
     |  "f_long": 42,
     |  "f_date": "2022-01-01"
     |}
     |""".stripMargin)

val record: GenericData.Record = parser(input) // OK: parsing succeeds and yields an Avro record

// Invalid input: "nf-string" is empty, which the "empty string" validation rule rejects.
val input2 = Json.parse("""
    |{
    |  "f-record": {
    |     "nf-string": "",
    |     "nf_int": 1
    |  },
    |  "f_string": "",
    |  "f_long": 42,
    |  "f_date": "2022-01-01"
    |}
    |""".stripMargin)

// Fails; the error reports the offending value, the validation message and the input path:
parser(input2) //io.github.agolovenko.avro.InvalidValueException: Invalid value '': empty string @ /f-record/nf-string