Recipe: Stop an infinite Stream when you want to

Sometimes you need to stop a stream that you working on that runs indefinitely or running in the "wild" (which means you may have a few streams running independently in your application). It's obvious using the "pull" model that when the input stream runs out of data, the overall stream should stop. But what if the stream is infinite? We can use a Signal to interrupt a Stream. You can also use a stream of booleans but we will show the signal approach.

// set the effect to use and ensure an Async is available
val done = async.signalOf(false).unsafeRun
// run your inifinite stream...
your_inifintite_or_bounded_stream.interruptWhen(done).run.unsafeRunAsyncFuture()

println("Press Enter to stop...")
readLine()
done.set(true).unsafeRun

We are using a Signal outside of a stream, which is perfectly allowable. Here's what's going on:

  • First, we need a Signal, so we create one. But its wrapped in a Task (the effect), so we need to extract it out using unsafeRun.

  • Signal.interrupt takes a stream argument and runs that stream. Underneath, it uses Stream.interruptWhen that stops the stream when the Signal is set to true.

  • The readLine() simulates some condition that you control.

  • The last line sets the Signal to true. But since the Signal communicates through the "effect", you need to run the Task returned from set that actually sets the value.

Once set, the stream will be interrupted and stop.

Notice that we used unsafeRun which is not strictly necessary. You could do all of this in a streamy way. The approach was described in this gist. There is also a good description of pausing.

scala> /** Interrupt immediately. */
     | def interruptNow[F[_], A](implicit F: Async[F]): Pipe[F, A, A] =
     |     toBeInterrupted => 
     |         // Create a cancellation signal
     |         Stream.eval(async.signalOf(false)).flatMap { cancellationSignal =>
     |            // Attached it to the input stream
     |            toBeInterrupted.interruptWhen(cancellationSignal).merge(Stream.eval_(cancellationSignal.set(true)))
     |     }
interruptNow: [F[_], A](implicit F: fs2.util.Async[F])fs2.Pipe[F,A,A]

scala> /** Delay interruption. */
     | def interruptAfter[F[_], A](delay: FiniteDuration)(implicit F: Async[F]): Pipe[F, A, A] =
     |   toBeInterrupted =>
     |     Stream.eval(async.signalOf(false)).flatMap { cancellationSignal =>
     |       toBeInterrupted.interruptWhen(cancellationSignal).merge(time.sleep_(delay) ++ Stream.eval_(cancellationSignal.set(true)))
     |     }
interruptAfter: [F[_], A](delay: scala.concurrent.duration.FiniteDuration)(implicit F: fs2.util.Async[F])fs2.Pipe[F,A,A]

How does this work?

  • Create a pipe because we want to create a combinator that is easy to use and that would convert an existing stream.

  • Create a signal that allows the use of Stream.interruptWhen. The signal must be a boolean.

  • Since signals are wrapped in effects, use a stream to unwrap the signal so we can use it directly in toBeInterrupted.interruptWhen

  • Merge the "interruptable" stream with a stream that sets the interrupt signal to true.

  • Yes, this is a silly example, because interruption is really only useful if it is criteria based.

  • interruptAfter interrupts based on a delay-based criteria. So this is not silly.

val logStart = Stream.eval_(Task.delay(println("Started: " + System.currentTimeMillis)))
val logFinished = Stream.eval_(Task.delay(println("Finished: " + System.currentTimeMillis)))

def randomIntEffect(): Task[Int] = Task.delay { (math.random * 100).toInt }

val infiniteStream = time.awakeEvery[Task](1.seconds).evalMap{ delta => 
    println(s"Delta: $delta")
    randomIntEffect()
}

Now we can run this, first interrupting immediately:

scala> val solutionNow = logStart ++ infiniteStream.through(interruptNow) ++ logFinished
solutionNow: fs2.Stream[fs2.Task,Int] = append(append(attemptEval(Task).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).flatMap(<function1>))

scala> solutionNow.runLog.unsafeRun
Started: 1502804969299
res4: Vector[Int] = Vector()

then interrupting after a 6 seconds, which may gives 5 or 6 results:

scala> val solutionAfter = logStart ++ infiniteStream.through(interruptAfter(5.seconds)) ++ logFinished
solutionAfter: fs2.Stream[fs2.Task,Int] = append(append(attemptEval(Task).flatMap(<function1>).flatMap(<function1>), Segment(Emit(Chunk(()))).flatMap(<function1>)), Segment(Emit(Chunk(()))).flatMap(<function1>))

scala> solutionAfter.runLog.unsafeRun
Started: 1502804969481
res5: Vector[Int] = Vector(98, 51, 39, 28, 83)

We could generalize this more to any trigger wrapped in an effect and when "triggered," interrupts the stream. We could even use another "signal" based on the first approach. If you want to be able to interrupt a stream in a repl that is running asynchronously in the background, we can setup a small effect, compose it into the stream, then "run" the effect in the repl to terminate a stream running in the background.

INSERT COOL EXAMPLE HERE

Last updated