多线程
Base.Threads.@threads
— 宏Threads.@threads [schedule] for ... end
一个宏,用于并行执行 for
循环。迭代空间被分配给粗粒度任务。此策略可以通过 schedule
参数指定。循环的执行将等待所有迭代的评估完成。
另请参阅:@spawn
和 Distributed
中的 pmap
。
扩展帮助
语义
除非调度选项指定了更强的保证,否则 @threads
宏执行的循环具有以下语义。
@threads
宏以不确定的顺序和潜在的并发方式执行循环体。它不指定任务和工作线程的确切分配。每次执行的分配可能不同。循环体代码(包括从其传递调用到的任何代码)不得对迭代分配给任务或执行它们的 worker 线程做出任何假设。每次迭代的循环体必须能够独立于其他迭代取得进展,并且必须没有数据竞争。因此,跨迭代的无效同步可能会导致死锁,而未同步的内存访问可能会导致未定义的行为。
例如,上述条件意味着
- 在一次迭代中获取的锁 *必须* 在同一迭代中释放。
- 使用
Channel
等阻塞原语在迭代之间进行通信是不正确的。 - 仅写入未跨迭代共享的位置(除非使用锁或原子操作)。
- 除非使用
:static
调度,否则threadid()
的值即使在单个迭代中也可能发生变化。见任务迁移
。
调度器
如果没有调度器参数,则确切的调度是未指定的,并且在不同的 Julia 版本中有所不同。目前,当没有指定调度器时,将使用 :dynamic
。
schedule
参数从 Julia 1.5 开始可用。
:dynamic
(默认)
:dynamic
调度器以动态方式将迭代执行到可用的工作线程。当前实现假定每次迭代的工作负载是均匀的。但是,此假设可能会在将来移除。
此调度选项只是一个对底层执行机制的提示。但是,可以预期一些属性。:dynamic
调度器使用的 Task
数受可用工作线程数(Threads.threadpoolsize()
)的小常数倍数的限制。每个任务处理迭代空间的连续区域。因此,@threads :dynamic for x in xs; f(x); end
通常比 @sync for x in xs; @spawn f(x); end
更有效,如果 length(xs)
显著大于工作线程数,并且 f(x)
的运行时间相对小于生成和同步任务的成本(通常小于 10 微秒)。
schedule
参数的 :dynamic
选项可用,并且是 Julia 1.8 中的默认选项。
:static
:static
调度器为每个线程创建一个任务,并将迭代在它们之间平均分配,并将每个任务专门分配给每个线程。特别是,threadid()
的值保证在一次迭代中保持不变。如果从另一个 @threads
循环中或从除 1 以外的线程中使用 :static
,则会出错。
:static
调度是为了支持 Julia 1.3 之前编写的代码的转换。在新编写的库函数中,不建议使用 :static
调度,因为使用此选项的函数不能从任意工作线程调用。
示例
为了说明不同的调度策略,考虑以下函数 busywait
,它包含一个不屈服的计时循环,该循环运行特定秒数。
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
:dynamic
示例需要 2 秒,因为一个空闲线程能够运行两个 1 秒迭代以完成 for 循环。
Base.Threads.foreach
— 函数Threads.foreach(f, channel::Channel;
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
ntasks=Threads.threadpoolsize())
类似于 foreach(f, channel)
,但对 channel
的迭代和对 f
的调用由 Threads.@spawn
生成的 ntasks
个任务分割。此函数将在返回之前等待所有内部生成的的任务完成。
如果 schedule isa FairSchedule
,则 Threads.foreach
将尝试以一种允许 Julia 调度程序更自由地在线程之间负载平衡工作项的方式生成任务。这种方法通常具有更高的每项开销,但与其他多线程工作负载一起使用时,其性能可能比 StaticSchedule
更好。
如果 schedule isa StaticSchedule
,则 Threads.foreach
将以一种比 FairSchedule
具有更低的每项开销的方式生成任务,但不太适合负载平衡。因此,这种方法可能更适合细粒度、均匀的工作负载,但在与其他多线程工作负载一起使用时,其性能可能比 FairSchedule
更差。
示例
julia> n = 20
julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)
julia> d = Channel{Int}(n) do ch
f = i -> put!(ch, i^2)
Threads.foreach(f, c)
end
julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
此函数需要 Julia 1.6 或更高版本。
Base.Threads.@spawn
— 宏Threads.@spawn [:default|:interactive] expr
创建一个 Task
并将其 schedule
到指定的线程池中的任何可用线程上(如果未指定,则为 :default
)。一旦可用,任务将被分配给一个线程。要等待任务完成,请在该宏的结果上调用 wait
,或者调用 fetch
以等待并获取其返回值。
值可以通过 $
插入 @spawn
,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 *值*,将异步代码与当前任务中变量值的更改隔离开来。
此宏从 Julia 1.3 开始可用。
通过 $
插入值从 Julia 1.4 开始可用。
从 Julia 1.9 开始可以指定线程池。
示例
julia> t() = println("Hello from ", Threads.threadid());
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
Base.Threads.threadid
— 函数Threads.threadid() -> Int
获取当前执行线程的 ID 号。主线程的 ID 为 1
。
示例
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
println(Threads.threadid())
end
4
2
5
4
如果任务屈服,则任务运行的线程可能会发生变化,这被称为 任务迁移
。出于这个原因,在大多数情况下,使用 threadid()
来索引到例如缓冲区或有状态对象的向量中是不安全的。
Base.Threads.maxthreadid
— 函数Threads.maxthreadid() -> Int
获取 Julia 进程可用的线程数(跨所有线程池)的下限,具有原子获取语义。结果始终大于或等于 threadid()
以及您在调用 maxthreadid
之前能够观察到的任何任务的 threadid(task)
。
Base.Threads.nthreads
— 函数Threads.nthreads(:default | :interactive) -> Int
获取指定线程池中的当前线程数。:interactive
中的线程的 ID 号为 1:nthreads(:interactive)
,:default
中的线程的 ID 号为 nthreads(:interactive) .+ (1:nthreads(:default))
。
另请参阅 LinearAlgebra
标准库中的 BLAS.get_num_threads
和 BLAS.set_num_threads
,以及 Distributed
标准库中的 nprocs()
和 Threads.maxthreadid()
。
Base.Threads.threadpool
— 函数Threads.threadpool(tid = threadid()) -> Symbol
返回指定线程的线程池;:default
、:interactive
或 :foreign
之一。
Base.Threads.nthreadpools
— 函数Threads.nthreadpools() -> Int
返回当前配置的线程池数量。
Base.Threads.threadpoolsize
— 函数Threads.threadpoolsize(pool::Symbol = :default) -> Int
获取默认线程池(或指定线程池)可用的线程数量。
另请参阅:LinearAlgebra
标准库中的 BLAS.get_num_threads
和 BLAS.set_num_threads
,以及 Distributed
标准库中的 nprocs()
。
Base.Threads.ngcthreads
— 函数Threads.ngcthreads() -> Int
返回当前配置的 GC 线程数量。这包括标记线程和并发扫描线程。
另请参阅 多线程。
原子操作
atomic
— 关键字不安全指针操作与分别在 C11 和 C++23 中使用 _Atomic
和 std::atomic
类型声明的指针加载和存储兼容。如果不存在对原子加载 Julia 类型 T
的支持,可能会抛出错误。
另请参阅:unsafe_load
、unsafe_modify!
、unsafe_replace!
、unsafe_store!
、unsafe_swap!
Base.@atomic
— 宏@atomic var
@atomic order ex
如果 ex
是支持的表达式,则将 var
或 ex
标记为以原子方式执行。如果没有指定 order
,则默认为 :sequentially_consistent
。
@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend
以原子方式执行在右侧表达的存储操作并返回新值。
使用 =
时,此操作转换为 setproperty!(a.b, :x, new)
调用。使用任何运算符时,此操作也将转换为 modifyproperty!(a.b, :x, +, addend)[2]
调用。
@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2
以原子方式执行在右侧表达的二元操作。将结果存储到第一个参数中的字段并返回值 (old, new)
。
此操作转换为 modifyproperty!(a.b, :x, func, arg2)
调用。
有关更多详细信息,请参阅手册中的 每个字段原子操作 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomic a.x # fetch field x of a, with sequential consistency
1
julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
2
julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
3
julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
3 => 4
julia> @atomic a.x # fetch field x of a, with sequential consistency
4
julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
4 => 10
julia> @atomic a.x max 5 # again change field x of a to the max value, with sequential consistency
10 => 10
此功能需要至少 Julia 1.7。
Base.@atomicswap
— 宏@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new
将 new
存储到 a.b.x
并返回 a.b.x
的旧值。
此操作转换为 swapproperty!(a.b, :x, new)
调用。
有关更多详细信息,请参阅手册中的 每个字段原子操作 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
1
julia> @atomic a.x # fetch field x of a, with sequential consistency
4
此功能需要至少 Julia 1.7。
Base.@atomicreplace
— 宏@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired
以原子方式执行由对表达的条件替换,返回值 (old, success::Bool)
。其中 success
指示是否已完成替换。
此操作转换为 replaceproperty!(a.b, :x, expected, desired)
调用。
有关更多详细信息,请参阅手册中的 每个字段原子操作 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 1, success = true)
julia> @atomic a.x # fetch field x of a, with sequential consistency
2
julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 2, success = false)
julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency
julia> @atomicreplace a.x xchg
(old = 2, success = true)
julia> @atomic a.x # fetch field x of a, with sequential consistency
0
此功能需要至少 Julia 1.7。
以下 API 非常原始,并且可能会通过类似 unsafe_*
的包装器公开。
Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T
Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer
Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old
Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old
Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)
以下 API 已弃用,但对其的支持可能会保留几个版本。
Base.Threads.Atomic
— 类型Threads.Atomic{T}
保存对类型为 T
的对象的引用,确保它仅以原子方式访问,即以线程安全的方式访问。
只有某些“简单”类型可以以原子方式使用,即基本布尔型、整型和浮点型。这些是 Bool
、Int8
...Int128
、UInt8
...UInt128
和 Float16
...Float64
。
可以从非原子值创建新的原子对象;如果没有指定任何值,则原子对象将初始化为零。
可以使用 []
符号访问原子对象
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1
原子操作使用 atomic_
前缀,例如 atomic_add!
、atomic_xchg!
等。
Base.Threads.atomic_cas!
— 函数Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T
以原子方式比较并设置 x
以原子方式将 x
中的值与 cmp
进行比较。如果相等,则将 newval
写入 x
。否则,保留 x
不变。返回 x
中的旧值。通过将返回值与 cmp
进行比较(通过 ===
),可以知道 x
是否已修改,并且现在是否包含新值 newval
。
有关更多详细信息,请参阅 LLVM 的 cmpxchg
指令。
此函数可用于实现事务语义。在事务之前,会记录 x
中的值。在事务之后,只有在 x
在此期间未被修改的情况下才会存储新值。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)
Base.Threads.atomic_xchg!
— 函数Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T
以原子方式交换 x
中的值
以原子方式将 x
中的值与 newval
进行交换。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw xchg
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_add!
— 函数Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
以原子方式将 val
添加到 x
以原子方式执行 x[] += val
。返回旧值。未为 Atomic{Bool}
定义。
有关更多详细信息,请参阅 LLVM 的 atomicrmw add
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5
Base.Threads.atomic_sub!
— 函数Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
以原子方式从 x
中减去 val
以原子方式执行 x[] -= val
。返回旧值。未为 Atomic{Bool}
定义。
有关更多详细信息,请参阅 LLVM 的 atomicrmw sub
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1
Base.Threads.atomic_and!
— 函数Threads.atomic_and!(x::Atomic{T}, val::T) where T
以原子方式将 x
与 val
进行按位与运算
以原子方式执行 x[] &= val
。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw and
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_nand!
— 函数Threads.atomic_nand!(x::Atomic{T}, val::T) where T
以原子方式将 x
与 val
进行按位非与运算(非与)
以原子方式执行 x[] = ~(x[] & val)
。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw nand
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3
Base.Threads.atomic_or!
— 函数Threads.atomic_or!(x::Atomic{T}, val::T) where T
以原子方式将 x
与 val
进行按位或运算
以原子方式执行 x[] |= val
。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw or
指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_xor!
— 函数Threads.atomic_xor!(x::Atomic{T}, val::T) where T
以原子方式将 x
与 val
进行按位异或运算(异或)
以原子方式执行 x[] ^= val
。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw xor
指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2
Base.Threads.atomic_max!
— 函数Threads.atomic_max!(x::Atomic{T}, val::T) where T
以原子方式将 x
和 val
的最大值存储到 x
中
以原子方式执行 x[] = max(x[], val)
。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw max
指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_min!
— 函数Threads.atomic_min!(x::Atomic{T}, val::T) where T
以原子方式将 x
和 val
的最小值存储到 x
中
以原子方式执行 x[] = min(x[], val)
。返回旧值。
有关更多详细信息,请参阅 LLVM 的 atomicrmw min
指令。
示例
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5
Base.Threads.atomic_fence
— 函数Threads.atomic_fence()
插入顺序一致性内存栅栏
插入具有顺序一致性排序语义的内存栅栏。存在需要此功能的算法,例如,获取/释放排序不足的算法。
这很可能是非常昂贵的操作。鉴于 Julia 中所有其他原子操作都具有获取/释放语义,在大多数情况下不需要显式栅栏。
有关更多详细信息,请参阅 LLVM 的 fence
指令。
使用 libuv 线程池进行 ccall(实验性)
Base.@threadcall
— 宏@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
@threadcall
宏的调用方式与 ccall
相同,但工作是在不同的线程中完成的。这在您想调用阻塞 C 函数而不导致当前 julia
线程被阻塞时非常有用。并发性受 libuv 线程池大小的限制,默认情况下为 4 个线程,但可以通过设置 UV_THREADPOOL_SIZE
环境变量并重新启动 julia
进程来增加。
请注意,被调用的函数不应该调用回 Julia。
低级同步原语
这些构建块用于创建常规同步对象。
Base.Threads.SpinLock
— 类型SpinLock()
创建一个非可重入的测试和测试以及设置自旋锁。递归使用会导致死锁。这种类型的锁仅应在执行时间很短且不阻塞的代码周围使用(例如,执行 I/O)。一般情况下,应使用 ReentrantLock
代替。
每个 lock
必须与一个 unlock
相匹配。如果 !islocked(lck::SpinLock)
成立,则 trylock(lck)
将成功,除非有其他任务尝试“同时”持有该锁。
测试和测试以及设置自旋锁在最多大约 30 个竞争线程时最快。如果您有更多竞争,则应考虑不同的同步方法。