fs2 Effects

fs2 comes with a few effects of its own, and you can plug in your own. This is important because you may not want to use fs2's Task; you may want to use someone else's Task. You can also use other types of effects, such as Either or Option, but in practice you will want something that supports asynchronous effects.

Effects, as implemented in most libraries (including effect4s and cats-effect), are really "strategy design patterns" (in Scala terms, type classes). They implement a few methods that let you create the intended effect. The target effect is a class such as Task. Once you have a stable effects "API" you can build on it. That is what monadless, a Scala macro library that generalizes async/await, did.

For example, when you summon an Async instance in fs2 via implicit val F = Async[Task] you are really asking for a strategy class to be found that has a few special methods to create Tasks.
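To make the "strategy" idea concrete, here is a minimal sketch in plain Scala with no fs2 dependency. MiniEffect and Thunk are hypothetical names invented for illustration. They mirror the general shape of fs2's Async and Task, but they are not those types' actual definitions.

```scala
import scala.util.Try

// Hypothetical type class ("strategy"): it knows how to build and combine
// values of some effect type F[_]. Not fs2's Async, just the same idea.
trait MiniEffect[F[_]] {
  def pure[A](a: A): F[A]                         // lift an already computed value
  def delay[A](a: => A): F[A]                     // capture a computation without running it
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B] // sequence two effects
  def attempt[A](fa: F[A]): F[Either[Throwable, A]] // surface errors as values
}

// Hypothetical lazy effect type: nothing runs until unsafeRun() is called.
final case class Thunk[A](unsafeRun: () => A)

// The strategy instance for Thunk, found by implicit search.
implicit val thunkEffect: MiniEffect[Thunk] = new MiniEffect[Thunk] {
  def pure[A](a: A): Thunk[A] = Thunk[A](() => a)
  def delay[A](a: => A): Thunk[A] = Thunk[A](() => a)
  def flatMap[A, B](fa: Thunk[A])(f: A => Thunk[B]): Thunk[B] =
    Thunk[B](() => f(fa.unsafeRun()).unsafeRun())
  def attempt[A](fa: Thunk[A]): Thunk[Either[Throwable, A]] =
    Thunk[Either[Throwable, A]](() => Try(fa.unsafeRun()).toEither)
}

// Summon the strategy, analogous to `implicit val F = Async[Task]`.
val F = implicitly[MiniEffect[Thunk]]

// Build a program through F only; it runs when unsafeRun() is called.
val program: Thunk[Int] = F.flatMap(F.pure(20))(n => F.delay(n + 22))
```

Code written against MiniEffect never mentions how Thunk works internally. Swapping in another instance changes the effect without changing that code.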

In the case of Async, the effect is running code asynchronously. Async implements a critical method that lets you interface with other code, whether synchronous or not, and returns an instance of the effect type. It may seem strange that you need a "strategy design pattern" object for this, but the APIs of different effects can vary dramatically or, sometimes, be confusingly similar. Future and Task, for example, have similar APIs but differ in their semantics: a Future starts computing as soon as it is created, while a Task does nothing until you run it. Async abstracts over these differences and gives you a single API to use.
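The eager-versus-lazy difference between Future and Task can be observed with the standard library alone. In this sketch, a plain Function0 stands in for a lazy Task. That is a simplifying assumption: fs2's Task is much more than a thunk. Each body increments its own counter, so we can see when it ran.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val futureRuns = new AtomicInteger(0)
val lazyRuns = new AtomicInteger(0)

// A Future is handed to the ExecutionContext as soon as it is constructed.
val fut: Future[Int] = Future { futureRuns.incrementAndGet() }

// A function value is only a description; constructing it runs nothing.
val described: () => Int = () => lazyRuns.incrementAndGet()

// Await only waits for the Future that is already running; it does not start it.
Await.result(fut, 5.seconds)

val beforeRun = lazyRuns.get // still 0: the description was never invoked
val afterRun = described()   // running it is an explicit, separate step
```

Nothing ever "started" fut, yet its body ran. The description ran only when we called it.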

The effects API generally has to worry about two core areas:

  • Creating the target effect instance, e.g., lifting a value into the target effect.

  • Handling errors

The effects object, such as Async, allows you to create the target effect, such as a Task, using the same "Async" API everywhere. It is still up to you to actually run the resulting Task if that is what your target effect requires.

Let's look at a few examples. First, for convenience, we establish an implicit effect instance called F. If we only use F directly, it does not need to be declared implicit, but we will need the implicit later in this section.

scala> import fs2._
import fs2._

scala> import fs2.util._
import fs2.util._

scala> import scala.concurrent.duration._
import scala.concurrent.duration._

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

scala> implicit val strategy = Strategy.fromExecutionContext(global)
strategy: fs2.Strategy = Strategy

scala> implicit val F = Async[Task]
F: fs2.util.Async[fs2.Task] = Async[Task]

If we tab-complete F in the REPL for this version of fs2, we get:

fs2> F. 
!=                  attempt             getClass            parallelTraverse    suspend
==                  delay               hashCode            pure                toString
ap                  equals              isInstanceOf        ref                 unsafeRunAsync
asInstanceOf        fail                map                 refOf               |>
async               flatMap             parallelSequence    start
fs2>

Async (our F) also has pure, which, as in Haskell, lifts a strict value into the effect. So we can write:

scala> F.pure(1) 
res0: fs2.Task[Int] = Task

scala> F.pure(1).unsafeRun 
res1: Int = 1

It is conventional to give the effect instance a capitalized name. It is not really a "data" value per se, nor is it the actual effect. F and E are popular choices.

After we lifted the value using pure, we got a Task back. To see what is in the Task, we have to run it using the Task's API, not the Async API. unsafeRun runs the Task and returns its value, blocking until the result of the computation is available. If the code inside the Task throws an exception, unsafeRun throws it too. We typically want to handle errors differently.

Let's say we are concerned that an exception could be thrown. We want to handle the exception as a value instead of using a try-catch block (which is very unfunctional).

We can use the Async API to do that by:

scala> F.attempt(Task.delay(1)) 
res2: fs2.Task[fs2.util.Attempt[Int]] = Task

We passed F.attempt a target effect, Task, directly, to show that attempt belongs to F. Other strategy objects that model effects may provide other methods for managing errors. Naming the target effect directly, as in Task.delay, defeats the purpose of abstracting over it. The recommended approach is to build the value through F itself, for example with F.pure as above.

An fs2 Attempt is a convenient name for Either[Throwable, A]. Let's compose this using only F:

scala> F.attempt(F.pure(1)).unsafeRun 
res3: fs2.util.Attempt[Int] = Right(1)

Alright, lifting the pure value 1 does not throw an exception, but we still get an Either back. Since "right is good" for an Either, we get Right(1). If we lift something that throws an error:

scala> F.attempt(F.delay(throw new RuntimeException("blah"))).unsafeRun 
res4: fs2.util.Attempt[Nothing] = Left(java.lang.RuntimeException: blah)

This shows that the exception is now a "value." We used delay rather than pure to lift the processing code. delay defers evaluation until the Task runs, which is where attempt can catch the exception. If we had used pure, its argument would have been evaluated immediately. The throw would have happened before the Task even existed, so attempt would never have seen it:

scala> F.attempt[Int](F.pure(throw new RuntimeException("blah"))).unsafeRun 
java.lang.RuntimeException: blah
  ... 256 elided
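The difference between pure and delay comes down to strict versus by-name parameters. The sketch below models it in plain Scala. Thunk, pure, delay, attempt, and blah are hypothetical stand-ins that only mimic the shapes of the fs2 methods. Attempt is redefined locally as the Either alias described above.

```scala
import scala.util.Try

// Local stand-in for fs2.util.Attempt
type Attempt[A] = Either[Throwable, A]

// Hypothetical lazy effect; run() executes the captured computation.
final case class Thunk[A](run: () => A)

def pure[A](a: A): Thunk[A] = Thunk[A](() => a)     // `a` is evaluated at the call site
def delay[A](a: => A): Thunk[A] = Thunk[A](() => a) // `a` is evaluated only when run() is called
def attempt[A](t: Thunk[A]): Thunk[Attempt[A]] =
  Thunk[Attempt[A]](() => Try(t.run()).toEither)    // turn a thrown exception into a Left

def blah(): Int = throw new RuntimeException("blah")

// delay: the exception is raised inside run(), where attempt catches it.
val viaDelay: Attempt[Int] = attempt(delay(blah())).run()

// pure: blah() runs while the argument is evaluated, before attempt ever
// sees a Thunk, so an outer Try is needed just to observe the failure.
val viaPure: Try[Attempt[Int]] = Try(attempt(pure(blah())).run())
```

With delay the error comes back as a Left value. With pure it escapes as a thrown exception, matching the REPL session above.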

The Task interface is rich with methods for controlling how a Task is run. For example, you can delay running a Task for a certain amount of time, or you can "race" a Task against another Task and get back whichever result arrives first. You can also run a Task synchronously up to its first async boundary and get back an Option holding an Either[Throwable, A]. The result is Some if the Task finished before reaching a boundary, and None if it hit one. This asynchronous "boundary" sounds strange, but the idea is simple. You can compose Tasks with, say, flatMap, and each flatMap could return a Task that runs asynchronously. So you may want a way to control evaluation more precisely. This "boundary" behavior is specific to Task, so the Async API neither knows about it nor provides any way to control it.

You saw earlier that Async has an .attempt method. Task has its own .attempt as well. Previously we used the Async method, but we could call Task's directly, which turns out to be more convenient in some cases:

scala> F.pure(1).attempt.unsafeRun 
res6: fs2.util.Attempt[Int] = Right(1)

Because .attempt is called on the result of F.pure, it is Task's .attempt, not Async's. We could also have done this using just the Async API, as shown earlier:

scala> F.attempt(F.pure(1)).unsafeRun 
res7: fs2.util.Attempt[Int] = Right(1)

Of course, using .attempt twice, once through F and once on the Task, just nests the result and does not make sense:

scala> F.attempt(F.pure(1)).attempt.unsafeRun 
res8: fs2.util.Attempt[fs2.util.Attempt[Int]] = Right(Right(1))

You will need to choose which one works for you. If your effect already has an .attempt or something similar, you can use that. Alternatively, use F.attempt and F.delay, which let you work through F without any knowledge of the underlying effect. That is the better choice when writing your own code that lets the user specify the effect they would like to use.
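Here is what "code that lets the user specify the effect" can look like. This is a sketch assuming a hypothetical two-method type class, MiniEffect, with instances for a hypothetical lazy Thunk and for the standard library's Try. The function parsePort is also hypothetical. It is written once against F and never mentions either concrete effect.

```scala
import scala.util.{Success, Try}

// Hypothetical minimal type class with just the two operations the text names.
trait MiniEffect[F[_]] {
  def delay[A](a: => A): F[A]
  def attempt[A](fa: F[A]): F[Either[Throwable, A]]
}

// Hypothetical lazy effect.
final case class Thunk[A](run: () => A)

implicit val thunkEffect: MiniEffect[Thunk] = new MiniEffect[Thunk] {
  def delay[A](a: => A): Thunk[A] = Thunk[A](() => a)
  def attempt[A](fa: Thunk[A]): Thunk[Either[Throwable, A]] =
    Thunk[Either[Throwable, A]](() => Try(fa.run()).toEither)
}

// Standard library Try treated as an (eager) effect.
implicit val tryEffect: MiniEffect[Try] = new MiniEffect[Try] {
  def delay[A](a: => A): Try[A] = Try(a)
  def attempt[A](fa: Try[A]): Try[Either[Throwable, A]] = Success(fa.toEither)
}

// Library code: written once against F, with no knowledge of Thunk or Try.
def parsePort[F[_]](s: String)(implicit F: MiniEffect[F]): F[Either[Throwable, Int]] =
  F.attempt(F.delay(s.toInt))

// The caller picks the effect.
val lazyOk: Either[Throwable, Int] = parsePort[Thunk]("8080").run()
val tryBad: Try[Either[Throwable, Int]] = parsePort[Try]("not a port")
```

The parsing error shows up as a Left in both cases. Only the wrapper around it differs, and that wrapper is chosen by the caller.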

Async also has two curious methods: unsafeRunAsync and async. Both take callbacks, which makes them useful when integrating with other code that itself works through callbacks.

Let's look at Async.async, since the other method is very similar in concept. Its signature is:

 def async[A](register: (Either[Throwable,A] => Unit) => F[Unit]): F[A]

We know the Either captures an error or a value. The parameter register is itself a function. It receives a callback of type Either[Throwable, A] => Unit and returns an F[Unit]. You call the callback when your domain-specific processing has an error or a value to return. The callback returns Unit because your processing code needs nothing back from async, such as an acknowledgement. But register returns F[Unit] rather than Unit, so your code does not simply call the callback and finish. Instead, it returns an effect that, when run, calls the callback. The main idea is that when you work asynchronously, you want to say "when" the callback should be called. In many languages you cannot represent a delayed computation as a value, but in Scala you can. So your application code returns an F that tells F.async how and when to invoke the callback. Many libraries, especially in JavaScript, assume the callback is processed immediately, and you never really know what such a library will do with your callback. The F[Unit] makes this more precise.
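To see how register and its callback fit together, here is a toy callback-based effect in plain Scala. CbTask, delay, and async are hypothetical. Only the shape of async mirrors the signature above. The sketch does not reproduce fs2's real implementation, including its threading, error handling, and protection against a callback being called twice.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.util.Try

// Hypothetical effect: a function that, given a callback, eventually calls it
// with either an error or a value. Nothing happens until someone runs it.
final case class CbTask[A](runAsync: (Either[Throwable, A] => Unit) => Unit) {
  // Block the calling thread until the callback fires, similar in spirit to unsafeRun.
  def unsafeRun(): A = {
    val latch = new CountDownLatch(1)
    val result = new AtomicReference[Either[Throwable, A]]()
    runAsync { r => result.set(r); latch.countDown() }
    if (!latch.await(5, TimeUnit.SECONDS)) throw new RuntimeException("timed out")
    result.get match {
      case Right(a) => a
      case Left(e)  => throw e
    }
  }
}

// Capture a computation and report its outcome through the callback.
def delay[A](a: => A): CbTask[A] = CbTask[A](cb => cb(Try(a).toEither))

// Same shape as Async.async: register receives the callback and returns an
// effect that, when run, arranges for the callback to be called.
def async[A](register: (Either[Throwable, A] => Unit) => CbTask[Unit]): CbTask[A] =
  CbTask[A] { cb =>
    register(cb).runAsync {
      case Left(e)  => cb(Left(e)) // registration itself failed
      case Right(_) => ()          // registered; the user's code calls cb
    }
  }

val ok: CbTask[String] = async[String](cb => delay(cb(Right("You did it!"))))
val boom: CbTask[String] = async[String](cb => delay(cb(Left(new RuntimeException("Boom!")))))
```

Defining ok and boom runs nothing. The callback fires only when unsafeRun runs the task.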

Here's an example:

scala> // Straight processing
     | def outsideworldGood(arg: String) = arg + " - You did it!"
outsideworldGood: (arg: String)String

scala> def getResult1(input: String) = F.async( (cb: Either[Throwable, String] => Unit) => {
     |        Task.delay { 
     |            // do some clever processing...
     |            cb(Right(outsideworldGood(input)))
     |        }
     | })
getResult1: (input: String)fs2.Task[String]

scala> getResult1("1. Jump the shark").unsafeRun
res10: String = 1. Jump the shark - You did it!

But we may want to delay processing a bit when we interface to the outside world:

scala> implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(2)
scheduler: fs2.Scheduler = Scheduler(java.util.concurrent.ScheduledThreadPoolExecutor@14d0b789[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0])

scala> def getResult2(input: String, delayBy: FiniteDuration) = F.async( (cb: Either[Throwable, String] => Unit) => {
     |            // do some clever processing...
     |            F.delay(cb(Right(outsideworldGood(input)))).schedule(delayBy)
     | })
getResult2: (input: String, delayBy: scala.concurrent.duration.FiniteDuration)fs2.Task[String]

scala> getResult2("2. Jump the shark but have lunch first", 5 seconds).unsafeRun
res11: String = 2. Jump the shark but have lunch first - You did it!

Notice that we used Task.schedule, which means our function is not completely agnostic to the specific effect type. That's because Async knows nothing about "scheduling." Also, because we called unsafeRun, our thread blocked for the whole lunch delay. We could instead have started the Task in the background and let it complete after the delay. Avoid blocking your own thread just to wait out a delay.
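The idea of delivering a result later without blocking the caller can be sketched outside fs2 with the JDK's ScheduledExecutorService. The helper deliverLater is hypothetical and is not how Task.schedule is implemented. The latch at the end exists only so the example can wait for and check the result.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.util.Try

// Hypothetical helper: after delayMs, compute the value on the scheduler's
// thread and hand the outcome to an Either callback. The caller returns
// immediately; no thread sleeps while the delay elapses.
def deliverLater[A](scheduler: ScheduledExecutorService, delayMs: Long)(value: => A)(
    cb: Either[Throwable, A] => Unit): Unit = {
  scheduler.schedule(new Runnable {
    def run(): Unit = cb(Try(value).toEither)
  }, delayMs, TimeUnit.MILLISECONDS)
  ()
}

val scheduler = Executors.newSingleThreadScheduledExecutor()
val latch = new CountDownLatch(1)
val result = new AtomicReference[Either[Throwable, String]]()
val start = System.nanoTime()

deliverLater(scheduler, 100L)("lunch is over") { r =>
  result.set(r)
  latch.countDown()
}

// Only the demo waits here, so the result can be inspected.
latch.await(5, TimeUnit.SECONDS)
val elapsedMs = (System.nanoTime() - start) / 1000000L
scheduler.shutdown()
```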

We can also process bad things that happen:

scala> // A function whose callback takes separate error and value arguments; JavaScript libraries use these a lot.
     | def outsideworldBad(arg: String, cb: (Option[Throwable], Option[String]) => Unit): Unit =
     |    cb(Some(new RuntimeException("Boom!")), None) // explicit error for the example
outsideworldBad: (arg: String, cb: (Option[Throwable], Option[String]) => Unit)Unit

scala> def getResult3(input: String) = F.async( (cb: Either[Throwable, String] => Unit) => {
     |        F.delay { 
     |            // do some clever processing...
     |            outsideworldBad(input, (t: Option[Throwable], a: Option[String]) => 
     |                (t,a) match {
     |                    case (Some(t), None) => cb(Left(t))
     |                    case (None, Some(a)) => cb(Right(a))
     |                    case _ => cb(Left(new RuntimeException("Javascript API is broken! You should never reach here.")))
     |            })
     |        }
     | })
getResult3: (input: String)fs2.Task[String]

scala> getResult3("3. Jump the shark").attempt.unsafeRun
res13: fs2.util.Attempt[String] = Left(java.lang.RuntimeException: Boom!)
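The pattern match inside getResult3 can be pulled out into a reusable function that involves no effect type at all. The helper adapt below is hypothetical. It turns a JavaScript-style callback, with separate optional error and value arguments, into the single Either callback that F.async hands you.

```scala
// Hypothetical adapter from an (error, value) callback to an Either callback.
def adapt[A](cb: Either[Throwable, A] => Unit): (Option[Throwable], Option[A]) => Unit = {
  case (Some(t), None) => cb(Left(t))
  case (None, Some(a)) => cb(Right(a))
  // Both or neither present: the foreign API broke its contract.
  case (t, a) => cb(Left(new IllegalStateException(s"unexpected callback arguments: ($t, $a)")))
}

// Record what the Either callback receives, for inspection.
var seen: List[Either[Throwable, String]] = Nil
val record: Either[Throwable, String] => Unit = r => seen = seen :+ r

val jsCallback = adapt(record)
jsCallback(None, Some("ok"))
jsCallback(Some(new RuntimeException("Boom!")), None)
jsCallback(None, None)
```

Inside getResult3, the call could then be written as outsideworldBad(input, adapt(cb)).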

All of these examples call the outside world only once. To call it repeatedly, you would generate a stream of values, and for that you would use fs2 streams, of course.
