Streams

Your New Favorite Primitive

By Ryan Peters


Subject of this talk:

A new form of "streams" has been gaining popularity.

Not all languages/libraries handle the concept equally.

That said...

Once you know one, to an extent, you know them all.

Streaming with FS2

My streaming library of choice in Scala is FS2.

FS2 is inspired by - and also inspires - other libraries from other ecosystems.

This presentation is applicable to almost any other language or streaming library.

My History With Streams

Back in 2016...

I got into Angular in JavaScript...

Which got me into RxJS

RxJS "Observables"

                Single      Multiple
Pull / Sync     Function    Iterator
Push / Async    Promise     Observable
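Each cell of the table has a rough Scala counterpart: a function, an `Iterator`, and a `Future` (Scala's closest analogue to a JavaScript Promise). A minimal stdlib sketch, assuming Scala 2.13; `MiniObservable` is a hypothetical toy that only shows the "push many values to a callback" shape, not RxJS's or Monix's actual API.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Pull / Sync, Single: the caller invokes a function to get one value
val function: () => Int = () => 42

// Pull / Sync, Multiple: the caller pulls values out of an Iterator
val iterator: Iterator[Int] = Iterator(1, 2, 3)

// Push / Async, Single: a Future delivers one value whenever it is ready
val future: Future[Int] = Future(21 * 2)

// Push / Async, Multiple: a hypothetical toy "observable" that pushes
// every value into a subscriber callback (not a real library API)
final case class MiniObservable[A](subscribe: (A => Unit) => Unit) {
  def map[B](f: A => B): MiniObservable[B] =
    MiniObservable[B](onNext => subscribe(a => onNext(f(a))))
}

val observable: MiniObservable[Int] =
  MiniObservable[Int](onNext => List(1, 2, 3).foreach(onNext))

// The producer drives: values arrive in the callback, already mapped
val received = ListBuffer.empty[Int]
observable.map(_ * 10).subscribe(received += _)
```

The key difference is who is in control: in the top row the consumer asks for values, while in the bottom row the producer decides when values arrive.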

Takeaways:

  • Streams represent possibly async data transformation
  • Useful as a general-purpose primitive
  • "Observables" == "Streams" in other libs
  • (PS: Monix is like ReactiveX in Scala)

A Streaming Vocabulary

A stream:

  • Is an "iterable-like" interface for processing ordered elements
  • Has "operators" that append new steps in the "pipeline"
  • Can be composed together to produce entire program flows
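In FS2, a reusable pipeline step is a `Pipe[F, I, O]`, which is just a function `Stream[F, I] => Stream[F, O]`, so pipelines compose like ordinary functions. A small sketch, assuming FS2 2.x; `addOne` and `evensOnly` are illustrative names.

```scala
import fs2.{Pipe, Pure, Stream}

// Each operator returns a new Stream with one more step appended
val addOne: Pipe[Pure, Int, Int] = _.map(_ + 1)
val evensOnly: Pipe[Pure, Int, Int] = _.filter(_ % 2 == 0)

// A Pipe is a plain function, so whole pipelines compose with andThen
val pipeline: Pipe[Pure, Int, Int] = addOne.andThen(evensOnly)

// 1, 2, 3, 4 -> 2, 3, 4, 5 -> 2, 4
val result: List[Int] = Stream(1, 2, 3, 4).through(pipeline).compile.toList
```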

Other common concepts:

  • A "Single" stream - always has 1 element

    (In Scala this is just IO / Future, thanks to map/flatMap)

  • Subjects/Topics/Queues - Stream-enabled concurrency primitives
  • A way to define custom operators (GraphStage, Pull, etc.)
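To make the "Single" point concrete: an `IO` already behaves like a one-element stream, and `Stream.eval` converts between the two. A sketch assuming FS2 2.x with Cats Effect 2.

```scala
import cats.effect.IO
import fs2.Stream

// An IO is already a "single": exactly one result, composed with map/flatMap
val single: IO[Int] = IO(21).map(_ * 2)

// Stream.eval lifts that single effect into a one-element stream...
val oneElement: Stream[IO, Int] = Stream.eval(single)

// ...and compiling back down recovers the single value
val roundTrip: IO[Option[Int]] = oneElement.compile.last
```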

FS2 Stream

Looks like:

Stream[F[_], A]

  • F[_] is the effect type
  • A is the element type

Pure Streams


import fs2.{Pure, Stream}

val pureStream: Stream[Pure, Int] = Stream(1, 2, 3)
pureStream.map(_ - 1).compile.toList // List(0, 1, 2)
          


Streaming side-effects


import cats.effect.IO
import fs2.Stream

def putstrln(s: String): IO[Unit] = IO(println(s))

val pureStream = Stream(1, 2, 3)
val printEverything: IO[Unit] =
  pureStream
    .map(_.toString)
    .evalTap(putstrln) //Now we have a Stream[IO, String]
    .compile
    .drain //Discards all results

//Prints 1, 2, 3 on separate lines
printEverything.unsafeRunSync()
          

Compiling Streams

Streams in FS2 must be "compiled" to a pure value

"Pure" streams compile to the raw result A

Effectful streams must compile to an F[A] value

Note: you can only call .compile if your F[_] type can handle Throwable errors, or is "pure"

Other Useful Compile Operators

  • fold - Just like foldLeft on List
  • last - Gives F[Option[A]]
  • string - Efficiently concatenates a stream of Strings w/ StringBuilder
  • to(Col) - Collect results to a collection (great for tests)
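A quick tour of these compile operators, and of the pure vs. effectful split described above. This is a sketch assuming FS2 2.1 or later (where `compile.string` and the collector-based `compile.to` are available) with Cats Effect 2.

```scala
import cats.effect.IO
import fs2.Stream

// A pure stream compiles straight to a plain value
val pureTotal: Int = Stream(1, 2, 3, 4).compile.fold(0)(_ + _)

// An effectful stream compiles to an IO that must be run later
val numbers: Stream[IO, Int] = Stream(1, 2, 3, 4).covary[IO]

val total: IO[Int] = numbers.compile.fold(0)(_ + _)
val lastOne: IO[Option[Int]] = numbers.compile.last
val text: IO[String] = numbers.map(_.toString).compile.string
val asVector: IO[Vector[Int]] = numbers.compile.to(Vector)
```

Nothing in the effectful half runs until one of those `IO` values is executed.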

Are Streams Necessary?

An Event At Work

  • We had an event-driven application
  • ...Implemented with loops and mutability
  • "A natural streaming use-case!" -me
  • Redid some code w/ Akka Streams
  • It did not go over well w/ the maintainer

Diagnosing the issues

  1. Akka Streams came across as complicated and confusing
  2. Cost/Benefit of bringing in a library "just" to solve a simple, one-off problem
  3. Misconceptions as to when streams are "necessary"

RE: Akka Streams

  • Akka Streams is conceptually large
  • "Streams" are actually Sources/Sinks/Flows
  • ...And those all have "shapes"
  • ...And you compose them with a custom "graph DSL"
  • CAN be done functionally, but it's not actively encouraged
  • Engineered to mesh well with Akka actors, which are complex

Streaming "Cost/Benefits"

My take:

  • Streams are a general abstraction
  • Once you know a concept - you can apply it anywhere
  • Everybody benefits from knowing functional streaming
  • You might as well already know it

When are streams "necessary?"

In software, "necessary" just means "easier/better"

The real question is... what are our alternatives?

Stream "Alternatives"

  • Looping by hand
    • ...no
  • Stream/LazyList
    • Can represent infinitely-repeating tasks
    • Memoizes every element, so holding on to the head can exhaust memory!
  • Iterator
    • Can be made infinite
    • Handling concurrency and reactivity is a hassle
    • Mutable, unsafe, and unsuitable for pure FP
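Two of these drawbacks are easy to observe with the standard library alone. A sketch assuming Scala 2.13 (for `LazyList`); the `evaluations` counter is only there to make memoization visible.

```scala
// Iterator: single-pass and mutable, so a second traversal sees nothing
val it = Iterator(1, 2, 3)
val firstPass = it.toList  // consumes the iterator
val secondPass = it.toList // already exhausted

// LazyList: infinite definitions are fine, but evaluated elements are
// memoized, so keeping a reference to the head keeps them all in memory
var evaluations = 0
val naturals: LazyList[Int] = LazyList.from(1).map { n => evaluations += 1; n }

val firstFive = naturals.take(5).toList // forces five elements
val againFive = naturals.take(5).toList // served from the memoized cells
```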

What about Actors?

  • With Akka Actors you get concurrent state and reactive, message-based programming
  • Actors are pretty low-level (mailbox management, boilerplate, no backpressure)
  • This is a lot to think about sometimes
  • Actors are a great concept - just not a great default tool
  • Messaging, state, etc. are even easier w/ streams (write your own "actors")
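As a rough illustration of "write your own actors": a queue can serve as the mailbox, and `scan` can thread state through the messages. This is a sketch assuming FS2 2.x's `fs2.concurrent.Queue` and Cats Effect 2; the `Msg` protocol and `handle` function are hypothetical, and a real actor would keep consuming instead of stopping after three messages.

```scala
import cats.effect.{ContextShift, IO}
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext

// Queue needs a Concurrent[IO], which in Cats Effect 2 needs a ContextShift
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Hypothetical message protocol for a bank-account "actor"
sealed trait Msg
final case class Deposit(amount: Int) extends Msg
final case class Withdraw(amount: Int) extends Msg

// The actor's behavior: current state + message => next state
def handle(balance: Int, msg: Msg): Int = msg match {
  case Deposit(n)  => balance + n
  case Withdraw(n) => balance - n
}

val program: IO[List[Int]] = for {
  mailbox <- Queue.unbounded[IO, Msg]
  _ <- mailbox.enqueue1(Deposit(100))
  _ <- mailbox.enqueue1(Withdraw(30))
  _ <- mailbox.enqueue1(Deposit(5))
  // Process three messages, emitting every intermediate balance
  states <- mailbox.dequeue.take(3).scan(0)(handle).compile.toList
} yield states
```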

Streams are a Good Abstraction

Streams, in many libraries (but especially FS2):

  • Represent a "linear thread of execution"
  • Handle the possibility of asynchrony with ease
  • Have minimal boilerplate
  • Are declarative and easily composable

So let's use them everywhere!

A Stream of Inspiration

FS2-IO Module

  • File reading/writing
  • TCP/UDP sockets
  • StdIn/StdOut streaming
  • New (v2.2): TLS support
  • All operate on raw bytes, with pipes (e.g. fs2.text) for converting between formats

Find all file lines w/ "password"


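import cats.effect.{Blocker, ContextShift, IO}
import java.nio.file.{Files, Paths}
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
val blocker: Blocker = ??? //A blocking ExecutionContext
val docPath = Paths.get("/home/rpeters/Documents")
val outFile = docPath.resolveSibling("out.txt")

//Start w/ a stream of all paths under the directory
fs2.io.file.walk[IO](blocker, docPath)
  .filter(Files.isRegularFile(_)) //Skip directories
  .flatMap { path =>
    //Read the file in, 512 bytes at a time
    fs2.io.file.readAll[IO](path, blocker, 512)
      .through(fs2.text.utf8Decode) //Decode to String
      .through(fs2.text.lines) //Extract lines
      .filter(_.toLowerCase.contains("password"))
  }
  .intersperse("\n") //Restore line breaks
  .through(fs2.text.utf8Encode) //Back to bytes for writing
  .through(fs2.io.file.writeAll[IO](outFile, blocker)) //One output file
  .compile
  .drain
  .unsafeRunSync()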
          

Database Streaming w/ Doobie

Doobie is a pure JDBC wrapper built on Cats Effect


import cats.effect.IO
import doobie._
import doobie.implicits._

//Runs DB transactions as your F[_] of choice
val xa: Transactor[IO] = ???

//Represents a step in a DB connection
val userQuery: ConnectionIO[List[(String, Int)]] =
  sql"SELECT username, age FROM users"
    .query[(String, Int)]
    .to[List]

val runQuery: IO[List[(String, Int)]] =
  userQuery.transact(xa)

runQuery.flatMap(users => IO(users.foreach(println)))
          

Queries can also be streamed


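//Account type and invite function assumed for this example
final case class UserAccount(username: String, email: String, banned: Boolean)
def sendBetaInvite(u: UserAccount): IO[Unit] = ???

sql"SELECT username, email FROM users WHERE banned = false"
  .query[(String, String)]
  .stream //Stream[ConnectionIO, (String, String)]
  .map { case (name, email) => UserAccount(name, email, banned = false) }
  .transact(xa) //Stream[IO, UserAccount]
  .parEvalMap(32)(user => sendBetaInvite(user))
  .compile
  .drain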
          

parEvalMap runs up to N evaluations concurrently, while still emitting results in their original order
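To see the ordering guarantee, give the first element the longest delay. A sketch assuming FS2 2.x and Cats Effect 2; `slowDouble` is an illustrative stand-in for a slow network call.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

// Element 1 sleeps longest, so it finishes last...
def slowDouble(i: Int): IO[Int] =
  IO.sleep((100 * (4 - i)).millis).map(_ => i * 2)

// ...but parEvalMap still emits results in input order
val results: IO[List[Int]] =
  Stream(1, 2, 3).covary[IO].parEvalMap(3)(slowDouble).compile.toList
```

When order does not matter, FS2 also offers `parEvalMapUnordered`, which emits each result as soon as it completes.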

http4s - Streaming HTTP

http4s is a pure HTTP server and client library


import cats.effect.IO
import org.http4s._
import org.http4s.dsl.io._

def sendWorkRequest(work: UnitOfWork): IO[Unit] = ???
def parseUnitOfWork(s: String): Option[UnitOfWork] = ???

val routes = HttpRoutes.of[IO] {
  case req @ POST -> Root / "work" / "submit" =>
    req.body.through(fs2.text.utf8Decode)
      .through(fs2.text.lines)
      .map(parseUnitOfWork)
      .unNone //Filters out "None" from the stream
      .parEvalMap(32)(sendWorkRequest)
      .compile
      .drain
      .flatMap(_ => Ok("Work requests sent!"))
}
          

In http4s, every request/response is a stream

FS2-SSH - Streaming SSH In Scala


//Sync is assumed because the FS2 operations below need it;
//fs2-ssh itself may require further constraints on F
def runScript[F[_]](blocker: Blocker, client: Client[F], cc: ConnectionConfig)(
    implicit F: Sync[F], R: FunctorRaise[F, Client.Error]) =
  client.exec(cc, "/path/to/script.sh", blocker) { process =>
    process.stdout //A stream of raw byte output
      .through(fs2.text.utf8Decode)
      .through(fs2.text.lines)
      .filterNot(_.contains("log"))
      .showLinesStdout
      .compile
      .drain
  }
          

Also supports stderr, stdin, joining

Honorable Mentions

Streaming Patterns

Streams are Resources, Scopes

Lift cats.effect.Resource to Stream


import cats.effect.{Blocker, IO, Resource}
import fs2.Stream
import java.nio.file.Path

val newBlocker: Resource[IO, Blocker] = Blocker[IO]
val blockerStream = Stream.resource(newBlocker)
val fileNameStream: Stream[IO, Path] = ???
def search(blocker: Blocker, file: Path): IO[String] = ???

blockerStream.flatMap { blocker =>
  fileNameStream.evalMap { name =>
    search(blocker, name)
  }
} //Blocker resource is closed once your stream is over
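The scoping can be observed directly: `Stream.bracket` acquires before the first element and releases only after the last one. A sketch assuming FS2 2.x and Cats Effect 2's `Ref`; the log messages are illustrative.

```scala
import cats.effect.IO
import cats.effect.concurrent.Ref
import fs2.Stream

val scopeLog: IO[List[String]] = for {
  log <- Ref.of[IO, List[String]](Nil)
  record = (msg: String) => log.update(_ :+ msg)
  _ <- Stream
    .bracket(record("acquire"))(_ => record("release")) //Scoped resource
    .flatMap(_ => Stream(1, 2).covary[IO].evalMap(i => record(s"use $i")))
    .compile
    .drain
  entries <- log.get
} yield entries
```

Running `scopeLog` records the acquisition first, both uses next, and the release last.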
          

Run your whole program as concurrent streams


val httpServer: Stream[IO, Unit] = ???
val handleMessagesFromKafka: Stream[IO, Unit] = ???
val processAsyncRequests: Stream[IO, Unit] = ???

//A stream of streams - super streamy! :)
Stream(
  httpServer,
  handleMessagesFromKafka,
  processAsyncRequests
).parJoinUnbounded //Run all streams concurrently
  .compile
  .drain
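If running everything at once is too much, `parJoin(n)` bounds how many inner streams run at the same time. A sketch assuming FS2 2.x and Cats Effect 2; the three tiny streams stand in for the hypothetical sub-programs above.

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

// Stand-ins for independent sub-programs
val a: Stream[IO, String] = Stream("a1", "a2").covary[IO]
val b: Stream[IO, String] = Stream("b1", "b2").covary[IO]
val c: Stream[IO, String] = Stream("c1").covary[IO]

// At most 2 inner streams run at once; interleaving is nondeterministic,
// so the output is sorted before it is inspected
val merged: IO[List[String]] =
  Stream(a, b, c).covary[IO].parJoin(2).compile.toList.map(_.sorted)
```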
          

Event-driven State

Front-end and back-end apps can benefit from having state be event-driven

This is key to "Event Sourcing", Flux/Redux, etc.


def buildState(state: Int, next: Int) = state + next

val eventStream = Stream(1, 2, 3)

//"fold" on a Stream gives you a stream of 1 element
//"compile.fold" gives you the final accumulated value
val finalState = eventStream.compile.fold(0)(buildState)
//finalState == 6

//"scan" folds but also emits all intermediate states
val intermediateStates = eventStream.scan(0)(buildState)
  .compile
  .toList
//intermediateStates == List(0, 1, 3, 6)
          

For some extra event-sourcing utils, try FS2-ES*


import dev.rpeters.fs2.es._

val stateProgram: IO[(List[Int], Int)] = for {
  eventState <- EventState[IO].initial[Int, Int](0)(_ + _)
  states <- Stream(1, 2, 3)
    .through(eventState.hookup) //Stream of result states
    .compile
    .toList
  currentState <- eventState.get //Gets the current state
} yield states -> currentState

stateProgram.unsafeRunSync() //List(1, 3, 6) -> 6
          

Also includes a state cache & other helpful utils

*Bias Alert: I wrote this one!

Let's get reactive!


import fs2.concurrent.Queue

def renderButton(count: Int, increase: IO[Unit]): IO[Unit] = ??? //UI rendering omitted

val fauxFlux: IO[Unit] = for {
  q <- Queue.unbounded[IO, Int]
  plusOne = q.enqueue1(1)
  _ <- q.dequeue
    .scan(0)(_ + _)
    .evalTap(i => renderButton(i, plusOne))
    .compile
    .drain
} yield ()
fauxFlux.unsafeRunAsyncAndForget() //Needed for ScalaJS
                      

Live demo -->

Streaming for Batches

Misconception: "Streaming" only makes sense for "streaming apps"

You'd only be making things harder on yourself!

  • Streaming whole files? Swap between stream & batch!
  • Only doing a "single thing?" Handle retries, split it up, & add parallelism!
  • Can fit everything into memory anyway? Start processing data immediately!
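Two of these batch-friendly tools in one sketch: `chunkN` splits work into fixed-size batches, and `Stream.retry` re-runs a failing effect with a growing delay. This assumes FS2 2.x and Cats Effect 2; the flaky call is hypothetical and simply fails on its first two attempts.

```scala
import cats.effect.{IO, Timer}
import cats.effect.concurrent.Ref
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

// Split it up: group a "batch job" into chunks of at most 4 elements
val batches: List[List[Int]] =
  Stream.range(1, 11).chunkN(4).map(_.toList).compile.toList

// Retries: a hypothetical flaky call that fails twice, then succeeds
val retried: IO[(String, Int)] = for {
  attempts <- Ref.of[IO, Int](0)
  flaky = attempts.modify(n => (n + 1, n + 1)).flatMap { n =>
    if (n < 3) IO.raiseError(new RuntimeException(s"attempt $n failed"))
    else IO.pure("ok")
  }
  //Wait 10ms, then 20ms, ... between attempts; give up after 5
  result <- Stream.retry(flaky, 10.millis, _ * 2L, maxAttempts = 5).compile.lastOrError
  total <- attempts.get
} yield result -> total
```

Each batch from `chunkN` could then be handed to `parEvalMap` to add parallelism.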

Further Reading:

Presentations:

On the web:

Thank you!