使用Scala构建微服务:功能实现与事件溯源
立即解锁
发布时间: 2025-08-19 00:05:42 订阅数: 3 

Scala 2.13编程实战与进阶
### 使用 Scala 构建微服务:功能实现与事件溯源
#### 1. 功能代码实现
在实现微服务时,我们首先处理数据库更新操作。给定一个 `name -> count` 对的映射作为参数,我们将每个对转换为更新操作。代码如下:
```scala
$name".update.run
}.reduce(_ *> _)
Stream
.eval(
FC.setAutoCommit(false) *> updates *> FC.setAutoCommit(true)
)
.attempt.transact(transactor)
```
具体步骤如下:
1. 将 `name -> count` 对映射为更新操作,得到 `CollectionIO[Int]` 集合。
2. 使用 `cats Apply` 运算符将这些更新操作组合成一个 `CollectionIO[Int]`。
3. 由于 JDBC 默认启用自动提交,可能导致批量更新逐个执行和提交,从而出现部分订单完成的情况。为避免这种情况,我们将更新操作包装在流中,在更新前禁用自动提交,更新完成后再启用。
4. 提升结果的错误处理,并将其转换为可运行的 `IO`。
该方法的结果类型为 `Stream[IO, Either[Throwable, Unit]]`,其中流元素的类型编码了两种可能性:由于库存中商品不足而无法更新(`Left`)和成功更新(`Right`)。
#### 2. HTTP 接口实现
我们使用 `http4s` 库实现 HTTP 接口。`http4s` 基于 `FS2` 和 `Cats IO` 构建,与使用 `doobie` 实现的持久层有良好的交互。以下是实现步骤:
1. **导入必要的包**:
```scala
import cats.effect.IO
import fs2.Stream
import org.http4s._
import org.http4s.dsl.Http4sDsl
import org.http4s.circe._
import org.http4s.headers.`Content-Type`
import io.circe.generic.auto._
import io.circe.syntax._
```
2. **定义服务类**:
```scala
class Service(repo: Repository) extends Http4sDsl[IO] { ... }
```
服务类接收一个数据库仓库作为参数。
3. **定义路由**:
- **删除文章**:
```scala
val service: HttpService[IO] = HttpService[IO] {
case DELETE -> Root / "articles" / name if name.nonEmpty =>
val repoResult: IO[Boolean] = repo.deleteArticle(name)
val toResponse: Boolean => IO[Response[IO]] = if (_) NoContent() else NotFound()
val response: IO[Response[IO]] = repoResult.flatMap(toResponse)
response
}
```
这里使用 `http4s DSL` 解构请求并进行模式匹配,将 URL 的最后一部分映射到 `name`,并确保 `name` 不为空。根据仓库方法的返回值,返回相应的响应状态码。
- **创建文章**:
```scala
case POST -> Root / "articles" / name if name.nonEmpty =>
repo.createArticle(name).flatMap { if (_) NoContent() else Conflict() }
```
创建文章的方法与删除文章类似。
- **获取库存**:
```scala
case GET -> Root / "inventory" =>
val inventory: Stream[IO, Inventory] = repo.getInventory
renderInventory(inventory)
private def renderInventory(inventory: Stream[IO, Inventory]): IO[Response[IO]] = {
val json: Stream[IO, String] = inventory.map(_.asJson.noSpaces)
val response: IO[Response[IO]] = Ok(json, `Content-Type`(MediaType.`application/json`))
response
}
```
获取库存时,将返回的 `Map[String, Int]` 转换为 JSON,并使用 `Ok` 状态构造函数和隐式 `EntityEncoder` 将流转换为响应类型。
- **补货**:
```scala
case req @ POST -> Root / "restock" =>
val newState = for {
purchase <- Stream.eval(req.decodeJson[Restock])
_ <- repo.updateStock(purchase.inventory)
inventory <- repo.getInventory
} yield inventory
renderInventory(newState)
```
补货时,首先解码请求的 JSON 体,然后调用仓库的更新方法,最后读取新的库存状态并渲染响应。
#### 3. 服务启动与依赖注入
服务器代码需要导入一些新的包:
```scala
import org.http4s.server.blaze.BlazeBuilder
import scala.concurrent.ExecutionContext.Implicits.global
```
服务器定义如下:
```scala
object Server extends StreamApp[IO] {
override def stream(args: List[String], requestShutdown: IO[Unit]): Stream[IO, ExitCode] = {
val config: IO[Config] = Config.load("application.conf")
new ServerInstance(config).create().flatMap(_.serve)
}
}
class ServerInstance(config: IO[Config]) {
def create(): Stream[IO, BlazeBuilder[IO]] = {
for {
config <- Stream.eval(config)
transactor <- Stream.eval(DB.transactor(config.database))
_ <- Stream.eval(DB.initialize(transactor))
} yield BlazeBuilder[IO]
.bindHttp(config.server.port, config.server.host)
.mountService(new Service(new Repository(transactor)).service, "/")
}
}
```
这里使用了构造函数式依赖注入,将依赖作为构造函数参数传递。
#### 4. 测试
我们使用集成测试来检查整个系统的行为。具体步骤如下:
1. **添加配置**:在 `build.sbt` 中添加必要的配置,以让 SBT 正确识别集成测试。
2. **准备客户端和服务器**:
```scala
class ServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
private lazy val
```
0
0
复制全文


