# SPDX-FileCopyrightText: 2025-2026 Julian Peil <julian.peil@tuwien.ac.at>
# SPDX-License-Identifier: MIT
#
# DGAmore — Multi-Orbital Ladder Dynamical Vertex Approximation (LDGA) &
# Eliashberg Equation Solver for Strongly Correlated Electron Systems
r"""
Non-local ladder DGA step — the parallel-heavy core of the code. Starting from the local irreducible vertex
:math:`\Gamma_{r}` and the bare interaction, the functions here build, per momentum :math:`q` and spin channel,
the bubble :math:`\chi_0^q`, the auxiliary susceptibility :math:`\chi^{*;q}_{r}`, the three-leg vertex
:math:`\gamma^q_{r}`, the physical susceptibility :math:`\chi^q_{r}` (with shell and optional
:math:`\lambda`-correction) and the self-energy kernel, then contract the kernel with the Green's function to get
the momentum-dependent self-energy :math:`\Sigma(k, \nu)`. Several CPU/GPU/FFT variants of the heavy contractions
are provided, distributed over MPI ranks. The whole thing is wrapped in a self-consistency loop with chemical-
potential adjustment and self-energy mixing (linear / Pulay / Anderson). Equation numbers refer to the author's
master's thesis (Chapters 3 & 4).
"""
import glob
import os
import re
from copy import deepcopy
import mpi4py.MPI as MPI
import numpy as np
from scipy import optimize as opt
import dgamore.config as config
import dgamore.lambda_correction as lc
import dgamore.mpi_utils as mpi_utils
from dgamore.brillouin_zone import KGrid
from dgamore.bubble_gen import BubbleGenerator
from dgamore.four_point import FourPoint
from dgamore.greens_function import GreensFunction, update_mu
from dgamore.interaction import LocalInteraction, Interaction
from dgamore.local_four_point import LocalFourPoint
from dgamore.matsubara_frequencies import MFHelper
from dgamore.mpi_utils import MpiDistributor
from dgamore.n_point_base import SpinChannel
from dgamore.self_energy import SelfEnergy
[docs]
def get_hartree_fock(
u_loc: LocalInteraction, v_nonloc: Interaction, q_list: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
r"""
Returns the Hartree-Fock term separately for the local and non-local interaction. Since we are always SU(2)-symmetric,
the sum over the spins of the first term in Eq. (4.55) in Anna Galler's thesis results in a simple factor of 2. This
can be seen in my master's thesis, Eq. (3.55). The Hartree-Fock term is given by
.. math:: \Sigma_{HF}^k = 2(U_{acbd} + V^{q=0}_{acbd}) n_{dc} - 1/N_q \sum_q (U_{adcb} + V^{q}_{adcb}) n^{k-q}_{dc}
where the Hartree term reads :math:`\Sigma_{H} = 2(U_{acbd} + V^{q=0}_{acbd}) n_{dc}` and the Fock term reads
:math:`\Sigma_{F}^k = - 1/N_q \sum_q (U_{adcb} + V^{q}_{adcb}) n^{k-q}_{dc}`. The Hartree contraction uses the
middle-index-swapped ``U_{acbd}`` so it picks up the inter-orbital density :math:`U'` (stored at :math:`U_{abab}`);
see :func:`dgamore.local_sde.get_local_hartree_fock`.
Processes the Fock term for each individual orbital to save memory, as for high momentum grids,
the ``occ_qk`` property can become large.
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}` (see :class:`Interaction`).
:param q_list: Array of integer q-point index triplets handled by this rank.
:return: The tuple ``(hartree, fock)`` of self-energy contributions, broadcastable to ``[k, o1, o2, v]``.
"""
v_q0 = v_nonloc.find_q((0, 0, 0))
# The inter-orbital density U' is stored at U_{abab}, so the Hartree term contracts "qacbd" (swapping the middle
# orbital indices) to pick it up; the Fock term below uses U_{adcb}. See local_sde.get_local_hartree_fock.
hartree = 2 * (u_loc + v_q0).times("qacbd,dc->ab", config.sys.occ)
nb = config.sys.n_bands
nk_tot = np.prod(config.lattice.nk)
nq_tot = np.prod(config.lattice.nq)
uq = (u_loc + v_nonloc.reduce_q(q_list)).permute_orbitals("abcd->adcb") # (nq,a,d,c,b)
fock = np.zeros((nk_tot, nb, nb), dtype=uq.mat.dtype)
for d in range(nb):
for c in range(nb):
u_slice = uq[:, :, d, c, :]
if not np.any(u_slice):
continue
occ_qk_dc = np.array(
[np.roll(config.sys.occ_k[..., d, c], [-i for i in q], axis=(0, 1, 2)) for q in q_list]
)
occ_qk_dc = occ_qk_dc.reshape(len(q_list), nk_tot)
contribution = u_slice[:, None, :, :] * occ_qk_dc[:, :, None, None]
fock += contribution.sum(axis=0)
fock *= -1.0 / nq_tot
return hartree[None, ..., None], fock[..., None] # [k,o1,o2,v]
[docs]
def create_auxiliary_chi_r_q(
gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction
) -> FourPoint:
r"""
Returns the auxiliary susceptibility, see Eq. (3.60) in my master's thesis,
.. math:: \chi^{*;q\nu\nu'}_{r;abcd} = ((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}.
:param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`.
:param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:return: The momentum-dependent auxiliary susceptibility :math:`\chi^{*;q}_{r}` as a :class:`FourPoint`.
"""
return (
(gchi0_q_inv + 1.0 / config.sys.beta**2 * gamma_r)
- 1.0 / config.sys.beta**2 * (v_nonloc.as_channel(gamma_r.channel) + u_loc.as_channel(gamma_r.channel))
).invert(False)
[docs]
def create_auxiliary_chi_r_q_sum_v1(
gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction
) -> FourPoint:
r"""
Returns the sum over the auxiliary susceptibility, see Eq. (3.60) in my master's thesis. This variant inverts
and sums over the last fermionic frequency in one fused step (see
:meth:`FourPoint.invert_and_sum_over_last_vn`),
.. math:: \sum_{\nu'}\chi^{*;q\nu\nu'}_{r;abcd} = \sum_{\nu'}((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}.
:param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`.
:param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:return: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}` as a :class:`FourPoint`.
"""
return (
(gchi0_q_inv + 1.0 / config.sys.beta**2 * gamma_r)
- 1.0 / config.sys.beta**2 * (v_nonloc.as_channel(gamma_r.channel) + u_loc.as_channel(gamma_r.channel))
).invert_and_sum_over_last_vn(config.sys.beta)
[docs]
def create_auxiliary_chi_r_q_sum_v2(
gamma_r: LocalFourPoint,
gchi0_q_inv: FourPoint,
u_loc: LocalInteraction,
v_nonloc: Interaction,
mpi_dist_irrq: MpiDistributor,
) -> FourPoint:
r"""
Returns the sum over the auxiliary susceptibility, see Eq. (3.60) in my master's thesis. This variant loops over
the rank-local q-points (capping peak memory to one q at a time) and uses the standard fused invert-and-sum per
q (see :meth:`FourPoint.invert_and_sum_over_last_vn`),
.. math:: \sum_{\nu'}\chi^{*;q\nu\nu'}_{r;abcd} = \sum_{\nu'}((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}.
:param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`.
:param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:param mpi_dist_irrq: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`).
:return: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}` as a :class:`FourPoint`.
"""
irrk_q_list = config.lattice.q_grid.get_irrq_list()
my_irr_q_list = irrk_q_list[mpi_dist_irrq.my_slice]
chi_r_q_sum_mat = np.zeros_like(gchi0_q_inv.mat)
for idx in range(len(my_irr_q_list)):
chi_r_q_sum_mat[idx] = (
(
(gchi0_q_inv.filter_q_index(idx) + 1.0 / config.sys.beta**2 * gamma_r)
- 1.0
/ config.sys.beta**2
* (v_nonloc.as_channel(gamma_r.channel).filter_q_index(idx) + u_loc.as_channel(gamma_r.channel))
)
.invert_and_sum_over_last_vn(config.sys.beta)
.mat
)
return FourPoint(chi_r_q_sum_mat, gamma_r.channel, config.lattice.nq, 1, 1, False, has_compressed_q_dimension=True)
[docs]
def create_auxiliary_chi_r_q_sum_v3(
gamma_r: LocalFourPoint,
gchi0_q_inv: FourPoint,
u_loc: LocalInteraction,
v_nonloc: Interaction,
mpi_dist_irrq: MpiDistributor,
) -> FourPoint:
r"""
Returns the sum over the auxiliary susceptibility, see Eq. (3.60) in my master's thesis. This is the most
memory-lean variant: it loops over the rank-local q-points and uses the highly memory-efficient
linear-solver-based fused invert-and-sum per q (see :meth:`FourPoint.invert_and_sum_over_last_vn_v2`),
.. math:: \sum_{\nu'}\chi^{*;q\nu\nu'}_{r;abcd} = \sum_{\nu'}((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}.
:param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`.
:param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:param mpi_dist_irrq: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`).
:return: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}` as a :class:`FourPoint`.
"""
irrk_q_list = config.lattice.q_grid.get_irrq_list()
my_irr_q_list = irrk_q_list[mpi_dist_irrq.my_slice]
chi_r_q_sum_mat = np.zeros_like(gchi0_q_inv.mat)
for idx in range(len(my_irr_q_list)):
chi_r_q_sum_mat[idx] = (
(
(gchi0_q_inv.filter_q_index(idx) + 1.0 / config.sys.beta**2 * gamma_r)
- 1.0
/ config.sys.beta**2
* (v_nonloc.as_channel(gamma_r.channel).filter_q_index(idx) + u_loc.as_channel(gamma_r.channel))
)
.invert_and_sum_over_last_vn_v2(config.sys.beta)
.mat
)
return FourPoint(chi_r_q_sum_mat, gamma_r.channel, config.lattice.nq, 1, 1, False, has_compressed_q_dimension=True)
[docs]
def create_vrg_r_q(gchi_aux_q_r_sum: FourPoint, gchi0_q_inv: FourPoint) -> FourPoint:
r"""
Returns the momentum-dependent three-leg vertex, see Eq. (3.63) in my master's thesis,
:math:`\gamma_{r;abcd}^{q\nu} = \beta (\chi^{q\nu\nu}_{0;ablm})^{-1} (\sum_{\nu'} \chi^{*;q\nu\nu'}_{r;mlcd})`.
:param gchi_aux_q_r_sum: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}`.
:param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:return: The three-leg vertex :math:`\gamma^q_{r}` (``vrg``) as a :class:`FourPoint`.
"""
return config.sys.beta * (gchi0_q_inv @ gchi_aux_q_r_sum)
[docs]
def create_generalized_chi_q_with_shell_correction(
gchi_aux_q_sum: FourPoint,
gchi0_q_full_sum: FourPoint,
gchi0_q_core_sum: FourPoint,
u_loc: LocalInteraction,
v_nonloc: Interaction,
) -> FourPoint:
r"""
Calculates the generalized susceptibility with the shell correction as described by
Motoharu Kitatani et al. 2022 J. Phys. Mater. 5 034005; DOI 10.1088/2515-7639/ac7e6d. Eq. A.15. See also Sec. 3.7.2
in my master's thesis for details.
:param gchi_aux_q_sum: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu\nu'}\chi^{*;q}_{r}`.
:param gchi0_q_full_sum: The frequency-summed bare bubble over the full box.
:param gchi0_q_core_sum: The frequency-summed bare bubble over the core box.
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:return: The shell-corrected physical susceptibility :math:`\chi^{q}_{r}` as a :class:`FourPoint`.
"""
return (
(gchi_aux_q_sum + gchi0_q_full_sum - gchi0_q_core_sum).invert()
+ (u_loc.as_channel(gchi_aux_q_sum.channel) + v_nonloc.as_channel(gchi_aux_q_sum.channel))
).invert()
[docs]
def calculate_sigma_dc_kernel(f_dc_loc: LocalFourPoint, gchi0_q: FourPoint, u_loc: LocalInteraction) -> FourPoint:
r"""
Returns the double-counting kernel for the self-energy calculation, contracting the local full vertex with the
momentum-dependent bubble per q-point. For details, see Eq. (4.28) in my master's thesis.
:param f_dc_loc: The local full vertex :math:`F` used for the double-counting correction.
:param gchi0_q: The momentum-dependent bare bubble :math:`\chi_0^q`.
:param u_loc: The bare local interaction :math:`U`.
:return: The double-counting kernel as a :class:`FourPoint`, cut to the core fermionic box.
"""
kernel = 1.0 / config.sys.beta**2 * u_loc.permute_orbitals("abcd->adcb") @ gchi0_q
einsum_str = "abcdwv,dcefwvp->abefwp"
path, _ = np.einsum_path(einsum_str, kernel.mat[0].copy(), f_dc_loc.mat, optimize="optimal")
for q in range(kernel.current_shape[0]):
kernel[q] = np.einsum(einsum_str, kernel[q].copy(), f_dc_loc.mat, optimize=path)
return kernel.cut_niv(config.box.niv_core)
[docs]
def calculate_kernel_r_q(
vrg_q_r: FourPoint, gchi_aux_q_r_sum: FourPoint, v_nonloc: Interaction, u_loc: LocalInteraction
) -> FourPoint:
r"""
Returns the kernel for the self-energy calculation minus 2/3 times the identity if the channel is the magnetic
channel (due to the extra factor of :math:`U_{ah21}` in Eq. (4.29) in my master's thesis),
.. math:: K = \gamma_{r;abcd}^{q\nu} - \gamma_{r;abef}^{q\nu} U^{q}_{r;fehg} \chi_{r;ghcd}^{q}.
:param vrg_q_r: The momentum-dependent three-leg vertex :math:`\gamma^q_{r}`.
:param gchi_aux_q_r_sum: The (shell-corrected) physical susceptibility :math:`\chi^{q}_{r}`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:param u_loc: The bare local interaction :math:`U`.
:return: The self-energy kernel :math:`U_r K` as a :class:`FourPoint`.
"""
u_r = v_nonloc.as_channel(vrg_q_r.channel) + u_loc.as_channel(vrg_q_r.channel)
kernel = vrg_q_r - vrg_q_r @ u_r @ gchi_aux_q_r_sum
if vrg_q_r.channel == SpinChannel.MAGN:
kernel -= 2.0 / 3.0 * FourPoint.identity_like(kernel)
return u_r @ kernel
[docs]
def calculate_and_save_chi_q_r_rpa(
gchi0_q_core_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction, mpi_dist_irrk: MpiDistributor
):
r"""
Calculates and saves the RPA susceptibility (for both density and magnetic channels) from the DMFT Green's
functions, :math:`\chi_{d/m;\mathrm{RPA}} = \chi_0 (1 + U_{d/m}\chi_0)^{-1} = (\chi_0^{-1} + U_{d/m})^{-1}`. The
result is gathered to rank 0 and written to file.
:param gchi0_q_core_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:param mpi_dist_irrk: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`).
:return: None.
"""
for channel in [SpinChannel.DENS, SpinChannel.MAGN]:
u_r = u_loc.as_channel(channel) + v_nonloc.as_channel(channel)
chi_rpa_q_r = (gchi0_q_core_inv + u_r).invert(False).sum_over_all_vn(config.sys.beta)
chi_rpa_q_r.mat = mpi_dist_irrk.gather(chi_rpa_q_r.mat)
if mpi_dist_irrk.my_rank == 0:
chi_rpa_q_r.save(name=f"chi_rpa_q_{channel.value}", output_dir=config.output.output_path)
chi_rpa_q_r.free()
config.logger.info(f"Calculated RPA susceptibility ({channel.value}).")
[docs]
def calculate_sigma_kernel_r_q(
gamma_r: LocalFourPoint,
gchi0_q_inv: FourPoint,
gchi0_q_full_sum: FourPoint,
gchi0_q_core_sum: FourPoint,
u_loc: LocalInteraction,
v_nonloc: Interaction,
mpi_dist_irrq: MpiDistributor,
) -> FourPoint:
r"""
Returns the kernel for the self-energy calculation in a specific spin channel. Calculates the auxiliary
susceptibility, the three-leg vertex and the physical susceptibility with shell correction. Also performs a
:math:`\lambda`-correction on the physical susceptibility if specified in the config for single-band input.
Saves the physical susceptibility (and, if Eliashberg is enabled, the intermediate vertices) to file.
:param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`.
:param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box).
:param gchi0_q_full_sum: The frequency-summed bare bubble over the full box.
:param gchi0_q_core_sum: The frequency-summed bare bubble over the core box.
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:param mpi_dist_irrq: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`).
:return: The self-energy kernel for this channel as a :class:`FourPoint`.
"""
logger = config.logger
if config.memory.save_memory_for_chiq_aux:
gchi_aux_q_r_sum = create_auxiliary_chi_r_q_sum_v3(gamma_r, gchi0_q_inv, u_loc, v_nonloc, mpi_dist_irrq)
else:
gchi_aux_q_r_sum = create_auxiliary_chi_r_q_sum_v1(gamma_r, gchi0_q_inv, u_loc, v_nonloc)
mpi_dist_irrq.barrier()
logger.log_memory_usage(
f"Gchi_aux ({gchi_aux_q_r_sum.channel.value})",
gchi_aux_q_r_sum,
mpi_dist_irrq.comm.size * 2 * config.box.niv_core,
)
logger.info(f"Non-Local auxiliary susceptibility ({gchi_aux_q_r_sum.channel.value}) calculated.")
vrg_q_r = create_vrg_r_q(gchi_aux_q_r_sum, gchi0_q_inv)
logger.info(f"Non-local three-leg vertex gamma^wv ({vrg_q_r.channel.value}) done.")
logger.log_memory_usage(f"Three-leg vertex ({vrg_q_r.channel.value})", vrg_q_r, mpi_dist_irrq.comm.size)
if config.eliashberg.perform_eliashberg:
vrg_q_r.save(
name=f"vrg_q_{vrg_q_r.channel.value}_rank_{mpi_dist_irrq.comm.rank}",
output_dir=config.output.eliashberg_path,
)
chi_phys_q_r = gchi_aux_q_r_sum.sum_over_all_vn(config.sys.beta)
gchi_aux_q_r_sum.free()
chi_phys_q_r = create_generalized_chi_q_with_shell_correction(
chi_phys_q_r, gchi0_q_full_sum, gchi0_q_core_sum, u_loc, v_nonloc
)
logger.info(f"Updated non-local susceptibility chi^q ({chi_phys_q_r.channel.value}) with asymptotic correction.")
if config.self_consistency.restrict_chi_phys:
logger.warning("Restricting physical susceptibility to positive values.")
chi_phys_q_r = chi_phys_q_r.invert()
chi_phys_q_r.mat[chi_phys_q_r.mat < 0] = 1e-4
chi_phys_q_r = chi_phys_q_r.invert()
logger.log_memory_usage(
f"Physical susceptibility ({chi_phys_q_r.channel.value})", chi_phys_q_r, mpi_dist_irrq.comm.size
)
chi_phys_q_r.mat = mpi_dist_irrq.gather(chi_phys_q_r.mat)
if mpi_dist_irrq.comm.rank == 0:
if config.lambda_correction.perform_lambda_correction:
chi_phys_q_r = perform_lambda_correction(chi_phys_q_r)
chi_phys_q_r.save(name=f"chi_phys_q_{chi_phys_q_r.channel.value}", output_dir=config.output.output_path)
# perform Ornstein-Zernike fit
if chi_phys_q_r.channel == SpinChannel.MAGN:
perform_ornstein_zernike_fit(chi_phys_q_r)
chi_phys_q_r.mat = mpi_dist_irrq.scatter(chi_phys_q_r.mat)
logger.info(f"Saved physical susceptibility ({chi_phys_q_r.channel.value}) to file.")
if config.eliashberg.perform_eliashberg:
chi_phys_q_r.save(
name=f"gchi_aux_q_{chi_phys_q_r.channel.value}_sum_rank_{mpi_dist_irrq.comm.rank}",
output_dir=config.output.eliashberg_path,
)
return calculate_kernel_r_q(vrg_q_r, chi_phys_q_r, v_nonloc, u_loc)
[docs]
def calculate_sigma_from_kernel(kernel: FourPoint, giwk: GreensFunction, my_full_q_list: np.ndarray) -> SelfEnergy:
r"""
Returns :math:`\Sigma_{ij}^{k} = -\frac{1}{2\beta N_q} \sum_q U^q_{r;aibc} K_{r;cbjd}^{q\nu} G_{ad}^{\omega-\nu}`.
For very large momentum grids, this function is the slowest part compared to the rest of the code due to the
repeated loops. Potential speed-ups could be achieved by batching the q-points or using numba.
:param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks).
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param my_full_q_list: Array of integer q-point index triplets handled by this rank.
:return: The rank-local contribution to the non-local :class:`SelfEnergy` (compressed q, full niv range).
"""
mat = np.zeros(
(*config.lattice.k_grid.nk, config.sys.n_bands, config.sys.n_bands, config.box.niv_core),
dtype=kernel.mat.dtype,
)
kernel = kernel.to_full_niw_range()
wn = MFHelper.wn(config.box.niw_core)
path = np.einsum_path("aijdv,xyzadv->xyzijv", kernel[0, ..., 0, :], mat, optimize=True)[1]
for idx_q, q in enumerate(my_full_q_list):
shifted_mat = np.roll(giwk.mat, [-i for i in q], axis=(0, 1, 2))
for idx_w, wn_i in enumerate(wn):
g_qk = shifted_mat[..., giwk.niv - wn_i : giwk.niv + config.box.niv_core - wn_i]
k_slice = kernel[idx_q, ..., idx_w, config.box.niv_core :]
mat += np.einsum("aijdv,xyzadv->xyzijv", k_slice, g_qk, optimize=path)
mat *= -0.5 / config.sys.beta / config.lattice.q_grid.nk_tot
return SelfEnergy(mat, config.lattice.nk, False, beta=config.sys.beta).compress_q_dimension().to_full_niv_range()
[docs]
def calculate_sigma_from_kernel_cpu(
kernel: FourPoint,
giwk: GreensFunction,
my_full_q_list: np.ndarray,
) -> SelfEnergy:
r"""
Returns :math:`\Sigma_{ij}^{k} = -\frac{1}{2\beta N_q} \sum_q U^q_{r;aibc} K_{r;cbjd}^{q\nu} G_{ad}^{\omega-\nu}`.
For very large momentum grids, this function is the slowest part compared to the rest of the code due to the
repeated loops. There is no real way to speed it up further without leveraging GPUs or other hardware accelerators.
This is the CPU implementation (Fortran-ordered buffers, preallocated accumulator).
:param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks).
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param my_full_q_list: Array of integer q-point index triplets handled by this rank.
:return: The rank-local contribution to the non-local :class:`SelfEnergy` (compressed q, full niv range).
"""
nkx, nky, nkz = config.lattice.k_grid.nk
nb = config.sys.n_bands
niv_core = config.box.niv_core
mat = np.zeros((nkx, nky, nkz, nb, nb, niv_core), dtype=kernel.mat.dtype)
wn = MFHelper.wn(config.box.niw_core)
giwk_mat = np.asfortranarray(giwk.mat)
kernel = np.asfortranarray(kernel.to_full_niw_range().mat[..., niv_core:])
kxs, kys, kzs = np.arange(nkx), np.arange(nky), np.arange(nkz)
kx_indices = [((kxs + q[0]) % nkx) for q in my_full_q_list]
ky_indices = [((kys + q[1]) % nky) for q in my_full_q_list]
kz_indices = [((kzs + q[2]) % nkz) for q in my_full_q_list]
acc = np.empty((nkx, nky, nkz, nb, nb, niv_core), dtype=mat.dtype)
for iq in range(len(my_full_q_list)):
g_q_view = giwk_mat[
kx_indices[iq][:, None, None], ky_indices[iq][None, :, None], kz_indices[iq][None, None, :], ...
]
for iw, w in enumerate(wn):
g_slice = g_q_view[..., giwk.niv - w : giwk.niv + niv_core - w]
k_slice = kernel[iq, ..., iw, :]
np.einsum("xyzadv,aijdv->xyzijv", g_slice, k_slice, out=acc, optimize=True)
np.add(mat, acc, out=mat)
mat *= -0.5 / config.sys.beta / config.lattice.q_grid.nk_tot
return (
SelfEnergy(np.ascontiguousarray(mat), config.lattice.nk, False, beta=config.sys.beta)
.compress_q_dimension()
.to_full_niv_range()
)
[docs]
def calculate_sigma_from_kernel_gpu(
kernel: FourPoint,
giwk: GreensFunction,
my_full_q_list: np.ndarray,
) -> SelfEnergy:
r"""
Returns :math:`\Sigma_{ij}^{k} = -\frac{1}{2\beta N_q} \sum_q U^q_{r;aibc} K_{r;cbjd}^{q\nu} G_{ad}^{\omega-\nu}`.
For very large momentum grids, this function is the slowest part compared to the rest of the code due to the
repeated loops. This is the GPU implementation using CuPy.
:param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks).
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param my_full_q_list: Array of integer q-point index triplets handled by this rank.
:return: The rank-local contribution to the non-local :class:`SelfEnergy` (compressed q, full niv range).
"""
import cupy as cp
nkx, nky, nkz = config.lattice.k_grid.nk
nb = config.sys.n_bands
niv_core = config.box.niv_core
mat_gpu = cp.zeros((nkx, nky, nkz, nb, nb, niv_core), dtype=kernel.mat.dtype, order="F")
wn = MFHelper.wn(config.box.niw_core)
giwk_mat = cp.asarray(giwk.mat, order="F")
kernel = cp.asarray(kernel.to_full_niw_range().mat, order="F")[..., niv_core:]
kxs, kys, kzs = cp.arange(nkx), cp.arange(nky), cp.arange(nkz)
kx_indices = [((kxs + q[0]) % nkx) for q in my_full_q_list]
ky_indices = [((kys + q[1]) % nky) for q in my_full_q_list]
kz_indices = [((kzs + q[2]) % nkz) for q in my_full_q_list]
for iq in range(len(my_full_q_list)):
g_q_view = giwk_mat[
kx_indices[iq][:, None, None], ky_indices[iq][None, :, None], kz_indices[iq][None, None, :], ...
]
for iw, w in enumerate(wn):
g_slice = g_q_view[..., giwk.niv - w : giwk.niv + niv_core - w]
k_slice = kernel[iq, ..., iw, :]
mat_gpu += cp.einsum("xyzadv,aijdv->xyzijv", g_slice, k_slice, optimize=True)
mat_gpu *= -0.5 / config.sys.beta / config.lattice.q_grid.nk_tot
return (
SelfEnergy(np.ascontiguousarray(cp.asnumpy(mat_gpu)), config.lattice.nk, False, beta=config.sys.beta)
.compress_q_dimension()
.to_full_niv_range()
)
[docs]
def calculate_sigma_from_kernel_auto(
mpi_distributor: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, my_full_q_list: np.ndarray
) -> SelfEnergy:
r"""
Dispatches the q-loop self-energy contraction to the GPU (:func:`calculate_sigma_from_kernel_gpu`) when CuPy and
a usable CUDA device are available (one GPU per MPI rank, round-robin), otherwise falls back to the CPU
implementation (:func:`calculate_sigma_from_kernel_cpu`).
:param mpi_distributor: MPI distributor used to choose the per-rank GPU (see :class:`MpiDistributor`).
:param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks).
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param my_full_q_list: Array of integer q-point index triplets handled by this rank.
:return: The rank-local contribution to the non-local :class:`SelfEnergy`.
"""
logger = config.logger
cp = None
try:
import cupy as cp
except ImportError:
pass # CuPy not installed -> CPU
n_gpus = 0
if cp is not None:
try:
n_gpus = cp.cuda.runtime.getDeviceCount()
except cp.cuda.runtime.CUDARuntimeError:
n_gpus = 0 # no usable CUDA driver/device -> CPU
if n_gpus > 0 and cp.cuda.is_available():
logger.info(f"CuPy detected {n_gpus} GPU(s). Using GPU acceleration for self-energy calculation.")
gpu_id = mpi_distributor.my_rank % n_gpus
cp.cuda.Device(gpu_id).use()
return calculate_sigma_from_kernel_gpu(kernel, giwk, my_full_q_list)
return calculate_sigma_from_kernel_cpu(kernel, giwk, my_full_q_list)
[docs]
def calculate_sigma_from_kernel_fft_cpu(
mpi_dist: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, niw_index_w_pairs: list[tuple[int, int]]
) -> SelfEnergy:
r"""
Computes the self-energy using distributed FFTs (CPU). Replaces the q-loop with a real-space pointwise
multiplication: both the Green's function and the kernel are FFT'd to real space (the kernel to :math:`-R` via the
conjugate trick), contracted pointwise per rank-local R-slice, and accumulated. Returns :math:`\Sigma` in R-space,
positive-:math:`\nu` half only; the caller must ifft over :math:`(k_x, k_y, k_z)` and then call
:meth:`SelfEnergy.to_full_niv_range` before use.
This contracts a **single** bosonic-frequency half (positive-w *or* negative-w) so the full-BZ kernel is never
materialized over the full niw range (see :meth:`LocalNPoint.to_negative_niw_range`): the kernel is consumed in
whatever niw orientation it is handed (no internal ``to_full_niw_range``), and ``niw_index_w_pairs`` selects which
kernel w-slices to contract and how to shift the Green's function.
:param mpi_dist: MPI distributor providing the communicator and R-space pencil decomposition.
:param kernel: The self-energy kernel :math:`K` for one niw half (full BZ): the positive half (``w >= 0``) or the
negative block from :meth:`LocalNPoint.to_negative_niw_range` (``w = 0, -1, ..., -niw``).
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param niw_index_w_pairs: The ``(kernel_w_index, w)`` pairs to contract. The positive pass passes
``[(i, i) for i in range(niw + 1)]`` (``w = 0..+niw``), the negative pass
``[(i, -i) for i in range(1, niw + 1)]`` (``w = -1..-niw``, skipping the ``w = 0`` duplicate).
:return: The rank-local R-space :class:`SelfEnergy` (compressed q, half niv range, moments not fitted).
"""
comm = mpi_dist.comm
rank = comm.Get_rank()
size = comm.Get_size()
nkx, nky, nkz = config.lattice.k_grid.nk
nk_tot = config.lattice.q_grid.nk_tot
nb = config.sys.n_bands
niv_core = config.box.niv_core
beta = config.sys.beta
# G(k) -> F[G](R), forward FFT, replicated on every rank
g_r_mat = giwk.fft().mat
# K(q) -> F[K](-R) via the conjugate trick: conj, fft, conj. The kernel is already a single niw half (positive
# half, or the negative block), so there is no to_full_niw_range here -- the full-niw kernel is never built.
kernel = kernel.to_half_niv_range()
kernel.mat = np.conj(kernel.mat)
kernel = mpi_utils.execute_distributed_fft(kernel, comm)
kernel.mat = np.conj(kernel.mat)
# Local R-space contraction; each rank owns a slice of R-points
n_r_local = kernel.mat.shape[0]
mat = np.zeros((n_r_local, nb, nb, niv_core), dtype=kernel.mat.dtype)
acc = np.empty_like(mat)
my_r_indices = mpi_utils.get_pencil_indices(rank, size, (nkx, nky, nkz), "flat")
g_r_local = g_r_mat.reshape(nk_tot, nb, nb, -1)[my_r_indices]
path = None
for idx, w in niw_index_w_pairs:
g_slice = g_r_local[..., giwk.niv - w : giwk.niv + niv_core - w]
k_slice = kernel.mat[..., idx, :]
if path is None: # slice shapes are identical across the loop -> compute the contraction path once
path = np.einsum_path("Radv,Raijdv->Rijv", g_slice, k_slice, optimize="optimal")[0]
np.einsum("Radv,Raijdv->Rijv", g_slice, k_slice, out=acc, optimize=path)
np.add(mat, acc, out=mat)
mat *= -0.5 / beta / nk_tot
return SelfEnergy(
np.ascontiguousarray(mat),
config.lattice.nk,
full_niv_range=False,
has_compressed_q_dimension=True,
calc_smom=False,
beta=config.sys.beta,
)
[docs]
def calculate_sigma_from_kernel_fft_gpu(
mpi_dist: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, niw_index_w_pairs: list[tuple[int, int]]
) -> SelfEnergy:
r"""
Computes the self-energy using distributed FFTs, running on the GPU (CuPy). Same algorithm as
:func:`calculate_sigma_from_kernel_fft_cpu` (including the single-niw-half / ``niw_index_w_pairs`` contraction).
Returns :math:`\Sigma` in R-space, positive-:math:`\nu` half only; the caller must ifft over
:math:`(k_x, k_y, k_z)` and then call :meth:`SelfEnergy.to_full_niv_range` before use.
:param mpi_dist: MPI distributor providing the communicator and R-space pencil decomposition.
:param kernel: The self-energy kernel :math:`K` for one niw half (full BZ): the positive half or the negative
block from :meth:`LocalNPoint.to_negative_niw_range`.
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param niw_index_w_pairs: The ``(kernel_w_index, w)`` pairs to contract (see
:func:`calculate_sigma_from_kernel_fft_cpu`).
:return: The rank-local R-space :class:`SelfEnergy` (compressed q, half niv range, moments not fitted).
"""
import cupy as cp
comm = mpi_dist.comm
rank = comm.Get_rank()
size = comm.Get_size()
nkx, nky, nkz = config.lattice.k_grid.nk
nk_tot = config.lattice.q_grid.nk_tot
nb = config.sys.n_bands
niv_core = config.box.niv_core
beta = config.sys.beta
# G(k) -> F[G](R), forward FFT, replicated on every rank
g_r_mat = cp.asarray(giwk.fft().mat)
# K(q) -> F[K](-R) via the conjugate trick: conj, fft, conj. The kernel is already a single niw half, so there is
# no to_full_niw_range here -- the full-niw kernel is never built.
kernel = kernel.to_half_niv_range()
kernel.mat = np.conj(kernel.mat)
kernel = mpi_utils.execute_distributed_fft(kernel, comm)
kernel.mat = cp.conj(cp.asarray(kernel.mat))
# Local R-space contraction; each rank owns a slice of R-points
n_r_local = kernel.mat.shape[0]
mat = cp.zeros((n_r_local, nb, nb, niv_core), dtype=kernel.mat.dtype)
my_r_indices = mpi_utils.get_pencil_indices(rank, size, (nkx, nky, nkz), "flat")
g_r_local = g_r_mat.reshape(nk_tot, nb, nb, -1)[cp.asarray(my_r_indices)]
for idx, w in niw_index_w_pairs:
g_slice = g_r_local[..., giwk.niv - w : giwk.niv + niv_core - w]
k_slice = kernel.mat[..., idx, :]
mat += cp.einsum("Radv,Raijdv->Rijv", g_slice, k_slice, optimize=True)
mat *= -0.5 / beta / nk_tot
return SelfEnergy(
np.ascontiguousarray(cp.asnumpy(mat)),
config.lattice.nk,
full_niv_range=False,
has_compressed_q_dimension=True,
calc_smom=False,
beta=config.sys.beta,
)
[docs]
def select_sigma_fft_device(mpi_distributor: MpiDistributor) -> bool:
"""
Detects whether CuPy and a usable CUDA device are available (one GPU per MPI rank, round-robin), selects this
rank's GPU and logs the decision **once**. Intended to be called a single time before the positive- and
negative-w FFT passes so the GPU-detection message is not emitted per pass.
:param mpi_distributor: MPI distributor providing the per-rank GPU choice.
:return: True if the GPU implementation should be used, False to fall back to the CPU implementation.
"""
cp = None
try:
import cupy as cp
except ImportError:
return False # CuPy not installed -> CPU
n_gpus = 0
try:
n_gpus = cp.cuda.runtime.getDeviceCount()
except cp.cuda.runtime.CUDARuntimeError:
n_gpus = 0 # no usable CUDA driver/device -> CPU
if n_gpus > 0 and cp.cuda.is_available():
config.logger.info(f"CuPy detected {n_gpus} GPU(s). Using GPU acceleration for self-energy calculation.")
cp.cuda.Device(mpi_distributor.my_rank % n_gpus).use()
return True
return False
[docs]
def calculate_sigma_from_kernel_fft(
mpi_dist: MpiDistributor,
kernel: FourPoint,
giwk: GreensFunction,
niw_index_w_pairs: list[tuple[int, int]],
use_gpu: bool,
) -> SelfEnergy:
r"""
Dispatches one bosonic-frequency FFT pass to the GPU (:func:`calculate_sigma_from_kernel_fft_gpu`) or CPU
(:func:`calculate_sigma_from_kernel_fft_cpu`) implementation according to ``use_gpu`` (decided once by
:func:`select_sigma_fft_device`, so no per-pass GPU-detection logging).
:param mpi_dist: MPI distributor providing the communicator and R-space pencil decomposition.
:param kernel: The self-energy kernel :math:`K` for one niw half (full BZ).
:param giwk: The momentum-dependent :class:`GreensFunction`.
:param niw_index_w_pairs: The ``(kernel_w_index, w)`` pairs to contract (see
:func:`calculate_sigma_from_kernel_fft_cpu`).
:param use_gpu: Whether to run the GPU implementation (as decided by :func:`select_sigma_fft_device`).
:return: The rank-local R-space :class:`SelfEnergy`.
"""
if use_gpu:
return calculate_sigma_from_kernel_fft_gpu(mpi_dist, kernel, giwk, niw_index_w_pairs)
return calculate_sigma_from_kernel_fft_cpu(mpi_dist, kernel, giwk, niw_index_w_pairs)
def _map_kernel_to_full_bz(
kernel: FourPoint, mpi_dist_irrk: MpiDistributor, mpi_dist_fullbz: MpiDistributor
) -> FourPoint:
"""
Maps the self-energy kernel from the irreducible to the full Brillouin zone, choosing the gather/scatter routine
or the fully distributed peer-to-peer routine according to ``config.memory.save_memory_for_chiq_aux``.
:param kernel: The self-energy kernel on the irreducible BZ.
:param mpi_dist_irrk: MPI distributor over the irreducible BZ q-points.
:param mpi_dist_fullbz: MPI distributor over the full BZ q-points.
:return: The kernel distributed over the full BZ.
"""
if not config.memory.save_memory_for_chiq_aux:
return mpi_utils.map_irrbz_fullbz(kernel, mpi_dist_irrk, mpi_dist_fullbz)
return mpi_utils.exchange_and_map_irrbz_fullbz(kernel, mpi_dist_irrk, mpi_dist_fullbz)
[docs]
def get_starting_sigma(default_sigma: SelfEnergy) -> tuple[SelfEnergy, int]:
"""
Tries to retrieve the last calculated self-energy from a previous self-consistency calculation as a starting point
for the next calculation. Whether the normal or interpolated sigma is chosen depends on the setting. If no
``sigma_dga_*_N.npy`` file is found, we use the DMFT self-energy as a starting point.
:param default_sigma: The fallback (DMFT) :class:`SelfEnergy` used when no previous result is found.
:return: A tuple of the starting :class:`SelfEnergy` (cut to the core box and interpolated onto the k-grid) and
the iteration number it was taken from (0 if none found).
"""
previous_sc_path = config.self_consistency.previous_sc_path
if previous_sc_path is None or previous_sc_path == "" or not os.path.exists(previous_sc_path):
return default_sigma, 0
if config.self_consistency.use_interpolated_sigma:
glob_pattern = "sigma_dga_interpolated_*_iteration_*.npy"
iteration_regex = re.compile(r"sigma_dga_interpolated_.+_iteration_(\d+)\.npy$")
else:
glob_pattern = "sigma_dga_iteration_*.npy"
iteration_regex = re.compile(r"sigma_dga_iteration_(\d+)\.npy$")
files = glob.glob(os.path.join(previous_sc_path, glob_pattern))
if not files or len(files) == 0:
return default_sigma, 0
iterations = [(int(match.group(1)), f) for f in files if (match := iteration_regex.search(f))]
if not iterations or len(iterations) == 0:
return default_sigma, 0
max_iter, max_file = max(iterations, key=lambda x: x[0])
mat = np.load(max_file)
return (
SelfEnergy(mat, mat.shape[:3], True, False, beta=config.sys.beta)
.cut_niv(config.box.niv_core)
.interpolate_q_grid(config.lattice.k_grid.nk, False),
max_iter,
)
def _init_mu_history(starting_iter: int) -> list[float]:
r"""
Seeds the chemical-potential history for the self-consistency loop. For a fresh run (``starting_iter == 0``) the
history starts at the current (DMFT) chemical potential :math:`\mu`. When resuming from a previous self-consistency
calculation it is seeded with that run's last :math:`\mu` (from ``mu_history.npy``) and the global ``config.sys.mu``
is synced to it: otherwise ``config.sys.mu`` would stay at the DMFT value while ``giwk_full`` is built with the
previous run's :math:`\mu`, and any quantity computed from the global (e.g. the lattice filling in
:meth:`GreensFunction.get_fill_nonlocal`, which now reads ``self._mu``) would use an inconsistent chemical potential.
:param starting_iter: The iteration the previous calculation stopped at (0 for a fresh run).
:return: The single-element chemical-potential history list.
"""
if starting_iter == 0:
return [config.sys.mu]
previous_mu = float(np.load(os.path.join(config.self_consistency.previous_sc_path, "mu_history.npy"))[-1])
config.sys.mu = previous_mu
return [previous_mu]
[docs]
def read_last_n_sigmas_from_files(n: int, output_path: str = "./", previous_sc_path: str = "./") -> list[np.ndarray]:
"""
Reads the last ``n`` total self-energies from the output directory and - if specified - the previous
self-consistency path. This is used for the Pulay/Anderson mixing schemes. If one has a history of self-energies
from a previous calculation, these will be used as well.
:param n: Number of most recent self-energies to read.
:param output_path: Directory holding the current run's ``sigma_dga_iteration_*.npy`` files.
:param previous_sc_path: Directory of a previous self-consistency run to prepend to the history (if set).
:return: A list of self-energy arrays (cut to the core box and interpolated onto the k-grid), oldest first.
"""
def _get_top_n_files(path: str, pattern: str, regex: re.Pattern) -> list[tuple[int, str]]:
"""
Finds the ``n`` highest-iteration files in ``path`` matching ``pattern``/``regex``.
:param path: Directory to search.
:param pattern: Glob pattern selecting candidate files.
:param regex: Regex whose first group captures the iteration number.
:return: A list of ``(iteration, filepath)`` tuples, sorted ascending, truncated to the last ``n``.
"""
files = glob.glob(os.path.join(path, pattern))
matched = [(int(match.group(1)), f) for f in files if (match := regex.search(f))]
return sorted(matched, key=lambda x: x[0])[-n:]
interp_pattern = "sigma_dga_interpolated_*_iteration_*.npy"
interp_regex = re.compile(r"sigma_dga_interpolated_.+_iteration_(\d+)\.npy$")
normal_pattern = "sigma_dga_iteration_*.npy"
normal_regex = re.compile(r"sigma_dga_iteration_(\d+)\.npy$")
last_iterations_previous_dir = []
if previous_sc_path and previous_sc_path.strip():
if config.self_consistency.use_interpolated_sigma:
last_iterations_previous_dir = _get_top_n_files(previous_sc_path, interp_pattern, interp_regex)
else:
last_iterations_previous_dir = _get_top_n_files(previous_sc_path, normal_pattern, normal_regex)
last_iterations_current_dir = _get_top_n_files(output_path, normal_pattern, normal_regex)
last_iterations = (last_iterations_previous_dir + last_iterations_current_dir)[-n:]
sigmas = []
for _, file in last_iterations:
sigma_mat = np.load(file)
sigmas.append(
SelfEnergy(sigma_mat, sigma_mat.shape[:3], True, False, False, False, beta=config.sys.beta)
.cut_niv(config.box.niv_core)
.interpolate_q_grid(config.lattice.k_grid.nk, False)
.mat
)
return sigmas
[docs]
def calculate_self_energy_q(
comm: MPI.Comm, u_loc: LocalInteraction, v_nonloc: Interaction, sigma_dmft: SelfEnergy, sigma_local: SelfEnergy
) -> SelfEnergy:
r"""
Runs the non-local DGA self-energy calculation. Calculates the Hartree- and Fock terms, the bubble,
the double-counting correction and the kernel in the density and magnetic channel. Finally, calculates the
non-local self-energy from the kernel and the Green's function. Also takes care of the self-consistency loop and
the chemical potential adjustment as well as the self-energy mixing, etc.
:param comm: The MPI communicator.
:param u_loc: The bare local interaction :math:`U`.
:param v_nonloc: The non-local interaction :math:`V^{q}`.
:param sigma_dmft: The DMFT self-energy (used as the starting point and for the shell/tail correction).
:param sigma_local: The locally recomputed self-energy (used for the double-counting :math:`\Delta\Sigma`).
:return: The converged (or last-iteration) momentum-dependent DGA :class:`SelfEnergy`.
"""
logger = config.logger
logger.info("Starting with non-local DGA routine.")
logger.info("Initializing MPI distributor.")
# MPI distributor for the irreducible BZ
mpi_dist_irrk = MpiDistributor.create_distributor(
ntasks=config.lattice.q_grid.nk_irr, comm=comm, name="Q", output_path=config.output.output_path
)
full_q_list = config.lattice.q_grid.get_q_list()
irrk_q_list = config.lattice.q_grid.get_irrq_list()
my_irr_q_list = irrk_q_list[mpi_dist_irrk.my_slice]
mpi_dist_fullbz = MpiDistributor.create_distributor(
ntasks=config.lattice.q_grid.nk_tot, comm=comm, name="FBZ", output_path=config.output.output_path
)
my_full_q_list = full_q_list[mpi_dist_fullbz.my_slice]
sigma_old, starting_iter = get_starting_sigma(sigma_dmft)
if starting_iter > 0:
logger.info(
f"Using previous calculation and starting the self-consistency loop at iteration {starting_iter + 1}."
)
mu_history = _init_mu_history(starting_iter)
niv_cut = min(config.box.niw_core + config.box.niv_full + 10, config.box.niv_dmft)
sigma_dmft_full = deepcopy(sigma_dmft)
if comm.rank == 0:
giwk_full_dmft = GreensFunction.get_g_full(
sigma_dmft_full, config.sys.mu_dmft, config.lattice.hamiltonian.get_ek(), config.sys.beta
)
giwk_full_dmft.save(output_dir=config.output.output_path, name="g_latt_dmft")
giwk_full_dmft.free()
sigma_old = sigma_old.concatenate_self_energies(sigma_dmft_full)
giwk_full = GreensFunction.get_g_full(
sigma_old, mu_history[-1], config.lattice.hamiltonian.get_ek(), config.sys.beta
)
config.sys.n, config.sys.occ, config.sys.occ_k = giwk_full.get_fill_nonlocal()
giwk_full.cut_niv(niv_cut)
config.sys.n, config.sys.occ, config.sys.occ_k = comm.bcast(
(config.sys.n, config.sys.occ, config.sys.occ_k), root=0
)
sigma_old = sigma_old.cut_niv(niv_cut)
sigma_dmft = sigma_dmft.cut_niv(niv_cut)
if sigma_old.niv < niv_cut:
sigma_old = sigma_old.concatenate_self_energies(sigma_dmft)
delta_sigma = sigma_dmft.cut_niv(config.box.niv_core) - sigma_local.cut_niv(config.box.niv_core)
v_nonloc_full = deepcopy(v_nonloc)
v_nonloc = v_nonloc.reduce_q(my_irr_q_list)
for current_iter in range(starting_iter + 1, starting_iter + config.self_consistency.max_iter + 1):
logger.info("----------------------------------------")
logger.info(f"Starting iteration {current_iter}.")
logger.info("----------------------------------------")
hartree, fock = get_hartree_fock(u_loc, v_nonloc_full, my_full_q_list)
fock = mpi_dist_fullbz.allreduce(fock)
logger.info("Calculated Hartree and Fock terms.")
giwk_full = GreensFunction.get_g_full(
sigma_old, mu_history[-1], config.lattice.hamiltonian.get_ek(), config.sys.beta
)
logger.log_memory_usage("giwk", giwk_full, comm.size)
if config.memory.save_memory_for_chi0q:
gchi0_q = BubbleGenerator.create_generalized_chi0_q_auto(
mpi_dist_irrk,
giwk_full,
config.box.niw_core,
config.box.niv_full,
my_irr_q_list,
config.lattice.q_grid,
config.sys.beta,
config.logger,
)
else:
gchi0_q = BubbleGenerator.create_generalized_chi0_q_fft_auto(
mpi_dist_irrk,
giwk_full,
config.box.niw_core,
config.box.niv_full,
config.lattice.k_grid,
config.sys.beta,
config.logger,
)
if config.eliashberg.perform_eliashberg:
gchi0_q.save(name=f"gchi0_q_rank_{comm.rank}", output_dir=config.output.output_path)
logger.log_memory_usage("Gchi0_q_full", gchi0_q, comm.size)
giwk_full = giwk_full.cut_niv(config.box.niv_core + config.box.niw_core)
# sigma_old is not read again until the mixing step at the end of the iteration, and its shell
# [niv_core, niv_cut) is always the (k-independent) DMFT self-energy. Shrink the full-grid sigma_old to its core
# here -- freeing the broadcast shell through the kernel/SDE/fq peak -- and reconstruct the shell from sigma_dmft
# just before mixing, which restores it to niv_cut bit-for-bit.
sigma_old = sigma_old.cut_niv(config.box.niv_core)
f_dc_loc = 2 * LocalFourPoint.load(os.path.join(config.output.output_path, "f_magn_loc.npy")).permute_orbitals(
"abcd->cbad"
)
kernel = -calculate_sigma_dc_kernel(f_dc_loc, gchi0_q, u_loc)
f_dc_loc.free()
logger.info("Calculated double-counting kernel.")
gchi0_q_full_sum = 1.0 / config.sys.beta * gchi0_q.sum_over_all_vn(config.sys.beta)
gchi0_q_core = gchi0_q.cut_niv(config.box.niv_core)
gchi0_q.free()
logger.log_memory_usage("Gchi0_q_core", gchi0_q_core, comm.size)
gchi0_q_core_inv = deepcopy(gchi0_q_core).invert(False)
logger.log_memory_usage("Gchi0_q_inv", gchi0_q_core_inv, comm.size)
if current_iter == 1:
calculate_and_save_chi_q_r_rpa(gchi0_q_core_inv, u_loc, v_nonloc, mpi_dist_irrk)
if config.eliashberg.perform_eliashberg:
gchi0_q_core_inv.save(name=f"gchi0_q_inv_rank_{comm.rank}", output_dir=config.output.eliashberg_path)
gchi0_q_core_sum = 1.0 / config.sys.beta * gchi0_q_core.sum_over_all_vn(config.sys.beta)
gchi0_q_core.free()
gamma_dens = LocalFourPoint.load(
os.path.join(config.output.output_path, "gamma_dens_loc.npy"), SpinChannel.DENS
)
kernel += calculate_sigma_kernel_r_q(
gamma_dens, gchi0_q_core_inv, gchi0_q_full_sum, gchi0_q_core_sum, u_loc, v_nonloc, mpi_dist_irrk
)
gamma_dens.free()
mpi_dist_irrk.barrier()
logger.info("Calculated kernel for density channel.")
gamma_magn = LocalFourPoint.load(
os.path.join(config.output.output_path, "gamma_magn_loc.npy"), SpinChannel.MAGN
)
kernel += 3 * calculate_sigma_kernel_r_q(
gamma_magn, gchi0_q_core_inv, gchi0_q_full_sum, gchi0_q_core_sum, u_loc, v_nonloc, mpi_dist_irrk
)
gchi0_q_core_inv.free()
gchi0_q_full_sum.free()
gchi0_q_core_sum.free()
gamma_magn.free()
logger.info("Calculated kernel for magnetic channel.")
logger.info("Started calculation of DGA self-energy.")
if config.memory.save_memory_for_sde:
# q-loop path: the kernel is mapped to the full BZ once (the full niw range is handled inside the q-loop).
kernel = _map_kernel_to_full_bz(kernel, mpi_dist_irrk, mpi_dist_fullbz)
logger.info("Kernel mapped to full BZ.")
sigma_new = calculate_sigma_from_kernel_auto(mpi_dist_fullbz, kernel, giwk_full, my_full_q_list)
kernel.free(trim=True) # coarse per-iteration trim: return the full-BZ kernel peak to the OS
sigma_new.mat = mpi_dist_fullbz.allreduce(sigma_new.mat)
else:
# FFT path: split the bosonic-frequency sum into a positive- and a negative-w pass so the full-BZ
# kernel is only ever materialized over half the niw range AND only one niw half exists at a time. The
# small irreducible-BZ kernel is kept and mapped to the full BZ separately for each pass; the negative
# block's time-reversal is applied *after* the irreducible->full-BZ unfold (not before). The unfold applies
# a per-k orbital rotation U (complex for multi-band) plus an optional conjugation (see
# :meth:`IAmNonLocal._map_to_full_bz`), and ``to_negative_niw_range`` conjugates the data, so it must see
# the already-unfolded kernel (conj of U*..U^T), exactly as the q-loop path and the original single-pass
# implementation did. Flipping the irreducible-BZ kernel first would leave U un-conjugated and corrupt
# sigma on the symmetry-folded q-points.
niw = config.box.niw_core
kernel_irr = kernel # the (small) irreducible-BZ positive-w kernel, mapped to the full BZ once per pass
# Decide CPU/GPU (and select the GPU) once, so the detection is not logged for each of the two passes.
use_gpu = select_sigma_fft_device(mpi_dist_fullbz)
# positive pass: map a copy of the irr-BZ kernel to the full BZ and contract w = 0..+niw. The deep copy
# keeps ``kernel_irr`` (small) intact for the negative pass, which maps it again -- so only a single
# full-BZ niw half is ever resident. ``_map_kernel_to_full_bz`` may either mutate-and-return its argument
# or return a fresh object, so free the source copy only when a distinct object came back.
kernel_pos_irr = deepcopy(kernel_irr)
kernel_pos = _map_kernel_to_full_bz(kernel_pos_irr, mpi_dist_irrk, mpi_dist_fullbz)
if kernel_pos is not kernel_pos_irr:
kernel_pos_irr.free()
sigma_new = calculate_sigma_from_kernel_fft(
mpi_dist_irrk, kernel_pos, giwk_full, [(i, i) for i in range(niw + 1)], use_gpu
)
kernel_pos.free()
# negative pass: map the irr-BZ kernel to the full BZ, time-reverse the full-BZ result (w = 0, -1, ...,
# -niw), and contract w = -1..-niw (skip the w=0 duplicate).
kernel_neg_full = _map_kernel_to_full_bz(kernel_irr, mpi_dist_irrk, mpi_dist_fullbz)
if kernel_neg_full is not kernel_irr:
kernel_irr.free() # the irr-BZ kernel is no longer needed once the full-BZ negative source exists
kernel_neg = kernel_neg_full.to_negative_niw_range()
kernel_neg_full.free() # release the full-BZ positive copy as soon as the negative block is built
sigma_neg = calculate_sigma_from_kernel_fft(
mpi_dist_irrk, kernel_neg, giwk_full, [(i, -i) for i in range(1, niw + 1)], use_gpu
)
kernel_neg.free(trim=True) # coarse per-iteration trim: return the full-BZ kernel peak to the OS
sigma_new.mat += sigma_neg.mat # accumulate the rank-local R-space partial self-energies (in place)
sigma_neg.free()
sigma_new.mat = mpi_dist_fullbz.gather(sigma_new.mat)
if comm.rank == 0:
sigma_new = sigma_new.ifft().to_full_niv_range()
sigma_new = mpi_dist_fullbz.bcast_npoint(sigma_new)
logger.info("Self-energy calculated from kernel.")
logger.log_memory_usage("Non-local sigma", sigma_new, comm.size)
sigma_new = sigma_new + hartree + fock
logger.info("Full non-local self-energy calculated.")
# This is done to minimize noise. We remove some fluctuations from dmft that are included in the local self-energy
# calculated in this code and add the smooth dmft self-energy
sigma_new += delta_sigma
sigma_new = sigma_new.concatenate_self_energies(sigma_dmft)
# delta_sigma = sigma_dmft.cut_niv(config.box.niv_core) - sigma_new.q_mean().cut_niv(config.box.niv_core)
logger.info("Applying mixing strategy to the self-energy.")
# Restore sigma_old's DMFT shell (cut after the bubble for memory) so the mixing and the residual below see the
# full niv_cut self-energy exactly as before.
sigma_old = sigma_old.concatenate_self_energies(sigma_dmft)
sigma_new = apply_mixing_strategy(sigma_new, sigma_old, sigma_dmft, current_iter)
sigma_new = sigma_new.compress_q_dimension()
sigma_old = sigma_old.compress_q_dimension()
# Canonical self-consistency residual. This is the relative (L2) residual used for the convergence check
sigma_new_test = sigma_new.mat[..., sigma_new.niv : sigma_new.niv + config.box.niv_core]
sigma_old_test = sigma_old.mat[..., sigma_new.niv : sigma_old.niv + config.box.niv_core]
diff = (sigma_new_test - sigma_old_test).ravel()
norm_x = np.linalg.norm(np.concatenate([sigma_old_test.real.ravel(), sigma_old_test.imag.ravel()]))
relative_residual = np.linalg.norm(np.concatenate([diff.real, diff.imag])) / norm_x
old_mu = mu_history[-1]
if comm.rank == 0:
config.sys.mu = update_mu(
old_mu,
config.sys.n,
giwk_full.ek,
sigma_new.mat,
config.sys.beta,
sigma_new.fit_smom()[0],
logger=logger,
)
config.sys.mu = comm.bcast(config.sys.mu)
mu_history.append(config.sys.mu)
logger.info(f"Updated mu from {old_mu} to {config.sys.mu}.")
if comm.rank == 0:
sigma_occ = deepcopy(sigma_new).concatenate_self_energies(sigma_dmft_full)
giwk_occ = giwk_full.get_g_full(
sigma_occ, config.sys.mu, config.lattice.hamiltonian.get_ek(), config.sys.beta
)
# calculate new occupation matrix from new Green's function (outside asympt region it is the DMFT
# lattice Green's function)
_, config.sys.occ, config.sys.occ_k = giwk_occ.get_fill_nonlocal() # n should not change
ekin = giwk_occ.get_ekin()
logger.info(f"Kinetic energy: {ekin:.4f} [t or eV].")
epot = giwk_occ.get_epot()
logger.info(f"Potential energy: {epot:.4f} [t or eV].")
logger.info(f"Total energy: {(ekin + epot):.4f} [t or eV].")
config.sys.occ, config.sys.occ_k = comm.bcast((config.sys.occ, config.sys.occ_k), root=0)
if config.self_consistency.max_iter > 1:
logger.info("Updated occupation matrix from new Green's function.")
if comm.rank == 0:
sigma_new.decompress_q_dimension().save(
name=f"sigma_dga_iteration_{current_iter}", output_dir=config.output.output_path
)
logger.info(f"Saved sigma for iteration {current_iter}.")
if config.self_energy_interpolation.do_interpolation:
beta_target = config.self_energy_interpolation.beta_target
niv_target = config.self_energy_interpolation.niv_target
sigma_new.decompress_q_dimension().interpolate(beta_target, niv_target).save(
name=f"sigma_dga_interpolated_beta{beta_target}_niv{niv_target}_iteration_{current_iter}",
output_dir=config.output.output_path,
)
logger.info(
f"Interpolated sigma for iteration {current_iter} to beta={beta_target} and niv={niv_target}."
)
logger.info("Checking self-consistency convergence.")
if comm.rank == 0 and current_iter > starting_iter + 1:
sigma_converged = abs(relative_residual) < config.self_consistency.epsilon
logger.info(
f"Self-energy convergence: {sigma_converged} "
f"(relative residual={relative_residual:.3e}, epsilon={config.self_consistency.epsilon:.3e})."
)
mu_converged = abs(mu_history[-1] - mu_history[-2]) < np.pi / (10 * config.sys.beta)
logger.info(f"Chemical potential convergence: {mu_converged}.")
converged = mu_converged and sigma_converged
else:
converged = False
converged = comm.bcast(converged)
sigma_old = sigma_new
if converged:
if config.self_consistency.restrict_chi_phys:
logger.info(
"ATTENTION: Self-consistency with restricted susceptibility reached. Disabling restriction."
)
config.self_consistency.restrict_chi_phys = False
else:
logger.info(f"Self-consistency of sigma and mu reached at iteration {current_iter}.")
break
logger.info("Self-consistency not reached.")
mpi_dist_irrk.delete_file()
mpi_dist_fullbz.delete_file()
np.save(os.path.join(config.output.output_path, "mu_history.npy"), mu_history)
logger.info("Saved mu history as numpy array.")
return sigma_old
[docs]
def apply_mixing_strategy(
sigma_new: SelfEnergy, sigma_old: SelfEnergy, sigma_dmft: SelfEnergy, current_iter: int
) -> SelfEnergy:
"""
Applies the self-energy mixing strategy for the self-consistency loop. Supports linear mixing as well as the
accelerated Pulay (DIIS) and Anderson schemes (which use the self-energy history read from file); the accelerated
schemes fall back to linear mixing when their least-squares problem is ill-conditioned or the history is too short.
The mixing strategy and parameters are taken from the config.
:param sigma_new: The freshly computed self-energy proposal.
:param sigma_old: The previous iteration's self-energy.
:param sigma_dmft: The DMFT self-energy (used to seed the proposal history for the accelerated schemes).
:param current_iter: The current self-consistency iteration number.
:return: The mixed :class:`SelfEnergy` for the next iteration.
"""
logger = config.logger
n_hist = config.self_consistency.mixing_history_length
alpha = config.self_consistency.mixing
last_results, last_proposals = [], []
if config.self_consistency.mixing_strategy.lower() in ("pulay", "anderson"):
last_results = read_last_n_sigmas_from_files(
n_hist, config.output.output_path, config.self_consistency.previous_sc_path
)
sigma_dmft_stacked = np.tile(
sigma_dmft.cut_niv(config.box.niv_core).mat, (config.lattice.k_grid.nk_tot, 1, 1, 1)
)
last_proposals = [sigma_dmft_stacked] + last_results # [dmft, s1, ..., s_{n-1}]
last_results = last_results + [sigma_new.cut_niv(config.box.niv_core).mat] # [s1, s2, ..., s_n]
logger.info(f"Loaded last {n_hist} self-energies from files.")
accelerated_mixing_condition = current_iter > n_hist and len(last_results) > n_hist and len(last_proposals) > n_hist
if config.self_consistency.mixing_strategy.lower() == "pulay" and accelerated_mixing_condition:
shape = last_results[-1].shape
n_total = int(np.prod(shape))
r_matrix = np.zeros((2 * n_total, n_hist), dtype=np.float64)
f_matrix = np.zeros_like(r_matrix)
f_i = np.zeros((2 * n_total), dtype=np.float64)
def get_proposal(idx: int):
"""
Fetches a flattened proposal self-energy from the history.
:param idx: Index into the proposal history.
:return: The flattened proposal self-energy at ``idx``.
"""
return last_proposals[idx].flatten()
def get_result(idx: int):
"""
Fetches a flattened result self-energy from the history.
:param idx: Index into the result history.
:return: The flattened result self-energy at ``idx``.
"""
return last_results[idx].flatten()
for i in range(n_hist):
proposal_diff = get_proposal(-1 - i) - get_proposal(-2 - i)
r_matrix[:n_total, i] = proposal_diff.real
r_matrix[n_total:, i] = proposal_diff.imag
result_diff = get_result(-1 - i) - get_result(-2 - i)
f_matrix[:n_total, i] = result_diff.real
f_matrix[n_total:, i] = result_diff.imag
f_matrix[:, i] -= r_matrix[:, i]
# Residual: F(x_n) - x_n, where x_n = last_proposals[-1] = sigma_old (core window)
iter_diff = get_result(-1) - get_proposal(-1)
f_i[:n_total] = iter_diff.real
f_i[n_total:] = iter_diff.imag
norm_f = np.linalg.norm(f_i)
# Solve min||F @ c - f_i|| via truncated-SVD pseudoinverse (drops collinear directions)
u, s, vh = np.linalg.svd(f_matrix, full_matrices=False)
cutoff = 1e-5 * (s[0] if s.size else 1.0)
mask = s > cutoff
if not np.any(mask):
logger.warning("Pulay SVD ill-conditioned - falling back to linear mixing.")
return alpha * sigma_new + (1 - alpha) * sigma_old
coeffs = vh[mask].T @ ((u[:, mask].T @ f_i) / s[mask])
# Pulay update: x_{n+1} = x_n + alpha*f_i - (R + alpha*F) @ c
update = alpha * f_i - (r_matrix + alpha * f_matrix) @ coeffs
norm_u = np.linalg.norm(update)
if norm_f > 0 and norm_u > 10.0 * norm_f:
update *= 10.0 * norm_f / norm_u
logger.warning(f"Pulay step clamped (norm_u={norm_u:.3e}, norm_f={norm_f:.3e}).")
update = update[:n_total] + 1j * update[n_total:]
# Update the new self energy
niv = sigma_new.niv
niv_core = config.box.niv_core
sigma_new.mat[..., niv - niv_core : niv + niv_core] = get_proposal(-1).reshape(shape) + update.reshape(shape)
logger.info(f"Pulay mixing applied (m={n_hist}, alpha={alpha:.3f}, norm_f={norm_f:.3e}).")
return sigma_new
if config.self_consistency.mixing_strategy.lower() == "anderson" and accelerated_mixing_condition:
shape = last_results[-1].shape
n_total = int(np.prod(shape))
flat = lambda x: x.reshape(-1)
# Current residual f_n = F(x_n) - x_n
f_curr = flat(last_results[-1]) - flat(last_proposals[-1])
f_vec = np.concatenate([f_curr.real, f_curr.imag])
norm_f = np.linalg.norm(f_vec)
# Build dX and dF matrices (n_hist columns)
# dX[:,i] = x_{n-i} - x_{n-i-1} (proposal differences)
# dF[:,i] = f_{n-i} - f_{n-i-1} (residual differences)
dx_cols = []
df_cols = []
for i in range(n_hist):
dx = flat(last_proposals[-1 - i]) - flat(last_proposals[-2 - i])
dx_cols.append(np.concatenate([dx.real, dx.imag]))
df_i = flat(last_results[-1 - i]) - flat(last_proposals[-1 - i])
df_im1 = flat(last_results[-2 - i]) - flat(last_proposals[-2 - i])
df = df_i - df_im1
df_cols.append(np.concatenate([df.real, df.imag]))
dx_matrix = np.column_stack(dx_cols) # (2*n_total, n_hist)
df_matrix = np.column_stack(df_cols) # (2*n_total, n_hist)
# Anderson: solve min ||f_curr - dF @ c||
try:
u, s, vh = np.linalg.svd(df_matrix, full_matrices=False)
s_max = s[0] if len(s) > 0 else 1.0
cutoff = 1e-5 * s_max
mask = s > cutoff
if not np.any(mask):
raise np.linalg.LinAlgError("All singular values below threshold.")
s_reg = s[mask] / (s[mask] ** 2 + cutoff**2)
coeffs = vh[mask].T @ (s_reg * (u[:, mask].T @ f_vec))
except np.linalg.LinAlgError:
logger.warning("Anderson SVD failed - falling back to linear mixing.")
return alpha * sigma_new + (1 - alpha) * sigma_old
# Undamped Anderson proposal: x_n + f_n - (dX + dF) @ c
x_n = flat(last_proposals[-1])
x_anderson = np.concatenate([x_n.real, x_n.imag]) + f_vec - (dx_matrix + df_matrix) @ coeffs
x_anderson = x_anderson[:n_total] + 1j * x_anderson[n_total:]
# Damp between old proposal and Anderson proposal
x_n_complex = x_n
candidate = (1 - alpha) * x_n_complex + alpha * x_anderson.reshape(-1)
# Safety clamp
update = candidate - x_n_complex
norm_u = np.linalg.norm(update)
if norm_f > 0 and norm_u > 3.0 * norm_f:
candidate = x_n_complex + update * (3.0 * norm_f / norm_u)
logger.warning(f"Anderson step clamped (norm_u={norm_u:.3e}, norm_f={norm_f:.3e}).")
# Update the new self energy
niv = sigma_new.niv
niv_core = config.box.niv_core
sigma_new.mat[..., niv - niv_core : niv + niv_core] = candidate.reshape(shape)
logger.info(f"Anderson acceleration applied (m={n_hist}, alpha={alpha:.3f}, norm_f={norm_f:.3e}).")
return sigma_new
sigma_new = alpha * sigma_new + (1 - alpha) * sigma_old
logger.info(f"Sigma linearly mixed (m=1, alpha={alpha}).")
return sigma_new