深入探索Akka中Actors与STM的融合应用
立即解锁
发布时间: 2025-08-19 02:34:06 阅读量: 84 订阅数: 23 

# 深入探索 Akka 中 Actors 与 STM 的融合应用
## 1. Actors 与 STM 融合的背景与需求
在并发编程中,Actors 是一种强大的模型,它允许我们隔离可变状态。当一个问题可以分解为独立运行的并发任务,并且这些任务可以通过消息进行异步通信时,Actors 表现出色。然而,Actors 本身并不提供跨任务的一致性管理机制。我们可能希望两个或多个 Actor 的操作要么全部成功,要么全部失败,即实现事务性的操作。为了实现这一目标,我们可以将软件事务内存(STM)与 Actors 结合使用。
以账户转账为例,存款和取款操作可以独立应用于单个账户,因此可以使用简单的 Actor 或类型化 Actor 来实现账户类。但转账操作需要协调两个账户的存款和取款,这就需要事务性的支持,以确保转账操作的原子性。
Akka 提供了几种将 Actors 与 STM 融合的方式,我们可以创建一个单独的事务协调对象来管理各个 Actor 的事务顺序,也可以使用两种便捷的方式:使用事务 Actor(Transactors)和协调类型化 Actor(Coordinating Typed Actors)。
## 2. 使用事务 Actor(Transactors)
### 2.1 事务 Actor 概述
Akka 的事务 Actor 或事务性 Actor 将多个 Actor 的执行纳入事务的范畴。事务 Actor 使多个协调 Actor 对管理的 STM Ref 对象的更改具有原子性。只有当包含这些更改的事务提交时,这些更改才会被保留;否则,更改将被丢弃。
事务 Actor 提供了三种处理消息的选项:
- 默认选项是在其自己的事务中处理消息。
- 可以实现 `normally()` 方法来独立处理特定消息,即不将其作为任何事务的一部分。
- 可以要求消息以协调方式处理,即作为一个总体事务的一部分。
事务 Actor 还提供了可选的事务前和事务后方法,可用于准备事务和执行提交后的处理。
### 2.2 在 Java 中使用事务 Actor
#### 2.2.1 定义消息类
首先,我们需要定义一些消息类来表示不同的操作:
```java
// Deposit.java
public class Deposit {
public final int amount;
public Deposit(final int theAmount) { amount = theAmount; }
}
// Withdraw.java
public class Withdraw {
public final int amount;
public Withdraw(final int theAmount) { amount = theAmount; }
}
// FetchBalance.java
public class FetchBalance {}
// Balance.java
public class Balance {
public final int amount;
public Balance(final int theBalance) { amount = theBalance; }
}
// Transfer.java
public class Transfer {
public final ActorRef from;
public final ActorRef to;
public final int amount;
public Transfer(final ActorRef fromAccount,
final ActorRef toAccount, final int theAmount) {
from = fromAccount;
to = toAccount;
amount = theAmount;
}
}
```
#### 2.2.2 实现账户事务 Actor
```java
// Account.java
public class Account extends UntypedTransactor {
private final Ref<Integer> balance = new Ref<Integer>(0);
public void atomically(final Object message) {
if(message instanceof Deposit) {
int amount = ((Deposit)(message)).amount;
if (amount > 0) {
balance.swap(balance.get() + amount);
System.out.println("Received Deposit request " + amount);
}
}
if(message instanceof Withdraw) {
int amount = ((Withdraw)(message)).amount;
System.out.println("Received Withdraw request " + amount);
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() - amount);
else {
System.out.println("...insufficient funds...");
throw new RuntimeException("Insufficient fund");
}
}
if(message instanceof FetchBalance) {
getContext().replySafe(new Balance(balance.get()));
}
}
}
```
在 `Account` 类中,`atomically()` 方法在事务上下文中运行。如果收到 `Deposit` 消息,会增加账户余额;如果收到 `Withdraw` 消息,只有在余额充足时才会减少余额,否则抛出异常,触发事务回滚;如果收到 `FetchBalance` 消息,会回复当前余额。
#### 2.2.3 实现账户服务事务 Actor
```java
// AccountService.java
public class AccountService extends UntypedTransactor {
@Override public Set<SendTo> coordinate(final Object message) {
if(message instanceof Transfer) {
Set<SendTo> coordinations = new java.util.HashSet<SendTo>();
Transfer transfer = (Transfer) message;
coordinations.add(sendTo(transfer.to, new Deposit(transfer.amount)));
coordinations.add(sendTo(transfer.from,
new Withdraw(transfer.amount)));
return java.util.Collections.unmodifiableSet(coordinations);
}
return nobody();
}
public void atomically(final Object message) {}
}
```
`AccountService` 类的 `coordinate()` 方法负责协调两个账户的存款和取款操作。它将相应的消息发送给两个账户的事务 Actor,并返回一个包含这些协调信息的集合。
#### 2.2.4 使用示例
```java
// UseAccountService.java
public class UseAccountService {
public static void printBalance(
final String accountName, final ActorRef account) {
Balance balance =
(Balance)(account.sendRequestReply(new FetchBalance()));
System.out.println(accountName + " balance is " + balance.amount);
}
public static void main(final String[] args)
throws InterruptedException {
final ActorRef account1 = Actors.actorOf(Account.class).start();
final ActorRef account2 = Actors.actorOf(Account.class).start();
final ActorRef accountService =
Actors.actorOf(AccountService.class).start();
account1.sendOneWay(new Deposit(1000));
account2.sendOneWay(new Deposit(1000));
Thread.sleep(1000);
printBalance("Account1", account1);
printBalance("Account2", account2);
System.out.println("Let's transfer $20... should succeed");
accountService.sendOneWay(new Transfer(account1, account2, 20));
Thread.sleep(1000);
printBalance("Account1", account1);
printBalance("Account2", account2);
System.out.println("Let's transfe
```
0
0
复制全文
相关推荐








