Combining Streams: Wye and Tee

Note: this article needs to be updated. These combinators are now pipe2 in newer versions of the library.

I needed to use scalaz-stream for some processing but was having trouble understanding wye and tee. Here is a bunch of examples that illustrate what the functions do. You can also read the specs in the tests, which are quite good. I needed a place to stash notes based on my work, hence this article. Hopefully it helps others learn faster.

In the examples below, you need to remember that a Process is a moment-in-time capture of the state of the stream. As such, when you see a Wye or Tee object, it will be used once, and then the next process state will use another object instance. You sometimes start wondering, at least I did, where does the object that remembers what you are doing live? Since each state carries with it enough information to generate the next state, given a suitable driver, you do not need a mutable, long-lived object sticking around that runs only that process step. It took me a while to get used to this abstraction. It feels a little like the difference between the Eulerian and Lagrangian reference frames in fluid dynamics.

In the examples below, I try to keep the input expression on one line. Sometimes this makes the line very long, but I think it makes it easier to read in some ways.

Wye

You need to remember that a wye joins the output of two processes non-deterministically. What does this mean? Non-deterministic means that the outputs do not come only from the right side or only from the left side, and there is no prescribed rule about which side emits next (e.g. bouncing between left and right and back again).

I had trouble with the syntax at first because in the scalaz.stream scope there is an object named wye that holds a variety of ways to create wyes. In addition, WyeOps allows processes to be turned into Wyes, but under the hood WyeOps just calls the methods in the wye object. Wye is just a type that describes a Process. Like any other Process, you have to use run.run or runLog.run to run the stream interpreter and obtain results.
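
As a quick sketch of the difference between the two run styles (assuming the usual Task environment):

  // run.run executes the stream for its effects only and discards the output;
  // runLog.run also collects the emitted values.
  val p = Process.range(1, 4).toSource   // Process[Task, Int] emitting 1, 2, 3
  p.run.run                              // runs the stream, no values returned
  p.runLog.run                           // Vector(1, 2, 3)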

There are a few wye functions that are foundational to creating wye objects. Many of the functions in the wye object take another wye object, which makes you wonder: how do you create the first wye to begin with?

Since the wye "object" is really just an object with "static" methods for creating wyes, we use a few of those methods to create a wye and then use that new wye with the other methods in the wye "object." Think of it like this: the two methods below create wyes that you use with the combinators in the wye "object" and other Processes. In the end, you always need some type of left and right process to feed a wye, because a wye combines a "left" and "right" process.

  • wye.merge[I] -> creates a wye that merges the two inputs

  • wye.either[I] -> same as wye.merge but retains "left" and "right" origin information

There are a few others as well. These functions create a Wye object that implements the "merging" strategy. In this case, wye.either merges the two streams non-deterministically while there are values from either the left or the right. If the left runs out of values, and hence Halts, but the right side has more values, the wye will keep outputting values from the right side. You use combinators in the wye "object" to alter this strategy. For example, you can Halt the wye if either the left or the right Halts, or only when the left Halts, or only when the right Halts.

In the next set of examples, wye.apply() is used to bring the two processes together into the Wye and the last argument is the merging strategy function. Also, your results may vary from mine because of non-deterministic behavior--the whole point of the wye to start with.

Just remember that if you are creating a wye (or tee), you need two inputs and you need to specify a "merge" strategy. That merge strategy is the wye or tee itself, and you feed inputs into it via the combinators in the wye or tee object.

Merge two equal size lists

scala> wye(Process.range(1,6), Process.range(6,11))(wye.either).runLog.run
res3: IndexedSeq[scalaz.\/[Int,Int]] = Vector(-\/(1), \/-(6), \/-(7),
-\/(2), -\/(3), \/-(8), \/-(9), -\/(4), -\/(5), \/-(10))

We did not need the type on wye.either in this case, but we might need it sometimes, e.g. wye.either[Int].

Notice that the values are wrapped in a disjunction. That's because we used wye.either as the strategy and it "tags" the data with the left or right projection based on where it came from. If you use wye.merge, wye does not retain the origin information.
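
For contrast, here is a sketch of the same two ranges combined with wye.merge. The output is plain Ints with no origin tag, and the interleaving still varies from run to run (add the type, wye.merge[Int], if inference needs a hint):

  wye(Process.range(1,6), Process.range(6,11))(wye.merge).runLog.run
  // e.g. Vector(1, 6, 2, 7, 3, 8, 4, 9, 5, 10) -- order is non-deterministic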

Merge two unequal sized lists

scala> wye(Process.range(1,11), Process.range(11,12))(wye.either).runLog.run
res8: IndexedSeq[scalaz.\/[Int,Int]] = Vector(-\/(1), -\/(2), -\/(3), -\/(4), 
-\/(5), -\/(6), -\/(7), -\/(8), -\/(9), -\/(10), \/-(11))

The left list is longer, but since either keeps going until both sides Halt, all the values are still obtained.

You can also use the operator notation, like this:

scala> (Process.range(1,11) either Process.range(11,12)).runLog.run
res26: IndexedSeq[scalaz.\/[Int,Int]] = Vector(-\/(1), \/-(11), -\/(2), -\/(3), 
-\/(4), -\/(5), -\/(6), -\/(7), -\/(8), -\/(9), -\/(10))

Don't forget that because wye is non-deterministic, your results may vary from mine, but the semantics should be the same.

Halt when either left or right Halts

If we use two unequally sized lists, we can force the wye to halt if either list Halts even if the other side still has data. We use almost the exact same expression as the one above except we use mergeHaltBoth.

scala> wye(Process.range(1,11), Process.range(11,12))(wye.mergeHaltBoth).runLog.run
res9: IndexedSeq[Int] = Vector(1, 11)

We only see two outputs because in this case the right side was only going to issue one value, and mergeHaltBoth halted after obtaining that one value. Internally, the Wye received a Halt state from the right side and Halted the wye itself.

Also notice that with wye.merge* strategies, there is no disjunction "origin" information on the values. There are just the values themselves.

Terminate when the left side terminates

This one terminates only when the left side terminates even though the right side still had values to emit.

scala> wye(Process.range(1,3), Process.range(3,100))(wye.mergeHaltL).runLog.run
res0: IndexedSeq[Int] = Vector(1, 3, 4, 2)

mergeHaltR does the same thing but for the right-hand side.
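
Here is a quick sketch of mergeHaltR; as always with a wye, the exact interleaving will vary between runs:

  // Halts as soon as the right side Halts; whatever remains on the left is dropped.
  wye(Process.range(1,100), Process.range(100,103))(wye.mergeHaltR).runLog.run
  // e.g. Vector(1, 100, 101, 2, 102) -- contents vary; the left side is cut short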

Can use WyeOps on Process

When you use yourProcess.wye(...) on a process, yourProcess becomes the left hand side.

scala> Process.range(1,3).wye(Process.range(3,100))(wye.mergeHaltL).runLog.run
res1: IndexedSeq[Int] = Vector(1, 2)

scala> wye(Process.range(1,3),Process.range(3,100))(wye.mergeHaltL).runLog.run
res4: IndexedSeq[Int] = Vector(3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 13, 14)

Here we ran it both with the .wye syntax and the wye "object" syntax. Notice that we obtained different results but the behavior was the same, i.e. mergeHaltL. The different results were obtained because the merging is non-deterministic.

Tee

Tee performs pretty much the same operation as wye, combining two inputs, except it is deterministic. Either you tell it to alternate between left and right, or use the right first, or some other variation. But unlike what you saw above for Wye, you should always get the same "combining" results when you run it.

Here are a few Tees that create the merging logic:

  • tee.interleave - interleave between left and right until one of the streams Halts

  • tee.passL - only pass inputs from the left

  • tee.passR - only pass inputs from the right

  • tee.until - echoes the right side until the left side becomes true, then halts

  • tee.when - echoes the right side while the left side is true; halts when the left side becomes false

Like wye, there are some zips as well.

At first it seems like there is no equivalent to wye.apply(), which is correct. For Tee, you can feed a sequence into a Tee along with a Process and another "merging logic" tee to create the Tee, or you can use the infix notation like we did in the last examples of Wye. We'll show both below. The feed approach also applies to wyes, although we did not show an example in that section.

Alternate left then right until both are exhausted, even if uneven lists

scala> Process.range(1,3).toSource.tee(Process.range(3,10))(tee.interleave[Int]).runLog.run
res1: IndexedSeq[Int] = Vector(1, 3, 2, 4)

We need the toSource in order to generate the Task F[_] environment needed to run the process. We could have assigned parts of the expression to a typed val and the Task would also have been added automatically. Notice that after one of the streams Halts, the tee halts.

You can use the other flavor of interleave, which operates directly off the left process:

scala> (Process.range(1,10).toSource interleave Process.range(10,20)).runLog.run
res23: IndexedSeq[Int] = Vector(1, 10, 2, 11, 3, 12, 4, 13, 5, 14,
6, 15, 7, 16, 8, 17, 9, 18)

We need to toSource one of the processes to make sure that the environment, Task, is used. Type inference figures out the F[_] for the rest of the expression.

Use the left side as a boolean signal to control the right side

scala> Process.emitAll(Seq(false, false, false, true)).tee(Process.range(1,10).toSource)(tee.until).runLog.run
res39: IndexedSeq[Int] = Vector(1, 2, 3)

when is very similar:

scala> Process.emitAll(Seq(true, true, true, false)).tee(Process.range(1,10).toSource)(tee.when).runLog.run
res40: IndexedSeq[Int] = Vector(1, 2, 3)

Passing values

passL/passR ignore one side and Halt when the passed side Halts.

Here the large set of inputs on the left, nearly 100,000 integers, is ignored and only the right side is processed.

scala> Process.range(1,100000).toSource.tee(Process.range(1,11))(tee.passR).runLog.run
res43: IndexedSeq[Int] = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

passL is the same but with the left side passed.
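
A sketch of the mirror image, for completeness:

  // Only the left side's values are passed through; the right side is ignored
  // and the tee Halts when the left side Halts.
  Process.range(1,11).toSource.tee(Process.range(1,100000))(tee.passL).runLog.run
  // Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)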

Feeding into a tee (or a wye directly)

The feedL and feedR let you feed data you already have, such as a sequence of values, directly into a tee or a wye. This is useful if you already have some data not originating from a Process that needs to be combined into the stream. Of course, you could just create a process that emits your sequence of values and layer a tee on top of another process or tee.

Let's first look at a curious example. We will first do a Wye then a Tee.

scala> val w = wye.feedL(List.fill(10)(1))(process1.id)
w: scalaz.stream.Wye[Int,Any,Int] = Append(Emit(Vector(1, 1, 1, 1, 1, 1,
1, 1, 1)),Vector(<function1>))

scala> halt.wye(halt)(w).runLog.run
res13: IndexedSeq[Int] = Vector(1, 1, 1, 1, 1, 1, 1, 1, 1, 1)

The second expression is very strange, and it's an artifact of needing to run the process in the scala REPL. By itself, you cannot run a wye or a tee. Like the other expressions above, you need to provide it with an environment and another process. If you look at feedL above, you see that you provided a sequence and a process, but a wye/tee needs two processes as inputs. process1.id is promoted to a wye, and it echoes whatever enters it, left or right. A process1 takes one input and creates one output, so even when it is promoted to a wye, we still need another input.

The second expression starts with halt.wye(halt), where halt is a process that emits nothing. It creates a wye whose right side is also a halt and layers these halts (left and right) on top of the wye w. The notation leftProcess.wye(rightProcess)(anotherWye) builds on the first wye--it takes two processes and puts them together. Since the halts are processes (Process0[Nothing]), they complete a wye that can run. .runLog.run runs the process. First the original w runs and outputs the filled list, then the two input processes run and immediately signal they are done--no values come from them. The default merging strategy is merge, which takes whatever it can get, as much as it can get, when it can get it. The original list comes out first. The right and left sides then finish with a halt, which halts the overall process. The run does not produce an interesting output, but it illustrates the point.

It is more interesting to have a non-halt input. This merges the process-independent data into the process and combines it with another output stream. Again, if you had access to the original process, you could just append the inputs. In many scenarios you do not have access to it, so with wye/tee and feedL/feedR you can feed data into a process like below:

scala> Process.range(100,110).wye(halt)(w).runLog.run
res6: IndexedSeq[Int] = Vector(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 100, 101, 
102, 103, 104, 105, 106, 107, 108, 109)

The 1s come first from the original w then the range of values from the process that is added last to the composition are output. This is deterministic so we should always expect this result.

It's the same with Tee:

scala> val t = tee.feedL(List.fill(10)(1))(process1.id)
t: scalaz.stream.Tee[Int,Any,Int] = Append(Emit(Vector(1, 1, 1, 1, 1, 1,
1, 1, 1)),Vector(<function1>))

scala> Process.range(100,110).tee(halt)(t).runLog.run
<console>:27: error: could not find implicit value for parameter C: scalaz.Catchable[F2] 
Process.range(100,110).tee(halt)(t).runLog.run

This is similar to other cases where some implicits appear to be missing or .runLog.run does not seem to work; there are some type inference issues. I have asked the scalaz list about this, but to ensure that the implicit is found, I provided explicit type information:

scala> Process.range(100,110).tee[Task, Int, Int](halt)(t).runLog.run
res25: IndexedSeq[Int] = Vector(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 100, 101,
102, 103, 104, 105, 106, 107, 108, 109)

Zipping

I will not cover all the zipping possibilities, but I will mention one that has been quite useful. Let's say we want to apply a function to an argument. That's already pretty straightforward:

scala> Process.range(1,10).map(_ + 1).toSource.runLog.run
res9: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)

Given a process, we can easily use map. Now let's assume the function is effectful and that the return value from the function needs to be in an environment. And let's assume that the effectful function is represented by a process as well--a stream of functions, each of which is in an environment.

We know we can zip the two streams together with a tee and apply the function to the argument. Here's a quick example. We could have used Task.now as well for this snippet.

scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => x + 1}))(tee.zipApply).runLog.run
res7: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)

The argument needs to be on the left and the function on the right.

Zipping can be easy. Let's just zip two processes that produce Ints and Strings. We'll use the default "combining" strategy called tee.zip which takes the two inputs (left and right) and combines them into a tuple:

  val left = Process.emitAll(Seq(1,2,3))
                        //> left  : scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
  val right = Process.emitAll(Seq("blah", "hah", "nah"))
                        //> right  : scalaz.stream.Process0[String] = Emit(List(blah, hah, nah))
  left.tee(right)(tee.zip[Int, String]).toSource.runLog.run
                        //> res28: IndexedSeq[(Int, String)] = Vector((1,blah), (2,hah), (3,nah))

Zipping and Effectful Computations (like concurrency)

Now let's have the stream of functions return an effectful computation.

In the real world, a function returning an effect such as a Task is much like programming with futures, where we create the future, say grabbing a remote web page, and the input is the URL itself. Back to scalaz: each effectful function is really a urlString => Task[String]. We might then choose, say downstream, to run that Task asynchronously or synchronously. If we had used a scala Future, the web page grab would have started immediately. Using Task and not calling run on it immediately delays the decision about how much "nondeterminism" to use until later in the program. This makes the application more composable.
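
A minimal sketch of that difference, using a hypothetical fetchPage function (not part of the library):

  // fetchPage only *describes* the fetch; nothing runs until .run is called.
  // A scala Future, by contrast, would start executing as soon as it was created.
  val fetchPage: String => Task[String] =
    url => Task { scala.io.Source.fromURL(url).mkString }

  val page = fetchPage("http://example.com")   // still nothing has run
  // page.run would actually perform the request and return the body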

Here's an example that evaluates _ + 1 in an asynchronous task that also sleeps for 2000 millis. The actual tasks remain unevaluated because the output object from the "function" is a task that, when run individually, would complete the calculation:

scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(2000); x + 1}}))(tee.zipApply).runLog.run
res14: IndexedSeq[scalaz.concurrent.Task[Int]] = 
Vector(scalaz.concurrent.Task@4ffa3d4f,
scalaz.concurrent.Task@5d41ef2c, scalaz.concurrent.Task@79057f3e,
scalaz.concurrent.Task@36707e79, scalaz.concurrent.Task@28fe2dbd,
scalaz.concurrent.Task@7a148f01, scalaz.concurrent.Task@28baf3f2,
scalaz.concurrent.Task@1d8cc32a, scalaz.concurrent.Task@444686e3)

So we would need to run each task to get the values. We can run them after the process has been run, which is fine for this example. This one-liner takes roughly 2000 millis per task, run one after the other, to complete and return the result vector. The async part only happens when t.run is called.

scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(2000); x + 1}}))(tee.zipApply).runLog.run.collect{ case t => t.run}
res17: IndexedSeq[Int] = Vector(2, 3, 4, 5, 6, 7, 8, 9, 10)

But for a long-running stream where we need to control memory, we need the "running" part to be applied to the tasks a bit earlier, since in most cases more processing is needed downstream. Once a Task is run asynchronously, we want to make sure the output values are emitted in a process so they can be used in downstream processing. Obviously, the order is no longer preserved because each Task may run asynchronously for a different amount of time.

First we can convert each async, unstarted Task to a Process using Process.eval:

scala> Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(rand.nextInt(5)*1000); x + 1}}))(tee.zipApply).map(Process.eval)
res27: scalaz.stream.Process[scalaz.concurrent.Task,scalaz.stream.Process[scalaz.concurrent.Task,Int]] = Append(Halt(End),Vector(<function1>))

Then we need to merge the results together. We can use merge.mergeN to do this, as indicated in this gist. merge.mergeN merges the output of multiple processes into a single output stream. We add .runLog.run to the end to get the results. We have also added randomness to the sleep to help illustrate how the resulting output is no longer in order.

scala> val rand= new java.util.Random
rand: java.util.Random = java.util.Random@36edc749

scala> merge.mergeN(4)(Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(rand.nextInt(5)*1000); x + 1}}))(tee.zipApply).map(Process.eval)).runLog.run
res29: IndexedSeq[Int] = Vector(5, 4, 2, 3, 7, 8, 9, 6, 10)

The line is a bit of a mouthful, but we can see how this could be reused. For example, we could add yet another 1 to the values simply using map:

scala> val rresult = merge.mergeN(4)(Process.range(1,10).tee(Process.repeatEval(Task.delay{x:Int => Task{Thread.sleep(rand.nextInt(5)*1000); x + 1}}))(tee.zipApply).map(Process.eval))
rresult: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))

scala> rresult.map(_ + 1).runLog.run
res30: IndexedSeq[Int] = Vector(4, 5, 6, 7, 10, 9, 3, 8, 11)

But we now see that this concept of a function returning an effectful computation given an argument is really just a Channel[F[_], I, O], which is used all over the io module. The basic idea behind the use of Channel for io is that you make a request to some object using an argument, and you get back something that you must evaluate to get the returned value. The environment is bubblewrap around the object returned from the external environment. To peek inside, you .run to remove the bubblewrap.

scala> val asyncAdderChannel = channel.lift { x:Int => Task { Thread.sleep(rand.nextInt(5)*1000); x+1}}
asyncAdderChannel: scalaz.stream.Channel[scalaz.concurrent.Task,Int,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))

which can be run exactly like before:

scala> val rresult = merge.mergeN(4)(Process.range(1,10).tee(asyncAdderChannel)(tee.zipApply).map(Process.eval)).runLog.run
rresult: IndexedSeq[Int] = Vector(2, 3, 4, 5, 7, 8, 9, 6, 10)

You read this as a process that models a stream of arguments (the range), evaluates them (the apply) using a stream of effectful functions, and then outputs the values, possibly out of order. You can also view this as a super-complicated way to add 1 to some arguments in a separate thread pool and then spew them out as a vector :-) Or you can view this as merging a function stream and an argument stream. Since the function produces a task that adds numbers together, you run it to add a stream of inputs asynchronously.

If you use the approach outlined here, you can do this even more simply. Use this combinator (modified from the link here):

import scalaz.concurrent.Task
import scalaz.stream._

implicit class ConcurrentProcess[O](val process: Process[Task, O]) {
  /**
   * Run the process through the channel with the given level of concurrency.
   */
  def concurrently[O2](concurrencyLevel: Int)
                      (f: Channel[Task, O, O2]): Process[Task, O2] = {
    merge.mergeN(concurrencyLevel)(process.tee(f)(tee.zipApply).map(Process.eval))
  }
}

to be able to run:

scala> val result = Process.range(1,10).concurrently(4)(asyncAdderChannel).runLog.run
result: IndexedSeq[Int] = Vector(3, 2, 5, 8, 7, 4, 6, 10, 9)

You might ask yourself, why can't I just do

scala> Process.range(1,10).tee(asyncAdderChannel)(tee.zipApply).gather(2).runLog.run
res75: IndexedSeq[Int] = Vector(3, 2, 4, 5, 6, 7, 9, 8, 10)

Instead of .map(Process.eval) plus mergeN, we run .gather(2), which runs eval behind the scenes. But first it chunks the input process: two input elements, Tasks in this case, are chunked up, then eval is called on the tasks, and gather waits until those two tasks complete. The results are gathered, out of order, as they complete, and only when all the tasks in that chunk complete does gather release the chunked output. Then the next chunk runs. In contrast, mergeN will run another Task as soon as one completes, up to the concurrency level you have specified, giving you better pipeline parallelism.
