分布式计算

用于分布式并行处理的工具。

Distributed.addprocs函数
addprocs(manager::ClusterManager; kwargs...) -> List of process identifiers

通过指定的集群管理器启动工作进程。

例如,Beowulf 集群通过在 ClusterManagers.jl 包中实现的自定义集群管理器来支持。

新启动的工作进程等待主进程建立连接的时间(以秒为单位)可以通过工作进程环境中的变量 JULIA_WORKER_TIMEOUT 指定。仅在使用 TCP/IP 作为传输协议时相关。

为了在不阻塞 REPL 的情况下启动工作进程,或者在以编程方式启动工作进程时在包含函数中启动工作进程,请在自己的任务中执行 addprocs

示例

# On busy clusters, call `addprocs` asynchronously
t = @async addprocs(...)
# Utilize workers as and when they come online
if nprocs() > 1   # Ensure at least one new worker is available
   ....   # perform distributed execution
end
# Retrieve newly launched worker IDs, or any error messages
if istaskdone(t)   # Check if `addprocs` has completed to ensure `fetch` doesn't block
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
来源
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> List of process identifiers

通过 SSH 在远程机器上添加工作进程。配置通过关键字参数完成(见下文)。特别是,exename 关键字可用于指定远程机器上的 julia 二进制文件的路径。

machines 是一个“机器规范”向量,这些规范以以下格式给出:[user@]host[:port] [bind_addr[:port]]user 默认设置为当前用户,port 默认设置为标准 SSH 端口。如果指定了 [bind_addr[:port]],则其他工作进程将通过指定的 bind_addrport 连接到此工作进程。

通过在 machines 向量中使用元组或 (machine_spec, count) 形式,可以在远程主机上启动多个进程,其中 count 是要启动的进程数。将 :auto 作为工作进程数量传递将启动与远程主机上的 CPU 线程数量相同的进程数。

示例:

addprocs([
    "remote1",               # one worker on 'remote1' logging in with the current username
    "user@remote2",          # one worker on 'remote2' logging in with the 'user' username
    "user@remote3:2222",     # specifying SSH port to '2222' for 'remote3'
    ("user@remote4", 4),     # launch 4 workers on 'remote4'
    ("user@remote5", :auto), # launch as many workers as CPU threads on 'remote5'
])

关键字参数:

  • tunnel:如果为 true,则将使用 SSH 隧道从主进程连接到工作进程。默认值为 false

  • multiplex:如果为 true,则将使用 SSH 多路复用进行 SSH 隧道。默认值为 false

  • ssh:用于启动工作进程的 SSH 客户端可执行文件的名称或路径。默认值为 "ssh"

  • sshflags:指定额外的 ssh 选项,例如 sshflags=`-i /home/foo/bar.pem`

  • max_parallel:指定在主机上并行连接的最大工作进程数。默认值为 10。

  • shell:指定 ssh 连接到的工作进程的 shell 类型。

    • shell=:posix:一个与 POSIX 兼容的 Unix/Linux shell (sh, ksh, bash, dash, zsh, 等)。默认值。

    • shell=:csh:一个 Unix C shell (csh, tcsh)。

    • shell=:wincmd:Microsoft Windows cmd.exe

  • dir:指定工作进程上的工作目录。默认设置为主机当前目录(由 pwd() 查找)。

  • enable_threaded_blas:如果为 true,则 BLAS 将在添加的进程中使用多线程运行。默认值为 false

  • exenamejulia 可执行文件的名称。默认值为 "$(Sys.BINDIR)/julia""$(Sys.BINDIR)/julia-debug"(具体取决于情况)。建议所有远程机器使用相同的 Julia 版本,因为否则序列化和代码分发可能会失败。

  • exeflags:传递给工作进程的额外标志。

  • topology:指定工作进程如何相互连接。在未连接的工作进程之间发送消息会导致错误。

    • topology=:all_to_all:所有进程相互连接。默认值。

    • topology=:master_worker:只有驱动进程,即 pid 1 连接到工作进程。工作进程不相互连接。

    • topology=:custom:集群管理器的 launch 方法通过 WorkerConfig 中的 identconnect_idents 字段指定连接拓扑。具有集群管理器身份 ident 的工作进程将连接到 connect_idents 中指定的所有工作进程。

  • lazy:仅适用于 topology=:all_to_all。如果为 true,则工作进程之间的连接以懒惰方式建立,即在工作进程之间首次进行远程调用时建立。默认值为 true。

  • env:提供字符串对数组,例如 env=["JULIA_DEPOT_PATH"=>"/depot"],以请求在远程机器上设置环境变量。默认情况下,只有环境变量 JULIA_WORKER_TIMEOUT 会自动从本地环境传递到远程环境。

  • cmdline_cookie:通过 --worker 命令行选项传递身份验证 cookie。通过 ssh stdio 传递 cookie 的(更安全)默认行为可能会在使用旧版本(ConPTY 之前的)Julia 或 Windows 版本的 Windows 工作进程中挂起,在这种情况下,cmdline_cookie=true 提供了一种解决方法。

Julia 1.6

在 Julia 1.6 中添加了 sshshellenvcmdline_cookie 关键字参数。

环境变量

如果主进程在 60.0 秒内无法与新启动的工作进程建立连接,工作进程将将其视为致命情况并终止。此超时可以通过环境变量 JULIA_WORKER_TIMEOUT 控制。主进程上 JULIA_WORKER_TIMEOUT 的值指定新启动的工作进程等待连接建立的时间(以秒为单位)。

来源
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> List of process identifiers

使用内置的 LocalManager 在本地主机上启动 np 个工作进程。

本地工作进程继承主进程的当前包环境(即,活动项目、LOAD_PATHDEPOT_PATH)。

关键字参数:

  • restrict::Bool:如果为 true(默认值),则绑定将限制为 127.0.0.1
  • direxenameexeflagsenvtopologylazyenable_threaded_blas:与 SSHManager 的效果相同,请参阅 addprocs(machines::AbstractVector) 的文档。
Julia 1.9

在 Julia 1.9 中添加了包环境的继承和 env 关键字参数。

来源
Distributed.nprocs函数
nprocs()

获取可用进程的数量。

示例

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
来源
Distributed.nworkers函数
nworkers()

获取可用工作进程的数量。这比 nprocs() 少一个。如果 nprocs() == 1,则等于 nprocs()

示例

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
来源
Distributed.procs方法
procs()

返回所有进程标识符列表,包括 pid 1(不包括在 workers() 中)。

示例

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
来源
Distributed.procs方法
procs(pid::Integer)

返回同一物理节点上的所有进程标识符列表。具体来说,返回绑定到与 pid 相同 IP 地址的所有工作进程。

来源
Distributed.workers函数
workers()

返回所有工作进程标识符列表。

示例

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
来源
Distributed.rmprocs函数
rmprocs(pids...; waitfor=typemax(Int))

移除指定的工作进程。请注意,只有进程 1 可以添加或移除工作进程。

参数 waitfor 指定等待工作进程关闭的时间。

  • 如果未指定,rmprocs 将等待所有请求的 pids 被移除。
  • 如果所有工作进程无法在请求的 waitfor 秒内终止,则会引发 ErrorException
  • waitfor 值为 0 时,调用立即返回,并将要移除的工作进程安排在其他任务中执行。返回已安排的 Task 对象。用户应在调用任何其他并行调用之前,在该任务上调用 wait

示例

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
来源
Distributed.interrupt函数
interrupt(pids::Integer...)

中断指定工作进程上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果未提供参数,则会中断所有工作进程。

来源
interrupt(pids::AbstractVector=workers())

中断指定工作进程上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果未提供参数,则会中断所有工作进程。

来源
Distributed.myid函数
myid()

获取当前进程的 ID。

示例

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
来源
Distributed.pmap函数
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

通过使用可用工作进程和任务将 f 应用于集合 c 的每个元素来转换集合 c

对于多个集合参数,按元素应用 f

请注意,f 必须对所有工作进程可用;有关详细信息,请参阅 代码可用性和加载包

如果没有指定工作进程池,则会使用所有可用工作进程,即默认工作进程池。

默认情况下,pmap 会将计算分布到所有指定工作进程上。若要仅使用本地进程并将计算分布到任务上,请指定 distributed=false。这等效于使用 asyncmap。例如,pmap(f, c; distributed=false) 等效于 asyncmap(f,c; ntasks=()->nworkers())

pmap 还可以通过 batch_size 参数使用进程和任务的混合方式。对于大于 1 的批处理大小,会将集合按批处理处理,每个批处理的长度为 batch_size 或更小。将一个批处理作为单个请求发送给空闲的工作进程,其中本地 asyncmap 使用多个并发任务处理批处理中的元素。

任何错误都会阻止 pmap 处理集合的剩余部分。若要覆盖此行为,可以通过 on_error 参数指定错误处理函数,该函数接收单个参数,即异常。该函数可以通过重新抛出错误来停止处理,或者要继续处理,则返回任何值,该值随后将与结果一起返回给调用方。

请考虑以下两个示例。第一个示例将异常对象内联返回,第二个示例将 0 返回到任何异常的位置。

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

也可以通过重试失败的计算来处理错误。关键字参数 retry_delaysretry_check 传递给 retry,分别作为关键字参数 delayscheck。如果指定了批处理,并且整个批处理失败,则会重试批处理中的所有项。

请注意,如果同时指定了 on_errorretry_delays,则会先调用 on_error 钩子,然后进行重试。如果 on_error 不抛出(或重新抛出)异常,则不会重试该元素。

示例:对于错误,对元素上的 f 重试最多 3 次,两次重试之间没有延迟。

pmap(f, c; retry_delays = zeros(3))

示例:仅当异常类型不是 InexactError 时才重试 f,并使用指数递增的延迟最多重试 3 次。对于所有 InexactError 出现的情况,返回 NaN

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
来源
Distributed.RemoteException类型
RemoteException(captured)

远程计算上的异常会被捕获并在本地重新抛出。RemoteException 封装了工作进程的 pid 和捕获的异常。CapturedException 捕获远程异常以及引发异常时调用堆栈的可序列化形式。

来源
Distributed.Future类型
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Future 是未知终止状态和时间的一次计算的占位符。对于多个潜在计算,请参阅 RemoteChannel。有关识别 AbstractRemoteRef 的信息,请参阅 remoteref_id

来源
Distributed.RemoteChannel类型
RemoteChannel(pid::Integer=myid())

对进程 pid 上的 Channel{Any}(1) 进行引用。默认 pid 是当前进程。

RemoteChannel(f::Function, pid::Integer=myid())

创建特定大小和类型的远程通道的引用。f 是一个函数,在 pid 上执行时必须返回 AbstractChannel 的实现。

例如,RemoteChannel(()->Channel{Int}(10), pid) 将返回对 pid 上类型为 Int、大小为 10 的通道的引用。

默认 pid 是当前进程。

来源
Base.fetch方法
fetch(x::Future)

等待并获取 Future 的值。获取的值会缓存在本地。对同一引用的 fetch 的进一步调用将返回缓存的值。如果远程值为异常,则会抛出 RemoteException,该异常会捕获远程异常和回溯。

来源
Distributed.remotecall方法
remotecall(f, id::Integer, args...; kwargs...) -> Future

在指定进程上对给定参数异步调用函数 f。返回 Future。如果存在,则关键字参数将传递给 f

来源
Distributed.remotecall_wait方法
remotecall_wait(f, id::Integer, args...; kwargs...)

在由工作进程 ID id 指定的 Worker 上执行一个消息中的更快 wait(remotecall(...))。如果存在,则关键字参数将传递给 f

另请参阅 waitremotecall

来源
Distributed.remotecall_fetch方法
remotecall_fetch(f, id::Integer, args...; kwargs...)

在一个消息中执行 fetch(remotecall(...))。如果存在,则关键字参数将传递给 f。所有远程异常都将在 RemoteException 中捕获并抛出。

另请参阅 fetchremotecall

示例

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
来源
Distributed.remote_do方法
remote_do(f, id::Integer, args...; kwargs...) -> nothing

在工作进程 id 上异步执行 f。与 remotecall 不同,它不会存储计算结果,也不存在等待其完成的方法。

成功调用表示已接受请求以便在远程节点上执行。

虽然对同一工作进程的连续 remotecall 会按它们被调用的顺序序列化,但远程工作进程上的执行顺序是不确定的。例如,remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) 会按顺序序列化对 f1f2f3 的调用。但是,不能保证 f1 在工作进程 2 上执行之前 f3 会被执行。

f 抛出的任何异常都会打印到远程工作进程上的 stderr

如果存在,则关键字参数将传递给 f

来源
Base.put!方法
put!(rr::RemoteChannel, args...)

将一组值存储到 RemoteChannel。如果通道已满,则会阻塞,直到有空间可用为止。返回第一个参数。

来源
Base.put!方法
put!(rr::Future, v)

将值存储到 Future rrFuture 是只写一次远程引用。对已设置的 Future 进行 put! 操作会抛出 Exception。所有异步远程调用都会返回 Future,并在完成后将值设置为调用的返回值。

来源
Base.isready方法
isready(rr::RemoteChannel, args...)

确定 RemoteChannel 是否已存储值。请注意,此函数会导致竞争条件,因为当您收到其结果时,它可能不再为真。但是,它可以安全地用于 Future,因为它们只分配一次。

来源
Base.isready方法
isready(rr::Future)

确定 Future 是否已存储值。

如果参数 Future 由其他节点拥有,则此调用将阻塞以等待答案。建议在单独的任务中等待 rr,或者使用本地 Channel 作为代理。

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # will not block
来源
Distributed.AbstractWorkerPool类型
AbstractWorkerPool

工作进程池的超类型,例如 WorkerPoolCachingPoolAbstractWorkerPool 应实现

  • push! - 将新的工作进程添加到整个池中(可用 + 繁忙)
  • put! - 将工作进程放回可用池中
  • take! - 从可用池中获取工作进程(用于远程函数执行)
  • length - 整个池中可用工作进程的数量
  • isready - 如果对池执行 take! 会阻塞,则返回 false,否则返回 true

上述操作的默认实现(在 AbstractWorkerPool 上)需要以下字段

  • channel::Channel{Int}
  • workers::Set{Int}

其中 channel 包含空闲工作进程 pid,workers 是与此池相关联的所有工作进程的集合。

来源
Distributed.WorkerPool类型
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

从一个 worker id 的向量或范围中创建一个 WorkerPool

示例

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
来源
Distributed.CachingPool类型
CachingPool(workers::Vector{Int})

AbstractWorkerPool 的实现。 remoteremotecall_fetchpmap(以及其他在远程执行函数的远程调用)受益于在 worker 节点上缓存序列化/反序列化函数,特别是闭包(它们可能会捕获大量数据)。

远程缓存会为返回的 CachingPool 对象的整个生命周期维护。要提前清除缓存,请使用 clear!(pool)

对于全局变量,只有绑定在闭包中捕获,而不是数据。let 块可以用于捕获全局数据。

示例

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

以上操作只会将 foo 传输一次到每个 worker。

来源
Distributed.default_worker_pool函数
default_worker_pool()

AbstractWorkerPool 包含空闲的 workers - 由 remote(f)pmap 使用(默认情况下)。除非通过 default_worker_pool!(pool) 显式设置,否则默认 worker 池初始化为 WorkerPool

示例

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
来源
Distributed.clear!方法
clear!(pool::CachingPool) -> pool

从所有参与的 worker 中删除所有缓存的函数。

来源
Distributed.remotecall方法
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool 版本的 remotecall(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remotecall

示例

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

在这个例子中,任务在 pid 2 上运行,从 pid 1 调用。

来源
Distributed.remotecall_wait方法
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool 版本的 remotecall_wait(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remotecall_wait

示例

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
来源
Distributed.remotecall_fetch方法
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool 版本的 remotecall_fetch(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remotecall_fetch

示例

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
来源
Distributed.remote_do方法
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

WorkerPool 版本的 remote_do(f, pid, ....)。等待并从 pool 中获取一个空闲 worker,并在其上执行 remote_do

来源
Distributed.@spawnat
@spawnat p expr

在一个表达式周围创建一个闭包,并在进程 p 上异步运行该闭包。返回一个指向结果的 Future。如果 p 是带引号的文字符号 :any,那么系统将自动选择一个处理器使用。

示例

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

:any 参数从 Julia 1.3 开始可用。

来源
Distributed.@fetch
@fetch expr

等效于 fetch(@spawnat :any expr)。请参阅 fetch@spawnat

示例

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
来源
Distributed.@distributed
@distributed

一个分布式内存并行 for 循环,形式为

@distributed [reducer] for var = range
    body
end

指定的范围被分区并在所有 worker 上本地执行。如果指定了可选的 reducer 函数,@distributed 将在每个 worker 上执行本地归约,并在调用进程上执行最终归约。

请注意,没有 reducer 函数,@distributed 异步执行,即它在所有可用的 worker 上生成独立的任务并立即返回,而不会等待完成。要等待完成,请在调用前加上 @sync,例如

@sync @distributed for var = range
    body
end
来源
Distributed.@everywhere
@everywhere [procs()] expr

在所有 procs 上执行 Main 下的表达式。任何进程上的错误都将收集到一个 CompositeException 并抛出。例如

@everywhere bar = 1

将在所有当前进程上定义 Main.bar。任何稍后添加的进程(例如使用 addprocs())都不会定义该表达式。

@spawnat 不同,@everywhere 不会捕获任何局部变量。相反,局部变量可以使用插值进行广播

foo = 1
@everywhere bar = $foo

可选参数 procs 允许指定所有进程的子集来执行表达式。

类似于调用 remotecall_eval(Main, procs, expr),但有两个额外的功能

- `using` and `import` statements run on the calling process first, to ensure
  packages are precompiled.
- The current source file path used by `include` is propagated to other processes.
来源
Distributed.clear!方法
clear!(syms, pids=workers(); mod=Main)

通过将全局绑定初始化为 nothing 来清除模块中的全局绑定。syms 应该是 Symbol 类型或 Symbol 的集合。pidsmod 标识要重新初始化全局变量的进程和模块。只有在 mod 下定义的那些名称会被清除。

如果请求清除全局常量,则会抛出异常。

来源
Distributed.remoteref_id函数
remoteref_id(r::AbstractRemoteRef) -> RRID

FutureRemoteChannel 由字段标识

  • where - 指向引用实际存在的底层对象/存储所在的节点。

  • whence - 指向创建远程引用的节点。请注意,这与底层对象实际存在的节点不同。例如,从主进程调用 RemoteChannel(2) 会导致 where 值为 2,whence 值为 1。

  • id 在从 whence 指定的 worker 创建的所有引用中是唯一的。

whenceid 共同在所有 worker 中唯一标识一个引用。

remoteref_id 是一个低级 API,它返回一个 RRID 对象,该对象包装了远程引用的 whenceid 值。

来源
Distributed.worker_id_from_socket函数
worker_id_from_socket(s) -> pid

一个低级 API,它在给定一个 IO 连接或 Worker 的情况下,返回与其连接的 worker 的 pid。这在为类型编写自定义 serialize 方法时很有用,这会根据接收进程 ID 优化写出的数据。

来源

集群管理器接口

此接口提供了一种机制,可以在不同的集群环境中启动和管理 Julia worker。Base 中存在两种类型的管理器:LocalManager,用于在同一主机上启动其他 worker,以及 SSHManager,用于通过 ssh 在远程主机上启动 worker。TCP/IP 套接字用于连接和传输进程之间的消息。集群管理器可以使用不同的传输方式。

Distributed.ClusterManager类型
ClusterManager

集群管理器的超类型,它控制 worker 进程作为一个集群。集群管理器实现如何添加、删除和与 worker 通信。SSHManagerLocalManager 是它的子类型。

来源
Distributed.WorkerConfig类型
WorkerConfig

ClusterManager 使用的类型,用于控制添加到其集群中的 worker。某些字段用于所有集群管理器以访问主机

  • io – 用于访问 worker 的连接(IO 的子类型或 Nothing
  • host – 主机地址(StringNothing
  • port – 用于连接到 worker 的主机上的端口(IntNothing

一些用于集群管理器将 worker 添加到已初始化的主机

  • count – 要在主机上启动的 worker 数量
  • exename – 主机上 Julia 可执行文件的路径,默认为 "$(Sys.BINDIR)/julia""$(Sys.BINDIR)/julia-debug"
  • exeflags – 远程启动 Julia 时要使用的标志

userdata 字段用于由外部管理器为每个 worker 存储信息。

某些字段由 SSHManager 和类似的管理器使用

  • tunneltrue(使用隧道),false(不使用隧道),或 nothing(使用管理器的默认值)
  • multiplextrue(使用 SSH 多路复用进行隧道)或 false
  • forward – 用于 ssh 的 -L 选项的转发选项
  • bind_addr – 要绑定的远程主机上的地址
  • sshflags – 用于建立 SSH 连接的标志
  • max_parallel – 主机上并行连接的最大 worker 数量

某些字段由 LocalManagerSSHManager 都使用

  • connect_at – 确定这是 worker-to-worker 还是 driver-to-worker 设置调用
  • process – 将连接的进程(通常管理器将在 addprocs 期间分配此进程)
  • ospid – 按照主机操作系统的进程 ID,用于中断 worker 进程
  • environ – 用于存储 Local/SSH 管理器临时信息的私有字典
  • ident – 由 ClusterManager 标识的 worker
  • connect_idents – 如果使用自定义拓扑,worker 必须连接到的 worker ID 列表
  • enable_threaded_blastruefalsenothing,是否在 worker 上使用线程化的 BLAS
来源
Distributed.launch函数
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

由集群管理器实现。对于此函数启动的每个 Julia 工作器,它应该将一个 WorkerConfig 条目追加到 launched 并通知 launch_ntfy。一旦 manager 请求的所有工作器启动,该函数必须退出。params 是一个字典,包含调用 addprocs 时使用的所有关键字参数。

来源
Distributed.manage函数
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

由集群管理器实现。它在主进程上调用,在工作器的生命周期内,使用适当的 op

  • 当工作器从 Julia 工作器池中添加/删除时,使用 :register/:deregister
  • 当调用 interrupt(workers) 时,使用 :interruptClusterManager 应该向相应的工作器发送一个中断信号。
  • 使用 :finalize 用于清理目的。
来源
Base.kill方法
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

由集群管理器实现。它在主进程上调用,由 rmprocs 调用。它应该导致由 pid 指定的远程工作器退出。kill(manager::ClusterManager.....)pid 上执行远程 exit()

来源
Sockets.connect方法
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

由使用自定义传输的集群管理器实现。它应该建立一个逻辑连接到具有 id pid 的工作器,由 config 指定,并返回一对 IO 对象。来自 pid 到当前进程的消息将从 instrm 读取,而要发送到 pid 的消息将写入 outstrm。自定义传输实现必须确保消息完整且按顺序传递和接收。connect(manager::ClusterManager.....) 在工作器之间建立 TCP/IP 套接字连接。

来源
Distributed.init_worker函数
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

由实现自定义传输的集群管理器调用。它将新启动的进程初始化为工作器。命令行参数 --worker[=<cookie>] 的作用是使用 TCP/IP 套接字进行传输,将进程初始化为工作器。cookie 是一个 cluster_cookie

来源
Distributed.start_worker函数
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker 是一个内部函数,是通过 TCP/IP 连接的工作器进程的默认入口点。它将进程设置为 Julia 集群工作器。

主机:端口信息写入流 out(默认为标准输出)。

该函数在需要时从标准输入读取 cookie,并在一个空闲端口(或者如果指定了,则在 --bind-to 命令行选项中的端口)上监听,并调度任务来处理传入的 TCP 连接和请求。它还(可选)关闭标准输入并将标准错误重定向到标准输出。

它不返回。

来源
Distributed.process_messages函数
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

由使用自定义传输的集群管理器调用。当自定义传输实现接收到来自远程工作器的第一条消息时,应该调用它。自定义传输必须管理与远程工作器的逻辑连接,并提供两个 IO 对象,一个用于传入消息,另一个用于发送到远程工作器的消息。如果 incomingtrue,则远程对等体发起了连接。连接发起者中的任意一方都会发送集群 cookie 及其 Julia 版本号以执行身份验证握手。

另请参见 cluster_cookie

来源
Distributed.default_addprocs_params函数
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

由集群管理器实现。调用 addprocs(mgr) 时传递的默认关键字参数。通过调用 default_addprocs_params() 可以获得最小选项集。

来源