Simple Streams

import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]

Stream fragments are created using smart constructors.

For example, if you process the contents of a file in batch, the initial fragment is often a Stream object from the fs2 io module. Combinators and stream creators in the io module essentially create a Stream[Task,String] object because an IO operation needs to happen within a context, in this case the Task context. When we initially create streams, they are neither runnable nor run automatically.

Creating Simple Streams

The fragment below is not run automatically. It needs to be converted to a runnable stream and then run explicitly. Notice that it produces a Stream, which signals that it is really acting as a source.

scala> Stream.emit(0)
res1: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0)))

scala> Stream(0) 
res2: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0)))

We knew to use emit or just apply because we looked at the documentation for the Stream companion object, which holds many smart constructors. By default, no output was created; only a description of a stream was created. The Emit inside the result is an instruction that tells the stream, once it is run, to emit a value. The value is wrapped in a Chunk because Chunks are used internally to manage collections of items more efficiently. You could also construct the stream using a chunk directly:

scala> Stream.chunk(Chunk.singleton(0)) 
res3: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0)))

which is of course more verbose.

The following fragment skips using the emit method and emits 3 values instead of just one:

scala> Stream(0,1,2) 
res4: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0, 1, 2)))

Each stream object created is immutable. You can build up new streams from a base object without changing that base object. So:

scala> val x = Stream(0,1,2) 
x: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0, 1, 2)))

scala> x.map(_+1) 
res5: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0, 1, 2))).mapChunks(<function1>)

scala> x 
res6: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0, 1, 2)))

This shows that you can create stream fragments, which are immutable values, and use them to create other, new streams by linking them together using scala functional idioms such as map.

This is not true of all stream fragments. For example, if the stream fragment contains a side effect you could get different results when the side effect runs. Streams created using "strict" values such as integers 0 and 1 really create "pure" streams. Streams are pure if they have no side effects. We could make the "pure" concept explicit in stream construction and instead of showing "Nothing" for the effect, convert it to "Pure". We also need to convert streams to "Pure" effects at times to satisfy the scala compiler:

scala> Stream(0,1,2).covary
res7: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(0, 1, 2)))

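The intent is that, instead of Stream[Nothing, Int], we have a stream whose effect type is Pure, that is, Stream[Pure, Int]. Note that the transcript above still reports Nothing as the effect type; in fs2 0.9 the effect is pinned to Pure with the .pure method or with an explicit type argument to covary.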

In the examples below, we will run these fragments immediately but later we will connect stream fragments together using pipes and factor in effects, which is harder.

Running Streams to Get Results

We can create a runnable Stream and then run it. For Pure streams, this is pretty straightforward and matches what scala already provides in its collection/stream objects. Creating a runnable stream when effects are involved is more complex, but it should still be less complex, and give you more control (i.e. make things very explicit), than using the basic building blocks contained in scala. For example, composing a program using scala Futures and thread pools to perform the same operations and remain composable may be more difficult than using fs2 Streams. fs2 may be overkill for some simple problems, but may be critical for more complex processing scenarios.

In the example below, the output is a single number. runLog converts a Stream to the F context, in this case a Task, which is then run to obtain the result. runLog indicates that the outputs of the stream should be "logged" to a collection. The default collection for runLog is a standard scala Vector. Consider the fragment below:

fs2> Stream(0).run 
def run(implicit F: fs2.util.Catchable[F]): F[Unit]
runFold      runFoldFree  runFree      runLast      runLog       runLogFree
fs2> Stream(0).run

If we tab complete on .run we see a few different run methods. Since the stream is pure, runLog actually runs the stream as no other side effect needs to be considered (fs2 selects its own side effect to run a pure stream):

scala> Stream(0).runLog.unsafeRun
res8: Vector[Int] = Vector(0)

We now see the stream output because we took the final effect, a Task, and used the Task API to run it, in this case with .unsafeRun. .runLog automatically created a Vector output in the sense that "log" logged the output to an in-memory structure, a Vector. Since the stream was pure, we could also have just called toVector on the stream fragment; no run command would have been needed.

scala> Stream(0).toVector
res9: Vector[Int] = Vector(0)

A pure stream does not have an effect so there is no Task to output from .run and hence no need to run .unsafeRun.

fs2 also focuses on resource safety. Its internal interpreter (using a free monad) catches exceptions and maps them into various outputs. It does this because in order to guarantee resource safety, fs2 must catch exceptions. You or fs2 may choose to rethrow the exceptions but that's your choice. Exceptions are actually mapped to a Fail "instruction" in the interpreter.

Let's consider a stream fragment where we only use run:

scala> Stream(0,1,2).run
res10: fs2.Task[Unit] = Task

Unlike with runLog we would not have any output. We can view runLog as adding an effect of writing to a scala vector as a convenience for us.

Internally, when a stream is run through Pure or a Task, fs2 adds a "fold" command to our stream that folds over the stream values and collects them into a data structure. The initial value of the .runLog fold is an empty vector and the operation during the fold is "append to vector".
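
The shape of that fold can be modeled with plain Scala collections, without fs2. The sketch below is only an analogy: runFoldModel and runLogModel are hypothetical names, and a List stands in for the values a stream would emit.

```scala
object FoldSketch {
  // Generic model of a "run with fold": start from z and combine each emitted value.
  def runFoldModel[A, B](emitted: List[A])(z: B)(f: (B, A) => B): B =
    emitted.foldLeft(z)(f)

  // Model of the "log" behaviour: the accumulator is an empty Vector
  // and the fold step appends each value to it.
  def runLogModel[A](emitted: List[A]): Vector[A] =
    runFoldModel(emitted)(Vector.empty[A])(_ :+ _)
}
```

Calling FoldSketch.runLogModel(List(0, 1, 2)) returns Vector(0, 1, 2); swapping the initial value and the step function changes the collection that comes out.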

We could use our own collection. Underneath runLog is runLogFree.run which calls a fold: runFoldFree(Vector.empty)(_ :+ _). So let's say we want to output our stream output items to a set:

scala> import scala.collection.immutable.HashSet
import scala.collection.immutable.HashSet

scala> Stream(0,1,1).runFoldFree(HashSet[Int]())((s,i) => s + i).run 
res11: fs2.Task[scala.collection.immutable.HashSet[Int]] = Task

Note that we still called run: runFoldFree only says that, when the stream is run, a fold should be run over it. There are other ways to write to different outputs, but the idea is that we essentially transformed our stream from a stream of Ints to a stream with "item" type "Set" using a fold. The run* methods imply that this is the "end of the world": they "build" a runnable stream (and potentially run it).

We could just use a standard fold in our stream but then we have to run it and collect the output using runLog:

scala> Stream(0,1,1).fold(HashSet[Int]())((s,i) => s + i).runLog 
res12: fs2.Task[Vector[scala.collection.immutable.HashSet[Int]]] = Task

We needed runLog here because a plain fold only transforms the stream, whereas runFoldFree both folds and prepares the stream to be run.
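
As a sketch of the difference, assuming fs2 0.9.x is on the classpath (the version these transcripts appear to use): fold keeps its result inside the stream as a single element, so collecting the stream yields a one-element collection that wraps the set. The object name FoldInStream is illustrative only.

```scala
import fs2._

object FoldInStream {
  // fold produces a new stream whose single element is the accumulated Set.
  val folded = Stream(0, 1, 1).fold(Set.empty[Int])((s, i) => s + i)

  // The stream is pure, so toList collects it without going through a Task.
  val collected: List[Set[Int]] = folded.toList
}
```

This is why the transcript above reports a Vector of HashSet rather than a bare HashSet.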

Simple Stream Composition

Now we can chain together several individual objects that each represent a "step" and have them work in sequence. We need to append (or add) individual emits together. With streams, this can be done with the ++ operator or the append method:

scala> (Stream(0) ++ Stream(1)).runLog.unsafeRun
res13: Vector[Int] = Vector(0, 1)

scala> (Stream(0) append Stream(1)).runLog.unsafeRun
res14: Vector[Int] = Vector(0, 1)

which is the same as

scala> Stream(0,1).runLog.unsafeRun
res15: Vector[Int] = Vector(0, 1)

There are also some smart constructors for creating ranges.

scala> Stream.range(1, 10).runLog.unsafeRun
res16: Vector[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> Stream.ranges(1,10,2).runLog.unsafeRun
res17: Vector[(Int, Int)] = Vector((1,3), (3,5), (5,7), (7,9), (9,10))

This also shows that the output can be any object: not just Strings or Ints but, as in the last case, a Tuple.

The different ways to run a Task are covered in a later section. There are many .unsafeRun-like methods that you can use to return Either to manage exceptions as values or to run the Task asynchronously in the background and attach a callback so that you can process the result when it completes.

Important Thoughts on Composing Streams

The above compositions concatenated streams together so that the outputs from the first stream came first then the outputs from the second stream came second when the final stream was run. This is one way to "append" streams.

The other more useful way in advanced streams is to use map/flatMap. In this context, these monadic operators are used for sequencing. The key thing to recognize is that the stream that is being mapped/flatMapped runs first then the elements/stream elements from the inner expression come second. It's not the same thing as concatenating streams together but you can think of mapping/flatMapping as "feeding" the inner map/flatMap expression with the Stream results as the stream results are generated, assuming the first Stream emits anything at all.

The Stream below emits a curious set of values but makes a point:

scala> Stream(0,1,3).flatMap{x => println(s"prev stream output: $x"); Stream(4)}.runLog.unsafeRun
prev stream output: 1
prev stream output: 3
prev stream output: 0
res18: Vector[Int] = Vector(4, 4, 4)

Each upstream value that is emitted makes the expression in the flatMap run, as if the upstream Stream were the outer loop of a nested for-loop. Then, because this is a flatMap and we output a Stream(4) each time, the downstream Stream becomes a Stream that emits a 4 for each upstream value. runLog sees three 4s.
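
The for-loop analogy can be made concrete with ordinary Scala collections, which share the flatMap shape. This is a sketch using List rather than fs2, so it says nothing about when fs2 evaluates the function passed to flatMap; the object name is illustrative.

```scala
object FlatMapAnalogy {
  val upstream = List(0, 1, 3)

  // flatMap: each upstream element is replaced by the elements of List(4).
  val viaFlatMap: List[Int] = upstream.flatMap(_ => List(4))

  // The same computation as a nested for-comprehension:
  // the upstream values drive the outer loop.
  val viaFor: List[Int] =
    for {
      _     <- upstream
      inner <- List(4)
    } yield inner
}
```

Both values are List(4, 4, 4): one 4 per upstream element, regardless of what the upstream values were.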

Since Streams are monads, we use one-element Streams to initialize information "within the stream":

scala> Stream.eval(Task.delay(10)).flatMap(intResource => Stream(intResource+10)).runLog.unsafeRun
res19: Vector[Int] = Vector(20)

Here we assumed that Task.delay(10) represents a resource that we had to run asynchronously to obtain. We then fed that resource, as a one-element Stream result, into the flatMap, where it is used to create another Stream. Since the result of the flatMap becomes the "stream", we can consume the resource by passing it along, without having to print or output it. It is effectively consumed to create the stream we really cared about, Stream(intResource+10). This resource could also be fed into a downstream stream to be used as an initializer for Stream element generation. In the above example, we pass it along after adding 10 to it, so the "log" sees 20.
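
Because flatMap is available, the same pipeline can also be written as a for-comprehension, which may read more naturally as the number of steps grows. This is a sketch assuming fs2 0.9.x, as in the transcript above; acquire is a hypothetical stand-in for obtaining a real resource, and the explicit type arguments on emit are there only to keep type inference simple.

```scala
import fs2._

object ResourceSketch {
  // Hypothetical "resource": a Task that yields 10 when it is run.
  val acquire: Task[Int] = Task.delay(10)

  // Lift the Task into a one-element stream, then build the stream we care about from it.
  val stream: Stream[Task, Int] =
    for {
      intResource <- Stream.eval(acquire)
      out         <- Stream.emit[Task, Int](intResource + 10)
    } yield out

  // Nothing runs until the Task produced by runLog is executed.
  def result: Vector[Int] = stream.runLog.unsafeRun()
}
```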

A Hint of Effectful Streams

All the streams up to now were pure in the sense that they did not factor in effects. However, internally, fs2 may convert your pure streams to effectful streams. It does this for some of the functions on Stream that may appear to be pure.

For example, instead of using runLog you could use toVector:

scala> Stream.range(1, 10).toVector 
res20: Vector[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9)

Internally, the stream is converted to a Stream with a Task effect. The Task effect allows us to choose whether we just want the raw value out of the Task or have it throw an exception if there is an error.

When we use an effect, we use the effect's error handling. An fs2 Task has an unsafeRunSync/unsafeRunAsync method that returns a "Right" value directly or throws an exception if it encounters a Left. So fs2 hands us an Either, and the Task interprets this Either in unsafeRunSync, either providing the value or throwing an exception. It looks something like this:

(stream produces an Either output) => 
   Task receives that output =>
      Task.unsafeRunSync => 
        throw an exception (again!) if it's a Left
        return the value if it was a Right

In this case the Right was a Vector.
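
The flow above can be sketched in plain Scala. This models the idea only and is not fs2's Task implementation; runOrThrow is a hypothetical helper name.

```scala
object EitherSketch {
  // Given the Either produced by a computation, return the Right value
  // or rethrow the Throwable carried by the Left.
  def runOrThrow[A](outcome: Either[Throwable, A]): A =
    outcome match {
      case Right(value) => value
      case Left(error)  => throw error
    }
}
```

EitherSketch.runOrThrow(Right(Vector(1, 2, 3))) returns the Vector, while passing a Left that wraps an exception rethrows that exception to the caller.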

Your choice of effect decides what function needs to be called to run a runnable stream. fs2 provides the Task effect, which is fairly flexible and useful not only in fs2 but also standalone. You could use your own effect, for example an Option, List or Either, although for asynchronous work that may not make a lot of sense.
