Cafe

Scalaz streams (sstreams) is an in-process pipeline library. It takes a functional approach, suited to applications that view their processing as functions acting on streams of data. The library is fairly new but it has roots in a similar Haskell-based library.

In order to become familiar with the library, I reworked the cafe example found in Spring Integration (SI). SI overlaps conceptually with sstreams. SI is a mature library and also contains many adapters for sourcing and sending data to and from external, out-of-JVM systems. Recently, SI introduced a Java DSL for creating the message network, which can replace the XML application-context specification approach. I do not have any issues with XML-based configuration. However, since sstreams is code based, we will compare the sstreams-based example to the Java DSL example.

There are many ways to handle concurrency and data processing, and most languages have several frameworks or libraries for pipeline processing. We'll see by the end of this article that many of the libraries/frameworks provide a decent last mile that allows you to write succinct code as the end programmer. In other words, the effort to create an app is uniformly low regardless of whether you take a functional or non-functional approach or use SI or sstreams. The real proof, which is not demonstrated in this article, comes when you need to modify the existing application for new use cases. If making those changes is easy, does not force a rewrite, and remains conceptually easy to follow, then you have a winner.

As a side note, the SI system looks like a message-bus-like environment for within-JVM work. However, you can also think of the SI framework as a way to manage concurrency in complex applications. The now-standard Actor model in Scala uses messages and actors to handle the same thing, and it proves out that creating tight boundaries around data, immutability and "message passing" is a useful abstraction for building concurrent programs. sstreams uses a Process to connect "tasks," where a Task is the core concurrency abstraction (under the hood it is a scalaz Future, but a scalaz Task has more convenience methods).

As you work through the article below, keep in mind that SI describes itself using a vocabulary that overlaps somewhat with sstreams, but the semantics for some of the words are different. For example, an SI "channel" is not the same as an sstreams "channel."

Use Case: The Cafe

The cafe example is a simple example of Enterprise Integration Patterns (EIP). The basic premise is that in a cafe, orders for hot and cold drinks are fulfilled. Hot and cold drinks are created by different baristas. The basic components include

  • A cafe that takes orders. The orders could come from a server or any other place, but you can think of the cafe as an order taking data entry terminal for a server or as a feed from the internet.

  • An order splitter that splits hot and cold orders. Each part of the order could have multiple drinks, e.g., a table orders 3 hot drinks.

  • A router that routes hot and cold orders to the appropriate destinations.

  • Baristas, who make either hot or cold drinks.

  • A waiter who takes the finished drinks and does something with them, such as serving them to the customer.

The important part of the use case is that hot and cold orders are fulfilled at different rates, since creating a hot drink requires heating it prior to serving. The hot and cold drink order queues are of different sizes; only so many orders can be in each queue at one time. The number of baristas can change. For example, there can be 3 hot drink baristas and 1 cold drink barista, but if more cold drink orders come in, additional baristas could be added to the cold drink queue.

It is easy to imagine that this is a good scenario for akka actors as well. There is a good discussion of a very similar scenario here. The spring integration documentation describing this example is here. The spring integration java dsl is described here and here. The github example is located here.

A really great blog on sstreams is here. Another nice tutorial is here.

Basic Objects

For clarity, I will create the basic objects similar to what is done in spring integration. You could just use functions instead of objects, but a real world example would need some objects to describe the basic entities and manage other related data and behaviors.

In SI, the main driver creates orders in a for-loop and issues placeOrder calls to a cafe. In SI, a Cafe is an interface that is proxied by Spring. When a caller makes a call to the proxy, it is really placing an Order object into a channel called the requestChannel.

For sstreams, I will make the Cafe a source of Orders that can be "pulled" by the processing engine. Each time the method getOrder is called, a new order will be produced. In order to simulate the end of all orders and shut down the cafe for the day, we will limit the number of orders retrieved by sstreams using take() applied to the Cafe order stream itself. We could have encoded the order "limit" inside the Cafe object, for example, by placing the "process" inside the Cafe and having the external world access it much like a message bus endpoint or an observable value. We could have the Cafe place a message onto a message bus that is picked up by another process that runs sstreams. Many variations exist.

The take() call on the Cafe takes the place of the application's for-loop in SI and limits the number of orders in the example. Because Cafe keeps track of the number of orders internally, obtaining an order modifies Cafe state. In the spirit of sstreams, the driver program that runs the processes will "ask" the Cafe for an Order. I will make the Order request asynchronous because in the real world an order may be retrieved from sources with variable latencies. We do not want blocking behavior, although back pressure further downstream could create semantically defined blocking. In the real world, Orders would come from an online ordering system or an order taker with a POS terminal at the counter.

Here's the definition of some core objects including Cafe.

package cafe

import java.util.concurrent.{ atomic => juca }

sealed trait DrinkType
case object Espresso extends DrinkType
case object Latte extends DrinkType
case object Cappuccino extends DrinkType
case object Mocha extends DrinkType

object DrinkType {
  private val indices = Map(0 -> Espresso, 1 -> Latte, 2 -> Cappuccino, 3 -> Mocha)
  private val random = scala.util.Random
  def randomDrink() = indices(random.nextInt(indices.size)) // pick from all defined drink types
}

case class Item(
  onumber: Int,
  drinkType: DrinkType,
  shots: Int = 1,
  iced: Boolean = false)

case class Order(
  number: Int,
  items: Seq[Item])

case class Cafe(
  name: String = "MyBucks") {
  import DrinkType._

  private val orders = new juca.AtomicInteger(0)
  private val random = scala.util.Random

  def getOrder(): Order = {
    val orderNum = orders.addAndGet(1)
    Order(orderNum, Seq(
      Item(orderNum, randomDrink(), random.nextInt(5), random.nextBoolean()),
      Item(orderNum, randomDrink(), random.nextInt(5), random.nextBoolean())))
  }

}

To make it more interesting, the drink and shot values are randomly generated.

With this in place we can write a small main to test the processes. The comments in the code should explain how the program is composed.

package cafe

import com.twitter.app.App
import scalaz.concurrent.Task
import scalaz._
import stream._
import scalaz.\/._

object Main1 extends App {

  val norders = flag[Int]("norders", 5, "the number of orders to simulate")

  def main() {
    val c = Cafe()
    println(s"Cafe ${c.name} opened for business")

    // An asynchronous task that asks for a single order in
    // case getOrder() does not always return quickly.
    val askForOrder = Task { c.getOrder() }

    // A process that repeatedly asks for an order
    val askForOrders = Process.repeatEval(askForOrder)

    // The process that asks for N orders. Normally the ordering
    // process would end when the Cafe closes. This process
    // enters a Halt(Cause.End) state automatically after the take() reaches
    // its limit.
    val orders = askForOrders.take(norders())

    // Run and collect the results for print out.
    val orderingResult = orders.runLog.run
    orderingResult.foreach(order => println(s"order: $order"))
  }

}

Your output will vary due to the randomness of the orders, but mine looked like

Cafe MyBucks opened for business
order: Order(1,List(Item(1,Espresso,1,true), Item(1,Latte,0,false)))
order: Order(2,List(Item(2,Cappuccino,4,true), Item(2,Cappuccino,2,true)))
order: Order(3,List(Item(3,Latte,3,true), Item(3,Espresso,0,false)))
order: Order(4,List(Item(4,Latte,4,true), Item(4,Espresso,3,false)))
order: Order(5,List(Item(5,Latte,0,true), Item(5,Cappuccino,3,false)))

You can see how the thinking around sstreams is different from SI. For example, instead of channels and dispatchers, you think about processes. The logic is still held by your domain objects, but as you can see, objects are not essential when using sstreams. How the processes are composed using the F[_] environment determines how your program runs. For example, using Task.apply() (that is, Task(...your function...)) indicates that the Order request should run asynchronously. It's equivalent to specifying the dispatcher type in SI. There is a good tutorial on scalaz's Task here.
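
To make the distinction concrete, here is a tiny sketch (not part of the cafe pipeline; the println messages are just for illustration) showing where Task.apply and Task.delay execute their bodies, assuming the same imports as the mains in this article:

    // Task { ... } (Task.apply) runs its body on scalaz's default executor
    // when the task is run; Task.delay { ... } runs it on the calling thread.
    val asyncHello = Task { println(s"async on ${Thread.currentThread.getName}") }
    val syncHello = Task.delay { println(s"sync on ${Thread.currentThread.getName}") }

    asyncHello.run // typically prints a pool thread name, e.g. pool-1-thread-1
    syncHello.run // prints the calling thread, e.g. main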

Order Splitter

The next step is to split the order. In this scenario, this just means that the drink items are split out from the order and the order itself disappears. In real life, the other parts of the order might go off to the kitchen or to a database for record keeping. In both sstreams and SI, this is a simple operation. In SI, the order splitter is a class. In sstreams, it's just a map function.

    // Splitting the order means extracting out the drinks.
    val drinksOnly = orders.map(_.items)

    // Run and collect the results for print out.
    val orderingResult = drinksOnly.runLog.run
    orderingResult.foreach { item => println(s"item: $item")}

This produces the expected output, which is the items instead of the full order.

Cafe MyBucks opened for business
item: List(Item(1,Espresso,1,true), Item(1,Espresso,3,false))
item: List(Item(2,Latte,4,false), Item(2,Espresso,3,false))
item: List(Item(3,Espresso,0,true), Item(3,Cappuccino,1,true))
item: List(Item(4,Cappuccino,1,true), Item(4,Cappuccino,2,true))
item: List(Item(5,Espresso,3,false), Item(5,Cappuccino,1,false))

Drink Router

The next step is to peel off the hot drinks for one set of Baristas and the cold drinks for another set of Baristas. In SI, the drink type, based on iced in the order item, determines into which channel to place the message. A router takes an input message and returns a string that names the channel to publish the message to. The framework takes the returned string, uses the string->channel mapping to look up the channel, then sends the message to that channel. The router can be completely defined in XML markup using the Spring expression language for the selection criteria, but in this case the SI example defines a class that returns the destination "string" indicating the hot or cold channel.

In sstreams, there are a couple of ways to do this. Before we cover an approach, you need to remember the sstreams abstraction. In sstreams, you create a Process, modify it, connect it to other processes and eventually wind up with a single Process object that you convert to your F[_] in order to start pipeline processing. It is quite common to use F[_]=Task[_] as the environment for the pipeline. A Task is just a container for a function/job to run once you decide how to run it. The .runLog or .run call converts a Process to a Task. You can run this Task, potentially multiple times, to get the results or execute side effects. The .run call converts the Process to a single Task, and that Task is the "interpreter" for the pipeline program you composed. When you need to split a process's output into two pipes, you may immediately wonder if you have two pipelines that you must .run to generate two interpreters.
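
To keep the terminology straight, here is a tiny sketch (separate from the cafe pipeline, assuming the same imports as the mains above) of a Process, the Task "interpreter" produced by runLog, and the final run that actually executes it:

    // A Process describing three numbers; nothing runs yet.
    val numbers: Process[Task, Int] = Process.range(0, 3)

    // runLog turns the whole Process into a single Task -- the interpreter.
    val interpreter: Task[IndexedSeq[Int]] = numbers.runLog

    // Only running that Task executes the pipeline and collects the output.
    val collected = interpreter.run // Vector(0, 1, 2)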

The best way to answer this question is to remember that sstreams is a pull model. Data is pulled from the end of the pipeline. A pull strategy is reactive (you may need a push to start the processing, of course :-)). For example, a desire to print the domain objects to the console acts as a "pull" that starts a processing pipeline via .runLog. The rest of the pipeline reacts to this request. Obviously, the reactive model can also react to an object showing up asynchronously. Reactive concepts can be applied to both ends of the pipeline as long as a pull happens from one of the ends. The pull action is propagated using sstreams messages (Emit, Await, Halt) back to the source. sstreams also supports pushing data into the pipeline, for example, if you are polling a network resource to see if data is available to process. However, you have to have the interpreter already running so that it pulls the object once it's available at the entry point.

If the stream needs to be split to two different locations, say two completely different sink endpoints, then yes, you will use two different interpreters. Your app has two endpoints, probably running in their own threads and essentially each is a "stream." It just happens that they share the same source, which in this case is the same in-process source. You would also need to decouple the source, in this case the last part of the pipeline generating orders, so the order can be available to different streams. Then you would run each sink as you see fit with the coordination point being some object that attaches to the "source."

In our case, the stream joins again after the Baristas generate the drinks. The drinks are sent to a waiter who needs to deliver them somewhere, and we assume the waiter delivers them instantaneously. Since Baristas operate at different speeds, we will need a queue to hold the drink orders until Baristas can process them. Since we want hot and cold orders to flow independently, the queue cannot be on the single stream of Orders, but needs to be after the "split."

Let's build this up gradually. First, let's convert the runLog approach to a side-effect approach using a sink. In this iteration, we model the waiter as a Sink for strings that outputs the strings to stdout, as if the waiter were really a message board in the restaurant. Then we will just take the last output from above, the list of drink items, and send it to the sink. We use .run.run now because we do not need to collect results using .runLog; the output to stdout is the confirmation that the process ran correctly. We use a quick .map(_.toString) to convert the list of Items to a string for display purposes, since io.stdOutLines needs strings as input and does not automatically apply .toString to input objects.

    // Splitting the order means extracting out the drinks.
    val drinksOnly = orders.map(_.items)

    val waiter = io.stdOutLines
    val drinkOrderingSystem = drinksOnly.map(_.toString) to waiter
    drinkOrderingSystem.run.run

Our output looks as expected:

Cafe MyBucks opened for business
List(Item(1,Cappuccino,0,false), Item(1,Latte,4,true))
List(Item(2,Cappuccino,2,true), Item(2,Cappuccino,3,true))
List(Item(3,Latte,4,false), Item(3,Latte,0,false))
List(Item(4,Espresso,1,false), Item(4,Latte,2,true))
List(Item(5,Latte,4,true), Item(5,Cappuccino,0,true))

Of course, this does not do what we want yet.

To model SI's hot and cold channels, we will use a disjunction and assign each drink item to a Left or Right instance. Left will be cold, Right will be hot. The disjunction will be sent down the stream designated for drink processing. In this iteration, each drink will be processed individually. For example, a list of drink items that has three drinks will turn into three different outputs from the waiter. Instead of using the Barista objects, we'll just sleep for a duration, with different durations for hot and cold, before sending them to the io.stdOutLines "waiter."

In the reworked main below, we create a disjunction: left is cold, right is hot. Since each order has multiple drinks, we will just use flatMap to convert a sequence of drinks into a process that emits single drinks. If an order has 5 drinks, then a new process is returned that emits 5 drinks. \/ in scalaz lingo is just Either in plain Scala lingo.

package cafe

import com.twitter.app.App
import scalaz.concurrent.Task
import scalaz._
import stream._
import scalaz.\/._

object Main extends App {

  val norders = flag[Int]("norders", 5, "the number of orders to simulate")

  def main() {
    val c = Cafe()
    println(s"Cafe ${c.name} opened for business")

    // An asynchronous task that asks for a single order in
    // case getOrder() does not always return quickly.
    val askForOrder = Task { c.getOrder() }

    // A process that repeatedly asks for an order
    val askForOrders = Process.repeatEval(askForOrder)

    // The process that asks for N orders. Normally the ordering
    // process would end when the Cafe closes. This process
    // enters a Halt(Cause.End) state automatically after the take() reaches
    // its limit.
    val orders = askForOrders.take(norders())

    // Splitting the order means extracting out the drinks.
    val drinksOnly = orders.map(_.items)

    // Based on "iced", map to left or right. Use \/ (disjunction).
    // Each drink order is broken out individually.
    val coldOrHotIndividualDrink = drinksOnly.flatMap { items =>
      Process.emitAll(items.map { item =>
        if (item.iced) -\/(item)
        else \/-(item)
      })
    }

    val waiter = io.stdOutLines
    val drinkOrderingSystem = coldOrHotIndividualDrink.map(_.toString) to waiter
    drinkOrderingSystem.run.run
  }

}

We could have made the flatMap a bit neater. Regardless, the output is as expected, a stream of individual items wrapped in a disjunction.

Cafe MyBucks opened for business
-\/(Item(1,Latte,3,true))
-\/(Item(1,Cappuccino,2,true))
\/-(Item(2,Espresso,1,false))
-\/(Item(2,Cappuccino,1,true))
\/-(Item(3,Cappuccino,1,false))
\/-(Item(3,Cappuccino,4,false))
-\/(Item(4,Cappuccino,1,true))
\/-(Item(4,Espresso,4,false))
\/-(Item(5,Espresso,3,false))
-\/(Item(5,Espresso,4,true))

So this seems good! Well, no, it's not so great. If we think ahead a little, we want composable processes that we can stack together. When we used drinksOnly.flatMap{...} we made ourselves entirely dependent on drinksOnly, which is not so great. What we really want is to be able to pipe any stream of "lists of drinks" through a process and have it split out like we want. We could make coldOrHotIndividualDrink take a parameter that is the process emitting lists of drink items. But we can even skip the parameter. Since we are using sstreams, what we really want is a pipe through which we can send anything we want, as long as the previous process produces output consisting of a list of drink items. Let's revise slightly. We need to create a Process1 because we are taking an input and converting it to an output regardless of the Task environment.

package cafe

import com.twitter.app.App
import scalaz.concurrent.Task
import scalaz._
import stream._
import scalaz.\/._

object Main extends App {

  val norders = flag[Int]("norders", 5, "the number of orders to simulate")

  def main() {
    val c = Cafe()
    println(s"Cafe ${c.name} opened for business")

    // An asynchronous task that asks for a single order in
    // case getOrder() does not always return quickly.
    val askForOrder = Task { c.getOrder() }

    // A process that repeatedly asks for an order
    val askForOrders = Process.repeatEval(askForOrder)

    // The process that asks for N orders. Normally the ordering
    // process would end when the Cafe closes. This process
    // enters a Halt(Cause.End) state automatically after the take() reaches
    // its limit.
    val orders = askForOrders.take(norders())

    // Splitting the order means extracting out the drinks.
    val drinksOnly = orders.map(_.items)

    // Based on "iced", map to left or right. Use \/ (disjunction).
    // Each drink order is broken out individually.
    val coldOrHotIndividualDrink = process1.unchunk[Item].map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

    val waiter = io.stdOutLines
    val drinkOrderingSystem =
      (drinksOnly |> coldOrHotIndividualDrink).map(_.toString) to waiter
    drinkOrderingSystem.run.run
  }

}

We get what we want:

Cafe MyBucks opened for business
\/-(Item(1,Espresso,1,false))
\/-(Item(1,Espresso,1,false))
\/-(Item(2,Latte,0,false))
-\/(Item(2,Espresso,4,true))
-\/(Item(3,Latte,4,true))
-\/(Item(3,Latte,1,true))
\/-(Item(4,Cappuccino,0,false))
-\/(Item(4,Cappuccino,4,true))
-\/(Item(5,Espresso,3,true))
-\/(Item(5,Latte,2,true))

Remember, due to the random number generator in the order creator, your output results may vary, but the structure should match the outputs in this article.

This version creates a reusable process, coldOrHotIndividualDrink, that we can reuse in other situations where we have a sequence of drink items and we want to create individual items and assign them to a left or right label. Notice that we used the pipe shorthand, |>, to pipe drinksOnly through our reusable process. We knew to use unchunk above because we wanted a Process1, and we looked at the documentation and found that the signature Process1[Seq[I],I] matched the operation we wanted to perform. We mapped that Process1 using a function that calculates the individual disjunction assignment. If we apply the same decoupling design further upstream, we can write a process that strips out just the drinks from any order. We now have two reusable pieces. The new fragment is shown below.

    // A process that only grabs the drink orders from an order.
    val drinksOnly = process1.lift[Order, Seq[Item]](_.items)

    // Based on "iced", map to left or right. Use \/ (disjunction).
    // Each drink order is broken out individually.
    val coldOrHotIndividualDrink = process1.unchunk[Item].map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

    // Our fake waiter
    val waiter = io.stdOutLines

    // Our "application"
    val drinkOrderingSystem =
      (orders |>
        drinksOnly |>
        coldOrHotIndividualDrink).map(_.toString) to waiter
    drinkOrderingSystem.run.run

The output is structurally the same:

Cafe MyBucks opened for business
-\/(Item(1,Espresso,1,true))
\/-(Item(1,Cappuccino,3,false))
\/-(Item(2,Latte,0,false))
\/-(Item(2,Latte,3,false))
\/-(Item(3,Espresso,1,false))
-\/(Item(3,Cappuccino,2,true))
\/-(Item(4,Cappuccino,1,false))
\/-(Item(4,Latte,1,false))
\/-(Item(5,Latte,3,false))
-\/(Item(5,Latte,2,true))

Creating Drinks

Now we need to have the Barista create the drink. It's at this point we need to once again think about the problem. In the SI version, a single Barista instance is created to handle both hot and cold drinks. The drink item stream is fed to the Barista through a service activator model: the service activator takes the object from a channel, runs it through the method, then takes the output and pushes it into the output channel--the "preparedDrinks" channel. Since only a single instance of Barista is created, Barista is really a Barista service. You generically ask the service for a drink to be prepared and call the respective API based on whether the drink is hot or cold. I will match this style of implementation. Similarly, the return object from the Barista should be a Drink. When a Drink is prepared, a message is printed to let you know which thread is preparing the drink. In this section, no separate thread pool is introduced for drink creation; each drink is prepared on whatever thread is already processing its order.

We update our set of domain objects accordingly:

package cafe

import java.util.concurrent.{ atomic => juca }

sealed trait DrinkType
case object Espresso extends DrinkType
case object Latte extends DrinkType
case object Cappuccino extends DrinkType
case object Mocha extends DrinkType

object DrinkType {
  private val indices = Map(0 -> Espresso, 1 -> Latte, 2 -> Cappuccino, 3 -> Mocha)
  private val random = scala.util.Random
  def randomDrink() = indices(random.nextInt(indices.size)) // pick from all defined drink types
}

case class Drink(
  onumber: Int,
  drinkType: DrinkType,
  shots: Int,
  iced: Boolean)

case class Item(
  onumber: Int,
  drinkType: DrinkType,
  shots: Int = 1,
  iced: Boolean = false)

case class Order(
  number: Int,
  items: Seq[Item])

case class Cafe(
  name: String = "MyBucks") {
  import DrinkType._

  private val orders = new juca.AtomicInteger(0)
  private val random = scala.util.Random

  def getOrder(): Order = {
    val orderNum = orders.addAndGet(1)
    Order(orderNum, Seq(
      Item(orderNum, randomDrink(), random.nextInt(5), random.nextBoolean()),
      Item(orderNum, randomDrink(), random.nextInt(5), random.nextBoolean())))
  }

}

case class Barista() {
  val hotDrinkCounter = new juca.AtomicInteger(0)
  val coldDrinkCounter = new juca.AtomicInteger(0)

  val hotDrinkDelay = 2000
  val coldDrinkDelay = 100

  def prepareHotDrink(item: Item): Drink = {
    Thread.sleep(hotDrinkDelay)
    println(s"""${Thread.currentThread.getName} prepared hot drink #${hotDrinkCounter.getAndIncrement} for order #${item.onumber}: $item""")
    Drink(item.onumber, item.drinkType, item.shots, item.iced)
  }
  def prepareColdDrink(item: Item): Drink = {
    Thread.sleep(coldDrinkDelay)
    println(s"""${Thread.currentThread.getName} prepared cold drink #${hotDrinkCounter.getAndIncrement} for order #${item.onumber}: $item""")
    Drink(item.onumber, item.drinkType, item.shots, item.iced)
  }
}

Now we just need to hook up the stream of drinks to the Barista "service." Since we are not doing any parallel activity, we will just lift a function into the environment that processes the input stream and calls the right function. It's fairly trivial. To improve reusability and composability, we will create a Process that can be used for piping.

package cafe

import com.twitter.app.App
import scalaz.concurrent.Task
import scalaz._
import stream._
import scalaz.\/._

object Main extends App {

  val norders = flag[Int]("norders", 5, "the number of orders to simulate")

  def main() {
    val c = Cafe()
    println(s"Cafe ${c.name} opened for business")

    // An asynchronous task that asks for a single order in
    // case getOrder() does not always return quickly.
    val askForOrder = Task { c.getOrder() }

    // A process that repeatedly asks for an order
    val askForOrders = Process.repeatEval(askForOrder)

    // The process that asks for N orders. Normally the ordering
    // process would end when the Cafe closes. This process
    // enters a Halt(Cause.End) state automatically after the take() reaches
    // its limit.
    val orders = askForOrders.take(norders())

    // A process that only grabs the drink orders from an order.
    val drinksOnly = process1.lift[Order, Seq[Item]](_.items)

    // Based on "iced", map to left or right. Use \/ (disjunction).
    // Each drink order is broken out individually.
    val coldOrHotIndividualDrink = process1.unchunk[Item].map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

    def createDrinks(barista: Barista) =
      process1.lift[\/[Item, Item], Drink](_ match {
        case -\/(item) => barista.prepareColdDrink(item)
        case \/-(item) => barista.prepareHotDrink(item)
      })

    // Our fake waiter
    val waiter = io.stdOutLines

    // Our "application"
    val drinkOrderingSystem =
      (orders |>
        drinksOnly |>
        coldOrHotIndividualDrink |>
        createDrinks(Barista())).map(_.toString) to waiter
    drinkOrderingSystem.run.run
  }

}

The output shows that each order was processed in order. The output is more noisy because of the print statements.

Cafe MyBucks opened for business
pool-1-thread-1 prepared hot drink #0 for order #1: Item(1,Espresso,4,false)
Drink(1,Espresso,4,false)
pool-1-thread-1 prepared hot drink #1 for order #1: Item(1,Latte,0,false)
Drink(1,Latte,0,false)
pool-1-thread-2 prepared cold drink #2 for order #2: Item(2,Espresso,3,true)
Drink(2,Espresso,3,true)
pool-1-thread-2 prepared cold drink #3 for order #2: Item(2,Latte,4,true)
Drink(2,Latte,4,true)
pool-1-thread-3 prepared hot drink #4 for order #3: Item(3,Espresso,4,false)
Drink(3,Espresso,4,false)
pool-1-thread-3 prepared hot drink #5 for order #3: Item(3,Latte,3,false)
Drink(3,Latte,3,false)
pool-1-thread-4 prepared cold drink #6 for order #4: Item(4,Cappuccino,2,true)
Drink(4,Cappuccino,2,true)
pool-1-thread-4 prepared hot drink #7 for order #4: Item(4,Cappuccino,4,false)
Drink(4,Cappuccino,4,false)
pool-1-thread-4 prepared cold drink #8 for order #5: Item(5,Espresso,2,true)
Drink(5,Espresso,2,true)
pool-1-thread-4 prepared hot drink #9 for order #5: Item(5,Cappuccino,2,false)
Drink(5,Cappuccino,2,false)

We used a single Process to process the drink order stream. However, to improve composability with the pipe operator, we may want to set up separate processes for hot and cold drinks. In the SI example, they set up two channels. Even though the two channels eventually call methods on the same object, like ours, the SI example feels different. In the interest of comparison, we will set up two processes and have them call the respective function on the Barista service object. Fortunately, there is a process1.multiplex method that multiplexes the input. Since the multiplex method outputs the same type of object from both sides, we can pipe its output to the waiter as we normally do. We will still use the same Barista object, though, to match the SI example. multiplex can be used because we sagely split the stream into a disjunction to start with, in anticipation of this scenario: multiplex routes left (-\/) values to one Process1 and right (\/-) values to the other.

The new main program is below:

    /**
     * The process that asks for N orders. Normally the ordering
     * process would end when the Cafe closes. This process
     * enters a Halt(Cause.End) state automatically after the take() reaches
     * its limit.
     */
    val orders = askForOrders.take(norders())

    // A process that only grabs the drink orders from an order.
    val drinksOnly = process1.lift[Order, Seq[Item]](_.items)

    /**
     * Based on "iced", map to left or right. Use \/ (disjunction).
     * Each drink order is broken out individually. Cold is left
     * and hot is right.
     */
    val coldOrHotIndividualDrink = process1.unchunk[Item].map { item =>
      if (item.iced) -\/(item)
      else \/-(item)
    }

    /**
     * Any drink order item sent to this process is processed
     * as a hot drink.
     */
    def processHotDrinks(barista: Barista) =
      process1.lift[Item, Drink](barista.prepareHotDrink(_))

    /**
     * Any drink order item sent to this process is processed
     * as a cold drink.
     */
    def processColdDrinks(barista: Barista) =
      process1.lift[Item, Drink](barista.prepareColdDrink(_))

    // Use two separate processes to process the drinks
    def sendDrinksToCreationProcess(barista: Barista) =
      process1.multiplex(processColdDrinks(barista), processHotDrinks(barista))

    // Our fake waiter
    val waiter = io.stdOutLines

    // Our "application"
    val drinkOrderingSystem =
      (orders |>
        drinksOnly |>
        coldOrHotIndividualDrink |>
        sendDrinksToCreationProcess(Barista())).map(_.toString) to waiter
    drinkOrderingSystem.run.run

And the output matches our expectations:

Cafe MyBucks opened for business
pool-1-thread-1 prepared hot drink #0 for order #1: Item(1,Cappuccino,1,false)
Drink(1,Cappuccino,1,false)
pool-1-thread-1 prepared hot drink #1 for order #1: Item(1,Espresso,4,false)
Drink(1,Espresso,4,false)
pool-1-thread-2 prepared cold drink #2 for order #2: Item(2,Latte,4,true)
Drink(2,Latte,4,true)
pool-1-thread-2 prepared cold drink #3 for order #2: Item(2,Espresso,2,true)
Drink(2,Espresso,2,true)
pool-1-thread-3 prepared cold drink #4 for order #3: Item(3,Latte,1,true)
Drink(3,Latte,1,true)
pool-1-thread-3 prepared hot drink #5 for order #3: Item(3,Cappuccino,4,false)
Drink(3,Cappuccino,4,false)
pool-1-thread-4 prepared hot drink #6 for order #4: Item(4,Latte,2,false)
Drink(4,Latte,2,false)
pool-1-thread-4 prepared hot drink #7 for order #4: Item(4,Cappuccino,0,false)
Drink(4,Cappuccino,0,false)
pool-1-thread-4 prepared hot drink #8 for order #5: Item(5,Espresso,3,false)
Drink(5,Espresso,3,false)
pool-1-thread-4 prepared cold drink #9 for order #5: Item(5,Cappuccino,4,true)
Drink(5,Cappuccino,4,true)

Where Did the Parallelism Come From?

In the above examples, the thread used to run the drink preparation is not the same thread each time. It's coming from a pool. But we never created a pool ourselves. Threading concerns should be under explicit programmer control.

It is! Our order generation process is asynchronous because we started it off that way. The default executor service is used, which for scalaz is a thread pool with the number of threads equal to the number of processors. If we change one line, everything becomes sequential and runs in the same thread. Here's the line we need to change: from Task{...} (Task.apply, which is asynchronous) to Task.delay{...} (which is not asynchronous).

    // Synchronous task that asks for a single order.
    val askForOrder = Task.delay { c.getOrder() }

which produces the output we expect where all the action occurs on the main thread:

Cafe MyBucks opened for business
main prepared cold drink #0 for order #1: Item(1,Latte,2,true)
Drink(1,Latte,2,true)
main prepared cold drink #1 for order #1: Item(1,Latte,0,true)
Drink(1,Latte,0,true)
main prepared cold drink #2 for order #2: Item(2,Espresso,0,true)
Drink(2,Espresso,0,true)
main prepared cold drink #3 for order #2: Item(2,Latte,2,true)
Drink(2,Latte,2,true)
main prepared cold drink #4 for order #3: Item(3,Espresso,2,true)
Drink(3,Espresso,2,true)
main prepared cold drink #5 for order #3: Item(3,Espresso,0,true)
Drink(3,Espresso,0,true)
main prepared cold drink #6 for order #4: Item(4,Latte,4,true)
Drink(4,Latte,4,true)
main prepared cold drink #7 for order #4: Item(4,Cappuccino,2,true)
Drink(4,Cappuccino,2,true)
main prepared cold drink #8 for order #5: Item(5,Cappuccino,0,true)
Drink(5,Cappuccino,0,true)
main prepared cold drink #9 for order #5: Item(5,Espresso,4,true)
Drink(5,Espresso,4,true)

Once the order generation process was made asynchronous, no other asynchronous tasks were introduced, and the remaining processing of that Order continued on that initial pool thread until the end. However, the SI Cafe example has parallelism in the drink creation step: it uses a thread pool to ask Baristas to make drinks, essentially creating multiple, parallel requests to the Barista service. In our last formulation, sendDrinksToCreationProcess sent drink item orders to the Barista drink creation service sequentially. We want drink creation to run on a thread pool so it can proceed in parallel, with each created Drink then going to the Waiter to be sent to the customer.

The SI Cafe example has a queue sitting in front of the hot and cold Barista creation request function calls. This allows the driver program to shove orders into the queue (a push strategy) as fast as it wants to and lets the creation process run at a different speed--it takes time to make a drink. The hot and cold "channels" are the queues. Once a drink order enters the queue associated with hot or cold drink orders, a poller dequeues the item, then calls the service method to create the hot or cold drink. A channel can send messages to a single object (point to point) or to multiple objects (publish/subscribe). A channel can have a queue or no queue. A channel can dispatch messages either in the same thread as the endpoint or in a separate thread. The SI example uses a queue-based channel and a poller on a service activator that calls the Barista service. When something appears in the channel, the poller takes an object out of the queue and calls the Barista create-drink method. Since there is a poller involved, the dispatch to the Barista service through the queue occurs in a separate thread. A thread pool is used so the polling has up to X "baristas" available to make drinks. In the SI example, instead of directly configuring the service activator instance with a poller, they could have used a "bridge" that connects the standard channel to a pollable channel so that the polling configuration can be left out of the service activator.
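
The closest sstreams analogue to that queue-backed channel is an asynchronous queue sitting between the drink item stream and a barista, so the producer and consumer can run at different speeds. Here is a minimal sketch of my own (not code from the SI example or from this article's pipeline; the items and queue size are made up), assuming the same imports and domain classes as above and a scalaz-stream version that provides async.boundedQueue:

    // A bounded queue playing the role of SI's queue-backed hot channel.
    val hotItemQueue = async.boundedQueue[Item](10)

    // Producer side: push a couple of made-up hot items into the queue.
    // enqueueOne blocks (back pressure) if the queue is full.
    hotItemQueue.enqueueOne(Item(1, Espresso)).run
    hotItemQueue.enqueueOne(Item(1, Latte)).run

    // Consumer side: a barista drains the queue at its own pace. take(2)
    // lets this tiny example terminate without closing the queue.
    val barista = Barista()
    (hotItemQueue.dequeue.take(2)
      .map(barista.prepareHotDrink)
      .map(_.toString) to io.stdOutLines).run.run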

Trying to specify the right combination of synchronous/asynchronous/buffering behaviors in an application is part of the design process. It's enough to make your head hurt.

[[more here about asynch stuff]]

The Waiter

Up to now we have had the Waiter just be a sink that outputs lines to stdout. In the SI example, the Waiter aggregates all of the drinks that were created by the Baristas and then delivers the entire order at once. In Spring Integration, you define an aggregator that "aggregates" successive elements into a list. The aggregation occurs through a correlation strategy that groups the drinks by their order number. It is not entirely clear from the SI example how, with queue ordering and parallel Baristas at work, drinks from other orders are kept from slipping into the sequence of another order. Perhaps it's implied by the queue semantics as documented.
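
One possible shape for such a correlation step in sstreams (a sketch of my own, not the SI aggregator) is a Process1 that keys drinks by their order number and emits a completed order once the expected number of items has arrived; because it correlates on the order number, it tolerates interleaving from parallel baristas:

    // Accumulate Drinks by order number; emit a full order once both of its
    // drinks (2 per order in this example) have been prepared.
    def aggregateByOrder(itemsPerOrder: Int = 2): Process1[Drink, Vector[Drink]] = {
      def go(pending: Map[Int, Vector[Drink]]): Process1[Drink, Vector[Drink]] =
        Process.receive1 { d: Drink =>
          val drinks = pending.getOrElse(d.onumber, Vector.empty) :+ d
          if (drinks.size == itemsPerOrder) Process.emit(drinks) ++ go(pending - d.onumber)
          else go(pending + (d.onumber -> drinks))
        }
      go(Map.empty)
    }

Such a process would be piped in after drink creation, e.g. ... |> aggregateByOrder(), before handing the grouped drinks to the waiter sink.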

[[more here about aggregating by order number]]
