spata

Build Status Code Coverage Maven Central Scala Doc Gitter

spata is a functional Scala parser for tabular data (CSV). The library is backed by FS2 - Functional Streams for Scala.

The main goal of the library is to provide a handy, functional, stream-based API with easy conversion of records to case classes and precise information about possible flaws and their location in source data, while maintaining good performance. Providing the location of the cause of a parsing error has been the main motivation for developing the library. It is typically not that hard to parse a well-formatted CSV file, but it can be a nightmare to locate the source of a problem in case of any distortions in a large data file.

The source data format is assumed to basically conform to RFC 4180, but allows some variations - see CSVConfig for details.

Getting started

spata is available for Scala 2.13 and requires at least Java 11.

To use spata you have to add this single dependency to your build.sbt:

libraryDependencies += "info.fingo" %% "spata" % "<version>"

The latest version may be found on the badge above.

Link to the current API version is available through the badge as well.

Basic usage

The whole parsing process in a simple case may look like this:

import scala.io.Source
import cats.effect.IO
import fs2.Stream
import info.fingo.spata.CSVParser
import info.fingo.spata.io.reader

case class Data(item: String, value: Double)
val parser = CSVParser[IO]() // parser with default configuration
val records = Stream
  // get stream of CSV records while ensuring source cleanup
  .bracket(IO { Source.fromFile("input.csv") })(source => IO { source.close() })
  .through(reader[IO]().by) // produce stream of chars from source
  .through(parser.parse)  // parse CSV file and get CSV records 
  .filter(_.get[Double]("value").exists(_ > 1000))  // do some operations using Stream API
  .map(_.to[Data]()) // convert records to case class
  .handleErrorWith(ex => Stream.eval(IO(Left(ex)))) // convert global (I/O, CSV structure) errors to Either
val result = records.compile.toList.unsafeRunSync() // run everything while converting result to list

Another example, adapted from the FS2 readme, assumes that the data is stored in CSV format with two fields, date and temp:

import java.nio.file.Paths
import scala.io.Codec
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import cats.implicits._
import fs2.Stream
import fs2.io
import fs2.text
import info.fingo.spata.CSVParser
import info.fingo.spata.io.reader

object Converter extends IOApp {

  val converter: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap {
    blocker =>
      implicit val codec: Codec = Codec.UTF8
      val parser: CSVParser[IO] = CSVParser.config.get[IO]()
      def fahrenheitToCelsius(f: Double): Double =
        (f - 32.0) * (5.0 / 9.0)

      reader
        .shifting[IO](blocker)
        .read(Paths.get("testdata/fahrenheit.txt"))
        .through(parser.parse)
        .filter(r => r("temp").exists(!_.isBlank))
        .map { r =>
          val date = r.unsafe("date")
          val temp = fahrenheitToCelsius(r.unsafe.get[Double]("temp"))
          s"$date,$temp"
        }
        .intersperse("\n")
        .through(text.utf8Encode)
        .through(io.file.writeAll(Paths.get("testdata/celsius.txt"), blocker))
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.drain.as(ExitCode.Success)
}

(This example uses exception-throwing methods for brevity and to keep it close to the original snippet. A modified version with safe access to record data may be found in the error handling part of the tutorial.)

More examples of how to use the library may be found in src/test/scala/info/fingo/spata/sample.

Tutorial

Parsing

The core spata operation is a transformation from a stream of characters into a stream of Records. It is available through the CSVParser.parse function (in the form of an FS2 Pipe) and is probably the best way to include CSV parsing in any FS2 stream processing pipeline:

val input: Stream[IO, Char] = ???
val parser: CSVParser[IO] = CSVParser[IO]()
val output: Stream[IO, Record] = input.through(parser.parse)

In accordance with FS2, spata is polymorphic in the effect type and may be used with different effect implementations (Cats Effect IO, Monix Task or ZIO). Please note however that Cats Effect IO is the only effect implementation used for testing and documentation purposes. Type class dependencies are defined in terms of the Cats Effect class hierarchy.

As with any other FS2 processing, spata consumes only as much of the source stream as required, give or take a chunk size.

Field and record delimiters are required to be single characters. There are however no other assumptions about them - in particular, the record delimiter does not have to be a line break, and spata does not assume the presence of line breaks in the source data - it does not read the data by lines.

If, however, a newline (LF, \n, 0x0A) is used as the record delimiter, the carriage return character (CR, \r, 0x0D) is automatically skipped when not escaped, to support CRLF line breaks.

Fields containing delimiters (field or record) or quotes have to be wrapped in quotation marks. As defined in RFC 4180, quotation marks in content have to be escaped through double quotation marks.
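
As an illustration (the content below is made up, not taken from the library's test data), with the default comma delimiter the first field of the second line contains a field delimiter and the second field contains escaped quotation marks:

name,comment
"Smith, John","asked ""why?"" and left"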

In addition to parse, CSVParser provides other methods to read CSV data:

  • get to load data into List[Record], which may be handy for small data sets,
  • process to deal with data record by record through a callback function,
  • async to process data through a callback function in an asynchronous way.

The three functions above return the result (List or Unit) wrapped in an effect and require calling one of the "at the end of the world" methods (unsafeRunSync or unsafeRunAsync for cats.effect.IO) to trigger the computation.

val stream: Stream[IO, Char] = ???
val parser: CSVParser[IO] = CSVParser[IO]()
val list: List[Record] = parser.get(stream).unsafeRunSync()

Alternatively, instead of calling an unsafe function, the whole processing may be run through IOApp.
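
A minimal sketch of such an application, reusing the snippet from Basic usage, may look like this (the ParseApp name and input.csv path are illustrative only):

import scala.io.Source
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import info.fingo.spata.CSVParser
import info.fingo.spata.io.reader

object ParseApp extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    Stream
      .bracket(IO { Source.fromFile("input.csv") })(source => IO { source.close() })
      .through(reader[IO]().by) // produce stream of chars from source
      .through(CSVParser[IO]().parse) // parse chars into CSV records
      .compile
      .drain // the effect is evaluated by IOApp, no unsafe* call is needed
      .as(ExitCode.Success)
}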

If we have to work with a stream of String (e.g. from FS2 text.utf8Decode) we may convert it to a character stream:

val ss: Stream[IO, String] = ???
val sc: Stream[IO, Char] = ss.map(s => Chunk.chars(s.toCharArray)).flatMap(Stream.chunk)

See Reading source data for helper methods to get stream of characters from various sources.

Configuration

CSVParser is configured through CSVConfig, which is a parameter to its constructor. A more convenient way may be a builder-like method, which takes the defaults provided by CSVParser object and allows altering selected parameters:

val parser = CSVParser.config.fieldDelimiter(';').noHeader().get[IO]()

Individual configuration parameters are described in CSVConfig's Scaladoc.

A specific setting is the header mapping, available through CSVConfig.mapHeader. It allows replacing the original header values with more convenient ones, or even defining a header if none is present. The new values are then used in all operations referencing individual fields, including automatic conversion to case classes or tuples. The mapping may be defined only for a subset of fields, leaving the rest in their original form.

date,max temperature,min temperature
2020-02-02,13.7,-2.2
val stream: Stream[IO, Char] = ???
val parser: CSVParser[IO] =
  CSVParser.config.mapHeader(Map("max temperature" -> "tempMax", "min temperature" -> "tempMin")).get[IO]()
val frosty: Stream[IO, Record] = stream.through(parser.parse).filter(_.get[Double]("tempMin").exists(_ < 0))

It may also be defined for more fields than there are present in any particular data source, which allows using a single parser for multiple data sets with different headers.
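
For illustration, a single (hypothetical) mapping may cover header variants coming from two different files; only the matching entries are applied to each source:

val parser: CSVParser[IO] = CSVParser.config
  .mapHeader(
    Map(
      "max temperature" -> "tempMax", // header used by the first data set
      "min temperature" -> "tempMin",
      "temp max" -> "tempMax", // header used by the second data set
      "temp min" -> "tempMin"
    )
  )
  .get[IO]()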

There is also index-based header mapping available. It may be used not only to define or redefine the header, but also to remove duplicates:

date,temperature,temperature
2020-02-02,13.7,-2.2
val stream: Stream[IO, Char] = ???
val parser: CSVParser[IO] =
  CSVParser.config.mapHeader(Map(1 -> "tempMax", 2 -> "tempMin")).get[IO]()
val frosty: Stream[IO, Record] = stream.through(parser.parse).filter(_.get[Double]("tempMin").exists(_ < 0))

FS2 takes care of limiting the amount of processed data and consumed memory to the required level. This works well to restrict the number of records, however each record has to be fully loaded into memory, no matter how large it is. This is not a problem if everything goes well - individual records are typically not that large. A record can however grow uncontrollably in case of incorrect configuration (e.g. wrong record delimiter) or malformed structure (e.g. unclosed quotation). To prevent OutOfMemoryError in such situations, spata can be configured to limit the maximum size of a single field using fieldSizeLimit. If this limit is exceeded during parsing, the processing stops with an error. By default, no limit is specified.
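
As a sketch, assuming fieldSizeLimit is set like any other configuration parameter shown above, a limit of 1000 characters per field may look like this:

// stop processing with an error if any single field exceeds 1000 characters
val parser: CSVParser[IO] = CSVParser.config.fieldSizeLimit(1000).get[IO]()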

Reading source data

As mentioned earlier, CSVParser requires a stream of characters as its input. To simplify working with common data sources, like files or sockets, spata provides a few convenience methods, available through its io.reader object.

There are two groups of read methods in reader:

  • basic ones, accessible through reader.plain, where reading is done synchronously on the current thread,
  • with support for thread shifting, accessible through reader.shifting.

It is recommended to use the thread shifting version, especially for long reading operations, for better thread pool utilization. See a post from Daniel Spiewak about thread pool configuration. More information about threading may be found in Cats Concurrency Basics.

The simplest way to read a file is:

val stream: Stream[IO, Char] = reader.plain[IO]().read(Path.of("data.csv"))

or even:

val stream: Stream[IO, Char] = reader[IO]().read(Path.of("data.csv")) // reader.apply is an alias for reader.plain

The thread shifting reader provides a similar method, but requires implicit ContextShift:

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val stream: Stream[IO, Char] = reader.shifting[IO]().read(Path.of("data.csv"))

The ExecutionContext provided to ContextShift is used to switch the context back to the CPU-bound one, used for regular, non-blocking operation, after the blocking I/O operation finishes. The Blocker, which provides the thread pool for blocking I/O, may be passed to shifting or will be created internally.
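
Passing an explicitly created Blocker may look like this (a sketch based on the shifting reader shown above; remember about the implicit Codec when reading from a Path):

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val codec: Codec = Codec.UTF8
val stream: Stream[IO, Char] = Stream
  .resource(Blocker[IO])
  .flatMap(blocker => reader.shifting[IO](blocker).read(Path.of("data.csv")))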

All read operations load data in chunks for better performance. Chunk size may be supplied while creating a reader:

val stream: Stream[IO, Char] = reader.plain[IO](1024).read(Path.of("data.csv"))

If not provided explicitly, a default chunk size will be used.

Except for Source, which is already character-based, other data sources require an implicit Codec to convert bytes into characters:

implicit val codec: Codec = Codec.UTF8

The caller of a read function which takes a resource as a parameter (Source or InputStream) is responsible for resource cleanup. This may be achieved through FS2 Stream.bracket:

val stream: Stream[IO, Char] = for {
  source <- Stream.bracket(IO { Source.fromFile("data.csv") })(source => IO { source.close() })
  char <- reader.shifting[IO]().read(source)
} yield char

Other methods of resource acquisition and releasing are described in Cats Effect tutorial.
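
For instance, the same cleanup may be expressed with cats.effect.Resource; this is a sketch equivalent to the bracket version above:

val sourceResource: Resource[IO, Source] =
  Resource.make(IO { Source.fromFile("data.csv") })(source => IO { source.close() })
val stream: Stream[IO, Char] =
  Stream.resource(sourceResource).flatMap(source => reader[IO]().read(source))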

In addition to the read function, reader provides a by function, to be used with Stream.through. The above example, rewritten using by, becomes:

val stream: Stream[IO, Char] = Stream
  .bracket(IO { Source.fromFile("data.csv") })(source => IO { source.close() })
  .through(reader.shifting[IO]().by)

Getting actual data

The CSV parsing operation alone produces a stream of Records. Each record may be seen as a map from String to String, where the keys are shared among all records. The basic method to obtain individual values is through the apply function, by key (taken from the header):

val record: Record = ???
val value: Option[String] = record("key")

or index:

val record: Record = ???
val value: Option[String] = record(0)

Record supports retrieval of typed values. In simple cases, when the value is serialized in its canonical form, which does not require any additional format information (e.g. ISO format for dates), this may be done with the single-parameter get function:

val record: Record = ???
val num: Decoded[Double] = record.get[Double]("value") // "value" is the field's header key

Decoded[A] is an alias for Either[ContentError, A]. This method requires a text.StringParser[A], which is described in the next chapter.

get has an overloaded version, which supports formatting-aware parsing:

val record: Record = ???
val df = new DecimalFormat("#,###")
val num: Decoded[Double] = record.get[Double]("value", df) // parse the "value" field using the provided format

This method requires a text.FormattedStringParser[A, B], which is described in the next chapter. (It uses an intermediary class, Field, to provide a nicer syntax; this should however be transparent in most cases.)

The above methods are also available in an unsafe, exception-throwing version, accessible through the Record.unsafe object:

val record: Record = ???
val v1: String = record.unsafe("key")
val v2: String = record.unsafe(0)
val n1: Double = record.unsafe.get[Double]("value")
val df = new DecimalFormat("#,###")
val n2: Double = record.unsafe.get[Double]("value", df)

They may throw a ContentError exception.

In addition to retrieval of single fields, Record may be converted to a case class or a tuple. Assuming CSV data in the following form:

element,symbol,melting,boiling
hydrogen,H,13.99,20.271
helium,He,0.95,4.222
lithium,Li,453.65,1603

The data can be converted from a record directly into a case class:

val record: Record = ???
case class Element(symbol: String, melting: Double, boiling: Double)
val element: Decoded[Element] = record.to[Element]()

Notice that not all source fields have to be used for conversion. The conversion is name-based - header strings have to match case class field names exactly, including case. We can use header mapping, described in Configuration, if they do not match.
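
For instance, if the source header contained "melting point" and "boiling point" instead, a parser configured with a mapping like the one below (a sketch reusing the mapHeader setting described earlier) would still allow conversion to the Element case class:

val parser: CSVParser[IO] = CSVParser.config
  .mapHeader(Map("melting point" -> "melting", "boiling point" -> "boiling"))
  .get[IO]()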

For tuples, the header has to match tuple field names (_1, _2, etc.) and is automatically generated in this form for data without a header:

hydrogen,H,13.99,20.271
helium,He,0.95,4.222
lithium,Li,453.65,1603
val record: Record = ???
type Element = (String, String, Double, Double)
val element: Decoded[Element] = record.to[Element]()

Notice that in this case the first column has been included in the conversion to ensure header and tuple field matching.

Both forms of conversion require an implicit StringParser. Parsers for common types and their default formats are provided through the StringParser object and are automatically brought into scope. Because it is not possible to explicitly provide a custom formatter while converting a record into a case class, an implicit StringParser has to be defined in case of specific formats or types:

element,symbol,melting,boiling
hydrogen,H,"13,99","20,271"
helium,He,"0,95","4,222"
lithium,Li,"453,65","1603"
val record: Record = ???
case class Element(symbol: String, melting: Double, boiling: Double)
val nf = NumberFormat.getInstance(new Locale("pl", "PL"))
implicit val nsp: StringParser[Double] = (str: String) => nf.parse(str).doubleValue()
val element: Decoded[Element] = record.to[Element]()

Text parsing

CSV data is parsed as Strings. We often need typed values, e.g. numbers or dates, for further processing. There is no standard, uniform interface available in Scala or Java to parse strings into different types. Numbers may be parsed with java.text.NumberFormat; dates and times through parse methods of java.time.LocalDate or LocalTime, providing the format as a parameter. This is awkward when a single interface is needed for various types, as in Record. This is where spata's text.StringParser comes in handy.

StringParser object provides methods for parsing strings with default format:

val num: ParseResult[Double] = StringParser.parse[Double]("123.45")

where ParseResult[A] is just an alias for Either[ParseError, A].

When a specific format has to be provided, an overloaded version of the above method is available:

val df = new DecimalFormat("#,###")
val num: ParseResult[Double] = StringParser.parse[Double]("123,45", df)

(An intermediary class, Pattern, is used to provide a nicer syntax; this should however be transparent in most cases.)

These functions require implicit StringParser or FormattedStringParser respectively. Implicits for a few basic types are already available - see Scaladoc for StringParser. When additional parsers are required, they may be easily provided by implementing StringParser or FormattedStringParser traits.

Let's take java.sql.Date as an example. Having implemented StringParser[Date]:

import java.sql.Date
import info.fingo.spata.text.StringParser

implicit val sdf: StringParser[Date] = (s: String) => Date.valueOf(s)

We can use it as follows:

val date = StringParser.parse[Date]("2020-02-02")

Defining a parser with support for custom formatting requires the implementation of FormattedStringParser:

import java.sql.Date
import java.text.DateFormat
import info.fingo.spata.text.FormattedStringParser

implicit val sdf: FormattedStringParser[Date, DateFormat] =
  new FormattedStringParser[Date, DateFormat] {
    override def apply(str: String): Date = Date.valueOf(str.strip)
    override def apply(str: String, fmt: DateFormat): Date = new Date(fmt.parse(str.strip).getTime)
  }

And can be used as follows:

import info.fingo.spata.text.StringParser
import java.util.Locale

val df = DateFormat.getDateInstance(DateFormat.SHORT, new Locale("pl", "PL"))
val date = StringParser.parse[Date]("02.02.2020", df)

Please note that this sample implementation accepts partial string parsing, e.g. "02.02.2020xyz" will successfully parse to 2020-02-02. This is different from the built-in parsing behaviour for LocalDate, where the entire string has to conform to the format.
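
If strict behaviour is preferred, the formatted variant may verify that the whole input has been consumed, e.g. with ParsePosition. This is only a sketch, not part of the library:

import java.sql.Date
import java.text.{DateFormat, ParsePosition}
import info.fingo.spata.text.FormattedStringParser

implicit val strictSdf: FormattedStringParser[Date, DateFormat] =
  new FormattedStringParser[Date, DateFormat] {
    override def apply(str: String): Date = Date.valueOf(str.strip)
    override def apply(str: String, fmt: DateFormat): Date = {
      val trimmed = str.strip
      val pos = new ParsePosition(0)
      val parsed = fmt.parse(trimmed, pos)
      // reject partially parsed input, e.g. "02.02.2020xyz"
      if (parsed == null || pos.getIndex != trimmed.length)
        throw new IllegalArgumentException(s"Cannot parse '$str' as date")
      new Date(parsed.getTime)
    }
  }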

Parsing implementations are expected to throw specific runtime exceptions when parsing fails. This is converted to ParseError in StringParser object's parse method, while keeping the original exception in cause field.

Although this design decision might be seen as questionable (returning Either instead of throwing an exception could be the better choice), it has been made deliberately: all available Java parsing methods throw exceptions, so it is more convenient to use them directly while implementing the StringParser traits, leaving all exception handling in a single place, i.e. the StringParser.parse method.

Error handling

There are three types of errors which may arise while parsing CSV:

  • Various I/O errors, including but not limited to IOException. They are not directly related to parsing logic but CSV is typically read from an external, unreliable source. They may be raised by reader operations.
  • Errors caused by malformed CSV structure, reported as StructureException. They may be reported by CSVParser's methods.
  • Errors caused by unexpected / incorrect data in record fields, reported as HeaderError or DataError. They may result from interactions with Record.

The first two error categories are unrecoverable and stop stream processing. For StructureException errors we are able to precisely identify the place that caused the problem. See the Scaladoc for CSVException for further information about error location.

The last category is reported on the record level and allows for different handling policies. Please notice however that if the error is not handled locally (e.g. using the safe functions returning Decoded) and propagates through the stream, further processing of input data is stopped, as for the above error categories.
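
A local, record-level policy may for example substitute a default value for fields which cannot be decoded, so that a single malformed field does not terminate the stream (a sketch; the "value" field name is illustrative):

val records: Stream[IO, Record] = ???
// fall back to 0.0 when the "value" field is missing or malformed,
// instead of letting the error propagate and stop the stream
val values: Stream[IO, Double] = records.map(_.get[Double]("value").getOrElse(0.0))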

Errors are raised and should be handled by using the FS2 error handling mechanism. FS2 captures exceptions thrown or reported explicitly with raiseError and in both cases is able to handle them with handleErrorWith. To fully support this, CSVParser requires the RaiseThrowable type class instance for its effect F.

The converter example presented in Basic usage may be enriched with explicit error handling:

import java.nio.file.Paths
import scala.io.Codec
import scala.util.Try
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import fs2.Stream
import fs2.io
import fs2.text
import info.fingo.spata.CSVParser
import info.fingo.spata.io.reader

object Converter extends IOApp {

  val converter: Stream[IO, ExitCode] = Stream.resource(Blocker[IO]).flatMap {
    blocker =>
      def fahrenheitToCelsius(f: Double): Double =
        (f - 32.0) * (5.0 / 9.0)
      val parser: CSVParser[IO] = CSVParser[IO]()
      implicit val codec: Codec = Codec.UTF8
      val src = Paths.get("testdata/fahrenheit.txt")
      val dst = Paths.get("testdata/celsius.txt")

      reader
        .shifting[IO](blocker)
        .read(src)
        .through(parser.parse)
        .filter(r => r("temp").exists(!_.isBlank))
        .map { r =>
          for {
            date <- r.get[String]("date")
            fTemp <- r.get[Double]("temp")
            cTemp = fahrenheitToCelsius(fTemp)
          } yield s"$date,$cTemp"
        }
        .rethrow
        .intersperse("\n")
        .through(text.utf8Encode)
        .through(io.file.writeAll(dst, blocker))
        .fold(ExitCode.Success)((z, _) => z)
        .handleErrorWith(ex => {
          println(ex)
          Try(dst.toFile.delete())
          Stream.eval(IO(ExitCode.Error))
        })
  }

  def run(args: List[String]): IO[ExitCode] =
    converter.compile.lastOrError
}

The rethrow method in the above code raises an error for Left values, converting the Either to simple values.

Sometimes we would like to convert a stream to a collection. In such situations we should wrap the result in an Either, to distinguish successful processing from an erroneous one. See the first code snippet in Basic usage for a sample.

Alternatives

For those who need a different characteristic of a CSV library, there are a few alternatives available for Scala:

  • Itto-CSV - CSV handling library based on FS2 and Cats with support for case class conversion.
  • fs2 data - collection of FS2 based parsers, including CSV.
  • kantan.csv - well documented CSV parser/serializer with support for different parsing engines.
  • scala-csv - easy to use CSV reader/writer.

Credits

spata makes use of the following tools, languages, frameworks, libraries and data sets (in alphabetical order):

/C means compile/runtime dependency, /T means test dependency, /S means source code derivative and /D means development tool. Only direct dependencies are presented in the above list.