可扩展的拉取和流处理
立即解锁
发布时间: 2025-08-18 01:01:50 阅读量: 1 订阅数: 7 


Scala函数式编程实战指南
# 可扩展的拉取和流处理
## 1. 可扩展的拉取和流
在流处理中,我们常常会遇到需要不断从队列中取出消息并进行处理的场景。例如,我们有一个 `dequeue` 操作,它会阻塞直到有消息可用:
```scala
val dequeue: IO[Message] = ...
```
我们可以创建一个流 `logAll`,它会不断地从队列中取出消息,将其格式化后进行日志记录:
```scala
val logAll: Stream[IO, Unit] =
Stream.eval(dequeue).repeat.map(format).pipe(log)
```
要运行这个流,我们可以使用 `fold` 或 `toList` 这样的消除器。然而,使用 `toList` 会有问题,因为每个日志操作都会输出一个 `Unit` 值,这些值会在列表构建器中累积,最终可能会耗尽堆内存。因此,我们可以创建一个新的消除器 `run`:
```scala
extension [F[_], O](self: Stream[F, O])
def run(using Monad[F]): F[Unit] =
fold(())((_, _) => ()).map(_(1))
val program: IO[Unit] = logAll.run
```
## 2. 错误处理
某些效果类型提供了处理评估期间发生的错误的能力。例如,`Task` 类型可以让我们在需要处理错误之前忽略潜在错误的存在:
```scala
val a: Task[Int] = Task("asdf".toInt)
val b: Task[Int] = a.map(_ + 1)
val c: Task[Try[Int]] = b.attempt
val d: Try[Int] = c.unsafeRunSync(pool)
```
当任务 `a` 被评估时,字符串到整数的转换会失败,抛出 `NumberFormatException`。`Task` 会处理这个异常并将其存储在 `Failure` 中。
`Task` 为 `IO` 添加了一个错误通道,类似于 `Try` 和 `Either` 为非效果计算提供的错误处理方式。我们可以定义一个 `Monad[Task]` 实例,它在遇到 `Failure` 时会短路。`Task` 还提供了 `handleErrorWith` 组合器:
```scala
def handleErrorWith(h: Throwable => Task[A]): Task[A] =
attempt.flatMap:
case Failure(t) => h(t)
case Success(a) => Task(a)
```
当我们将 `Task` 与 `Stream` 一起使用时,可以像使用 `IO` 一样使用 `eval`:
```scala
val s: Stream[Task, Int] = Stream.eval(Task("asdf".toInt))
```
为了在 `Stream` 和 `Pull` API 中处理这类错误,我们可以为它们添加 `handleErrorWith` 方法:
```scala
enum Pull[+F[_], +O, +R]:
case Handle(
source: Pull[F, O, R],
handler: Throwable => Pull[F, O, R]
) extends Pull[F, O, R]
def handleErrorWith[F2[x] >: F[x], O2 >: O, R2 >: R](
handler: Throwable => Pull[F2, O2, R2]): Pull[F2, O2, R2] =
Pull.Handle(this, handler)
extension [F[_], O](self: Stream[F, O])
def handleErrorWith(handler: Throwable => Stream[F, O]): Stream[F, O] =
Pull.Handle(self, handler)
```
为了解释 `Handle` 构造函数,我们需要更新 `step` 方法的定义。由于 `step` 是为具有 `Monad` 实例的任意效果 `F[_]` 定义的,而 `Monad` 没有提供处理错误的能力,因此我们引入一个新的类型类 `MonadThrow`:
```scala
trait MonadThrow[F[_]] extends Monad[F]:
extension [A](fa: F[A])
def attempt: F[Try[A]]
def handleErrorWith(h: Throwable => F[A]): F[A] =
attempt.flatMap:
case Failure(t) => h(t)
case Success(a) => unit(a)
def raiseError[A](t: Throwable): F[A]
```
`MonadThrow` 扩展了 `Monad`,并添加了抛出和处理错误的能力。
更新后的 `step` 方法如下:
```scala
def step[F2[x] >: F[x], O2 >: O, R2 >: R](
using F: MonadThrow[F2]
): F2[Either[R2, (O2, Pull[F2, O2, R2])]] =
this match
...
case Handle(source, f) =>
source match
case Handle(s2, g) =>
s2.handleErrorWith(x =>
g(x).handleErrorWith(y => f(y))).step
case other =>
other.step
.map:
case Right((hd, tl)) => Right((hd, Handle(tl, f)))
case Left(r) => Left(r)
.handleErrorWith(t => f(t).step)
```
这个实现首先检查左嵌套的错误处理程序,并将其重写为右嵌套,就像处理左嵌套的 `flatMap` 调用一样。否则,它会对原始拉取进行步进,并通过委托给目标效果 `F` 的 `handleErrorWith` 方法来处理发生的任何错误。
有了 `handleErrorWith` 和 `raiseError` 之后,我们可以定义 `onComplete` 方法:
```scala
extension [F[_], O](self: Stream[F, O])
def onComplete(that: => Stream[F, O]): Stream[F, O] =
self.handleErrorWith(t => that ++ raiseError(t)) ++ that
```
`onComplete` 方法允许我们在源流完成后评估另一个流,无论源流是成功完成还是因错误而失败。
下面是一个使用 `onComplete` 打开和关闭文件的示例:
```scala
def acquire(path: String): Task[Source] =
Task(Source.fromFile(path))
def use(source: Source): Stream[Task, Unit] =
Stream.eval(Task(source.getLines))
.flatMap(itr => Stream.fromIterator(itr))
.mapEval(line => Task(println(line)))
def release(source: Source): Task[Unit] =
Task(source.close())
val printLines: Stream[Task, Unit] =
Stream.eval(acquire("path/to/file")).flatMap(resource =>
use(resource).onComplete(Stream.eval(release(resource))))
```
然而,这种方法有一个主要的局限性,即它不可组合。例如,如果我们将每行的打印操作提取到 `onComplete` 调用之后,当最终的 `mapEval` 操作失败时,`onComplete` 注册的错误处理程序不会被调用,资源也不会被释放。
添加错误处理支持会对 `Stream` 的消除形式产生更强的约束。由于 `IO` 没有 `MonadTh
0
0
复制全文
相关推荐










