More Than One Stream

Some solutions require more than one fs2 stream to be run at the same time. You know you need more than one stream when your solution calls .run on more than one stream. Since more than one stream is running at a time, processing needs to occur asynchronously and perhaps in parallel.

You will often need multiple streams when you have a long-running process, or multiple processes that start and stop at different processing stages, alongside common infrastructure processes that run independently of the others, for example, stream logging.

You may also need multiple streams to adapt asynchronously to external systems that act as sources or sinks, such as web services or databases. The streams must then be coupled to each other in some way, typically asynchronously.

Assuming we can set up our streams to perform our application-specific processing, how do we run multiple streams in a program?

  • Use "merge" to merge the first stream with a second stream. Drain the second stream so that the net effect is that it runs, produces no output, and halts when the main stream ends.

  • Use concurrent.join to run multiple streams in parallel.

  • Use a queue model: insert items into a second stream that processes them in the background. Your main business API will need a way to insert items into the queue.

  • Run an effect that in turn runs a stream asynchronously. It might use a queue or a signal to communicate with other streams or with your domain logic.
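The last two bullets can be sketched together using the same fs2 0.9-era API that appears later in this chapter. This is a minimal sketch, not a complete solution: `runInBackground` and `process` are hypothetical names, and the fuller Processor example below adds interruption.

```scala
import fs2._

// An execution strategy is assumed to be in scope in the REPL sessions
// below; Task.start and the queue both need one.
implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(4)

// Hypothetical helper: start a background stream fed by a queue and hand
// back the enqueue effect as the public API.
def runInBackground(process: Int => Task[Unit]): Task[Int => Task[Unit]] =
  for {
    q <- async.boundedQueue[Task, Int](100)          // buffer between producer and consumer
    _ <- Task.start(q.dequeue.evalMap(process).run)  // run the stream asynchronously
  } yield q.enqueue1                                 // callers enqueue work via this effect
```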

Merging/Draining

The easiest way to run two streams is to merge them together, draining one of them so that it terminates when the main stream completes.

scala> // counter
     | val counter = new java.util.concurrent.atomic.AtomicInteger(0)
counter: java.util.concurrent.atomic.AtomicInteger = 0

scala> // continuously count from 0 to 999, forever
     | val foreverStream = Stream.range(0,1000).map{i => counter.getAndIncrement(); i+1000}.repeat
foreverStream: fs2.Stream[Nothing,Int] = append(Segment(Emit(Chunk(()))).flatMap(<function1>).mapChunks(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>))

scala> // just count from 0 to 9, once
     | val mainStream = Stream.range(0,10)
mainStream: fs2.Stream[Nothing,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>)

scala> // Use drain to discard the data from foreverStream. It still runs, as
     | // evidenced by the counter. With no output to merge,
     | // foreverStream runs much faster than the main stream, as
     | // evidenced by the counter's high value.
     | mainStream.mergeHaltBoth(foreverStream.drain).runLog.unsafeRun
res7: Vector[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> println(s"counter: ${counter.get}")
counter: 105

scala> counter.set(0)

scala> // Without drain, the data from foreverStream is merged into the
     | // output. Its values are interspersed with the main stream's due
     | // to non-determinism in the merge, although each stream's own
     | // outputs remain in order.
     | mainStream.mergeHaltBoth(foreverStream).runLog.unsafeRun
res14: Vector[Int] = Vector(0, 1000, 1, 1001, 2, 3, 4, 1002, 1003, 5, 1004, 6, 7, 1005, 1006, 1007, 1008, 1009, 8, 9, 1010, 1011)

scala> println(s"counter: ${counter.get}")
counter: 13
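Note that mergeHaltBoth halts when either stream halts. Plain merge, by contrast, halts only when both streams halt, so merging with foreverStream directly would never terminate on its own. A hedged sketch using the same definitions as above, bounding the result explicitly:

```scala
// Plain merge halts only when BOTH sides halt, so an infinite stream
// keeps the merge alive forever. Bound the output with take instead.
mainStream.merge(foreverStream).take(20).runLog.unsafeRun
```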

Using a Queue to Communicate

Let's assume that your second stream runs asynchronously and that you need to interface with it in a non-stream manner. You may need to run many background processing streams like this. The approach is similar to the one described in the fs2 user guide for communicating with the outside world.

Regardless of which method you use, you need a way to stop the background stream. If you use "merge" then it stops automatically, but that only helps if your main application logic is itself formulated as a stream. You could use atomic flags or similar mechanisms. Another approach is to return an effect that, when run, stops the background stream.

The trick in composing your solution is recognizing that, since we are using streams and effects, your solution will forever live inside the stream or effect monad. The stream/effect "leaks" into your API in some way--you can't avoid it. Since your application probably wants to choose its own effect, you are probably OK with exposing an effect-based API. For example, you might return a Task/IO-based interface that requires the main application to run an effect in order to perform some function in the background stream.

scala> import fs2.async._
import fs2.async._

scala> object Processor {
     |   // backend processor, uses values for something application specific
     |   private def streamProcessor(queue: mutable.Queue[Task, Int]) = queue.
     |         dequeue.
     |         map(_ + 10).
     |         evalMap(i => Task.delay{println(s"Processed data and created $i"); i})
     | 
     |   def make: Task[(Int => Task[Unit], Task[Unit])] =  for {
     |      queue <- boundedQueue[Task, Int](10)
     |      interrupt <- signalOf[Task, Boolean](false)
     |      // this starts the background processing once the outer Task is run
     |       _ <- Task.start { streamProcessor(queue).interruptWhen(interrupt).run }
     |   } yield (queue.enqueue1, interrupt.set(true))
     | }
defined object Processor

scala> case class AppLogic(enqueue1: Int => Task[Unit]) {
     |   def process(a: Int) = enqueue1(a)
     | }
defined class AppLogic

scala> // Everything needs to flatMap/map into a Task
     | val runitTask = for {
     |    processor <- Processor.make
     |    _ <- AppLogic(processor._1).process(10)
     |    _ <- processor._2
     | } yield ()
runitTask: fs2.Task[Unit] = Task

scala> // Run the processing of a single Int in the background,
     | // which places the Int into a background stream.
     | runitTask.unsafeRunAsync(_ => ())

scala> // Sleep briefly so the asynchronous background stream has time to run.
     | Thread.sleep(3000)

concurrent.join

This method runs multiple streams in parallel. You may want this when asynchronous interleaving alone is not sufficient for your application.

To use this method, you wrap multiple streams in an outer stream. The "inner" streams can do different things, such as write to a database or process data. concurrent.join emits the results of the inner streams in non-deterministic order. If you do not want any output, you can "drain" or "filter" it.

scala> val stra: Stream[Task, Int] = Stream.range(1,5).evalMap{i => Task.delay{ println(s"a-$i"); i }}
stra: fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>)

scala> val strb: Stream[Task, Int] = Stream.range(20,25).evalMap{i => Task.delay{ println(s"b-$i"); i }}
strb: fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>)

scala> val strc: Stream[Task, Int] = Stream.range(40,45).evalMap{i => Task.delay{ println(s"c-$i"); i }}
strc: fs2.Stream[fs2.Task,Int] = Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>)

scala> val outer: Stream[Task, Stream[Task, Int]] = Stream(stra, strb, strc)
outer: fs2.Stream[fs2.Task,fs2.Stream[fs2.Task,Int]] = Segment(Emit(Chunk(Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>).flatMap(<function1>))))

scala> concurrent.join(3)(outer).runLog.unsafeRun
res22: Vector[Int] = Vector(40, 20, 1, 41, 21, 2, 42, 22, 3, 43, 23, 4, 44, 24)
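When only the effects of the inner streams matter, the joined output can be discarded. A minimal sketch using the same fs2 0.9-style definitions as above:

```scala
// Drain the joined stream: the inner streams still run (and print),
// but no values are emitted downstream.
concurrent.join(3)(outer).drain.run.unsafeRun
```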
