多线程

Base.Threads.@threads
Threads.@threads [schedule] for ... end

一个宏,用于并行执行 for 循环。迭代空间被分配给粗粒度任务。此策略可以通过 schedule 参数指定。循环的执行将等待所有迭代的评估完成。

另请参阅:@spawnDistributed 中的 pmap

扩展帮助

语义

除非调度选项指定了更强的保证,否则 @threads 宏执行的循环具有以下语义。

@threads 宏以不确定的顺序和潜在的并发方式执行循环体。它不指定任务和工作线程的确切分配。每次执行的分配可能不同。循环体代码(包括从其传递调用到的任何代码)不得对迭代分配给任务或执行它们的 worker 线程做出任何假设。每次迭代的循环体必须能够独立于其他迭代取得进展,并且必须没有数据竞争。因此,跨迭代的无效同步可能会导致死锁,而未同步的内存访问可能会导致未定义的行为。

例如,上述条件意味着

  • 在一次迭代中获取的锁 *必须* 在同一迭代中释放。
  • 使用 Channel 等阻塞原语在迭代之间进行通信是不正确的。
  • 仅写入未跨迭代共享的位置(除非使用锁或原子操作)。
  • 除非使用 :static 调度,否则 threadid() 的值即使在单个迭代中也可能发生变化。见 任务迁移

调度器

如果没有调度器参数,则确切的调度是未指定的,并且在不同的 Julia 版本中有所不同。目前,当没有指定调度器时,将使用 :dynamic

Julia 1.5

schedule 参数从 Julia 1.5 开始可用。

:dynamic (默认)

:dynamic 调度器以动态方式将迭代执行到可用的工作线程。当前实现假定每次迭代的工作负载是均匀的。但是,此假设可能会在将来移除。

此调度选项只是一个对底层执行机制的提示。但是,可以预期一些属性。:dynamic 调度器使用的 Task 数受可用工作线程数(Threads.threadpoolsize())的小常数倍数的限制。每个任务处理迭代空间的连续区域。因此,@threads :dynamic for x in xs; f(x); end 通常比 @sync for x in xs; @spawn f(x); end 更有效,如果 length(xs) 显著大于工作线程数,并且 f(x) 的运行时间相对小于生成和同步任务的成本(通常小于 10 微秒)。

Julia 1.8

schedule 参数的 :dynamic 选项可用,并且是 Julia 1.8 中的默认选项。

:static

:static 调度器为每个线程创建一个任务,并将迭代在它们之间平均分配,并将每个任务专门分配给每个线程。特别是,threadid() 的值保证在一次迭代中保持不变。如果从另一个 @threads 循环中或从除 1 以外的线程中使用 :static,则会出错。

注意

:static 调度是为了支持 Julia 1.3 之前编写的代码的转换。在新编写的库函数中,不建议使用 :static 调度,因为使用此选项的函数不能从任意工作线程调用。

示例

为了说明不同的调度策略,考虑以下函数 busywait,它包含一个不屈服的计时循环,该循环运行特定秒数。

julia> function busywait(seconds)
            tstart = time_ns()
            while (time_ns() - tstart) / 1e9 < seconds
            end
        end

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

:dynamic 示例需要 2 秒,因为一个空闲线程能够运行两个 1 秒迭代以完成 for 循环。

来源
Base.Threads.foreach函数
Threads.foreach(f, channel::Channel;
                schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                ntasks=Threads.threadpoolsize())

类似于 foreach(f, channel),但对 channel 的迭代和对 f 的调用由 Threads.@spawn 生成的 ntasks 个任务分割。此函数将在返回之前等待所有内部生成的的任务完成。

如果 schedule isa FairSchedule,则 Threads.foreach 将尝试以一种允许 Julia 调度程序更自由地在线程之间负载平衡工作项的方式生成任务。这种方法通常具有更高的每项开销,但与其他多线程工作负载一起使用时,其性能可能比 StaticSchedule 更好。

如果 schedule isa StaticSchedule,则 Threads.foreach 将以一种比 FairSchedule 具有更低的每项开销的方式生成任务,但不太适合负载平衡。因此,这种方法可能更适合细粒度、均匀的工作负载,但在与其他多线程工作负载一起使用时,其性能可能比 FairSchedule 更差。

示例

julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
Julia 1.6

此函数需要 Julia 1.6 或更高版本。

来源
Base.Threads.@spawn
Threads.@spawn [:default|:interactive] expr

创建一个 Task 并将其 schedule 到指定的线程池中的任何可用线程上(如果未指定,则为 :default)。一旦可用,任务将被分配给一个线程。要等待任务完成,请在该宏的结果上调用 wait,或者调用 fetch 以等待并获取其返回值。

值可以通过 $ 插入 @spawn,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 *值*,将异步代码与当前任务中变量值的更改隔离开来。

注意

如果任务屈服,则任务运行的线程可能会发生变化,因此 threadid() 不应被视为任务的常量。见 任务迁移,以及更广泛的 多线程 手册以获取更多重要注意事项。另请参阅关于 线程池 的章节。

Julia 1.3

此宏从 Julia 1.3 开始可用。

Julia 1.4

通过 $ 插入值从 Julia 1.4 开始可用。

Julia 1.9

从 Julia 1.9 开始可以指定线程池。

示例

julia> t() = println("Hello from ", Threads.threadid());

julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
来源
Base.Threads.threadid函数
Threads.threadid() -> Int

获取当前执行线程的 ID 号。主线程的 ID 为 1

示例

julia> Threads.threadid()
1

julia> Threads.@threads for i in 1:4
          println(Threads.threadid())
       end
4
2
5
4
注意

如果任务屈服,则任务运行的线程可能会发生变化,这被称为 任务迁移。出于这个原因,在大多数情况下,使用 threadid() 来索引到例如缓冲区或有状态对象的向量中是不安全的。

来源
Base.Threads.maxthreadid函数
Threads.maxthreadid() -> Int

获取 Julia 进程可用的线程数(跨所有线程池)的下限,具有原子获取语义。结果始终大于或等于 threadid() 以及您在调用 maxthreadid 之前能够观察到的任何任务的 threadid(task)

来源
Base.Threads.nthreads函数
Threads.nthreads(:default | :interactive) -> Int

获取指定线程池中的当前线程数。:interactive 中的线程的 ID 号为 1:nthreads(:interactive):default 中的线程的 ID 号为 nthreads(:interactive) .+ (1:nthreads(:default))

另请参阅 LinearAlgebra 标准库中的 BLAS.get_num_threadsBLAS.set_num_threads,以及 Distributed 标准库中的 nprocs()Threads.maxthreadid()

来源
Base.Threads.threadpool函数
Threads.threadpool(tid = threadid()) -> Symbol

返回指定线程的线程池;:default:interactive:foreign 之一。

来源
Base.Threads.threadpoolsize函数
Threads.threadpoolsize(pool::Symbol = :default) -> Int

获取默认线程池(或指定线程池)可用的线程数量。

另请参阅:LinearAlgebra 标准库中的 BLAS.get_num_threadsBLAS.set_num_threads,以及 Distributed 标准库中的 nprocs()

来源
Base.Threads.ngcthreads函数
Threads.ngcthreads() -> Int

返回当前配置的 GC 线程数量。这包括标记线程和并发扫描线程。

来源

另请参阅 多线程

原子操作

Base.@atomic
@atomic var
@atomic order ex

如果 ex 是支持的表达式,则将 varex 标记为以原子方式执行。如果没有指定 order,则默认为 :sequentially_consistent

@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend

以原子方式执行在右侧表达的存储操作并返回新值。

使用 = 时,此操作转换为 setproperty!(a.b, :x, new) 调用。使用任何运算符时,此操作也将转换为 modifyproperty!(a.b, :x, +, addend)[2] 调用。

@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2

以原子方式执行在右侧表达的二元操作。将结果存储到第一个参数中的字段并返回值 (old, new)

此操作转换为 modifyproperty!(a.b, :x, func, arg2) 调用。

有关更多详细信息,请参阅手册中的 每个字段原子操作 部分。

示例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomic a.x # fetch field x of a, with sequential consistency
1

julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
2

julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
3

julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
3 => 4

julia> @atomic a.x # fetch field x of a, with sequential consistency
4

julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
4 => 10

julia> @atomic a.x max 5 # again change field x of a to the max value, with sequential consistency
10 => 10
Julia 1.7

此功能需要至少 Julia 1.7。

来源
Base.@atomicswap
@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new

new 存储到 a.b.x 并返回 a.b.x 的旧值。

此操作转换为 swapproperty!(a.b, :x, new) 调用。

有关更多详细信息,请参阅手册中的 每个字段原子操作 部分。

示例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
1

julia> @atomic a.x # fetch field x of a, with sequential consistency
4
Julia 1.7

此功能需要至少 Julia 1.7。

来源
Base.@atomicreplace
@atomicreplace a.b.x expected => desired
@atomicreplace :sequentially_consistent a.b.x expected => desired
@atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired

以原子方式执行由对表达的条件替换,返回值 (old, success::Bool)。其中 success 指示是否已完成替换。

此操作转换为 replaceproperty!(a.b, :x, expected, desired) 调用。

有关更多详细信息,请参阅手册中的 每个字段原子操作 部分。

示例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 1, success = true)

julia> @atomic a.x # fetch field x of a, with sequential consistency
2

julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
(old = 2, success = false)

julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency

julia> @atomicreplace a.x xchg
(old = 2, success = true)

julia> @atomic a.x # fetch field x of a, with sequential consistency
0
Julia 1.7

此功能需要至少 Julia 1.7。

来源
注意

以下 API 非常原始,并且可能会通过类似 unsafe_* 的包装器公开。

Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T
Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer
Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old
Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old
Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)
警告

以下 API 已弃用,但对其的支持可能会保留几个版本。

Base.Threads.Atomic类型
Threads.Atomic{T}

保存对类型为 T 的对象的引用,确保它仅以原子方式访问,即以线程安全的方式访问。

只有某些“简单”类型可以以原子方式使用,即基本布尔型、整型和浮点型。这些是 BoolInt8...Int128UInt8...UInt128Float16...Float64

可以从非原子值创建新的原子对象;如果没有指定任何值,则原子对象将初始化为零。

可以使用 [] 符号访问原子对象

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

原子操作使用 atomic_ 前缀,例如 atomic_add!atomic_xchg! 等。

来源
Base.Threads.atomic_cas!函数
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

以原子方式比较并设置 x

以原子方式将 x 中的值与 cmp 进行比较。如果相等,则将 newval 写入 x。否则,保留 x 不变。返回 x 中的旧值。通过将返回值与 cmp 进行比较(通过 ===),可以知道 x 是否已修改,并且现在是否包含新值 newval

有关更多详细信息,请参阅 LLVM 的 cmpxchg 指令。

此函数可用于实现事务语义。在事务之前,会记录 x 中的值。在事务之后,只有在 x 在此期间未被修改的情况下才会存储新值。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
来源
Base.Threads.atomic_xchg!函数
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

以原子方式交换 x 中的值

以原子方式将 x 中的值与 newval 进行交换。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw xchg 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
来源
Base.Threads.atomic_add!函数
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

以原子方式将 val 添加到 x

以原子方式执行 x[] += val。返回值。未为 Atomic{Bool} 定义。

有关更多详细信息,请参阅 LLVM 的 atomicrmw add 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
来源
Base.Threads.atomic_sub!函数
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

以原子方式从 x 中减去 val

以原子方式执行 x[] -= val。返回值。未为 Atomic{Bool} 定义。

有关更多详细信息,请参阅 LLVM 的 atomicrmw sub 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1
来源
Base.Threads.atomic_and!函数
Threads.atomic_and!(x::Atomic{T}, val::T) where T

以原子方式将 xval 进行按位与运算

以原子方式执行 x[] &= val。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw and 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2
来源
Base.Threads.atomic_nand!函数
Threads.atomic_nand!(x::Atomic{T}, val::T) where T

以原子方式将 xval 进行按位非与运算(非与)

以原子方式执行 x[] = ~(x[] & val)。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw nand 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3
来源
Base.Threads.atomic_or!函数
Threads.atomic_or!(x::Atomic{T}, val::T) where T

以原子方式将 xval 进行按位或运算

以原子方式执行 x[] |= val。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw or 指令。

示例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
来源
Base.Threads.atomic_xor!函数
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

以原子方式将 xval 进行按位异或运算(异或)

以原子方式执行 x[] ^= val。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw xor 指令。

示例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2
来源
Base.Threads.atomic_max!函数
Threads.atomic_max!(x::Atomic{T}, val::T) where T

以原子方式将 xval 的最大值存储到 x

以原子方式执行 x[] = max(x[], val)。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw max 指令。

示例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
来源
Base.Threads.atomic_min!函数
Threads.atomic_min!(x::Atomic{T}, val::T) where T

以原子方式将 xval 的最小值存储到 x

以原子方式执行 x[] = min(x[], val)。返回值。

有关更多详细信息,请参阅 LLVM 的 atomicrmw min 指令。

示例

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5
来源
Base.Threads.atomic_fence函数
Threads.atomic_fence()

插入顺序一致性内存栅栏

插入具有顺序一致性排序语义的内存栅栏。存在需要此功能的算法,例如,获取/释放排序不足的算法。

这很可能是非常昂贵的操作。鉴于 Julia 中所有其他原子操作都具有获取/释放语义,在大多数情况下不需要显式栅栏。

有关更多详细信息,请参阅 LLVM 的 fence 指令。

来源

使用 libuv 线程池进行 ccall(实验性)

Base.@threadcall
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

@threadcall 宏的调用方式与 ccall 相同,但工作是在不同的线程中完成的。这在您想调用阻塞 C 函数而不导致当前 julia 线程被阻塞时非常有用。并发性受 libuv 线程池大小的限制,默认情况下为 4 个线程,但可以通过设置 UV_THREADPOOL_SIZE 环境变量并重新启动 julia 进程来增加。

请注意,被调用的函数不应该调用回 Julia。

来源

低级同步原语

这些构建块用于创建常规同步对象。

Base.Threads.SpinLock类型
SpinLock()

创建一个非可重入的测试和测试以及设置自旋锁。递归使用会导致死锁。这种类型的锁仅应在执行时间很短且不阻塞的代码周围使用(例如,执行 I/O)。一般情况下,应使用 ReentrantLock 代替。

每个 lock 必须与一个 unlock 相匹配。如果 !islocked(lck::SpinLock) 成立,则 trylock(lck) 将成功,除非有其他任务尝试“同时”持有该锁。

测试和测试以及设置自旋锁在最多大约 30 个竞争线程时最快。如果您有更多竞争,则应考虑不同的同步方法。

来源