异步编程
当程序需要与外部世界交互时,例如通过互联网与另一台机器通信,程序中的操作可能需要以不可预测的顺序发生。假设您的程序需要下载一个文件。我们希望启动下载操作,在等待下载完成的同时执行其他操作,然后在文件可用时恢复需要下载文件的代码。这种场景属于异步编程的范畴,有时也称为并发编程(因为从概念上讲,多件事同时发生)。
为了解决这些场景,Julia 提供了 Task
(也称为对称协程、轻量级线程、协作式多任务处理或一次性延续)。当一段计算工作(实际上是执行特定函数)被指定为 Task
时,就可以通过切换到另一个 Task
来中断它。原始 Task
可以在以后恢复,此时它将从中断的地方继续执行。乍一看,这可能与函数调用类似。但是,有两个关键区别。首先,切换任务不使用任何空间,因此可以进行任意次数的任务切换而不会占用调用堆栈。其次,任务之间的切换可以以任何顺序发生,这与函数调用不同,函数调用必须在控制权返回给调用函数之前完成执行。
基本 Task
操作
您可以将 Task
视为要执行的计算工作单元的句柄。它具有创建-启动-运行-完成的生命周期。任务是通过在要运行的 0 参数函数上调用 Task
构造函数或使用 @task
宏来创建的
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
@task x
等价于 Task(()->x)
。
此任务将等待五秒钟,然后打印 done
。但是,它还没有开始运行。我们可以随时通过调用 schedule
来运行它
julia> schedule(t);
如果您在 REPL 中尝试此操作,您将看到 schedule
会立即返回。这是因为它只是将 t
添加到要运行的任务的内部队列中。然后,REPL 将打印下一个提示并等待更多输入。等待键盘输入提供了一个机会让其他任务运行,因此,此时 t
将启动。t
调用 sleep
,它设置计时器并停止执行。如果已调度其他任务,它们可以在此时运行。五秒钟后,计时器触发并重新启动 t
,您将看到打印出 done
。然后,t
就完成了。
wait
函数会阻塞调用任务,直到其他任务完成。因此,例如,如果您输入
julia> schedule(t); wait(t)
而不是只调用 schedule
,您将看到在下一个输入提示出现之前会暂停五秒钟。这是因为 REPL 在继续之前正在等待 t
完成。
通常,您希望创建一个任务并立即安排它,因此提供了 @async
宏来实现此目的 - @async x
等价于 schedule(@task x)
。
使用通道进行通信
在某些问题中,所需的各种工作不是自然地通过函数调用关联的;在需要完成的任务之间没有明显的“调用者”或“被调用者”。一个例子是生产者-消费者问题,其中一个复杂的过程正在生成值,另一个复杂的过程正在消耗它们。消费者不能简单地调用生产者函数来获取值,因为生产者可能还有更多值要生成,因此可能还没有准备好返回。使用任务,生产者和消费者都可以根据需要运行,并在必要时来回传递值。
Julia 提供了 Channel
机制来解决此问题。一个 Channel
是一个可等待的先进先出队列,可以有多个任务从它读取数据和写入数据。
让我们定义一个生产者任务,它通过 put!
调用来生成值。为了消耗值,我们需要在新的任务中安排生产者运行。可以使用接受一个 1 参数函数作为参数的特殊 Channel
构造函数来运行绑定到通道的任务。然后,我们可以 take!
从通道对象中反复取值
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
一种理解这种行为的方式是,producer
能够多次返回。在 put!
调用之间,生产者的执行被暂停,而消费者拥有控制权。
返回的 Channel
可以用作 for
循环中的可迭代对象,在这种情况下,循环变量将获取所有生成的 value。循环在通道关闭时终止。
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
请注意,我们不必在生产者中显式关闭通道。这是因为将 Channel
绑定到 Task
的行为会将通道的打开生命周期与绑定任务的生命周期相关联。当任务终止时,通道对象会自动关闭。多个通道可以绑定到一个任务,反之亦然。
虽然 Task
构造函数需要一个 0 参数函数,但创建任务绑定通道的 Channel
方法需要一个接受单个类型为 Channel
的参数的函数。一种常见的模式是生产者是参数化的,在这种情况下,需要使用部分函数应用来创建一个 0 或 1 参数 匿名函数。
对于 Task
对象,这可以通过直接方式或使用方便宏来完成
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)
为了协调更高级的工作分配模式,可以使用 bind
和 schedule
结合 Task
和 Channel
构造函数,来显式地将一组通道与一组生产者/消费者任务链接起来。
更多关于通道的信息
通道可以被视为管道,即它有一个写入端和一个读取端
不同任务中的多个写入者可以并发地通过
put!
调用写入同一个通道。不同任务中的多个读取者可以并发地通过
take!
调用读取数据。例如
# Given Channels c1 and c2, c1 = Channel(32) c2 = Channel(32) # and a function `foo` which reads items from c1, processes the item read # and writes a result to c2, function foo() while true data = take!(c1) [...] # process data put!(c2, result) # write out result end end # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n errormonitor(@async foo()) end
通道可以通过
Channel{T}(sz)
构造函数创建。通道将只保存类型为T
的对象。如果类型未指定,则通道可以保存任何类型对象。sz
指的是通道在任何时间可以保存的最大元素数量。例如,Channel(32)
创建一个可以保存最多 32 个任何类型对象的通道。Channel{MyType}(64)
可以在任何时间保存最多 64 个MyType
类型的对象。一个
Channel
最初处于打开状态。这意味着它可以通过take!
和put!
调用自由地进行读取和写入。close
关闭一个Channel
。在一个关闭的Channel
上,put!
将失败。例如julia> c = Channel(2); julia> put!(c, 1) # `put!` on an open channel succeeds 1 julia> close(c); julia> put!(c, 2) # `put!` on a closed channel throws an exception. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
take!
和fetch
(它检索但不删除值)在关闭的通道上成功返回任何现有值,直到它被清空。继续上面的例子julia> fetch(c) # Any number of `fetch` calls succeed. 1 julia> fetch(c) 1 julia> take!(c) # The first `take!` removes the value. 1 julia> take!(c) # No more data available on a closed channel. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
考虑一个使用通道进行任务间通信的简单例子。我们启动 4 个任务来处理来自单个jobs
通道的数据。作业由一个 id(job_id
)标识,并被写入通道。此模拟中的每个任务都读取一个job_id
,等待随机的时间量,然后将job_id
和模拟时间组成的元组写入结果通道。最后,所有results
都被打印出来。
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
除了errormonitor(t)
之外,一个更健壮的解决方案可能是使用bind(results, t)
,因为它不仅会记录任何意外失败,还会强制关联资源关闭并传播异常到所有地方。
更多任务操作
任务操作建立在一个称为yieldto
的底层原语之上。yieldto(task, value)
挂起当前任务,切换到指定的task
,并导致该任务的最后一个yieldto
调用返回指定value
。注意yieldto
是使用任务风格控制流所需的唯一操作;我们不是调用和返回,而是一直切换到不同的任务。这就是为什么这个特性也被称为“对称协程”的原因;每个任务都使用相同的机制进行切换。
yieldto
功能强大,但大多数使用任务的操作都不会直接调用它。考虑为什么可能是这样。如果您从当前任务切换出去,您可能希望在某个时候切换回它,但知道何时切换回来,以及知道哪个任务有切换回来的责任,可能需要相当多的协调。例如,put!
和take!
是阻塞操作,它们在通道的上下文中使用时会维护状态以记住谁是消费者。不需要手动跟踪消费任务,这使得put!
比底层的yieldto
更容易使用。
除了yieldto
之外,还需要一些其他基本函数才能有效地使用任务。
current_task
获取对当前正在运行任务的引用。istaskdone
查询任务是否已退出。istaskstarted
查询任务是否已运行。task_local_storage
操作特定于当前任务的键值存储。
任务和事件
大多数任务切换都是由于等待 I/O 请求等事件而发生的,并由 Julia Base 中包含的调度器执行。调度器维护一个可运行任务的队列,并执行一个事件循环,根据消息到达等外部事件重新启动任务。
等待事件的基本函数是wait
。几个对象实现了wait
;例如,给定一个Process
对象,wait
将等待它退出。 wait
通常是隐式的;例如,一个wait
可能发生在对read
的调用中,以等待数据可用。
在所有这些情况下,wait
最终操作一个Condition
对象,该对象负责队列和重新启动任务。当一个任务在Condition
上调用wait
时,该任务被标记为不可运行,被添加到条件的队列中,并切换到调度器。调度器随后将选择另一个任务来运行,或者阻塞等待外部事件。如果一切顺利,最终一个事件处理程序将在该条件上调用notify
,这将导致等待该条件的任务再次变得可运行。
通过调用Task
显式创建的任务最初是调度器未知的。这使您能够使用yieldto
手动管理任务,如果您愿意的话。但是,当这样的任务等待事件时,它仍然会在事件发生时自动重新启动,正如您所期望的那样。