Effectful Streams

In the last section, we saw that fs2 may use an effect to process a pure stream, e.g. when running it.

However, effects can also be used deliberately anywhere in your stream, and you can convert between different effects depending on your processing needs.

We can evaluate an arbitrary effect in a context and use that as a source:

fs2> var x = 4 
x: Int = 4
fs2> val p1 = Stream.eval(Task.delay(x + 2)) 
p1: Stream[Task, Int] = attemptEval(Task).flatMap(<function1>)
fs2> val p2 = Stream.eval(Task.delay(x + 3)) 
p2: Stream[Task, Int] = attemptEval(Task).flatMap(<function1>)
fs2> p1.runLog.unsafeRun 
res58: Vector[Int] = Vector(6)
fs2> p2.runLog.unsafeRun 
res59: Vector[Int] = Vector(7)

The stream is effectful because we asked it to eval something, and that something was a Task. The values emitted by the stream are Ints, but the effect is Task, hence the type Stream[Task, Int].

The above only shows that a Task can return a value. Now let's have the Task perform a side effect:

fs2> val p3 = Stream.eval(Task.delay(x = 10)) 
p3: Stream[Task, Unit] = attemptEval(Task).flatMap(<function1>)
fs2> val p4 = Stream.eval(Task.delay(x = x + 10)) 
p4: Stream[Task, Unit] = attemptEval(Task).flatMap(<function1>)
fs2> p3.runLog.unsafeRun 
res62: Vector[Unit] = Vector(())
fs2> p4.runLog.unsafeRun 
res63: Vector[Unit] = Vector(())
fs2> x 
res64: Int = 20

Each stream emits a single Unit, so runLog gives Vector(()). That is to be expected, since each Task performs an assignment to x, a side effect whose result is Unit. Running both streams changes the environment: p3 sets x to 10, and p4 then adds 10, leaving x at 20.

This also shows that the expression given to Task.delay is not evaluated until the stream is run, because Task.delay takes a Scala by-name parameter. As a slight detour, you could use Task.now instead to force evaluation at the point where Task.now is called.
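The by-name mechanism behind Task.delay can be shown without fs2. The sketch below uses a hypothetical MiniTask class (not fs2's Task) with delay and now helpers that only mimic the evaluation order described above: delay captures the unevaluated expression, while now receives a value the caller has already computed.

```scala
// Hypothetical MiniTask: a stand-in to illustrate by-name vs strict arguments, not fs2's Task.
final class MiniTask[A](thunk: () => A) {
  // Evaluates the stored expression each time the task is run.
  def run(): A = thunk()
}

// By-name parameter: `a` is not evaluated here, only later inside run().
def delay[A](a: => A): MiniTask[A] = new MiniTask(() => a)

// Strict parameter: the caller has already evaluated `a` before now() is entered.
def now[A](a: A): MiniTask[A] = new MiniTask(() => a)

val (delayedResult, strictResult) = {
  var x = 2
  val delayed = delay(x + 2) // x + 2 is not computed yet
  val strict  = now(x + 2)   // x + 2 is computed here, giving 4
  x = 10
  // delayed sees x = 10 and gives 12; strict still gives 4
  (delayed.run(), strict.run())
}
```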

A stream that emits only one value may not seem very useful, but imagine repeating the side-effecting call, perhaps reading from an IO source, and getting a different value each time.

Synchronous Effects

The example above ran the Task synchronously. We can see that by printing the current thread's name each time the Task runs:

fs2> Stream.eval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).repeat.take(5).runLog.unsafeRun 
main
main
main
main
main
res67: Vector[Long] = Vector(1475333678504L, 1475333678513L, 1475333678514L, 1475333678515L, 1475333678516L)

We can even be explicit about it by calling unsafeRunSync:

fs2> Stream.eval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).repeat.take(5).runLog.unsafeRunSync 
main
main
main
main
main
res71: Either[Attempt[Vector[Long]] => Unit => Unit, Vector[Long]] = Right(Vector(1475333765381L, 1475333765382L, 1475333765382L, 1475333765383L, 1475333765383L))

take(5) returns a new stream that emits only the first five elements of the original; since repeat makes the stream infinite, take is what lets runLog finish. Many operators work directly on a stream, including map, flatMap, and others; see the Stream class documentation for the full list. We could also have used repeatEval instead of calling eval and repeat separately.
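As a rough standard-library analogy (not how fs2 implements Stream), Iterator.continually re-evaluates its by-name argument for every element, much like repeatEval re-runs its effect, and take(5) bounds the otherwise infinite source. The hypothetical effect function counts its own runs so we can check that exactly five evaluations happen.

```scala
// Counts how many times the "effect" below actually runs.
var evaluations = 0

// Hypothetical side-effecting source, loosely like the Task.delay { ... } above.
def effect(): Long = { evaluations += 1; System.nanoTime() }

// continually re-runs effect() per element (lazy and infinite);
// take(5) bounds it, and toVector drives the pulls, loosely like runLog.
val values: Vector[Long] = Iterator.continually(effect()).take(5).toVector
```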

Asynchronous Effects

Running tasks on the main thread is not great if the computation takes a while. However, we must be thoughtful about running on different threads, because where we indicate that we want to run asynchronously makes a difference.

For example, we can run asynchronously the final, effectful Task that runLog creates from a stream. By itself, runLog only builds that Task:

fs2> Stream.repeatEval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog 
res76: Task[Vector[Long]] = Task

The final output is a Task, so now we need to run it. There are many ways to run a Task, and Task's API can be a bit confusing.

fs2> Stream.repeatEval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog.unsafeRunAsyncFuture 
main
main
main
main
main
res79: scala.concurrent.Future[Vector[Long]] = Success(Vector(1475334408289, 1475334408289, 1475334408290, 1475334408290, 1475334408290))

This returned a Future, but the inner tasks still ran on the main thread. unsafeRunAsyncFuture returns the result in a Future, but the inner tasks never "requested" to run asynchronously, so they simply ran on the calling (main) thread. The Future was already completed by the time it was returned, as the Success(...) shows.

We could drop the Future and call unsafeRunAsync instead, but then we must supply a callback that handles both successful values and exceptions returned as values. In fs2, this is the Attempt type, which is an alias for Either[Throwable, A]. Like unsafeRunAsyncFuture, though, unsafeRunAsync does not by itself move the inner tasks off the calling thread.
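To make the callback shape concrete, here is a sketch that redeclares Attempt locally as Either[Throwable, A]. The helper runWithCallback is hypothetical and, unlike unsafeRunAsync, runs synchronously; it only illustrates one callback handling both a value and an exception returned as a value.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

// Local stand-in for fs2's alias: errors are carried as values on the Left.
type Attempt[A] = Either[Throwable, A]

// Hypothetical helper: evaluate `thunk` and pass the outcome to `cb`
// (Left for a failure, Right for a success). Runs synchronously for simplicity.
def runWithCallback[A](thunk: => A)(cb: Attempt[A] => Unit): Unit = {
  val result: Attempt[A] =
    try Right(thunk)
    catch { case NonFatal(e) => Left(e) }
  cb(result)
}

val seen: List[String] = {
  val buf = ListBuffer.empty[String]
  // One callback handles both outcomes, like the function given to unsafeRunAsync.
  val callback: Attempt[Int] => Unit = {
    case Right(n) => buf += s"value $n"
    case Left(e)  => buf += s"error ${e.getMessage}"
  }
  runWithCallback(21 * 2)(callback)
  runWithCallback[Int](throw new IllegalStateException("disk unavailable"))(callback)
  buf.toList
}
```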

So let's try another method on Task: async. async requires a Strategy (an implicit one, strategy, is part of our standard imports for amm, the Ammonite REPL):

fs2> strategy 
res85: Strategy = Strategy

We might expect async to start running immediately, but it actually just returns a Task that, when run, starts the computation asynchronously:

fs2> val asyncTask  = Stream.repeatEval(Task.delay { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog.async 
asyncTask: Task[Vector[Long]] = Task
fs2>

Since we do not see any output, we know that the inner Tasks did not run. Now we can run it:

fs2> asyncTask.unsafeRun 
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
res84: Vector[Long] = Vector(1475335219673L, 1475335219674L, 1475335219674L, 1475335219674L, 1475335219674L)

This shows that the inner Tasks ran on another thread, but not on different threads from one another. In other words, the stream as a whole ran on a pool thread, but the inner tasks of our stream still ran synchronously, one after another, on that single thread.

That's because when we used Task.delay to define the stream, we did not tell each inner Task to run asynchronously using our Strategy. We can do that by using Task.apply (i.e. Task { ... }) instead of Task.delay.

If we make that change, drop the call to async, and just call unsafeRun on the resulting Task, we get:

fs2> val asyncTask  = Stream.repeatEval(Task { println(s"${Thread.currentThread.getName}"); System.currentTimeMillis }).take(5).runLog 
asyncTask: Task[Vector[Long]] = Task

fs2> asyncTask.unsafeRun 
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-7
res90: Vector[Long] = Vector(1475335626584L, 1475335626585L, 1475335626585L, 1475335626585L, 1475335626586L)

You may need to run unsafeRun several times before you see multiple threads being used, because which pool thread runs each task is non-deterministic.
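The difference between Task.delay and Task.apply with a Strategy has a rough analogue in the standard library: evaluating steps inline keeps them on the calling thread, while submitting each step to a pool lets consecutive steps land on different threads. This sketch assumes scala.concurrent.Future and a fixed thread pool as stand-ins for Task and Strategy; unlike Task, a Future starts running as soon as it is created.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

def threadName(): String = Thread.currentThread.getName

val callerThread: String = threadName()

// Like Task.delay: every step runs on the thread already running the computation.
val inlineThreads: Vector[String] = Vector.fill(5)(threadName())

// Like Task.apply with a Strategy: each step is submitted to a pool,
// so consecutive steps may run on different pool threads.
val pooledThreads: Vector[String] = {
  val pool = Executors.newFixedThreadPool(4)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  // Five steps run one after another; each step is a new Future on the pool.
  val steps: Future[Vector[String]] =
    (1 to 5).foldLeft(Future.successful(Vector.empty[String])) { (acc, _) =>
      acc.flatMap(names => Future(names :+ threadName()))
    }
  try Await.result(steps, 5.seconds)
  finally pool.shutdown()
}
```

Running it a few times typically shows more than one "pool-..." name in pooledThreads, for the same non-deterministic reason as with the ForkJoinPool output above.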

The inner tasks did not do much; each computation was self-contained. We can add some internal state that is used to compute each task's output:

fs2> def supplyRandom(maxValue: Int): Stream[Task, Int] = {
       val r = new java.util.Random
       Stream.repeatEval(Task { println(s"${Thread.currentThread.getName}"); r.nextInt(maxValue) })
     } 
defined function supplyRandom
fs2> supplyRandom(100).take(10).runLog.unsafeRun 
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-7
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
ForkJoinPool-1-worker-5
res99: Vector[Int] = Vector(4, 87, 26, 75, 31, 52, 38, 18, 25, 90)

This shows that we can do asynchronous work with state. Notice also that this is still basically a "pull" model: the stream interpreter drives the pulling of values from the source.
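The same stateful, pull-driven shape can be sketched with a plain Iterator. Here supplyRandomPull is a hypothetical name, and the seed is an addition (not in the fs2 example) so the sequence is reproducible; the consumer's take(10).toVector is what pulls values out of the source.

```scala
import java.util.Random

// Hypothetical pull-based source: the state (a Random) lives inside the source,
// and no value is generated until the consumer asks for the next one.
def supplyRandomPull(maxValue: Int, seed: Long): Iterator[Int] = {
  val r = new Random(seed)
  Iterator.continually(r.nextInt(maxValue))
}

// The consumer drives the pulls: take(10).toVector requests exactly ten values.
val pulled: Vector[Int] = supplyRandomPull(100, seed = 42L).take(10).toVector
val again: Vector[Int]  = supplyRandomPull(100, seed = 42L).take(10).toVector
```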

Side-Show: What are all those unsafe* Task Methods?

We see a lot of "unsafe*" methods on Task. What are they for? If we tab-complete on a Task, we get:

fs2> Task.now(42). 
!=                     ensure                 handleWith             race                   unsafeAttemptRunFor    unsafeRunAsyncFuture   |>
==                     equals                 hashCode               schedule               unsafeAttemptRunSync   unsafeRunFor
asInstanceOf           flatMap                isInstanceOf           self                   unsafeAttemptValue     unsafeRunSync
async                  getClass               map                    toString               unsafeRun              unsafeTimed
attempt                handle                 or                     unsafeAttemptRun       unsafeRunAsync         unsafeValue

That's a lot.

Here's one way to think about them. Task wants to promote good behavior on your part. When it runs something, the thing running inside could be very effectful, e.g. reading from disk, which may lead to exceptions or other errors. The different unsafe* variants let you choose how to run the Task and how to manage those potential errors. Recall that Attempt is just a type alias for Either[Throwable, A], so any method with "Attempt" in its name treats errors as values.

Here is what the main ones do (we have already seen several of them):

  • async: Returns a new Task that, when run, runs the overall computation on a thread provided by the Strategy. This does not necessarily make the other Tasks in the stream run on different threads.

  • unsafeRun: Runs the Task, blocking the current thread until the value is available, and returns it. If the Task fails, the exception is thrown, so you have to wrap the call in a try-catch to handle it.

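  • unsafeRunSync: Runs the Task synchronously on the current thread until it either finishes or reaches an asynchronous computation. It returns Right(value) if it finished, or a Left holding a function to which you pass a callback (Attempt[A] => Unit); that callback receives the result once the asynchronous computation completes.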

  • unsafeRunAsyncFuture/unsafeRunAsync: Run the Task on the calling thread, but if an asynchronous computation is encountered, return immediately and let that computation complete on its own thread. unsafeRunAsyncFuture returns the result in a Future; unsafeRunAsync instead takes a callback that handles the result as an Attempt.

  • unsafeValue: Returns an Option instead of an Either: Some(value) if the Task completed synchronously, or None if it reached an asynchronous boundary. It does not capture errors; if the Task fails, the exception is thrown.
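The difference between the throwing style and the Attempt-returning style can be sketched without fs2. runThrowing, runAttempt, and readConfig below are hypothetical stand-ins for unsafeRun, unsafeAttemptRun, and an effect that fails.

```scala
import java.io.IOException
import scala.util.control.NonFatal

// Local stand-in for fs2's alias.
type Attempt[A] = Either[Throwable, A]

// Hypothetical analogue of unsafeRun: a failure escapes as an exception.
def runThrowing[A](thunk: => A): A = thunk

// Hypothetical analogue of unsafeAttemptRun: a failure comes back as a Left value.
def runAttempt[A](thunk: => A): Attempt[A] =
  try Right(thunk)
  catch { case NonFatal(e) => Left(e) }

// Hypothetical effect that always fails, e.g. a missing file.
def readConfig(): String = throw new IOException("config.txt not found")

// Throwing style: the call site needs a try/catch.
val thrown: String =
  try runThrowing(readConfig())
  catch { case e: IOException => s"caught: ${e.getMessage}" }

// Attempt style: the error is an ordinary value to pattern match on.
val attempted: String = runAttempt(readConfig()) match {
  case Right(text) => text
  case Left(e)     => s"left: ${e.getMessage}"
}
```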

You can also delay the Task before it runs ("schedule") or put a time limit on it ("unsafeTimed", "unsafeRunFor") so that you get a timeout exception if you wait too long.

You can handle errors in a Task by explicitly converting the result to an Either (via "attempt"), or by using an error handler such as:

  • handle: This is just like recover in a scala Future.

  • handleWith: This is just like a recoverWith in a scala Future.
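Since handle and handleWith are compared to Future's recover and recoverWith, here is the standard-library side of that comparison (plain scala.concurrent, no fs2): recover maps an exception to a plain value, while recoverWith maps it to another Future.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A computation that has already failed.
val failing: Future[Int] = Future.failed(new ArithmeticException("/ by zero"))

// recover: the handler returns a plain value (compare Task.handle).
val recovered: Future[Int] = failing.recover { case _: ArithmeticException => 0 }

// recoverWith: the handler returns another Future (compare Task.handleWith).
val recoveredWith: Future[Int] =
  failing.recoverWith { case _: ArithmeticException => Future.successful(-1) }

val r1: Int = Await.result(recovered, 2.seconds)
val r2: Int = Await.result(recoveredWith, 2.seconds)
```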

So fine-grained error handling really goes through "attempt" and "handle", or even "flatMap".

This is all still pretty complex of course, but the intent is to make it less complex than rolling your own framework.

A Stream can be made up of many synchronous or asynchronous parts. At one moment it may be sync, then turn async for a computation, then join back together to be sync again. In general you may not know exactly what's in your stream, but you still want to get the "output" value at some point, at the "end of the world" of your program.

Let's look at a specific example of this behavior using "unsafeRunSync".

fs2> Task.now(42).unsafeRunSync 
res106: Either[Attempt[Int] => Unit => Unit, Int] = Right(42)

The Either in the return type tells us that even when we explicitly ask to run the Task synchronously, there may still be async processing going on, in which case we get a Left. Here we received Right(42) because the Task was just a pure value lifted into a Task, so it finished immediately.

What causes a return of a Left(callback)?

fs2> val x = Task.delay{ println("doit") }.flatMap(_ => Task{Thread.sleep(5000); println(s"${Thread.currentThread.getName}") }.async) 
x: Task[Unit] = Task
fs2> x.unsafeRunSync 
doit
res125: Either[Attempt[Unit] => Unit => Unit, Unit] = Left(<function1>)
fs2> ForkJoinPool-1-worker-7

Here we create a Task whose "head" is a synchronous Task but whose flatMap runs a Task asynchronously. It's easy to miss, but the worker thread name is printed about five seconds later, after the prompt has already returned. In other words, unsafeRunSync runs the synchronous part, then reaches the asynchronous boundary and returns a Left instead of waiting. If both Tasks are synchronous, including the one inside the flatMap, we get:

fs2> val x = Task.delay{ println("doit") }.flatMap(_ => Task.delay{Thread.sleep(5000); println(s"${Thread.currentThread.getName}") }) 
x: Task[Unit] = Task
fs2> x.unsafeRunSync 
doit
main
res129: Either[Attempt[Unit] => Unit => Unit, Unit] = Right(())
fs2>

Here we still hit the sleep, but everything runs on the main thread (hence the thread name main), and that name is printed after the five-second sleep and before the result comes back. So it all ran synchronously, and we got a Right back, indicating that the Task "knew" it had completed without crossing an async boundary.

So unsafeRunSync is about async boundaries in the map/flatMap chain of the Task, NOT about async boundaries inside a stream. It helps you manage your final "end of the world" computation when you have extended the Task (e.g. with flatMap) after creating it.
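The Left/Right behavior of unsafeRunSync can be modeled with a tiny, hypothetical task type. SketchTask, Sync, Async, Bind, delay, and forked below are illustrative names, not fs2's internals: runSync executes synchronous steps until it reaches the first asynchronous step, then returns a Left holding a function that accepts a callback. One simplification to note: here the asynchronous step only starts once a callback is registered, whereas the fs2 transcript above shows the work starting on its own.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical, heavily simplified Task model (NOT fs2's implementation).
sealed trait SketchTask[A] {
  // Run synchronous steps; stop at the first async step and return Left(register).
  def runSync: Either[(A => Unit) => Unit, A]
  // Run everything, delivering the final value to `cb`.
  def runAsync(cb: A => Unit): Unit
  def flatMap[B](f: A => SketchTask[B]): SketchTask[B] = new Bind(this, f)
}

// A synchronous step: evaluated on the calling thread.
final class Sync[A](thunk: () => A) extends SketchTask[A] {
  def runSync: Either[(A => Unit) => Unit, A] = Right(thunk())
  def runAsync(cb: A => Unit): Unit = cb(thunk())
}

// An asynchronous step: `register` starts the work and later calls back with the result.
final class Async[A](register: (A => Unit) => Unit) extends SketchTask[A] {
  def runSync: Either[(A => Unit) => Unit, A] = Left(register)
  def runAsync(cb: A => Unit): Unit = register(cb)
}

// Sequencing: run `source`, then feed its result to `f`.
final class Bind[A, B](source: SketchTask[A], f: A => SketchTask[B]) extends SketchTask[B] {
  def runSync: Either[(B => Unit) => Unit, B] = source.runSync match {
    case Right(a) => f(a).runSync // still synchronous: keep going
    case Left(register) =>        // async boundary: hand back a way to get the result later
      Left((cb: B => Unit) => register(a => f(a).runAsync(cb)))
  }
  def runAsync(cb: B => Unit): Unit = source.runAsync(a => f(a).runAsync(cb))
}

def delay[A](a: => A): SketchTask[A] = new Sync(() => a)

// Hypothetical async constructor: computes `a` on a fresh thread.
def forked[A](a: => A): SketchTask[A] =
  new Async[A](cb => new Thread(() => cb(a)).start())

val allSync   = delay(1).flatMap(n => delay(n + 1))
val withAsync = delay(1).flatMap(n => forked(n + 1))

val syncResult  = allSync.runSync   // no async boundary reached
val asyncResult = withAsync.runSync // stopped at the forked step

// For the Left case, register a callback and wait (up to 5 s) for it.
val delivered: Int = asyncResult match {
  case Right(v) => v
  case Left(register) =>
    val box = new AtomicInteger(-1)
    val latch = new CountDownLatch(1)
    register { v => box.set(v); latch.countDown() }
    latch.await(5, TimeUnit.SECONDS)
    box.get
}
```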
