Sinks

UPDATE FOR FS2

Sinks are processes that perform side effects and return Unit rather than domain values. They usually sit at the end of a stream, but they can also appear in the middle of a stream when the data is sent to a Sink and also retained as output. You typically stream items through the sink. A sink is often thought of as the place where your domain data is handed to an external, effectful system, e.g. disk, network, outer space.

A common sink prints to stdout:

scala> Process.range(1,10).map(_.toString).to(io.stdOutLines).run.run
1
2
3
4
5
6
7
8
9

To send values from a process to a Sink, use .to. Since a Sink does not output any domain values, you can run the process with .run.run. If you use .runLog.run instead, you get back a sequence of Units, one per element, because println returns Unit (or void in Java-land).

scala> Process.range(1,10).map(_.toString).to(io.stdOutLines).runLog.run
1
2
3
4
5
6
7
8
9
res70: IndexedSeq[Unit] = Vector((), (), (), (), (), (), (), (), ())
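
If you want the values themselves rather than a Vector of Units, the sink can sit in the middle of the stream. The sketch below assumes the same scalaz-stream version as the examples above (one where Task still has .run) and uses observe, which sends each element to the sink and then re-emits it. The name kept is only for illustration.

```scala
import scalaz.stream._

// observe runs the sink's side effect for each element and then
// passes the element downstream, so the output keeps its values
// instead of being replaced by Unit.
val kept: IndexedSeq[String] =
  Process.range(1, 4).map(_.toString).observe(io.stdOutLines).runLog.run
```

Here the numbers are printed as before, and kept holds the strings that flowed through the sink.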

The signature for a Sink is type Sink[+F[_], -O] = Process[F, (O) ⇒ F[Unit]], so as long as you can write a function that returns an effect in the environment F (e.g. Task) and that effect yields Unit, you can create a sink. A database can form a Sink:

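  import scala.concurrent.Await
  import scala.concurrent.duration.Duration
  import scalaz.concurrent.Task
  import scalaz.stream._

  /**
   * Write events to a database as a side effect.
   */
  def dbWriter(da: DataAccess): Sink[Task, Vector[Event]] =
    Process.repeatEval {
      import da._ // assumed to bring db, Events and the slick API into scope
      Task.delay {
        events: Vector[Event] =>
          Task.delay {
            // Block until the batch insert finishes, then report the row count.
            val result = Await.result(db.run(Events ++= events), Duration.Inf)
            println("Inserted: " + result.getOrElse(0) + " records.")
          }
      }
    }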

Of course, you may want to use a Channel for this and return the number of records inserted. The returned value could then be processed in another stream, for example to check for insert errors. This particular example uses Typesafe's Slick. In the example above, repeatEval is needed so that a Process is returned: it evaluates the outer Task repeatedly and emits the writer function each time.
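
The Channel variant mentioned above could look roughly like the following sketch. It is not the database code itself: insertAll is a hypothetical stand-in for the Slick insert that simply returns the batch size, and countingWriter and counts are illustrative names. It assumes channel.lift and through from scalaz-stream.

```scala
import scalaz.concurrent.Task
import scalaz.stream._

// Hypothetical stand-in for a database insert (an assumption, not Slick):
// it reports how many rows it was asked to write.
def insertAll(events: Vector[String]): Int = events.size

// A Channel is like a Sink whose function yields a value instead of Unit.
val countingWriter: Channel[Task, Vector[String], Int] =
  channel.lift((events: Vector[String]) => Task.delay(insertAll(events)))

// through feeds each batch to the channel and emits the returned counts,
// which a later stage could inspect for failed or short inserts.
val counts: IndexedSeq[Int] =
  Process.emitAll(Seq(Vector("a", "b"), Vector("c")))
    .through(countingWriter)
    .runLog.run
```

Each batch produces one count, so downstream code sees the insert results instead of a stream of Units.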

Since the signatures of a Sink and a Channel are so similar, you might expect that a Sink can be converted to a Channel, and it can. The resulting channel echoes its input as output instead of returning Unit: dbWriter(da).toChannel.
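
A small sketch of the conversion, using io.stdOutLines in place of the database sink so that it stays self-contained. It assumes toChannel is available on Sink in your scalaz-stream version, as described above. The name echoed is only for illustration.

```scala
import scalaz.stream._

// toChannel turns the sink into a channel that performs the side effect
// and then echoes its input, so through emits the original values.
val echoed: IndexedSeq[String] =
  Process.range(1, 4).map(_.toString)
    .through(io.stdOutLines.toChannel)
    .runLog.run
```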

Another way to create a Sink is to use io.channel and make sure the lifted function returns Unit:

// This compiles, but only with a warning: a Sink's function has to return Unit, so the trailing 5 is discarded.
val mySink: Sink[Task, Int] = io.channel{(i:Int) => Task.delay{ println("i: "+i); 5 }}
<console>:24: warning: a pure expression does nothing in statement position; you may be omitting necessary parentheses
       val mySink: Sink[Task, Int] = io.channel{(i:Int) => Task.delay{ println("i: "+i); 5 }}
                                                                                         ^
mySink: scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))

scala> val mySink: Sink[Task, Int] = io.channel{(i:Int) => Task.delay{ println("i: "+i); () }}
mySink: scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emit(Vector(<function1>)),Vector(<function1>))

However, io.channel may no longer be available in the library. A newer sstream function, sink.lift, lets you lift a function much more easily. It is essentially the same as using io.channel but with clearer syntax (it actually just does a channel lift):

val printPlusOne = sink.lift((i: Int) => Task.delay { println("i: " + i); () })
                                                  //> printPlusOne  : scalaz.stream.Sink[scalaz.concurrent.Task,Int] = Append(Emi
                                                  //| t(Vector(<function1>)),Vector(<function1>))

We can then rewrite our dbWriter as:

  val dbWriter: Sink[Task, Vector[Event]] =
    sink.lift { events: Vector[Event] =>
      Task.delay {
        val result = Await.result(db.run(Events ++= events), Duration.Inf)
        println("Inserted: " + result.getOrElse(0) + " records.")
      }
    }

This version is clearer. I converted it to a val on the assumption that the data access machinery is already in scope as needed.

See the "Write to Database" recipe in the cookbook for a better formulation than the above.
