dgamore.mpi_utils
Multiprocessing (MPI) utilities for the non-local step. A single module covers everything parallel:
- low-level message-chunking primitives (send_rows / recv_rows_into / recv_rows_alloc / bcast_rows / bcast_rows_into / send_bytes / recv_bytes and the row_chunks / chunk_step bound math) that split any transfer below the 2 GB MPI per-message limit and write received data directly into the caller’s preallocated contiguous buffer (no per-chunk staging copy);
- the work distributor MpiDistributor, which splits a number of tasks (typically the q-points of the irreducible Brillouin zone) into per-rank contiguous slices and wraps the collective/point-to-point communication (scatter, gather, all-gather, all-reduce, broadcast, object send/recv); each rank owns a private HDF5 file for spilling intermediate results without write conflicts;
- higher-level data-movement routines built on top: mapping objects between the irreducible-BZ and full-BZ rank distributions (a simple gather/scatter version and a fully distributed peer-to-peer version that never assembles the full object on one rank), a node-aware frequency distribution, and a distributed 3D FFT over the BZ via pencil redistributions.
MAX_MPI_BYTES is the single source of the 2 GB limit; the chunking helpers take it as an explicit limit
argument (read at call time by the callers), so monkeypatching mpi_utils.MAX_MPI_BYTES in tests still forces the
chunked path.
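The difference between reading the limit at call time and freezing it in a default argument can be sketched as follows. This is a minimal illustration, not the module's code: the `mpi_utils` namespace below is a stand-in object, and `n_messages`, `transfer_call_time` and `transfer_frozen` are hypothetical helpers.

```python
import types

# Stand-in for the real module (assumption: only the attribute name is taken from the docs).
mpi_utils = types.SimpleNamespace(MAX_MPI_BYTES=2**31 - 1)


def n_messages(nbytes, limit):
    """Number of messages needed to move nbytes with at most `limit` bytes each."""
    return max(1, -(-nbytes // limit))  # ceiling division


def transfer_call_time(nbytes):
    # Reads the module attribute on every call, so a later patch is honoured.
    return n_messages(nbytes, limit=mpi_utils.MAX_MPI_BYTES)


def transfer_frozen(nbytes, limit=mpi_utils.MAX_MPI_BYTES):
    # The default was evaluated once, at definition time; a later patch is ignored.
    return n_messages(nbytes, limit)


before = (transfer_call_time(1000), transfer_frozen(1000))
mpi_utils.MAX_MPI_BYTES = 256  # what a test's monkeypatch would do
after = (transfer_call_time(1000), transfer_frozen(1000))
```

Only the call-time variant switches to the chunked path after the patch, which is why the callers pass the limit explicitly instead of relying on the helpers' default.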
Functions
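| bcast_rows | Broadcasts a numpy array from root to all ranks, chunked along axis 0. |
| bcast_rows_into | Broadcasts into an existing contiguous buffer view, chunked along axis 0. |
| chunk_step | Returns the number of leading-axis rows that fit into a single limit-byte message (at least 1). |
| exchange_and_map_irrbz_fullbz | Maps an object from the irreducible BZ distribution to the full BZ distribution without ever assembling the full object on any single rank. |
| execute_distributed_fft | Main routine: Call this for objects that are local to a rank but in the respective full BZ slice. |
| gather_full_ibz_for_vslice | Re-lays out a q-distributed pairing vertex into a fermionic-frequency-distributed one for the Eliashberg solver: each rank ends up with the full BZ but only its node-aware slice of the (second) fermionic frequency. |
| get_pencil_indices | Calculates which global flattened q-indices (0 to n_tot - 1) a rank owns under a given decomposition layout. |
| map_irrbz_fullbz | Maps an object from the irreducible-BZ rank distribution to the full-BZ distribution the simple way: gather to rank 0, unfold to the full BZ there, then scatter back. |
| recv_bytes | Receives a chunked raw byte blob sent by send_bytes(). |
| recv_rows_alloc | Allocates an array of the given shape/dtype and receives into it (see recv_rows_into()). |
| recv_rows_into | Receives rows from source directly into the contiguous buffer buf (no per-chunk staging buffer). |
| row_chunks | Yields (start, stop) row-index pairs splitting n_rows leading-axis rows into sub-limit-byte chunks. |
| send_bytes | Sends a raw byte blob to dest in sub-limit-byte chunks, preceded by a small length message (at base_tag). |
| send_rows | Sends … |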
- class dgamore.mpi_utils.MpiDistributor(ntasks: int = 1, comm: mpi4py.MPI.Comm = None, name: str = '', output_path: str = None)
Bases: object
Distributes tasks among all available cores. Uses the first (q) dimension to slice the vertex data into chunks and sends them to all active MPI processes. Saves intermediate computational results in rank files. Each rank has its own MpiDistributor instance and its own HDF5 file to avoid write conflicts.
- allgather(rank_result: ndarray = None) -> ndarray
Gathers each rank’s array slice (along axis 0) into the full array, replicated on every rank. The common case is a single bandwidth-optimal Allgatherv collective (a derived “row” count keeps the per-rank counts and displacements small, so the result is correct regardless of element size); only when a rank’s slice would exceed the 2 GB per-message limit does it fall back to per-rank chunked broadcasts.
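Why counting in rows keeps the Allgatherv arguments small can be illustrated with plain arithmetic. The sketch below assumes the classic MPI interface, in which counts and displacements are C ints; the rank count and row size are made-up numbers, and `counts_and_displs` is a hypothetical helper. In mpi4py, a "row" unit of this kind could be expressed with a contiguous derived datatype (`MPI.Datatype.Create_contiguous`), though whether the module does exactly that is an assumption.

```python
INT32_MAX = 2**31 - 1  # classic MPI counts and displacements are C ints


def counts_and_displs(rows_per_rank, unit=1):
    """Per-rank counts and start offsets, measured in multiples of `unit`."""
    counts = [n * unit for n in rows_per_rank]
    displs = [0]
    for c in counts[:-1]:
        displs.append(displs[-1] + c)
    return counts, displs


# Hypothetical layout: 32 ranks, 2 rows each, 50 million elements per row.
rows_per_rank = [2] * 32
items_per_row = 50_000_000

row_counts, row_displs = counts_and_displs(rows_per_rank)
elem_counts, elem_displs = counts_and_displs(rows_per_rank, unit=items_per_row)

rows_fit = max(row_counts + row_displs) <= INT32_MAX
elems_fit = max(elem_counts + elem_displs) <= INT32_MAX
```

Measured in elements, the displacement of the last rank (3.1e9) no longer fits into a 32-bit int, while measured in rows it is just 62.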
- allreduce(rank_result=None) -> ndarray
Sums an array element-wise across all ranks in place and returns the result on every rank, chunked along axis 0 so no single message exceeds the 2 GB MPI limit (consistent with the rest of the module).
Allreduce is collective, so the chunk schedule must be identical on every rank. That holds here because the reduced arrays are always equally shaped across ranks: the callers reduce full, replicated quantities such as the full-k-space self-energy / Fock term, and each rank holds a partial sum of the same array, so every rank derives the same chunk boundaries. The single-chunk case is byte-for-byte the previous behavior.
- Parameters:
rank_result – This rank’s contribution; reduced in place. Must have the same shape on every rank.
- Returns:
The summed array (same buffer), identical on all ranks.
- Return type:
ndarray
- barrier()
Synchronizes all ranks. Forces a garbage collection first so that all ranks free their memory before the barrier.
- Returns:
None.
- bcast(data, root=0)
Broadcasts an arbitrary (picklable) object from root to all ranks.
- Parameters:
data – The object to broadcast (only read on root).
root – The broadcasting rank.
- Returns:
The broadcast object on every rank.
- bcast_chunked(arr: ndarray, root: int = 0) -> ndarray
Broadcasts a large numpy array from root to all ranks, using raw MPI buffers and chunking along axis 0 to respect the 2 GB MPI message limit.
- bcast_npoint(obj, root: int = 0)
Broadcasts an N-point-like object (one exposing a .mat numpy array) from root to all ranks. The large .mat is broadcast as raw sub-2 GB chunks (so there is no multi-gigabyte pickle blob and no >2 GB message), while the rest of the object travels as a small pickled metadata blob. This is the broadcast analogue of send_to_rank() / recv_from_rank(). Prefer this over bcast() for large objects such as a full-BZ self-energy or gap function, both to respect the 2 GB limit and to avoid the full in-memory pickle copy.
- Parameters:
obj – The object to broadcast; must expose a .mat numpy array attribute. Only read on root.
root (int) – The broadcasting rank.
- Returns:
The broadcast object with its .mat attached, on every rank.
- close_file()
Closes this rank’s HDF5 file. Silently does nothing if it is not open.
- Returns:
None.
- property comm: mpi4py.MPI.Comm
The MPI communicator this distributor operates on.
- Returns:
The MPI communicator.
- static create_distributor(ntasks: int, comm: mpi4py.MPI.Comm = None, name: str = '', output_path: str = None) -> MpiDistributor
Factory that creates an MpiDistributor, defaulting to MPI.COMM_WORLD if no communicator is given.
- Parameters:
ntasks (int)
comm (mpi4py.MPI.Comm)
name (str)
output_path (str)
- Returns:
The created MpiDistributor.
- Return type:
MpiDistributor
- delete_file()
Deletes this rank’s HDF5 spill file. Silently does nothing if it does not exist.
- Returns:
None.
- gather(rank_result: ndarray = None, root: int = 0) -> ndarray
Gathers each rank’s array slice into the full array, in correct task order, on the root rank only. Handles arrays exceeding the 2 GB MPI limit by chunking along axis 0.
- property is_root: bool
Whether the current process is the root rank.
- Returns:
True if the current rank is the root rank (rank 0).
- property mpi_size: int
The communicator size.
- Returns:
The total number of MPI processes in the communicator.
- property my_size: int
The number of tasks owned by the current rank.
- Returns:
The number of tasks assigned to the current rank.
- property my_slice: int
The current rank’s slice into the full task list.
- Returns:
The slice object selecting the current rank’s portion of the full task list.
- property my_tasks: ndarray
The task indices owned by the current rank.
- Returns:
The task indices assigned to the current rank (e.g. the q-points it processes).
- property ntasks: int
The total number of distributed tasks.
- Returns:
The total number of tasks to be distributed (e.g. the number of irreducible-BZ q-points).
- open_file()
Opens this rank’s HDF5 file for read/write. Silently does nothing if the file is missing.
- Returns:
None.
- recv_from_rank(source: int, base_tag: int = 0)
Receives an object sent by send_to_rank(): reconstructs the pickled metadata object and reattaches the chunk-received .mat array.
- Parameters:
source (int) – Source rank.
base_tag (int) – Base MPI tag matching the one used by send_to_rank().
- Returns:
The reconstructed object with its .mat array attached.
- scatter(full_data: ndarray = None, root: int = 0)
Scatters the full array (held on root) along axis 0 into the per-rank task slices. Handles the 2 GB MPI limit by chunking. The single-rank case where full_data already has the rank-local length is passed through directly.
- Parameters:
full_data (ndarray)
root (int)
- Returns:
This rank’s slice of the data (shape (my_size, ...)).
- Raises:
TypeError – If full_data is given but is not a numpy array.
ValueError – If full_data’s leading length matches neither ntasks nor the single-rank case.
- send_to_rank(obj, dest: int, base_tag: int = 0)
Sends an N-point-like object to a single rank. The large .mat array is sent as raw chunks (to avoid holding a full pickle blob in memory), while the rest of the object is pickled into a small metadata blob.
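The metadata/array split can be sketched without any MPI calls. The following is an assumption-laden illustration: `split_for_send` and `rebuild_after_recv` are hypothetical names, a `SimpleNamespace` stands in for the N-point object, and shipping shape and dtype in a small header is one possible choice, not necessarily what the module does.

```python
import copy
import pickle
import types

import numpy as np


def split_for_send(obj):
    """Return (small pickled metadata blob, contiguous array) without pickling the array."""
    meta = copy.copy(obj)  # shallow copy: no array data is duplicated
    meta.mat = None        # detach the array on the copy only
    header = {"shape": obj.mat.shape, "dtype": obj.mat.dtype.str}
    return pickle.dumps((meta, header)), np.ascontiguousarray(obj.mat)


def rebuild_after_recv(blob, raw_bytes):
    """Reattach the raw array bytes to the unpickled metadata object."""
    meta, header = pickle.loads(blob)
    mat = np.frombuffer(raw_bytes, dtype=header["dtype"]).reshape(header["shape"])
    meta.mat = mat.copy()  # frombuffer over bytes is read-only, so take a writable copy
    return meta


# Hypothetical stand-in for an N-point-like object with a .mat attribute.
original = types.SimpleNamespace(
    mat=np.arange(12, dtype=np.complex128).reshape(3, 4), label="sigma"
)
blob, mat = split_for_send(original)
restored = rebuild_after_recv(blob, mat.tobytes())
```

In the real routine the array would travel as raw sub-2 GB chunks rather than through `tobytes()`, which is used here only to keep the sketch free of MPI.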
- dgamore.mpi_utils.bcast_rows(comm, arr: ndarray, root: int, limit: int = 2147483647) -> ndarray
Broadcasts a numpy array from root to all ranks, chunked along axis 0. Non-root ranks allocate the receive buffer from the broadcast shape/dtype.
- dgamore.mpi_utils.bcast_rows_into(comm, view: ndarray, root: int, limit: int = 2147483647) -> ndarray
Broadcasts into an existing contiguous buffer view, chunked along axis 0. This is a collective call: every rank must pass the matching view (same shape/dtype). Used to fill one rank’s slice of an all-gather target.
- Parameters:
comm
view (ndarray)
root (int)
limit (int)
- Returns:
view (filled in place).
- Return type:
ndarray
- dgamore.mpi_utils.chunk_step(itemsize: int, items_per_row: int, limit: int = 2147483647) -> int
Returns the number of leading-axis rows that fit into a single
limit-byte message (at least 1). This is the raw step used by callers that drive their own (e.g. non-blocking) send/receive loop.
- dgamore.mpi_utils.exchange_and_map_irrbz_fullbz(obj: FourPoint, mpi_dist_irrk: MpiDistributor, mpi_dist_fullbz: MpiDistributor) -> FourPoint
Maps an object from the irreducible BZ distribution to the full BZ distribution without ever assembling the full object on any single rank.
Each rank holds a slice of the object over the irreducible BZ (shape [q_irr_rank, …]). This routine redistributes the data peer-to-peer so that each rank ends up with a slice over the full BZ (shape [q_full_rank, …]), with symmetry-equivalent points correctly replicated according to the irrk_inv mapping.
If config.lattice.q_grid is in auto-discovered symmetry mode (its specify_auto_symmetries has been called), the per-k orbital transformation (sigma_k, U_k, conj_k) is also applied locally on each rank, using only the transformation arrays sliced to that rank’s FBZ range. No global gather is needed.
This is a distributed replacement for the pattern (see also map_irrbz_fullbz()):
    obj.mat = mpi_dist_irrk.gather(obj.mat)
    if comm.rank == 0:
        obj = obj.map_to_full_bz(q_grid)
    obj.mat = mpi_dist_fullbz.scatter(obj.mat)
which would require rank 0 to hold the entire full-BZ object in memory.
- Parameters:
obj (FourPoint) – The FourPoint distributed over the irreducible BZ.
mpi_dist_irrk (MpiDistributor) – MPI distributor over the irreducible BZ (source layout).
mpi_dist_fullbz (MpiDistributor) – MPI distributor over the full BZ (target layout).
- Returns:
The FourPoint distributed over the full BZ (compressed q dimension).
- Return type:
FourPoint
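The peer-to-peer idea can be illustrated by the planning step alone: each rank works out which irreducible points its full-BZ slice refers to and which ranks own them. The sketch below rests on several assumptions: `irrk_inv` is taken to be an array mapping every full-BZ index to its irreducible index, both distributions are contiguous blocks with the remainder on the last ranks, and `block_bounds` and `plan_requests` are hypothetical helpers.

```python
import numpy as np


def block_bounds(n, size):
    """Start offsets of contiguous per-rank blocks (remainder on the last ranks)."""
    base, extra = divmod(n, size)
    counts = [base + (1 if r >= size - extra else 0) for r in range(size)]
    return np.concatenate(([0], np.cumsum(counts)))


def plan_requests(irrk_inv, size, n_irr, rank):
    """For `rank`, map each source rank to the irreducible indices it must provide."""
    full_b = block_bounds(len(irrk_inv), size)
    irr_b = block_bounds(n_irr, size)
    # Irreducible points referenced by this rank's slice of the full BZ.
    needed = np.unique(irrk_inv[full_b[rank]:full_b[rank + 1]])
    # Owner of index i is the rank r with irr_b[r] <= i < irr_b[r + 1].
    owners = np.searchsorted(irr_b, needed, side="right") - 1
    return {int(src): needed[owners == src] for src in np.unique(owners)}


# Hypothetical example: 8 full-BZ points folding onto 3 irreducible points, 2 ranks.
irrk_inv = np.array([0, 1, 1, 2, 0, 2, 1, 0])
plan_rank0 = plan_requests(irrk_inv, size=2, n_irr=3, rank=0)
plan_rank1 = plan_requests(irrk_inv, size=2, n_irr=3, rank=1)
```

Each rank only requests the irreducible rows it needs, so no rank ever has to hold the full-BZ object.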
- dgamore.mpi_utils.execute_distributed_fft(obj: FourPoint, comm: mpi4py.MPI.Comm) -> FourPoint
Main routine: call this for objects that are local to a rank but in the respective full BZ slice, e.g. after a call to exchange_and_map_irrbz_fullbz(). This routine performs a distributed 3D FFT by redistributing the data into pencil decompositions for each dimension, performing local FFTs, and then redistributing back to the original layout. As a result, obj.mat is transformed in place to the Fourier-space representation corresponding to the full BZ. Attention: modifies the object in place!
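The pencil approach relies on a 3D FFT being separable into 1D FFTs along each axis. The single-process sketch below checks this with NumPy and mimics the pencils by splitting the array along a different axis before each 1D transform. The grid size, the number of simulated ranks, the forward transform direction, and the helper `fft_along_axis_in_pencils` are all assumptions made for illustration.

```python
import numpy as np


def fft_along_axis_in_pencils(arr, axis, n_ranks):
    """1D FFT along `axis`, done piecewise on blocks that hold whole lines along it."""
    split_axis = (axis + 1) % 3  # split along another momentum axis
    parts = np.array_split(arr, n_ranks, axis=split_axis)
    # Each block plays the role of one rank's pencil; the FFT is local to the block.
    return np.concatenate([np.fft.fft(p, axis=axis) for p in parts], axis=split_axis)


rng = np.random.default_rng(0)
nq = (4, 3, 5)
# The trailing axis stands in for the non-momentum (orbital/frequency) dimensions.
data = rng.standard_normal(nq + (2,)) + 1j * rng.standard_normal(nq + (2,))

reference = np.fft.fftn(data, axes=(0, 1, 2))

staged = data.copy()
for axis in (2, 1, 0):
    # In the distributed setting, a redistribution would precede each pass.
    staged = fft_along_axis_in_pencils(staged, axis, n_ranks=2)

max_error = float(np.max(np.abs(staged - reference)))
```

Because every 1D transform only touches complete lines along one axis, it needs no communication once the data sits in the matching pencil layout.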
- dgamore.mpi_utils.gather_full_ibz_for_vslice(gamma_r: FourPoint, mpi_dist_irrq: MpiDistributor, mpi_dist_v: MpiDistributor, q_grid: KGrid) -> FourPoint
Re-lays out a q-distributed pairing vertex into a fermionic-frequency-distributed one for the Eliashberg solver: each rank ends up with the full BZ but only its node-aware slice of the (second) fermionic frequency. The momentum is unfolded to the full BZ locally. Ranks with an empty frequency slice receive None.
- Parameters:
gamma_r (FourPoint) – The FourPoint pairing vertex distributed over the irreducible BZ.
mpi_dist_irrq (MpiDistributor) – MPI distributor over the irreducible BZ q-points (source layout).
mpi_dist_v (MpiDistributor) – MPI distributor over the fermionic frequency axis (updated in place to the node-aware split).
q_grid (KGrid) – The KGrid used to unfold to the full BZ.
- Returns:
The full-BZ FourPoint for this rank’s frequency slice, or None if the slice is empty.
- Return type:
FourPoint or None
- dgamore.mpi_utils.get_pencil_indices(rank: int, size: int, nq: tuple[int, int, int], layout: str) -> ndarray
Calculates which global flattened q-indices (0 to n_tot - 1) a rank owns under a given decomposition layout. The "flat" layout matches MpiDistributor._distribute_tasks() (excess on the last ranks); the pencil layouts assign whole lines along one axis so a subsequent 1D FFT along that axis is rank-local.
- Parameters:
rank (int)
size (int)
nq (tuple[int, int, int])
layout (str)
- Returns:
The global flattened q-indices owned by rank.
- Raises:
ValueError – If layout is not one of the supported layouts.
- Return type:
ndarray
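The two kinds of layout can be sketched as follows. This is an illustration under assumptions, not the library routine: the flattened index is taken to be C-ordered, "excess on the last ranks" is read as the last ranks receiving one extra index each, and `flat_indices` and `pencil_indices` (which takes an integer `axis` instead of a layout string) are hypothetical helpers.

```python
import numpy as np


def flat_indices(rank, size, n_tot):
    """Contiguous block of indices; the remainder goes to the last ranks."""
    base, extra = divmod(n_tot, size)
    first_big = size - extra  # ranks >= first_big own one extra index
    start = rank * base + max(0, rank - first_big)
    count = base + (1 if rank >= first_big else 0)
    return np.arange(start, start + count)


def pencil_indices(rank, size, nq, axis):
    """Flattened indices of whole lines along `axis`, dealt out in contiguous blocks."""
    grid = np.arange(int(np.prod(nq))).reshape(nq)  # C-order flattened indices
    lines = np.moveaxis(grid, axis, -1).reshape(-1, nq[axis])  # one row per line
    own = flat_indices(rank, size, lines.shape[0])
    return np.sort(lines[own].ravel())


flat = [flat_indices(r, 4, 10).tolist() for r in range(4)]
x_pencil_rank0 = pencil_indices(0, 2, (2, 2, 3), axis=0).tolist()
z_pencil_rank1 = pencil_indices(1, 2, (2, 2, 3), axis=2).tolist()
```

In the x-pencil case rank 0 owns both x-values for each of its (y, z) pairs, so a 1D FFT along x needs no data from other ranks.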
- dgamore.mpi_utils.map_irrbz_fullbz(obj, mpi_dist_irrk, mpi_dist_fullbz)
Maps an object from the irreducible-BZ rank distribution to the full-BZ distribution the simple way: gather to rank 0, unfold to the full BZ there, then scatter back. Requires rank 0 to hold the full-BZ object transiently.
- Parameters:
obj – The object distributed over the irreducible BZ (must support map_to_full_bz()).
mpi_dist_irrk – MPI distributor over the irreducible BZ (source layout).
mpi_dist_fullbz – MPI distributor over the full BZ (target layout).
- Returns:
The object distributed over the full BZ.
- dgamore.mpi_utils.recv_bytes(comm, source: int, base_tag: int = 0, limit: int = 2147483647) -> bytes
Receives a chunked raw byte blob sent by send_bytes().
- Parameters:
comm – The MPI communicator.
source (int) – Source rank.
base_tag (int) – Base MPI tag matching the one used by send_bytes().
limit (int) – Maximum message size in bytes.
- Returns:
The reassembled bytes.
- Return type:
bytes
- dgamore.mpi_utils.recv_rows_alloc(comm, shape: tuple, dtype, source: int, base_tag: int = 0, limit: int = 2147483647) -> ndarray
Allocates an array of the given shape/dtype and receives into it (see recv_rows_into()).
- Parameters:
comm
shape (tuple)
dtype
source (int)
base_tag (int)
limit (int)
- Returns:
The received array.
- Return type:
ndarray
- dgamore.mpi_utils.recv_rows_into(comm, buf: ndarray, source: int, base_tag: int = 0, limit: int = 2147483647) -> ndarray
Receives rows from source directly into the contiguous buffer buf (no per-chunk staging buffer). The buffer’s axis-0 slices must be contiguous (e.g. buf is C-contiguous or an axis-0 view of such an array).
- Parameters:
comm
buf (ndarray)
source (int)
base_tag (int)
limit (int)
- Returns:
buf (filled in place).
- Return type:
ndarray
- dgamore.mpi_utils.row_chunks(n_rows: int, itemsize: int, items_per_row: int, limit: int = 2147483647)
Yields (start, stop) row-index pairs splitting n_rows leading-axis rows into sub-limit-byte chunks.
- Parameters:
n_rows (int)
itemsize (int)
items_per_row (int)
limit (int)
- Returns:
A generator of (start, stop) row-index pairs.
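The bound math behind these two helpers can be written down in a few lines. The functions below are a sketch that mirrors the documented signatures and behavior (rows per message, at least 1); the `_sketch` suffix marks them as illustrative and not as the module's actual implementation.

```python
def chunk_step_sketch(itemsize, items_per_row, limit=2**31 - 1):
    """Rows per message such that one message stays within `limit` bytes (at least 1)."""
    row_bytes = itemsize * items_per_row
    return max(1, limit // max(1, row_bytes))


def row_chunks_sketch(n_rows, itemsize, items_per_row, limit=2**31 - 1):
    """Yield (start, stop) pairs covering range(n_rows) in steps of chunk_step_sketch."""
    step = chunk_step_sketch(itemsize, items_per_row, limit)
    for start in range(0, n_rows, step):
        yield start, min(start + step, n_rows)


# 10 rows of 4 float64 values (32 bytes per row) with an artificially small limit.
chunks = list(row_chunks_sketch(10, itemsize=8, items_per_row=4, limit=100))
# A single row larger than the limit still yields a step of one row.
oversized_step = chunk_step_sketch(itemsize=8, items_per_row=1000, limit=100)
```

Note the edge case of the "at least 1" rule: when a single row is larger than the limit, the step stays at one row, so that message exceeds the limit in this sketch.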
- dgamore.mpi_utils.send_bytes(comm, data: bytes, dest: int, base_tag: int = 0, limit: int = 2147483647) -> None
Sends a raw byte blob to dest in sub-limit-byte chunks, preceded by a small length message (at base_tag); the chunks use base_tag + 1 + chunk_index.
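The tag scheme can be tried out without MPI by letting a dictionary keyed by tag play the role of the message queue. This is a sketch under assumptions: `send_bytes_sketch` and `recv_bytes_sketch` are hypothetical, and encoding the length as an 8-byte little-endian integer is an illustrative choice, since the source only says that a small length message comes first.

```python
import struct


def send_bytes_sketch(mailbox, data, base_tag=0, limit=2**31 - 1):
    """Store a length message at base_tag and chunk i at base_tag + 1 + i."""
    mailbox[base_tag] = struct.pack("<q", len(data))
    for i, start in enumerate(range(0, len(data), limit)):
        mailbox[base_tag + 1 + i] = data[start:start + limit]


def recv_bytes_sketch(mailbox, base_tag=0, limit=2**31 - 1):
    """Read the length first, derive the chunk count from it, then reassemble."""
    (total,) = struct.unpack("<q", mailbox[base_tag])
    n_chunks = -(-total // limit)  # ceiling division
    return b"".join(mailbox[base_tag + 1 + i] for i in range(n_chunks))


mailbox = {}  # tag -> payload, standing in for the MPI message queue
payload = bytes(range(10))
send_bytes_sketch(mailbox, payload, base_tag=100, limit=4)
tags_used = sorted(mailbox)
received = recv_bytes_sketch(mailbox, base_tag=100, limit=4)
```

Because the receiver learns the total length first, it can derive the number of chunks and their tags on its own, as long as both sides use the same limit and base tag.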