Theme: Black (default) - Solarized
A new form of "streams" have been becoming popular.
Not all languages/libraries handle the concept equally.
That said...
Once you know one, to an extent, you know them all.
My streaming library of choice in Scala is FS2.
FS2 is inspired by - and also inspires - other libraries from other ecosystems.
This presentation is applicable to almost any other language or streaming library.
I got in to Angular in JavaScript...
Which got me into RxJS
Single | Multiple | |
---|---|---|
Pull / Sync | Function | Iterator |
Push / Async | Promise | Observable |
(PS: Monix is like ReactiveX in Scala)
A stream:
(In Scala this is just IO / Future thanks to map/flatmap)
Looks like:
Stream[F[_], A]
import fs2.{Pure, Stream}
val pureStream: Stream[Pure, Int] = Stream(1, 2, 3)
pureStream.map(_ - 1).compile.toList // List(0, 1, 2)
Code Theme: Monokai (default) - Zenburn - Purebasic
import cats.effect.IO
def putstrln(s: String): IO[Unit] = IO(println(s))
val pureStream = Stream(1, 2, 3)
val printEverything: IO[Unit] =
pureStream
.map(_.toString)
.evalTap(putstrln) //Now we have a Stream[IO, Int]
.compile
.drain //Discards all results
//Prints 1, 2, 3 on separate lines
printeverything.unsafeRunSync()
Streams in FS2 must be "compiled" to a pure value
"Pure" streams compile to the raw result A
Effectful streams must compile to an F[A] value
Note: you can only call .compile if your F[_] type can handle Throwable errors, or is "pure"
My take:
In software, "necessary" just means "easier/better"
The real question is... what are our alternatives?
Streams, in many libraries (but especially FS2):
So lets use them everywhere!
Find all file lines w/ "password"
val blocker: Blocker = ??? //A blocking ExecutionContext
val docPath =
java.nio.file.Paths.get("/home/rpeters/Documents")
val outFile = docPath.resolveSibling("out.txt")
//Start w/ a stream of all files in the directory
fs2.io.file.walk(blocker, docPath).flatMap { path =>
//Read the file in, 512 bytes at a time
fs2.io.file.readAll(path, blocker, 512)
.through(fs2.text.utf8Decode) //Decode to String
.through(fs2.text.lines) //Extract lines
.filter(_.toLowerCase.contains("password"))
.through(fs2.io.writeAll[IO](outFile, blocker))
}.compile.drain.unsafeRunSync()
Doobie is a pure JDBC wrapper built on Cats Effect
//Runs DB transactions as your F[_] of choice
val xa: Transactor[IO] = ???
//Represents a step in a DB connection
val userQuery: ConnectionIO[List[(String, Int)]] =
sql"SELECT username, age FROM users"
.query[(String, Int)]
.toList
val runQuery: IO[List[(String, Int)]] =
userQuery.transact(xa)
runQuery.flatMap(users => IO(users.foreach(println)))
Queries can also be streamed
def sendBetaInvite(u: UserAccount): IO[Unit]
sql"SELECT username, email FROM users WHERE banned = false"
.query[(String, String)]
.stream //Stream[ConnectionIO, (String, String)]
.map(UserAccount(_, _, banned = false))
.transact(xa) //Stream[IO, UserAccount]
.parEvalMap(32)(user => sendBetaInvite(user))
.compile
.drain
parEvalMap runs N evaluations in parallel
http4s is a pure HTTP server and client library
def sendWorkRequest(work: UnitOfWork): IO[Unit] = ???
def parseUnitOfwork(s: String): Option[UnitOfWork] = ???
val routes = HttpRoutes.of[IO] {
case req @ POST -> Root / "work" / "submit" =>
req.body.through(fs2.text.utf8decode)
.through(fs2.text.lines)
.map(parseUnitOfWork)
.unNone //Filters out "None" from the stream
.parEvalMap(32)(sendWorkRequest)
.compile
.drain
.flatMap(_ => Ok("Work requests sent!"))
}
In http4s, every request/response is a stream
def runScript(blocker: Blocker, client: Client[F], cc: ConnectionConfig)(implicit F: FunctorRaise[F, Client.Error]) {
client.exec(connectionConfig, "/path/to/script.sh", blocker) { process =>
process.stdout //A stream of raw byte output
.through(fs2.text.utf8Decode)
.through(fs2.text.lines)
.filterNot(_.contains("log"))
.showLinesStdout
.compile
.drain
}
}
Also supports stderr, stdin, joining
Lift cats.effect.Resource to Stream
import cats.effect.{Blocker, IO, Resource}
val newBlocker: Resource[IO, Blocker] = Blocker[IO]
val blockerStream = Stream.resource(newBlocker)
val fileNameStream: Stream[IO, Path] = ???
def search(blocker: Blocker, file: Path): IO[String] = ???
blockerStream.flatMap { blocker =>
fileNameStream.evalMap { name =>
searchForTextInFile(blocker, name)
}
} //Blocker resource is closed once your stream is over
Run your whole program as concurrent streams
val httpServer: Stream[IO, Unit] = ???
val handleMessagesFromKafka: Stream[IO, Unit] = ???
val processAsyncRequests: Stream[IO, Unit] = ???
//A stream of streams - super streamy! :)
Stream(
httpServer,
handleMessagesFromKafka,
processAsyncRequests
).parJoinUnbounded //Run all streams concurrently
.compile
.drain
Front-end and back-end apps can benefit from having state be event-driven
Key in "Event Sourcing", Flux/Redux, etc.
def buildState(state: Int, next: Int) = state + next
val eventStream = Stream(1, 2, 3)
//"fold" on a Stream gives you a stream of 1 element
//"compile.fold" gives you the final accumulated value
val finalState = eventStream.compile.fold(0)(buildState)
//finalState == 6
//"scan" folds but also emits all intermediate states
val intermediateStates = eventStream.scan(0)(buildState)
.compile
.toList
//intermediateStates == List(0, 1, 3, 6)
For some extra event-sourcing utils, try FS2-ES*
import dev.rpeters.fs2.es._
val stateProgram: IO[(List[Int], Int)] = for {
eventState <- EventState[IO].initial[Int, Int](0)(_ + _)
states <- Stream(1, 2, 3)
.through(eventState.hookup) //Stream of result states
.compile
.toList
currentState <- eventState.get //Gets the current state
} yield allStates -> currentState
stateProgram.unsafeRunSync() //List(1, 3, 6) -> 6
Also includes a state cache & other helpful utils
*Bias Alert: I wrote this one!
Lets get reactive!
import fs2.concurrent.Queue
def renderButton(count: Int, increase: IO[Unit]): IO[Unit]
val fauxFlux: IO[Unit] = for {
q <- Queue.unbounded[IO, Int]
plusOne = q.enqueue1(1)
_ <- q.dequeue
.scan(0)(_ + _)
.evalTap(i => renderButton(i, plusOne))
.compile
.drain
} yield ()
fauxFlux.unsafeRunAsyncAndForget() //Needed for ScalaJS
Live demo -->
Misconception: "Streaming" only makes sense for "streaming apps"
You'd only be making things harder on yourself!
Presentations:
On the web: