Recipe: Delay running a stream

If you need to delay running a stream, you can use the time functions, such as time.sleep.

import fs2._
import fs2.util._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val strategy = Strategy.fromExecutionContext(global)
implicit val scheduler = Scheduler.fromFixedDaemonPool(4)
implicit val F = Async[Task]
scala> // delay a stream by 5 seconds
     | time.sleep(5.seconds).run.unsafeRun

This creates a stream that sleeps for 5 seconds and then emits a single Unit. Note that unsafeRun blocks the current thread, because it waits for the task created from the sleep stream (a Task, our F effect) to complete.
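To see the blocking behaviour concretely, the sketch below times the call. It assumes the same fs2 imports and implicit Strategy and Scheduler as above; the names BlockingSleepDemo and timeBlockingSleep and the 1 second delay are illustrative choices, not part of the recipe.

```scala
import fs2._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object; the implicits mirror the setup shown earlier.
object BlockingSleepDemo {
  implicit val strategy: Strategy = Strategy.fromExecutionContext(global)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(4)

  /** Runs a sleeping stream to completion and returns the wall-clock time spent, in ms. */
  def timeBlockingSleep(d: FiniteDuration): Long = {
    val start = System.nanoTime()
    // The calling thread waits on this line until the sleep has finished.
    time.sleep[Task](d).run.unsafeRun
    val elapsedNanos = System.nanoTime() - start
    elapsedNanos / 1000000L
  }

  def main(args: Array[String]): Unit =
    println(s"unsafeRun returned after roughly ${timeBlockingSleep(1.second)} ms")
}
```

The measured time should be at least the requested delay, since nothing after unsafeRun executes until the stream has completed.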

If we want to run the stream in the background, we could:

scala> Await.ready( // Await is only here to capture the output for display
     |     time.sleep(5.seconds).run.unsafeRunAsyncFuture
     |     , Duration.Inf)
res2: scala.concurrent.Future[Unit] = Future(Success(()))

This asks to run the Task (created by run) in the background and hands back a Scala Future. Await.ready is used here only to wait for the result so it can be displayed; since the result is Unit, there is nothing further to print, as expected.
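Outside a REPL you would typically attach a callback to the Future instead of awaiting it. The following is a minimal sketch under the same setup assumptions as above; BackgroundSleepDemo and startSleep are hypothetical names, and the final Await exists only so a short-lived demo program does not exit before the callback runs.

```scala
import fs2._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical demo object; the implicits mirror the setup shown earlier.
object BackgroundSleepDemo {
  implicit val strategy: Strategy = Strategy.fromExecutionContext(global)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(4)

  /** Starts the sleep in the background and returns immediately with a Future. */
  def startSleep(d: FiniteDuration): Future[Unit] =
    time.sleep[Task](d).run.unsafeRunAsyncFuture

  def main(args: Array[String]): Unit = {
    val done = startSleep(1.second)
    println(s"Completed right after starting? ${done.isCompleted}")
    // map registers a callback and returns a Future that completes after the println.
    val logged = done.map(_ => println("sleep finished"))
    // Only here to keep the JVM alive until the callback has run.
    Await.ready(logged, 10.seconds)
  }
}
```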

To delay a stream, we append our own stream after the sleep:

scala> (time.sleep(5.seconds) ++ Stream.emit(true)).runLog.unsafeRun
res3: Vector[AnyVal] = Vector((), true)

This emits a Unit and then true after a 5 second delay. We can use this idea to write a pipe that delays the contents of a stream:

/** A pipe that given a stream, delays it by delta. */
def sleepFirst[F[_], O](delta: FiniteDuration)(implicit F: Async[F]): Pipe[F, O, O] =
  delayed => time.sleep(delta).flatMap(_ =>  delayed)
scala> Stream.emit(true).through(sleepFirst(5 seconds)).runLog.unsafeRun
res5: Vector[Boolean] = Vector(true)

Instead of stream concatenation, we used flatMap, which discards the Unit emitted by time.sleep and continues with the delayed stream.
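If you prefer to keep concatenation, one possible alternative is to drain the sleep stream so that it emits nothing, and then append the delayed stream. This is only a sketch: sleepFirstDrain is a hypothetical name, the Scheduler is taken as an explicit implicit parameter here, and it assumes drain and ++ behave in your fs2 version as they do in the examples above.

```scala
import fs2._
import fs2.util.Async
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SleepFirstDrainDemo {
  implicit val strategy: Strategy = Strategy.fromExecutionContext(global)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(4)

  /** Hypothetical variant of sleepFirst: drop the sleep's Unit with drain, then append. */
  def sleepFirstDrain[F[_], O](delta: FiniteDuration)(
      implicit F: Async[F], scheduler: Scheduler): Pipe[F, O, O] =
    delayed => time.sleep[F](delta).drain ++ delayed

  /** Runs a one-element stream through the pipe and collects the output. */
  def runOnce(delta: FiniteDuration): Vector[Boolean] =
    Stream.emit(true).covary[Task]
      .through(sleepFirstDrain[Task, Boolean](delta))
      .runLog.unsafeRun
}
```

Both variants should produce the same elements; the choice between flatMap and drain followed by ++ is mostly a matter of taste.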

There are other ways to delay a stream. Streams are parameterized on an effect type F, so we can also use the chosen effect itself to introduce the delay.

For example, let's generate stream elements that come from calling the outside world periodically to obtain a security token. Security tokens expire on a schedule that is often indicated by the current token itself. If we want a stream that obtains a new token just before the previous one expires, and otherwise sleeps, we can use Stream.unfoldEval and Task.schedule:

import java.util.concurrent.{TimeUnit  => TU}

case class AuthToken(expiresIn: FiniteDuration)

// Simulate a token that has a varying expiration time.
// A return value of None indicates an error of some sort.
// You can use a Try or Either to capture any error information.
// The Task reflects the asynchronous nature of obtaining a token
// from a web service.
// Note: the println runs when getNextToken() is called to build the Task,
// not when the Task itself is run.
def getNextToken() = {
   println(s"Getting token: ${java.time.Instant.now}")
   Task.delay(Some(AuthToken((math.random * 5).seconds)))
}

// Create a token stream. It will immediately try to get the first token.
val authTokenStream = Stream.unfoldEval((0.0).seconds){ expiresIn => 
    getNextToken()
    .schedule(expiresIn) // schedule getting the token through the F effect (=Task)
    .map{ nextTokenOpt => 
       // If a token is found, emit the token and set the state to the next time it is needed
       // with a small decrease so it does not expire before the call is issued.
       nextTokenOpt.map { nextToken => 
           val delay = FiniteDuration((nextToken.expiresIn * 0.95).toNanos, TU.NANOSECONDS)
           (nextToken, delay)
       }}}

Run it, making sure there is a way to stop, for example by taking only 10 values:

scala> authTokenStream.take(10).runLog.unsafeRun
Getting token: 2017-08-15T13:49:56.004Z
res15: Vector[AuthToken] = Vector(AuthToken(4871614083 nanoseconds), AuthToken(203938202 nanoseconds), AuthToken(20214022 nanoseconds), AuthToken(73392269 nanoseconds), AuthToken(423479764 nanoseconds), AuthToken(143081291 nanoseconds), AuthToken(4430848941 nanoseconds), AuthToken(1686590951 nanoseconds), AuthToken(417218776 nanoseconds), AuthToken(52172066 nanoseconds))
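As an aside, the way unfoldEval threads its state can be seen in isolation with the pure Stream.unfold, which has the same shape minus the effect. The sketch below is illustrative only; UnfoldStateDemo and the counter logic are made up for this example.

```scala
import fs2._

// Hypothetical demo: the state is a counter instead of a delay.
object UnfoldStateDemo {
  // Each step emits counter * 10 and passes counter + 1 on as the next state.
  // Returning None ends the stream, just as a None token would end authTokenStream.
  val tens: List[Int] =
    Stream.unfold(0)(n => if (n < 3) Some((n * 10, n + 1)) else None).toList
  // tens == List(0, 10, 20)
}
```

In authTokenStream the emitted element is the token and the state passed on is the delay before the next request, with the effect (a scheduled Task) evaluated at every step.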

We could make this a bit more general:

  /** Unfold, periodically evaluating an effect to obtain new values.
    * The time to wait before the next check is computed by getDelay
    * from the value the effect just returned.
    * @tparam A Output element type.
    * @param f Effect that is evaluated to get a value; None ends the stream.
    * @param getDelay Extracts the delay before the next check from that value.
    */
  def unfoldEvalWithDelay[A](f: => Task[Option[A]],
    getDelay: A => FiniteDuration)
    (implicit F: Async[Task], strategy: Strategy, scheduler: fs2.Scheduler) =
    Stream.unfoldEval((0.0).seconds){ delay =>
      f.schedule(delay)
        .map{ opt =>
          opt.map { a =>
            (a, getDelay(a))}}}

  /** Shorten a delay by multiplying it by fraction (0.95 by default). */
  def shortenDelay(fraction: Double = 0.95, delay: FiniteDuration): FiniteDuration =
    FiniteDuration((delay * fraction).toNanos, TU.NANOSECONDS)

and then call it like this:

scala> unfoldEvalWithDelay(getNextToken, (auth: AuthToken) => shortenDelay(delay=auth.expiresIn)).take(5).runLog.unsafeRun
Getting token: 2017-08-15T13:50:07.949Z
res19: Vector[AuthToken] = Vector(AuthToken(299479746 nanoseconds), AuthToken(2231396086 nanoseconds), AuthToken(619609193 nanoseconds), AuthToken(2901539413 nanoseconds), AuthToken(3456834684 nanoseconds))
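The delay arithmetic can be checked on its own, without running a stream. The following sketch uses only the Scala standard library; scaleDelay is a hypothetical helper written for this check (it takes the fraction explicitly and rounds to whole nanoseconds), not the recipe's own function.

```scala
import scala.concurrent.duration._

object DelayMath {
  /** Hypothetical helper: scale a delay by fraction, rounding to whole nanoseconds. */
  def scaleDelay(delay: FiniteDuration, fraction: Double): FiniteDuration =
    Duration.fromNanos(math.round(delay.toNanos * fraction))

  def main(args: Array[String]): Unit = {
    // Half of 10 seconds and a quarter of 4 seconds.
    println(scaleDelay(10.seconds, 0.5))
    println(scaleDelay(4.seconds, 0.25))
  }
}
```

A fraction below 1.0 requests the next token slightly early, which is the safety margin the recipe relies on.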
