Recipe: CSV Parsing with node.js csv-parse using Readable => Stream

node.js has a package called csv-parse which is a lot like univocity in that it is a mature package. If you need to use csv-parse, you may wonder how to turn csv-parse, which is a nodejs stream.Readable, into an fs2 Stream.

First we need to create our csv-parse parser using io.scalajs.npm.csvparse:

...
import io.scalajs.npm.csvparse._
...
      val parser = CsvParse(new ParserOptions(
        columns = true,
        skip_empty_lines = true,
        trim = true
      ))

A csv-parse object is a Readable. The real question then is how to change a nodejs Readable into an fs2 Stream.

Streams in nodejs are quite complex, and the multiple iterations over many nodejs releases mean that the actual output of a nodejs stream can be difficult to figure out. In the case of csv-parse, the Readable outputs js.Objects. But let's put together a general Readable => fs2.Stream converter using the Readable data event. You will need to carefully read the nodejs stream documentation to understand the different possible outputs a Readable could produce.
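As a quick sanity check before committing to a type for A, you can attach listeners directly and inspect what arrives. A minimal sketch, assuming the io.scalajs.nodejs facades; the probe helper and its printouts are purely illustrative:

    import io.scalajs.nodejs.Error
    import io.scalajs.nodejs.stream.Readable
    import scala.scalajs.js

    // print the type of whatever this Readable emits; for csv-parse with
    // columns = true you should see js.Objects once rows are parsed
    def probe(readable: Readable): Unit = {
      readable.onData((chunk: js.Any) => println(js.typeOf(chunk)))
      readable.onError((e: Error) => println(s"stream error: ${e.message}"))
      readable.onEnd(() => println("end of stream"))
    }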

To create an fs2.Stream we need to use a queue. We include a scalajs version below. Scalajs has a wrapJavaScriptException method to help improve error reporting.

    import fs2._
    import fs2.util.Async
    import io.scalajs.nodejs.Error
    import io.scalajs.nodejs.stream.Readable
    import scala.scalajs.runtime.wrapJavaScriptException

    /**
     * Turn a Readable parser into a Stream of `A` using the
     * callback `onData`. While `A` could be a Buffer or String,
     * it could also be a js.Object. `A` must reflect the caller's
     * understanding of what objects the Readable will produce.
     */
    def readableToStream[F[_], A](readable: Readable, qsize: Int = 1000)(
        implicit s: Strategy, F: Async[F]): Stream[F, A] = {
      for {
        q <- Stream.eval(fs2.async.boundedQueue[F, Option[Either[Throwable, A]]](qsize))
        _ <- Stream.eval(F.delay {
          readable.onData { (data: A) =>
            F.unsafeRunAsync(q.enqueue1(Some(Right(data))))(_ => ())
          }
          readable.onError { (e: Error) =>
            F.unsafeRunAsync(q.enqueue1(Some(Left(wrapJavaScriptException(e)))))(_ => ())
          }
          readable.onEnd { () =>
            F.unsafeRunAsync(q.enqueue1(None))(_ => ())
          }
        })
        // unNoneTerminate ends the stream when it sees a None; pipe.rethrow
        // rethrows a Left if encountered and unwraps the Right.
        record <- q.dequeue.unNoneTerminate through pipe.rethrow
      } yield record
    }

No data will flow until the fs2.Stream is run, e.g. run.unsafeRunAsync, but we also need to make sure the Readable is connected to a source. A csv-parse object is also a Writable, so we can pipe data from a Readable into it:

    ...
    val f = Fs.createReadStream(csvfilename)
    ...
    val records = readableToStream[Task, js.Object](parser)
    ...
    val runme = Task.delay(f.pipe(parser)).flatMap { _ =>
       records.
         zipWithIndex.
         map(...).
         ...more processing here...
         run // .run turns a Stream into a Task
    }

    runme // returns a Task that should be run using one of the unsafe... methods.
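
On node you cannot block the event loop, so use one of the asynchronous variants; for example, a sketch using the fs2 0.9-era Task API shown throughout this recipe:

    // fire off the task asynchronously; the node event loop drives the work
    runme.unsafeRunAsync {
      case Left(t)  => println(s"CSV processing failed: $t")
      case Right(_) => println("done")
    }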

Running the returned task will first start the file piping process to the parser and then, sequenced using flatMap, assemble an fs2.Stream that processes the CSV records. You could also use Stream.bracket to start the piping and then convert that stream to a Task, as sketched below. We did not put the f.pipe inside readableToStream since we want to control when data starts flowing to our queue.
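
For example, a minimal bracket-based sketch reusing f, parser and readableToStream from above; the unpipe release call is an assumption, substitute whatever cleanup your facade supports:

    val viaBracket: Stream[Task, js.Object] =
      Stream.bracket(Task.delay(f.pipe(parser)))(
        _ => readableToStream[Task, js.Object](parser),
        _ => Task.delay { f.unpipe(parser); () } // assumed facade method; stops the flow
      )

    val asTask: Task[Unit] = viaBracket.run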

If you are using io.scalajs.nodejs then you should definitely use Stream.bracket so that you can signal termination from the Stream back to the Readable:

    val create = Task.delay {
      // hypothetical: acquire the Readable and pair it with the stream built from it
      (aReadableStream, readableToStream[Task, js.Object](aReadableStream))
    }

    Stream.bracket(create)(
      { case (_, stream)   => stream },
      // release: tell the Readable to stop; pause() stands in for whatever
      // termination call your facade exposes
      { case (readable, _) => Task.delay { readable.pause(); () } }
    )
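
Stream.bracket guarantees that the release action runs whether the stream completes normally or fails, so the Readable is always told to stop.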
