任务
Core.Task
— 类型Task(func)
创建一个 Task
(即协程) 来执行给定的函数 func
(该函数必须是可调用且不带参数的)。当此函数返回时,任务退出。该任务将在构造时从父任务运行在 "世界年龄" 中,并在被 schedule
时运行。
默认情况下,任务的粘性位将设置为 true
(t.sticky
)。这模拟了 @async
的历史默认值。粘性任务只能在其首次调度到的工作线程上运行。要获得 Threads.@spawn
的行为,请手动将粘性位设置为 false
。
示例
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
在此示例中,b
是一个可运行的 Task
,但尚未开始运行。
Base.@task
— 宏@task
将表达式包装在一个 Task
中,但不执行它,并返回 Task
。这只会创建一个任务,而不会运行它。
示例
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— 宏@async
将表达式包装在一个 Task
中,并将其添加到本地机器的调度程序队列中。
可以使用 $
将值插值到 @async
中,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 *值*,从而将异步代码与当前任务中变量值的更改隔离开来。
强烈建议始终优先使用 Threads.@spawn
而不是 @async
,即使不需要并行,特别是在公开发布的库中。这是因为使用 @async
会在 Julia 的当前实现中禁用 *父* 任务在工作线程之间的迁移。因此,在库函数中看似无害地使用 @async
会对用户应用程序中非常不同的部分的性能产生重大影响。
从 Julia 1.4 开始,可以使用 $
插值值。
Base.asyncmap
— 函数asyncmap(f, c...; ntasks=0, batch_size=nothing)
使用多个并发任务将 f
映射到集合 (或多个等长集合) 上。对于多个集合参数,f
将按元素应用。
ntasks
指定并发运行的任务数量。根据集合的长度,如果未指定 ntasks
,则最多将使用 100 个任务进行并发映射。
ntasks
也可以指定为一个零参数函数。在这种情况下,将在处理每个元素之前检查并行运行的任务数量,如果 ntasks_func
的值大于当前任务数量,则会启动一个新任务。
如果指定了 batch_size
,则集合将以批处理模式进行处理。在这种情况下,f
必须是一个函数,该函数必须接受一个参数元组的 Vector
并且必须返回一个结果向量。输入向量将具有 batch_size
或更小的长度。
以下示例通过返回执行映射函数的任务的 objectid
来突出显示在不同任务中的执行。
首先,当 ntasks
未定义时,每个元素都在不同的任务中处理。
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
当 ntasks=2
时,所有元素都在 2 个任务中处理。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
当 batch_size
定义时,映射函数需要更改为接受一个参数元组数组并返回一个结果数组。map
在修改后的映射函数中使用以实现此目的。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— 函数asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
类似于 asyncmap
,但将输出存储在 results
中,而不是返回一个集合。
当任何被修改的参数与任何其他参数共享内存时,行为可能出乎意料。
Base.current_task
— 函数current_task()
获取当前正在运行的 Task
。
Base.istaskdone
— 函数istaskdone(t::Task) -> Bool
确定任务是否已退出。
示例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— 函数istaskstarted(t::Task) -> Bool
确定任务是否已开始执行。
示例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— 函数istaskfailed(t::Task) -> Bool
确定任务是否已退出,因为抛出了异常。
示例
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
此函数需要至少为 Julia 1.3。
Base.task_local_storage
— 方法task_local_storage(key)
在当前任务的任务局部存储中查找键的值。
Base.task_local_storage
— 方法task_local_storage(key, value)
在当前任务的任务局部存储中为键分配一个值。
Base.task_local_storage
— 方法task_local_storage(body, key, value)
使用修改后的任务局部存储调用函数 body
,其中 value
分配给 key
;之后会恢复 key
的先前值 (如果存在)。对于模拟动态作用域很有用。
调度
Base.yield
— 函数yield()
切换到调度程序,以允许另一个已调度任务运行。调用此函数的任务仍然是可运行的,如果没有任何其他可运行的任务,它将立即重新启动。
yield(t::Task, arg = nothing)
schedule(t, arg); yield()
的快速、不公平调度版本,在调用调度程序之前立即让出给 t
。
Base.yieldto
— 函数yieldto(t::Task, arg = nothing)
切换到给定的任务。任务第一次被切换时,将使用无参数调用该任务的函数。在后续切换中,arg
将从该任务对 yieldto
的最后一次调用中返回。这是一个低级调用,它只切换任务,而不考虑状态或调度。不鼓励使用它。
Base.sleep
— 函数sleep(seconds)
阻塞当前任务指定的秒数。最短睡眠时间为 1 毫秒或 0.001
的输入。
Base.schedule
— 函数schedule(t::Task, [val]; error=false)
将一个 Task
添加到调度程序的队列中。这会导致该任务在系统空闲时不断运行,除非该任务执行阻塞操作,例如 wait
。
如果提供第二个参数 val
,它将在任务再次运行时传递给该任务 (通过 yieldto
的返回值)。如果 error
为 true
,则该值将在唤醒的任务中作为异常引发。
对已启动的任意 Task
使用 schedule
是不正确的。有关更多信息,请参阅 API 参考。
示例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
同步
Base.errormonitor
— 函数errormonitor(t::Task)
如果任务 t
失败,则将错误日志打印到 stderr
。
示例
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
Base.@sync
— 宏@sync
等待所有词法上封闭的 @async
、@spawn
、@spawnat
和 @distributed
使用完成。由封闭异步操作引发的所有异常将被收集并作为 CompositeException
抛出。
示例
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— 函数关于 Threads.Condition
的特别说明
调用者必须持有拥有 Threads.Condition
的 lock
,然后才能调用此方法。调用任务将被阻塞,直到其他任务唤醒它,通常通过调用同一个 Threads.Condition
对象上的 notify
。锁将在阻塞时原子地释放(即使它被递归地锁定),并且将在返回之前重新获取。
wait(r::Future)
等待指定 Future
的值变为可用。
wait(r::RemoteChannel, args...)
等待指定 RemoteChannel
上的值变为可用。
wait([x])
阻塞当前任务,直到发生某些事件,具体取决于参数的类型
Channel
: 等待通道上追加一个值。Condition
: 等待条件上的notify
并返回传递给notify
的val
参数。在条件上等待还可以传递first=true
,这会导致等待者在notify
上唤醒时排在第一位,而不是通常的先进先出行为。Process
: 等待进程或进程链退出。进程的exitcode
字段可用于确定成功或失败。Task
: 等待Task
完成。如果任务因异常而失败,则会抛出TaskFailedException
(它包装了失败的任务)。RawFD
: 等待文件描述符上的更改(参见FileWatching
包)。
如果没有传递参数,则任务将无限期阻塞。只能通过显式调用 schedule
或 yieldto
重新启动任务。
通常在 while
循环中调用 wait
以确保在继续之前满足等待的条件。
wait(c::Channel)
阻塞,直到 Channel
isready
。
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # task is blocked because channel is not ready
false
julia> put!(c, 1);
julia> istaskdone(task) # task is now unblocked
true
Base.fetch
— 方法fetch(t::Task)
等待 Task
完成,然后返回其结果值。如果任务因异常而失败,则会抛出 TaskFailedException
(它包装了失败的任务)。
Base.fetch
— 方法fetch(x::Any)
返回 x
。
Base.timedwait
— 函数timedwait(testcb, timeout::Real; pollint::Real=0.1)
等待直到 testcb()
返回 true
或 timeout
秒过去,以先发生者为准。测试函数每 pollint
秒轮询一次。pollint
的最小值为 0.001 秒,即 1 毫秒。
返回 :ok
或 :timed_out
。
Base.Condition
— 类型Condition()
创建一个边沿触发的事件源,任务可以等待该事件源。调用 wait
等待 Condition
的任务将被挂起并排队。当 notify
稍后在 Condition
上被调用时,任务会被唤醒。边沿触发意味着只有在调用 notify
时正在等待的任务才能被唤醒。对于电平触发的通知,您必须保持额外的状态来跟踪通知是否发生。Channel
和 Threads.Event
类型会执行此操作,并且可以用于电平触发的事件。
此对象不是线程安全的。有关线程安全版本的详细信息,请参见 Threads.Condition
。
Base.Threads.Condition
— 类型Threads.Condition([lock])
Base.Condition
的线程安全版本。
要调用 Threads.Condition
上的 wait
或 notify
,您必须先调用其上的 lock
。当调用 wait
时,锁将在阻塞期间原子地释放,并且将在 wait
返回之前重新获取。因此,Threads.Condition
c
的惯用方式如下所示
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
此功能需要至少 Julia 1.2。
Base.Event
— 类型Event([autoreset=false])
创建一个电平触发的事件源。调用 wait
等待 Event
的任务将被挂起并排队,直到在 Event
上调用 notify
。在调用 notify
之后,Event
将保持在已发出信号的状态,并且任务将不再阻塞等待它,直到调用 reset
。
如果 autoreset
为真,则对于每次调用 notify
,最多只有一个任务会从 wait
中释放。
这在 notify/wait 上提供了获取和释放内存排序。
此功能需要至少 Julia 1.1。
autoreset
功能和内存排序保证需要至少 Julia 1.8。
Base.notify
— 函数notify(condition, val=nothing; all=true, error=false)
唤醒等待条件的任务,并将 val
传递给它们。如果 all
为 true
(默认值),则唤醒所有等待的任务,否则只唤醒一个任务。如果 error
为 true
,则传递的值将在唤醒的任务中作为异常引发。
返回唤醒的任务数量。如果 condition
上没有任务在等待,则返回 0。
Base.reset
— 方法Base.Semaphore
— 类型Semaphore(sem_size)
创建一个计数信号量,该信号量允许最多 sem_size
个获取在任何时间处于使用状态。每次获取都必须与一个释放相匹配。
这在 acquire/release 调用上提供了获取和释放内存排序。
Base.acquire
— 函数acquire(s::Semaphore)
等待 sem_size
个许可证之一可用,直到可以获取一个许可证才阻塞。
acquire(f, s::Semaphore)
在从信号量 s
获取许可证后执行 f
,并在完成或发生错误时释放许可证。
例如,一种 do-block 形式,它确保 foo
的调用只有 2 个同时处于活动状态。
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
此方法需要至少 Julia 1.8。
Base.release
— 函数release(s::Semaphore)
将一个许可证返回到池中,可能允许另一个任务获取它并恢复执行。
Base.AbstractLock
— 类型Base.lock
— 函数lock(f::Function, lock)
获取锁,以持有锁的方式执行 f
,并在 f
返回时释放锁。如果锁已被其他任务/线程锁定,则等待它变为可用。
当此函数返回时,锁已被释放,因此调用者不应尝试再次释放它。
将 Channel
作为第二个参数需要 Julia 1.7 或更高版本。
Base.unlock
— 函数unlock(lock)
释放对锁的所有权。
如果这是一个以前被获取的递归锁,则递减内部计数器并立即返回。
Base.trylock
— 函数trylock(lock) -> Success (Boolean)
如果锁可用,则获取锁,如果成功,则返回 true
。如果锁已被其他任务/线程锁定,则返回 false
。
每个成功的 trylock
都必须与一个 unlock
相匹配。
函数 trylock
与 islocked
相结合可以用于编写测试和测试和设置或指数回退算法,如果 typeof(lock)
支持它(请阅读其文档)。
Base.islocked
— 函数islocked(lock) -> Status (Boolean)
检查锁是否被任何任务/线程持有。此函数本身不应用于同步。但是,islocked
与 trylock
相结合可以用于编写测试和测试和设置或指数回退算法,如果 typeof(lock)
支持它(请阅读其文档)。
扩展帮助
例如,如果锁实现满足下面记录的属性,则指数回退可以按如下方式实现。
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
实现
建议锁实现定义 islocked
具有以下属性,并在其文档字符串中说明。
islocked(lock)
是无数据竞争的。- 如果
islocked(lock)
返回false
,则如果没有任何其他任务的干扰,trylock(lock)
的立即调用必须成功(返回true
)。
Base.ReentrantLock
— 类型ReentrantLock()
创建一个可重入锁,用于同步 Task
。同一个任务可以根据需要多次获取锁(这就是名称中“可重入”部分的含义)。每个 lock
都必须与一个 unlock
相匹配。
调用 'lock' 还会阻止在该线程上运行终结器,直到相应的 'unlock'。应该自然地支持下面说明的标准锁模式,但要注意反转 try/lock 的顺序或完全缺少 try 块(例如,尝试在仍持有锁的情况下返回)
这在 lock/unlock 调用上提供了获取和释放内存排序。
lock(l)
try
<atomic work>
finally
unlock(l)
end
如果 !islocked(lck::ReentrantLock)
成立,则 trylock(lck)
将成功,除非有其他任务试图“同时”持有锁。
通道
Base.AbstractChannel
— 类型AbstractChannel{T}
表示传递类型为 T
的对象的通道。
Base.Channel
— 类型Channel{T=Any}(size::Int=0)
构造一个 Channel
,其内部缓冲区可以容纳最大 size
个类型为 T
的对象。对已满通道的 put!
调用会阻塞,直到使用 take!
移除一个对象。
Channel(0)
构造一个无缓冲通道。put!
会阻塞,直到调用匹配的 take!
。反之亦然。
其他构造函数
Channel()
:默认构造函数,等效于Channel{Any}(0)
Channel(Inf)
:等效于Channel{Any}(typemax(Int))
Channel(sz)
:等效于Channel{Any}(sz)
默认构造函数 Channel()
和默认 size=0
在 Julia 1.3 中添加。
Base.Channel
— 方法Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
从 func
创建一个新的任务,将其绑定到一个新的类型为 T
、大小为 size
的通道,并在一次调用中调度任务。通道在任务终止时自动关闭。
func
必须接受绑定的通道作为其唯一参数。
如果您需要创建的任务的引用,请通过关键字参数 taskref
传递一个 Ref{Task}
对象。
如果 spawn=true
,则为 func
创建的 Task
可能在另一个线程上并行调度,等效于通过 Threads.@spawn
创建任务。
如果 spawn=true
且 threadpool
参数未设置,则默认值为 :default
。
如果 threadpool
参数设置(为 :default
或 :interactive
),则意味着 spawn=true
,新任务将被生成到指定的线程池中。
返回一个 Channel
。
示例
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
引用创建的任务
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
spawn=
参数在 Julia 1.3 中添加。此构造函数在 Julia 1.3 中添加。在早期版本的 Julia 中,Channel 使用关键字参数来设置 size
和 T
,但这些构造函数已弃用。
threadpool=
参数在 Julia 1.9 中添加。
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— 方法put!(c::Channel, v)
将项目 v
附加到通道 c
。如果通道已满,则会阻塞。
对于无缓冲通道,会阻塞,直到另一个任务执行 take!
。
现在,v
会使用 convert
转换为通道的类型,因为在调用 put!
时。
Base.take!
— 方法take!(c::Channel)
按顺序从 Channel
中移除并返回一个值。阻塞,直到数据可用。对于无缓冲通道,会阻塞,直到另一个任务执行 put!
。
示例
带缓冲通道
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
无缓冲通道
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— 方法isready(c::Channel)
确定 Channel
中是否存储了一个值。立即返回,不会阻塞。
对于无缓冲通道,如果存在任务正在等待 put!
,则返回 true
。
示例
带缓冲通道
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
无缓冲通道
julia> c = Channel();
julia> isready(c) # no tasks waiting to put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # schedule a put! task
julia> isready(c)
true
Base.fetch
— 方法fetch(c::Channel)
等待并返回(不移除)Channel
中第一个可用的项目。注意:fetch
在无缓冲(0 大小)Channel
上不受支持。
示例
带缓冲通道
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # item is not removed
3-element Vector{Any}:
1
2
3
Base.close
— 方法close(c::Channel[, excp::Exception])
关闭一个通道。由
Base.bind
— 方法bind(chnl::Channel, task::Task)
将 chnl
的生命周期与一个任务关联起来。当任务终止时,Channel
chnl
会自动关闭。任务中的任何未捕获异常都会传播到 chnl
上的所有等待者。
chnl
对象可以独立于任务终止而显式关闭。终止任务不会影响已关闭的 Channel
对象。
当一个通道绑定到多个任务时,第一个终止的任务将关闭通道。当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。
示例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
使用 schedule
和 wait
的低级同步
对尚未启动(调度)的 Task
使用 schedule
的最简单正确方法。但是,可以使用 schedule
和 wait
作为非常低级的构建块来构建同步接口。调用 schedule(task)
的关键先决条件是调用者必须“拥有”task
;即它必须知道给定 task
中的 wait
调用正在调用 schedule(task)
的代码已知的位置发生。确保此先决条件的一种策略是使用原子,如以下示例所示
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
允许一个任务等待另一个任务的 notify
。它是一个有限的通信接口,因为 wait
只能从单个任务中使用一次(请注意 ev.task
的非原子赋值)
在此示例中,notify(ev::OneWayEvent)
仅当它将状态从 OWE_WAITING
修改为 OWE_NOTIFYING
时才允许调用 schedule(ev.task)
。这让我们知道执行 wait(ev::OneWayEvent)
的任务现在位于 ok
分支中,并且不可能存在其他尝试 schedule(ev.task)
的任务,因为它们的 @atomicreplace(ev.state, state => OWE_NOTIFYING)
会失败。