Source code for dgamore.nonlocal_sde

# SPDX-FileCopyrightText: 2025-2026 Julian Peil <julian.peil@tuwien.ac.at>
# SPDX-License-Identifier: MIT
#
# DGAmore — Multi-Orbital Ladder Dynamical Vertex Approximation (LDGA) &
#           Eliashberg Equation Solver for Strongly Correlated Electron Systems
r"""
Non-local ladder DGA step — the parallel-heavy core of the code. Starting from the local irreducible vertex
:math:`\Gamma_{r}` and the bare interaction, the functions here build, per momentum :math:`q` and spin channel,
the bubble :math:`\chi_0^q`, the auxiliary susceptibility :math:`\chi^{*;q}_{r}`, the three-leg vertex
:math:`\gamma^q_{r}`, the physical susceptibility :math:`\chi^q_{r}` (with shell and optional
:math:`\lambda`-correction) and the self-energy kernel, then contract the kernel with the Green's function to get
the momentum-dependent self-energy :math:`\Sigma(k, \nu)`. Several CPU/GPU/FFT variants of the heavy contractions
are provided, distributed over MPI ranks. The whole thing is wrapped in a self-consistency loop with chemical-
potential adjustment and self-energy mixing (linear / Pulay / Anderson). Equation numbers refer to the author's
master's thesis (Chapters 3 & 4).
"""

import glob
import os
import re
from copy import deepcopy

import mpi4py.MPI as MPI
import numpy as np
from scipy import optimize as opt

import dgamore.config as config
import dgamore.lambda_correction as lc
import dgamore.mpi_utils as mpi_utils
from dgamore.brillouin_zone import KGrid
from dgamore.bubble_gen import BubbleGenerator
from dgamore.four_point import FourPoint
from dgamore.greens_function import GreensFunction, update_mu
from dgamore.interaction import LocalInteraction, Interaction
from dgamore.local_four_point import LocalFourPoint
from dgamore.matsubara_frequencies import MFHelper
from dgamore.mpi_utils import MpiDistributor
from dgamore.n_point_base import SpinChannel
from dgamore.self_energy import SelfEnergy


[docs] def get_hartree_fock( u_loc: LocalInteraction, v_nonloc: Interaction, q_list: np.ndarray ) -> tuple[np.ndarray, np.ndarray]: r""" Returns the Hartree-Fock term separately for the local and non-local interaction. Since we are always SU(2)-symmetric, the sum over the spins of the first term in Eq. (4.55) in Anna Galler's thesis results in a simple factor of 2. This can be seen in my master's thesis, Eq. (3.55). The Hartree-Fock term is given by .. math:: \Sigma_{HF}^k = 2(U_{acbd} + V^{q=0}_{acbd}) n_{dc} - 1/N_q \sum_q (U_{adcb} + V^{q}_{adcb}) n^{k-q}_{dc} where the Hartree term reads :math:`\Sigma_{H} = 2(U_{acbd} + V^{q=0}_{acbd}) n_{dc}` and the Fock term reads :math:`\Sigma_{F}^k = - 1/N_q \sum_q (U_{adcb} + V^{q}_{adcb}) n^{k-q}_{dc}`. The Hartree contraction uses the middle-index-swapped ``U_{acbd}`` so it picks up the inter-orbital density :math:`U'` (stored at :math:`U_{abab}`); see :func:`dgamore.local_sde.get_local_hartree_fock`. Processes the Fock term for each individual orbital to save memory, as for high momentum grids, the ``occ_qk`` property can become large. :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}` (see :class:`Interaction`). :param q_list: Array of integer q-point index triplets handled by this rank. :return: The tuple ``(hartree, fock)`` of self-energy contributions, broadcastable to ``[k, o1, o2, v]``. """ v_q0 = v_nonloc.find_q((0, 0, 0)) # The inter-orbital density U' is stored at U_{abab}, so the Hartree term contracts "qacbd" (swapping the middle # orbital indices) to pick it up; the Fock term below uses U_{adcb}. See local_sde.get_local_hartree_fock. hartree = 2 * (u_loc + v_q0).times("qacbd,dc->ab", config.sys.occ) nb = config.sys.n_bands nk_tot = np.prod(config.lattice.nk) nq_tot = np.prod(config.lattice.nq) uq = (u_loc + v_nonloc.reduce_q(q_list)).permute_orbitals("abcd->adcb") # (nq,a,d,c,b) fock = np.zeros((nk_tot, nb, nb), dtype=uq.mat.dtype) for d in range(nb): for c in range(nb): u_slice = uq[:, :, d, c, :] if not np.any(u_slice): continue occ_qk_dc = np.array( [np.roll(config.sys.occ_k[..., d, c], [-i for i in q], axis=(0, 1, 2)) for q in q_list] ) occ_qk_dc = occ_qk_dc.reshape(len(q_list), nk_tot) contribution = u_slice[:, None, :, :] * occ_qk_dc[:, :, None, None] fock += contribution.sum(axis=0) fock *= -1.0 / nq_tot return hartree[None, ..., None], fock[..., None] # [k,o1,o2,v]
[docs] def create_auxiliary_chi_r_q( gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction ) -> FourPoint: r""" Returns the auxiliary susceptibility, see Eq. (3.60) in my master's thesis, .. math:: \chi^{*;q\nu\nu'}_{r;abcd} = ((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}. :param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`. :param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :return: The momentum-dependent auxiliary susceptibility :math:`\chi^{*;q}_{r}` as a :class:`FourPoint`. """ return ( (gchi0_q_inv + 1.0 / config.sys.beta**2 * gamma_r) - 1.0 / config.sys.beta**2 * (v_nonloc.as_channel(gamma_r.channel) + u_loc.as_channel(gamma_r.channel)) ).invert(False)
[docs] def create_auxiliary_chi_r_q_sum_v1( gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction ) -> FourPoint: r""" Returns the sum over the auxiliary susceptibility, see Eq. (3.60) in my master's thesis. This variant inverts and sums over the last fermionic frequency in one fused step (see :meth:`FourPoint.invert_and_sum_over_last_vn`), .. math:: \sum_{\nu'}\chi^{*;q\nu\nu'}_{r;abcd} = \sum_{\nu'}((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}. :param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`. :param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :return: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}` as a :class:`FourPoint`. """ return ( (gchi0_q_inv + 1.0 / config.sys.beta**2 * gamma_r) - 1.0 / config.sys.beta**2 * (v_nonloc.as_channel(gamma_r.channel) + u_loc.as_channel(gamma_r.channel)) ).invert_and_sum_over_last_vn(config.sys.beta)
[docs] def create_auxiliary_chi_r_q_sum_v2( gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction, mpi_dist_irrq: MpiDistributor, ) -> FourPoint: r""" Returns the sum over the auxiliary susceptibility, see Eq. (3.60) in my master's thesis. This variant loops over the rank-local q-points (capping peak memory to one q at a time) and uses the standard fused invert-and-sum per q (see :meth:`FourPoint.invert_and_sum_over_last_vn`), .. math:: \sum_{\nu'}\chi^{*;q\nu\nu'}_{r;abcd} = \sum_{\nu'}((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}. :param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`. :param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :param mpi_dist_irrq: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`). :return: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}` as a :class:`FourPoint`. """ irrk_q_list = config.lattice.q_grid.get_irrq_list() my_irr_q_list = irrk_q_list[mpi_dist_irrq.my_slice] chi_r_q_sum_mat = np.zeros_like(gchi0_q_inv.mat) for idx in range(len(my_irr_q_list)): chi_r_q_sum_mat[idx] = ( ( (gchi0_q_inv.filter_q_index(idx) + 1.0 / config.sys.beta**2 * gamma_r) - 1.0 / config.sys.beta**2 * (v_nonloc.as_channel(gamma_r.channel).filter_q_index(idx) + u_loc.as_channel(gamma_r.channel)) ) .invert_and_sum_over_last_vn(config.sys.beta) .mat ) return FourPoint(chi_r_q_sum_mat, gamma_r.channel, config.lattice.nq, 1, 1, False, has_compressed_q_dimension=True)
[docs] def create_auxiliary_chi_r_q_sum_v3( gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction, mpi_dist_irrq: MpiDistributor, ) -> FourPoint: r""" Returns the sum over the auxiliary susceptibility, see Eq. (3.60) in my master's thesis. This is the most memory-lean variant: it loops over the rank-local q-points and uses the highly memory-efficient linear-solver-based fused invert-and-sum per q (see :meth:`FourPoint.invert_and_sum_over_last_vn_v2`), .. math:: \sum_{\nu'}\chi^{*;q\nu\nu'}_{r;abcd} = \sum_{\nu'}((\chi_{0;abcd}^{q\nu})^{-1} + (\Gamma_{r;abcd}^{\omega\nu\nu'}-U_{r;abcd}-V_{r;abcd}^q)/\beta^2)^{-1}. :param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`. :param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :param mpi_dist_irrq: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`). :return: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}` as a :class:`FourPoint`. """ irrk_q_list = config.lattice.q_grid.get_irrq_list() my_irr_q_list = irrk_q_list[mpi_dist_irrq.my_slice] chi_r_q_sum_mat = np.zeros_like(gchi0_q_inv.mat) for idx in range(len(my_irr_q_list)): chi_r_q_sum_mat[idx] = ( ( (gchi0_q_inv.filter_q_index(idx) + 1.0 / config.sys.beta**2 * gamma_r) - 1.0 / config.sys.beta**2 * (v_nonloc.as_channel(gamma_r.channel).filter_q_index(idx) + u_loc.as_channel(gamma_r.channel)) ) .invert_and_sum_over_last_vn_v2(config.sys.beta) .mat ) return FourPoint(chi_r_q_sum_mat, gamma_r.channel, config.lattice.nq, 1, 1, False, has_compressed_q_dimension=True)
[docs] def create_vrg_r_q(gchi_aux_q_r_sum: FourPoint, gchi0_q_inv: FourPoint) -> FourPoint: r""" Returns the momentum-dependent three-leg vertex, see Eq. (3.63) in my master's thesis, :math:`\gamma_{r;abcd}^{q\nu} = \beta (\chi^{q\nu\nu}_{0;ablm})^{-1} (\sum_{\nu'} \chi^{*;q\nu\nu'}_{r;mlcd})`. :param gchi_aux_q_r_sum: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu'}\chi^{*;q}_{r}`. :param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :return: The three-leg vertex :math:`\gamma^q_{r}` (``vrg``) as a :class:`FourPoint`. """ return config.sys.beta * (gchi0_q_inv @ gchi_aux_q_r_sum)
[docs] def create_generalized_chi_q_with_shell_correction( gchi_aux_q_sum: FourPoint, gchi0_q_full_sum: FourPoint, gchi0_q_core_sum: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction, ) -> FourPoint: r""" Calculates the generalized susceptibility with the shell correction as described by Motoharu Kitatani et al. 2022 J. Phys. Mater. 5 034005; DOI 10.1088/2515-7639/ac7e6d. Eq. A.15. See also Sec. 3.7.2 in my master's thesis for details. :param gchi_aux_q_sum: The frequency-summed auxiliary susceptibility :math:`\sum_{\nu\nu'}\chi^{*;q}_{r}`. :param gchi0_q_full_sum: The frequency-summed bare bubble over the full box. :param gchi0_q_core_sum: The frequency-summed bare bubble over the core box. :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :return: The shell-corrected physical susceptibility :math:`\chi^{q}_{r}` as a :class:`FourPoint`. """ return ( (gchi_aux_q_sum + gchi0_q_full_sum - gchi0_q_core_sum).invert() + (u_loc.as_channel(gchi_aux_q_sum.channel) + v_nonloc.as_channel(gchi_aux_q_sum.channel)) ).invert()
[docs] def calculate_sigma_dc_kernel(f_dc_loc: LocalFourPoint, gchi0_q: FourPoint, u_loc: LocalInteraction) -> FourPoint: r""" Returns the double-counting kernel for the self-energy calculation, contracting the local full vertex with the momentum-dependent bubble per q-point. For details, see Eq. (4.28) in my master's thesis. :param f_dc_loc: The local full vertex :math:`F` used for the double-counting correction. :param gchi0_q: The momentum-dependent bare bubble :math:`\chi_0^q`. :param u_loc: The bare local interaction :math:`U`. :return: The double-counting kernel as a :class:`FourPoint`, cut to the core fermionic box. """ kernel = 1.0 / config.sys.beta**2 * u_loc.permute_orbitals("abcd->adcb") @ gchi0_q einsum_str = "abcdwv,dcefwvp->abefwp" path, _ = np.einsum_path(einsum_str, kernel.mat[0].copy(), f_dc_loc.mat, optimize="optimal") for q in range(kernel.current_shape[0]): kernel[q] = np.einsum(einsum_str, kernel[q].copy(), f_dc_loc.mat, optimize=path) return kernel.cut_niv(config.box.niv_core)
[docs] def calculate_kernel_r_q( vrg_q_r: FourPoint, gchi_aux_q_r_sum: FourPoint, v_nonloc: Interaction, u_loc: LocalInteraction ) -> FourPoint: r""" Returns the kernel for the self-energy calculation minus 2/3 times the identity if the channel is the magnetic channel (due to the extra factor of :math:`U_{ah21}` in Eq. (4.29) in my master's thesis), .. math:: K = \gamma_{r;abcd}^{q\nu} - \gamma_{r;abef}^{q\nu} U^{q}_{r;fehg} \chi_{r;ghcd}^{q}. :param vrg_q_r: The momentum-dependent three-leg vertex :math:`\gamma^q_{r}`. :param gchi_aux_q_r_sum: The (shell-corrected) physical susceptibility :math:`\chi^{q}_{r}`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :param u_loc: The bare local interaction :math:`U`. :return: The self-energy kernel :math:`U_r K` as a :class:`FourPoint`. """ u_r = v_nonloc.as_channel(vrg_q_r.channel) + u_loc.as_channel(vrg_q_r.channel) kernel = vrg_q_r - vrg_q_r @ u_r @ gchi_aux_q_r_sum if vrg_q_r.channel == SpinChannel.MAGN: kernel -= 2.0 / 3.0 * FourPoint.identity_like(kernel) return u_r @ kernel
[docs] def perform_ornstein_zernike_fit(chi_phys_q_r: FourPoint) -> None: r""" Fits the static (:math:`\omega = 0`) physical susceptibility to an Ornstein-Zernike form :math:`\chi(q) = A / (\xi^{-2} + (q - q_0)^2)` around the antiferromagnetic wave vector :math:`q_0 = (\pi, \pi, 0)`, per orbital combination, and writes the amplitude :math:`A` and correlation length :math:`\xi` to ``oz_coeff.txt``. Non-converging fits are flagged with ``[-1, -1]``. :param chi_phys_q_r: The momentum-dependent physical susceptibility :math:`\chi^{q}_{r}` (irreducible BZ). :return: None. """ def oz_spin_w0(q_grid: KGrid, a: float, xi: float): r""" Evaluates the Ornstein-Zernike model on the full BZ grid, flattened to match the fit data. :param q_grid: The :class:`KGrid` providing the momentum coordinates. :param a: The amplitude :math:`A`. :param xi: The correlation length :math:`\xi`. :return: The flattened model susceptibility over the BZ grid. """ qx = qy = np.pi qz = 0 oz = a / ( xi ** (-2) + (q_grid.kx[:, None, None] - qx) ** 2 + (q_grid.ky[None, :, None] - qy) ** 2 + (q_grid.kz[None, None, :] - qz) ** 2 ) return oz.flatten() def fit_oz_spin(q_grid: KGrid, mat: np.ndarray): """ Least-squares fits the Ornstein-Zernike model to one orbital slice of the susceptibility. :param q_grid: The :class:`KGrid` providing the momentum coordinates. :param mat: The flattened susceptibility slice to fit. :return: The fitted ``(A, xi)`` coefficients. """ initial_guess = (mat.max(), 2.0) return opt.curve_fit(oz_spin_w0, q_grid, mat, p0=initial_guess)[0] chi = deepcopy(chi_phys_q_r) chi_mat = chi.map_to_full_bz(config.lattice.q_grid).to_half_niw_range().take_first_wn().mat.real orb_shape = (config.sys.n_bands,) * 4 oz_coeffs = np.zeros(orb_shape + (2,), dtype=float) failed_orbitals = [] for idx in np.ndindex(orb_shape): mat_slice = chi_mat[..., idx[0], idx[1], idx[2], idx[3]].flatten() try: coeffs = fit_oz_spin(config.lattice.q_grid, mat_slice) if not np.all(mat_slice == 0) else [0.0, 0.0] except (ValueError, RuntimeError, opt.OptimizeWarning): failed_orbitals.append(idx) coeffs = [-1.0, -1.0] oz_coeffs[idx] = coeffs if failed_orbitals: one_based = [tuple(o + 1 for o in idx) for idx in failed_orbitals] config.logger.warning( f"OZ fit did not converge for {len(failed_orbitals)} orbital combination(s): " f"{one_based}. Using [-1, -1]." ) rows = [] for idx in np.ndindex(orb_shape): rows.append([*idx, *oz_coeffs[idx]]) data_to_save = np.array(rows, dtype=float) path = os.path.join(config.output.output_path, f"oz_coeff.txt") np.savetxt(path, data_to_save, delimiter=",", fmt="%d %d %d %d %.9f %.9f", header="o1 o2 o3 o4 A xi")
[docs] def calculate_and_save_chi_q_r_rpa( gchi0_q_core_inv: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction, mpi_dist_irrk: MpiDistributor ): r""" Calculates and saves the RPA susceptibility (for both density and magnetic channels) from the DMFT Green's functions, :math:`\chi_{d/m;\mathrm{RPA}} = \chi_0 (1 + U_{d/m}\chi_0)^{-1} = (\chi_0^{-1} + U_{d/m})^{-1}`. The result is gathered to rank 0 and written to file. :param gchi0_q_core_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :param mpi_dist_irrk: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`). :return: None. """ for channel in [SpinChannel.DENS, SpinChannel.MAGN]: u_r = u_loc.as_channel(channel) + v_nonloc.as_channel(channel) chi_rpa_q_r = (gchi0_q_core_inv + u_r).invert(False).sum_over_all_vn(config.sys.beta) chi_rpa_q_r.mat = mpi_dist_irrk.gather(chi_rpa_q_r.mat) if mpi_dist_irrk.my_rank == 0: chi_rpa_q_r.save(name=f"chi_rpa_q_{channel.value}", output_dir=config.output.output_path) chi_rpa_q_r.free() config.logger.info(f"Calculated RPA susceptibility ({channel.value}).")
[docs] def calculate_sigma_kernel_r_q( gamma_r: LocalFourPoint, gchi0_q_inv: FourPoint, gchi0_q_full_sum: FourPoint, gchi0_q_core_sum: FourPoint, u_loc: LocalInteraction, v_nonloc: Interaction, mpi_dist_irrq: MpiDistributor, ) -> FourPoint: r""" Returns the kernel for the self-energy calculation in a specific spin channel. Calculates the auxiliary susceptibility, the three-leg vertex and the physical susceptibility with shell correction. Also performs a :math:`\lambda`-correction on the physical susceptibility if specified in the config for single-band input. Saves the physical susceptibility (and, if Eliashberg is enabled, the intermediate vertices) to file. :param gamma_r: The local irreducible vertex :math:`\Gamma_{r}`. :param gchi0_q_inv: The inverse bare bubble :math:`(\chi_0^q)^{-1}` (core box). :param gchi0_q_full_sum: The frequency-summed bare bubble over the full box. :param gchi0_q_core_sum: The frequency-summed bare bubble over the core box. :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :param mpi_dist_irrq: MPI distributor over the irreducible BZ q-points (see :class:`MpiDistributor`). :return: The self-energy kernel for this channel as a :class:`FourPoint`. """ logger = config.logger if config.memory.save_memory_for_chiq_aux: gchi_aux_q_r_sum = create_auxiliary_chi_r_q_sum_v3(gamma_r, gchi0_q_inv, u_loc, v_nonloc, mpi_dist_irrq) else: gchi_aux_q_r_sum = create_auxiliary_chi_r_q_sum_v1(gamma_r, gchi0_q_inv, u_loc, v_nonloc) mpi_dist_irrq.barrier() logger.log_memory_usage( f"Gchi_aux ({gchi_aux_q_r_sum.channel.value})", gchi_aux_q_r_sum, mpi_dist_irrq.comm.size * 2 * config.box.niv_core, ) logger.info(f"Non-Local auxiliary susceptibility ({gchi_aux_q_r_sum.channel.value}) calculated.") vrg_q_r = create_vrg_r_q(gchi_aux_q_r_sum, gchi0_q_inv) logger.info(f"Non-local three-leg vertex gamma^wv ({vrg_q_r.channel.value}) done.") logger.log_memory_usage(f"Three-leg vertex ({vrg_q_r.channel.value})", vrg_q_r, mpi_dist_irrq.comm.size) if config.eliashberg.perform_eliashberg: vrg_q_r.save( name=f"vrg_q_{vrg_q_r.channel.value}_rank_{mpi_dist_irrq.comm.rank}", output_dir=config.output.eliashberg_path, ) chi_phys_q_r = gchi_aux_q_r_sum.sum_over_all_vn(config.sys.beta) gchi_aux_q_r_sum.free() chi_phys_q_r = create_generalized_chi_q_with_shell_correction( chi_phys_q_r, gchi0_q_full_sum, gchi0_q_core_sum, u_loc, v_nonloc ) logger.info(f"Updated non-local susceptibility chi^q ({chi_phys_q_r.channel.value}) with asymptotic correction.") if config.self_consistency.restrict_chi_phys: logger.warning("Restricting physical susceptibility to positive values.") chi_phys_q_r = chi_phys_q_r.invert() chi_phys_q_r.mat[chi_phys_q_r.mat < 0] = 1e-4 chi_phys_q_r = chi_phys_q_r.invert() logger.log_memory_usage( f"Physical susceptibility ({chi_phys_q_r.channel.value})", chi_phys_q_r, mpi_dist_irrq.comm.size ) chi_phys_q_r.mat = mpi_dist_irrq.gather(chi_phys_q_r.mat) if mpi_dist_irrq.comm.rank == 0: if config.lambda_correction.perform_lambda_correction: chi_phys_q_r = perform_lambda_correction(chi_phys_q_r) chi_phys_q_r.save(name=f"chi_phys_q_{chi_phys_q_r.channel.value}", output_dir=config.output.output_path) # perform Ornstein-Zernike fit if chi_phys_q_r.channel == SpinChannel.MAGN: perform_ornstein_zernike_fit(chi_phys_q_r) chi_phys_q_r.mat = mpi_dist_irrq.scatter(chi_phys_q_r.mat) logger.info(f"Saved physical susceptibility ({chi_phys_q_r.channel.value}) to file.") if config.eliashberg.perform_eliashberg: chi_phys_q_r.save( name=f"gchi_aux_q_{chi_phys_q_r.channel.value}_sum_rank_{mpi_dist_irrq.comm.rank}", output_dir=config.output.eliashberg_path, ) return calculate_kernel_r_q(vrg_q_r, chi_phys_q_r, v_nonloc, u_loc)
[docs] def perform_lambda_correction(chi_phys_q_r: FourPoint) -> FourPoint: r""" Performs the :math:`\lambda`-correction on the physical susceptibility. If 'spch' is specified, the lambda correction will be performed on both the density and magnetic channel whereas only the magnetic channel will be corrected if 'sp' is specified as :math:`\lambda`-correction type in the corresponding config. The local susceptibility sum-rule target is read from the saved local susceptibilities, and the determined :math:`\lambda` is appended to a text file. :param chi_phys_q_r: The momentum-dependent physical susceptibility :math:`\chi^{q}_{r}` to correct. :return: The :math:`\lambda`-corrected physical susceptibility (unchanged for the 'sp' type in non-magnetic channels). :raises ValueError: If the configured lambda-correction type is neither 'spch' nor 'sp'. """ logger = config.logger if config.lambda_correction.type.lower() not in ["spch", "sp"]: raise ValueError("Lambda correction type must be either 'spch' or 'sp'.") logger.info(f"Lambda correction type set to '{config.lambda_correction.type}'.") if config.lambda_correction.type.lower() == "spch": logger.info(f"Performing lambda correction for {chi_phys_q_r.channel.value} channel.") chi_r_loc = LocalFourPoint.load( os.path.join(config.output.output_path, f"chi_{chi_phys_q_r.channel.value}_loc.npy"), chi_phys_q_r.channel, num_vn_dimensions=0, ).to_full_niw_range() chi_phys_q_r, lambda_r = lc.perform_single_lambda_correction( chi_phys_q_r, chi_r_loc.mat.sum() / config.sys.beta ) chi_r_loc.free() logger.info( f"Lambda correction for the {chi_phys_q_r.channel.value} channel applied with lambda = {lambda_r:.6f}." ) with open(os.path.join(config.output.output_path, f"lambda_{config.lambda_correction.type}.txt"), "a") as f: f.write(f"lambda_{chi_phys_q_r.channel.value}: {lambda_r}\n") return chi_phys_q_r # else: "sp" if chi_phys_q_r.channel != SpinChannel.MAGN: return chi_phys_q_r logger.info(f"Performing lambda correction for magn channel.") chi_phys_q_dens = FourPoint.load( os.path.join(config.output.output_path, f"chi_phys_q_dens.npy"), SpinChannel.DENS, num_vn_dimensions=0, ).to_full_niw_range() chi_dens_loc, chi_magn_loc = [ LocalFourPoint.load( os.path.join(config.output.output_path, f"chi_{channel.value}_loc.npy"), channel, num_vn_dimensions=0, ).to_full_niw_range() for channel in [SpinChannel.DENS, SpinChannel.MAGN] ] chi_magn_loc_sum = (chi_dens_loc.mat + chi_magn_loc.mat).sum() - 1 / config.lattice.q_grid.nk_tot * ( config.lattice.q_grid.irrk_count[:, None, None, None, None, None] * chi_phys_q_dens.mat ).sum() chi_phys_q_r, lambda_r = lc.perform_single_lambda_correction(chi_phys_q_r, chi_magn_loc_sum / config.sys.beta) logger.info(f"Lambda correction 'sp' applied. Lambda for magn channel is: {lambda_r:.6f}.") with open(os.path.join(config.output.output_path, f"lambda_{config.lambda_correction.type}.txt"), "a") as f: f.write(f"lambda_{chi_phys_q_r.channel.value}: {lambda_r}\n") return chi_phys_q_r
[docs] def calculate_sigma_from_kernel(kernel: FourPoint, giwk: GreensFunction, my_full_q_list: np.ndarray) -> SelfEnergy: r""" Returns :math:`\Sigma_{ij}^{k} = -\frac{1}{2\beta N_q} \sum_q U^q_{r;aibc} K_{r;cbjd}^{q\nu} G_{ad}^{\omega-\nu}`. For very large momentum grids, this function is the slowest part compared to the rest of the code due to the repeated loops. Potential speed-ups could be achieved by batching the q-points or using numba. :param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks). :param giwk: The momentum-dependent :class:`GreensFunction`. :param my_full_q_list: Array of integer q-point index triplets handled by this rank. :return: The rank-local contribution to the non-local :class:`SelfEnergy` (compressed q, full niv range). """ mat = np.zeros( (*config.lattice.k_grid.nk, config.sys.n_bands, config.sys.n_bands, config.box.niv_core), dtype=kernel.mat.dtype, ) kernel = kernel.to_full_niw_range() wn = MFHelper.wn(config.box.niw_core) path = np.einsum_path("aijdv,xyzadv->xyzijv", kernel[0, ..., 0, :], mat, optimize=True)[1] for idx_q, q in enumerate(my_full_q_list): shifted_mat = np.roll(giwk.mat, [-i for i in q], axis=(0, 1, 2)) for idx_w, wn_i in enumerate(wn): g_qk = shifted_mat[..., giwk.niv - wn_i : giwk.niv + config.box.niv_core - wn_i] k_slice = kernel[idx_q, ..., idx_w, config.box.niv_core :] mat += np.einsum("aijdv,xyzadv->xyzijv", k_slice, g_qk, optimize=path) mat *= -0.5 / config.sys.beta / config.lattice.q_grid.nk_tot return SelfEnergy(mat, config.lattice.nk, False, beta=config.sys.beta).compress_q_dimension().to_full_niv_range()
[docs] def calculate_sigma_from_kernel_cpu( kernel: FourPoint, giwk: GreensFunction, my_full_q_list: np.ndarray, ) -> SelfEnergy: r""" Returns :math:`\Sigma_{ij}^{k} = -\frac{1}{2\beta N_q} \sum_q U^q_{r;aibc} K_{r;cbjd}^{q\nu} G_{ad}^{\omega-\nu}`. For very large momentum grids, this function is the slowest part compared to the rest of the code due to the repeated loops. There is no real way to speed it up further without leveraging GPUs or other hardware accelerators. This is the CPU implementation (Fortran-ordered buffers, preallocated accumulator). :param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks). :param giwk: The momentum-dependent :class:`GreensFunction`. :param my_full_q_list: Array of integer q-point index triplets handled by this rank. :return: The rank-local contribution to the non-local :class:`SelfEnergy` (compressed q, full niv range). """ nkx, nky, nkz = config.lattice.k_grid.nk nb = config.sys.n_bands niv_core = config.box.niv_core mat = np.zeros((nkx, nky, nkz, nb, nb, niv_core), dtype=kernel.mat.dtype) wn = MFHelper.wn(config.box.niw_core) giwk_mat = np.asfortranarray(giwk.mat) kernel = np.asfortranarray(kernel.to_full_niw_range().mat[..., niv_core:]) kxs, kys, kzs = np.arange(nkx), np.arange(nky), np.arange(nkz) kx_indices = [((kxs + q[0]) % nkx) for q in my_full_q_list] ky_indices = [((kys + q[1]) % nky) for q in my_full_q_list] kz_indices = [((kzs + q[2]) % nkz) for q in my_full_q_list] acc = np.empty((nkx, nky, nkz, nb, nb, niv_core), dtype=mat.dtype) for iq in range(len(my_full_q_list)): g_q_view = giwk_mat[ kx_indices[iq][:, None, None], ky_indices[iq][None, :, None], kz_indices[iq][None, None, :], ... ] for iw, w in enumerate(wn): g_slice = g_q_view[..., giwk.niv - w : giwk.niv + niv_core - w] k_slice = kernel[iq, ..., iw, :] np.einsum("xyzadv,aijdv->xyzijv", g_slice, k_slice, out=acc, optimize=True) np.add(mat, acc, out=mat) mat *= -0.5 / config.sys.beta / config.lattice.q_grid.nk_tot return ( SelfEnergy(np.ascontiguousarray(mat), config.lattice.nk, False, beta=config.sys.beta) .compress_q_dimension() .to_full_niv_range() )
[docs] def calculate_sigma_from_kernel_gpu( kernel: FourPoint, giwk: GreensFunction, my_full_q_list: np.ndarray, ) -> SelfEnergy: r""" Returns :math:`\Sigma_{ij}^{k} = -\frac{1}{2\beta N_q} \sum_q U^q_{r;aibc} K_{r;cbjd}^{q\nu} G_{ad}^{\omega-\nu}`. For very large momentum grids, this function is the slowest part compared to the rest of the code due to the repeated loops. This is the GPU implementation using CuPy. :param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks). :param giwk: The momentum-dependent :class:`GreensFunction`. :param my_full_q_list: Array of integer q-point index triplets handled by this rank. :return: The rank-local contribution to the non-local :class:`SelfEnergy` (compressed q, full niv range). """ import cupy as cp nkx, nky, nkz = config.lattice.k_grid.nk nb = config.sys.n_bands niv_core = config.box.niv_core mat_gpu = cp.zeros((nkx, nky, nkz, nb, nb, niv_core), dtype=kernel.mat.dtype, order="F") wn = MFHelper.wn(config.box.niw_core) giwk_mat = cp.asarray(giwk.mat, order="F") kernel = cp.asarray(kernel.to_full_niw_range().mat, order="F")[..., niv_core:] kxs, kys, kzs = cp.arange(nkx), cp.arange(nky), cp.arange(nkz) kx_indices = [((kxs + q[0]) % nkx) for q in my_full_q_list] ky_indices = [((kys + q[1]) % nky) for q in my_full_q_list] kz_indices = [((kzs + q[2]) % nkz) for q in my_full_q_list] for iq in range(len(my_full_q_list)): g_q_view = giwk_mat[ kx_indices[iq][:, None, None], ky_indices[iq][None, :, None], kz_indices[iq][None, None, :], ... ] for iw, w in enumerate(wn): g_slice = g_q_view[..., giwk.niv - w : giwk.niv + niv_core - w] k_slice = kernel[iq, ..., iw, :] mat_gpu += cp.einsum("xyzadv,aijdv->xyzijv", g_slice, k_slice, optimize=True) mat_gpu *= -0.5 / config.sys.beta / config.lattice.q_grid.nk_tot return ( SelfEnergy(np.ascontiguousarray(cp.asnumpy(mat_gpu)), config.lattice.nk, False, beta=config.sys.beta) .compress_q_dimension() .to_full_niv_range() )
[docs] def calculate_sigma_from_kernel_auto( mpi_distributor: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, my_full_q_list: np.ndarray ) -> SelfEnergy: r""" Dispatches the q-loop self-energy contraction to the GPU (:func:`calculate_sigma_from_kernel_gpu`) when CuPy and a usable CUDA device are available (one GPU per MPI rank, round-robin), otherwise falls back to the CPU implementation (:func:`calculate_sigma_from_kernel_cpu`). :param mpi_distributor: MPI distributor used to choose the per-rank GPU (see :class:`MpiDistributor`). :param kernel: The self-energy kernel :math:`K` (full BZ, scattered across ranks). :param giwk: The momentum-dependent :class:`GreensFunction`. :param my_full_q_list: Array of integer q-point index triplets handled by this rank. :return: The rank-local contribution to the non-local :class:`SelfEnergy`. """ logger = config.logger cp = None try: import cupy as cp except ImportError: pass # CuPy not installed -> CPU n_gpus = 0 if cp is not None: try: n_gpus = cp.cuda.runtime.getDeviceCount() except cp.cuda.runtime.CUDARuntimeError: n_gpus = 0 # no usable CUDA driver/device -> CPU if n_gpus > 0 and cp.cuda.is_available(): logger.info(f"CuPy detected {n_gpus} GPU(s). Using GPU acceleration for self-energy calculation.") gpu_id = mpi_distributor.my_rank % n_gpus cp.cuda.Device(gpu_id).use() return calculate_sigma_from_kernel_gpu(kernel, giwk, my_full_q_list) return calculate_sigma_from_kernel_cpu(kernel, giwk, my_full_q_list)
[docs] def calculate_sigma_from_kernel_fft_cpu( mpi_dist: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, niw_index_w_pairs: list[tuple[int, int]] ) -> SelfEnergy: r""" Computes the self-energy using distributed FFTs (CPU). Replaces the q-loop with a real-space pointwise multiplication: both the Green's function and the kernel are FFT'd to real space (the kernel to :math:`-R` via the conjugate trick), contracted pointwise per rank-local R-slice, and accumulated. Returns :math:`\Sigma` in R-space, positive-:math:`\nu` half only; the caller must ifft over :math:`(k_x, k_y, k_z)` and then call :meth:`SelfEnergy.to_full_niv_range` before use. This contracts a **single** bosonic-frequency half (positive-w *or* negative-w) so the full-BZ kernel is never materialized over the full niw range (see :meth:`LocalNPoint.to_negative_niw_range`): the kernel is consumed in whatever niw orientation it is handed (no internal ``to_full_niw_range``), and ``niw_index_w_pairs`` selects which kernel w-slices to contract and how to shift the Green's function. :param mpi_dist: MPI distributor providing the communicator and R-space pencil decomposition. :param kernel: The self-energy kernel :math:`K` for one niw half (full BZ): the positive half (``w >= 0``) or the negative block from :meth:`LocalNPoint.to_negative_niw_range` (``w = 0, -1, ..., -niw``). :param giwk: The momentum-dependent :class:`GreensFunction`. :param niw_index_w_pairs: The ``(kernel_w_index, w)`` pairs to contract. The positive pass passes ``[(i, i) for i in range(niw + 1)]`` (``w = 0..+niw``), the negative pass ``[(i, -i) for i in range(1, niw + 1)]`` (``w = -1..-niw``, skipping the ``w = 0`` duplicate). :return: The rank-local R-space :class:`SelfEnergy` (compressed q, half niv range, moments not fitted). """ comm = mpi_dist.comm rank = comm.Get_rank() size = comm.Get_size() nkx, nky, nkz = config.lattice.k_grid.nk nk_tot = config.lattice.q_grid.nk_tot nb = config.sys.n_bands niv_core = config.box.niv_core beta = config.sys.beta # G(k) -> F[G](R), forward FFT, replicated on every rank g_r_mat = giwk.fft().mat # K(q) -> F[K](-R) via the conjugate trick: conj, fft, conj. The kernel is already a single niw half (positive # half, or the negative block), so there is no to_full_niw_range here -- the full-niw kernel is never built. kernel = kernel.to_half_niv_range() kernel.mat = np.conj(kernel.mat) kernel = mpi_utils.execute_distributed_fft(kernel, comm) kernel.mat = np.conj(kernel.mat) # Local R-space contraction; each rank owns a slice of R-points n_r_local = kernel.mat.shape[0] mat = np.zeros((n_r_local, nb, nb, niv_core), dtype=kernel.mat.dtype) acc = np.empty_like(mat) my_r_indices = mpi_utils.get_pencil_indices(rank, size, (nkx, nky, nkz), "flat") g_r_local = g_r_mat.reshape(nk_tot, nb, nb, -1)[my_r_indices] path = None for idx, w in niw_index_w_pairs: g_slice = g_r_local[..., giwk.niv - w : giwk.niv + niv_core - w] k_slice = kernel.mat[..., idx, :] if path is None: # slice shapes are identical across the loop -> compute the contraction path once path = np.einsum_path("Radv,Raijdv->Rijv", g_slice, k_slice, optimize="optimal")[0] np.einsum("Radv,Raijdv->Rijv", g_slice, k_slice, out=acc, optimize=path) np.add(mat, acc, out=mat) mat *= -0.5 / beta / nk_tot return SelfEnergy( np.ascontiguousarray(mat), config.lattice.nk, full_niv_range=False, has_compressed_q_dimension=True, calc_smom=False, beta=config.sys.beta, )
[docs] def calculate_sigma_from_kernel_fft_gpu( mpi_dist: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, niw_index_w_pairs: list[tuple[int, int]] ) -> SelfEnergy: r""" Computes the self-energy using distributed FFTs, running on the GPU (CuPy). Same algorithm as :func:`calculate_sigma_from_kernel_fft_cpu` (including the single-niw-half / ``niw_index_w_pairs`` contraction). Returns :math:`\Sigma` in R-space, positive-:math:`\nu` half only; the caller must ifft over :math:`(k_x, k_y, k_z)` and then call :meth:`SelfEnergy.to_full_niv_range` before use. :param mpi_dist: MPI distributor providing the communicator and R-space pencil decomposition. :param kernel: The self-energy kernel :math:`K` for one niw half (full BZ): the positive half or the negative block from :meth:`LocalNPoint.to_negative_niw_range`. :param giwk: The momentum-dependent :class:`GreensFunction`. :param niw_index_w_pairs: The ``(kernel_w_index, w)`` pairs to contract (see :func:`calculate_sigma_from_kernel_fft_cpu`). :return: The rank-local R-space :class:`SelfEnergy` (compressed q, half niv range, moments not fitted). """ import cupy as cp comm = mpi_dist.comm rank = comm.Get_rank() size = comm.Get_size() nkx, nky, nkz = config.lattice.k_grid.nk nk_tot = config.lattice.q_grid.nk_tot nb = config.sys.n_bands niv_core = config.box.niv_core beta = config.sys.beta # G(k) -> F[G](R), forward FFT, replicated on every rank g_r_mat = cp.asarray(giwk.fft().mat) # K(q) -> F[K](-R) via the conjugate trick: conj, fft, conj. The kernel is already a single niw half, so there is # no to_full_niw_range here -- the full-niw kernel is never built. kernel = kernel.to_half_niv_range() kernel.mat = np.conj(kernel.mat) kernel = mpi_utils.execute_distributed_fft(kernel, comm) kernel.mat = cp.conj(cp.asarray(kernel.mat)) # Local R-space contraction; each rank owns a slice of R-points n_r_local = kernel.mat.shape[0] mat = cp.zeros((n_r_local, nb, nb, niv_core), dtype=kernel.mat.dtype) my_r_indices = mpi_utils.get_pencil_indices(rank, size, (nkx, nky, nkz), "flat") g_r_local = g_r_mat.reshape(nk_tot, nb, nb, -1)[cp.asarray(my_r_indices)] for idx, w in niw_index_w_pairs: g_slice = g_r_local[..., giwk.niv - w : giwk.niv + niv_core - w] k_slice = kernel.mat[..., idx, :] mat += cp.einsum("Radv,Raijdv->Rijv", g_slice, k_slice, optimize=True) mat *= -0.5 / beta / nk_tot return SelfEnergy( np.ascontiguousarray(cp.asnumpy(mat)), config.lattice.nk, full_niv_range=False, has_compressed_q_dimension=True, calc_smom=False, beta=config.sys.beta, )
[docs] def select_sigma_fft_device(mpi_distributor: MpiDistributor) -> bool: """ Detects whether CuPy and a usable CUDA device are available (one GPU per MPI rank, round-robin), selects this rank's GPU and logs the decision **once**. Intended to be called a single time before the positive- and negative-w FFT passes so the GPU-detection message is not emitted per pass. :param mpi_distributor: MPI distributor providing the per-rank GPU choice. :return: True if the GPU implementation should be used, False to fall back to the CPU implementation. """ cp = None try: import cupy as cp except ImportError: return False # CuPy not installed -> CPU n_gpus = 0 try: n_gpus = cp.cuda.runtime.getDeviceCount() except cp.cuda.runtime.CUDARuntimeError: n_gpus = 0 # no usable CUDA driver/device -> CPU if n_gpus > 0 and cp.cuda.is_available(): config.logger.info(f"CuPy detected {n_gpus} GPU(s). Using GPU acceleration for self-energy calculation.") cp.cuda.Device(mpi_distributor.my_rank % n_gpus).use() return True return False
[docs] def calculate_sigma_from_kernel_fft( mpi_dist: MpiDistributor, kernel: FourPoint, giwk: GreensFunction, niw_index_w_pairs: list[tuple[int, int]], use_gpu: bool, ) -> SelfEnergy: r""" Dispatches one bosonic-frequency FFT pass to the GPU (:func:`calculate_sigma_from_kernel_fft_gpu`) or CPU (:func:`calculate_sigma_from_kernel_fft_cpu`) implementation according to ``use_gpu`` (decided once by :func:`select_sigma_fft_device`, so no per-pass GPU-detection logging). :param mpi_dist: MPI distributor providing the communicator and R-space pencil decomposition. :param kernel: The self-energy kernel :math:`K` for one niw half (full BZ). :param giwk: The momentum-dependent :class:`GreensFunction`. :param niw_index_w_pairs: The ``(kernel_w_index, w)`` pairs to contract (see :func:`calculate_sigma_from_kernel_fft_cpu`). :param use_gpu: Whether to run the GPU implementation (as decided by :func:`select_sigma_fft_device`). :return: The rank-local R-space :class:`SelfEnergy`. """ if use_gpu: return calculate_sigma_from_kernel_fft_gpu(mpi_dist, kernel, giwk, niw_index_w_pairs) return calculate_sigma_from_kernel_fft_cpu(mpi_dist, kernel, giwk, niw_index_w_pairs)
def _map_kernel_to_full_bz( kernel: FourPoint, mpi_dist_irrk: MpiDistributor, mpi_dist_fullbz: MpiDistributor ) -> FourPoint: """ Maps the self-energy kernel from the irreducible to the full Brillouin zone, choosing the gather/scatter routine or the fully distributed peer-to-peer routine according to ``config.memory.save_memory_for_chiq_aux``. :param kernel: The self-energy kernel on the irreducible BZ. :param mpi_dist_irrk: MPI distributor over the irreducible BZ q-points. :param mpi_dist_fullbz: MPI distributor over the full BZ q-points. :return: The kernel distributed over the full BZ. """ if not config.memory.save_memory_for_chiq_aux: return mpi_utils.map_irrbz_fullbz(kernel, mpi_dist_irrk, mpi_dist_fullbz) return mpi_utils.exchange_and_map_irrbz_fullbz(kernel, mpi_dist_irrk, mpi_dist_fullbz)
[docs] def get_starting_sigma(default_sigma: SelfEnergy) -> tuple[SelfEnergy, int]: """ Tries to retrieve the last calculated self-energy from a previous self-consistency calculation as a starting point for the next calculation. Whether the normal or interpolated sigma is chosen depends on the setting. If no ``sigma_dga_*_N.npy`` file is found, we use the DMFT self-energy as a starting point. :param default_sigma: The fallback (DMFT) :class:`SelfEnergy` used when no previous result is found. :return: A tuple of the starting :class:`SelfEnergy` (cut to the core box and interpolated onto the k-grid) and the iteration number it was taken from (0 if none found). """ previous_sc_path = config.self_consistency.previous_sc_path if previous_sc_path is None or previous_sc_path == "" or not os.path.exists(previous_sc_path): return default_sigma, 0 if config.self_consistency.use_interpolated_sigma: glob_pattern = "sigma_dga_interpolated_*_iteration_*.npy" iteration_regex = re.compile(r"sigma_dga_interpolated_.+_iteration_(\d+)\.npy$") else: glob_pattern = "sigma_dga_iteration_*.npy" iteration_regex = re.compile(r"sigma_dga_iteration_(\d+)\.npy$") files = glob.glob(os.path.join(previous_sc_path, glob_pattern)) if not files or len(files) == 0: return default_sigma, 0 iterations = [(int(match.group(1)), f) for f in files if (match := iteration_regex.search(f))] if not iterations or len(iterations) == 0: return default_sigma, 0 max_iter, max_file = max(iterations, key=lambda x: x[0]) mat = np.load(max_file) return ( SelfEnergy(mat, mat.shape[:3], True, False, beta=config.sys.beta) .cut_niv(config.box.niv_core) .interpolate_q_grid(config.lattice.k_grid.nk, False), max_iter, )
def _init_mu_history(starting_iter: int) -> list[float]: r""" Seeds the chemical-potential history for the self-consistency loop. For a fresh run (``starting_iter == 0``) the history starts at the current (DMFT) chemical potential :math:`\mu`. When resuming from a previous self-consistency calculation it is seeded with that run's last :math:`\mu` (from ``mu_history.npy``) and the global ``config.sys.mu`` is synced to it: otherwise ``config.sys.mu`` would stay at the DMFT value while ``giwk_full`` is built with the previous run's :math:`\mu`, and any quantity computed from the global (e.g. the lattice filling in :meth:`GreensFunction.get_fill_nonlocal`, which now reads ``self._mu``) would use an inconsistent chemical potential. :param starting_iter: The iteration the previous calculation stopped at (0 for a fresh run). :return: The single-element chemical-potential history list. """ if starting_iter == 0: return [config.sys.mu] previous_mu = float(np.load(os.path.join(config.self_consistency.previous_sc_path, "mu_history.npy"))[-1]) config.sys.mu = previous_mu return [previous_mu]
[docs] def read_last_n_sigmas_from_files(n: int, output_path: str = "./", previous_sc_path: str = "./") -> list[np.ndarray]: """ Reads the last ``n`` total self-energies from the output directory and - if specified - the previous self-consistency path. This is used for the Pulay/Anderson mixing schemes. If one has a history of self-energies from a previous calculation, these will be used as well. :param n: Number of most recent self-energies to read. :param output_path: Directory holding the current run's ``sigma_dga_iteration_*.npy`` files. :param previous_sc_path: Directory of a previous self-consistency run to prepend to the history (if set). :return: A list of self-energy arrays (cut to the core box and interpolated onto the k-grid), oldest first. """ def _get_top_n_files(path: str, pattern: str, regex: re.Pattern) -> list[tuple[int, str]]: """ Finds the ``n`` highest-iteration files in ``path`` matching ``pattern``/``regex``. :param path: Directory to search. :param pattern: Glob pattern selecting candidate files. :param regex: Regex whose first group captures the iteration number. :return: A list of ``(iteration, filepath)`` tuples, sorted ascending, truncated to the last ``n``. """ files = glob.glob(os.path.join(path, pattern)) matched = [(int(match.group(1)), f) for f in files if (match := regex.search(f))] return sorted(matched, key=lambda x: x[0])[-n:] interp_pattern = "sigma_dga_interpolated_*_iteration_*.npy" interp_regex = re.compile(r"sigma_dga_interpolated_.+_iteration_(\d+)\.npy$") normal_pattern = "sigma_dga_iteration_*.npy" normal_regex = re.compile(r"sigma_dga_iteration_(\d+)\.npy$") last_iterations_previous_dir = [] if previous_sc_path and previous_sc_path.strip(): if config.self_consistency.use_interpolated_sigma: last_iterations_previous_dir = _get_top_n_files(previous_sc_path, interp_pattern, interp_regex) else: last_iterations_previous_dir = _get_top_n_files(previous_sc_path, normal_pattern, normal_regex) last_iterations_current_dir = _get_top_n_files(output_path, normal_pattern, normal_regex) last_iterations = (last_iterations_previous_dir + last_iterations_current_dir)[-n:] sigmas = [] for _, file in last_iterations: sigma_mat = np.load(file) sigmas.append( SelfEnergy(sigma_mat, sigma_mat.shape[:3], True, False, False, False, beta=config.sys.beta) .cut_niv(config.box.niv_core) .interpolate_q_grid(config.lattice.k_grid.nk, False) .mat ) return sigmas
[docs] def calculate_self_energy_q( comm: MPI.Comm, u_loc: LocalInteraction, v_nonloc: Interaction, sigma_dmft: SelfEnergy, sigma_local: SelfEnergy ) -> SelfEnergy: r""" Runs the non-local DGA self-energy calculation. Calculates the Hartree- and Fock terms, the bubble, the double-counting correction and the kernel in the density and magnetic channel. Finally, calculates the non-local self-energy from the kernel and the Green's function. Also takes care of the self-consistency loop and the chemical potential adjustment as well as the self-energy mixing, etc. :param comm: The MPI communicator. :param u_loc: The bare local interaction :math:`U`. :param v_nonloc: The non-local interaction :math:`V^{q}`. :param sigma_dmft: The DMFT self-energy (used as the starting point and for the shell/tail correction). :param sigma_local: The locally recomputed self-energy (used for the double-counting :math:`\Delta\Sigma`). :return: The converged (or last-iteration) momentum-dependent DGA :class:`SelfEnergy`. """ logger = config.logger logger.info("Starting with non-local DGA routine.") logger.info("Initializing MPI distributor.") # MPI distributor for the irreducible BZ mpi_dist_irrk = MpiDistributor.create_distributor( ntasks=config.lattice.q_grid.nk_irr, comm=comm, name="Q", output_path=config.output.output_path ) full_q_list = config.lattice.q_grid.get_q_list() irrk_q_list = config.lattice.q_grid.get_irrq_list() my_irr_q_list = irrk_q_list[mpi_dist_irrk.my_slice] mpi_dist_fullbz = MpiDistributor.create_distributor( ntasks=config.lattice.q_grid.nk_tot, comm=comm, name="FBZ", output_path=config.output.output_path ) my_full_q_list = full_q_list[mpi_dist_fullbz.my_slice] sigma_old, starting_iter = get_starting_sigma(sigma_dmft) if starting_iter > 0: logger.info( f"Using previous calculation and starting the self-consistency loop at iteration {starting_iter + 1}." ) mu_history = _init_mu_history(starting_iter) niv_cut = min(config.box.niw_core + config.box.niv_full + 10, config.box.niv_dmft) sigma_dmft_full = deepcopy(sigma_dmft) if comm.rank == 0: giwk_full_dmft = GreensFunction.get_g_full( sigma_dmft_full, config.sys.mu_dmft, config.lattice.hamiltonian.get_ek(), config.sys.beta ) giwk_full_dmft.save(output_dir=config.output.output_path, name="g_latt_dmft") giwk_full_dmft.free() sigma_old = sigma_old.concatenate_self_energies(sigma_dmft_full) giwk_full = GreensFunction.get_g_full( sigma_old, mu_history[-1], config.lattice.hamiltonian.get_ek(), config.sys.beta ) config.sys.n, config.sys.occ, config.sys.occ_k = giwk_full.get_fill_nonlocal() giwk_full.cut_niv(niv_cut) config.sys.n, config.sys.occ, config.sys.occ_k = comm.bcast( (config.sys.n, config.sys.occ, config.sys.occ_k), root=0 ) sigma_old = sigma_old.cut_niv(niv_cut) sigma_dmft = sigma_dmft.cut_niv(niv_cut) if sigma_old.niv < niv_cut: sigma_old = sigma_old.concatenate_self_energies(sigma_dmft) delta_sigma = sigma_dmft.cut_niv(config.box.niv_core) - sigma_local.cut_niv(config.box.niv_core) v_nonloc_full = deepcopy(v_nonloc) v_nonloc = v_nonloc.reduce_q(my_irr_q_list) for current_iter in range(starting_iter + 1, starting_iter + config.self_consistency.max_iter + 1): logger.info("----------------------------------------") logger.info(f"Starting iteration {current_iter}.") logger.info("----------------------------------------") hartree, fock = get_hartree_fock(u_loc, v_nonloc_full, my_full_q_list) fock = mpi_dist_fullbz.allreduce(fock) logger.info("Calculated Hartree and Fock terms.") giwk_full = GreensFunction.get_g_full( sigma_old, mu_history[-1], config.lattice.hamiltonian.get_ek(), config.sys.beta ) logger.log_memory_usage("giwk", giwk_full, comm.size) if config.memory.save_memory_for_chi0q: gchi0_q = BubbleGenerator.create_generalized_chi0_q_auto( mpi_dist_irrk, giwk_full, config.box.niw_core, config.box.niv_full, my_irr_q_list, config.lattice.q_grid, config.sys.beta, config.logger, ) else: gchi0_q = BubbleGenerator.create_generalized_chi0_q_fft_auto( mpi_dist_irrk, giwk_full, config.box.niw_core, config.box.niv_full, config.lattice.k_grid, config.sys.beta, config.logger, ) if config.eliashberg.perform_eliashberg: gchi0_q.save(name=f"gchi0_q_rank_{comm.rank}", output_dir=config.output.output_path) logger.log_memory_usage("Gchi0_q_full", gchi0_q, comm.size) giwk_full = giwk_full.cut_niv(config.box.niv_core + config.box.niw_core) # sigma_old is not read again until the mixing step at the end of the iteration, and its shell # [niv_core, niv_cut) is always the (k-independent) DMFT self-energy. Shrink the full-grid sigma_old to its core # here -- freeing the broadcast shell through the kernel/SDE/fq peak -- and reconstruct the shell from sigma_dmft # just before mixing, which restores it to niv_cut bit-for-bit. sigma_old = sigma_old.cut_niv(config.box.niv_core) f_dc_loc = 2 * LocalFourPoint.load(os.path.join(config.output.output_path, "f_magn_loc.npy")).permute_orbitals( "abcd->cbad" ) kernel = -calculate_sigma_dc_kernel(f_dc_loc, gchi0_q, u_loc) f_dc_loc.free() logger.info("Calculated double-counting kernel.") gchi0_q_full_sum = 1.0 / config.sys.beta * gchi0_q.sum_over_all_vn(config.sys.beta) gchi0_q_core = gchi0_q.cut_niv(config.box.niv_core) gchi0_q.free() logger.log_memory_usage("Gchi0_q_core", gchi0_q_core, comm.size) gchi0_q_core_inv = deepcopy(gchi0_q_core).invert(False) logger.log_memory_usage("Gchi0_q_inv", gchi0_q_core_inv, comm.size) if current_iter == 1: calculate_and_save_chi_q_r_rpa(gchi0_q_core_inv, u_loc, v_nonloc, mpi_dist_irrk) if config.eliashberg.perform_eliashberg: gchi0_q_core_inv.save(name=f"gchi0_q_inv_rank_{comm.rank}", output_dir=config.output.eliashberg_path) gchi0_q_core_sum = 1.0 / config.sys.beta * gchi0_q_core.sum_over_all_vn(config.sys.beta) gchi0_q_core.free() gamma_dens = LocalFourPoint.load( os.path.join(config.output.output_path, "gamma_dens_loc.npy"), SpinChannel.DENS ) kernel += calculate_sigma_kernel_r_q( gamma_dens, gchi0_q_core_inv, gchi0_q_full_sum, gchi0_q_core_sum, u_loc, v_nonloc, mpi_dist_irrk ) gamma_dens.free() mpi_dist_irrk.barrier() logger.info("Calculated kernel for density channel.") gamma_magn = LocalFourPoint.load( os.path.join(config.output.output_path, "gamma_magn_loc.npy"), SpinChannel.MAGN ) kernel += 3 * calculate_sigma_kernel_r_q( gamma_magn, gchi0_q_core_inv, gchi0_q_full_sum, gchi0_q_core_sum, u_loc, v_nonloc, mpi_dist_irrk ) gchi0_q_core_inv.free() gchi0_q_full_sum.free() gchi0_q_core_sum.free() gamma_magn.free() logger.info("Calculated kernel for magnetic channel.") logger.info("Started calculation of DGA self-energy.") if config.memory.save_memory_for_sde: # q-loop path: the kernel is mapped to the full BZ once (the full niw range is handled inside the q-loop). kernel = _map_kernel_to_full_bz(kernel, mpi_dist_irrk, mpi_dist_fullbz) logger.info("Kernel mapped to full BZ.") sigma_new = calculate_sigma_from_kernel_auto(mpi_dist_fullbz, kernel, giwk_full, my_full_q_list) kernel.free(trim=True) # coarse per-iteration trim: return the full-BZ kernel peak to the OS sigma_new.mat = mpi_dist_fullbz.allreduce(sigma_new.mat) else: # FFT path: split the bosonic-frequency sum into a positive- and a negative-w pass so the full-BZ # kernel is only ever materialized over half the niw range AND only one niw half exists at a time. The # small irreducible-BZ kernel is kept and mapped to the full BZ separately for each pass; the negative # block's time-reversal is applied *after* the irreducible->full-BZ unfold (not before). The unfold applies # a per-k orbital rotation U (complex for multi-band) plus an optional conjugation (see # :meth:`IAmNonLocal._map_to_full_bz`), and ``to_negative_niw_range`` conjugates the data, so it must see # the already-unfolded kernel (conj of U*..U^T), exactly as the q-loop path and the original single-pass # implementation did. Flipping the irreducible-BZ kernel first would leave U un-conjugated and corrupt # sigma on the symmetry-folded q-points. niw = config.box.niw_core kernel_irr = kernel # the (small) irreducible-BZ positive-w kernel, mapped to the full BZ once per pass # Decide CPU/GPU (and select the GPU) once, so the detection is not logged for each of the two passes. use_gpu = select_sigma_fft_device(mpi_dist_fullbz) # positive pass: map a copy of the irr-BZ kernel to the full BZ and contract w = 0..+niw. The deep copy # keeps ``kernel_irr`` (small) intact for the negative pass, which maps it again -- so only a single # full-BZ niw half is ever resident. ``_map_kernel_to_full_bz`` may either mutate-and-return its argument # or return a fresh object, so free the source copy only when a distinct object came back. kernel_pos_irr = deepcopy(kernel_irr) kernel_pos = _map_kernel_to_full_bz(kernel_pos_irr, mpi_dist_irrk, mpi_dist_fullbz) if kernel_pos is not kernel_pos_irr: kernel_pos_irr.free() sigma_new = calculate_sigma_from_kernel_fft( mpi_dist_irrk, kernel_pos, giwk_full, [(i, i) for i in range(niw + 1)], use_gpu ) kernel_pos.free() # negative pass: map the irr-BZ kernel to the full BZ, time-reverse the full-BZ result (w = 0, -1, ..., # -niw), and contract w = -1..-niw (skip the w=0 duplicate). kernel_neg_full = _map_kernel_to_full_bz(kernel_irr, mpi_dist_irrk, mpi_dist_fullbz) if kernel_neg_full is not kernel_irr: kernel_irr.free() # the irr-BZ kernel is no longer needed once the full-BZ negative source exists kernel_neg = kernel_neg_full.to_negative_niw_range() kernel_neg_full.free() # release the full-BZ positive copy as soon as the negative block is built sigma_neg = calculate_sigma_from_kernel_fft( mpi_dist_irrk, kernel_neg, giwk_full, [(i, -i) for i in range(1, niw + 1)], use_gpu ) kernel_neg.free(trim=True) # coarse per-iteration trim: return the full-BZ kernel peak to the OS sigma_new.mat += sigma_neg.mat # accumulate the rank-local R-space partial self-energies (in place) sigma_neg.free() sigma_new.mat = mpi_dist_fullbz.gather(sigma_new.mat) if comm.rank == 0: sigma_new = sigma_new.ifft().to_full_niv_range() sigma_new = mpi_dist_fullbz.bcast_npoint(sigma_new) logger.info("Self-energy calculated from kernel.") logger.log_memory_usage("Non-local sigma", sigma_new, comm.size) sigma_new = sigma_new + hartree + fock logger.info("Full non-local self-energy calculated.") # This is done to minimize noise. We remove some fluctuations from dmft that are included in the local self-energy # calculated in this code and add the smooth dmft self-energy sigma_new += delta_sigma sigma_new = sigma_new.concatenate_self_energies(sigma_dmft) # delta_sigma = sigma_dmft.cut_niv(config.box.niv_core) - sigma_new.q_mean().cut_niv(config.box.niv_core) logger.info("Applying mixing strategy to the self-energy.") # Restore sigma_old's DMFT shell (cut after the bubble for memory) so the mixing and the residual below see the # full niv_cut self-energy exactly as before. sigma_old = sigma_old.concatenate_self_energies(sigma_dmft) sigma_new = apply_mixing_strategy(sigma_new, sigma_old, sigma_dmft, current_iter) sigma_new = sigma_new.compress_q_dimension() sigma_old = sigma_old.compress_q_dimension() # Canonical self-consistency residual. This is the relative (L2) residual used for the convergence check sigma_new_test = sigma_new.mat[..., sigma_new.niv : sigma_new.niv + config.box.niv_core] sigma_old_test = sigma_old.mat[..., sigma_new.niv : sigma_old.niv + config.box.niv_core] diff = (sigma_new_test - sigma_old_test).ravel() norm_x = np.linalg.norm(np.concatenate([sigma_old_test.real.ravel(), sigma_old_test.imag.ravel()])) relative_residual = np.linalg.norm(np.concatenate([diff.real, diff.imag])) / norm_x old_mu = mu_history[-1] if comm.rank == 0: config.sys.mu = update_mu( old_mu, config.sys.n, giwk_full.ek, sigma_new.mat, config.sys.beta, sigma_new.fit_smom()[0], logger=logger, ) config.sys.mu = comm.bcast(config.sys.mu) mu_history.append(config.sys.mu) logger.info(f"Updated mu from {old_mu} to {config.sys.mu}.") if comm.rank == 0: sigma_occ = deepcopy(sigma_new).concatenate_self_energies(sigma_dmft_full) giwk_occ = giwk_full.get_g_full( sigma_occ, config.sys.mu, config.lattice.hamiltonian.get_ek(), config.sys.beta ) # calculate new occupation matrix from new Green's function (outside asympt region it is the DMFT # lattice Green's function) _, config.sys.occ, config.sys.occ_k = giwk_occ.get_fill_nonlocal() # n should not change ekin = giwk_occ.get_ekin() logger.info(f"Kinetic energy: {ekin:.4f} [t or eV].") epot = giwk_occ.get_epot() logger.info(f"Potential energy: {epot:.4f} [t or eV].") logger.info(f"Total energy: {(ekin + epot):.4f} [t or eV].") config.sys.occ, config.sys.occ_k = comm.bcast((config.sys.occ, config.sys.occ_k), root=0) if config.self_consistency.max_iter > 1: logger.info("Updated occupation matrix from new Green's function.") if comm.rank == 0: sigma_new.decompress_q_dimension().save( name=f"sigma_dga_iteration_{current_iter}", output_dir=config.output.output_path ) logger.info(f"Saved sigma for iteration {current_iter}.") if config.self_energy_interpolation.do_interpolation: beta_target = config.self_energy_interpolation.beta_target niv_target = config.self_energy_interpolation.niv_target sigma_new.decompress_q_dimension().interpolate(beta_target, niv_target).save( name=f"sigma_dga_interpolated_beta{beta_target}_niv{niv_target}_iteration_{current_iter}", output_dir=config.output.output_path, ) logger.info( f"Interpolated sigma for iteration {current_iter} to beta={beta_target} and niv={niv_target}." ) logger.info("Checking self-consistency convergence.") if comm.rank == 0 and current_iter > starting_iter + 1: sigma_converged = abs(relative_residual) < config.self_consistency.epsilon logger.info( f"Self-energy convergence: {sigma_converged} " f"(relative residual={relative_residual:.3e}, epsilon={config.self_consistency.epsilon:.3e})." ) mu_converged = abs(mu_history[-1] - mu_history[-2]) < np.pi / (10 * config.sys.beta) logger.info(f"Chemical potential convergence: {mu_converged}.") converged = mu_converged and sigma_converged else: converged = False converged = comm.bcast(converged) sigma_old = sigma_new if converged: if config.self_consistency.restrict_chi_phys: logger.info( "ATTENTION: Self-consistency with restricted susceptibility reached. Disabling restriction." ) config.self_consistency.restrict_chi_phys = False else: logger.info(f"Self-consistency of sigma and mu reached at iteration {current_iter}.") break logger.info("Self-consistency not reached.") mpi_dist_irrk.delete_file() mpi_dist_fullbz.delete_file() np.save(os.path.join(config.output.output_path, "mu_history.npy"), mu_history) logger.info("Saved mu history as numpy array.") return sigma_old
[docs] def apply_mixing_strategy( sigma_new: SelfEnergy, sigma_old: SelfEnergy, sigma_dmft: SelfEnergy, current_iter: int ) -> SelfEnergy: """ Applies the self-energy mixing strategy for the self-consistency loop. Supports linear mixing as well as the accelerated Pulay (DIIS) and Anderson schemes (which use the self-energy history read from file); the accelerated schemes fall back to linear mixing when their least-squares problem is ill-conditioned or the history is too short. The mixing strategy and parameters are taken from the config. :param sigma_new: The freshly computed self-energy proposal. :param sigma_old: The previous iteration's self-energy. :param sigma_dmft: The DMFT self-energy (used to seed the proposal history for the accelerated schemes). :param current_iter: The current self-consistency iteration number. :return: The mixed :class:`SelfEnergy` for the next iteration. """ logger = config.logger n_hist = config.self_consistency.mixing_history_length alpha = config.self_consistency.mixing last_results, last_proposals = [], [] if config.self_consistency.mixing_strategy.lower() in ("pulay", "anderson"): last_results = read_last_n_sigmas_from_files( n_hist, config.output.output_path, config.self_consistency.previous_sc_path ) sigma_dmft_stacked = np.tile( sigma_dmft.cut_niv(config.box.niv_core).mat, (config.lattice.k_grid.nk_tot, 1, 1, 1) ) last_proposals = [sigma_dmft_stacked] + last_results # [dmft, s1, ..., s_{n-1}] last_results = last_results + [sigma_new.cut_niv(config.box.niv_core).mat] # [s1, s2, ..., s_n] logger.info(f"Loaded last {n_hist} self-energies from files.") accelerated_mixing_condition = current_iter > n_hist and len(last_results) > n_hist and len(last_proposals) > n_hist if config.self_consistency.mixing_strategy.lower() == "pulay" and accelerated_mixing_condition: shape = last_results[-1].shape n_total = int(np.prod(shape)) r_matrix = np.zeros((2 * n_total, n_hist), dtype=np.float64) f_matrix = np.zeros_like(r_matrix) f_i = np.zeros((2 * n_total), dtype=np.float64) def get_proposal(idx: int): """ Fetches a flattened proposal self-energy from the history. :param idx: Index into the proposal history. :return: The flattened proposal self-energy at ``idx``. """ return last_proposals[idx].flatten() def get_result(idx: int): """ Fetches a flattened result self-energy from the history. :param idx: Index into the result history. :return: The flattened result self-energy at ``idx``. """ return last_results[idx].flatten() for i in range(n_hist): proposal_diff = get_proposal(-1 - i) - get_proposal(-2 - i) r_matrix[:n_total, i] = proposal_diff.real r_matrix[n_total:, i] = proposal_diff.imag result_diff = get_result(-1 - i) - get_result(-2 - i) f_matrix[:n_total, i] = result_diff.real f_matrix[n_total:, i] = result_diff.imag f_matrix[:, i] -= r_matrix[:, i] # Residual: F(x_n) - x_n, where x_n = last_proposals[-1] = sigma_old (core window) iter_diff = get_result(-1) - get_proposal(-1) f_i[:n_total] = iter_diff.real f_i[n_total:] = iter_diff.imag norm_f = np.linalg.norm(f_i) # Solve min||F @ c - f_i|| via truncated-SVD pseudoinverse (drops collinear directions) u, s, vh = np.linalg.svd(f_matrix, full_matrices=False) cutoff = 1e-5 * (s[0] if s.size else 1.0) mask = s > cutoff if not np.any(mask): logger.warning("Pulay SVD ill-conditioned - falling back to linear mixing.") return alpha * sigma_new + (1 - alpha) * sigma_old coeffs = vh[mask].T @ ((u[:, mask].T @ f_i) / s[mask]) # Pulay update: x_{n+1} = x_n + alpha*f_i - (R + alpha*F) @ c update = alpha * f_i - (r_matrix + alpha * f_matrix) @ coeffs norm_u = np.linalg.norm(update) if norm_f > 0 and norm_u > 10.0 * norm_f: update *= 10.0 * norm_f / norm_u logger.warning(f"Pulay step clamped (norm_u={norm_u:.3e}, norm_f={norm_f:.3e}).") update = update[:n_total] + 1j * update[n_total:] # Update the new self energy niv = sigma_new.niv niv_core = config.box.niv_core sigma_new.mat[..., niv - niv_core : niv + niv_core] = get_proposal(-1).reshape(shape) + update.reshape(shape) logger.info(f"Pulay mixing applied (m={n_hist}, alpha={alpha:.3f}, norm_f={norm_f:.3e}).") return sigma_new if config.self_consistency.mixing_strategy.lower() == "anderson" and accelerated_mixing_condition: shape = last_results[-1].shape n_total = int(np.prod(shape)) flat = lambda x: x.reshape(-1) # Current residual f_n = F(x_n) - x_n f_curr = flat(last_results[-1]) - flat(last_proposals[-1]) f_vec = np.concatenate([f_curr.real, f_curr.imag]) norm_f = np.linalg.norm(f_vec) # Build dX and dF matrices (n_hist columns) # dX[:,i] = x_{n-i} - x_{n-i-1} (proposal differences) # dF[:,i] = f_{n-i} - f_{n-i-1} (residual differences) dx_cols = [] df_cols = [] for i in range(n_hist): dx = flat(last_proposals[-1 - i]) - flat(last_proposals[-2 - i]) dx_cols.append(np.concatenate([dx.real, dx.imag])) df_i = flat(last_results[-1 - i]) - flat(last_proposals[-1 - i]) df_im1 = flat(last_results[-2 - i]) - flat(last_proposals[-2 - i]) df = df_i - df_im1 df_cols.append(np.concatenate([df.real, df.imag])) dx_matrix = np.column_stack(dx_cols) # (2*n_total, n_hist) df_matrix = np.column_stack(df_cols) # (2*n_total, n_hist) # Anderson: solve min ||f_curr - dF @ c|| try: u, s, vh = np.linalg.svd(df_matrix, full_matrices=False) s_max = s[0] if len(s) > 0 else 1.0 cutoff = 1e-5 * s_max mask = s > cutoff if not np.any(mask): raise np.linalg.LinAlgError("All singular values below threshold.") s_reg = s[mask] / (s[mask] ** 2 + cutoff**2) coeffs = vh[mask].T @ (s_reg * (u[:, mask].T @ f_vec)) except np.linalg.LinAlgError: logger.warning("Anderson SVD failed - falling back to linear mixing.") return alpha * sigma_new + (1 - alpha) * sigma_old # Undamped Anderson proposal: x_n + f_n - (dX + dF) @ c x_n = flat(last_proposals[-1]) x_anderson = np.concatenate([x_n.real, x_n.imag]) + f_vec - (dx_matrix + df_matrix) @ coeffs x_anderson = x_anderson[:n_total] + 1j * x_anderson[n_total:] # Damp between old proposal and Anderson proposal x_n_complex = x_n candidate = (1 - alpha) * x_n_complex + alpha * x_anderson.reshape(-1) # Safety clamp update = candidate - x_n_complex norm_u = np.linalg.norm(update) if norm_f > 0 and norm_u > 3.0 * norm_f: candidate = x_n_complex + update * (3.0 * norm_f / norm_u) logger.warning(f"Anderson step clamped (norm_u={norm_u:.3e}, norm_f={norm_f:.3e}).") # Update the new self energy niv = sigma_new.niv niv_core = config.box.niv_core sigma_new.mat[..., niv - niv_core : niv + niv_core] = candidate.reshape(shape) logger.info(f"Anderson acceleration applied (m={n_hist}, alpha={alpha:.3f}, norm_f={norm_f:.3e}).") return sigma_new sigma_new = alpha * sigma_new + (1 - alpha) * sigma_old logger.info(f"Sigma linearly mixed (m=1, alpha={alpha}).") return sigma_new