agmenc / planet7

Scala library for fast ETL and reconciliation.

GitHub

planet7

Fast ETL and reconciliation tool for Scala users:

  • Load and merge CSVs.
  • Rename, re-order and filter columns.
  • Sort and validate rows
  • Diff two CSVs, or any lists of comparable elements
  • Use external CSV parsing libraries such as CSVReader

Usage

Supported Scala versions:

build.sbt:

libraryDependencies += "com.github.agmenc" %% "planet7" % "0.1.13"

Basic ETL:

In this example, we convert from a two-column input format:

Name Id
Bob A
Dave C

to a three-column output format:

Id Value Name
1 X bob
3 X dave

We also manipulate the data:

  • Change the names to lower case
  • Default "Value" to "X"
  • Map the characters "A" and "C" to the numerals "1" and "3".
"We can map data and specify default values by column" in {
  val twoColumns = Csv("""
                         |Name,ID
                         |BOB,A
                         |DAVE,C
                       """.stripMargin)

  val threeColumns = Csv("""
                           |ID,Value,Name
                           |1,X,bob
                           |3,X,dave
                         """.stripMargin)

  def alphaToNum(alpha: String): String = alpha match {
    case "A" => "1"
    case _ => "3"
  }

  val result = twoColumns
    .columnStructure("ID", "Value", "Name")
    .withMappings(
      "Value" -> (_ => "X"),      // Default value (for the empty column we added)
      "ID" -> alphaToNum,         // Mapping function
      "Name" -> (_.toLowerCase)   // Mapping function
    )

  result.rows.toList must equal(threeColumns.rows.toList)
}

All CSV Features

  "All available CSV-manipulation features" in {
    import planet7.tabular._

    val csv = Csv(new File(inputPath))
      .assertAndAbort(                      // Fail-fast validations
        Validations.rowNotTruncated)        // Let's check early, before we select columns
      .columnStructure(
        "Company",                          // Keep any column we name; remove the rest
        "Company account" -> "Company ID",  // Rename "Company account" to "Company ID"
        "First name",
        "Surname",
        "Postcode" -> "Zip code")
      .withMappings(
        "Zip code" -> postcodeLookupTable,  // Specify a (String) => String to change data
        "Surname" -> (_.toUpperCase))
      .assertAndReport(                     // Reported validations are appended to the row
        "Zip code" -> validZipCode _)       // Report any invalid zip codes
      .columnStructure(ignore("Zip code"))  // Drop column, now we've validated against it

    write(sort(csv), outputPath)            // Sort the output and write to disk

    Diff(Csv(new File(outputPath)), Csv(new File(modelAnswerPath)), NaiveRowDiffer) mustBe empty
  } // ```

### More Examples

These examples are taken from [CsvSpec.scala](https://github.com/agmenc/planet7/blob/master/src/test/scala/planet7/tabular/CsvSpec.scala). They are not exhaustive, so see [CsvSpec.scala](https://github.com/agmenc/planet7/blob/master/src/test/scala/planet7/tabular/csv/CsvSpec.scala) for a full range of working examples.


**Extract and remodel a CSV:**

```scala
"Extract a CSV, remodel it, and convert the data" in {
  import planet7.tabular.CompanyAccountsData._

  // CSV file with header: First name,Surname,Company,Company account,Postcode,Pet names
  val someFile = asFile("before.csv")

  // Retain only three of the original columns, in a different order, renaming
  // "Postcode" to "Zip code", and adding "Fee owing"
  val reshapedCsv = Csv(someFile)
    .columnStructure("Surname", "First name", "Postcode" -> "Zip code", "Fee owing")
    .withMappings(
      "Zip code" -> postcodeLookupTable,  // Map the old postcodes to zip codes, using a Map
      "Surname" -> (_.toUpperCase),       // Make all surnames upper case
      "Fee owing" -> (_ => "0.00")        // Add a default value for "Fee owing" of 0.00
    )

  // Now convert the data to your data model, or export to a feed, or reconcile against another source, etc.
  // reshapedCsv.rows map ( ... )
}

Export a CSV:

Materialises the data. Use this in tests or for small files, but consider iterating through Csv.rows and writing to a BufferedWriter or similar, to avoid sucking all your data into the heap at once.

val allYourData: String = export(reshapedCsv)

Use an external CSV parser:

Pimp external libraries into TabularDataSources:

implicit def fromCsvReader(reader: CSVReader): TabularDataSource = new TabularDataSource {
  override val header = reader.readNext() match {
    case Some(items) => Row(items.toArray)
    case None => throw new NoDataInSourceException(reader.toString)
  }

  override def rows = reader.iterator.map(items => Row(items.toArray))

  override def close() = reader.close()
}

Now work with Csv in the usual way:

"We can use external parsers such as (the incredibly slow) CSVReader" in {
  import LargeDataSet._

  val csv = Csv(CSVReader.open(asFile(largeDataFile)))

  csv.header must equal(expectedHeader)
  csv.rows.next() must be (expectedFirstRow)
}

Diff/reconciliation:

Convert two CSVs to a canonical format and Diff them, aggregating and formatting the output for easy reconciliation:

"We can Diff Csv instances and generate readable output" in {
  import planet7.Diff
  import planet7.tabular.CompanyAccountsData._

  val before = Csv(asFile("before.csv"))
    .columnStructure("First name", "Surname", "Company", "Company account" -> "Company ID", "Postcode")
    .withMappings(
      "Postcode" -> postcodeLookupTable,
      "Company" -> (_.toUpperCase)
    )

  val after = Csv(asFile("after_with_diffs.csv"))
    .columnStructure("First name", "Surname", "Company", "Company ID", "Postcode")

  val diffs: Seq[(Row, Row)] = Diff(before, after, RowDiffer(before.header, "Company ID"))

  // The resulting diffs are yours to play with. Let's group them: missing rows, added rows, or just plain different rows.
  val summary = diffs.groupBy {
    case (row, EmptyRow) => "Missing"
    case (EmptyRow, row) => "Added"
    case (row1, row2) => "Diffs"
  }

  // We can Diff rows which have changed. We zip the header information with each row, so that we know the names of the fields which changed.
  val fieldDifferences = summary("Diffs") map {
    case (leftRow, rightRow) => NonSortingDiff(before.header.data zip leftRow.data, after.header.data zip rightRow.data, FieldDiffer)
  }

  // Let's print the name of the field which changed, and the before and after values
  val readableDiffs = fieldDifferences map (FieldDiffer.prettyPrint(_).mkString(", "))
  printSummary(summary, readableDiffs)
  assert(readableDiffs === List(
    "Postcode: 43205 -> 432666, Company: ENIM SIT AMET INCORPORATED -> ENIM SIT AMET LIMITED",
    "Postcode: 22656 -> 22756"
  ))
}

Laziness:

All structure and filtering operations are lazy. Aside from the header row, the datasource is only read when the Csv is exported, or you iterate through the Csv.rows iterator.

Timers:

External parsers, large datasets or naive coding can slow down CSV transformations. Don't optmise prematurely; instead, time your operations and fix only those that need fixing:

"Users of the planet7 library can gauge the performance impact of external parsers such as CsvReader" in {
  import planet7.timing._

  val timer = new Timer(2)
  import timer._

  for (i <- 1 to 5) {
    t"overallTime" {
      val csvReader = t"shouldBeQuick" { CSVReader.open(asFile("before.csv")) }
      val csv = t"shouldAlsoBeQuick" { Csv(csvReader) }
      t"veryExpensive" { export(csv) }
    }
  }

  println(timer)
  timer.overallTime.average must be < 150.0
}

More examples:

See CsvSpec.scala for a full range of working examples