Fetchless

A New Take on Data Retrieval

Ryan Peters @ 47 Degrees / Xebia Functional

Back in 2016...

Fetch was born!

  • Optimizes requests for data
  • Batches independent requests
  • Deduplicates requests for the same item
  • Ensures parallel execution

Influenced by Haxl and Stitch libraries


val dataSource: DataSource[IO, Int, String] = ???

def fetch(i: Int): Fetch[IO, Option[String]] =
  Fetch.optional(i, dataSource)

//Fetches both 1 and 2 at once
//Deduplicates the second request for 1
val myFetch: Fetch[IO, (Option[String], Option[String], Option[String])] =
  (fetch(1), fetch(2), fetch(1)).tupled

Fetch.run(myFetch).map {
  case (Some(first), Some(second), Some(firstAgain)) => ???
  //And so on...
}
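
The batching and deduplication described in the comments above can be sketched without any library. This is only an illustration of the idea, not how Fetch is implemented: `BatchSketch`, `store`, `batch`, and `fetchMany` are hypothetical names, and the "remote" store is just an in-memory map.

```scala
object BatchSketch {
  // Hypothetical backing store standing in for a remote data source.
  val store: Map[Int, String] = Map(1 -> "one", 2 -> "two")

  // Counts how many round trips were made to the store.
  var batchCalls: Int = 0

  // One round trip that resolves every requested ID at once.
  def batch(ids: Set[Int]): Map[Int, String] = {
    batchCalls += 1
    store.filter { case (id, _) => ids.contains(id) }
  }

  // Collapses duplicate IDs into a Set, runs a single batch, then answers
  // each original request (including repeats) from the batch result.
  def fetchMany(ids: List[Int]): List[Option[String]] = {
    val results = batch(ids.toSet)
    ids.map(results.get)
  }
}
```

Requesting `List(1, 2, 1)` here costs a single call to `batch`, with the repeated request for 1 answered from the same result.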
									

What is Fetchless?

  • Started as a "tagless final port" of Fetch
  • Replaces implicit "magic" with explicit syntax
  • Order of magnitude faster
  • More lightweight and flexible

Fetch is a DSL for data retrieval.

Fetchless is a different encoding of those same ideas.

See: "Your Program Is a Language" - ScalaCon 2021

Generally two ways of encoding embedded DSLs:

  • "Free" style - operations are data, and you pass them to "interpreters"
  • "Tagless final" style - operations are methods defined in terms of their interpreters

"Free" vs "TF" in practice


//"Free" - operations as data ("reified") w/ interpreter
sealed trait MyDSL[A]
case class MyOperation(param1: String, param2: Int)
  extends MyDSL[Unit]

def myDslInterpreter[F[_]: Sync, A](op: MyDSL[A]): F[A]

//"TF" - have an abstract effect type for your interpreter
trait MyDSL[F[_]]:
  def myOperation(param1: String, param2: Int): F[Unit]

object MyDSL:
  def impl[F[_]: Sync] = new MyDSL[F] { ... }
				

Takeaways:

Tagless Final style:

  • Is more efficient overall
  • Lets you have an "extensible DSL"
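
As a runnable contrast of the two encodings, here is a small sketch that uses only the standard library. Everything in it is hypothetical (`CounterOp`, `Counter`, `Resettable`, `TryCounter`), and `scala.util.Try` stands in for a real effect type. The `Resettable` trait hints at the "extensible DSL" point: a new operation is added without touching the existing algebra.

```scala
import scala.util.Try

object EncodingSketch {
  // "Free" style: each operation is a plain value.
  sealed trait CounterOp[A]
  final case class Add(n: Int) extends CounterOp[Unit]
  final case class Get() extends CounterOp[Int]

  // The interpreter inspects each operation value to decide what to do.
  final class FreeInterpreter {
    private var total = 0
    def run[A](op: CounterOp[A]): A = op match {
      case Add(n) => total += n
      case Get()  => total
    }
  }

  // "Tagless final" style: operations are methods, abstract in the effect F.
  trait Counter[F[_]] {
    def add(n: Int): F[Unit]
    def get: F[Int]
  }

  // A new operation lives in its own trait; Counter is left unchanged.
  trait Resettable[F[_]] {
    def reset: F[Unit]
  }

  // One interpreter can implement both algebras directly, with no
  // intermediate operation values to allocate and match on.
  final class TryCounter extends Counter[Try] with Resettable[Try] {
    private var total = 0
    def add(n: Int): Try[Unit] = Try { total += n }
    def get: Try[Int] = Try(total)
    def reset: Try[Unit] = Try { total = 0 }
  }
}
```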

The Fetch 3.0 Incident

Fetch builds started failing in late 2021

Cause seemed to be an upgrade to Cats 2.7

What happened?

Applicative vs Monad

Applicative:

  • Used for independent effects
  • Good for aggregating, not dependency

Monad:

  • Used for dependent effects
  • Good for dependency, not for aggregating
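
The difference can be sketched with plain `Option`, assuming a hypothetical in-memory `users` table. In the dependent case the second lookup cannot even be described until the first has finished; in the independent case both lookups are known up front, which is exactly the situation a batching library can exploit.

```scala
object DependencySketch {
  final case class User(id: Int, managerId: Int)

  // Hypothetical data, for illustration only.
  val users: Map[Int, User] = Map(1 -> User(1, 2), 2 -> User(2, 2))

  def findUser(id: Int): Option[User] = users.get(id)

  // Dependent (monadic): the manager's ID comes out of the first lookup,
  // so the two lookups have to happen one after the other.
  def managerOf(id: Int): Option[User] =
    findUser(id).flatMap(u => findUser(u.managerId))

  // Independent (applicative): neither lookup needs the other's result,
  // so they could in principle be combined into one request.
  def both(a: Int, b: Int): Option[(User, User)] =
    findUser(a).zip(findUser(b))
}
```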

Either (Monad) vs Validated (Applicative)


List(Right(1), Left(2), Right(3), Left(4)).sequence
//Left(2)
//Traversing implies 'flatMap' by law, short-circuits

List(
	Valid(1),
	Invalid(NonEmptyChain(2)),
	Valid(3),
	Invalid(NonEmptyChain(4))
).sequence
//Invalid(NonEmptyChain(2, 4))
//Traversing cannot flatMap, no short-circuiting
				

Root Cause:

Fetch was pretending to be a Monad while skirting Monad laws

Issue was "resolved" as of Fetch 3.1 by overriding a Cats implementation detail (ugly!)

Fetch would automatically batch the following:


def fetch(i: Int): Fetch[IO, String]
List(1, 2, 3).traverse(i => fetch(i))
				

Fetchless needs syntax for the same


implicit val instance: Fetch[IO, Int, String]
List(1, 2, 3).fetchAll(instance)
List(1, 2, 3).fetchAll[IO, String] //Can also use implicits
				

This is due to lawfulness requirements
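
A lawful `traverse` cannot quietly rewrite three single requests into one batch, so batching has to be an explicit method. The following is a minimal sketch of how such syntax could be provided, not the Fetchless source: `SimpleFetch` is a hypothetical, effect-free stand-in for a fetch instance, and `FetchAllOps` is an assumed name.

```scala
object SyntaxSketch {
  // Hypothetical stand-in for a fetch instance, with no effect type.
  trait SimpleFetch[I, A] {
    def batch(ids: Set[I]): Map[I, A]
  }

  // Adds an explicit `fetchAll` method to any List of IDs.
  implicit class FetchAllOps[I](private val ids: List[I]) {
    def fetchAll[A](implicit fetch: SimpleFetch[I, A]): Map[I, A] =
      fetch.batch(ids.toSet)
  }

  // Example instance backed by an in-memory map.
  implicit val names: SimpleFetch[Int, String] =
    new SimpleFetch[Int, String] {
      private val data = Map(1 -> "a", 2 -> "b")
      def batch(ids: Set[Int]): Map[Int, String] =
        data.filter { case (id, _) => ids.contains(id) }
    }
}
```

With `import SyntaxSketch._` in scope, `List(1, 2, 3).fetchAll[String]` resolves the instance implicitly and issues one batch.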

The "Fetch" Algebra


trait Fetch[F[_], I, A]:
  def single(i: I): F[Option[A]]
  def singleDedupe(i: I): F[DedupedRequest[F, Option[A]]]
  def singleLazy(i: I): LazyRequest[F, Option[A]]
  def batch(iSet: Set[I]): F[Map[I, A]]
  //And so on...
				

Change to Execution Model

Fetch == "lazy by default", no opting out

Fetchless == "linear by default", lazy opt-in

Deduping


// K/V is: (requestId, fetchId) -> optionResult
type CacheMap = Map[(Any, FetchId), Option[Any]]

final case class FetchCache(
	cacheMap: CacheMap,
	...
)

final case class DedupedRequest[F[_], A](
	unsafeCache: FetchCache,
	last: A
)
				

Opt into deduplicating requests without laziness
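
A rough sketch of deduplication without laziness: each request runs immediately and hands its cache to the next one. This simplifies heavily and is not the Fetchless implementation. The cache is keyed by ID only (the real cache also keys by fetch ID), there is no effect type, and `Deduped`, `lookup`, `single`, and `alsoFetch` are hypothetical names.

```scala
object DedupeSketch {
  // Carries the cache so far, plus the result of the most recent request.
  final case class Deduped[A](cache: Map[Int, Option[String]], last: A)

  // Counts how many real lookups were performed.
  var lookups: Int = 0

  // Hypothetical data source: positive IDs exist, everything else is missing.
  def lookup(id: Int): Option[String] = {
    lookups += 1
    if (id > 0) Some(s"item-$id") else None
  }

  // Starts a chain of deduplicated requests with an empty cache.
  def single(id: Int): Deduped[Option[String]] =
    alsoFetch(Deduped(Map.empty, ()), id)

  // Reuses a cached result if one exists; otherwise performs the lookup
  // right away and records it in the cache for later requests.
  def alsoFetch[A](prev: Deduped[A], id: Int): Deduped[Option[String]] =
    prev.cache.get(id) match {
      case Some(hit) => Deduped(prev.cache, hit)
      case None =>
        val result = lookup(id)
        Deduped(prev.cache + (id -> result), result)
    }
}
```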

Laziness


final case class LazyRequest[F[_], A](
  k: Kleisli[F, FetchCache, LazyRequest.ReqInfo[F, A]]
)

sealed trait ReqInfo[F[_], A] {
  val prevCache: FetchCache
  def updateCache(extra: FetchCache): ReqInfo[F, A]
  ...
}

final case class FetchReqInfo[F[_], A](...)
  extends ReqInfo[F, A]
final case class LiftedReqInfo[F[_], A](...)
  extends ReqInfo[F, A] //And so on...
				

Lazy Batching

Fetchless requires Parallel for lazy batching


def lazyReq(i: Int): LazyRequest[F, Option[String]]

(lazyReq(1), lazyReq(2), lazyReq(3)).tupled //Does NOT batch
(lazyReq(1), lazyReq(2), lazyReq(3)).parTupled //DOES batch

List(lazyReq(1), lazyReq(2), lazyReq(3)).sequence //Does NOT batch
List(lazyReq(1), lazyReq(2), lazyReq(3)).parSequence //DOES batch

//Does not need parallel, custom syntax
List(1, 2, 3).fetchAllLazy[F, String]
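
Why only the parallel combinators batch can be sketched with a toy lazy request type. This is an assumption-laden model, not the `LazyRequest` shown earlier: `LazyReq`, `product`, and `runThen` are hypothetical, and there is no effect type. A request that declares its IDs up front can be merged with another one, while a request produced by a function of an earlier result cannot be known until that result exists.

```scala
object LazyBatchSketch {
  // Counts round trips to the hypothetical data source.
  var batchCalls: Int = 0

  def batch(ids: Set[Int]): Map[Int, String] = {
    batchCalls += 1
    ids.map(id => id -> s"item-$id").toMap
  }

  // A lazy request: the IDs it needs, plus how to build its result
  // once those IDs have been resolved.
  final case class LazyReq[A](ids: Set[Int], build: Map[Int, String] => A) {
    // Independent combination: both ID sets are visible before anything
    // runs, so they are merged and resolved in a single batch.
    def product[B](that: LazyReq[B]): LazyReq[(A, B)] =
      LazyReq(ids ++ that.ids, m => (build(m), that.build(m)))

    def run(): A = build(batch(ids))
  }

  def lazyReq(id: Int): LazyReq[Option[String]] =
    LazyReq(Set(id), _.get(id))

  // Dependent combination: the second request is hidden behind a function
  // of the first result, so the first batch must run before the second.
  def runThen[A, B](first: LazyReq[A])(next: A => LazyReq[B]): B =
    next(first.run()).run()
}
```

Here `lazyReq(1).product(lazyReq(2)).run()` costs one batch, while `runThen(lazyReq(1))(_ => lazyReq(2))` costs two.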
				

Other changes:


trait DataSource[F[_], I, A]:
  ...
  def batch(ids: NonEmptyList[I]): F[Map[I, A]]
  ...
				

Why not use Set?


def batch(iSet: Set[I]): F[Map[I, A]]
					

Set automatically deduplicates requests


trait DataSource[F[_], I, A]:
  //Smells like an implementation detail
  def batchExecution: BatchExecution //i.e. 'InParallel'
										

Changed to smart constructors


object Fetch:
  //Batches are turned into single fetches
  def singleSequenced[F[_]: Monad, I, A](fetchId: String)(
    f: I => F[Option[A]]
  ): Fetch[F, I, A]
  def singleParallel[F[_]: Monad: Parallel, I, A](fetchId: String)(
    f: I => F[Option[A]]
  ): Fetch[F, I, A]

  //Single fetches are done in terms of a batch
  def batchOnly[F[_]: Monad, I, A](fetchId: String)(
    batchFunction: Set[I] => F[Map[I, A]]
  )
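
The two directions of derivation can be sketched in a few lines. This is an effect-free illustration under assumed names (`SimpleFetch`, `fromSingle`, `fromBatch`), not the constructors above: one builds `batch` out of repeated single lookups, the other builds `single` out of a one-element batch.

```scala
object ConstructorSketch {
  // Hypothetical, effect-free stand-in for a fetch instance.
  trait SimpleFetch[I, A] {
    def single(id: I): Option[A]
    def batch(ids: Set[I]): Map[I, A]
  }

  // A batch becomes one single lookup per ID; missing IDs are dropped.
  def fromSingle[I, A](f: I => Option[A]): SimpleFetch[I, A] =
    new SimpleFetch[I, A] {
      def single(id: I): Option[A] = f(id)
      def batch(ids: Set[I]): Map[I, A] =
        ids.flatMap(id => f(id).map(a => id -> a)).toMap
    }

  // A single lookup becomes a batch of exactly one ID.
  def fromBatch[I, A](g: Set[I] => Map[I, A]): SimpleFetch[I, A] =
    new SimpleFetch[I, A] {
      def single(id: I): Option[A] = g(Set(id)).get(id)
      def batch(ids: Set[I]): Map[I, A] = g(ids)
    }
}
```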
										

Fetch requires heavy capabilities when run


//Slightly simplified from the source code
def run[F[_]: Concurrent: Clock, A](fa: Fetch[F, A])
					

Fetchless captures these constraints in your algebra

Requests can be interleaved with other effects


def runOrTimeout[A](dur: FiniteDuration)(fa: F[A]): F[A]

val fetch: Fetch[F, Int, String]

val first = runOrTimeout(1.second)(fetch.singleDedupe(1))
val second = runOrTimeout(1.second)(first.alsoBatch(Set(2, 3, 4)))
					

Bonus Integrations

Streaming Data w/ FS2


trait StreamingFetch[F[_], I, A] extends Fetch[F, I, A]:
	def streamingBatch(
		iSet: Set[I]
	): Stream[F, (I, Option[A])]

	def streamingBatchFilterOption(
		iSet: Set[I]
	): Stream[F, (I, A)]
				

+ Smart constructors to customize/convert instances

Doobie SQL Queries


final case class Row(...)
val getSingleRow: Query[Int, Row]
val getManyRows: Set[Int] => Query0[(Int, Row)]
val xa: Transactor[F]

val fetch: StreamingFetch[ConnectionIO, Int, Row] =
  DoobieFetch
    .forBatchableQuery("doobie")(getSingleRow)(getManyRows)

val txFetch: StreamingFetch[F, Int, Row] =
  DoobieFetch.forBatchableQueryTransact(
    "doobieTx",
    xa
  )(getSingleRow)(getManyRows)
					

http4s client requests


final case class MyEntity(...)
val client: Client[F]
def makeRequest(i: Int): Request[F]
implicit val ed: EntityDecoder[F, MyEntity]

val fetch = Http4sClientFetch
  .forEntityParallel[F, Int, MyEntity](
    "http4s",
    client
  )(makeRequest)
					

Also supports batchable entities

Fetch ALL THE THINGS!


trait AllFetch[F[_], I, A] extends Fetch[F, I, A]:
	def batchAll: F[Map[I, A]]
					

Also works with streaming, doobie, etc

Can wrap existing instances

Debugging support


sealed trait FetchType[I] //Multiple subtypes
final case class DebugLog[I](
	fetchId: String,
	fetchTime: FiniteDuration,
	fetchType: FetchType[I]
)

val instance: Fetch[IO, Int, String]
val debugInstance = DebugFetch.wrap(instance)
val doFetch =
  debugInstance.single(1) *> debugInstance.batch(Set(2, 3))

doFetch *> debugInstance.flushLogs
//Chain(DebugLog("id", 5 seconds, FetchType.Fetch), ...)
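
The wrapping idea can be sketched as a decorator that times each call and buffers a log entry. This is a simplified illustration, not `DebugFetch` itself: `SimpleFetch`, `LogEntry`, and `Debugged` are hypothetical names, there is no effect type, and the request kind is a plain string rather than a `FetchType`.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

object DebugSketch {
  // Hypothetical, effect-free stand-in for a fetch instance.
  trait SimpleFetch[I, A] {
    def single(id: I): Option[A]
    def batch(ids: Set[I]): Map[I, A]
  }

  final case class LogEntry(fetchId: String, elapsed: FiniteDuration, kind: String)

  // Wraps an existing instance, delegating every call and recording timings.
  final class Debugged[I, A](fetchId: String, underlying: SimpleFetch[I, A])
      extends SimpleFetch[I, A] {
    private val logs = ListBuffer.empty[LogEntry]

    // Runs the call, measures how long it took, and appends a log entry.
    private def timed[B](kind: String)(run: => B): B = {
      val start = System.nanoTime()
      val result = run
      logs += LogEntry(fetchId, (System.nanoTime() - start).nanos, kind)
      result
    }

    def single(id: I): Option[A] = timed("single")(underlying.single(id))
    def batch(ids: Set[I]): Map[I, A] = timed("batch")(underlying.batch(ids))

    // Returns the entries collected so far and clears the buffer.
    def flushLogs(): List[LogEntry] = {
      val out = logs.toList
      logs.clear()
      out
    }
  }
}
```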
				

Live benchmark demo time~

What's Next?

  • Creating docs/website
  • Formalizing benchmarks
  • Publishing the library

Accessible @ github.com/47degrees/fetchless

Thank you for joining!

Slides are up @ slides.rpeters.dev/fetchless-scalacon/