8、Akka任务调度(Dispatcher)
Dispatcher解析
Dispatcher是将如何执行任务与何时运行任务两者解耦,所有Actor或Future的工作都是由 (Executor或Dispatcher)分配的资源来完成的。一般来说,Dispatcher会包含一些线程, 这些线程会负责调度并运行任务, 比如处理 Actor 的消息以及线程中的Future事件。
不自己创建Dispatcher时使用的是默认Dispathcer,如果任务中有些耗时任务,容易将默认线程池打满,影响其他任务的调度。
Excutor
Dispatcher 基于 Executor,所以在具体介绍 Dispatcher 之前,我们将介绍两种主要的Executor类型:ForkJoinPool
和ThreadPool
。
- ThreadPool Excutor: 有一个工作队列,队列中包含了分配给各队列的工作,线程空闲时就从队列中认领工作,允许线程重用,减少线程分配和销毁的开销。
- ForkJoinPool Excutor::使用一种分治算法,递归地将任务分隔成更小的子任务,然后把子任务分配给不同的线程运行,最后将运行结果组合起来。
ForkJoinPool的Executor几乎总是比ThreadPool的Executor效率更高,是我们的默认选择。
创建Dispatcher
1. 在application.conf文件中定义一个Dispatcher。
my-dispatcher {
# Dispatcher的类型,选择符合场景的Dispatcher,这里使用的是默认类型Dispatcher
type = Dispatcher
# 定义Excutor类型,上面也介绍过两种Excutor
executor = "fork-join-executor"
fork-join-executor {
# 单核最少线程数,Excutor中最少线程数=parallelism-min*parallelism-factor
parallelism-min = 2
# 此项最好配置成机器的CPU核数
parallelism-factor = 8.0
# 单核最多线程数,Excutor中最多线程数=parallelism-max*parallelism-factor
parallelism-max = 1000
}
# 跳到另一个Acotr之前每个Actor最多处理消息数量,这个参数为了防止单个Acotr一直占用线程,设置此Acotr最多执行多少消息就要被调度
throughput = 1000
}
2. 通过application.confg中配置好的Dispatcher创建Dispatcher对象
system.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"))
在上面Type使用的是默认的Dispatcher,在Akka中有四种类型的 Dispatcher 可以用于描述如何在 Actor 之间共享线程:
Type | 特征 |
---|---|
Dispatcher | 默认的 Dispatcher 类型。将会使用定义的 Executor,在 Actor 中处理消息。在大多数情况下,这种类型能够提供最好的性能。 |
PinnedDispatcher | 给 每 个 Actor 都分配自己独有的线程。 这种类型的 Dispatcher 为每个 Actor 都创建一个 ThreadPool Executor, 每个 Executor 中都包含一个线程。 如果希望确保每个 Actor 都能够立即响应,那么这似乎是个不错的方法。不过PinnedDispatcher 比其他共享资源的方法效率更高的情况其实并不多。可以在单个 Actor 必须处理很多重要工作的时候试试这种类型的 Dispatcher,否则的话不推荐使用。 |
CallingThreadDispatcher | 这个 Dispatcher 比较特殊,它没有 Executor,而是在发起调用的线程上执行工作。这种 Dispatcher 主要用于测试,特别是调试。由于发起调用的线程负责完成工作, 所以清楚地看到栈追踪信息, 了解所执行方法的完整上下文。这对于理解异常是非常有用的。每个 Actor 会获取一个锁,所以每次只有一个线程可以在 Actor 中执行代码,而如果多个线程向一个 Actor 发送信息的话, 就会导致除了拥有锁的线程之外的所有线程处于等待状态。 本书前面介绍过的 TestActorRef 就是基于 CallingThreadDispatcher 实现支持在测试中同步执行工作的。 |
BalancingDispatcher | BalancingDispatcher 有一点很特殊:Pool 中的所有Actor 都共享同一个邮箱,并且会为 Pool 中的每个 Actor 都创建一个线程。使用 BalancingDispatcher 的 Actor 从邮箱中拉取消息,所以只要有 Actor 处于空闲状态,就不会有任何 Actor 的工作队列中存在任务。这是工作窃取的一个变种,所有 Actor 都会从一个共享的邮箱中拉取任务。两者在性能上的优点也类似。 |
参考文献
- 《Akka入门与实践》
关注公众号:Data Porter
回复Akka
领取《Akka入门与实践》书籍
正文到此结束