分布式计算
用于分布式并行处理的工具。
Distributed.addprocs
— 函数addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers
通过指定的集群管理器启动工作进程。
例如,Beowulf 集群通过在 ClusterManagers.jl
包中实现的自定义集群管理器来支持。
新启动的工作进程等待主进程建立连接的时间(以秒为单位)可以通过工作进程环境中的变量 JULIA_WORKER_TIMEOUT
指定。仅在使用 TCP/IP 作为传输协议时相关。
为了在不阻塞 REPL 的情况下启动工作进程,或者在以编程方式启动工作进程时在包含函数中启动工作进程,请在自己的任务中执行 addprocs
。
示例
# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
# Utilize workers as and when they come online
if nprocs() > 1 # Ensure at least one new worker is available
.... # perform distributed execution
end
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t) # Check if `addprocs` has completed to ensure `fetch` doesn't block
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers
通过 SSH 在远程机器上添加工作进程。配置通过关键字参数完成(见下文)。特别是,exename
关键字可用于指定远程机器上的 julia
二进制文件的路径。
machines
是一个“机器规范”向量,这些规范以以下格式给出:[user@]host[:port] [bind_addr[:port]]
。user
默认设置为当前用户,port
默认设置为标准 SSH 端口。如果指定了 [bind_addr[:port]]
,则其他工作进程将通过指定的 bind_addr
和 port
连接到此工作进程。
通过在 machines
向量中使用元组或 (machine_spec, count)
形式,可以在远程主机上启动多个进程,其中 count
是要启动的进程数。将 :auto
作为工作进程数量传递将启动与远程主机上的 CPU 线程数量相同的进程数。
示例:
addprocs([
"remote1", # one worker on 'remote1' logging in with the current username
"user@remote2", # one worker on 'remote2' logging in with the 'user' username
"user@remote3:2222", # specifying SSH port to '2222' for 'remote3'
("user@remote4", 4), # launch 4 workers on 'remote4'
("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
])
关键字参数:
tunnel
:如果为true
,则将使用 SSH 隧道从主进程连接到工作进程。默认值为false
。multiplex
:如果为true
,则将使用 SSH 多路复用进行 SSH 隧道。默认值为false
。ssh
:用于启动工作进程的 SSH 客户端可执行文件的名称或路径。默认值为"ssh"
。sshflags
:指定额外的 ssh 选项,例如sshflags=`-i /home/foo/bar.pem`
max_parallel
:指定在主机上并行连接的最大工作进程数。默认值为 10。shell
:指定 ssh 连接到的工作进程的 shell 类型。shell=:posix
:一个与 POSIX 兼容的 Unix/Linux shell (sh, ksh, bash, dash, zsh, 等)。默认值。shell=:csh
:一个 Unix C shell (csh, tcsh)。shell=:wincmd
:Microsoft Windowscmd.exe
。
dir
:指定工作进程上的工作目录。默认设置为主机当前目录(由pwd()
查找)。enable_threaded_blas
:如果为true
,则 BLAS 将在添加的进程中使用多线程运行。默认值为false
。exename
:julia
可执行文件的名称。默认值为"$(Sys.BINDIR)/julia"
或"$(Sys.BINDIR)/julia-debug"
(具体取决于情况)。建议所有远程机器使用相同的 Julia 版本,因为否则序列化和代码分发可能会失败。exeflags
:传递给工作进程的额外标志。topology
:指定工作进程如何相互连接。在未连接的工作进程之间发送消息会导致错误。topology=:all_to_all
:所有进程相互连接。默认值。topology=:master_worker
:只有驱动进程,即pid
1 连接到工作进程。工作进程不相互连接。topology=:custom
:集群管理器的launch
方法通过WorkerConfig
中的ident
和connect_idents
字段指定连接拓扑。具有集群管理器身份ident
的工作进程将连接到connect_idents
中指定的所有工作进程。
lazy
:仅适用于topology=:all_to_all
。如果为true
,则工作进程之间的连接以懒惰方式建立,即在工作进程之间首次进行远程调用时建立。默认值为 true。env
:提供字符串对数组,例如env=["JULIA_DEPOT_PATH"=>"/depot"]
,以请求在远程机器上设置环境变量。默认情况下,只有环境变量JULIA_WORKER_TIMEOUT
会自动从本地环境传递到远程环境。cmdline_cookie
:通过--worker
命令行选项传递身份验证 cookie。通过 ssh stdio 传递 cookie 的(更安全)默认行为可能会在使用旧版本(ConPTY 之前的)Julia 或 Windows 版本的 Windows 工作进程中挂起,在这种情况下,cmdline_cookie=true
提供了一种解决方法。
在 Julia 1.6 中添加了 ssh
、shell
、env
和 cmdline_cookie
关键字参数。
环境变量
如果主进程在 60.0 秒内无法与新启动的工作进程建立连接,工作进程将将其视为致命情况并终止。此超时可以通过环境变量 JULIA_WORKER_TIMEOUT
控制。主进程上 JULIA_WORKER_TIMEOUT
的值指定新启动的工作进程等待连接建立的时间(以秒为单位)。
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers
使用内置的 LocalManager
在本地主机上启动 np
个工作进程。
本地工作进程继承主进程的当前包环境(即,活动项目、LOAD_PATH
和 DEPOT_PATH
)。
关键字参数:
restrict::Bool
:如果为true
(默认值),则绑定将限制为127.0.0.1
。dir
、exename
、exeflags
、env
、topology
、lazy
、enable_threaded_blas
:与SSHManager
的效果相同,请参阅addprocs(machines::AbstractVector)
的文档。
在 Julia 1.9 中添加了包环境的继承和 env
关键字参数。
Distributed.nprocs
— 函数nprocs()
获取可用进程的数量。
示例
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— 函数nworkers()
获取可用工作进程的数量。这比 nprocs()
少一个。如果 nprocs() == 1
,则等于 nprocs()
。
示例
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— 方法procs()
返回所有进程标识符列表,包括 pid 1(不包括在 workers()
中)。
示例
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— 方法procs(pid::Integer)
返回同一物理节点上的所有进程标识符列表。具体来说,返回绑定到与 pid
相同 IP 地址的所有工作进程。
Distributed.workers
— 函数workers()
返回所有工作进程标识符列表。
示例
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— 函数rmprocs(pids...; waitfor=typemax(Int))
移除指定的工作进程。请注意,只有进程 1 可以添加或移除工作进程。
参数 waitfor
指定等待工作进程关闭的时间。
- 如果未指定,
rmprocs
将等待所有请求的pids
被移除。 - 如果所有工作进程无法在请求的
waitfor
秒内终止,则会引发ErrorException
。 - 当
waitfor
值为 0 时,调用立即返回,并将要移除的工作进程安排在其他任务中执行。返回已安排的Task
对象。用户应在调用任何其他并行调用之前,在该任务上调用wait
。
示例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— 函数interrupt(pids::Integer...)
中断指定工作进程上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果未提供参数,则会中断所有工作进程。
interrupt(pids::AbstractVector=workers())
中断指定工作进程上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果未提供参数,则会中断所有工作进程。
Distributed.myid
— 函数myid()
获取当前进程的 ID。
示例
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— 函数pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
通过使用可用工作进程和任务将 f
应用于集合 c
的每个元素来转换集合 c
。
对于多个集合参数,按元素应用 f
。
请注意,f
必须对所有工作进程可用;有关详细信息,请参阅 代码可用性和加载包。
如果没有指定工作进程池,则会使用所有可用工作进程,即默认工作进程池。
默认情况下,pmap
会将计算分布到所有指定工作进程上。若要仅使用本地进程并将计算分布到任务上,请指定 distributed=false
。这等效于使用 asyncmap
。例如,pmap(f, c; distributed=false)
等效于 asyncmap(f,c; ntasks=()->nworkers())
。
pmap
还可以通过 batch_size
参数使用进程和任务的混合方式。对于大于 1 的批处理大小,会将集合按批处理处理,每个批处理的长度为 batch_size
或更小。将一个批处理作为单个请求发送给空闲的工作进程,其中本地 asyncmap
使用多个并发任务处理批处理中的元素。
任何错误都会阻止 pmap
处理集合的剩余部分。若要覆盖此行为,可以通过 on_error
参数指定错误处理函数,该函数接收单个参数,即异常。该函数可以通过重新抛出错误来停止处理,或者要继续处理,则返回任何值,该值随后将与结果一起返回给调用方。
请考虑以下两个示例。第一个示例将异常对象内联返回,第二个示例将 0 返回到任何异常的位置。
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
也可以通过重试失败的计算来处理错误。关键字参数 retry_delays
和 retry_check
传递给 retry
,分别作为关键字参数 delays
和 check
。如果指定了批处理,并且整个批处理失败,则会重试批处理中的所有项。
请注意,如果同时指定了 on_error
和 retry_delays
,则会先调用 on_error
钩子,然后进行重试。如果 on_error
不抛出(或重新抛出)异常,则不会重试该元素。
示例:对于错误,对元素上的 f
重试最多 3 次,两次重试之间没有延迟。
pmap(f, c; retry_delays = zeros(3))
示例:仅当异常类型不是 InexactError
时才重试 f
,并使用指数递增的延迟最多重试 3 次。对于所有 InexactError
出现的情况,返回 NaN
。
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— 类型RemoteException(captured)
远程计算上的异常会被捕获并在本地重新抛出。RemoteException
封装了工作进程的 pid
和捕获的异常。CapturedException
捕获远程异常以及引发异常时调用堆栈的可序列化形式。
Distributed.ProcessExitedException
— 类型ProcessExitedException(worker_id::Int)
在客户端 Julia 进程退出后,进一步尝试引用已死亡的子进程会抛出此异常。
Distributed.Future
— 类型Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
是未知终止状态和时间的一次计算的占位符。对于多个潜在计算,请参阅 RemoteChannel
。有关识别 AbstractRemoteRef
的信息,请参阅 remoteref_id
。
Distributed.RemoteChannel
— 类型RemoteChannel(pid::Integer=myid())
对进程 pid
上的 Channel{Any}(1)
进行引用。默认 pid
是当前进程。
RemoteChannel(f::Function, pid::Integer=myid())
创建特定大小和类型的远程通道的引用。f
是一个函数,在 pid
上执行时必须返回 AbstractChannel
的实现。
例如,RemoteChannel(()->Channel{Int}(10), pid)
将返回对 pid
上类型为 Int
、大小为 10 的通道的引用。
默认 pid
是当前进程。
Base.fetch
— 方法fetch(x::Future)
等待并获取 Future
的值。获取的值会缓存在本地。对同一引用的 fetch
的进一步调用将返回缓存的值。如果远程值为异常,则会抛出 RemoteException
,该异常会捕获远程异常和回溯。
Base.fetch
— 方法fetch(c::RemoteChannel)
等待并从 RemoteChannel
获取值。引发的异常与 Future
相同。不会移除获取的项。
fetch(x::Any)
返回 x
。
Distributed.remotecall
— 方法remotecall(f, id::Integer, args...; kwargs...) -> Future
在指定进程上对给定参数异步调用函数 f
。返回 Future
。如果存在,则关键字参数将传递给 f
。
Distributed.remotecall_wait
— 方法remotecall_wait(f, id::Integer, args...; kwargs...)
在由工作进程 ID id
指定的 Worker
上执行一个消息中的更快 wait(remotecall(...))
。如果存在,则关键字参数将传递给 f
。
另请参阅 wait
和 remotecall
。
Distributed.remotecall_fetch
— 方法remotecall_fetch(f, id::Integer, args...; kwargs...)
在一个消息中执行 fetch(remotecall(...))
。如果存在,则关键字参数将传递给 f
。所有远程异常都将在 RemoteException
中捕获并抛出。
另请参阅 fetch
和 remotecall
。
示例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
Distributed.remote_do
— 方法remote_do(f, id::Integer, args...; kwargs...) -> nothing
在工作进程 id
上异步执行 f
。与 remotecall
不同,它不会存储计算结果,也不存在等待其完成的方法。
成功调用表示已接受请求以便在远程节点上执行。
虽然对同一工作进程的连续 remotecall
会按它们被调用的顺序序列化,但远程工作进程上的执行顺序是不确定的。例如,remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
会按顺序序列化对 f1
、f2
和 f3
的调用。但是,不能保证 f1
在工作进程 2 上执行之前 f3
会被执行。
f
抛出的任何异常都会打印到远程工作进程上的 stderr
。
如果存在,则关键字参数将传递给 f
。
Base.put!
— 方法put!(rr::RemoteChannel, args...)
将一组值存储到 RemoteChannel
。如果通道已满,则会阻塞,直到有空间可用为止。返回第一个参数。
Base.put!
— 方法put!(rr::Future, v)
将值存储到 Future
rr
。Future
是只写一次远程引用。对已设置的 Future
进行 put!
操作会抛出 Exception
。所有异步远程调用都会返回 Future
,并在完成后将值设置为调用的返回值。
Base.take!
— 方法take!(rr::RemoteChannel, args...)
从 RemoteChannel
rr
获取值,并将值删除。
Base.isready
— 方法isready(rr::RemoteChannel, args...)
确定 RemoteChannel
是否已存储值。请注意,此函数会导致竞争条件,因为当您收到其结果时,它可能不再为真。但是,它可以安全地用于 Future
,因为它们只分配一次。
Base.isready
— 方法isready(rr::Future)
确定 Future
是否已存储值。
如果参数 Future
由其他节点拥有,则此调用将阻塞以等待答案。建议在单独的任务中等待 rr
,或者使用本地 Channel
作为代理。
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # will not block
Distributed.AbstractWorkerPool
— 类型AbstractWorkerPool
工作进程池的超类型,例如 WorkerPool
和 CachingPool
。AbstractWorkerPool
应实现
push!
- 将新的工作进程添加到整个池中(可用 + 繁忙)put!
- 将工作进程放回可用池中take!
- 从可用池中获取工作进程(用于远程函数执行)length
- 整个池中可用工作进程的数量isready
- 如果对池执行take!
会阻塞,则返回 false,否则返回 true
上述操作的默认实现(在 AbstractWorkerPool
上)需要以下字段
channel::Channel{Int}
workers::Set{Int}
其中 channel
包含空闲工作进程 pid,workers
是与此池相关联的所有工作进程的集合。
Distributed.WorkerPool
— 类型WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
从一个 worker id 的向量或范围中创建一个 WorkerPool
。
示例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— 类型CachingPool(workers::Vector{Int})
AbstractWorkerPool
的实现。 remote
, remotecall_fetch
, pmap
(以及其他在远程执行函数的远程调用)受益于在 worker 节点上缓存序列化/反序列化函数,特别是闭包(它们可能会捕获大量数据)。
远程缓存会为返回的 CachingPool
对象的整个生命周期维护。要提前清除缓存,请使用 clear!(pool)
。
对于全局变量,只有绑定在闭包中捕获,而不是数据。let
块可以用于捕获全局数据。
示例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
以上操作只会将 foo
传输一次到每个 worker。
Distributed.default_worker_pool
— 函数default_worker_pool()
AbstractWorkerPool
包含空闲的 workers
- 由 remote(f)
和 pmap
使用(默认情况下)。除非通过 default_worker_pool!(pool)
显式设置,否则默认 worker 池初始化为 WorkerPool
。
示例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— 方法clear!(pool::CachingPool) -> pool
从所有参与的 worker 中删除所有缓存的函数。
Distributed.remote
— 函数remote([p::AbstractWorkerPool], f) -> Function
返回一个匿名函数,该函数使用 remotecall_fetch
在一个可用的 worker(如果提供,则从 WorkerPool
p
中获取)上执行函数 f
。
Distributed.remotecall
— 方法remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
版本的 remotecall(f, pid, ....)
。等待并从 pool
中获取一个空闲 worker,并在其上执行 remotecall
。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
在这个例子中,任务在 pid 2 上运行,从 pid 1 调用。
Distributed.remotecall_wait
— 方法remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
版本的 remotecall_wait(f, pid, ....)
。等待并从 pool
中获取一个空闲 worker,并在其上执行 remotecall_wait
。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— 方法remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
版本的 remotecall_fetch(f, pid, ....)
。等待并从 pool
中获取一个空闲 worker,并在其上执行 remotecall_fetch
。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— 方法remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool
版本的 remote_do(f, pid, ....)
。等待并从 pool
中获取一个空闲 worker,并在其上执行 remote_do
。
Distributed.@spawnat
— 宏@spawnat p expr
在一个表达式周围创建一个闭包,并在进程 p
上异步运行该闭包。返回一个指向结果的 Future
。如果 p
是带引号的文字符号 :any
,那么系统将自动选择一个处理器使用。
示例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
:any
参数从 Julia 1.3 开始可用。
Distributed.@fetch
— 宏@fetch expr
等效于 fetch(@spawnat :any expr)
。请参阅 fetch
和 @spawnat
。
示例
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— 宏@fetchfrom
等效于 fetch(@spawnat p expr)
。请参阅 fetch
和 @spawnat
。
示例
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— 宏@distributed
一个分布式内存并行 for 循环,形式为
@distributed [reducer] for var = range
body
end
指定的范围被分区并在所有 worker 上本地执行。如果指定了可选的 reducer 函数,@distributed
将在每个 worker 上执行本地归约,并在调用进程上执行最终归约。
请注意,没有 reducer 函数,@distributed
异步执行,即它在所有可用的 worker 上生成独立的任务并立即返回,而不会等待完成。要等待完成,请在调用前加上 @sync
,例如
@sync @distributed for var = range
body
end
Distributed.@everywhere
— 宏@everywhere [procs()] expr
在所有 procs
上执行 Main
下的表达式。任何进程上的错误都将收集到一个 CompositeException
并抛出。例如
@everywhere bar = 1
将在所有当前进程上定义 Main.bar
。任何稍后添加的进程(例如使用 addprocs()
)都不会定义该表达式。
与 @spawnat
不同,@everywhere
不会捕获任何局部变量。相反,局部变量可以使用插值进行广播
foo = 1
@everywhere bar = $foo
可选参数 procs
允许指定所有进程的子集来执行表达式。
类似于调用 remotecall_eval(Main, procs, expr)
,但有两个额外的功能
- `using` and `import` statements run on the calling process first, to ensure
packages are precompiled.
- The current source file path used by `include` is propagated to other processes.
Distributed.clear!
— 方法clear!(syms, pids=workers(); mod=Main)
通过将全局绑定初始化为 nothing
来清除模块中的全局绑定。syms
应该是 Symbol
类型或 Symbol
的集合。pids
和 mod
标识要重新初始化全局变量的进程和模块。只有在 mod
下定义的那些名称会被清除。
如果请求清除全局常量,则会抛出异常。
Distributed.remoteref_id
— 函数remoteref_id(r::AbstractRemoteRef) -> RRID
Future
和 RemoteChannel
由字段标识
where
- 指向引用实际存在的底层对象/存储所在的节点。whence
- 指向创建远程引用的节点。请注意,这与底层对象实际存在的节点不同。例如,从主进程调用RemoteChannel(2)
会导致where
值为 2,whence
值为 1。id
在从whence
指定的 worker 创建的所有引用中是唯一的。
whence
和 id
共同在所有 worker 中唯一标识一个引用。
remoteref_id
是一个低级 API,它返回一个 RRID
对象,该对象包装了远程引用的 whence
和 id
值。
Distributed.channel_from_id
— 函数channel_from_id(id) -> c
一个低级 API,它返回由 remoteref_id
返回的 id
的支持 AbstractChannel
。该调用仅在支持通道存在的节点上有效。
Distributed.worker_id_from_socket
— 函数worker_id_from_socket(s) -> pid
一个低级 API,它在给定一个 IO
连接或 Worker
的情况下,返回与其连接的 worker 的 pid
。这在为类型编写自定义 serialize
方法时很有用,这会根据接收进程 ID 优化写出的数据。
Distributed.cluster_cookie
— 方法cluster_cookie() -> cookie
返回集群 cookie。
Distributed.cluster_cookie
— 方法cluster_cookie(cookie) -> cookie
将传递的 cookie 设置为集群 cookie,然后返回它。
集群管理器接口
此接口提供了一种机制,可以在不同的集群环境中启动和管理 Julia worker。Base 中存在两种类型的管理器:LocalManager
,用于在同一主机上启动其他 worker,以及 SSHManager
,用于通过 ssh
在远程主机上启动 worker。TCP/IP 套接字用于连接和传输进程之间的消息。集群管理器可以使用不同的传输方式。
Distributed.ClusterManager
— 类型ClusterManager
集群管理器的超类型,它控制 worker 进程作为一个集群。集群管理器实现如何添加、删除和与 worker 通信。SSHManager
和 LocalManager
是它的子类型。
Distributed.WorkerConfig
— 类型WorkerConfig
由 ClusterManager
使用的类型,用于控制添加到其集群中的 worker。某些字段用于所有集群管理器以访问主机
io
– 用于访问 worker 的连接(IO
的子类型或Nothing
)host
– 主机地址(String
或Nothing
)port
– 用于连接到 worker 的主机上的端口(Int
或Nothing
)
一些用于集群管理器将 worker 添加到已初始化的主机
count
– 要在主机上启动的 worker 数量exename
– 主机上 Julia 可执行文件的路径,默认为"$(Sys.BINDIR)/julia"
或"$(Sys.BINDIR)/julia-debug"
exeflags
– 远程启动 Julia 时要使用的标志
userdata
字段用于由外部管理器为每个 worker 存储信息。
某些字段由 SSHManager
和类似的管理器使用
tunnel
–true
(使用隧道),false
(不使用隧道),或nothing
(使用管理器的默认值)multiplex
–true
(使用 SSH 多路复用进行隧道)或false
forward
– 用于 ssh 的-L
选项的转发选项bind_addr
– 要绑定的远程主机上的地址sshflags
– 用于建立 SSH 连接的标志max_parallel
– 主机上并行连接的最大 worker 数量
某些字段由 LocalManager
和 SSHManager
都使用
connect_at
– 确定这是 worker-to-worker 还是 driver-to-worker 设置调用process
– 将连接的进程(通常管理器将在addprocs
期间分配此进程)ospid
– 按照主机操作系统的进程 ID,用于中断 worker 进程environ
– 用于存储 Local/SSH 管理器临时信息的私有字典ident
– 由ClusterManager
标识的 workerconnect_idents
– 如果使用自定义拓扑,worker 必须连接到的 worker ID 列表enable_threaded_blas
–true
、false
或nothing
,是否在 worker 上使用线程化的 BLAS
Distributed.launch
— 函数launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
由集群管理器实现。对于此函数启动的每个 Julia 工作器,它应该将一个 WorkerConfig
条目追加到 launched
并通知 launch_ntfy
。一旦 manager
请求的所有工作器启动,该函数必须退出。params
是一个字典,包含调用 addprocs
时使用的所有关键字参数。
Distributed.manage
— 函数manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
由集群管理器实现。它在主进程上调用,在工作器的生命周期内,使用适当的 op
值
- 当工作器从 Julia 工作器池中添加/删除时,使用
:register
/:deregister
。 - 当调用
interrupt(workers)
时,使用:interrupt
。ClusterManager
应该向相应的工作器发送一个中断信号。 - 使用
:finalize
用于清理目的。
Base.kill
— 方法kill(manager::ClusterManager, pid::Int, config::WorkerConfig)
由集群管理器实现。它在主进程上调用,由 rmprocs
调用。它应该导致由 pid
指定的远程工作器退出。kill(manager::ClusterManager.....)
在 pid
上执行远程 exit()
。
Sockets.connect
— 方法connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
由使用自定义传输的集群管理器实现。它应该建立一个逻辑连接到具有 id pid
的工作器,由 config
指定,并返回一对 IO
对象。来自 pid
到当前进程的消息将从 instrm
读取,而要发送到 pid
的消息将写入 outstrm
。自定义传输实现必须确保消息完整且按顺序传递和接收。connect(manager::ClusterManager.....)
在工作器之间建立 TCP/IP 套接字连接。
Distributed.init_worker
— 函数init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
由实现自定义传输的集群管理器调用。它将新启动的进程初始化为工作器。命令行参数 --worker[=<cookie>]
的作用是使用 TCP/IP 套接字进行传输,将进程初始化为工作器。cookie
是一个 cluster_cookie
。
Distributed.start_worker
— 函数start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
是一个内部函数,是通过 TCP/IP 连接的工作器进程的默认入口点。它将进程设置为 Julia 集群工作器。
主机:端口信息写入流 out
(默认为标准输出)。
该函数在需要时从标准输入读取 cookie,并在一个空闲端口(或者如果指定了,则在 --bind-to
命令行选项中的端口)上监听,并调度任务来处理传入的 TCP 连接和请求。它还(可选)关闭标准输入并将标准错误重定向到标准输出。
它不返回。
Distributed.process_messages
— 函数process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
由使用自定义传输的集群管理器调用。当自定义传输实现接收到来自远程工作器的第一条消息时,应该调用它。自定义传输必须管理与远程工作器的逻辑连接,并提供两个 IO
对象,一个用于传入消息,另一个用于发送到远程工作器的消息。如果 incoming
为 true
,则远程对等体发起了连接。连接发起者中的任意一方都会发送集群 cookie 及其 Julia 版本号以执行身份验证握手。
另请参见 cluster_cookie
。
Distributed.default_addprocs_params
— 函数default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
由集群管理器实现。调用 addprocs(mgr)
时传递的默认关键字参数。通过调用 default_addprocs_params()
可以获得最小选项集。