Error Handling and Resource Management

The imports below are used for this section:

import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]

There are a few different ways to handle errors and manage resources.

Resource Management:

  • Stream.bracket: Guaranteed acquisition and release of resources.

  • Stream.onFinalize: Simple resource management.

Error Handling:

  • Stream.onError: Handle errors explicitly.

  • Stream.mask: Eats exceptions in your stream.

  • Stream.attempt/effect.attempt (your own effect such as Task/IO)

In some cases you can use either the Stream's error handling or your effect's (such as Task's). Note, however, that when an error is raised and handled in the Stream, the Stream terminates its normal processing and any remaining downstream elements are "lost." You can handle the error by replacing the rest of the stream with a new stream, much like a flatMap, but the original computation is over. Because of this behavior, handling the error at the effect level is usually more desirable. Handling it at the effect level also means that your stream will more often contain elements that carry errors as values, for example as Either, rather than only plain "values."
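The idea of carrying errors as values can be sketched with the Scala standard library alone. This is only an analogy, not fs2 code: ErrorsAsValues, parse and parseAll are hypothetical names, and a List stands in for a stream. Each element is wrapped in an Either, so one failing element does not stop the remaining elements from being processed.

```scala
import scala.util.{Failure, Success, Try}

// Plain-Scala analogy (no fs2): errors are carried as values per element.
object ErrorsAsValues {
  // Hypothetical per-element computation that may throw.
  def parse(s: String): Either[Throwable, Int] =
    Try(s.trim.toInt) match {
      case Success(n) => Right(n)
      case Failure(e) => Left(e)
    }

  // Every input is processed; a failure becomes a Left instead of
  // ending the whole computation.
  def parseAll(inputs: List[String]): List[Either[Throwable, Int]] =
    inputs.map(parse)
}
```

Calling ErrorsAsValues.parseAll(List("1", "x", "3")) yields three elements: a Right, a Left holding the NumberFormatException, and another Right.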

Stream.bracket Resource Management

You can use Stream.bracket to bracket a resource and ensure you have a chance to clean it up.

Let's assume our resource is acquired inside an effect, here a Task. We can do:

scala> Stream.bracket(Task.delay(10))(
     |     intResource => Stream.range(0, intResource),
     |     intResource => Task.delay(println(s"Releasing resource $intResource"))).runLog.unsafeRun
Releasing resource 10
res0: Vector[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
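The acquire/use/release ordering that bracketing guarantees can be illustrated with a minimal plain-Scala sketch. It assumes a synchronous setting and uses try/finally. The bracket helper and demo function below are hypothetical and are not the fs2 implementation; they only show that release runs whether or not the use step fails.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

// Plain-Scala analogy (no fs2): release always runs after use.
object BracketSketch {
  // Hypothetical helper: acquire a resource, use it, always release it.
  def bracket[R, A](acquire: => R)(use: R => A, release: R => Unit): A = {
    val resource = acquire
    try use(resource) finally release(resource)
  }

  // Records the order of events, optionally failing inside `use`.
  def demo(fail: Boolean): List[String] = {
    val log = ListBuffer.empty[String]
    val outcome = Try {
      bracket { log += "acquire"; 10 }(
        n => { log += s"use $n"; if (fail) throw new RuntimeException("boom"); n },
        n => { log += s"release $n"; () }
      )
    }
    log += (if (outcome.isSuccess) "completed" else "failed")
    log.toList
  }
}
```

Both BracketSketch.demo(false) and BracketSketch.demo(true) record "release 10" before the final entry, which is the property the Stream.bracket example relies on.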

Stream.onFinalize Resource Management

If you need to run something when the Stream completes, .onFinalize is guaranteed to be called even if there is an exception. Underneath, .onFinalize uses Stream.bracket.

scala> (Stream(1,2,3) ++ Stream.fail(new RuntimeException("boom")) 
     |     ++ Stream(4,5,6)).onFinalize(Task.delay(println(s"finalizer called"))).mask.run.unsafeRun
finalizer called

We used .mask, described below, in order to "eat" the actual exception thrown for the example.

Callback-Style Error Handling

Using onError is straightforward. It is essentially a callback handler much like Future.recoverWith (flatMap oriented) or Future.transform in Scala.

scala> (Stream(1,2,3) ++ Stream.fail(new RuntimeException("boom")) 
     |     ++ Stream(4,5,6)).onError(t => Stream(-1)).toVector
res2: Vector[Int] = Vector(1, 2, 3, -1)

Here, we used Stream's onError handler to manage the error and provide values when an error occurred. It did not use Task's error handling. In real code the exception would be raised inside your Stream rather than explicitly created as in the example above. The onError handler t => Stream looks like a flatMap A => M[B].
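For comparison, the same handler shape (a function from the error to a replacement computation) appears in the Scala standard library as Future.recoverWith. The sketch below is plain Scala rather than fs2, and RecoverSketch with its members is a hypothetical name used only for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Plain-Scala analogy (no fs2): a handler of shape Throwable => M[B].
object RecoverSketch {
  // A computation that fails, standing in for a failing stream.
  def failing: Future[Int] = Future.failed(new RuntimeException("boom"))

  // The handler replaces the failed computation with a new one.
  def recovered: Future[Int] =
    failing.recoverWith { case _: RuntimeException => Future.successful(-1) }

  // Blocks for the result; acceptable in a small demo only.
  def result: Int = Await.result(recovered, 5.seconds)
}
```

RecoverSketch.result evaluates to -1, mirroring how the onError example substitutes Stream(-1) once the error occurs.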

You can eat Stream exceptions using Stream.mask, which merely uses .onError to eat them:

scala> (Stream(1,2,3) ++ Stream.fail(new RuntimeException("boom")) 
     |     ++ Stream(4,5,6)).mask.toVector
res3: Vector[Int] = Vector(1, 2, 3)

In both cases, once the error occurs, any downstream elements are lost. This is why handling the error at the effect level keeps your stream composition freer of restart and retry code.

Using Stream.attempt (or your effect's .attempt)

You can also treat exceptions as values using Stream.attempt, which converts your Stream of A values into a Stream of Attempt[A], where Attempt[A] is really Either[Throwable, A].

scala> (Stream(1,2,3) ++ Stream.fail(new RuntimeException("boom")) 
     |     ++ Stream(4,5,6)).attempt.run.unsafeRun

If your effect can also catch errors, you can use that to catch them:

scala> (Stream(1,2,3) ++ Stream.fail(new RuntimeException("boom")) 
     |     ++ Stream(4,5,6)).covary[Task].run.attempt.unsafeRun
res5: fs2.util.Attempt[Unit] = Left(java.lang.RuntimeException: boom)

Here .covary[Task] fixed the stream's effect type to Task, .run produced a Task[Unit], and Task's .attempt converted that Task[A] into a Task[Attempt[A]].
