任务

Core.Task类型
Task(func)

创建一个 Task (即协程) 来执行给定的函数 func (该函数必须是可调用且不带参数的)。当此函数返回时,任务退出。该任务将在构造时从父任务运行在 "世界年龄" 中,并在被 schedule 时运行。

警告

默认情况下,任务的粘性位将设置为 true (t.sticky)。这模拟了 @async 的历史默认值。粘性任务只能在其首次调度到的工作线程上运行。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false

示例

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

在此示例中,b 是一个可运行的 Task,但尚未开始运行。

源代码
Base.@task
@task

将表达式包装在一个 Task 中,但不执行它,并返回 Task。这只会创建一个任务,而不会运行它。

示例

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
源代码
Base.@async
@async

将表达式包装在一个 Task 中,并将其添加到本地机器的调度程序队列中。

可以使用 $ 将值插值到 @async 中,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 *值*,从而将异步代码与当前任务中变量值的更改隔离开来。

警告

强烈建议始终优先使用 Threads.@spawn 而不是 @async,即使不需要并行,特别是在公开发布的库中。这是因为使用 @async 会在 Julia 的当前实现中禁用 *父* 任务在工作线程之间的迁移。因此,在库函数中看似无害地使用 @async 会对用户应用程序中非常不同的部分的性能产生重大影响。

Julia 1.4

从 Julia 1.4 开始,可以使用 $ 插值值。

源代码
Base.asyncmap函数
asyncmap(f, c...; ntasks=0, batch_size=nothing)

使用多个并发任务将 f 映射到集合 (或多个等长集合) 上。对于多个集合参数,f 将按元素应用。

ntasks 指定并发运行的任务数量。根据集合的长度,如果未指定 ntasks,则最多将使用 100 个任务进行并发映射。

ntasks 也可以指定为一个零参数函数。在这种情况下,将在处理每个元素之前检查并行运行的任务数量,如果 ntasks_func 的值大于当前任务数量,则会启动一个新任务。

如果指定了 batch_size,则集合将以批处理模式进行处理。在这种情况下,f 必须是一个函数,该函数必须接受一个参数元组的 Vector 并且必须返回一个结果向量。输入向量将具有 batch_size 或更小的长度。

以下示例通过返回执行映射函数的任务的 objectid 来突出显示在不同任务中的执行。

首先,当 ntasks 未定义时,每个元素都在不同的任务中处理。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2 时,所有元素都在 2 个任务中处理。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

batch_size 定义时,映射函数需要更改为接受一个参数元组数组并返回一个结果数组。map 在修改后的映射函数中使用以实现此目的。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
源代码
Base.asyncmap!函数
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

类似于 asyncmap,但将输出存储在 results 中,而不是返回一个集合。

警告

当任何被修改的参数与任何其他参数共享内存时,行为可能出乎意料。

源代码
Base.istaskdone函数
istaskdone(t::Task) -> Bool

确定任务是否已退出。

示例

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
源代码
Base.istaskstarted函数
istaskstarted(t::Task) -> Bool

确定任务是否已开始执行。

示例

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
源代码
Base.istaskfailed函数
istaskfailed(t::Task) -> Bool

确定任务是否已退出,因为抛出了异常。

示例

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

此函数需要至少为 Julia 1.3。

源代码
Base.task_local_storage方法
task_local_storage(body, key, value)

使用修改后的任务局部存储调用函数 body,其中 value 分配给 key;之后会恢复 key 的先前值 (如果存在)。对于模拟动态作用域很有用。

源代码

调度

Base.yield函数
yield()

切换到调度程序,以允许另一个已调度任务运行。调用此函数的任务仍然是可运行的,如果没有任何其他可运行的任务,它将立即重新启动。

源代码
yield(t::Task, arg = nothing)

schedule(t, arg); yield() 的快速、不公平调度版本,在调用调度程序之前立即让出给 t

源代码
Base.yieldto函数
yieldto(t::Task, arg = nothing)

切换到给定的任务。任务第一次被切换时,将使用无参数调用该任务的函数。在后续切换中,arg 将从该任务对 yieldto 的最后一次调用中返回。这是一个低级调用,它只切换任务,而不考虑状态或调度。不鼓励使用它。

源代码
Base.sleep函数
sleep(seconds)

阻塞当前任务指定的秒数。最短睡眠时间为 1 毫秒或 0.001 的输入。

源代码
Base.schedule函数
schedule(t::Task, [val]; error=false)

将一个 Task 添加到调度程序的队列中。这会导致该任务在系统空闲时不断运行,除非该任务执行阻塞操作,例如 wait

如果提供第二个参数 val,它将在任务再次运行时传递给该任务 (通过 yieldto 的返回值)。如果 errortrue,则该值将在唤醒的任务中作为异常引发。

警告

对已启动的任意 Task 使用 schedule 是不正确的。有关更多信息,请参阅 API 参考

示例

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
源代码

同步

Base.errormonitor函数
errormonitor(t::Task)

如果任务 t 失败,则将错误日志打印到 stderr

示例

julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
源代码
Base.@sync
@sync

等待所有词法上封闭的 @async@spawn@spawnat@distributed 使用完成。由封闭异步操作引发的所有异常将被收集并作为 CompositeException 抛出。

示例

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
源代码
Base.wait函数

关于 Threads.Condition 的特别说明

调用者必须持有拥有 Threads.Conditionlock,然后才能调用此方法。调用任务将被阻塞,直到其他任务唤醒它,通常通过调用同一个 Threads.Condition 对象上的 notify。锁将在阻塞时原子地释放(即使它被递归地锁定),并且将在返回之前重新获取。

源代码
wait(r::Future)

等待指定 Future 的值变为可用。

源代码
wait(r::RemoteChannel, args...)

等待指定 RemoteChannel 上的值变为可用。

源代码
wait([x])

阻塞当前任务,直到发生某些事件,具体取决于参数的类型

  • Channel: 等待通道上追加一个值。
  • Condition: 等待条件上的 notify 并返回传递给 notifyval 参数。在条件上等待还可以传递 first=true,这会导致等待者在 notify 上唤醒时排在第一位,而不是通常的先进先出行为。
  • Process: 等待进程或进程链退出。进程的 exitcode 字段可用于确定成功或失败。
  • Task: 等待 Task 完成。如果任务因异常而失败,则会抛出 TaskFailedException(它包装了失败的任务)。
  • RawFD: 等待文件描述符上的更改(参见 FileWatching 包)。

如果没有传递参数,则任务将无限期阻塞。只能通过显式调用 scheduleyieldto 重新启动任务。

通常在 while 循环中调用 wait 以确保在继续之前满足等待的条件。

源代码
wait(c::Channel)

阻塞,直到 Channel isready

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # task is blocked because channel is not ready
false

julia> put!(c, 1);

julia> istaskdone(task)  # task is now unblocked
true
源代码
Base.timedwait函数
timedwait(testcb, timeout::Real; pollint::Real=0.1)

等待直到 testcb() 返回 truetimeout 秒过去,以先发生者为准。测试函数每 pollint 秒轮询一次。pollint 的最小值为 0.001 秒,即 1 毫秒。

返回 :ok:timed_out

源代码
Base.Condition类型
Condition()

创建一个边沿触发的事件源,任务可以等待该事件源。调用 wait 等待 Condition 的任务将被挂起并排队。当 notify 稍后在 Condition 上被调用时,任务会被唤醒。边沿触发意味着只有在调用 notify 时正在等待的任务才能被唤醒。对于电平触发的通知,您必须保持额外的状态来跟踪通知是否发生。ChannelThreads.Event 类型会执行此操作,并且可以用于电平触发的事件。

此对象不是线程安全的。有关线程安全版本的详细信息,请参见 Threads.Condition

源代码
Base.Threads.Condition类型
Threads.Condition([lock])

Base.Condition 的线程安全版本。

要调用 Threads.Condition 上的 waitnotify,您必须先调用其上的 lock。当调用 wait 时,锁将在阻塞期间原子地释放,并且将在 wait 返回之前重新获取。因此,Threads.Condition c 的惯用方式如下所示

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

此功能需要至少 Julia 1.2。

源代码
Base.Event类型
Event([autoreset=false])

创建一个电平触发的事件源。调用 wait 等待 Event 的任务将被挂起并排队,直到在 Event 上调用 notify。在调用 notify 之后,Event 将保持在已发出信号的状态,并且任务将不再阻塞等待它,直到调用 reset

如果 autoreset 为真,则对于每次调用 notify,最多只有一个任务会从 wait 中释放。

这在 notify/wait 上提供了获取和释放内存排序。

Julia 1.1

此功能需要至少 Julia 1.1。

Julia 1.8

autoreset 功能和内存排序保证需要至少 Julia 1.8。

源代码
Base.notify函数
notify(condition, val=nothing; all=true, error=false)

唤醒等待条件的任务,并将 val 传递给它们。如果 alltrue(默认值),则唤醒所有等待的任务,否则只唤醒一个任务。如果 errortrue,则传递的值将在唤醒的任务中作为异常引发。

返回唤醒的任务数量。如果 condition 上没有任务在等待,则返回 0。

源代码
Base.reset方法
reset(::Event)

Event 重置回未设置状态。然后,对 wait 的任何未来调用都将阻塞,直到再次调用 notify

源代码
Base.Semaphore类型
Semaphore(sem_size)

创建一个计数信号量,该信号量允许最多 sem_size 个获取在任何时间处于使用状态。每次获取都必须与一个释放相匹配。

这在 acquire/release 调用上提供了获取和释放内存排序。

源代码
Base.acquire函数
acquire(s::Semaphore)

等待 sem_size 个许可证之一可用,直到可以获取一个许可证才阻塞。

源代码
acquire(f, s::Semaphore)

在从信号量 s 获取许可证后执行 f,并在完成或发生错误时释放许可证。

例如,一种 do-block 形式,它确保 foo 的调用只有 2 个同时处于活动状态。

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

此方法需要至少 Julia 1.8。

源代码
Base.release函数
release(s::Semaphore)

将一个许可证返回到池中,可能允许另一个任务获取它并恢复执行。

源代码
Base.lock函数
lock(lock)

在锁可用时获取锁。如果锁已被其他任务/线程锁定,则等待它变为可用。

每个 lock 都必须与一个 unlock 相匹配。

源代码
lock(f::Function, lock)

获取锁,以持有锁的方式执行 f,并在 f 返回时释放锁。如果锁已被其他任务/线程锁定,则等待它变为可用。

当此函数返回时,锁已被释放,因此调用者不应尝试再次释放它。

Julia 1.7

Channel 作为第二个参数需要 Julia 1.7 或更高版本。

源代码
Base.unlock函数
unlock(lock)

释放对锁的所有权。

如果这是一个以前被获取的递归锁,则递减内部计数器并立即返回。

源代码
Base.trylock函数
trylock(lock) -> Success (Boolean)

如果锁可用,则获取锁,如果成功,则返回 true。如果锁已被其他任务/线程锁定,则返回 false

每个成功的 trylock 都必须与一个 unlock 相匹配。

函数 trylockislocked 相结合可以用于编写测试和测试和设置或指数回退算法,如果 typeof(lock) 支持它(请阅读其文档)。

源代码
Base.islocked函数
islocked(lock) -> Status (Boolean)

检查锁是否被任何任务/线程持有。此函数本身不应用于同步。但是,islockedtrylock 相结合可以用于编写测试和测试和设置或指数回退算法,如果 typeof(lock) 支持它(请阅读其文档)。

扩展帮助

例如,如果锁实现满足下面记录的属性,则指数回退可以按如下方式实现。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

实现

建议锁实现定义 islocked 具有以下属性,并在其文档字符串中说明。

  • islocked(lock) 是无数据竞争的。
  • 如果 islocked(lock) 返回 false,则如果没有任何其他任务的干扰,trylock(lock) 的立即调用必须成功(返回 true)。
源代码
Base.ReentrantLock类型
ReentrantLock()

创建一个可重入锁,用于同步 Task。同一个任务可以根据需要多次获取锁(这就是名称中“可重入”部分的含义)。每个 lock 都必须与一个 unlock 相匹配。

调用 'lock' 还会阻止在该线程上运行终结器,直到相应的 'unlock'。应该自然地支持下面说明的标准锁模式,但要注意反转 try/lock 的顺序或完全缺少 try 块(例如,尝试在仍持有锁的情况下返回)

这在 lock/unlock 调用上提供了获取和释放内存排序。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

如果 !islocked(lck::ReentrantLock) 成立,则 trylock(lck) 将成功,除非有其他任务试图“同时”持有锁。

源代码

通道

Base.Channel类型
Channel{T=Any}(size::Int=0)

构造一个 Channel,其内部缓冲区可以容纳最大 size 个类型为 T 的对象。对已满通道的 put! 调用会阻塞,直到使用 take! 移除一个对象。

Channel(0) 构造一个无缓冲通道。put! 会阻塞,直到调用匹配的 take!。反之亦然。

其他构造函数

  • Channel():默认构造函数,等效于 Channel{Any}(0)
  • Channel(Inf):等效于 Channel{Any}(typemax(Int))
  • Channel(sz):等效于 Channel{Any}(sz)
Julia 1.3

默认构造函数 Channel() 和默认 size=0 在 Julia 1.3 中添加。

源代码
Base.Channel方法
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

func 创建一个新的任务,将其绑定到一个新的类型为 T、大小为 size 的通道,并在一次调用中调度任务。通道在任务终止时自动关闭。

func 必须接受绑定的通道作为其唯一参数。

如果您需要创建的任务的引用,请通过关键字参数 taskref 传递一个 Ref{Task} 对象。

如果 spawn=true,则为 func 创建的 Task 可能在另一个线程上并行调度,等效于通过 Threads.@spawn 创建任务。

如果 spawn=truethreadpool 参数未设置,则默认值为 :default

如果 threadpool 参数设置(为 :default:interactive),则意味着 spawn=true,新任务将被生成到指定的线程池中。

返回一个 Channel

示例

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

引用创建的任务

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

spawn= 参数在 Julia 1.3 中添加。此构造函数在 Julia 1.3 中添加。在早期版本的 Julia 中,Channel 使用关键字参数来设置 sizeT,但这些构造函数已弃用。

Julia 1.9

threadpool= 参数在 Julia 1.9 中添加。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
源代码
Base.put!方法
put!(c::Channel, v)

将项目 v 附加到通道 c。如果通道已满,则会阻塞。

对于无缓冲通道,会阻塞,直到另一个任务执行 take!

Julia 1.1

现在,v 会使用 convert 转换为通道的类型,因为在调用 put! 时。

源代码
Base.take!方法
take!(c::Channel)

按顺序从 Channel 中移除并返回一个值。阻塞,直到数据可用。对于无缓冲通道,会阻塞,直到另一个任务执行 put!

示例

带缓冲通道

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

无缓冲通道

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
源代码
Base.isready方法
isready(c::Channel)

确定 Channel 中是否存储了一个值。立即返回,不会阻塞。

对于无缓冲通道,如果存在任务正在等待 put!,则返回 true

示例

带缓冲通道

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

无缓冲通道

julia> c = Channel();

julia> isready(c)  # no tasks waiting to put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # schedule a put! task

julia> isready(c)
true
源代码
Base.fetch方法
fetch(c::Channel)

等待并返回(不移除)Channel 中第一个可用的项目。注意:fetch 在无缓冲(0 大小)Channel 上不受支持。

示例

带缓冲通道

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # item is not removed
3-element Vector{Any}:
 1
 2
 3
源代码
Base.close方法
close(c::Channel[, excp::Exception])

关闭一个通道。由

  • 封闭通道上的 put! 抛出一个异常(可选地由 excp 给出)。
  • 空、封闭通道上的 take!fetch
源代码
Base.bind方法
bind(chnl::Channel, task::Task)

chnl 的生命周期与一个任务关联起来。当任务终止时,Channel chnl 会自动关闭。任务中的任何未捕获异常都会传播到 chnl 上的所有等待者。

chnl 对象可以独立于任务终止而显式关闭。终止任务不会影响已关闭的 Channel 对象。

当一个通道绑定到多个任务时,第一个终止的任务将关闭通道。当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。

示例

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
源代码

使用 schedulewait 的低级同步

对尚未启动(调度)的 Task 使用 schedule 的最简单正确方法。但是,可以使用 schedulewait 作为非常低级的构建块来构建同步接口。调用 schedule(task) 的关键先决条件是调用者必须“拥有”task;即它必须知道给定 task 中的 wait 调用正在调用 schedule(task) 的代码已知的位置发生。确保此先决条件的一种策略是使用原子,如以下示例所示

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent 允许一个任务等待另一个任务的 notify。它是一个有限的通信接口,因为 wait 只能从单个任务中使用一次(请注意 ev.task 的非原子赋值)

在此示例中,notify(ev::OneWayEvent) 仅当它将状态从 OWE_WAITING 修改为 OWE_NOTIFYING 时才允许调用 schedule(ev.task)。这让我们知道执行 wait(ev::OneWayEvent) 的任务现在位于 ok 分支中,并且不可能存在其他尝试 schedule(ev.task) 的任务,因为它们的 @atomicreplace(ev.state, state => OWE_NOTIFYING) 会失败。