多线程
访问此博客文章,了解 Julia 多线程功能的介绍。
使用多个线程启动 Julia
默认情况下,Julia 以单个执行线程启动。可以使用命令Threads.nthreads()
验证这一点。
julia> Threads.nthreads()
1
执行线程的数量可以通过使用-t
/--threads
命令行参数或使用JULIA_NUM_THREADS
环境变量来控制。当两者都指定时,-t
/--threads
优先。
线程数可以指定为整数 (--threads=4
) 或 auto
(--threads=auto
),其中 auto
尝试推断要使用的有用的默认线程数 (有关更多详细信息,请参阅命令行选项)。
-t
/--threads
命令行参数至少需要 Julia 1.5。在较旧的版本中,必须改用环境变量。
使用 auto
作为环境变量 JULIA_NUM_THREADS
的值至少需要 Julia 1.7。在较旧的版本中,此值将被忽略。
让我们使用 4 个线程启动 Julia
$ julia --threads 4
让我们验证是否有 4 个线程可用。
julia> Threads.nthreads()
4
但我们目前在主线程上。要检查,我们使用函数Threads.threadid
julia> Threads.threadid()
1
如果您更喜欢使用环境变量,则可以在 Bash (Linux/macOS) 中按如下方式设置它
export JULIA_NUM_THREADS=4
Linux/macOS 上的 C shell,Windows 上的 CMD
set JULIA_NUM_THREADS=4
Windows 上的 Powershell
$env:JULIA_NUM_THREADS=4
请注意,这必须在启动 Julia *之前*完成。
使用-t
/--threads
指定的线程数将传播到使用-p
/--procs
或--machine-file
命令行选项生成的 worker 进程。例如,julia -p2 -t2
生成一个具有 2 个 worker 进程的主进程,并且所有三个进程都启用了 2 个线程。要更细粒度地控制 worker 线程,请使用addprocs
并将-t
/--threads
作为exeflags
传递。
多个 GC 线程
垃圾回收器 (GC) 可以使用多个线程。使用量要么是计算 worker 线程数量的一半,要么由--gcthreads
命令行参数或使用JULIA_NUM_GC_THREADS
环境变量配置。
--gcthreads
命令行参数至少需要 Julia 1.10。
线程池
当程序的线程忙于运行许多任务时,任务可能会遇到延迟,这可能会对程序的响应性和交互性产生负面影响。为了解决这个问题,您可以在Threads.@spawn
任务时指定该任务是交互式的。
using Base.Threads
@spawn :interactive f()
交互式任务应避免执行高延迟操作,如果它们是长时间运行的任务,则应经常 yield。
Julia 可以使用一个或多个线程来运行交互式任务。
$ julia --threads 3,1
环境变量 JULIA_NUM_THREADS
也可以类似地使用
export JULIA_NUM_THREADS=3,1
这将使用 3 个线程在 :default
线程池中启动 Julia,并在 :interactive
线程池中启动 1 个线程。
julia> using Base.Threads
julia> nthreadpools()
2
julia> threadpool() # the main thread is in the interactive thread pool
:interactive
julia> nthreads(:default)
3
julia> nthreads(:interactive)
1
julia> nthreads()
3
nthreads
的零参数版本返回默认池中的线程数。
根据 Julia 是否已使用交互式线程启动,主线程位于默认线程池或交互式线程池中。
这两个数字中的任何一个或两个都可以替换为单词 auto
,这会导致 Julia 选择一个合理的默认值。
通信和同步
尽管 Julia 的线程可以通过共享内存进行通信,但编写正确且无数据竞争的多线程代码非常困难。Julia 的Channel
是线程安全的,可用于安全通信。
数据竞争自由
您完全有责任确保您的程序没有数据竞争,并且如果您不遵守此要求,则不能保证此处承诺的任何内容。观察到的结果可能非常不直观。
确保这一点的最佳方法是在对可以从多个线程观察到的任何数据进行访问时获取锁。例如,在大多数情况下,您应该使用以下代码模式
julia> lock(lk) do
use(a)
end
julia> begin
lock(lk)
try
use(a)
finally
unlock(lk)
end
end
其中 lk
是一个锁 (例如 ReentrantLock()
) 且 a
是数据。
此外,在存在数据竞争的情况下,Julia 不是内存安全的。如果另一个线程可能会写入数据,请务必小心读取*任何*数据!相反,在更改由其他线程访问的数据 (例如,分配给全局变量或闭包变量) 时,始终使用上面的锁模式。
Thread 1:
global b = false
global a = rand()
global b = true
Thread 2:
while !b; end
bad_read1(a) # it is NOT safe to access `a` here!
Thread 3:
while !@isdefined(a); end
bad_read2(a) # it is NOT safe to access `a` here
@threads
宏
让我们使用我们的原生线程做一个简单的例子。让我们创建一个零数组
julia> a = zeros(10)
10-element Vector{Float64}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
让我们使用 4 个线程同时操作此数组。我们将让每个线程将其线程 ID 写入每个位置。
Julia 使用Threads.@threads
宏支持并行循环。此宏附加在 for
循环前面,以指示 Julia 该循环是多线程区域
julia> Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
迭代空间在线程之间分割,之后每个线程将其线程 ID 写入其分配的位置
julia> a
10-element Vector{Float64}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0
请注意,Threads.@threads
没有像@distributed
这样的可选归约参数。
在没有数据竞争的情况下使用 @threads
以一个简单的求和为例
julia> function sum_single(a)
s = 0
for i in a
s += i
end
s
end
sum_single (generic function with 1 method)
julia> sum_single(1:1_000_000)
500000500000
简单地添加 @threads
会暴露出数据竞争,多个线程同时读取和写入 s
。
julia> function sum_multi_bad(a)
s = 0
Threads.@threads for i in a
s += i
end
s
end
sum_multi_bad (generic function with 1 method)
julia> sum_multi_bad(1:1_000_000)
70140554652
请注意,结果不是 500000500000
,因为它应该是,并且很可能每次评估都会发生变化。
要解决此问题,可以使用特定于任务的缓冲区将总和分割成无竞争的块。这里 sum_single
被重用,它有自己的内部缓冲区 s
,并且向量 a
通过 nthreads()
个 @spawn
-ed 任务被分割成 nthreads()
个块以进行并行工作。
julia> function sum_multi_good(a)
chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
tasks = map(chunks) do chunk
Threads.@spawn sum_single(chunk)
end
chunk_sums = fetch.(tasks)
return sum_single(chunk_sums)
end
sum_multi_good (generic function with 1 method)
julia> sum_multi_good(1:1_000_000)
500000500000
缓冲区不应基于threadid()
进行管理,例如buffers = zeros(Threads.nthreads())
,因为并发任务可能会挂起,这意味着多个并发任务可能会在给定的线程上使用相同的缓冲区,从而引入数据竞争的风险。此外,当有多个线程可用时,任务可能会在挂起点更改线程,这被称为任务迁移。
另一种选择是在跨任务/线程共享的变量上使用原子操作,这可能根据操作的特性而提高性能。
原子操作
Julia 支持以原子方式访问和修改值,即以线程安全的方式来避免竞争条件。一个值(必须是基本类型)可以包装为Threads.Atomic
来指示它必须以这种方式访问。这里我们可以看到一个例子
julia> i = Threads.Atomic{Int}(0);
julia> ids = zeros(4);
julia> old_is = zeros(4);
julia> Threads.@threads for id in 1:4
old_is[id] = Threads.atomic_add!(i, id)
ids[id] = id
end
julia> old_is
4-element Vector{Float64}:
0.0
1.0
7.0
3.0
julia> i[]
10
julia> ids
4-element Vector{Float64}:
1.0
2.0
3.0
4.0
如果我们尝试在没有原子标签的情况下进行加法,则由于竞争条件,我们可能会得到错误的答案。一个如果没有避免竞争将会发生什么的例子
julia> using Base.Threads
julia> Threads.nthreads()
4
julia> acc = Ref(0)
Base.RefValue{Int64}(0)
julia> @threads for i in 1:1000
acc[] += 1
end
julia> acc[]
926
julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)
julia> @threads for i in 1:1000
atomic_add!(acc, 1)
end
julia> acc[]
1000
按字段原子操作
我们还可以使用@atomic
、@atomicswap
和@atomicreplace
宏在更细粒度的级别上使用原子操作。
内存模型的具体细节以及设计的其他细节都写在了Julia 原子操作宣言中,该宣言稍后将正式发布。
结构体声明中的任何字段都可以用@atomic
装饰,然后任何写入也必须用@atomic
标记,并且必须使用定义的原子顺序之一(:monotonic
、:acquire
、:release
、:acquire_release
或:sequentially_consistent
)。任何对原子字段的读取也可以用原子顺序约束进行注释,或者如果未指定,则将使用单调(松散)顺序进行。
按字段原子操作需要 Julia 1.7 或更高版本。
副作用和可变函数参数
在使用多线程时,当使用非纯函数时,我们必须小心,因为我们可能会得到错误的答案。例如,根据约定,名称以!
结尾的函数会修改其参数,因此不是纯函数。
@threadcall
外部库(例如通过ccall
调用的库)对 Julia 的基于任务的 I/O 机制提出了问题。如果 C 库执行阻塞操作,则会阻止 Julia 调度程序执行任何其他任务,直到调用返回。(例外情况是调用回 Julia 的自定义 C 代码的调用,然后可能会挂起,或者调用jl_yield()
的 C 代码,它是yield
的 C 等价物。)
@threadcall
宏提供了一种避免在这种情况下使执行停滞的方法。它安排一个 C 函数在单独的线程中执行。为此使用一个默认大小为 4 的线程池。线程池的大小通过环境变量UV_THREADPOOL_SIZE
控制。在等待空闲线程期间,以及一旦线程可用后在函数执行期间,请求任务(在主 Julia 事件循环上)会让位于其他任务。请注意,@threadcall
不会在执行完成之前返回。因此,从用户的角度来看,它就像其他 Julia API 一样是阻塞调用。
非常重要的是,被调用的函数不要回调到 Julia,因为它会导致段错误。
@threadcall
可能会在 Julia 的未来版本中被移除/更改。
注意事项
目前,如果用户代码没有数据竞争,则 Julia 运行时和标准库中的大多数操作都可以以线程安全的方式使用。但是,在某些领域,关于稳定线程支持的工作仍在进行中。多线程编程有许多固有的困难,如果使用线程的程序表现出异常或不希望的行为(例如崩溃或神秘的结果),则通常首先应该怀疑线程交互。
在 Julia 中使用线程时,需要注意一些具体的限制和警告
- 如果多个线程同时使用基本集合类型,其中至少一个线程修改了集合,则需要手动锁定(常见的例子包括数组上的
push!
或将项目插入Dict
)。 @spawn
使用的调度是非确定性的,不应依赖它。- 计算密集型、非内存分配任务可能会阻止垃圾回收在其他正在分配内存的线程中运行。在这些情况下,可能需要插入对
GC.safepoint()
的手动调用以允许 GC 运行。此限制将在未来删除。 - 避免并行运行顶级操作,例如
include
或类型、方法和模块定义的eval
。 - 请注意,如果启用了线程,库注册的终结器可能会中断。在能够放心地广泛采用线程之前,这可能需要在整个生态系统中进行一些过渡工作。有关更多详细信息,请参阅下一节。
任务迁移
任务在某个线程上开始运行后,如果该任务挂起,它可能会移动到另一个线程。
此类任务可能是使用@spawn
或@threads
启动的,尽管@threads
的:static
调度选项确实会冻结线程 ID。
这意味着在大多数情况下,不应将threadid()
视为任务内的一个常量,因此不应将其用于索引缓冲区或状态对象向量。
任务迁移是在 Julia 1.7 中引入的。在此之前,任务始终停留在其启动所在的同一线程上。
终结器的安全使用
因为终结器可以中断任何代码,所以它们在与任何全局状态交互时必须非常小心。不幸的是,使用终结器的主要原因是更新全局状态(纯函数通常作为终结器毫无意义)。这导致我们陷入困境。有几种方法可以解决此问题
在单线程情况下,代码可以调用内部
jl_gc_enable_finalizers
C 函数以防止在关键区域内调度终结器。在内部,这在某些函数(例如我们的 C 锁)中使用,以防止在执行某些操作(增量包加载、代码生成等)时发生递归。锁和此标志的组合可以用于使终结器安全。第二种策略(Base 在几个地方使用)是显式延迟终结器,直到它能够非递归地获取其锁。以下示例演示了如何将此策略应用于
Distributed.finalize_ref
function finalize_ref(r::AbstractRemoteRef) if r.where > 0 # Check if the finalizer is already run if islocked(client_refs) || !trylock(client_refs) # delay finalizer for later if we aren't free to acquire the lock finalizer(finalize_ref, r) return nothing end try # `lock` should always be followed by `try` if r.where > 0 # Must check again here # Do actual cleanup here r.where = 0 end finally unlock(client_refs) end end nothing end
相关的第三种策略是使用无挂起队列。我们目前还没有在 Base 中实现无锁队列,但
Base.IntrusiveLinkedListSynchronized{T}
是合适的。这通常是用于带有事件循环的代码的良好策略。例如,Gtk.jl
使用此策略来管理生命周期引用计数。在这种方法中,我们不会在finalizer
内部执行任何显式工作,而是将其添加到队列中以便在更安全的时间运行。事实上,Julia 的任务调度程序已经使用了这一点,因此将终结器定义为x -> @spawn do_cleanup(x)
是这种方法的一个例子。但是请注意,这不会控制do_cleanup
在哪个线程上运行,因此do_cleanup
仍然需要获取锁。如果您实现自己的队列,则不必如此,因为您可以明确地只从您的线程中清空该队列。