Recipe: CSV Parsing with univocity

Univocity provides fast CSV parsers. If you want to hook them up to fs2 for stream processing in Scala, you will need to create a "source" of univocity Records.

First we need to create our univocity parser and begin parsing:

    import java.io.FileReader
    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    val settings = new CsvParserSettings()
    settings.setHeaderExtractionEnabled(true) // treat the first row as a header
    val parser = new CsvParser(settings)
    parser.beginParsing(new FileReader(inputFile))

Next we need to connect this source to fs2. Univocity is starting to implement Java iterators, which would make this easier, but we can create our own adapter. Any time we have an object that generates data, we should reach for `unfold`, which unfolds "something" into a stream of values:

    def toStream(parser: CsvParser) =
      Stream.unfold(parser) { p =>
        Option(p.parseNextRecord()).map((_, p))
      }

This works because unfold expects a function returning an Option containing a tuple of the value to emit and the next "state"; the initial state is the parser itself. parseNextRecord() returns null when there are no more CSV records, and Option(...) evaluates to None when its argument is null. So while records remain, we get a Some holding a Record and map it to the tuple (_, p) of value and parser; once parseNextRecord() returns null, Option(null) evaluates to None, None is returned, and unfolding stops.
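The null-to-None conversion is the key trick, and it can be checked with plain Scala (no fs2 or univocity needed); the string values here are just stand-ins for a Record and the parser state:

```scala
// Option.apply converts null to None and any non-null value to Some
val absent: Option[String]  = Option(null)   // None: unfolding would stop here
val present: Option[String] = Option("row")  // Some("row"): one more value to emit

assert(absent.isEmpty)
// Mapping into the Option builds the (value, nextState) tuple unfold expects
assert(present.map(r => (r, "state")) == Some(("row", "state")))
```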

To define our stream, just use it as a source:

    val inputs = toStream(parser).covary[Task]
      .map { ...do something with a univocity Record... }
      .take(100)
      ...

If we wanted to be fancier, we could use Stream.bracket to ensure that parser.stopParsing is called.
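As a sketch, assuming the fs2 0.9-style `Stream.bracket(acquire)(use, release)` signature that matches the `Task` used above, the whole recipe can be wrapped so that `stopParsing` runs even if the stream fails or is cut short by `take`; `safeRecords` is a hypothetical name, and `toStream` is the function defined earlier:

```scala
import java.io.FileReader
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
import fs2.{Stream, Task}

// Acquire the parser inside the stream, use the unfold-based toStream
// from above, and release by calling stopParsing, which also closes
// the underlying reader.
def safeRecords(inputFile: java.io.File) =
  Stream.bracket(Task.delay {
    val settings = new CsvParserSettings()
    settings.setHeaderExtractionEnabled(true)
    val parser = new CsvParser(settings)
    parser.beginParsing(new FileReader(inputFile))
    parser
  })(
    p => toStream(p).covary[Task],    // use: emit Records until null
    p => Task.delay(p.stopParsing())  // release: always runs
  )
```

With this in place the resource lifetime is tied to the stream itself, so callers never need to remember to call stopParsing.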

That's it!
