dgamore.mpi_utils

MPI (Message Passing Interface) utilities for the non-local step. This single module covers everything parallel:

  • low-level message-chunking primitives (send_rows / recv_rows_into / recv_rows_alloc / bcast_rows / bcast_rows_into / send_bytes / recv_bytes and the row_chunks / chunk_step bound math) that split any transfer below the 2 GB MPI per-message limit and write received data directly into the caller’s preallocated contiguous buffer (no per-chunk staging copy);

  • the work distributor MpiDistributor, which splits a number of tasks (typically the q-points of the irreducible Brillouin zone) into per-rank contiguous slices and wraps the collective/point-to-point communication (scatter, gather, all-gather, all-reduce, broadcast, object send/recv); each rank owns a private HDF5 file for spilling intermediate results without write conflicts;

  • higher-level data-movement routines built on top: mapping objects between the irreducible-BZ and full-BZ rank distributions (a simple gather/scatter version and a fully distributed peer-to-peer version that never assembles the full object on one rank), a node-aware frequency distribution, and a distributed 3D FFT over the BZ via pencil redistributions.

MAX_MPI_BYTES is the single source of the 2 GB limit; the chunking helpers take it as an explicit limit argument (read at call time by the callers), so monkeypatching mpi_utils.MAX_MPI_BYTES in tests still forces the chunked path.
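Why reading the limit at call time matters can be shown with a small, self-contained sketch. The helper and caller names below are hypothetical and only mimic the pattern described above; they are not the module's own functions.

```python
import numpy as np

# Hypothetical stand-in for mpi_utils.MAX_MPI_BYTES (2**31 - 1 bytes).
MAX_MPI_BYTES = 2**31 - 1


def n_chunks(n_rows, row_bytes, limit):
    """Number of axis-0 chunks needed so that no chunk exceeds `limit` bytes."""
    rows_per_chunk = max(1, limit // row_bytes)
    return -(-n_rows // rows_per_chunk)  # ceiling division


def caller_reads_at_call_time(arr):
    # The module-level constant is looked up when this call happens,
    # so a patched value is picked up.
    return n_chunks(arr.shape[0], arr[0].nbytes, limit=MAX_MPI_BYTES)


def caller_with_frozen_default(arr, limit=MAX_MPI_BYTES):
    # The default was evaluated once, at definition time; later patches are ignored.
    return n_chunks(arr.shape[0], arr[0].nbytes, limit=limit)


arr = np.zeros((8, 4), dtype=np.float64)  # 32 bytes per row
before = caller_reads_at_call_time(arr)   # one chunk under the real limit
MAX_MPI_BYTES = 64                        # what a test's monkeypatch would do
after = caller_reads_at_call_time(arr)    # now two rows per chunk
frozen = caller_with_frozen_default(arr)  # still uses the old limit
```

With the patched value the call-time reader splits the array into several chunks, while the variant with a frozen default argument keeps sending one message.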

Functions

bcast_rows(comm, arr, root[, limit])

Broadcasts a numpy array from root to all ranks, chunked along axis 0.

bcast_rows_into(comm, view, root[, limit])

Broadcasts into an existing contiguous buffer view, chunked along axis 0.

chunk_step(itemsize, items_per_row[, limit])

Returns the number of leading-axis rows that fit into a single limit-byte message (at least 1).

exchange_and_map_irrbz_fullbz(obj, ...)

Maps an object from the irreducible BZ distribution to the full BZ distribution without ever assembling the full object on any single rank.

execute_distributed_fft(obj, comm)

Main routine of the distributed FFT: call this for objects whose rank-local data is the rank's slice of the full BZ.

gather_full_ibz_for_vslice(gamma_r, ...)

Re-lays out a q-distributed pairing vertex into a fermionic-frequency-distributed one for the Eliashberg solver: each rank ends up with the full BZ but only its node-aware slice of the (second) fermionic frequency.

get_pencil_indices(rank, size, nq, layout)

Calculates which global flattened q-indices (0 to n_tot - 1) a rank owns under a given decomposition layout.

map_irrbz_fullbz(obj, mpi_dist_irrk, ...)

Maps an object from the irreducible-BZ rank distribution to the full-BZ distribution the simple way: gather to rank 0, unfold to the full BZ there, then scatter back.

recv_bytes(comm, source[, base_tag, limit])

Receives a chunked raw byte blob sent by send_bytes().

recv_rows_alloc(comm, shape, dtype, source[, ...])

Allocates an array of the given shape/dtype and receives into it (see recv_rows_into()).

recv_rows_into(comm, buf, source[, ...])

Receives rows from source directly into the contiguous buffer buf (no per-chunk staging buffer).

row_chunks(n_rows, itemsize, items_per_row[, limit])

Yields (start, stop) row-index pairs splitting n_rows leading-axis rows into sub-limit-byte chunks.

send_bytes(comm, data, dest[, base_tag, limit])

Sends a raw byte blob to dest in sub-limit-byte chunks, preceded by a small length message (at base_tag); the chunks use base_tag + 1 + chunk_index.

send_rows(comm, arr, dest[, base_tag, limit])

Sends arr to dest in sub-limit-byte chunks along axis 0 (tag = base_tag + chunk_index).

class dgamore.mpi_utils.MpiDistributor(ntasks: int = 1, comm: mpi4py.MPI.Comm = None, name: str = '', output_path: str = None)

Bases: object

Distributes tasks among all available MPI processes. Slices the vertex data along its first (q) dimension into chunks and sends them to all active MPI processes. Intermediate computational results are saved in per-rank files: each rank has its own MpiDistributor instance and its own HDF5 file, which avoids write conflicts.

Parameters:
  • ntasks (int)

  • comm (mpi4py.MPI.Comm)

  • name (str)

  • output_path (str)

allgather(rank_result: ndarray = None) -> ndarray

Gathers each rank’s array slice (along axis 0) into the full array, replicated on every rank. The common case is a single bandwidth-optimal Allgatherv collective (a derived “row” count keeps the per-rank counts and displacements small, so the result is correct regardless of element size); only when a rank’s slice would exceed the 2 GB per-message limit does it fall back to per-rank chunked broadcasts.

Parameters:

rank_result (ndarray) – This rank’s slice of the result (leading axis indexes the rank’s tasks).

Returns:

The full array of shape (ntasks, ...) on all ranks.

Return type:

ndarray
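One reading of the remark about small counts and displacements, stated here as an assumption, is that both are expressed in units of whole axis-0 rows rather than scalars. The sketch below only does the bookkeeping arithmetic for a hypothetical layout; it performs no communication.

```python
import numpy as np

INT32_MAX = 2**31 - 1

# Hypothetical layout: 64 ranks, 100 rows each, 2**20 float64 scalars per row.
sizes = np.full(64, 100, dtype=np.int64)
items_per_row = 2**20
itemsize = 8

# Every rank's slice stays below the per-message limit (about 0.8 GB here).
per_rank_fits = bool((sizes * items_per_row * itemsize <= INT32_MAX).all())

# Bookkeeping in units of rows: small numbers.
counts_rows = sizes
displs_rows = np.concatenate(([0], np.cumsum(sizes)[:-1]))

# The same bookkeeping in units of scalars: displacements exceed a 32-bit int.
counts_elems = counts_rows * items_per_row
displs_elems = displs_rows * items_per_row

rows_fit = bool((counts_rows <= INT32_MAX).all() and (displs_rows <= INT32_MAX).all())
elems_fit = bool((counts_elems <= INT32_MAX).all() and (displs_elems <= INT32_MAX).all())
```

In this example each slice is small enough for a single collective, yet the displacement of the last rank counted in scalars is about 6.6e9, which is why counting in rows is the safer unit.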

allreduce(rank_result=None) -> ndarray

Sums an array element-wise across all ranks in place and returns the result on every rank, chunked along axis 0 so no single message exceeds the 2 GB MPI limit (consistent with the rest of the module).

Allreduce is collective, so the chunk schedule must be identical on every rank. That holds here because the reduced arrays are always equally shaped across ranks (the callers reduce full, replicated quantities such as the full-k-space self-energy / Fock term — each rank holds a partial sum of the same array), so every rank derives the same chunk boundaries. The single-chunk case is byte-for-byte the previous behavior.

Parameters:

rank_result – This rank’s contribution; reduced in place. Must have the same shape on every rank.

Returns:

The summed array (same buffer), identical on all ranks.

Return type:

ndarray
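The requirement that every rank derives the same chunk schedule can be illustrated without MPI by treating a list of equally shaped arrays as the ranks. This is a single-process sketch with hypothetical names; the per-chunk sum stands in for one collective call.

```python
import numpy as np


def row_chunks_sketch(n_rows, itemsize, items_per_row, limit):
    """Yield (start, stop) pairs so that each chunk stays within `limit` bytes."""
    step = max(1, limit // (itemsize * items_per_row))
    for start in range(0, n_rows, step):
        yield start, min(start + step, n_rows)


def allreduce_sketch(rank_arrays, limit):
    """Sum equally shaped arrays chunk by chunk, writing the result in place."""
    shape = rank_arrays[0].shape
    if any(a.shape != shape for a in rank_arrays):
        raise ValueError("all ranks must contribute the same shape")
    items_per_row = int(np.prod(shape[1:], dtype=np.int64))
    itemsize = rank_arrays[0].itemsize
    # Identical shapes give identical chunk boundaries on every rank.
    for start, stop in row_chunks_sketch(shape[0], itemsize, items_per_row, limit):
        total = np.sum([a[start:stop] for a in rank_arrays], axis=0)
        for a in rank_arrays:
            a[start:stop] = total  # reduced in place, same buffer
    return rank_arrays


ranks = [np.full((5, 2), float(r)) for r in (1, 2, 3)]
results = allreduce_sketch(ranks, limit=40)  # 16 bytes per row, 2 rows per chunk
```

If the shapes differed between ranks, the chunk boundaries would differ too and the collectives would no longer pair up, which is the failure mode the equal-shape requirement rules out.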

barrier()

Synchronizes all ranks. Forces a garbage collection first so that all ranks free their memory before the barrier.

Returns:

None.

bcast(data, root=0)

Broadcasts an arbitrary (picklable) object from root to all ranks.

Parameters:
  • data – The object to broadcast (only read on root).

  • root – The broadcasting rank.

Returns:

The broadcast object on every rank.

bcast_chunked(arr: ndarray, root: int = 0) -> ndarray

Broadcasts a large numpy array from root to all ranks, using raw MPI buffers and chunking along axis 0 to respect the 2 GB MPI message limit.

Parameters:
  • arr (ndarray) – The array to broadcast (only read on root; non-root ranks allocate from the broadcast metadata).

  • root (int) – The broadcasting rank.

Returns:

The broadcast array on every rank.

Return type:

ndarray

bcast_npoint(obj, root: int = 0)

Broadcasts an N-point-like object (one exposing a .mat numpy array) from root to all ranks. The large .mat is broadcast as raw sub-2 GB chunks (so there is no multi-gigabyte pickle blob and no >2 GB message), while the rest of the object travels as a small pickled metadata blob — the broadcast analogue of send_to_rank()/recv_from_rank(). Prefer this over bcast() for large objects such as a full-BZ self-energy or gap function, both to respect the 2 GB limit and to avoid the full in-memory pickle copy.

Parameters:
  • obj – The object to broadcast; must expose a .mat numpy array attribute. Only read on root.

  • root (int) – The broadcasting rank.

Returns:

The broadcast object with its .mat attached, on every rank.

close_file()

Closes this rank’s HDF5 file. Silently does nothing if it is not open.

Returns:

None.

property comm: mpi4py.MPI.Comm

The MPI communicator this distributor operates on.

Returns:

The MPI communicator.

static create_distributor(ntasks: int, comm: mpi4py.MPI.Comm = None, name: str = '', output_path: str = None) -> MpiDistributor

Factory that creates an MpiDistributor, defaulting to MPI.COMM_WORLD if no communicator is given.

Parameters:
  • ntasks (int) – Total number of tasks to distribute.

  • comm (mpi4py.MPI.Comm) – The MPI communicator (MPI.COMM_WORLD if None).

  • name (str) – Prefix for the per-rank HDF5 spill file.

  • output_path (str) – Directory the per-rank HDF5 spill file is created in; if None, no spill file is opened.

Returns:

The created MpiDistributor.

Return type:

MpiDistributor

delete_file()

Deletes this rank’s HDF5 spill file. Silently does nothing if it does not exist.

Returns:

None.

gather(rank_result: ndarray = None, root: int = 0) -> ndarray

Gathers each rank’s array slice into the full array, in correct task order, on the root rank only. Handles arrays exceeding the 2 GB MPI limit by chunking along axis 0.

Parameters:
  • rank_result (ndarray) – This rank’s slice of the result (leading axis indexes the rank’s tasks).

  • root (int) – The rank that collects the full array.

Returns:

The full array of shape (ntasks, ...) on root, None on the other ranks.

Return type:

ndarray

property is_root: bool

Whether the current process is the root rank.

Returns:

True if the current rank is the root rank (rank 0).

property mpi_size: int

The communicator size.

Returns:

The total number of MPI processes in the communicator.

property my_rank: int

The current process’s rank.

Returns:

The rank of the current process.

property my_size: int

The number of tasks owned by the current rank.

Returns:

The number of tasks assigned to the current rank.

property my_slice: slice

The current rank’s slice into the full task list.

Returns:

The slice object selecting the current rank’s portion of the full task list.

property my_tasks: ndarray

The task indices owned by the current rank.

Returns:

The task indices assigned to the current rank (e.g. the q-points it processes).

property ntasks: int

The total number of distributed tasks.

Returns:

The total number of tasks to be distributed (e.g. the number of irreducible-BZ q-points).

open_file()

Opens this rank’s HDF5 file for read/write. Silently does nothing if the file is missing.

Returns:

None.

recv_from_rank(source: int, base_tag: int = 0)

Receives an object sent by send_to_rank(): reconstructs the pickled metadata object and reattaches the chunk-received .mat array.

Parameters:
  • source (int) – Source rank.

  • base_tag (int) – Base MPI tag matching the one used by send_to_rank().

Returns:

The reconstructed object with its .mat array attached.

scatter(full_data: ndarray = None, root: int = 0)

Scatters the full array (held on root) along axis 0 into the per-rank task slices. Handles the 2 GB MPI limit by chunking. The single-rank case where full_data already has the rank-local length is passed through directly.

Parameters:
  • full_data (ndarray) – The full array on root (shape (ntasks, ...)); ignored on non-root ranks.

  • root (int) – The rank holding full_data.

Returns:

This rank’s slice of the data (shape (my_size, ...)).

Raises:
  • TypeError – If full_data is given but is not a numpy array.

  • ValueError – If full_data’s leading length matches neither ntasks nor the single-rank case.

send_to_rank(obj, dest: int, base_tag: int = 0)

Sends an N-point-like object to a single rank. The large .mat array is sent as raw chunks (to avoid holding a full pickle blob in memory), while the rest of the object is pickled into a small metadata blob.

Parameters:
  • obj – The object to send; must expose a .mat numpy array attribute.

  • dest (int) – Destination rank.

  • base_tag (int) – Base MPI tag (metadata uses base_tag, array chunks base_tag + 500 + ...).

Returns:

None.
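The split into a small pickled metadata blob and a raw array can be sketched as follows. The object is a hypothetical stand-in built from types.SimpleNamespace, and the two helper functions are illustrative only; the actual transport of blob and array is left out.

```python
import pickle
from types import SimpleNamespace

import numpy as np


def split_for_transfer(obj):
    """Separate the large .mat array from the rest of the object's attributes."""
    meta = {key: value for key, value in vars(obj).items() if key != "mat"}
    meta_blob = pickle.dumps(meta)  # small: contains no array data
    return meta_blob, obj.mat       # the array would travel as raw chunks


def rebuild_after_transfer(meta_blob, mat):
    """Recreate the object from its metadata and reattach the received array."""
    obj = SimpleNamespace(**pickle.loads(meta_blob))
    obj.mat = mat
    return obj


# Hypothetical N-point-like object: one big array plus a few small attributes.
original = SimpleNamespace(mat=np.zeros((1000, 100)), label="sigma", niw=10)

meta_blob, mat = split_for_transfer(original)
rebuilt = rebuild_after_transfer(meta_blob, mat)

full_pickle_size = len(pickle.dumps(original))  # what a plain pickle would cost
```

Pickling the whole object would create a byte string at least as large as the array itself, whereas the metadata blob stays at a few dozen bytes.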

property sizes: ndarray

The per-rank task counts.

Returns:

The per-rank chunk sizes (number of tasks assigned to each rank).

property slices: ndarray

The per-rank slices into the full task list.

Returns:

The per-rank slice objects into the full task list.
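How contiguous per-rank slices relate to sizes, slices, scatter and gather can be sketched in a single process. The splitting rule used here (one surplus task for each of the last ranks) is an assumption based on the "excess on the last ranks" remark in get_pencil_indices(); the function name is hypothetical.

```python
import numpy as np


def distribute_tasks_sketch(ntasks, size):
    """Split `ntasks` into `size` contiguous slices (assumed rule, see lead-in)."""
    base, extra = divmod(ntasks, size)
    sizes = np.full(size, base, dtype=int)
    sizes[size - extra:] += 1  # assumption: surplus tasks go to the last ranks
    stops = np.cumsum(sizes)
    starts = stops - sizes
    slices = [slice(int(a), int(b)) for a, b in zip(starts, stops)]
    return sizes, slices


sizes, slices = distribute_tasks_sketch(ntasks=10, size=4)

full = np.arange(10 * 3).reshape(10, 3)
parts = [full[s] for s in slices]         # what a scatter would hand to each rank
gathered = np.concatenate(parts, axis=0)  # what a gather would reassemble on root
```

Because the slices are contiguous and ordered by rank, concatenating the per-rank parts along axis 0 restores the original task order.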

dgamore.mpi_utils.bcast_rows(comm, arr: ndarray, root: int, limit: int = 2147483647) -> ndarray

Broadcasts a numpy array from root to all ranks, chunked along axis 0. Non-root ranks allocate the receive buffer from the broadcast shape/dtype.

Parameters:
  • comm – The MPI communicator.

  • arr (ndarray) – The array to broadcast (only read on root).

  • root (int) – The broadcasting rank.

  • limit (int) – Maximum message size in bytes.

Returns:

The broadcast array on every rank.

Return type:

ndarray

dgamore.mpi_utils.bcast_rows_into(comm, view: ndarray, root: int, limit: int = 2147483647) -> ndarray

Broadcasts into an existing contiguous buffer view, chunked along axis 0. This is a collective call: every rank must pass the matching view (same shape/dtype). Used to fill one rank’s slice of an all-gather target.

Parameters:
  • comm – The MPI communicator.

  • view (ndarray) – The destination buffer view (axis-0 slices must be contiguous), identical in shape on all ranks.

  • root (int) – The rank whose data is broadcast.

  • limit (int) – Maximum message size in bytes.

Returns:

view (filled in place).

Return type:

ndarray

dgamore.mpi_utils.chunk_step(itemsize: int, items_per_row: int, limit: int = 2147483647) -> int

Returns the number of leading-axis rows that fit into a single limit-byte message (at least 1). This is the raw step used by callers that drive their own (e.g. non-blocking) send/receive loop.

Parameters:
  • itemsize (int) – Size in bytes of one array element.

  • items_per_row (int) – Number of scalars per axis-0 row (product of trailing dimensions).

  • limit (int) – Maximum message size in bytes.

Returns:

The maximum number of rows per chunk (>= 1).

Return type:

int
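The bound can be written down in a few lines. This is a minimal sketch of the arithmetic implied by the description (rows per message, never fewer than one), under the assumption that the limit is applied to itemsize times items_per_row; it is not the module's source.

```python
def chunk_step_sketch(itemsize, items_per_row, limit=2**31 - 1):
    """Rows per message so that one message stays within `limit` bytes (>= 1)."""
    row_bytes = itemsize * items_per_row
    return max(1, limit // row_bytes)


# complex128 rows with one million scalars each: 16 MB per row.
step_typical = chunk_step_sketch(16, 1_000_000)

# A single row larger than the limit still yields a step of 1.
step_oversized = chunk_step_sketch(16, 2**28)

# A tiny limit, as a test might use to force the chunked path.
step_small_limit = chunk_step_sketch(8, 4, limit=100)
```

Note that in the oversized case the sketch sends one row per message even though that row alone exceeds the limit; how the real helper treats such rows is not stated here.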

dgamore.mpi_utils.exchange_and_map_irrbz_fullbz(obj: FourPoint, mpi_dist_irrk: MpiDistributor, mpi_dist_fullbz: MpiDistributor) -> FourPoint

Maps an object from the irreducible BZ distribution to the full BZ distribution without ever assembling the full object on any single rank.

Each rank holds a slice of the object over the irreducible BZ (shape [q_irr_rank, …]). This routine redistributes the data peer-to-peer so that each rank ends up with a slice over the full BZ (shape [q_full_rank, …]), with symmetry-equivalent points correctly replicated according to the irrk_inv mapping.

If config.lattice.q_grid is in auto-discovered symmetry mode (i.e. specify_auto_symmetries has been called on it), the per-k orbital transformation (sigma_k, U_k, conj_k) is also applied locally on each rank, using only the transformation arrays sliced to that rank’s FBZ range. No global gather is needed.

This is a distributed replacement for the pattern (see also map_irrbz_fullbz()):

obj.mat = mpi_dist_irrk.gather(obj.mat)
if comm.rank == 0:
    obj = obj.map_to_full_bz(q_grid)
obj.mat = mpi_dist_fullbz.scatter(obj.mat)

which would require rank 0 to hold the entire full-BZ object in memory.

Parameters:
  • obj (FourPoint) – The FourPoint distributed over the irreducible BZ.

  • mpi_dist_irrk (MpiDistributor) – MPI distributor over the irreducible BZ (source layout).

  • mpi_dist_fullbz (MpiDistributor) – MPI distributor over the full BZ (target layout).

Returns:

The FourPoint distributed over the full BZ (compressed q dimension).

Return type:

FourPoint
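The index bookkeeping behind the peer-to-peer redistribution can be sketched in one process, with lists standing in for ranks. The mapping array, the slice rule and all function names are hypothetical, and the symmetry transformation mentioned above is left out; only the plain replication via irrk_inv is shown.

```python
import numpy as np


def contiguous_slices(ntasks, size):
    """Contiguous per-rank slices; surplus on the last ranks (assumed rule)."""
    base, extra = divmod(ntasks, size)
    sizes = np.full(size, base, dtype=int)
    sizes[size - extra:] += 1
    stops = np.cumsum(sizes)
    return [slice(int(b - n), int(b)) for n, b in zip(sizes, stops)]


def exchange_sketch(local_irr, irr_slices, full_slices, irrk_inv):
    """Build each rank's full-BZ slice from the ranks owning the needed rows."""
    irr_starts = np.array([s.start for s in irr_slices])
    irr_stops = np.array([s.stop for s in irr_slices])
    out = []
    for fs in full_slices:        # one iteration per destination rank
        rows = []
        for q in irrk_inv[fs]:    # irreducible index behind each full-BZ point
            owner = int(np.nonzero((irr_starts <= q) & (q < irr_stops))[0][0])
            rows.append(local_irr[owner][q - irr_starts[owner]])  # one "message"
        out.append(np.stack(rows))
    return out


n_ranks = 3
irrk_inv = np.array([0, 1, 2, 3, 1, 2, 3, 0])  # hypothetical full -> irreducible map
irr_data = np.arange(4 * 2, dtype=float).reshape(4, 2)

irr_slices = contiguous_slices(4, n_ranks)
full_slices = contiguous_slices(8, n_ranks)
local_irr = [irr_data[s] for s in irr_slices]

local_full = exchange_sketch(local_irr, irr_slices, full_slices, irrk_inv)
```

No entry of local_full ever has the full number of q-points, yet concatenating the pieces reproduces irr_data[irrk_inv], the result the gather/unfold/scatter pattern would give.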

dgamore.mpi_utils.execute_distributed_fft(obj: FourPoint, comm: mpi4py.MPI.Comm) -> FourPoint

Main routine of the distributed FFT. Call it on objects whose rank-local data is the rank's slice of the full BZ, e.g. after a call to exchange_and_map_irrbz_fullbz(). It performs a distributed 3D FFT by redistributing the data into a pencil decomposition for each dimension, performing local FFTs, and then redistributing back to the original layout. Afterwards, obj.mat holds the Fourier-space representation corresponding to the full BZ. Attention: modifies the object in place!

Parameters:
  • obj (FourPoint) – The FourPoint distributed over the full BZ (flat layout), transformed in place.

  • comm (mpi4py.MPI.Comm) – The MPI communicator.

Returns:

The same FourPoint, now holding the BZ Fourier transform (back in the flat layout).

Return type:

FourPoint
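The pencil approach rests on the fact that a 3D FFT factorizes into 1D FFTs along each axis in turn. The serial NumPy check below demonstrates only that property; it uses NumPy's default forward transform and makes no claim about the sign or normalization convention used by the package.

```python
import numpy as np

rng = np.random.default_rng(0)
shape = (4, 3, 5)  # hypothetical (nx, ny, nz) grid
data = rng.standard_normal(shape) + 1j * rng.standard_normal(shape)

# One 1D FFT per axis; in a pencil decomposition each of these steps would be
# rank-local after the data has been redistributed along the matching axis.
step = np.fft.fft(data, axis=2)  # z lines
step = np.fft.fft(step, axis=1)  # y lines
step = np.fft.fft(step, axis=0)  # x lines

reference = np.fft.fftn(data, axes=(0, 1, 2))
max_deviation = float(np.abs(step - reference).max())
```

Since every 1D transform only touches whole lines along one axis, a rank that owns complete lines along that axis needs no communication during the transform itself.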

dgamore.mpi_utils.gather_full_ibz_for_vslice(gamma_r: FourPoint, mpi_dist_irrq: MpiDistributor, mpi_dist_v: MpiDistributor, q_grid: KGrid) -> FourPoint

Re-lays out a q-distributed pairing vertex into a fermionic-frequency-distributed one for the Eliashberg solver: each rank ends up with the full BZ but only its node-aware slice of the (second) fermionic frequency. The momentum is unfolded to the full BZ locally. Ranks with an empty frequency slice receive None.

Parameters:
  • gamma_r (FourPoint) – The FourPoint pairing vertex distributed over the irreducible BZ.

  • mpi_dist_irrq (MpiDistributor) – MPI distributor over the irreducible BZ q-points (source layout).

  • mpi_dist_v (MpiDistributor) – MPI distributor over the fermionic frequency axis (updated in place to the node-aware split).

  • q_grid (KGrid) – The KGrid used to unfold to the full BZ.

Returns:

The full-BZ FourPoint for this rank’s frequency slice, or None if the slice is empty.

Return type:

FourPoint

dgamore.mpi_utils.get_pencil_indices(rank: int, size: int, nq: tuple[int, int, int], layout: str) -> ndarray

Calculates which global flattened q-indices (0 to n_tot - 1) a rank owns under a given decomposition layout. The "flat" layout matches MpiDistributor._distribute_tasks() (excess on the last ranks); the pencil layouts assign whole lines along one axis so a subsequent 1D FFT along that axis is rank-local.

Parameters:
  • rank (int) – The rank whose indices to compute.

  • size (int) – Total number of ranks.

  • nq (tuple[int, int, int]) – The momentum grid sizes (nx, ny, nz).

  • layout (str) – One of "flat", "z_pencil", "y_pencil", "x_pencil".

Returns:

The global flattened q-indices owned by rank.

Raises:

ValueError – If layout is not one of the supported layouts.

Return type:

ndarray
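One way such an index computation could look is sketched below. It assumes C-order flattening of (nx, ny, nz), distributes whole lines contiguously with the surplus on the last ranks, and returns sorted indices; all of these are assumptions, and the function name is hypothetical.

```python
import numpy as np


def pencil_indices_sketch(rank, size, nq, layout):
    """Global flattened q-indices owned by `rank` (assumptions in the lead-in)."""
    nx, ny, nz = nq
    grid = np.arange(nx * ny * nz).reshape(nx, ny, nz)
    axis_of = {"x_pencil": 0, "y_pencil": 1, "z_pencil": 2}
    if layout == "flat":
        lines = grid.reshape(-1, 1)  # every point is its own "line"
    elif layout in axis_of:
        axis = axis_of[layout]
        # Move the pencil axis last so each row holds one complete line.
        lines = np.moveaxis(grid, axis, -1).reshape(-1, nq[axis])
    else:
        raise ValueError(f"unknown layout: {layout!r}")
    base, extra = divmod(lines.shape[0], size)
    sizes = np.full(size, base, dtype=int)
    sizes[size - extra:] += 1  # surplus lines go to the last ranks
    start = int(sizes[:rank].sum())
    return np.sort(lines[start:start + sizes[rank]].ravel())


nq = (2, 3, 4)
z_rank0 = pencil_indices_sketch(0, 4, nq, "z_pencil")
x_rank0 = pencil_indices_sketch(0, 4, nq, "x_pencil")
```

For the z pencil a rank owns runs of nz consecutive indices, while for the x pencil its indices are spread with a stride of ny * nz, which is what makes the 1D FFT along that axis rank-local.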

dgamore.mpi_utils.map_irrbz_fullbz(obj, mpi_dist_irrk, mpi_dist_fullbz)

Maps an object from the irreducible-BZ rank distribution to the full-BZ distribution the simple way: gather to rank 0, unfold to the full BZ there, then scatter back. Requires rank 0 to hold the full-BZ object transiently.

Parameters:
  • obj – The object distributed over the irreducible BZ (must support map_to_full_bz()).

  • mpi_dist_irrk – MPI distributor over the irreducible BZ (source layout).

  • mpi_dist_fullbz – MPI distributor over the full BZ (target layout).

Returns:

The object distributed over the full BZ.

dgamore.mpi_utils.recv_bytes(comm, source: int, base_tag: int = 0, limit: int = 2147483647) -> bytes

Receives a chunked raw byte blob sent by send_bytes().

Parameters:
  • comm – The MPI communicator.

  • source (int) – Source rank.

  • base_tag (int) – Base MPI tag matching the one used by send_bytes().

  • limit (int) – Maximum message size in bytes.

Returns:

The reassembled bytes.

Return type:

bytes

dgamore.mpi_utils.recv_rows_alloc(comm, shape: tuple, dtype, source: int, base_tag: int = 0, limit: int = 2147483647) -> ndarray

Allocates an array of the given shape/dtype and receives into it (see recv_rows_into()).

Parameters:
  • comm – The MPI communicator.

  • shape (tuple) – Shape of the array to receive.

  • dtype – Dtype of the array to receive.

  • source (int) – Source rank.

  • base_tag (int) – Base MPI tag; successive chunks use base_tag + chunk_index.

  • limit (int) – Maximum message size in bytes.

Returns:

The received array.

Return type:

ndarray

dgamore.mpi_utils.recv_rows_into(comm, buf: ndarray, source: int, base_tag: int = 0, limit: int = 2147483647) -> ndarray

Receives rows from source directly into the contiguous buffer buf (no per-chunk staging buffer). The buffer’s axis-0 slices must be contiguous (e.g. buf is C-contiguous or an axis-0 view of such an array).

Parameters:
  • comm – The MPI communicator.

  • buf (ndarray) – The destination buffer; its leading axis length determines the number of rows received.

  • source (int) – Source rank.

  • base_tag (int) – Base MPI tag; successive chunks use base_tag + chunk_index.

  • limit (int) – Maximum message size in bytes.

Returns:

buf (filled in place).

Return type:

ndarray

dgamore.mpi_utils.row_chunks(n_rows: int, itemsize: int, items_per_row: int, limit: int = 2147483647)

Yields (start, stop) row-index pairs splitting n_rows leading-axis rows into sub-limit-byte chunks.

Parameters:
  • n_rows (int) – Number of rows (axis-0 elements) to split.

  • itemsize (int) – Size in bytes of one array element.

  • items_per_row (int) – Number of scalars per axis-0 row (product of trailing dimensions).

  • limit (int) – Maximum message size in bytes.

Returns:

A generator of (start, stop) row-index pairs.
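A generator with this behavior might look like the following minimal sketch. It assumes the step is the largest row count whose bytes fit into the limit (at least one row); the name is hypothetical.

```python
def row_chunks_sketch(n_rows, itemsize, items_per_row, limit=2**31 - 1):
    """Yield (start, stop) row-index pairs, each covering at most `limit` bytes."""
    step = max(1, limit // (itemsize * items_per_row))
    for start in range(0, n_rows, step):
        yield start, min(start + step, n_rows)


# 10 rows of 4 float64 values (32 bytes per row) under a 100-byte limit.
pairs = list(row_chunks_sketch(10, 8, 4, limit=100))

# With the default limit a small array is a single chunk.
single = list(row_chunks_sketch(10, 8, 4))

# No rows, no chunks.
empty = list(row_chunks_sketch(0, 8, 4, limit=100))
```

The pairs are half-open intervals, so they can be used directly as arr[start:stop] on both the sending and the receiving side.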

dgamore.mpi_utils.send_bytes(comm, data: bytes, dest: int, base_tag: int = 0, limit: int = 2147483647) -> None

Sends a raw byte blob to dest in sub-limit-byte chunks, preceded by a small length message (at base_tag); the chunks use base_tag + 1 + chunk_index.

Parameters:
  • comm – The MPI communicator.

  • data (bytes) – The bytes to send.

  • dest (int) – Destination rank.

  • base_tag (int) – Base MPI tag (length at base_tag, chunks at base_tag + 1 + chunk_index).

  • limit (int) – Maximum message size in bytes.

Returns:

None.

Return type:

None
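The tag layout of the byte protocol (length at base_tag, chunks at base_tag + 1 + chunk_index) can be traced with a dictionary standing in for the message channel. This is a single-process sketch with hypothetical names; no MPI calls are made.

```python
def send_bytes_sketch(mailbox, data, base_tag=0, limit=2**31 - 1):
    """Post the total length, then the payload in chunks of at most `limit` bytes."""
    mailbox[base_tag] = len(data)  # small length message first
    for index, start in enumerate(range(0, len(data), limit)):
        mailbox[base_tag + 1 + index] = data[start:start + limit]


def recv_bytes_sketch(mailbox, base_tag=0, limit=2**31 - 1):
    """Read the length, then reassemble the chunks in tag order."""
    total = mailbox.pop(base_tag)
    out = bytearray(total)  # allocated once, filled chunk by chunk
    for index, start in enumerate(range(0, total, limit)):
        chunk = mailbox.pop(base_tag + 1 + index)
        out[start:start + len(chunk)] = chunk
    return bytes(out)


mailbox = {}
payload = bytes(range(256)) * 3  # 768 bytes
send_bytes_sketch(mailbox, payload, base_tag=20, limit=100)
tags_in_flight = sorted(mailbox)  # 20 (length) followed by 21..28 (chunks)
received = recv_bytes_sketch(mailbox, base_tag=20, limit=100)
```

Sending the length first lets the receiver compute the same chunk schedule as the sender, so both sides must be called with the same base_tag and limit.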

dgamore.mpi_utils.send_rows(comm, arr: ndarray, dest: int, base_tag: int = 0, limit: int = 2147483647) -> None

Sends arr to dest in sub-limit-byte chunks along axis 0 (tag = base_tag + chunk_index).

Parameters:
  • comm – The MPI communicator.

  • arr (ndarray) – The array to send.

  • dest (int) – Destination rank.

  • base_tag (int) – Base MPI tag; successive chunks use base_tag + chunk_index.

  • limit (int) – Maximum message size in bytes.

Returns:

None.

Return type:

None
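How send_rows() and recv_rows_into() pair up can be sketched with an in-process stand-in for the communicator. FakeComm and the two functions are hypothetical; they only model the tag scheme (base_tag + chunk_index) and the direct write into the caller's buffer.

```python
import numpy as np


def row_chunks_sketch(n_rows, itemsize, items_per_row, limit):
    step = max(1, limit // (itemsize * items_per_row))
    for start in range(0, n_rows, step):
        yield start, min(start + step, n_rows)


class FakeComm:
    """In-process stand-in for a point-to-point channel; messages keyed by tag."""

    def __init__(self):
        self.mailbox = {}
        self.sent_tags = []

    def send(self, payload, tag):
        self.mailbox[tag] = payload.copy()  # the copy models data leaving the sender
        self.sent_tags.append(tag)

    def recv_into(self, out, tag):
        out[...] = self.mailbox.pop(tag)  # written straight into the caller's view


def send_rows_sketch(comm, arr, base_tag=0, limit=2**31 - 1):
    items = int(np.prod(arr.shape[1:], dtype=np.int64))
    chunks = row_chunks_sketch(arr.shape[0], arr.itemsize, items, limit)
    for index, (start, stop) in enumerate(chunks):
        comm.send(arr[start:stop], tag=base_tag + index)


def recv_rows_into_sketch(comm, buf, base_tag=0, limit=2**31 - 1):
    items = int(np.prod(buf.shape[1:], dtype=np.int64))
    chunks = row_chunks_sketch(buf.shape[0], buf.itemsize, items, limit)
    for index, (start, stop) in enumerate(chunks):
        comm.recv_into(buf[start:stop], tag=base_tag + index)  # no staging buffer
    return buf


comm = FakeComm()
src = np.arange(40, dtype=np.float64).reshape(10, 4)
dst = np.empty_like(src)
send_rows_sketch(comm, src, base_tag=7, limit=100)
result = recv_rows_into_sketch(comm, dst, base_tag=7, limit=100)
```

Both sides derive the chunk boundaries from shape, item size and limit alone, so no extra handshake is needed as long as the receiver's buffer matches the sender's array.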