Recipe: onFinish for Task

import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]

There is no onFinish or onComplete method on Task. How can we run a Task "callback" after a Task has finished? Assume your callback looks like dispose: Task[Unit] = .... Also assume that the dispose task could itself throw an exception.

We can use Task.flatMap to sequence the computations. Since Task.attempt lets us capture either the exception or the value as an Either, we can:

scala> val dispose = Task.delay(println("Cleanup!"))
dispose: fs2.Task[Unit] = Task

scala> val goodTask = Task.delay(1+1)
goodTask: fs2.Task[Int] = Task

scala> val badTask = Task.delay(throw new RuntimeException("Boom!"))
badTask: fs2.Task[Nothing] = Task

scala> goodTask.attempt.flatMap{ result =>
     |   println(s"Original task result: $result")
     |   dispose.flatMap(_ => 
     |     result.fold(Task.fail, Task.now))}.unsafeRun
Original task result: Right(2)
Cleanup!
res0: Int = 2

But for a task that throws:

scala> badTask.attempt.flatMap{ result =>
     |   println(s"Original task result: $result")
     |   dispose.flatMap(_ => result.fold(Task.fail, Task.now))}.unsafeRun
Original task result: Left(java.lang.RuntimeException: Boom!)
Cleanup!
java.lang.RuntimeException: Boom!
  at $anonfun$1.apply(<console>:28)
  at $anonfun$1.apply(<console>:28)
  at fs2.Task$.$anonfun$delay$1(Task.scala:191)
  at fs2.Task$.$anonfun$suspend$2(Task.scala:199)
  at fs2.util.Attempt$.apply(Attempt.scala:12)
  at fs2.Task$.$anonfun$suspend$1(Task.scala:199)
  at fs2.internal.Future.step(Future.scala:54)
  at fs2.internal.Future.listen(Future.scala:30)
  at fs2.internal.Future.runAsync(Future.scala:69)
  at fs2.internal.Future.run(Future.scala:79)
  at fs2.TaskPlatform$JvmSyntax.unsafeRun(TaskPlatform.scala:14)
  ... 164 elided

Ouch! This rethrows the exception from the original bad task. We do want that exception because it's important information, but note what happened: Task.fail takes the exception captured by .attempt and fails the task returned from the flatMap with it. However you want to handle errors from badTask, add that handling at the end, after the flatMap that performs resource cleanup. The .attempt on badTask is there only so that our cleanup handler gets to run. Note that resource management is built into fs2 streams with Stream.bracket, but we are working with Tasks here.
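The attempt / dispose / re-raise pattern can be packaged as a small helper. The sketch below is a minimal illustration, not part of fs2: the name onFinish and its signature are hypothetical, and it assumes only the Task.delay, attempt, flatMap, Task.fail, Task.now and unsafeRun calls already used in this recipe.

```scala
import fs2.Task

// Hypothetical helper (not an fs2 API): run `dispose` after `task`,
// then re-raise the original failure or return the original value.
def onFinish[A](task: Task[A])(dispose: Task[Unit]): Task[A] =
  task.attempt.flatMap { result =>
    dispose.flatMap { _ =>
      result match {
        case Left(err)    => Task.fail(err)  // original task failed: fail again after cleanup
        case Right(value) => Task.now(value) // original task succeeded: pass the value through
      }
    }
  }

// Count cleanups instead of printing so the effect is easy to check.
var cleanups = 0
val countingDispose = Task.delay { cleanups += 1 }

val ok = onFinish(Task.delay(1 + 1))(countingDispose).unsafeRun
val failed =
  onFinish(Task.delay[Int](throw new RuntimeException("Boom!")))(countingDispose)
    .attempt
    .unsafeRun
```

Here the cleanup runs once per call, whether or not the wrapped task failed. Note that if dispose itself fails, its error replaces the original result in this version.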

Alternatively, we can convert any errors to an Either once more, this time to handle the errors from the original badTask:

scala> val badTask = Task.delay(throw new RuntimeException("Boom!"))
badTask: fs2.Task[Nothing] = Task

scala> badTask.attempt.flatMap{ result =>
     |   println(s"Original task result: $result")
     |   dispose.flatMap(_ => result.fold(Task.fail, Task.now))}.attempt.unsafeRun
Original task result: Left(java.lang.RuntimeException: Boom!)
Cleanup!
res2: fs2.util.Attempt[Nothing] = Left(java.lang.RuntimeException: Boom!)
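The snippets above do not cover the case where dispose itself throws: because dispose is sequenced with flatMap, a failing dispose would short-circuit and hide the original outcome. One possible way to handle that, sketched under the assumption that the original task's error should take priority, is to call .attempt on dispose as well. The name onFinishSafe and the chosen priority rule are hypothetical, not something fs2 prescribes.

```scala
import fs2.Task

// Hypothetical variant (not an fs2 API): attempt `dispose` too, so a
// cleanup failure cannot mask a failure of the original task.
def onFinishSafe[A](task: Task[A])(dispose: Task[Unit]): Task[A] =
  task.attempt.flatMap { result =>
    dispose.attempt.flatMap { cleanup =>
      (result, cleanup) match {
        case (Left(err), _)           => Task.fail(err)  // original failure wins
        case (Right(_), Left(err))    => Task.fail(err)  // only the cleanup failed: surface it
        case (Right(value), Right(_)) => Task.now(value) // both succeeded
      }
    }
  }

val failingDispose: Task[Unit] =
  Task.delay(throw new IllegalStateException("Cleanup failed"))

// Both the task and the cleanup fail; the original error is reported.
val bothFail =
  onFinishSafe(Task.delay[Int](throw new RuntimeException("Boom!")))(failingDispose)
    .attempt
    .unsafeRun

// Only the cleanup fails; its error is reported instead of the value.
val cleanupFails =
  onFinishSafe(Task.delay(1 + 1))(failingDispose).attempt.unsafeRun
```

Whether a cleanup failure should be surfaced, logged, or ignored is a design choice; this sketch drops the cleanup error when the original task has already failed.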

If your Task already returns an Either or another type that captures the concept of an error (Either, Option, Try), you can apply the same idea.

This recipe is sourced from gitter.
