异步编程

当程序需要与外部世界交互时,例如通过互联网与另一台机器通信,程序中的操作可能需要以不可预测的顺序发生。假设您的程序需要下载一个文件。我们希望启动下载操作,在等待下载完成的同时执行其他操作,然后在文件可用时恢复需要下载文件的代码。这种场景属于异步编程的范畴,有时也称为并发编程(因为从概念上讲,多件事同时发生)。

为了解决这些场景,Julia 提供了 Task(也称为对称协程、轻量级线程、协作式多任务处理或一次性延续)。当一段计算工作(实际上是执行特定函数)被指定为 Task 时,就可以通过切换到另一个 Task 来中断它。原始 Task 可以在以后恢复,此时它将从中断的地方继续执行。乍一看,这可能与函数调用类似。但是,有两个关键区别。首先,切换任务不使用任何空间,因此可以进行任意次数的任务切换而不会占用调用堆栈。其次,任务之间的切换可以以任何顺序发生,这与函数调用不同,函数调用必须在控制权返回给调用函数之前完成执行。

基本 Task 操作

您可以将 Task 视为要执行的计算工作单元的句柄。它具有创建-启动-运行-完成的生命周期。任务是通过在要运行的 0 参数函数上调用 Task 构造函数或使用 @task 宏来创建的

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

@task x 等价于 Task(()->x)

此任务将等待五秒钟,然后打印 done。但是,它还没有开始运行。我们可以随时通过调用 schedule 来运行它

julia> schedule(t);

如果您在 REPL 中尝试此操作,您将看到 schedule 会立即返回。这是因为它只是将 t 添加到要运行的任务的内部队列中。然后,REPL 将打印下一个提示并等待更多输入。等待键盘输入提供了一个机会让其他任务运行,因此,此时 t 将启动。t 调用 sleep,它设置计时器并停止执行。如果已调度其他任务,它们可以在此时运行。五秒钟后,计时器触发并重新启动 t,您将看到打印出 done。然后,t 就完成了。

wait 函数会阻塞调用任务,直到其他任务完成。因此,例如,如果您输入

julia> schedule(t); wait(t)

而不是只调用 schedule,您将看到在下一个输入提示出现之前会暂停五秒钟。这是因为 REPL 在继续之前正在等待 t 完成。

通常,您希望创建一个任务并立即安排它,因此提供了 @async 宏来实现此目的 - @async x 等价于 schedule(@task x)

使用通道进行通信

在某些问题中,所需的各种工作不是自然地通过函数调用关联的;在需要完成的任务之间没有明显的“调用者”或“被调用者”。一个例子是生产者-消费者问题,其中一个复杂的过程正在生成值,另一个复杂的过程正在消耗它们。消费者不能简单地调用生产者函数来获取值,因为生产者可能还有更多值要生成,因此可能还没有准备好返回。使用任务,生产者和消费者都可以根据需要运行,并在必要时来回传递值。

Julia 提供了 Channel 机制来解决此问题。一个 Channel 是一个可等待的先进先出队列,可以有多个任务从它读取数据和写入数据。

让我们定义一个生产者任务,它通过 put! 调用来生成值。为了消耗值,我们需要在新的任务中安排生产者运行。可以使用接受一个 1 参数函数作为参数的特殊 Channel 构造函数来运行绑定到通道的任务。然后,我们可以 take! 从通道对象中反复取值

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

一种理解这种行为的方式是,producer 能够多次返回。在 put! 调用之间,生产者的执行被暂停,而消费者拥有控制权。

返回的 Channel 可以用作 for 循环中的可迭代对象,在这种情况下,循环变量将获取所有生成的 value。循环在通道关闭时终止。

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

请注意,我们不必在生产者中显式关闭通道。这是因为将 Channel 绑定到 Task 的行为会将通道的打开生命周期与绑定任务的生命周期相关联。当任务终止时,通道对象会自动关闭。多个通道可以绑定到一个任务,反之亦然。

虽然 Task 构造函数需要一个 0 参数函数,但创建任务绑定通道的 Channel 方法需要一个接受单个类型为 Channel 的参数的函数。一种常见的模式是生产者是参数化的,在这种情况下,需要使用部分函数应用来创建一个 0 或 1 参数 匿名函数

对于 Task 对象,这可以通过直接方式或使用方便宏来完成

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)

为了协调更高级的工作分配模式,可以使用 bindschedule 结合 TaskChannel 构造函数,来显式地将一组通道与一组生产者/消费者任务链接起来。

更多关于通道的信息

通道可以被视为管道,即它有一个写入端和一个读取端

  • 不同任务中的多个写入者可以并发地通过 put! 调用写入同一个通道。

  • 不同任务中的多个读取者可以并发地通过 take! 调用读取数据。

  • 例如

    # Given Channels c1 and c2,
    c1 = Channel(32)
    c2 = Channel(32)
    
    # and a function `foo` which reads items from c1, processes the item read
    # and writes a result to c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # process data
            put!(c2, result)    # write out result
        end
    end
    
    # we can schedule `n` instances of `foo` to be active concurrently.
    for _ in 1:n
        errormonitor(@async foo())
    end
  • 通道可以通过Channel{T}(sz)构造函数创建。通道将只保存类型为T的对象。如果类型未指定,则通道可以保存任何类型对象。sz指的是通道在任何时间可以保存的最大元素数量。例如,Channel(32)创建一个可以保存最多 32 个任何类型对象的通道。Channel{MyType}(64)可以在任何时间保存最多 64 个MyType类型的对象。

  • 如果一个Channel为空,读取器(在take!调用中)将阻塞,直到数据可用。

  • 如果一个Channel已满,写入器(在put!调用中)将阻塞,直到空间可用。

  • isready测试通道中是否存在任何对象,而wait等待对象变得可用。

  • 一个Channel最初处于打开状态。这意味着它可以通过take!put!调用自由地进行读取和写入。 close关闭一个Channel。在一个关闭的Channel上,put!将失败。例如

    julia> c = Channel(2);
    
    julia> put!(c, 1) # `put!` on an open channel succeeds
    1
    
    julia> close(c);
    
    julia> put!(c, 2) # `put!` on a closed channel throws an exception.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]
  • take!fetch(它检索但不删除值)在关闭的通道上成功返回任何现有值,直到它被清空。继续上面的例子

    julia> fetch(c) # Any number of `fetch` calls succeed.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # The first `take!` removes the value.
    1
    
    julia> take!(c) # No more data available on a closed channel.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]

考虑一个使用通道进行任务间通信的简单例子。我们启动 4 个任务来处理来自单个jobs通道的数据。作业由一个 id(job_id)标识,并被写入通道。此模拟中的每个任务都读取一个job_id,等待随机的时间量,然后将job_id和模拟时间组成的元组写入结果通道。最后,所有results都被打印出来。

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
           errormonitor(@async do_work())
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

除了errormonitor(t)之外,一个更健壮的解决方案可能是使用bind(results, t),因为它不仅会记录任何意外失败,还会强制关联资源关闭并传播异常到所有地方。

更多任务操作

任务操作建立在一个称为yieldto的底层原语之上。yieldto(task, value)挂起当前任务,切换到指定的task,并导致该任务的最后一个yieldto调用返回指定value。注意yieldto是使用任务风格控制流所需的唯一操作;我们不是调用和返回,而是一直切换到不同的任务。这就是为什么这个特性也被称为“对称协程”的原因;每个任务都使用相同的机制进行切换。

yieldto功能强大,但大多数使用任务的操作都不会直接调用它。考虑为什么可能是这样。如果您从当前任务切换出去,您可能希望在某个时候切换回它,但知道何时切换回来,以及知道哪个任务有切换回来的责任,可能需要相当多的协调。例如,put!take!是阻塞操作,它们在通道的上下文中使用时会维护状态以记住谁是消费者。不需要手动跟踪消费任务,这使得put!比底层的yieldto更容易使用。

除了yieldto之外,还需要一些其他基本函数才能有效地使用任务。

任务和事件

大多数任务切换都是由于等待 I/O 请求等事件而发生的,并由 Julia Base 中包含的调度器执行。调度器维护一个可运行任务的队列,并执行一个事件循环,根据消息到达等外部事件重新启动任务。

等待事件的基本函数是wait。几个对象实现了wait;例如,给定一个Process对象,wait将等待它退出。 wait通常是隐式的;例如,一个wait可能发生在对read的调用中,以等待数据可用。

在所有这些情况下,wait最终操作一个Condition对象,该对象负责队列和重新启动任务。当一个任务在Condition上调用wait时,该任务被标记为不可运行,被添加到条件的队列中,并切换到调度器。调度器随后将选择另一个任务来运行,或者阻塞等待外部事件。如果一切顺利,最终一个事件处理程序将在该条件上调用notify,这将导致等待该条件的任务再次变得可运行。

通过调用Task显式创建的任务最初是调度器未知的。这使您能够使用yieldto手动管理任务,如果您愿意的话。但是,当这样的任务等待事件时,它仍然会在事件发生时自动重新启动,正如您所期望的那样。