多进程和分布式计算
分布式内存并行计算的实现由模块 Distributed
提供,它是 Julia 附带的标准库的一部分。
大多数现代计算机都拥有多个 CPU,并且可以将多台计算机组合成一个集群。利用这些多个 CPU 的强大功能,可以更快地完成许多计算。有两个主要因素会影响性能:CPU 本身的速度以及它们访问内存的速度。在集群中,很明显,给定的 CPU 将最快地访问同一台计算机(节点)内的 RAM。也许更令人惊讶的是,由于主内存和 缓存 的速度差异,类似的问题也与典型的多核笔记本电脑相关。因此,良好的多处理环境应该允许控制特定 CPU 对内存块的“所有权”。Julia 提供了一个基于消息传递的多处理环境,允许程序同时在多个具有独立内存域的进程上运行。
Julia 的消息传递实现与其他环境(如 MPI[1])不同。Julia 中的通信通常是“单方面的”,这意味着程序员只需要在两个进程的操作中显式管理一个进程。此外,这些操作通常看起来不像“消息发送”和“消息接收”,而是类似于对用户函数的调用等更高级别的操作。
Julia 中的分布式编程建立在两个原语之上:远程引用和远程调用。远程引用是一个对象,可以从任何进程中使用它来引用存储在特定进程上的对象。远程调用是一个进程的请求,要求在另一个(可能是相同的)进程上对某些参数调用某个函数。
远程引用有两种形式:Future
和 RemoteChannel
。
远程调用会将其结果的 Future
返回。远程调用会立即返回;发出调用的进程继续执行其下一个操作,而远程调用则在其他地方发生。可以通过对返回的 Future
调用 wait
来等待远程调用的完成,并且可以使用 fetch
获取结果的完整值。
另一方面,RemoteChannel
是可重写的。例如,多个进程可以通过引用同一个远程 Channel
来协调其处理过程。
每个进程都有一个关联的标识符。提供交互式 Julia 提示的进程的 id
始终等于 1。默认情况下用于并行操作的进程称为“工作进程”。当只有一个进程时,进程 1 被认为是一个工作进程。否则,工作进程被认为是除进程 1 之外的所有进程。因此,添加 2 个或更多进程才能从并行处理方法(如 pmap
)中获益。如果您只想在主进程中执行其他操作,而长计算在工作进程上运行,则添加单个进程是有益的。
让我们尝试一下。从 julia -p n
开始,它会在本地机器上提供 n
个工作进程。通常,n
等于机器上的 CPU 线程(逻辑核心)数是有意义的。请注意,-p
参数会隐式加载模块 Distributed
。
$ julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
remotecall
的第一个参数是要调用的函数。Julia 中的大多数并行编程都不会引用特定的进程或可用的进程数,但 remotecall
被认为是一个提供更精细控制的低级接口。 remotecall
的第二个参数是将执行工作的进程的 id
,其余参数将传递给被调用的函数。
如您所见,在第一行中,我们要求进程 2 构造一个 2x2 的随机矩阵,在第二行中,我们要求它为其添加 1。这两个计算的结果都存在于两个 future 中,即 r
和 s
。 @spawnat
宏在第一个参数指定的进程上评估第二个参数中的表达式。
有时您可能需要立即获得远程计算的值。这通常发生在您从远程对象读取以获取下一个本地操作所需的数据时。函数 remotecall_fetch
用于此目的。它等效于 fetch(remotecall(...))
,但效率更高。
julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085
这会在工作进程 2 上获取数组并返回第一个值。请注意,在这种情况下,fetch
不会移动任何数据,因为它是在拥有数组的工作进程上执行的。还可以编写
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866
请记住,getindex(r,1,1)
等效于 r[1,1]
,因此此调用获取 future r
的第一个元素。
为了简化操作,可以将符号 :any
传递给 @spawnat
,它会为您选择执行操作的位置
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
请注意,我们使用了 1 .+ fetch(r)
而不是 1 .+ r
。这是因为我们不知道代码将在哪里运行,因此通常可能需要 fetch
将 r
移动到执行加法的进程。在这种情况下,@spawnat
足够智能,可以在拥有 r
的进程上执行计算,因此 fetch
将是一个无操作(不执行任何工作)。
(值得注意的是,@spawnat
不是内置的,而是在 Julia 中定义为一个 宏。可以定义您自己的此类构造。)
需要记住的一件重要事情是,一旦获取,Future
将在本地缓存其值。进一步的 fetch
调用不会涉及网络跳转。一旦所有引用的 Future
都已获取,则会删除远程存储的值。
@async
类似于 @spawnat
,但仅在本地进程上运行任务。我们使用它为每个进程创建一个“馈送器”任务。每个任务选择需要计算的下一个索引,然后等待其进程完成,然后重复,直到我们用完索引。请注意,馈送器任务直到主任务到达 @sync
块的末尾才会开始执行,此时它会放弃控制权并等待所有本地任务完成,然后从函数返回。对于 v0.7 及更高版本,馈送器任务能够通过 nextidx
共享状态,因为它们都在同一进程上运行。即使 Tasks
是协作调度的,在某些情况下仍然可能需要锁定,如 异步 I/O 中所示。这意味着上下文切换仅发生在明确定义的点:在本例中,当调用 remotecall_fetch
时。这是当前的实现状态,它可能会在未来的 Julia 版本中更改,因为它旨在使其能够在 M 个 Process
上运行最多 N 个 Tasks
,即 M:N 线程。然后将需要一个用于 nextidx
的锁获取/释放模型,因为让多个进程同时读写一个资源是不安全的。
代码可用性和加载包
您的代码必须在运行它的任何进程上可用。例如,在 Julia 提示符中输入以下内容
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]
进程 1 知道函数 rand2
,但进程 2 不知道。
最常见的是,您将从文件或包中加载代码,并且您在控制哪些进程加载代码方面具有相当大的灵活性。考虑一个文件 DummyModule.jl
,其中包含以下代码
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
为了在所有进程中引用 MyType
,需要在每个进程上加载 DummyModule.jl
。调用 include("DummyModule.jl")
仅在一个进程上加载它。要在每个进程上加载它,请使用 @everywhere
宏(使用 julia -p 2
启动 Julia)
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
像往常一样,这不会将 DummyModule
带入任何进程的范围,这需要 using
或 import
。此外,当 DummyModule
被带入一个进程的范围时,它不在任何其他进程中
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
但是,仍然可以例如将 MyType
发送到已加载 DummyModule
的进程,即使它不在作用域内
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
文件也可以在启动时使用 -L
标志预加载到多个进程上,并且可以使用驱动程序脚本驱动计算
julia -p <n> -L file1.jl -L file2.jl driver.jl
在上面的示例中运行驱动程序脚本的 Julia 进程具有等于 1 的 id
,就像提供交互式提示的进程一样。
最后,如果 DummyModule.jl
不是一个独立的文件,而是一个包,那么 using DummyModule
将在所有进程上加载 DummyModule.jl
,但仅将其带入调用 using
的进程的作用域。
启动和管理工作进程
基本 Julia 安装内置支持两种类型的集群
- 如上所示,使用
-p
选项指定的本地集群。 - 使用
--machine-file
选项跨机器的集群。这使用无密码ssh
登录在指定机器上启动 Julia 工作进程(来自与当前主机相同的路径)。每个机器定义采用[count*][user@]host[:port] [bind_addr[:port]]
的形式。user
默认为当前用户,port
默认为标准 ssh 端口。count
是在节点上生成的 worker 数量,默认为 1。可选的bind-to bind_addr[:port]
指定其他 worker 应用来连接到此 worker 的 IP 地址和端口。
虽然 Julia 通常努力实现向后兼容性,但代码到工作进程的分布依赖于 Serialization.serialize
。正如相应文档中指出的那样,这不能保证在不同的 Julia 版本之间工作,因此建议所有机器上的所有 worker 使用相同的版本。
函数 addprocs
、rmprocs
、workers
等可用作添加、删除和查询集群中进程的编程方式。
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
在调用 addprocs
之前,必须在主进程上显式加载模块 Distributed
。它会在工作进程上自动可用。
请注意,worker 不会运行 ~/.julia/config/startup.jl
启动脚本,也不会将其全局状态(例如全局变量、新方法定义和加载的模块)与任何其他正在运行的进程同步。您可以使用 addprocs(exeflags="--project")
使用特定环境初始化 worker,然后使用 @everywhere using <modulename>
或 @everywhere include("file.jl")
。
其他类型的集群可以通过编写您自己的自定义 ClusterManager
来支持,如下面的 ClusterManagers 部分所述。
数据移动
发送消息和移动数据构成了分布式程序中大部分开销。减少消息数量和发送的数据量对于实现性能和可扩展性至关重要。为此,了解 Julia 的各种分布式编程结构执行的数据移动非常重要。
fetch
可以被认为是一个显式的数据移动操作,因为它直接要求将对象移动到本地机器。 @spawnat
(以及一些相关的结构)也会移动数据,但这并不明显,因此可以将其称为隐式数据移动操作。考虑这两种构建和平方随机矩阵的方法
方法 1
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
方法 2
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
差异看起来微不足道,但实际上由于 @spawnat
的行为而非常重要。在第一种方法中,在本地构建一个随机矩阵,然后将其发送到另一个进程,在该进程中对其进行平方。在第二种方法中,在另一个进程上构建和平方一个随机矩阵。因此,第二种方法发送的数据比第一种方法少得多。
在这个玩具示例中,这两种方法很容易区分和选择。但是,在实际程序中,设计数据移动可能需要更多思考,并且可能需要一些测量。例如,如果第一个进程需要矩阵 A
,那么第一种方法可能更好。或者,如果计算 A
非常昂贵并且只有当前进程拥有它,那么将其移动到另一个进程可能是不可避免的。或者,如果当前进程在 @spawnat
和 fetch(Bref)
之间几乎没有工作要做,那么最好完全消除并行性。或者想象 rand(1000,1000)
被替换为更昂贵的操作。然后,仅为此步骤添加另一个 @spawnat
语句可能是有意义的。
全局变量
通过 @spawnat
远程执行的表达式或使用 remotecall
指定用于远程执行的闭包可能引用全局变量。与其他模块中的全局绑定相比,模块 Main
下的全局绑定处理方式略有不同。考虑以下代码片段
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
在这种情况下,必须在远程进程中定义 sum
。请注意,A
是在本地工作区中定义的全局变量。Worker 2 在 Main
下没有名为 A
的变量。将闭包 ()->sum(A)
发送到 worker 2 的行为会导致在 2 上定义 Main.A
。即使在调用 remotecall_fetch
返回后,Main.A
也会继续存在于 worker 2 上。带有嵌入式全局引用的远程调用(仅在 Main
模块下)按如下方式管理全局变量
如果作为远程调用的部分被引用,则会在目标 worker 上创建新的全局绑定。
全局常量也在远程节点上声明为常量。
全局变量仅在远程调用的上下文中重新发送到目标 worker,并且仅当其值已更改时才发送。此外,集群不会跨节点同步全局绑定。例如
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # worker 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # worker 3 A = nothing
执行上述代码段会导致 worker 2 上的
Main.A
与 worker 3 上的Main.A
具有不同的值,而节点 1 上的Main.A
的值设置为nothing
。
您可能已经意识到,当全局变量在主节点上重新分配时,与其关联的内存可能会被收集,但在 worker 上不会采取此类操作,因为绑定继续有效。 clear!
可用于在不再需要时手动将远程节点上的特定全局变量重新分配为 nothing
。这将在常规垃圾回收周期中释放与其关联的任何内存。
因此,程序在远程调用中引用全局变量时应谨慎。实际上,如果可能,最好完全避免它们。如果必须引用全局变量,请考虑使用 let
块来本地化全局变量。
例如
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
可以看出,全局变量 A
在 worker 2 上定义,但 B
被捕获为局部变量,因此 worker 2 上不存在 B
的绑定。
并行映射和循环
幸运的是,许多有用的并行计算不需要数据移动。一个常见的例子是蒙特卡罗模拟,其中多个进程可以同时处理独立的模拟试验。我们可以使用 @spawnat
在两个进程上掷硬币。首先,在 count_heads.jl
中编写以下函数
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
函数 count_heads
只是将 n
个随机位加在一起。以下是如何在两台机器上执行一些试验并将结果加在一起
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
此示例演示了一种强大且常用的并行编程模式。许多迭代在多个进程上独立运行,然后使用某些函数组合其结果。组合过程称为归约,因为它通常是张量秩减少:数字向量被归约为单个数字,或者矩阵被归约为单行或单列,等等。在代码中,这通常看起来像模式x = f(x,v[i])
,其中x
是累加器,f
是归约函数,v[i]
是被归约的元素。希望f
是结合的,以便操作执行的顺序无关紧要。
请注意,我们使用count_heads
的这种模式可以进行泛化。我们使用了两个显式的@spawnat
语句,这将并行性限制在两个进程上。要在任意数量的进程上运行,我们可以使用分布式内存中的并行 for 循环,这可以使用 Julia 中的@distributed
编写,如下所示
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
此构造实现了将迭代分配给多个进程并使用指定的归约(在本例中为(+)
)进行组合的模式。每次迭代的结果被视为循环内最后一个表达式的值。整个并行循环表达式本身会计算出最终答案。
请注意,尽管并行 for 循环看起来像串行 for 循环,但它们的执行方式却大不相同。特别是,迭代不会以指定的顺序发生,并且由于迭代在不同的进程上运行,因此对变量或数组的写入不会全局可见。并行循环内部使用的任何变量都将被复制并广播到每个进程。
例如,以下代码将无法按预期工作
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
此代码不会初始化所有a
,因为每个进程都会拥有其独立的副本。必须避免使用此类并行 for 循环。幸运的是,共享数组可用于解决此限制
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
如果变量是只读的,则在并行循环中使用“外部”变量是完全合理的
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
这里,每次迭代都会将f
应用于从所有进程共享的向量a
中随机选择的样本。
如您所见,如果不需要归约运算符,则可以省略它。在这种情况下,循环会异步执行,即它在所有可用的工作进程上生成独立的任务,并立即返回一个Future
数组,而无需等待完成。调用者可以通过对它们调用fetch
来等待Future
完成,或者通过在循环前面加上@sync
(例如@sync @distributed for
)来等待循环结束时完成。
在某些情况下,不需要归约运算符,我们只是希望将函数应用于某个范围内的所有整数(或者更一般地,应用于某个集合中的所有元素)。这是另一个有用的操作,称为并行映射,在 Julia 中由pmap
函数实现。例如,我们可以并行计算几个大型随机矩阵的奇异值,如下所示
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Julia 的pmap
旨在用于每次函数调用执行大量工作的情况。相反,@distributed for
可以处理每次迭代都很小的情况,也许只是将两个数字相加。对于并行计算,pmap
和@distributed for
都只使用工作进程。在@distributed for
的情况下,最终的归约是在调用进程上完成的。
远程引用和抽象通道
远程引用始终引用AbstractChannel
的实现。
AbstractChannel
(如Channel
)的具体实现需要实现put!
、take!
、fetch
、isready
和wait
。Future
引用的远程对象存储在Channel{Any}(1)
中,即大小为 1 且能够保存Any
类型对象的Channel
。
RemoteChannel
是可重写的,可以指向任何类型和大小的通道,或AbstractChannel
的任何其他实现。
构造函数RemoteChannel(f::Function, pid)()
允许我们构造对包含多个特定类型值的通道的引用。f
是在pid
上执行的函数,它必须返回一个AbstractChannel
。
例如,RemoteChannel(()->Channel{Int}(10), pid)
将返回对类型为Int
且大小为 10 的通道的引用。该通道存在于工作进程pid
上。
put!
、take!
、fetch
、isready
和wait
在RemoteChannel
上的方法被代理到远程进程上的后备存储上。
RemoteChannel
因此可用于引用用户实现的AbstractChannel
对象。示例存储库中的dictchannel.jl
提供了一个简单的示例,该示例使用字典作为其远程存储。
通道和远程通道
Channel
对于某个进程是本地的。工作进程 2 无法直接引用工作进程 3 上的Channel
,反之亦然。RemoteChannel
可以跨工作进程放置和获取值。RemoteChannel
可以被认为是Channel
的句柄。- 与
RemoteChannel
关联的进程 IDpid
标识后备存储(即后备Channel
)所在的进程。 - 任何拥有对
RemoteChannel
的引用的进程都可以从通道中放置和获取项目。数据会自动发送到(或从)与RemoteChannel
关联的进程。 - 序列化
Channel
还会序列化通道中存在的任何数据。因此,反序列化实际上会创建原始对象的副本。 - 另一方面,序列化
RemoteChannel
仅涉及序列化标识符,该标识符标识句柄引用的Channel
的位置和实例。因此,反序列化的RemoteChannel
对象(在任何工作进程上)也指向与原始对象相同的后备存储。
上面通道的示例可以修改为进行进程间通信,如下所示。
我们启动 4 个工作进程来处理单个jobs
远程通道。通过 ID(job_id
)标识的任务被写入通道。在此模拟中,每个远程执行的任务都会读取一个job_id
,等待随机时间,并将job_id
、花费的时间及其自己的pid
的元组写回结果通道。最后,所有results
都会在主进程上打印出来。
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
远程引用和分布式垃圾回收
只有当集群中所有持有的引用都被删除时,才能释放远程引用引用的对象。
存储值的节点会跟踪哪些工作进程引用了它。每次将RemoteChannel
或(未获取的)Future
序列化到工作进程时,引用指向的节点都会收到通知。每次在本地垃圾回收RemoteChannel
或(未获取的)Future
时,拥有该值的节点也会收到通知。这是在内部集群感知序列化器中实现的。远程引用仅在正在运行的集群的上下文中有效。不支持将引用序列化和反序列化到和从常规IO
对象中。
通知是通过发送“跟踪”消息完成的——当引用序列化到不同的进程时发送“添加引用”消息,当引用在本地垃圾回收时发送“删除引用”消息。
由于Future
是单次写入并在本地缓存的,因此获取Future
的行为也会更新拥有该值的节点上的引用跟踪信息。
拥有该值的节点会在所有对它的引用都被清除后释放它。
对于Future
,将已获取的Future
序列化到不同的节点也会发送该值,因为原始远程存储可能已在此之前收集了该值。
需要注意的是,对象在本地垃圾回收的时间取决于对象的大小和系统中的当前内存压力。
在远程引用的情况下,本地引用对象的大小非常小,而远程节点上存储的值可能非常大。由于本地对象可能不会立即被回收,因此最好显式地对RemoteChannel
的本地实例或未获取的Future
调用finalize
。由于对Future
调用fetch
也会从远程存储中删除其引用,因此在已获取的Future
上不需要这样做。显式调用finalize
会导致立即向远程节点发送消息,以便继续并删除其对该值的引用。
完成最终化后,引用将变得无效,并且不能用于任何进一步的调用。
本地调用
数据必然会被复制到远程节点以执行。对于远程调用以及将数据存储到不同节点上的RemoteChannel
/ Future
,都是如此。正如预期的那样,这会导致远程节点上出现序列化对象的副本。但是,当目标节点是本地节点时,即调用进程 ID 与远程节点 ID 相同,则会将其作为本地调用执行。它通常(并非总是)在不同的任务中执行 - 但不会进行数据的序列化/反序列化。因此,该调用引用传递的相同对象实例 - 不会创建副本。此行为在下面突出显示
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel created on local node
julia> v = [0];
julia> for i in 1:3
v[1] = i # Reusing `v`
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel created on remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
可以看到,在本地拥有的RemoteChannel
上使用相同对象v
(在调用之间修改)进行put!
操作会导致存储同一个对象实例。而当拥有rc
的节点是不同的节点时,则会创建v
的副本。
需要注意的是,这通常不是问题。只有当对象同时在本地存储并在调用后修改时,才需要考虑它。在这种情况下,存储对象的deepcopy
可能是合适的。
对于本地节点上的远程调用,情况也是如此,如下例所示
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Executed on local node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
再次可以看到,对本地节点的远程调用行为就像直接调用一样。该调用修改作为参数传递的本地对象。在远程调用中,它对参数的副本进行操作。
重申一遍,一般来说这不是问题。如果本地节点也用作计算节点,并且在调用后使用参数,则需要考虑此行为,如果需要,必须将参数的深拷贝传递给在本地节点上调用的调用。远程节点上的调用将始终对参数的副本进行操作。
共享数组
共享数组使用系统共享内存来映射跨多个进程的相同数组。虽然与DArray
有一些相似之处,但SharedArray
的行为却大不相同。在DArray
中,每个进程仅本地访问数据的一部分,并且没有两个进程共享相同的块;相反,在SharedArray
中,每个“参与”进程都可以访问整个数组。SharedArray
是一个不错的选择,当您希望在同一台机器上的两个或多个进程之间联合访问大量数据时。
共享数组支持通过模块SharedArrays
提供,必须在所有参与的工作节点上显式加载。
SharedArray
的索引(赋值和访问值)与普通数组的工作方式相同,并且效率很高,因为底层内存对本地进程可用。因此,大多数算法自然可以在SharedArray
上工作,尽管是在单进程模式下。在算法坚持要求Array
输入的情况下,可以通过调用sdata
从SharedArray
中检索底层数组。对于其他AbstractArray
类型,sdata
只是返回对象本身,因此在任何Array
类型对象上使用sdata
都是安全的。
共享数组的构造函数形式如下
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
它会在指定的进程pids
中创建一个N
维共享数组,其位类型为T
,大小为dims
。与分布式数组不同,共享数组只能从由pids
命名参数指定的参与工作节点(以及创建进程,如果它在同一主机上)访问。请注意,只有isbits
的元素才支持在SharedArray中。
如果指定了签名为initfn(S::SharedArray)
的init
函数,则会在所有参与的工作节点上调用它。您可以指定每个工作节点在数组的不同部分上运行init
函数,从而使初始化并行化。
这是一个简短的示例
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
提供不重叠的一维索引范围,有时对于在进程之间拆分任务很方便。当然,您可以根据需要划分工作
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
由于所有进程都可以访问底层数据,因此您必须小心避免设置冲突。例如
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
会导致未定义的行为。因为每个进程都用自己的pid
填充整个数组,所以最后一个执行(对于S
的任何特定元素)的进程将保留其pid
。
作为一个更扩展和复杂的例子,考虑并行运行以下“内核”
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
在这种情况下,如果我们尝试使用一维索引来划分工作,则很可能会遇到麻烦:如果q[i,j,t]
接近分配给一个工作节点的块的末尾,而q[i,j,t+1]
接近分配给另一个工作节点的块的开头,则q[i,j,t]
很可能在计算q[i,j,t+1]
时尚未准备好。在这种情况下,最好手动将数组分成块。让我们沿着第二维进行拆分。定义一个返回分配给此工作节点的(irange, jrange)
索引的函数
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
接下来,定义内核
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
我们还为SharedArray
实现定义了一个便利的包装器
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
现在让我们比较三个不同的版本,一个在单个进程中运行
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
一个使用@distributed
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
以及一个按块委派的任务
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
如果我们创建SharedArray
并计时这些函数,我们将获得以下结果(使用julia -p 4
)
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
运行这些函数一次以进行JIT编译,并在第二次运行时@time
它们
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
advection_shared!
的最大优势在于它最大程度地减少了工作节点之间的通信,允许每个工作节点在其分配的部分上进行长时间计算。
共享数组和分布式垃圾回收
与远程引用一样,共享数组也依赖于创建节点上的垃圾回收来释放来自所有参与工作节点的引用。创建许多短暂的共享数组对象的代码将受益于尽快显式完成这些对象。这会导致映射共享段的内存和文件句柄更快地释放。
集群管理器
Julia 进程到逻辑集群的启动、管理和联网是通过集群管理器完成的。ClusterManager
负责
- 在集群环境中启动工作节点进程
- 管理每个工作节点生命周期中的事件
- 可选地,提供数据传输
Julia 集群具有以下特征
- 初始 Julia 进程,也称为
master
,是特殊的,并且具有id
为 1。 - 只有
master
进程可以添加或删除工作节点进程。 - 所有进程都可以直接相互通信。
工作节点之间(使用内置的 TCP/IP 传输)的连接以以下方式建立
- 在主进程上使用
ClusterManager
对象调用addprocs
。 addprocs
调用相应的launch
方法,该方法在适当的机器上生成所需数量的工作节点进程。- 每个工作节点开始在空闲端口上监听,并将主机和端口信息写入
stdout
。 - 集群管理器捕获每个工作节点的
stdout
并将其提供给主进程。 - 主进程解析此信息并建立到每个工作节点的 TCP/IP 连接。
- 每个工作节点也会收到集群中其他工作节点的通知。
- 每个工作节点连接到所有
id
小于工作节点自身id
的工作节点。 - 这样就建立了一个网状网络,其中每个工作节点都直接连接到其他每个工作节点。
虽然默认的传输层使用普通的TCPSocket
,但 Julia 集群可以提供自己的传输。
Julia 提供了两个内置的集群管理器
LocalManager
,当调用addprocs()
或addprocs(np::Integer)
时使用SSHManager
,当调用addprocs(hostnames::Array)
并传入主机名列表时使用
LocalManager
用于在同一主机上启动其他工作节点,从而利用多核和多处理器硬件。
因此,一个最小的集群管理器需要
addprocs(manager::FooManager)
要求FooManager
实现
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
例如,让我们看看负责在同一主机上启动工作节点的管理器LocalManager
是如何实现的
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
方法接受以下参数
manager::ClusterManager
:调用addprocs
的集群管理器params::Dict
:传递给addprocs
的所有关键字参数launched::Array
:要向其中追加一个或多个WorkerConfig
对象的数组c::Condition
:在启动工作节点时要通知的条件变量
launch
方法在单独的任务中异步调用。此任务的终止表示所有请求的工作节点都已启动。因此,launch
函数必须在所有请求的工作节点启动后立即退出。
新启动的工作节点以全连接方式相互连接以及与主进程连接。指定命令行参数--worker[=<cookie>]
会导致启动的进程将自身初始化为工作节点,并通过 TCP/IP 套接字建立连接。
集群中的所有工作节点与主节点共享相同的cookie。当 cookie 未指定时,即使用--worker
选项时,工作节点会尝试从其标准输入中读取它。LocalManager
和SSHManager
都通过其标准输入将 cookie 传递给新启动的工作节点。
默认情况下,工作节点将在调用getipaddr()
返回的地址上的空闲端口上监听。可以通过可选参数--bind-to bind_addr[:port]
指定要监听的特定地址。这对于多宿主主机很有用。
作为非 TCP/IP 传输的示例,实现可以选择使用 MPI,在这种情况下,必须不指定--worker
。相反,新启动的工作节点应在使用任何并行构造之前调用init_worker(cookie)
。
对于启动的每个工作进程,launch
方法必须向 launched
添加一个 WorkerConfig
对象(并初始化相应的字段)。
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
WorkerConfig
中的大多数字段由内置管理器使用。自定义集群管理器通常只需要指定 io
或 host
/port
。
如果指定了
io
,则用于读取主机/端口信息。Julia 工作进程在启动时会打印出其绑定地址和端口。这允许 Julia 工作进程监听任何可用的空闲端口,而无需手动配置工作进程端口。如果没有指定
io
,则使用host
和port
进行连接。count
、exename
和exeflags
与从工作进程启动其他工作进程相关。例如,集群管理器可以在每个节点上启动一个工作进程,并使用它来启动其他工作进程。count
的整数值n
将启动总共n
个工作进程。count
的值为:auto
将启动与该机器上的 CPU 线程数(逻辑核心数)相同数量的工作进程。exename
是julia
可执行文件的名称,包括完整路径。exeflags
应设置为新工作进程所需的命令行参数。
tunnel
、bind_addr
、sshflags
和max_parallel
用于当需要 SSH 隧道从主进程连接到工作进程时。userdata
用于自定义集群管理器存储其自己的工作进程特定信息。
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
在工作进程生命周期的不同时间被调用,并使用相应的 op
值。
- 当工作进程被添加到或从 Julia 工作进程池中移除时,使用
:register
/:deregister
。 - 当调用
interrupt(workers)
时,使用:interrupt
。ClusterManager
应该向相应的工作进程发送中断信号。 - 出于清理目的,使用
:finalize
。
具有自定义传输的集群管理器
用自定义传输层替换默认的 TCP/IP 全连接有点复杂。每个 Julia 进程都有与之连接的工作进程数量的通信任务。例如,考虑一个在全连接网状网络中拥有 32 个进程的 Julia 集群。
- 因此,每个 Julia 进程都有 31 个通信任务。
- 每个任务在消息处理循环中处理来自单个远程工作进程的所有传入消息。
- 消息处理循环等待一个
IO
对象(例如,在默认实现中为TCPSocket
),读取整个消息,处理它并等待下一个消息。 - 发送消息到进程可以直接从任何 Julia 任务完成——不仅仅是通信任务——同样,通过相应的
IO
对象。
替换默认传输需要新实现建立到远程工作进程的连接,并提供消息处理循环可以等待的相应的 IO
对象。需要实现的管理器特定回调为:
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
默认实现(使用 TCP/IP 套接字)实现为 connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
。
connect
应该返回一对 IO
对象,一个用于读取从工作进程 pid
发送的数据,另一个用于写入需要发送到工作进程 pid
的数据。自定义集群管理器可以使用内存中的 BufferStream
作为管道,在自定义的(可能是非 IO
)传输和 Julia 的内置并行基础设施之间代理数据。
BufferStream
是一个内存中的 IOBuffer
,其行为类似于 IO
——它是一个可以异步处理的流。
示例库 中的 clustermanager/0mq
文件夹包含了一个使用 ZeroMQ 将 Julia 工作进程连接到星型拓扑结构的示例,中间有一个 0MQ 代理。注意:Julia 进程仍然全部逻辑上相互连接——任何工作进程都可以直接向任何其他工作进程发送消息,而无需了解 0MQ 作为传输层被使用。
当使用自定义传输时:
- Julia 工作进程绝不能使用
--worker
启动。使用--worker
启动会导致新启动的工作进程默认为 TCP/IP 套接字传输实现。 - 对于每个传入的工作进程逻辑连接,必须调用
Base.process_messages(rd::IO, wr::IO)()
。这会启动一个新任务,处理来自/到由IO
对象表示的工作进程的消息的读取和写入。 init_worker(cookie, manager::FooManager)
必须作为工作进程初始化的一部分被调用。- 当调用
launch
时,集群管理器可以设置WorkerConfig
中的connect_at::Any
字段。此字段的值传递给所有connect
回调。通常,它包含有关如何连接到工作进程的信息。例如,TCP/IP 套接字传输使用此字段来指定用于连接到工作进程的(host, port)
元组。
kill(manager, pid, config)
用于从集群中移除工作进程。在主进程上,实现必须关闭相应的 IO
对象以确保正确清理。默认实现只是在指定的远程工作进程上执行 exit()
调用。
clustermanager/simple
示例文件夹展示了一个使用 UNIX 域套接字进行集群设置的简单实现示例。
LocalManager 和 SSHManager 的网络需求
Julia 集群旨在在诸如本地笔记本电脑、部门集群甚至云等已安全的环境中执行。本节介绍内置 LocalManager
和 SSHManager
的网络安全需求。
主进程不会监听任何端口。它只会连接到工作进程。
每个工作进程只绑定到本地接口之一,并在操作系统分配的短暂端口号上监听。
LocalManager
(由addprocs(N)
使用)默认情况下仅绑定到环回接口。这意味着后来在远程主机上启动的工作进程(或任何怀有恶意意图的人)都无法连接到集群。addprocs(4)
后跟addprocs(["remote_host"])
将失败。某些用户可能需要创建一个包含其本地系统和几个远程系统的集群。这可以通过显式请求LocalManager
通过restrict
关键字参数绑定到外部网络接口来完成:addprocs(4; restrict=false)
。SSHManager
(由addprocs(list_of_remote_hosts)
使用)通过 SSH 在远程主机上启动工作进程。默认情况下,SSH 仅用于启动 Julia 工作进程。后续的主工作进程和工作进程之间的连接使用普通的、未加密的 TCP/IP 套接字。远程主机必须启用免密码登录。其他 SSH 标志或凭据可以通过关键字参数sshflags
指定。addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
在我们希望将 SSH 连接也用于主工作进程时很有用。这种情况的一个典型场景是运行 Julia REPL(即主进程)的本地笔记本电脑,其余集群位于云端,例如 Amazon EC2。在这种情况下,只需要在远程集群上打开端口 22,并通过公钥基础设施 (PKI) 进行 SSH 客户端身份验证。身份验证凭据可以通过sshflags
提供,例如sshflags=`-i <keyfile>`
。在全连接拓扑结构(默认)中,所有工作进程都通过普通 TCP 套接字相互连接。因此,集群节点上的安全策略必须确保工作进程之间在短暂端口范围内(因操作系统而异)的自由连接。
可以通过自定义
ClusterManager
来保护和加密所有工作进程之间的流量(通过 SSH)或加密单个消息。如果将
multiplex=true
作为addprocs
的选项指定,则使用 SSH 多路复用在主进程和工作进程之间创建隧道。如果已自行配置 SSH 多路复用并且连接已建立,则无论multiplex
选项如何,都会使用 SSH 多路复用。如果启用了多路复用,则通过使用现有连接(ssh 中的-O forward
选项)设置转发。如果您的服务器需要密码身份验证,这将非常有用;您可以通过在addprocs
之前登录到服务器来避免在 Julia 中进行身份验证。控制套接字将在会话期间位于~/.ssh/julia-%r@%h:%p
,除非使用现有的多路复用连接。请注意,如果您在一个节点上创建多个进程并启用多路复用,则带宽可能会受到限制,因为在这种情况下,进程共享一个多路复用 TCP 连接。
集群 Cookie
集群中的所有进程共享相同的 Cookie,默认情况下,该 Cookie 是在主进程上随机生成的字符串。
cluster_cookie()
返回 Cookie,而cluster_cookie(cookie)()
设置 Cookie 并返回新的 Cookie。- 所有连接都在两端进行身份验证,以确保只有由主进程启动的工作进程才能相互连接。
- 可以通过参数
--worker=<cookie>
在启动时将 Cookie 传递给工作进程。如果指定了--worker
参数但没有 Cookie,则工作进程会尝试从其标准输入 (stdin
) 读取 Cookie。检索到 Cookie 后,stdin
会立即关闭。 ClusterManager
可以通过调用cluster_cookie()
在主进程上检索 Cookie。不使用默认 TCP/IP 传输(因此不指定--worker
)的集群管理器必须使用与主进程相同的 Cookie 调用init_worker(cookie, manager)
。
请注意,需要更高安全级别的环境可以通过自定义 ClusterManager
实现此功能。例如,可以预先共享 Cookie,因此无需将其作为启动参数指定。
指定网络拓扑结构(实验性)
传递给 addprocs
的关键字参数 topology
用于指定工作进程如何相互连接。
:all_to_all
,默认值:所有工作进程都相互连接。:master_worker
:只有驱动程序进程,即pid
1,与工作进程有连接。:custom
:集群管理器的launch
方法通过WorkerConfig
中的ident
和connect_idents
字段指定连接拓扑结构。具有集群管理器提供的标识ident
的工作进程将连接到connect_idents
中指定的所有工作进程。
关键字参数lazy=true|false
仅影响topology
选项:all_to_all
。如果为true
,则集群从主节点连接到所有工作节点开始。两个工作节点之间的第一次远程调用时建立特定的工作节点间连接。这有助于减少为集群内通信分配的初始资源。连接的建立取决于并行程序的运行时需求。lazy
的默认值为true
。
目前,在未连接的工作节点之间发送消息会导致错误。此行为以及功能和接口应被视为实验性,可能会在将来的版本中发生变化。
值得注意的外部包
除了 Julia 并行性之外,还有许多值得一提的外部包。例如,MPI.jl 是 MPI
协议的 Julia 包装器,Dagger.jl 提供类似于 Python 的 Dask 的功能,而 DistributedArrays.jl 提供分布在工作节点上的数组操作,如共享数组中所述。
必须提及 Julia 的 GPU 编程生态系统,其中包括
CUDA.jl 包装了各种 CUDA 库,并支持为 Nvidia GPU 编译 Julia 内核。
oneAPI.jl 包装了 oneAPI 统一编程模型,并支持在受支持的加速器上执行 Julia 内核。目前仅支持 Linux。
AMDGPU.jl 包装了 AMD ROCm 库,并支持为 AMD GPU 编译 Julia 内核。目前仅支持 Linux。
高级库,例如 KernelAbstractions.jl、Tullio.jl 和 ArrayFire.jl。
在以下示例中,我们将使用 DistributedArrays.jl
和 CUDA.jl
将数组分布到多个进程中,方法是首先通过 distribute()
和 CuArray()
进行转换。
请记住,在导入 DistributedArrays.jl
时,需要使用 @everywhere
在所有进程中导入它。
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CUDA
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
在以下示例中,我们将使用 DistributedArrays.jl
和 CUDA.jl
将数组分布到多个进程中,并在其上调用通用函数。
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
重复创建一个新向量并对其进行归一化。我们在函数声明中没有指定任何类型签名,让我们看看它是否适用于上述数据类型。
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
为了结束对外部包的简短介绍,我们可以考虑 MPI.jl
,它是 MPI 协议的 Julia 包装器。由于考虑每个内部函数都需要花费太长时间,因此最好简单地欣赏实现该协议所使用的方法。
考虑这个玩具脚本,它只是调用每个子进程,实例化其秩,并在到达主进程时执行秩的总和。
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
- 1在此上下文中,MPI 指的是 MPI-1 标准。从 MPI-2 开始,MPI 标准委员会引入了一组新的通信机制,统称为远程内存访问 (RMA)。添加 RMA 到 MPI 标准的动机是为了促进单向通信模式。有关最新 MPI 标准的更多信息,请参见 https://mpi-forum.org/docs。