分布式计算
用于分布式并行处理的工具。
Distributed.addprocs — 函数addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers通过指定的集群管理器启动工作进程。
例如,Beowulf 集群通过在 ClusterManagers.jl 包中实现的自定义集群管理器来支持。
新启动的工作进程等待主进程建立连接的时间(以秒为单位)可以通过工作进程环境中的变量 JULIA_WORKER_TIMEOUT 指定。仅在使用 TCP/IP 作为传输协议时相关。
为了在不阻塞 REPL 的情况下启动工作进程,或者在以编程方式启动工作进程时在包含函数中启动工作进程,请在自己的任务中执行 addprocs。
示例
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)# Utilize workers as and when they come online
if nprocs() > 1 # Ensure at least one new worker is available
.... # perform distributed execution
end# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers通过 SSH 在远程机器上添加工作进程。配置通过关键字参数完成(见下文)。特别是,exename 关键字可用于指定远程机器上的 julia 二进制文件的路径。
machines 是一个“机器规范”向量,这些规范以以下格式给出:[user@]host[:port] [bind_addr[:port]]。user 默认设置为当前用户,port 默认设置为标准 SSH 端口。如果指定了 [bind_addr[:port]],则其他工作进程将通过指定的 bind_addr 和 port 连接到此工作进程。
通过在 machines 向量中使用元组或 (machine_spec, count) 形式,可以在远程主机上启动多个进程,其中 count 是要启动的进程数。将 :auto 作为工作进程数量传递将启动与远程主机上的 CPU 线程数量相同的进程数。
示例:
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
("user@remote4", 4), # launch 4 workers on 'remote4'
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
])关键字参数:
tunnel:如果为true,则将使用 SSH 隧道从主进程连接到工作进程。默认值为false。multiplex:如果为true,则将使用 SSH 多路复用进行 SSH 隧道。默认值为false。ssh:用于启动工作进程的 SSH 客户端可执行文件的名称或路径。默认值为"ssh"。sshflags:指定额外的 ssh 选项,例如sshflags=`-i /home/foo/bar.pem`max_parallel:指定在主机上并行连接的最大工作进程数。默认值为 10。shell:指定 ssh 连接到的工作进程的 shell 类型。shell=:posix:一个与 POSIX 兼容的 Unix/Linux shell (sh, ksh, bash, dash, zsh, 等)。默认值。shell=:csh:一个 Unix C shell (csh, tcsh)。shell=:wincmd:Microsoft Windowscmd.exe。
dir:指定工作进程上的工作目录。默认设置为主机当前目录(由pwd()查找)。enable_threaded_blas:如果为true,则 BLAS 将在添加的进程中使用多线程运行。默认值为false。exename:julia可执行文件的名称。默认值为"$(Sys.BINDIR)/julia"或"$(Sys.BINDIR)/julia-debug"(具体取决于情况)。建议所有远程机器使用相同的 Julia 版本,因为否则序列化和代码分发可能会失败。exeflags:传递给工作进程的额外标志。topology:指定工作进程如何相互连接。在未连接的工作进程之间发送消息会导致错误。topology=:all_to_all:所有进程相互连接。默认值。topology=:master_worker:只有驱动进程,即pid1 连接到工作进程。工作进程不相互连接。topology=:custom:集群管理器的launch方法通过WorkerConfig中的ident和connect_idents字段指定连接拓扑。具有集群管理器身份ident的工作进程将连接到connect_idents中指定的所有工作进程。
lazy:仅适用于topology=:all_to_all。如果为true,则工作进程之间的连接以懒惰方式建立,即在工作进程之间首次进行远程调用时建立。默认值为 true。env:提供字符串对数组,例如env=["JULIA_DEPOT_PATH"=>"/depot"],以请求在远程机器上设置环境变量。默认情况下,只有环境变量JULIA_WORKER_TIMEOUT会自动从本地环境传递到远程环境。cmdline_cookie:通过--worker命令行选项传递身份验证 cookie。通过 ssh stdio 传递 cookie 的(更安全)默认行为可能会在使用旧版本(ConPTY 之前的)Julia 或 Windows 版本的 Windows 工作进程中挂起,在这种情况下,cmdline_cookie=true提供了一种解决方法。
在 Julia 1.6 中添加了 ssh、shell、env 和 cmdline_cookie 关键字参数。
环境变量
如果主进程在 60.0 秒内无法与新启动的工作进程建立连接,工作进程将将其视为致命情况并终止。此超时可以通过环境变量 JULIA_WORKER_TIMEOUT 控制。主进程上 JULIA_WORKER_TIMEOUT 的值指定新启动的工作进程等待连接建立的时间(以秒为单位)。
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers使用内置的 LocalManager 在本地主机上启动 np 个工作进程。
本地工作进程继承主进程的当前包环境(即,活动项目、LOAD_PATH 和 DEPOT_PATH)。
关键字参数:
restrict::Bool:如果为true(默认值),则绑定将限制为127.0.0.1。dir、exename、exeflags、env、topology、lazy、enable_threaded_blas:与SSHManager的效果相同,请参阅addprocs(machines::AbstractVector)的文档。
在 Julia 1.9 中添加了包环境的继承和 env 关键字参数。
Distributed.nprocs — 函数nprocs()获取可用进程的数量。
示例
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — 函数nworkers()获取可用工作进程的数量。这比 nprocs() 少一个。如果 nprocs() == 1,则等于 nprocs()。
示例
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — 方法procs()返回所有进程标识符列表,包括 pid 1(不包括在 workers() 中)。
示例
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — 方法procs(pid::Integer)返回同一物理节点上的所有进程标识符列表。具体来说,返回绑定到与 pid 相同 IP 地址的所有工作进程。
Distributed.workers — 函数workers()返回所有工作进程标识符列表。
示例
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — 函数rmprocs(pids...; waitfor=typemax(Int))移除指定的工作进程。请注意,只有进程 1 可以添加或移除工作进程。
参数 waitfor 指定等待工作进程关闭的时间。
- 如果未指定,
rmprocs将等待所有请求的pids被移除。 - 如果所有工作进程无法在请求的
waitfor秒内终止,则会引发ErrorException。 - 当
waitfor值为 0 时,调用立即返回,并将要移除的工作进程安排在其他任务中执行。返回已安排的Task对象。用户应在调用任何其他并行调用之前,在该任务上调用wait。
示例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — 函数interrupt(pids::Integer...)中断指定工作进程上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果未提供参数,则会中断所有工作进程。
interrupt(pids::AbstractVector=workers())中断指定工作进程上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果未提供参数,则会中断所有工作进程。
Distributed.myid — 函数myid()获取当前进程的 ID。
示例
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — 函数pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection通过使用可用工作进程和任务将 f 应用于集合 c 的每个元素来转换集合 c。
对于多个集合参数,按元素应用 f。
请注意,f 必须对所有工作进程可用;有关详细信息,请参阅 代码可用性和加载包。
如果没有指定工作进程池,则会使用所有可用工作进程,即默认工作进程池。
默认情况下,pmap 会将计算分布到所有指定工作进程上。若要仅使用本地进程并将计算分布到任务上,请指定 distributed=false。这等效于使用 asyncmap。例如,pmap(f, c; distributed=false) 等效于 asyncmap(f,c; ntasks=()->nworkers())。
pmap 还可以通过 batch_size 参数使用进程和任务的混合方式。对于大于 1 的批处理大小,会将集合按批处理处理,每个批处理的长度为 batch_size 或更小。将一个批处理作为单个请求发送给空闲的工作进程,其中本地 asyncmap 使用多个并发任务处理批处理中的元素。
任何错误都会阻止 pmap 处理集合的剩余部分。若要覆盖此行为,可以通过 on_error 参数指定错误处理函数,该函数接收单个参数,即异常。该函数可以通过重新抛出错误来停止处理,或者要继续处理,则返回任何值,该值随后将与结果一起返回给调用方。
请考虑以下两个示例。第一个示例将异常对象内联返回,第二个示例将 0 返回到任何异常的位置。
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0也可以通过重试失败的计算来处理错误。关键字参数 retry_delays 和 retry_check 传递给 retry,分别作为关键字参数 delays 和 check。如果指定了批处理,并且整个批处理失败,则会重试批处理中的所有项。
请注意,如果同时指定了 on_error 和 retry_delays,则会先调用 on_error 钩子,然后进行重试。如果 on_error 不抛出(或重新抛出)异常,则不会重试该元素。
示例:对于错误,对元素上的 f 重试最多 3 次,两次重试之间没有延迟。
pmap(f, c; retry_delays = zeros(3))示例:仅当异常类型不是 InexactError 时才重试 f,并使用指数递增的延迟最多重试 3 次。对于所有 InexactError 出现的情况,返回 NaN。
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — 类型RemoteException(captured)远程计算上的异常会被捕获并在本地重新抛出。RemoteException 封装了工作进程的 pid 和捕获的异常。CapturedException 捕获远程异常以及引发异常时调用堆栈的可序列化形式。
Distributed.ProcessExitedException — 类型ProcessExitedException(worker_id::Int)在客户端 Julia 进程退出后,进一步尝试引用已死亡的子进程会抛出此异常。
Distributed.Future — 类型Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Future 是未知终止状态和时间的一次计算的占位符。对于多个潜在计算,请参阅 RemoteChannel。有关识别 AbstractRemoteRef 的信息,请参阅 remoteref_id。
Distributed.RemoteChannel — 类型RemoteChannel(pid::Integer=myid())对进程 pid 上的 Channel{Any}(1) 进行引用。默认 pid 是当前进程。
RemoteChannel(f::Function, pid::Integer=myid())创建特定大小和类型的远程通道的引用。f 是一个函数,在 pid 上执行时必须返回 AbstractChannel 的实现。
例如,RemoteChannel(()->Channel{Int}(10), pid) 将返回对 pid 上类型为 Int、大小为 10 的通道的引用。
默认 pid 是当前进程。
Base.fetch — 方法fetch(x::Future)等待并获取 Future 的值。获取的值会缓存在本地。对同一引用的 fetch 的进一步调用将返回缓存的值。如果远程值为异常,则会抛出 RemoteException,该异常会捕获远程异常和回溯。
Base.fetch — 方法fetch(c::RemoteChannel)等待并从 RemoteChannel 获取值。引发的异常与 Future 相同。不会移除获取的项。
fetch(x::Any)返回 x。
Distributed.remotecall — 方法remotecall(f, id::Integer, args...; kwargs...) -> Future在指定进程上对给定参数异步调用函数 f。返回 Future。如果存在,则关键字参数将传递给 f。
Distributed.remotecall_wait — 方法remotecall_wait(f, id::Integer, args...; kwargs...)在由工作进程 ID id 指定的 Worker 上执行一个消息中的更快 wait(remotecall(...))。如果存在,则关键字参数将传递给 f。
另请参阅 wait 和 remotecall。
Distributed.remotecall_fetch — 方法remotecall_fetch(f, id::Integer, args...; kwargs...)在一个消息中执行 fetch(remotecall(...))。如果存在,则关键字参数将传递给 f。所有远程异常都将在 RemoteException 中捕获并抛出。
另请参阅 fetch 和 remotecall。
示例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...Distributed.remote_do — 方法remote_do(f, id::Integer, args...; kwargs...) -> nothing在工作进程 id 上异步执行 f。与 remotecall 不同,它不会存储计算结果,也不存在等待其完成的方法。
成功调用表示已接受请求以便在远程节点上执行。
虽然对同一工作进程的连续 remotecall 会按它们被调用的顺序序列化,但远程工作进程上的执行顺序是不确定的。例如,remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) 会按顺序序列化对 f1、f2 和 f3 的调用。但是,不能保证 f1 在工作进程 2 上执行之前 f3 会被执行。
f 抛出的任何异常都会打印到远程工作进程上的 stderr。
如果存在,则关键字参数将传递给 f。
Base.put! — 方法put!(rr::RemoteChannel, args...)将一组值存储到 RemoteChannel。如果通道已满,则会阻塞,直到有空间可用为止。返回第一个参数。
Base.put! — 方法put!(rr::Future, v)将值存储到 Future rr。Future 是只写一次远程引用。对已设置的 Future 进行 put! 操作会抛出 Exception。所有异步远程调用都会返回 Future,并在完成后将值设置为调用的返回值。
Base.take! — 方法take!(rr::RemoteChannel, args...)从 RemoteChannel rr 获取值,并将值删除。
Base.isready — 方法isready(rr::RemoteChannel, args...)确定 RemoteChannel 是否已存储值。请注意,此函数会导致竞争条件,因为当您收到其结果时,它可能不再为真。但是,它可以安全地用于 Future,因为它们只分配一次。
Base.isready — 方法isready(rr::Future)确定 Future 是否已存储值。
如果参数 Future 由其他节点拥有,则此调用将阻塞以等待答案。建议在单独的任务中等待 rr,或者使用本地 Channel 作为代理。
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not blockDistributed.AbstractWorkerPool — 类型AbstractWorkerPool工作进程池的超类型,例如 WorkerPool 和 CachingPool。AbstractWorkerPool 应实现
push!- 将新的工作进程添加到整个池中(可用 + 繁忙)put!- 将工作进程放回可用池中take!- 从可用池中获取工作进程(用于远程函数执行)length- 整个池中可用工作进程的数量isready- 如果对池执行take!会阻塞,则返回 false,否则返回 true
上述操作的默认实现(在 AbstractWorkerPool 上)需要以下字段
channel::Channel{Int}workers::Set{Int}
其中 channel 包含空闲工作进程 pid,workers 是与此池相关联的所有工作进程的集合。
Distributed.WorkerPool — 类型WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})从一个 worker id 的向量或范围中创建一个 WorkerPool。
示例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — 类型CachingPool(workers::Vector{Int})AbstractWorkerPool 的实现。 remote, remotecall_fetch, pmap(以及其他在远程执行函数的远程调用)受益于在 worker 节点上缓存序列化/反序列化函数,特别是闭包(它们可能会捕获大量数据)。
远程缓存会为返回的 CachingPool 对象的整个生命周期维护。要提前清除缓存,请使用 clear!(pool)。
对于全局变量,只有绑定在闭包中捕获,而不是数据。let 块可以用于捕获全局数据。
示例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end以上操作只会将 foo 传输一次到每个 worker。
Distributed.default_worker_pool — 函数default_worker_pool()AbstractWorkerPool 包含空闲的 workers - 由 remote(f) 和 pmap 使用(默认情况下)。除非通过 default_worker_pool!(pool) 显式设置,否则默认 worker 池初始化为 WorkerPool。
示例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — 方法clear!(pool::CachingPool) -> pool从所有参与的 worker 中删除所有缓存的函数。
Distributed.remote — 函数remote([p::AbstractWorkerPool], f) -> Function返回一个匿名函数,该函数使用 remotecall_fetch 在一个可用的 worker(如果提供,则从 WorkerPool p 中获取)上执行函数 f。
Distributed.remotecall — 方法remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool 版本的 remotecall(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remotecall。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)在这个例子中,任务在 pid 2 上运行,从 pid 1 调用。
Distributed.remotecall_wait — 方法remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool 版本的 remotecall_wait(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remotecall_wait。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — 方法remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> resultWorkerPool 版本的 remotecall_fetch(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remotecall_fetch。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — 方法remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothingWorkerPool 版本的 remote_do(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remote_do。
Distributed.@spawnat — 宏@spawnat p expr在一个表达式周围创建一个闭包,并在进程 p 上异步运行该闭包。返回一个指向结果的 Future。如果 p 是带引号的文字符号 :any,那么系统将自动选择一个处理器使用。
示例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3:any 参数从 Julia 1.3 开始可用。
Distributed.@fetch — 宏@fetch expr等效于 fetch(@spawnat :any expr)。请参阅 fetch 和 @spawnat。
示例
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — 宏@fetchfrom等效于 fetch(@spawnat p expr)。请参阅 fetch 和 @spawnat。
示例
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — 宏@distributed一个分布式内存并行 for 循环,形式为
@distributed [reducer] for var = range
body
end指定的范围被分区并在所有 worker 上本地执行。如果指定了可选的 reducer 函数,@distributed 将在每个 worker 上执行本地归约,并在调用进程上执行最终归约。
请注意,没有 reducer 函数,@distributed 异步执行,即它在所有可用的 worker 上生成独立的任务并立即返回,而不会等待完成。要等待完成,请在调用前加上 @sync,例如
@sync @distributed for var = range
body
endDistributed.@everywhere — 宏@everywhere [procs()] expr在所有 procs 上执行 Main 下的表达式。任何进程上的错误都将收集到一个 CompositeException 并抛出。例如
@everywhere bar = 1将在所有当前进程上定义 Main.bar。任何稍后添加的进程(例如使用 addprocs())都不会定义该表达式。
与 @spawnat 不同,@everywhere 不会捕获任何局部变量。相反,局部变量可以使用插值进行广播
foo = 1
@everywhere bar = $foo可选参数 procs 允许指定所有进程的子集来执行表达式。
类似于调用 remotecall_eval(Main, procs, expr),但有两个额外的功能
- `using` and `import` statements run on the calling process first, to ensure
packages are precompiled.
- The current source file path used by `include` is propagated to other processes.Distributed.clear! — 方法clear!(syms, pids=workers(); mod=Main)通过将全局绑定初始化为 nothing 来清除模块中的全局绑定。syms 应该是 Symbol 类型或 Symbol 的集合。pids 和 mod 标识要重新初始化全局变量的进程和模块。只有在 mod 下定义的那些名称会被清除。
如果请求清除全局常量,则会抛出异常。
Distributed.remoteref_id — 函数remoteref_id(r::AbstractRemoteRef) -> RRIDFuture 和 RemoteChannel 由字段标识
where- 指向引用实际存在的底层对象/存储所在的节点。whence- 指向创建远程引用的节点。请注意,这与底层对象实际存在的节点不同。例如,从主进程调用RemoteChannel(2)会导致where值为 2,whence值为 1。id在从whence指定的 worker 创建的所有引用中是唯一的。
whence 和 id 共同在所有 worker 中唯一标识一个引用。
remoteref_id 是一个低级 API,它返回一个 RRID 对象,该对象包装了远程引用的 whence 和 id 值。
Distributed.channel_from_id — 函数channel_from_id(id) -> c一个低级 API,它返回由 remoteref_id 返回的 id 的支持 AbstractChannel。该调用仅在支持通道存在的节点上有效。
Distributed.worker_id_from_socket — 函数worker_id_from_socket(s) -> pid一个低级 API,它在给定一个 IO 连接或 Worker 的情况下,返回与其连接的 worker 的 pid。这在为类型编写自定义 serialize 方法时很有用,这会根据接收进程 ID 优化写出的数据。
Distributed.cluster_cookie — 方法cluster_cookie() -> cookie返回集群 cookie。
Distributed.cluster_cookie — 方法cluster_cookie(cookie) -> cookie将传递的 cookie 设置为集群 cookie,然后返回它。
集群管理器接口
此接口提供了一种机制,可以在不同的集群环境中启动和管理 Julia worker。Base 中存在两种类型的管理器:LocalManager,用于在同一主机上启动其他 worker,以及 SSHManager,用于通过 ssh 在远程主机上启动 worker。TCP/IP 套接字用于连接和传输进程之间的消息。集群管理器可以使用不同的传输方式。
Distributed.ClusterManager — 类型ClusterManager集群管理器的超类型,它控制 worker 进程作为一个集群。集群管理器实现如何添加、删除和与 worker 通信。SSHManager 和 LocalManager 是它的子类型。
Distributed.WorkerConfig — 类型WorkerConfig由 ClusterManager 使用的类型,用于控制添加到其集群中的 worker。某些字段用于所有集群管理器以访问主机
io– 用于访问 worker 的连接(IO的子类型或Nothing)host– 主机地址(String或Nothing)port– 用于连接到 worker 的主机上的端口(Int或Nothing)
一些用于集群管理器将 worker 添加到已初始化的主机
count– 要在主机上启动的 worker 数量exename– 主机上 Julia 可执行文件的路径,默认为"$(Sys.BINDIR)/julia"或"$(Sys.BINDIR)/julia-debug"exeflags– 远程启动 Julia 时要使用的标志
userdata 字段用于由外部管理器为每个 worker 存储信息。
某些字段由 SSHManager 和类似的管理器使用
tunnel–true(使用隧道),false(不使用隧道),或nothing(使用管理器的默认值)multiplex–true(使用 SSH 多路复用进行隧道)或falseforward– 用于 ssh 的-L选项的转发选项bind_addr– 要绑定的远程主机上的地址sshflags– 用于建立 SSH 连接的标志max_parallel– 主机上并行连接的最大 worker 数量
某些字段由 LocalManager 和 SSHManager 都使用
connect_at– 确定这是 worker-to-worker 还是 driver-to-worker 设置调用process– 将连接的进程(通常管理器将在addprocs期间分配此进程)ospid– 按照主机操作系统的进程 ID,用于中断 worker 进程environ– 用于存储 Local/SSH 管理器临时信息的私有字典ident– 由ClusterManager标识的 workerconnect_idents– 如果使用自定义拓扑,worker 必须连接到的 worker ID 列表enable_threaded_blas–true、false或nothing,是否在 worker 上使用线程化的 BLAS
Distributed.launch — 函数launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)由集群管理器实现。对于此函数启动的每个 Julia 工作器,它应该将一个 WorkerConfig 条目追加到 launched 并通知 launch_ntfy。一旦 manager 请求的所有工作器启动,该函数必须退出。params 是一个字典,包含调用 addprocs 时使用的所有关键字参数。
Distributed.manage — 函数manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)由集群管理器实现。它在主进程上调用,在工作器的生命周期内,使用适当的 op 值
- 当工作器从 Julia 工作器池中添加/删除时,使用
:register/:deregister。 - 当调用
interrupt(workers)时,使用:interrupt。ClusterManager应该向相应的工作器发送一个中断信号。 - 使用
:finalize用于清理目的。
Base.kill — 方法kill(manager::ClusterManager, pid::Int, config::WorkerConfig)由集群管理器实现。它在主进程上调用,由 rmprocs 调用。它应该导致由 pid 指定的远程工作器退出。kill(manager::ClusterManager.....) 在 pid 上执行远程 exit()。
Sockets.connect — 方法connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)由使用自定义传输的集群管理器实现。它应该建立一个逻辑连接到具有 id pid 的工作器,由 config 指定,并返回一对 IO 对象。来自 pid 到当前进程的消息将从 instrm 读取,而要发送到 pid 的消息将写入 outstrm。自定义传输实现必须确保消息完整且按顺序传递和接收。connect(manager::ClusterManager.....) 在工作器之间建立 TCP/IP 套接字连接。
Distributed.init_worker — 函数init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())由实现自定义传输的集群管理器调用。它将新启动的进程初始化为工作器。命令行参数 --worker[=<cookie>] 的作用是使用 TCP/IP 套接字进行传输,将进程初始化为工作器。cookie 是一个 cluster_cookie。
Distributed.start_worker — 函数start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker 是一个内部函数,是通过 TCP/IP 连接的工作器进程的默认入口点。它将进程设置为 Julia 集群工作器。
主机:端口信息写入流 out(默认为标准输出)。
该函数在需要时从标准输入读取 cookie,并在一个空闲端口(或者如果指定了,则在 --bind-to 命令行选项中的端口)上监听,并调度任务来处理传入的 TCP 连接和请求。它还(可选)关闭标准输入并将标准错误重定向到标准输出。
它不返回。
Distributed.process_messages — 函数process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)由使用自定义传输的集群管理器调用。当自定义传输实现接收到来自远程工作器的第一条消息时,应该调用它。自定义传输必须管理与远程工作器的逻辑连接,并提供两个 IO 对象,一个用于传入消息,另一个用于发送到远程工作器的消息。如果 incoming 为 true,则远程对等体发起了连接。连接发起者中的任意一方都会发送集群 cookie 及其 Julia 版本号以执行身份验证握手。
另请参见 cluster_cookie。
Distributed.default_addprocs_params — 函数default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}由集群管理器实现。调用 addprocs(mgr) 时传递的默认关键字参数。通过调用 default_addprocs_params() 可以获得最小选项集。