Ryan Peters @ 47 Degrees
Builds on top of my ScalaCon 2021 Talk:
//Represents a K/V store where strings have an integer ID
val dataSource: DataSource[IO, Int, String] = ???
def fetch(i: Int) = Fetch.optional(i, dataSource)
//Fetches both 1 and 2, at once
//Deduplicates the second request for 1
val dedupedFetch = (fetch(1), fetch(2), fetch(1)).tupled
Fetch.run(dedupedFetch).map {
case (Some(first), Some(second), Some(firstAgain)) => ???
//And so on...
}
DSLs are just high-level code
DSLs can be written in multiple styles
Fetch is written in a "free" style, as a data structure
//Defines all terms in our language
sealed trait MyLanguage
//Concepts, operations, etc are just data
final case class MyIntOp(i: Int) extends MyLanguage
final case class MyStrOp(s: String) extends MyLanguage
//We need a way to combine our ops into a program
//List is a simple way to imply a sequence or ordering
val myProgram = List(MyIntOp(42), MyStrOp("Hello world!"))
//Programs as data structure must be interpreted explicitly
def interpret(ops: List[MyLanguage]): Unit
List is a "free monoid" - giving us the ability to combine ops "for free"
Other "free" variants:
flatMap
)product
, etc)You can find these in Cats and other libraries
Monads MUST be dependent; Applicatives can't be
//Must work as a strict, dependent sequence
List(Right(1), Left(1), Right(1))
//Applicatives are independent,
//so there is room for optimizing
List(Valid(1), Invalid(1), Valid(1))
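The difference is visible with the standard library alone: `flatMap` over `Either` must short-circuit at the first `Left` (each step may depend on the previous result), while an applicative-style combination treats steps as independent and can keep going. A sketch using stdlib `Either`, with a hand-rolled accumulator standing in for `Validated`:

```scala
val steps: List[Either[String, Int]] =
  List(Right(1), Left("boom"), Right(3))

// Monadic: dependent sequencing, stops at the first Left
val sequenced: Either[String, List[Int]] =
  steps.foldRight(Right(Nil): Either[String, List[Int]]) { (e, acc) =>
    e.flatMap(a => acc.map(a :: _))
  }

// Applicative-style: steps are independent, so we can inspect
// all of them and accumulate the errors
val accumulated: Either[List[String], List[Int]] = {
  val (lefts, rights) = steps.partitionMap(identity)
  if (lefts.nonEmpty) Left(lefts) else Right(rights)
}
```

That independence is exactly the room for optimization: an applicative combination of fetches is free to be reordered or batched.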
Cats 2.7.0 changed an implementation detail of the type classes Fetch used
Fetch is unlawful - it wants to be Monadic without the guarantee of sequencing
Fetch 3 won't guarantee automatic sequence batching
Fetch 3.1.0 brings it back w/ a targeted override
trait DataSource[F[_], I, A] {
def data: Data[I, A] //Used to optimize access
//Needs to capture the Concurrent instance
implicit def CF: Concurrent[F]
def fetch(id: I): F[Option[A]]
def batch(ids: NonEmptyList[I]): F[Map[I, A]]
//Smells like implementation details
def maxBatchSize: Option[Int] = None
def batchExecution: BatchExecution = InParallel
}
//Slightly simplified from the source code
//You need `Concurrent` and `Clock` wherever you run a fetch
def run[F[_]: Concurrent: Clock, A](fa: Fetch[F, A]): F[A]
Functions such as runLog need to be hard-coded in, and you can't write your own
See: GitHub Issue #429, where a user wants to define a timeout on a single Fetch, which is not possible
//Replaces DataSource
trait Fetch[F[_], I, A] {
val id: String //Naive identifier per-instance
def single(i: I): F[Option[A]]
def many(is: Set[I]): F[Map[I, A]]
}
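The trait is small enough to implement by hand. A hypothetical in-memory instance, using a bare `Id` type alias for `F` so no effect library is needed (`fromMap` is an illustration, not part of fetchless):

```scala
type Id[A] = A // stand-in effect type for this sketch

trait Fetch[F[_], I, A] {
  val id: String
  def single(i: I): F[Option[A]]
  def many(is: Set[I]): F[Map[I, A]]
}

// Hypothetical instance backed by a plain Map
def fromMap[I, A](fetchId: String, data: Map[I, A]): Fetch[Id, I, A] =
  new Fetch[Id, I, A] {
    val id: String = fetchId
    def single(i: I): Option[A] = data.get(i)
    def many(is: Set[I]): Map[I, A] =
      is.flatMap(i => data.get(i).map(i -> _)).toMap
  }

val users = fromMap("users", Map(1 -> "Ada", 2 -> "Grace"))
```

Note how `many` simply drops missing IDs from the result map rather than failing, matching the `Map[I, A]` return type.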
val intFetch: Fetch[IO, Int, Int] =
Fetch.echo[IO, Int, Int]("intFetch")
//Can support syntax
4.fetch(intFetch) >> 5.fetch(intFetch) //Manual sequencing
(4, 5, 6).fetchTupled(intFetch) //Applicative-like syntax
Pros: Very little code, syntax makes intent apparent
Cons: No concept of deduping, auto-batching
//We need to cache results between fetches
type CacheMap = Map[(Any, String), Option[Any]]
//An already-ran fetch, can be used for linear deduping
final case class DedupedFetch[F[_], A](
unsafeCache: CacheMap, last: A
)
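The dedupe step the `CacheMap` enables can be sketched as: before each fetch, check the cache keyed by `(id, fetchId)`; on a miss, run the fetch and record the result. The helper below is a simplified, effect-free illustration (a plain function stands in for `F`):

```scala
type CacheMap = Map[(Any, String), Option[Any]]

final case class DedupedFetch[A](unsafeCache: CacheMap, last: A)

// Run one fetch, consulting and updating the cache. The fetchId in the
// key keeps different Fetch instances from colliding on the same ID.
def dedupedRun[I, A](
    fetchId: String,
    fetch: I => Option[A]
)(i: I, cache: CacheMap): DedupedFetch[Option[A]] =
  cache.get((i, fetchId)) match {
    case Some(hit) => DedupedFetch(cache, hit.asInstanceOf[Option[A]])
    case None =>
      val result = fetch(i)
      DedupedFetch(cache + ((i -> fetchId) -> result), result)
  }

var calls = 0
val countingFetch: Int => Option[Int] = { i => calls += 1; Some(i * 2) }

val first  = dedupedRun("ints", countingFetch)(5, Map.empty)
val second = dedupedRun("ints", countingFetch)(5, first.unsafeCache)
```

Threading `first.unsafeCache` into the second call is what makes the dedupe strictly linear: the second fetch of `5` is a cache hit and never touches the data source.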
//Supported with syntax, not as nice
intFetch.fetchDedupe(5)
.alsoFetch(intFetch)(5)
.alsoFetchAll(intFetch)(Set(4, 5, 6))
(4, 5, 6).fetchTupledDedupe(intFetch)
Pros: Now we can dedupe fetches in a strict linear order!
Cons: Syntax is not as nice, and every fetch still runs eagerly, in order
//Let's defer running a fetch and make it lazy!
final case class LazyFetch[F[_], A](
k: Kleisli[F, CacheMap, DedupedFetch[F, A]]
)
//Much nicer syntax again
5.fetchLazy(intFetch) >> 6.fetchLazy(intFetch)
//Can compose fetches in any direction
val oneOrder = fetch1 >> fetch2 >> fetch3
val otherOrder = fetch2 >> fetch3 >> fetch1
//Run any time
otherOrder.run //F[DedupedFetch[F, A]]
We can fetch, dedupe, even interleave effects...
...But we still can't auto-batch!
final case class LazyFetch[F[_]: FlatMap, A](
k: Kleisli[F, CacheMap, DedupedFetch[F, A]]
)
//Kinda looks like...
final case class OptionT[F[_], A](value: F[Option[A]])
We've (unintentionally) made a transformer!
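The transformer shape can be seen without cats: a `LazyFetch` is just a deferred function from the cache to a result, and `flatMap` threads the updated cache into the next step. A hand-rolled, effect-free analogue (the `F` parameter is dropped here, so this is a sketch, not the real `Kleisli`-based type):

```scala
type CacheMap = Map[(Any, String), Option[Any]]

// Simplified analogue of LazyFetch: a deferred computation that
// threads the cache through each step
final case class LazyFetch[A](run: CacheMap => (CacheMap, A)) {
  def flatMap[B](f: A => LazyFetch[B]): LazyFetch[B] =
    LazyFetch { cache =>
      val (cache2, a) = run(cache)
      f(a).run(cache2) // next step sees the updated cache
    }
  def map[B](f: A => B): LazyFetch[B] =
    flatMap(a => LazyFetch(c => (c, f(a))))
}

def fetchLazy(fetchId: String, i: Int): LazyFetch[Option[Int]] =
  LazyFetch { cache =>
    cache.get((i, fetchId)) match {
      case Some(hit) => (cache, hit.asInstanceOf[Option[Int]])
      case None =>
        val result = Some(i * 2)
        (cache + ((i -> fetchId) -> result), result)
    }
  }

// Nothing runs until .run is applied to an initial cache
val program = fetchLazy("ints", 5).flatMap(_ => fetchLazy("ints", 5))
```

Because nothing executes at construction time, programs can be composed in any order and run later, exactly as on the previous slide.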
Let's fix auto-batching:
//How to get a batch
type LazyBatchGetter[F[_]] =
(Set[Any], CacheMap) => F[DedupedFetch[F, Map[Any, Any]]]
//Fetch ID -> (IDs -> Getters)
type LazyBatchReqs[F[_]] =
Map[String, (Set[Any], LazyBatchGetter[F])]
final case class LazyBatch[F[_]: Applicative, A](
reqs: LazyBatchReqs[F],
getResult: Kleisli[F, CacheMap, DedupedFetch[F, A]]
)
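The key move is that combining two `LazyBatch` values applicatively does not run anything; it unions their pending ID sets per fetch ID, so independent fetches collapse into one request. A simplified, effect-free sketch of just that request-merging step (the getters and the result `Kleisli` are omitted):

```scala
// Pending requests: fetch ID -> set of IDs to batch together
type Reqs = Map[String, Set[Any]]

// Applicative-style product: merge pending IDs rather than run anything
def combine(a: Reqs, b: Reqs): Reqs =
  b.foldLeft(a) { case (acc, (fetchId, ids)) =>
    acc.updated(fetchId, acc.getOrElse(fetchId, Set.empty) ++ ids)
  }

val fetch1: Reqs = Map("intFetch" -> Set[Any](1))
val fetch2: Reqs = Map("intFetch" -> Set[Any](2))
val fetch3: Reqs = Map("intFetch" -> Set[Any](3))

// (fetch(1), fetch(2), fetch(3)).tupled ends up requesting one batch
val batched = List(fetch1, fetch2, fetch3).reduce(combine)
```

Only when the whole batch is finally run does each fetch ID's accumulated set get passed to the corresponding `many` call.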
//LazyBatch will auto-batch requests
def fetch(i: Int) = LazyBatch.single(intFetch)(i)
(fetch(1), fetch(2), fetch(3)).tupled
//LazyFetch forms a Parallel with LazyBatch
List(5, 6, 7).parTraverse(_.fetchLazy(intFetch))
//Equivalent to a manual batch
LazyBatch.many(intFetch)(Set(5, 6, 7))
In order of speed:
Other additions
//Fetch instances determine parallelism, can be hard-coded
object Fetch {
def singleParallel[F[_]: Monad: Parallel, I, A](
fetchId: String
)(f: I => F[Option[A]]): Fetch[F, I, A]
}
object LazyFetch {
//Interleve effects between individual fetches
def liftF[F[_]: FlatMap, A](fa: F[A]): LazyFetch[F, A]
}
case class LazyFetch[F[_]: FlatMap, A](...) {
//No requirements to run after creating
def run: F[DedupedFetch[F, A]]
}
case class LazyBatch[F[_]: Applicative, A](...) {
//Some still needed for LazyBatch
def run(implicit M: Monad[F], P: Parallel[F]): F[DedupedFetch[F, A]]
}
>>-style code anyway
Code is @ github.com/sloshy/fetchless
Slides are up @ slides.rpeters.dev/fetchless
Thank you!