Channels and Exchanges

Channels

UPDATE FOR FS2 A sstream Channel is a process of functions. It may seem strange to have a process that contains functions, but in scala, functions can be treated as data. The signature for Channel is:

type Channel[+F[_], -I, O] = Process[F, (I)  F[O]]
// An effectful channel, to which we can send values and get back responses.

The function signature is quite specific. Given some input value, return an environment that somehow returns an output value when the environment is accessed. The input value usually arrives via another process. The environment is often a Task from which a value could be obtained by running the task. The Task should return a value. The reason why the "effectful" doc comment is there is because if you can access the environment, synchronously or asynchronously, that returns a value then you can perform a side effect in that environment. A common environment is a Task. In this sense, Task could be thought of as IO in the haskell sense---some type of effect is wrapped up in an object so it's runtime behavior can be controlled.

The main way to create a custom channel, since each effect will probably be specific to your application is to use io.channel:

def myEffect = channel.lift { url: String => Task.delay{ getUrlContent(url) }}

Here getUrl uses a side effect, such as a call to a webservice, to obtain and return a result. You could imagine any side effect modeled this way. Instead of getUrl you could use insertIntoDB and the input could be some type of record that needs to be inserted into the database.

Another standard use of Channel is to zip a function with an argument to the function and within the zip, apply the function to the argument. You could do this with a "tee" and tee's zipApply:

scala>val addOne = (i: Int) => Task{i + 1}
addOne: Int => scalaz.concurrent.Task[Int] = <function1>

scala> Process.repeatEval(Task.delay{addOne}).take(10).runLog.run
res46: IndexedSeq[Int => scalaz.concurrent.Task[Int]] = Vector(<function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>, <function1>)

scala>Process.range(1,10).toSource.runLog.run
res47: IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala>val channel = channel.lift(addOne)
channel: scalaz.stream.Channel[scalaz.concurrent.Task,Int,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))

scala>Process.range(1,10).tee(channel)(tee.zipApply).runLog.run
res53: IndexedSeq[scalaz.concurrent.Task[Int]] = Vector(scalaz.concurrent.Task@4ce36f04, scalaz.concurrent.Task@22ced72a, scalaz.concurrent.Task@66c3530b, scalaz.concurrent.Task@287116d6, scalaz.concurrent.Task@7fbdc793, scalaz.concurrent.Task@2b9e38c9, scalaz.concurrent.Task@57396dba, scalaz.concurrent.Task@775e162d, scalaz.concurrent.Task@664ec60a)

scala> Process.range(1,10).tee(channel)(tee.zipApply).eval.runLog.run
res54: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)

In the above, we used the argument, the stream of integers, and wanted to apply the addOne function to them. addOne is turned into a channel using io.channel. Because addOne already performed the "effect" inside of a task, it was easy wrap call to create the channel. Then, using "tee" and "zipApply" we applied the function, inside the channel, to the arguments. For zipApply to work, the arguments need to be on the "left" and the function to be applied on the "right." In the next to last expression that merely running this stream results in a vector of Tasks. That's because the addOne function returned tasks. You need to extract the values from the Tasks using "run". sstream's eval evaluates the Task using run and returns the value. You could have also just used the vector of Tasks and evaluated that result outside of sstreams e.g. .run.run.map(_.run).

It is probably more often the case that you already have a function and need to convert it into a channel. If your function body is not wrapped in a Task, it needs to be. But you can wrap any function with a little boilerplate:

scala>def addTwo = (i: Int) => i+2
addTwo: Int => Int

scala> Task.delay(addTwo).map(_(2)).run
res66: Int = 4

scala> def addTwoWrapped = (i: Int) => Task.delay(addTwo).map(_(i))
addTwoWrapped: Int => scalaz.concurrent.Task[Int]

scala>io.channel(addTwoWrapped)
res67: scalaz.stream.Channel[scalaz.concurrent.Task,Int,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))

There's a slightly better way to express this by implicitly grabbing a scalaz Functor[F] but hopefully this illustrates the idea.

Exchanges

Refer to http://stackoverflow.com/questions/27967118/scalaz-stream-how-to-implement-ask-then-wait-reply-tcp-client

Last updated