Smart Constructors and Combinators

The basic model behind fs2 is that you create streams and then run them. You create most streams through smart constructors rather than the traditional OOP approach of calling new YourType(). For example, to create a stream that emits a range of integers, you write:

Stream.range(1,10)

If you want to create a pipe, you write:

pipe.lift((line: String) => Task.delay { println(line) })

You can create a sink that does the same kind of work as the pipe above (printing each element) by writing:

fs2> val mySink: Sink[Task, Int] = pipe.lift(i => Task.delay(println(s"i: $i")))  
mySink: Stream[Task, Int] => Stream[Task, Unit] = <function1>

Note that a Sink is just a type alias, type Sink[F[_],-I] = Pipe[F,I,Unit], so we used the type annotation in our declaration to reinterpret the pipe as a sink.

Smart constructors are common in fs2, scalaz, and other functionally oriented libraries.
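To make the idea concrete, here is a minimal sketch of a smart constructor in plain Scala. It does not use fs2: IntRange and its range method are hypothetical names invented for this illustration. The point is only that the constructor is hidden and callers go through a function on the companion object instead of new.

```scala
object SmartConstructorSketch {
  // Hypothetical type for illustration; not part of fs2.
  // The constructor is private, so only the companion object can call `new`.
  final class IntRange private (val start: Int, val stopExclusive: Int) {
    def toList: List[Int] = (start until stopExclusive).toList
  }

  object IntRange {
    // Smart constructor: checks the arguments before building the value.
    def range(start: Int, stopExclusive: Int): Option[IntRange] =
      if (start <= stopExclusive) Some(new IntRange(start, stopExclusive))
      else None
  }
}
```

Here IntRange.range(1, 10) yields a Some holding the values 1 through 9, while IntRange.range(5, 1) yields None, so an invalid range can never be constructed.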

Another approach to creating your streams involves using combinators. A "combinator" is a fancy term for the loose intersection between the "Builder" design pattern in standard Java and complicated Java constructors. The Builder design pattern separates the act of specifying an object's dependencies from the moment of the object's creation. It also lets a programmer worry less about the order of constructor parameters and enables "fluent" APIs. All of this is helpful in the Java world, but in functional languages it is more common to use smart constructors and combinators.

A combinator takes an instance of an object, typically immutable, and applies a function to it to produce a new object of the same type; the original object is left untouched. Most functional languages handle these layers efficiently, so creating layers of objects causes fewer performance issues.

When a combinator is applied to an existing object, it adjusts some aspect of that object. Standard Scala case classes make "copy and change a value" operations easy, which is why you see many Scala combinators implemented with case classes.
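The "copy and change a value" style can be sketched with a small case class. Retry, withAttempts, and withDelayMillis are hypothetical names made up for this example, not anything from fs2; the sketch only assumes the copy method that Scala generates for every case class.

```scala
object CombinatorSketch {
  // Hypothetical configuration type, used only for illustration.
  final case class Retry(attempts: Int, delayMillis: Long) {
    // Each combinator returns a modified copy; `this` is never mutated.
    def withAttempts(n: Int): Retry = copy(attempts = n)
    def withDelayMillis(ms: Long): Retry = copy(delayMillis = ms)
  }

  // Start from a base value and layer adjustments on top of it.
  val base: Retry = Retry(attempts = 1, delayMillis = 0L)
  val tuned: Retry = base.withAttempts(3).withDelayMillis(250L)
}
```

After this runs, tuned holds the adjusted values and base still holds the originals, which is the "original object is left untouched" behavior described above.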

To create some streams, you need a mix of smart constructors and combinators.

For example, you may wonder how you combine two streams to make a new stream. Let's consider two streams that you want to combine (hence the concept of a combinator):

fs2> val lhs = Stream.range(1,5) 
lhs: Stream[Nothing, Int] = Segment(Emit(Chunk(()))).flatMap(<function1>)
fs2> val rhs = Stream.range(6,10) 
rhs: Stream[Nothing, Int] = Segment(Emit(Chunk(()))).flatMap(<function1>)

Then to combine them:

fs2> lhs interleave rhs 
res4: Stream[Nothing, Int] = evalScope(<scope>).flatMap(<function1>).flatMap(<function1>)
fs2> (lhs interleave rhs).runLog 
res5: Attempt[Vector[Int]] = Right(Vector(1, 6, 2, 7, 3, 8, 4, 9))

Here, interleave is a combinator.
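If the output order looks surprising, the alternation can be mimicked on plain Lists. This is only a rough analogue written for this explanation: the interleave helper below is hypothetical and is not how fs2 implements its combinator. It also assumes that the result stops as soon as the shorter input runs out, which the example above cannot confirm because both inputs have the same length.

```scala
object InterleaveSketch {
  // Hypothetical helper on plain Lists; not fs2's implementation.
  // Pairs up elements, then flattens each pair as left element, right element.
  // `zip` drops any unpaired tail, so the result ends with the shorter input.
  def interleave[A](lhs: List[A], rhs: List[A]): List[A] =
    lhs.zip(rhs).flatMap { case (l, r) => List(l, r) }

  // Same inputs as the REPL session: 1 to 4 and 6 to 9.
  val combined: List[Int] =
    interleave((1 until 5).toList, (6 until 10).toList)
}
```

With these inputs, combined is List(1, 6, 2, 7, 3, 8, 4, 9), the same ordering the fs2 session produced.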
