Recipe: Retrieving Web Pages

Sometimes you need to retrieve several pages, in sequence, to complete one logical request. For example, you may request a collection of data from a web service that returns "paged" results. You then retrieve the pages one after another, using a paging marker included in each response to make the next request. Throughout the process, you need to ensure that your authentication token is always valid. Authentication tokens often time out after a specified period and need to be renewed. While you could wait for a request to fail and then renew the token, it's easier, and probably better, to run a timer that renews the auth token before it expires.

The following was taken from the fs2 gitter channel.

Suppose you have a token:

case class AuthToken(x: Int)

Let's say you want to create a stream of tokens that is refreshed every so often, so that you always have a fresh auth token to use in your web calls. The refresh needs to occur independently of everything else. But because the auth "stream" is a stream, it has to be "run" at some point, which means it has to be hooked up to the other streams so that when those streams are run, the auth stream is run too.

Given an auth stream, you have to compose it with functions that get pages from the web service. So let's define the auth stream. (The code in this recipe uses Task, so it appears to target the fs2 0.9 API; it assumes import fs2._ and import scala.concurrent.duration._, plus an implicit Strategy and Scheduler in scope.)

val authStream: Stream[Task, AuthToken] = time.awakeEvery[Task](60.minutes).
  evalMap { _ => Task.delay(AuthToken((math.random * 1000).toInt)) }

The retrieval system obtains pages, and clearly it needs an auth token to do so. In the example below, think of "page" as a paging token that needs to be included in each request; it is "state" carried between requests. getPage returns an Option so it can indicate that there are no more pages to obtain: returning None is what stops the stream in unfoldEval below.

/** Data returned from the fake web service. */
case class Data(value: String)

/** Simulate a stateless web service call */
def getPage(authToken: AuthToken, page: Int): Task[Option[(Data, Int)]] = 
  Task.delay(if (page < 10) Some((Data("asdf"), page + 1)) else None)

Now we need a way to call our fake web service that obtains pages in sequence. To obtain one logical result, we have to "page" through a sequence of "pages."

"Unfold" builds a stream from a generative function: given the current state, the function returns the next value together with the next state. unfoldEval does the same, except that the generative function returns an effect, and unfoldEval evaluates that effect (hence the Eval) at each step to get the generated value.

By unfolding, one logical set of pages can be retrieved from the web service, page by page, until the last page has been returned and getPage returns None. unfoldEval terminates the unfolding when it sees None, so you have to arrange for your retrieval function to return None when there is nothing left to retrieve (or adapt its output to produce a None). getPage was designed to do exactly this.
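The unfolding itself can be seen in isolation with a minimal, effect-free sketch. It uses Iterator.unfold from the Scala 2.13+ standard library rather than fs2, and getPagePure is a hypothetical stand-in for getPage with the Task and the auth token removed. The shape of the generative function is the same: state in, Option of (value, next state) out.

```scala
object UnfoldSketch {
  final case class Data(value: String)

  // Hypothetical pure analogue of getPage: no Task, no auth token.
  // Returns the page's data plus the next paging state, or None when done.
  def getPagePure(page: Int): Option[(Data, Int)] =
    if (page < 10) Some((Data(s"page-$page"), page + 1)) else None

  // Iterator.unfold keeps calling the function with the latest state
  // and stops the first time it returns None.
  val pages: List[Data] = Iterator.unfold(0)(getPagePure).toList
}
```

Starting from state 0, the function is called for pages 0 through 9 and then returns None for page 10, so pages holds ten elements.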

/** Create a stream from a generative function
 * that retrieves pages. The "page" state needs
 * to be passed to each invocation.
 */
def retrievePages(authSignal: async.immutable.Signal[Task, AuthToken]): Stream[Task, Data] =
  Stream.unfoldEval(0) { page =>
    // Read the current token from the signal before every request.
    authSignal.get.flatMap { currentAuthToken => getPage(currentAuthToken, page) }
  }

So clearly we need a Signal as an argument. A Signal is a memory cell that can only be read or set through an effect. So if we create a signal and pass it into retrievePages, we can use .get to obtain an effect, then flatMap into that effect to use the value directly.

We still need to compose the auth stream with the retrieval system to get a stream of web page Data. Streams are often thought of as a sequence of emitted values, but it is useful to realize that if you flatMap a stream that has only one value, the stream returned by the function you pass to flatMap becomes the content of the resulting stream. So a Stream in fs2 can be used solely to evaluate a single element wrapped in an effect, and that element is then used to generate the real stream you want to create.

You can then compose the auth token stream and the retrieval system using:

val data = Stream.eval(async.signalOf[Task,AuthToken](AuthToken(0))).flatMap { authTokenSignal =>
    authStream.evalMap(newToken => authTokenSignal.set(newToken)).drain mergeHaltBoth
    retrievePages(authTokenSignal)
}

What's going on here?

  • Stream.eval: Creates a new stream by evaluating an effect. The effect in this case is a Task. We do this solely to get access to the value inside. Since async.signalOf creates a Task with a signal inside, we need to eval it then flatMap into it so we have the actual signal object.

  • async.signalOf[Task,AuthToken](AuthToken(0)): This creates a Signal. A Signal is a memory cell that holds a value, here initially AuthToken(0). The value can only be read or set through an effect, in this case a Task, and creating the Signal is itself a Task. So a Stream.eval on that Task creates a Stream whose single element is the return value of the Task, which is the Signal.

  • By evaluating the effect surrounding the Signal, we get to the actual Signal object, and through it the current AuthToken. In this sense, Stream.eval is just being used to extract the Signal and, at the same time, indicate that we want to use it to create a Stream.

  • flatMap { authTokenSignal =>: This creates a new stream from the one created by Stream.eval. The function passed to flatMap receives the actual Signal object (a Signal holding an AuthToken, not the AuthToken itself) and uses it to build the resulting stream.

  • authStream: We have a stream that generates auth tokens. It only produces a new "fresh" token every so often so we might be asking ourselves how this stream is going to do anything for us? How do we get what's inside this stream into our stream that generates web result data? The secret is to recognize that what we really need is a stream whose sole purpose is to set the signal with a fresh auth and then the other stream needs to use that signal to get the auth when that stream wants an auth. The signal can be used to communicate across streams.

  • authStream.evalMap: We map over the authStream with a function that takes each AuthToken and returns an effect. The eval part says to evaluate the effect that the function returns and emit its result.

  • So what we need to do inside evalMap is take the token and return an effect which contains the values that we want in our stream. Interestingly enough, Signal.set(value) returns a Task[Unit]. Hence, authStream.evalMap(newToken => authTokenSignal.set(newToken)) creates a stream that when run, will have the side effect of setting the value of the signal.

  • drain on the mapped authStream just drains all the values, so it runs when the stream is run, but it outputs no values. So draining a stream allows us to "run" it when it gets connected to another stream but not worry about having to use its values. Since the "drained" stream is designed to set the value of the signal, all we need to really worry about is ensuring that the auth stream is "stopped" when the other main stream that processes data ends.

  • So now we have a <stream on the left> mergeHaltBoth <stream on the right>. The <stream on the left> outputs nothing but does run, so it continues to cause auth tokens to be generated every so often. The real values come from <stream on the right>. Because mergeHaltBoth halts as soon as either side halts, the auth stream is stopped when the page stream ends.

  • retrievePages(authTokenSignal) is a stream that retrieves pages. So the stream on the left causes auth tokens to be generated and set into the "signal." The right hand stream actually returns values from the web service. It uses the token signal to "get" the auth token.

The bottom line is that a Signal is used to communicate across Streams. One stream generates AuthTokens and another stream needs those tokens. You merge them together to ensure that both streams are "run." You could not zip the streams together, because zipping pairs elements in lockstep: the page stream could then advance only when the auth stream emits a new token, rather than reading the current token whenever it needs one.

The fundamental composition is to create a stream that sets a signal, then to merge it with the "data" stream that uses the signal to obtain an auth token when desired. You merge the two streams in such a way that you ignore the values (via drain) from the auth stream.
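The same idea can be sketched without fs2, as a rough analogy only. Here an AtomicReference stands in for the Signal, and the hypothetical refreshAt map simulates the auth stream firing between two requests; none of these names come from fs2, and there is no real concurrency or timer involved. The point is just that the reader looks at the cell afresh before every request, so it picks up a refreshed token without being paired element by element with the refresher.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object SignalSketch {
  final case class AuthToken(x: Int)
  final case class Data(value: String)

  // Hypothetical stand-in for the Signal: one side sets it, the other reads it.
  private val tokenCell = new AtomicReference(AuthToken(0))

  // Stand-in for getPage; it records which token was used for each request.
  def getPage(token: AuthToken, page: Int): Option[(Data, Int)] =
    if (page < 4) Some((Data(s"page $page, token ${token.x}"), page + 1)) else None

  // Reads the cell before every request, as retrievePages does with authSignal.get.
  @tailrec
  def retrieve(page: Int, acc: List[Data], refreshAt: Map[Int, AuthToken]): List[Data] = {
    // Simulate the refresh side firing just before this request.
    refreshAt.get(page).foreach(t => tokenCell.set(t))
    getPage(tokenCell.get(), page) match {
      case Some((data, next)) => retrieve(next, data :: acc, refreshAt)
      case None               => acc.reverse
    }
  }

  // The token is refreshed once, just before page 2 is requested.
  val result: List[Data] = retrieve(0, Nil, Map(2 -> AuthToken(42)))
}
```

In this sketch, pages 0 and 1 are fetched with the initial token and pages 2 and 3 with the refreshed one. In the fs2 version, the refresh runs concurrently on a timer instead of at a fixed page.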
