Recipe: Data-Driven Stream

If you need to derive an extra output item from the first item in a stream and then stream all of the items, including the first, you can do something like:

val s = yourstream
def derive(item: A): A = ??? // your derivation logic goes here
// Derive some output from the first item in the stream, then emit the remaining items
s.take(1).flatMap(a => Stream.emits(Seq(derive(a), a))) ++ s.tail

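The problem with this approach is that s is run twice: once for take(1) and once for tail. If your stream is expensive to process, or has side effects, the work for that first item is done twice. Depending on the stream, you may be able to work around that inside the stream itself.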
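
To make the double processing visible, here is a minimal sketch, assuming a recent fs2 3.x release with cats-effect 3 on the classpath (the recipe itself does not name a version). The counter, the object name DoubleEvaluationDemo, and the three-element stream are illustrative stand-ins, not part of the recipe.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import java.util.concurrent.atomic.AtomicInteger

object DoubleEvaluationDemo {
  // Counts how many times the (hypothetical) expensive per-item step runs.
  val evaluations = new AtomicInteger(0)

  // Stand-in for an expensive stream: producing each item runs one effect.
  val s: Stream[IO, Int] =
    Stream.range(1, 4).evalMap(i => IO { evaluations.incrementAndGet(); i })

  def derive(i: Int): Int = i + 100

  // Runs the take(1) ++ tail version and reports the output and the counter.
  def run(): (Vector[Int], Int) = {
    evaluations.set(0)
    val out = (s.take(1).flatMap(a => Stream(derive(a), a)) ++ s.tail)
      .compile.toVector.unsafeRunSync()
    (out, evaluations.get())
  }

  def main(args: Array[String]): Unit = {
    val (out, count) = run()
    println(out)   // Vector(101, 1, 2, 3)
    println(count) // greater than 3: the first item's effect runs for take(1) and again for tail
  }
}
```

The stream holds three items, yet the counter ends up above three, because tail has to produce the first item again before it can drop it.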

We could use a Pull here to accomplish the same thing without having to "run" the underlying s stream twice.

scala> val s = Stream.range(1,10) 
s: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>)

scala> def derive(i: Int) = i+100 
derive: (i: Int)Int

scala> s.pure.pull(h => h.receive1 { (a, h) => Pull.output1(derive(a)) >> Pull.output1(a) >> h.echo}).toVector 
res0: Vector[Int] = Vector(101, 1, 2, 3, 4, 5, 6, 7, 8, 9)

The basic idea is that calling pull on a stream gives you a Handle. That Handle allows you to receive one element. Of course, if all you did was output that one element, the result would be equivalent to take(1), so we use Pull to output something derived from the first item, output the item itself, and then echo the rest of the input.

Most of the combinators on Handle create a Pull. For example, h.echo (on the inner h) recursively creates a Pull that echoes its input to the output. >> is essentially a sequencing operator: given the Handle, we compose a sequence of Pull objects, the last of which keeps pulling whatever the Handle presents.
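
The REPL session above uses the Handle-based API. For comparison, here is a rough sketch of the same idea against the Stream.pull API of fs2 3.x, which is an assumption about your version rather than something this recipe specifies. In that API, s.pull.uncons1 plays roughly the role of receive1, and rest.pull.echo the role of h.echo. The names FirstItemPull, deriveFirst, and withDerivedFirst are hypothetical helpers introduced only for this illustration.

```scala
import fs2.{Pull, Stream}

object FirstItemPull {
  // Pull that outputs derive(first), then first, then echoes the rest.
  // The explicit return type keeps type inference simple.
  def deriveFirst[F[_], A](s: Stream[F, A])(derive: A => A): Pull[F, A, Unit] =
    s.pull.uncons1.flatMap {
      case Some((a, rest)) =>
        Pull.output1(derive(a)) >> Pull.output1(a) >> rest.pull.echo
      case None =>
        Pull.done // empty input: nothing to derive, nothing to echo
    }

  // Turn the Pull back into a Stream; the source stream is only run once.
  def withDerivedFirst[F[_], A](s: Stream[F, A])(derive: A => A): Stream[F, A] =
    deriveFirst(s)(derive).stream

  val result: Vector[Int] =
    withDerivedFirst(Stream.range(1, 10))(_ + 100).toVector
  // result: Vector(101, 1, 2, 3, 4, 5, 6, 7, 8, 9)

  def main(args: Array[String]): Unit = println(result)
}
```

As in the Handle version, each >> sequences one Pull after another, and the final echo keeps forwarding whatever remains of the input.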
