"""Per-iteration PSF calibration: sphot's mainloop hook.
`calibrate_psf_step` is called per Sersic+iPSF mainloop iteration via
`sphot.core._maybe_recalibrate_psf` when `[psf-calib] in_mainloop = true`.
Each call:
1. Pulls quality-passing anchors from `cutoutdata.psf_table`. With
no usable photometry the bootstrap blur grid picks a starting PSF
from source counts.
2. Fits a kernel `K` (Gaussian / Moffat / drizzle) against the
anchors via the multi-source NNLS objective.
3. Replaces `cutoutdata.psf` with `library_psf * K`.
4. Scans `dao_fwhm_factor` on a 3-point bracket and picks the factor
that maximises quality-passing source count under the new PSF.
"""
from __future__ import annotations
import warnings
from typing import Sequence
import numpy as np
# =====================================================================
# Section 1 — utility
# =====================================================================
def _normalize_psf(img):
img = np.asarray(img, dtype=float)
s = img.sum()
if s != 0 and np.isfinite(s):
img = img / s
return img
def _kernel_data_to_oversampled(K_data, oversample, target_oversampled_shape):
"""Up-sample a data-px kernel to oversampled-PSF resolution and
pad/crop to target shape. Result sums to 1 (preserves PSF flux
after convolution).
"""
from scipy.ndimage import zoom
K_oversampled = zoom(K_data, oversample, order=3, mode='reflect')
s = K_oversampled.sum()
if s > 0:
K_oversampled = K_oversampled / s
h_t, w_t = target_oversampled_shape
h_n, w_n = K_oversampled.shape
if h_n > h_t or w_n > w_t:
yy0 = (h_n - h_t) // 2
xx0 = (w_n - w_t) // 2
K_oversampled = K_oversampled[yy0:yy0 + h_t, xx0:xx0 + w_t]
elif h_n < h_t or w_n < w_t:
canvas = np.zeros((h_t, w_t), dtype=float)
y0 = (h_t - h_n) // 2
x0 = (w_t - w_n) // 2
canvas[y0:y0 + h_n, x0:x0 + w_n] = K_oversampled
K_oversampled = canvas
s = K_oversampled.sum()
if s > 0:
K_oversampled = K_oversampled / s
return K_oversampled
def _apply_kernel(library_psf_oversampled, K_oversampled):
"""library * K, normalised. Returns library_psf size."""
from scipy.signal import fftconvolve
out = fftconvolve(library_psf_oversampled, K_oversampled, mode='same')
out[out < 0] = 0
s = out.sum()
if s > 0:
out = out / s
return out
def _crop_centered(img, half):
"""Return the centered `(2*half+1)` square crop of `img`.
If the image is already smaller than the requested size, return as-is.
Used to shrink the library PSF for the kernel-fit's inner FFT and
spline interpolation; the full library is preserved for the final
`_build_effective_psf` call that updates `cd.psf`.
"""
H, W = img.shape
cy = (H - 1) // 2
cx = (W - 1) // 2
half = int(min(half, cy, cx))
return img[cy - half : cy + half + 1, cx - half : cx + half + 1]
def _empirical_fwhm_factor(psf_image, oversample, psf_sigma,
*, fit_half_pix=20, fwhm_init=None):
"""Compute the canonical `dao_fwhm_factor` from the calibrated PSF.
Runs `photutils.psf.fit_fwhm` on the central core of an oversampled PSF
image, converts the fitted FWHM to data pixels, and returns
`F_eff_in_data_px / psf_sigma`. This is the matched-filter-theorem
answer for DAO's FWHM argument and replaces the bracket scan when
`[psf-calib].fwhm_method == 'empirical'`.
Returns `None` if the fit fails or yields a non-finite value.
"""
try:
from photutils.psf import fit_fwhm
except Exception as e:
logger.warning(f'_empirical_fwhm_factor: photutils.psf.fit_fwhm '
f'unavailable ({e})')
return None
psf = np.asarray(psf_image, dtype=float)
psf = np.where(np.isfinite(psf), psf, 0.0)
H, W = psf.shape
cx, cy = (W - 1) / 2.0, (H - 1) / 2.0
fit_half = int(max(1, fit_half_pix * int(oversample)))
fit_shape = (2 * fit_half + 1, 2 * fit_half + 1)
init = float(fwhm_init) if fwhm_init is not None else 4.0 * float(oversample)
try:
fwhms = fit_fwhm(psf, xypos=[(cx, cy)], fit_shape=fit_shape, fwhm=init)
fwhm_oversampled = float(np.atleast_1d(fwhms)[0])
except Exception as e:
logger.warning(f'_empirical_fwhm_factor: fit_fwhm failed ({e})')
return None
if not np.isfinite(fwhm_oversampled) or fwhm_oversampled <= 0:
return None
fwhm_data_px = fwhm_oversampled / float(oversample)
if not np.isfinite(psf_sigma) or psf_sigma <= 0:
return None
return fwhm_data_px / float(psf_sigma)
[docs]
def effective_fwhm_data_px(cutoutdata):
"""FWHM of `cd.psf` in data pixels (post-kernel calibration).
Used for sizing the centre-mask radius and any other "where is the
PSF actually wide" calculation. Falls back to the library Gaussian
equivalent (2.355 × psf_sigma) if `photutils.psf.fit_fwhm` fails so
the caller always gets a finite, sane number.
"""
psf_sigma = float(getattr(cutoutdata, 'psf_sigma', 0.0))
fallback = 2.3548 * psf_sigma if psf_sigma > 0 else 0.0
psf_image = getattr(cutoutdata, 'psf', None)
oversample = int(getattr(cutoutdata, 'psf_oversample', 1) or 1)
if psf_image is None or not np.isfinite(psf_sigma) or psf_sigma <= 0:
return fallback
factor = _empirical_fwhm_factor(psf_image, oversample, psf_sigma)
if factor is None or not np.isfinite(factor) or factor <= 0:
return fallback
return float(factor) * psf_sigma
# =====================================================================
# Section 2 — kernel families
# =====================================================================
def _gaussian_kernel(shape, sigma):
sh = shape[0]
cy = (sh - 1) / 2.0
yy, xx = np.indices(shape)
rr = (xx - cy) ** 2 + (yy - cy) ** 2
if sigma <= 0:
out = np.zeros(shape)
out[int(round(cy)), int(round(cy))] = 1.0
return out
g = np.exp(-rr / (2.0 * sigma ** 2))
return g / g.sum()
def _moffat_kernel(shape, alpha, beta):
sh = shape[0]
cy = (sh - 1) / 2.0
yy, xx = np.indices(shape)
rr = np.sqrt((xx - cy) ** 2 + (yy - cy) ** 2)
if alpha <= 0 or beta <= 1.01:
return _gaussian_kernel(shape, max(alpha, 0.1))
m = (1.0 + (rr / alpha) ** 2) ** (-beta)
return m / m.sum()
def _drizzle_kernel(shape, sigma, pixel_size=1.0):
"""Drizzle pixel-square (top-hat) ⊗ Gaussian. Models drizzling's
pixel-response broadening + an optical Gaussian.
"""
sh = shape[0]
cy = (sh - 1) / 2.0
yy, xx = np.indices(shape)
in_pix = (np.abs(xx - cy) <= 0.5 * pixel_size) & \
(np.abs(yy - cy) <= 0.5 * pixel_size)
tophat = in_pix.astype(float)
if tophat.sum() == 0:
tophat[int(round(cy)), int(round(cy))] = 1.0
tophat = tophat / tophat.sum()
if sigma <= 0:
return tophat
g = _gaussian_kernel(shape, sigma)
from scipy.signal import fftconvolve
out = fftconvolve(tophat, g, mode='same')
out[out < 0] = 0
out = out / out.sum()
return out
def _build_kernel(family, params, shape):
if family == 'gaussian':
return _gaussian_kernel(shape, params['sigma_data'])
if family == 'moffat':
return _moffat_kernel(shape, params['alpha_data'], params['beta'])
if family == 'drizzle':
return _drizzle_kernel(shape, params['sigma_data'],
params.get('pixel_size', 1.0))
raise ValueError(f'unknown kernel family: {family}')
def _build_effective_psf(library_norm, oversample, family, params, half=20):
sh = 2 * half + 1
K_data = _build_kernel(family, params, (sh, sh))
K_oversampled = _kernel_data_to_oversampled(
K_data, oversample, library_norm.shape)
return _apply_kernel(library_norm, K_oversampled)
def _make_psf_evaluator(eff_psf_oversampled, oversample):
"""Fast PSF evaluator using `scipy.ndimage.map_coordinates`.
Bypasses photutils ImagePSF's `RectBivariateSpline` (which dominated
the kernel-fit profile at ~44%). Returns a callable
`evaluate(xx, yy, x_0, y_0) -> array` giving interpolated PSF
values at data positions (xx, yy) for a source centered at (x_0, y_0).
The output scale is arbitrary (not normalised to data-pixel flux) —
this is fine for the kernel fit because the per-anchor LM solves
for flux, so the residual is invariant to a constant scaling of m.
The spline pre-filter is computed ONCE here. `map_coordinates`
re-spline-filters the full input on every call when
`prefilter=True` (the default), which would dominate the profile;
we pass the pre-filtered image with `prefilter=False`.
"""
from scipy.ndimage import map_coordinates, spline_filter
H, W = eff_psf_oversampled.shape
cy = (H - 1) / 2.0
cx = (W - 1) / 2.0
img = np.ascontiguousarray(eff_psf_oversampled, dtype=float)
img_filtered = spline_filter(img, order=3, mode='constant')
def evaluate(xx, yy, x_0, y_0):
x_over = (xx - x_0) * oversample + cx
y_over = (yy - y_0) * oversample + cy
coords = np.stack([y_over.ravel(), x_over.ravel()], axis=0)
vals = map_coordinates(img_filtered, coords, order=3,
mode='constant', cval=0.0,
prefilter=False)
return vals.reshape(xx.shape)
return evaluate
def _per_source_residual_score_multi(
K_data, library_psf_oversampled, psf_oversample,
data, x_anchor, y_anchor, half=8,
*,
all_x, all_y, all_flux=None,
max_neighbors=15, neighbor_pad=4.0,
):
"""Per-anchor stamp residual with a **multi-source** closed-form
least-squares flux fit (positions pinned to iPSF centroids).
Counters the σ₂ runaway in crowded fields with wide PSFs: when a
stamp contains the anchor plus blended neighbours, the
single-source LM in `_per_source_residual_score` is forced to
explain the neighbours' flux via the model's wings — driving
`σ₂` to the upper bound. Here every source within
`half + neighbor_pad` data px gets its own column in the
design matrix, so neighbour flux is accounted for and the kernel
only has to match the *shape* of each source's PSF.
Parameters
----------
all_x, all_y : 1D arrays
Positions (data px) of ALL quality-passing sources from
`cd.psf_table`, used to populate per-stamp neighbour lists.
all_flux : 1D array, optional
Flux per source; used to keep the brightest neighbours when
a stamp's neighbour count exceeds `max_neighbors`.
"""
K_oversampled = _kernel_data_to_oversampled(
K_data, psf_oversample, library_psf_oversampled.shape)
eff_psf = _apply_kernel(library_psf_oversampled, K_oversampled)
evaluate_psf = _make_psf_evaluator(eff_psf, psf_oversample)
H, W = data.shape
yy0, xx0 = np.indices((2 * half + 1, 2 * half + 1), dtype=float)
radius = float(half) + float(neighbor_pad)
radius2 = radius * radius
all_x = np.asarray(all_x, dtype=float)
all_y = np.asarray(all_y, dtype=float)
if all_flux is not None:
all_flux = np.asarray(all_flux, dtype=float)
total = 0.0
n_pix = 0
for x_a, y_a in zip(x_anchor, y_anchor):
ix = int(round(x_a)); iy = int(round(y_a))
if (ix - half < 0 or iy - half < 0
or ix + half + 1 > W or iy + half + 1 > H):
continue
sy = slice(iy - half, iy + half + 1)
sx = slice(ix - half, ix + half + 1)
stamp = np.asarray(data[sy, sx], dtype=float)
finite = np.isfinite(stamp)
if not finite.any():
continue
if not finite.all():
stamp = np.where(finite, stamp, 0.0)
xx = xx0 + (ix - half)
yy = yy0 + (iy - half)
d2 = (all_x - x_a) ** 2 + (all_y - y_a) ** 2
in_range = d2 <= radius2
nx = all_x[in_range]; ny = all_y[in_range]
if len(nx) > max_neighbors:
if all_flux is not None:
order = np.argsort(-all_flux[in_range])[:max_neighbors]
else:
order = np.argsort(d2[in_range])[:max_neighbors]
nx = nx[order]; ny = ny[order]
n_n = len(nx)
if n_n == 0:
# Anchor itself fell outside the search radius? Shouldn't
# happen, but be defensive: fall back to anchor-only.
nx = np.array([x_a]); ny = np.array([y_a]); n_n = 1
# Design matrix: one column per source + 1 pedestal column.
L = np.empty((stamp.size, n_n + 1), dtype=float)
for j in range(n_n):
mj = evaluate_psf(xx, yy, nx[j], ny[j])
mj = np.where(np.isfinite(mj), mj, 0.0)
L[:, j] = mj.ravel()
L[:, -1] = 1.0
finite_flat = finite.ravel().astype(float)
L_w = L * finite_flat[:, None]
b_w = stamp.ravel() * finite_flat
try:
params, *_ = np.linalg.lstsq(L_w, b_w, rcond=None)
resid = b_w - L_w @ params
except Exception:
# Numerically degenerate (e.g., colocated sources); skip.
resid = b_w
total += float(np.sum(resid * resid))
n_pix += int(finite.sum())
return total / max(n_pix, 1)
def _score_blur_candidate_residual(
psf_cand_oversampled, psf_oversample, psf_sigma_cand,
data, phot, s_pass, *, bkg_std, K=30,
neighbor_pad=4.0, max_neighbors=15,
):
"""Score a bootstrap blur candidate by median per-anchor stamp
residual MSE, normalized by bkg_std² so candidates with different
PSF widths produce comparable numbers.
Picks the top-K brightest passing sources as anchors. Each stamp
is half-size = max(8, ceil(3·psf_sigma_cand)) so the wings fit
inside; design matrix includes neighbour columns within
`half + neighbor_pad` data px (same pattern as
_per_source_residual_score_multi) so blended neighbours don't
pollute the score.
Returns np.inf when there aren't enough usable anchors to score
reliably — bootstrap's min-selection then naturally rejects that
candidate.
"""
if phot is None or len(phot) == 0:
return float('inf')
s_pass = np.asarray(s_pass, dtype=bool)
if s_pass.sum() < 1:
return float('inf')
x_all = np.asarray(phot['x_fit'], dtype=float)
y_all = np.asarray(phot['y_fit'], dtype=float)
f_all = np.asarray(phot['flux_fit'], dtype=float)
finite = np.isfinite(x_all) & np.isfinite(y_all) & np.isfinite(f_all)
pass_finite = s_pass & finite
if pass_finite.sum() < 1:
return float('inf')
x_p = x_all[pass_finite]
y_p = y_all[pass_finite]
f_p = f_all[pass_finite]
order = np.argsort(-f_p)[:K]
x_anchor = x_p[order]
y_anchor = y_p[order]
# All passing sources (with their flux) act as the neighbour pool
# for the design matrix.
all_x = x_p
all_y = y_p
all_flux = f_p
half = max(8, int(np.ceil(3.0 * float(psf_sigma_cand))))
H, W = data.shape
yy0, xx0 = np.indices((2 * half + 1, 2 * half + 1), dtype=float)
radius = float(half) + float(neighbor_pad)
radius2 = radius * radius
evaluate_psf = _make_psf_evaluator(psf_cand_oversampled, psf_oversample)
bkg_var = max(float(bkg_std) ** 2, 1e-30)
per_source = []
for x_a, y_a in zip(x_anchor, y_anchor):
ix = int(round(x_a)); iy = int(round(y_a))
if (ix - half < 0 or iy - half < 0
or ix + half + 1 > W or iy + half + 1 > H):
continue
sy = slice(iy - half, iy + half + 1)
sx = slice(ix - half, ix + half + 1)
stamp = np.asarray(data[sy, sx], dtype=float)
finite_st = np.isfinite(stamp)
if not finite_st.any():
continue
if not finite_st.all():
stamp = np.where(finite_st, stamp, 0.0)
xx = xx0 + (ix - half)
yy = yy0 + (iy - half)
d2 = (all_x - x_a) ** 2 + (all_y - y_a) ** 2
in_range = d2 <= radius2
nx = all_x[in_range]; ny = all_y[in_range]
if len(nx) > max_neighbors:
order_n = np.argsort(-all_flux[in_range])[:max_neighbors]
nx = nx[order_n]; ny = ny[order_n]
n_n = len(nx)
if n_n == 0:
nx = np.array([x_a]); ny = np.array([y_a]); n_n = 1
L = np.empty((stamp.size, n_n + 1), dtype=float)
for j in range(n_n):
mj = evaluate_psf(xx, yy, nx[j], ny[j])
mj = np.where(np.isfinite(mj), mj, 0.0)
L[:, j] = mj.ravel()
L[:, -1] = 1.0
finite_flat = finite_st.ravel().astype(float)
L_w = L * finite_flat[:, None]
b_w = stamp.ravel() * finite_flat
try:
params, *_ = np.linalg.lstsq(L_w, b_w, rcond=None)
resid = b_w - L_w @ params
except Exception:
continue
n_pix_eff = max(int(finite_st.sum()), 1)
per_source.append(float(np.sum(resid * resid)) / n_pix_eff)
if len(per_source) < 1:
return float('inf')
return float(np.nanmedian(per_source)) / bkg_var
def _per_source_residual_score(
K_data, library_psf_oversampled, psf_oversample,
data, x_arr, y_arr, flux_arr, half=8,
):
"""Per-source residual L2 with a per-anchor (x, y, f, c) PSF refit.
For each anchor we run a 4-parameter Levenberg-Marquardt fit of
(dx, dy, flux, pedestal) against the stamp, with the candidate
kernel-blurred PSF as the model. The input `flux_arr`, `x_arr`,
`y_arr` from the prior iteration's photometry are used only as
initial guesses — the LM resolves the centroid/flux bias that the
too-sharp prior PSF introduces, so the kernel optimizer sees a
fair "shape match" score independent of the prior's flux scale.
"""
from scipy.optimize import least_squares
K_oversampled = _kernel_data_to_oversampled(
K_data, psf_oversample, library_psf_oversampled.shape)
eff_psf = _apply_kernel(library_psf_oversampled, K_oversampled)
evaluate_psf = _make_psf_evaluator(eff_psf, psf_oversample)
H, W = data.shape
yy0, xx0 = np.indices((2 * half + 1, 2 * half + 1), dtype=float)
total = 0.0
n_pix = 0
xy_bound = 4.0
max_nfev = 10
for x_init, y_init, f_init in zip(x_arr, y_arr, flux_arr):
x0 = float(x_init); y0 = float(y_init); f0 = float(f_init)
ix = int(round(x0)); iy = int(round(y0))
if (ix - half < 0 or iy - half < 0
or ix + half + 1 > W or iy + half + 1 > H):
continue
sy = slice(iy - half, iy + half + 1)
sx = slice(ix - half, ix + half + 1)
stamp = np.asarray(data[sy, sx], dtype=float)
finite = np.isfinite(stamp)
if not finite.any():
continue
if not finite.all():
stamp = np.where(finite, stamp, 0.0)
xx = xx0 + (ix - half)
yy = yy0 + (iy - half)
# Closed-form (f, c) at the initial position seeds the LM with
# an unbiased flux estimate; LM then refines (dx, dy) and
# tweaks (f, c). Without this seed the LM starts at f=f_init
# which is biased low and can take many iterations to recover.
m0 = evaluate_psf(xx, yy, x0, y0)
m0 = np.where(np.isfinite(m0), m0, 0.0)
N = float(stamp.size)
Sm = float(m0.sum()); Smm = float(np.sum(m0 * m0))
Sd = float(stamp.sum()); Sdm = float(np.sum(stamp * m0))
det = Smm * N - Sm * Sm
if det > 0:
f_seed = (N * Sdm - Sm * Sd) / det
c_seed = (Smm * Sd - Sm * Sdm) / det
if f_seed <= 0:
f_seed = max(f0, 0.0); c_seed = 0.0
else:
f_seed = max(f0, 0.0); c_seed = 0.0
finite_flat = finite.ravel()
def residuals(p):
dx, dy, fl, c = p
m = evaluate_psf(xx, yy, x0 + dx, y0 + dy)
m = np.where(np.isfinite(m), m, 0.0)
return ((stamp - fl * m - c).ravel()) * finite_flat
try:
res = least_squares(
residuals,
[0.0, 0.0, float(f_seed), float(c_seed)],
bounds=([-xy_bound, -xy_bound, 0.0, -np.inf],
[ xy_bound, xy_bound, np.inf, np.inf]),
method='trf', max_nfev=max_nfev)
r = res.fun
except Exception:
# LM failure on a single anchor mustn't kill the whole
# kernel evaluation; fall back to the closed-form score.
r = ((stamp - f_seed * m0 - c_seed).ravel()) * finite_flat
total += float(np.sum(r * r))
n_pix += int(finite.sum())
return total / max(n_pix, 1)
# =====================================================================
# Section 3 — kernel fitters (per-source-residual objective)
# =====================================================================
def _fit_kernel_to_data(
library_psf_oversampled, psf_oversample, data,
x_arr, y_arr, flux_arr, family, half=8,
*, on_seed_done=None, objective='multi_source',
all_x=None, all_y=None, all_flux=None,
max_neighbors=15, neighbor_pad=4.0,
):
"""Fit a single-parameter kernel `family` to per-source residual.
`objective`: 'multi_source' (closed-form NNLS lstsq with each
nearby psf_table source as its own column; positions pinned) or
'single_source' (legacy per-anchor LM with free dx, dy, flux, c).
The multi_source path needs `all_x`, `all_y` (and optionally
`all_flux`).
`on_seed_done` is a zero-arg callback fired after the (single)
minimisation completes — used to advance an outer progress bar.
"""
from scipy.optimize import minimize, minimize_scalar
sh = 2 * max(half, 10) + 1
kshape = (sh, sh)
if objective == 'multi_source' and all_x is not None and all_y is not None:
def loss(K_data):
return _per_source_residual_score_multi(
K_data, library_psf_oversampled, psf_oversample,
data, x_arr, y_arr, half=half,
all_x=all_x, all_y=all_y, all_flux=all_flux,
max_neighbors=max_neighbors, neighbor_pad=neighbor_pad)
else:
def loss(K_data):
return _per_source_residual_score(
K_data, library_psf_oversampled, psf_oversample,
data, x_arr, y_arr, flux_arr, half=half)
def _tick():
if on_seed_done is not None:
try: on_seed_done()
except Exception: pass
if family == 'gaussian':
res = minimize_scalar(
lambda s: loss(_gaussian_kernel(kshape, s)),
bounds=(0.01, 5.0), method='bounded')
K = _gaussian_kernel(kshape, res.x)
_tick()
return {'family': family,
'params': {'sigma_data': float(res.x)},
'kernel_data': K, 'residual_l2': float(res.fun),
'n_seeds': 1}
if family == 'moffat':
res = minimize(
lambda x: loss(_moffat_kernel(kshape, x[0], x[1])),
np.array([1.5, 3.0]), method='L-BFGS-B',
bounds=[(0.3, 6.0), (1.5, 10.0)])
K = _moffat_kernel(kshape, res.x[0], res.x[1])
_tick()
return {'family': family,
'params': {'alpha_data': float(res.x[0]),
'beta': float(res.x[1])},
'kernel_data': K, 'residual_l2': float(res.fun),
'n_seeds': 1}
if family == 'drizzle':
res = minimize_scalar(
lambda s: loss(_drizzle_kernel(kshape, s)),
bounds=(0.01, 4.0), method='bounded')
K = _drizzle_kernel(kshape, res.x)
_tick()
return {'family': family,
'params': {'sigma_data': float(res.x), 'pixel_size': 1.0},
'kernel_data': K, 'residual_l2': float(res.fun),
'n_seeds': 1}
raise ValueError(f'unknown kernel family: {family!r} '
f'(supported: gaussian, moffat, drizzle)')
# =====================================================================
# Section 4 — DAO fwhm scan via temporary _prepare_psf_fitters patch
# =====================================================================
def _single_pass_phot(data, psf_oversampled, oversample, psf_sigma,
th=None, fwhm_factor=None, mask=None):
"""One photometry pass at threshold `th` (default config th_min).
Optionally monkey-patches `_prepare_psf_fitters` so DAO uses
`psf_sigma * fwhm_factor` as the matched-filter width instead of
sphot's hard-coded 2.33.
Bootstrap and fwhm scans only need to RANK candidates by
n_passing; full ladder iteration would be ~10x slower for the
same ranking signal.
`mask` (optional 2D bool array, True = exclude) is forwarded to
`do_psf_photometry` so DAO + LM both skip the masked region —
used to keep the under-modelled Sersic core out of bootstrap and
scan candidate evaluation.
"""
from photutils.psf import ImagePSF, PSFPhotometry, SourceGrouper
from photutils.detection import DAOStarFinder
from photutils.background import MMMBackground, LocalBackground, MADStdBackgroundRMS
from . import psf as sp
from .psf import do_psf_photometry
from .config import config
psf_model = ImagePSF(psf_oversampled, flux=1.0,
oversampling=oversample, fill_value=0.0)
if th is None:
th = float(config['psf'].get('th_min', 1.0))
bkg_std = float(MADStdBackgroundRMS()(np.nan_to_num(data, nan=0.0)))
data_error = np.full_like(data, bkg_std, dtype=float)
original = sp._prepare_psf_fitters
if fwhm_factor is not None:
def patched(th_, pmod, bkg_std_, ps):
finder_kwargs = config['psf']['finder_kwargs']
daofinder = DAOStarFinder(
threshold=th_ * bkg_std_,
fwhm=ps * fwhm_factor,
**finder_kwargs)
grouper = SourceGrouper(
min_separation=config['psf']['grouper_separation_in_psfsigma'] * ps)
localbkg = LocalBackground(
config['psf']['localbkg_bounds_in_psfsigma'][0] * ps,
config['psf']['localbkg_bounds_in_psfsigma'][1] * ps,
MMMBackground())
from .psf import _resolve_fit_shape
return PSFPhotometry(
pmod,
fit_shape=_resolve_fit_shape(ps),
finder=daofinder, grouper=grouper,
local_bkg_estimator=localbkg,
aperture_radius=config['psf']['PSFPhotometry_aperture_radius'],
fitter_maxiters=config['psf']['PSFPhotometry_fitter_maxiters'],
group_warning_threshold=config['psf']['PSFPhotometry_group_warning_threshold'],
)
sp._prepare_psf_fitters = patched
try:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
phot, _ = do_psf_photometry(
data, data_error, bkg_std,
psf_model, psf_sigma, th=th, plot=False, mask=mask)
finally:
sp._prepare_psf_fitters = original
return phot, bkg_std
def _scan_dao_fwhm_factor(
data, psf_oversampled, oversample, psf_sigma, fwhm_factor_grid,
log=print, on_progress=None, mask=None,
):
"""Return (best_factor, list_of_records). Best = max n_passing.
Uses single-pass photometry per candidate (~10x faster than
iterating the full threshold ladder); the relative ranking is
preserved, which is all the scan needs.
`on_progress` is an optional zero-arg callback invoked once per
grid candidate (success or failure) so an outer progress bar can
advance phase-by-phase.
"""
from .psf import filter_psfphot_results
results = []
for fac in fwhm_factor_grid:
try:
phot, bkg_std = _single_pass_phot(
data, psf_oversampled, oversample, psf_sigma,
fwhm_factor=fac, mask=mask)
except Exception as e:
log(f' [fwhm_scan] fac={fac:.2f}: error {e}')
if on_progress is not None:
try: on_progress()
except Exception: pass
continue
if phot is None or len(phot) == 0:
log(f' [fwhm_scan] fac={fac:.2f}: no detections')
if on_progress is not None:
try: on_progress()
except Exception: pass
continue
try:
s_pass, _ = filter_psfphot_results(phot, bkg_std=bkg_std)
n_pass = int(s_pass.sum())
except Exception:
n_pass = int(len(phot))
n_det = int(len(phot))
results.append({
'fwhm_factor': float(fac),
'dao_fwhm_data_px': float(psf_sigma * fac),
'n_det': n_det, 'n_pass': n_pass,
})
log(f' [fwhm_scan] fac={fac:.2f} (fwhm={psf_sigma*fac:.2f}px): '
f'n_det={n_det}, n_pass={n_pass}')
if on_progress is not None:
try: on_progress()
except Exception: pass
if not results:
return None, []
best = max(results, key=lambda r: r['n_pass'])
return best['fwhm_factor'], results
def _scan_dao_fwhm_factor_nnls(
data, psf_oversampled, oversample, psf_sigma, fwhm_factor_grid,
*, metric='sum_snr', top_k=100, th=None,
log=print, on_progress=None, mask=None,
):
"""NNLS-based variant of `_scan_dao_fwhm_factor`.
For each fwhm_factor candidate: DAOStarFinder finds positions, optional
top-K cap by DAO peak, then `forced_psf_photometry` (closed-form NNLS)
fits fluxes at those fixed positions in one solve. Ranking is done on
a quantitative metric of fit quality, not detection count, so the
top-K cap composes legitimately (no saturation bias).
metric : 'sum_snr' (max), 'resid_mad' (min), or 'count' (max)
top_k : 0 / None disables the cap
th : DAO threshold in bkg_std units; defaults to config['psf']['th_min']
"""
from photutils.psf import ImagePSF
from photutils.detection import DAOStarFinder
from photutils.background import MADStdBackgroundRMS
from .psf import forced_psf_photometry
from .config import config
psf_model = ImagePSF(psf_oversampled, flux=1.0,
oversampling=oversample, fill_value=0.0)
if th is None:
th = float(config['psf'].get('th_min', 1.0))
bkg_std = float(MADStdBackgroundRMS()(np.nan_to_num(data, nan=0.0)))
finder_kwargs = config['psf']['finder_kwargs']
snr_min = float(config['psf'].get('cuts_flux_SNR_min', 1.0))
results = []
for fac in fwhm_factor_grid:
try:
daofinder = DAOStarFinder(
threshold=th * bkg_std,
fwhm=psf_sigma * fac,
**finder_kwargs)
data_for_dao = np.where(np.isfinite(data), data, 0.0)
star_cat = daofinder.find_stars(data_for_dao, mask=mask)
except Exception as e:
log(f' [nnls_scan] fac={fac:.2f}: DAO error {e}')
if on_progress is not None:
try: on_progress()
except Exception: pass
continue
if star_cat is None or len(star_cat) == 0:
log(f' [nnls_scan] fac={fac:.2f}: no detections')
if on_progress is not None:
try: on_progress()
except Exception: pass
continue
x_init = np.asarray(star_cat['xcentroid'], dtype=float)
y_init = np.asarray(star_cat['ycentroid'], dtype=float)
peak = np.asarray(star_cat['peak'], dtype=float)
# honour the explicit center mask (DAO may still report sources
# whose centroids round to a masked pixel)
if mask is not None:
ix = np.clip(np.round(x_init).astype(int), 0, mask.shape[1] - 1)
iy = np.clip(np.round(y_init).astype(int), 0, mask.shape[0] - 1)
keep = ~mask[iy, ix]
x_init = x_init[keep]
y_init = y_init[keep]
peak = peak[keep]
if len(x_init) == 0:
results.append({'fwhm_factor': float(fac), 'n_det': 0, 'n_pass': 0,
'sum_snr': 0.0, 'resid_mad': np.inf})
continue
if top_k and len(x_init) > top_k:
order = np.argsort(-peak)[:int(top_k)]
x_init = x_init[order]
y_init = y_init[order]
phot, resid = forced_psf_photometry(
data, psf_model, psf_sigma, x_init, y_init,
center_mask_params=None,
)
sum_snr, resid_mad, n_pass = 0.0, np.inf, 0
if phot is not None and len(phot) > 0:
flux = np.asarray(phot['flux_fit'], dtype=float)
ferr = np.asarray(phot['flux_err'], dtype=float)
ferr = np.where(ferr > 0, ferr, np.inf)
snr = flux / ferr
valid = np.isfinite(snr) & (snr > 0)
if valid.any():
sum_snr = float(snr[valid].sum())
n_pass = int(((flux > 0) & (snr >= snr_min) & valid).sum())
if resid is not None:
r = resid.ravel()
r = r[np.isfinite(r)]
if r.size > 0:
resid_mad = float(np.median(np.abs(r - np.median(r))))
results.append({
'fwhm_factor': float(fac),
'n_det': int(len(x_init)), 'n_pass': n_pass,
'sum_snr': sum_snr, 'resid_mad': resid_mad,
})
log(f' [nnls_scan top_k={top_k} m={metric}] '
f'fac={fac:.2f}: n_det={int(len(x_init))} '
f'sum_snr={sum_snr:.1f} resid_mad={resid_mad:.3e}')
if on_progress is not None:
try: on_progress()
except Exception: pass
if not results:
return None, []
if metric == 'resid_mad':
best = min(results, key=lambda r: r['resid_mad'])
elif metric == 'count':
best = max(results, key=lambda r: r['n_pass'])
else: # 'sum_snr' default
best = max(results, key=lambda r: r['sum_snr'])
return best['fwhm_factor'], results
# =====================================================================
# Section 5 — per-iter helper: bootstrap blur
# =====================================================================
def _bootstrap_blur(
cutoutdata,
*,
initial_grid=(0.0, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0),
expand_step=3.0,
max_blur=25.0,
log=print,
on_progress=None,
mask=None,
):
"""Find a sensible Gaussian blur from scratch when iPSF returned
nothing usable. Scans a grid of blurs (oversampled-PSF px) using
single-pass photometry; picks the blur with the most
quality-passing sources. If no candidate yields any quality-passing
sources, expands the grid by `expand_step` up to `max_blur` and
retries.
On success, calls `cutoutdata.blur_psf(best)` so cd.psf is updated
in place. Returns the chosen blur, or None if all attempts fail.
"""
from .psf import filter_psfphot_results
from .data import _calc_psf_sigma
from photutils.background import MADStdBackgroundRMS
from astropy.convolution import convolve as ap_convolve, Gaussian2DKernel
library = np.asarray(getattr(cutoutdata, '_psf_raw', cutoutdata.psf),
dtype=float)
library = _normalize_psf(library)
psf_oversample = int(cutoutdata.psf_oversample)
data = np.asarray(cutoutdata.sersic_residual)
bkg_std = float(MADStdBackgroundRMS()(np.nan_to_num(data, nan=0.0)))
grid = list(initial_grid)
seen = set()
attempts = []
while True:
for cand in grid:
if cand in seen:
continue
seen.add(cand)
psf_cand = library.copy()
if cand > 0:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
psf_cand = ap_convolve(psf_cand, Gaussian2DKernel(cand))
psf_cand = _normalize_psf(psf_cand)
# Recompute psf_sigma from THIS candidate PSF. The library's
# Gaussian-fit sigma is set by the narrow diffraction core
# and stays narrow even after substantial blur unless we
# re-fit. Without this, DAO's fwhm input never widens and
# narrow-core filters (F277W, F090W) detect 0 sources at
# every candidate.
psf_sigma_cand = float(
_calc_psf_sigma(psf_cand, psf_oversample))
try:
phot, _ = _single_pass_phot(
data, psf_cand, psf_oversample, psf_sigma_cand,
mask=mask)
except Exception as e:
log(f'[bootstrap_blur] blur={cand:.2f}: phot error: {e}')
attempts.append((cand, 0, float('inf')))
if on_progress is not None:
try: on_progress()
except Exception: pass
continue
if phot is None or len(phot) == 0:
attempts.append((cand, 0, float('inf')))
log(f'[bootstrap_blur] blur={cand:.2f}: 0 detections')
if on_progress is not None:
try: on_progress()
except Exception: pass
continue
try:
s_pass, _ = filter_psfphot_results(phot, bkg_std=bkg_std)
s_pass = np.asarray(s_pass)
n_pass = int(s_pass.sum())
except Exception:
s_pass = np.ones(len(phot), dtype=bool)
n_pass = int(len(phot))
# Per-anchor stamp-residual MSE on top-K passing sources.
# Replaces the legacy n_pass score, which monotonically
# favored wider blurs (a wide Gaussian's wings absorb the
# un-modelled galaxy disk residual, so MORE detections
# "pass" quality cuts at large blurs even though they're
# actually fitting extended structure, not point sources).
# MSE rewards fits that actually subtract a star cleanly:
# real sources -> low residual; structure peaks -> high.
score = _score_blur_candidate_residual(
psf_cand, psf_oversample, psf_sigma_cand,
data, phot, s_pass, bkg_std=bkg_std, K=30)
attempts.append((cand, n_pass, score))
log(f'[bootstrap_blur] blur={cand:.2f}: n_pass={n_pass}, '
f'residual_MSE={score:.4g}')
if on_progress is not None:
try: on_progress()
except Exception: pass
# Pick lowest residual MSE among candidates that produced a
# finite score (i.e., at least a few passing anchors to fit).
scored = [(c, n, s) for c, n, s in attempts if np.isfinite(s)]
if scored:
best, n_best, mse_best = min(scored, key=lambda r: r[2])
log(f'[bootstrap_blur] chose blur={best:.2f} '
f'(n_pass={n_best}, residual_MSE={mse_best:.4g})')
try:
cutoutdata.blur_psf(float(best))
except Exception as e:
log(f'[bootstrap_blur] cd.blur_psf failed: {e}')
return None
return float(best)
# expand the grid upward
max_seen = max(seen)
if max_seen >= max_blur:
log(f'[bootstrap_blur] no candidate worked up to '
f'blur={max_blur}; giving up')
return None
new_blurs = []
b = max_seen + expand_step
while b <= max_blur:
new_blurs.append(round(b, 2))
b += expand_step
if not new_blurs:
log('[bootstrap_blur] grid exhausted')
return None
grid = new_blurs
log(f'[bootstrap_blur] expanding to {grid}')
# =====================================================================
# Section 6 — per-iter helpers: bracket fwhm + skip-on-stable
# =====================================================================
def _bracket_fwhm_grid(prev_fac, lo=0.5, hi=3.5):
"""3-point bracket centered on `prev_fac`, clamped to [lo, hi]."""
grid = sorted({
round(max(lo, prev_fac * 0.7), 3),
round(min(max(prev_fac, lo), hi), 3),
round(min(hi, prev_fac * 1.4), 3),
})
return grid
def _kernels_close(K_new, K_prev, threshold=1e-3):
"""Per-pixel RMS distance between two same-shape kernels < threshold.
Used to skip the fwhm scan when the calibration has stabilised.
"""
if K_prev is None or K_new is None:
return False
K_new = np.asarray(K_new, dtype=float)
K_prev = np.asarray(K_prev, dtype=float)
if K_new.shape != K_prev.shape:
return False
return float(np.sqrt(np.mean((K_new - K_prev) ** 2))) < threshold
# =====================================================================
# Section 7 — per-iter calibration step (in-mainloop hook)
# =====================================================================
[docs]
def calibrate_psf_step(
cutoutdata,
*,
family: str = 'gaussian',
K: int = 30,
fwhm_default_grid: Sequence[float] = (1.0, 1.5, 2.355),
bracket_lo: float = 0.5,
bracket_hi: float = 3.5,
skip_stable_threshold: float = 1e-3,
progress=None,
log=print,
):
"""One PSF calibration step. Reuses the just-completed photometry
and `cd.sersic_residual` to fit a kernel and update `cd.psf`.
Steps:
1. Pull (x, y, flux) of the K brightest quality-passing sources
from `cd.psf_table` (or run bootstrap blur if none).
2. Fit kernel parameters via the multi-source NNLS objective.
3. `cd.psf <- library * K`.
4. Bracket-scan `dao_fwhm_factor`, or reuse the previous one if
the kernel L2 distance is below `skip_stable_threshold`.
5. Persist `cd.kernel_params`, `cd.dao_fwhm_factor`,
`cd._calibrate_psf_prev_kernel`.
`family`: 'gaussian' (default) | 'moffat' | 'drizzle'.
"""
import time
from .psf import filter_psfphot_results, _build_center_mask
from .config import config
from photutils.background import MADStdBackgroundRMS
t0 = time.time()
sersic_residual = np.asarray(cutoutdata.sersic_residual)
# Treat the FIRST cd.psf we see as the un-blurred library, since
# subsequent calibration applies K on top of `cd._psf_raw` and
# would otherwise compose K onto an already-modulated PSF.
if (not hasattr(cutoutdata, '_psf_raw')
or getattr(cutoutdata, '_psf_raw', None) is None):
cutoutdata._psf_raw = np.asarray(cutoutdata.psf, dtype=float).copy()
library_psf = np.asarray(cutoutdata._psf_raw, dtype=float)
library_norm = _normalize_psf(library_psf)
psf_oversample = int(cutoutdata.psf_oversample)
psf_sigma = float(cutoutdata.psf_sigma)
bkg_std = float(MADStdBackgroundRMS()(
np.nan_to_num(sersic_residual, nan=0.0)))
# Build the center-exclusion mask used by the bootstrap blur scan,
# fwhm scan, and anchor selection. Radius is in units of the
# galaxy's own size: cd.galaxy_size_sersic when a Sersic fit has
# produced an r_eff, otherwise cd.galaxy_size (initial Gaussian
# σ-guess from prep). This is intentionally distinct from the iPSF
# mask in [psf] which is sized in PSF FWHM — the calibrator needs
# an aggressive mask that covers the whole bright galaxy region
# so Sersic-fit residuals don't slip through as fake sources.
center_mask = None
center_mask_params = None
try:
sp = cutoutdata.sersic_params_physical
cm_factor = float(config.get('psf-calib', {}).get(
'center_mask_r_in_galaxy_size', 1.5))
if cm_factor > 0:
gsize = float(getattr(cutoutdata, 'galaxy_size_sersic',
cutoutdata.galaxy_size))
# Elliptical mask shaped by the Sersic ellip + theta so
# inclined galaxies aren't masked as a face-on circle.
# semi-major a = gsize × cm_factor (along Sersic +theta axis)
# semi-minor b = a × (1 - ellip)
# Caps the semi-major at 45% of the smaller residual axis;
# the semi-minor is scaled proportionally so the ellipse
# keeps its axis ratio under the cap.
a_uncapped = gsize * cm_factor
cap = 0.45 * float(min(sersic_residual.shape))
scale = min(1.0, cap / a_uncapped) if a_uncapped > 0 else 0.0
a = a_uncapped * scale
ellip = float(sp.get('ellip', 0.0))
ellip = max(0.0, min(0.99, ellip))
b = a * (1.0 - ellip)
theta = float(sp.get('theta', 0.0))
center_mask_params = [float(sp['x_0']), float(sp['y_0']),
a, b, theta]
center_mask = _build_center_mask(sersic_residual.shape,
center_mask_params)
except Exception:
center_mask = None
center_mask_params = None
# 1. extract anchors via a dedicated calibration photometry.
# The science psf_table comes from a position-PINNED final joint refit
# (xy_bounds≈0). Those pinned anchors are a poor, unstable input for the
# kernel fit: photutils flags ~every source with bit 32, and (more
# importantly) the pinned positions / smaller detection set drive the
# kernel width to its bound in sparse fields. So when
# [psf-calib].calib_xy_bounds is set we re-run a dedicated anchor
# photometry on sersic_residual with that RELAXED position bound —
# positions are free to refine to true centroids, the fits don't trip
# flag 32, and the kernel fit gets an accurate, stable anchor set. This
# is scoped to calibration only: cd.psf_table / psf_sub_data (the
# science products) are never written here, and the main iPSF fit is
# untouched.
calib_xy_bounds = config.get('psf-calib', {}).get('calib_xy_bounds', None)
if calib_xy_bounds is not None:
from .psf import iterative_psf_fitting
from photutils.psf import ImagePSF
anchor_psf_model = ImagePSF(
np.asarray(cutoutdata.psf, dtype=float), flux=1.0,
x_0=0, y_0=0, oversampling=psf_oversample, fill_value=0.0)
# Single low-threshold pass: the full high->low ladder early-exits
# on consecutive-empty high thresholds before reaching th_min where
# the sources are; the joint refit's own leftover-detection loop
# recovers fainter sources iteratively.
th_min = config['psf'].get('th_min', 1.5)
threshold_list = np.array([float(th_min)])
_saved_xyb = config['psf'].get('final_refit_xy_bounds', 0.0)
config['psf']['final_refit_xy_bounds'] = float(calib_xy_bounds)
try:
with warnings.catch_warnings():
warnings.simplefilter('ignore')
# center_mask_params=None: iterative_psf_fitting's internal
# post-fit filter unpacks a 3-tuple and chokes on the
# 5-tuple elliptical form (silently zeroing the result). The
# calibrator re-applies its own elliptical mask below.
phot, _resid_calib = iterative_psf_fitting(
sersic_residual, anchor_psf_model, psf_sigma,
threshold_list=threshold_list,
center_mask_params=None)
except Exception as _e:
log(f'[calibrate_psf_step] relaxed-bound anchor photometry '
f'failed: {_e}; falling back to cd.psf_table')
phot = cutoutdata.psf_table
finally:
config['psf']['final_refit_xy_bounds'] = _saved_xyb
else:
phot = cutoutdata.psf_table
no_phot = phot is None or len(phot) == 0
if not no_phot:
try:
s_pass, _ = filter_psfphot_results(phot, bkg_std=bkg_std)
s_pass = np.asarray(s_pass)
except Exception:
s_pass = np.ones(len(phot), dtype=bool)
no_pass = int(s_pass.sum()) == 0
else:
no_pass = True
if no_phot or no_pass:
# iPSF produced no usable sources — run an expanding-range blur
# scan to recover a sensible PSF; next iter's iPSF picks it up.
log('[calibrate_psf_step] no usable photometry '
f'(n_phot={0 if no_phot else len(phot)}) -> blur bootstrap')
bootstrap_grid = (0.0, 1.0, 2.0, 3.0, 5.0, 7.0, 10.0)
if progress is not None:
boot_task = progress.add_task(
'Calibrating PSF (bootstrap blur)',
total=len(bootstrap_grid))
def _tick_bootstrap():
progress.update(boot_task, advance=1, refresh=True)
else:
boot_task = None
_tick_bootstrap = None
try:
chosen = _bootstrap_blur(
cutoutdata, initial_grid=bootstrap_grid, log=log,
on_progress=_tick_bootstrap, mask=center_mask)
finally:
if progress is not None and boot_task is not None:
try: progress.remove_task(boot_task)
except Exception: pass
if chosen is None:
log('[calibrate_psf_step] blur bootstrap failed; '
'leaving cd.psf unchanged')
return None
# Reset per-cutout state; the kernel/fwhm will be recalibrated on
# the next call once iPSF produces a phot_table.
cutoutdata.kernel_params = None
cutoutdata.dao_fwhm_factor = 2.33
cutoutdata._calibrate_psf_prev_kernel = None
config['psf']['dao_fwhm_factor'] = 2.33
return {
'kernel_family': None,
'kernel_params': None,
'dao_fwhm_factor': 2.33,
'bootstrapped_blur': float(chosen),
'wall_s': float(time.time() - t0),
}
phot_pass = phot[s_pass]
x = np.asarray(phot_pass['x_fit'], dtype=float)
y = np.asarray(phot_pass['y_fit'], dtype=float)
f = np.asarray(phot_pass['flux_fit'], dtype=float)
valid = np.isfinite(x) & np.isfinite(y) & np.isfinite(f) & (f > 0)
# Drop any anchors that fall inside the centre-mask. iPSF already
# excludes them via PSFPhotometry's mask, but stale cd.psf_table
# from older runs may still carry such rows. Matches the elliptical
# vs circular form used when the mask was built.
if center_mask_params is not None:
if len(center_mask_params) == 3:
x_c, y_c, r_c = center_mask_params
valid &= ((x - x_c) ** 2 + (y - y_c) ** 2 > r_c ** 2)
elif len(center_mask_params) == 5:
x_c, y_c, a_c, b_c, theta_c = center_mask_params
dx = x - x_c
dy = y - y_c
ct, st = np.cos(theta_c), np.sin(theta_c)
x_rot = dx * ct + dy * st
y_rot = -dx * st + dy * ct
valid &= (x_rot / a_c) ** 2 + (y_rot / b_c) ** 2 > 1.0
x, y, f = x[valid], y[valid], f[valid]
if len(x) == 0:
log('[calibrate_psf_step] no anchors outside mask; skipping')
return None
# Full anchor pool for the multi-source neighbour lookup; the top-K
# brightest are the fit anchors.
all_x_pass = x.copy()
all_y_pass = y.copy()
all_flux_pass = f.copy()
order = np.argsort(-f)[:K]
x_anchor, y_anchor, f_anchor = x[order], y[order], f[order]
n_pass_input = int(len(x))
# 2. kernel fit (per-source residual against sersic_residual)
prev_params = getattr(cutoutdata, 'kernel_params', None)
cfg_calib = config.get('psf-calib', {})
objective = str(cfg_calib.get('kernel_fit_objective',
'multi_source')).lower()
max_neighbors = int(cfg_calib.get('kernel_fit_max_neighbors', 15))
neighbor_pad = float(cfg_calib.get('kernel_fit_neighbor_pad_pix', 4.0))
# Stamp size: multi_source scales with the effective PSF so the
# wings actually fit inside the stamp; single_source keeps 17×17.
stamp_half_min = int(cfg_calib.get('kernel_fit_stamp_half_min', 8))
stamp_half_factor = float(cfg_calib.get(
'kernel_fit_stamp_half_in_sigmaeff', 3.0))
if objective == 'multi_source':
if prev_params is not None and 'sigma_data' in prev_params:
sigma_kernel_prev = float(prev_params['sigma_data'])
else:
sigma_kernel_prev = 1.5
sigma_eff_est = float(np.sqrt(psf_sigma ** 2 + sigma_kernel_prev ** 2))
half_anchor = max(stamp_half_min,
int(np.ceil(stamp_half_factor * sigma_eff_est)))
else:
half_anchor = stamp_half_min
# Progress task: 1 kernel-fit phase + 3 fwhm-scan phases. The scan
# portion is bulk-advanced if skip-on-stable fires.
n_seeds_expected = 1
n_scan_phases = 3
if progress is not None:
calib_task = progress.add_task(
f'Calibrating PSF (kernel fit 0/{n_seeds_expected})',
total=n_seeds_expected + n_scan_phases)
seeds_done = [0]
def _tick_seed():
seeds_done[0] += 1
try:
progress.update(
calib_task, advance=1,
description=(f'Calibrating PSF '
f'(kernel fit {seeds_done[0]}/'
f'{n_seeds_expected})'),
refresh=True)
except Exception:
pass
else:
calib_task = None
_tick_seed = None
# Crop the library to the central region used by the fit. The fit
# only needs eff_psf within ±half_anchor data px of each anchor,
# plus the kernel's own support; cropping shrinks the per-call
# fftconvolve and spline interpolations dramatically (663²→401²
# is ~2.7× faster). Use the full library for the final eff_psf
# written to cd.psf.
fit_half_oversampled = max(
200, int(np.ceil(10.0 * psf_sigma * psf_oversample)))
library_for_fit = _normalize_psf(
_crop_centered(library_norm, fit_half_oversampled))
try:
fit = _fit_kernel_to_data(
library_for_fit, psf_oversample, sersic_residual,
x_anchor, y_anchor, f_anchor,
family, half=half_anchor,
on_seed_done=_tick_seed,
objective=objective,
all_x=all_x_pass, all_y=all_y_pass, all_flux=all_flux_pass,
max_neighbors=max_neighbors, neighbor_pad=neighbor_pad)
except Exception:
if progress is not None and calib_task is not None:
try: progress.remove_task(calib_task)
except Exception: pass
raise
log(f'[calibrate_psf_step] kernel={family} '
f'params={fit["params"]}, fit_l2={fit["residual_l2"]:.3g}')
# 3. apply: cd.psf ← library * K
new_psf = _build_effective_psf(
library_norm, psf_oversample, family, fit['params'],
half=max(20, int(np.ceil(6.0 * psf_sigma))))
cutoutdata.psf = new_psf
# Refresh cd.psf_sigma from the new effective PSF so downstream
# consumers (DAO fwhm, fit_shape, sky-annulus radii, grouper
# separation, dedup distance) see the calibrated width instead of
# the stale library Gaussian-fit. Update the local psf_sigma too so
# the rest of this function (empirical fwhm fit, anchor stamps,
# scan grid) operates on the calibrated value.
cutoutdata.calc_psf_sigma()
psf_sigma = float(cutoutdata.psf_sigma)
# 4. skip-on-stable check (only honoured when the user opts in via
# `[psf-calib].kernel_fit_skip_on_convergence`). Default off — the
# fwhm scan runs every iteration so a still-evolving kernel always
# gets a fresh matched-filter width.
skip_on_conv = bool(cfg_calib.get('kernel_fit_skip_on_convergence', False))
prev_kernel = getattr(cutoutdata, '_calibrate_psf_prev_kernel', None)
skip_scan = (skip_on_conv
and _kernels_close(fit['kernel_data'], prev_kernel,
threshold=skip_stable_threshold))
prev_fac = getattr(cutoutdata, 'dao_fwhm_factor', None)
fwhm_method = str(cfg_calib.get('fwhm_method', 'scan')).lower()
if skip_scan and prev_fac is not None:
log(f'[calibrate_psf_step] kernel stable (Δrms<{skip_stable_threshold});'
f' reusing fwhm_factor={prev_fac}')
chosen_fac = float(prev_fac)
scan_records = None
# Bulk-advance through the (skipped) scan phases so the bar
# reaches 100% rather than appearing stuck at the kernel-fit step.
if progress is not None and calib_task is not None:
try:
progress.update(
calib_task, advance=n_scan_phases,
description='Calibrating PSF (scan skipped)',
refresh=True)
except Exception:
pass
elif fwhm_method == 'empirical':
# Skip the bracket scan entirely. Use photutils.fit_fwhm on the
# calibrated PSF (matched-filter theorem). Falls back to prev_fac
# if the fit fails so a transient numerical issue doesn't blow up
# the whole calibration.
emp_fac = _empirical_fwhm_factor(new_psf, psf_oversample, psf_sigma)
if emp_fac is None or not np.isfinite(emp_fac):
chosen_fac = float(prev_fac if prev_fac is not None else 2.355)
log(f'[calibrate_psf_step] empirical FWHM fit failed; '
f'reusing fwhm_factor={chosen_fac}')
else:
chosen_fac = float(emp_fac)
log(f'[calibrate_psf_step] empirical FWHM: '
f'fwhm_factor={chosen_fac:.4f} '
f'(F_eff={chosen_fac*psf_sigma:.3f} data px)')
scan_records = None
if progress is not None and calib_task is not None:
try:
progress.update(
calib_task, advance=n_scan_phases,
description='Calibrating PSF (empirical FWHM)',
refresh=True)
except Exception:
pass
else:
if prev_fac is None:
grid = list(fwhm_default_grid)
else:
grid = _bracket_fwhm_grid(prev_fac, lo=bracket_lo, hi=bracket_hi)
if progress is not None and calib_task is not None:
scan_done = [0]
def _tick_scan():
scan_done[0] += 1
try:
progress.update(
calib_task, advance=1,
description=(f'Calibrating PSF '
f'(fwhm scan {scan_done[0]}/'
f'{len(grid)})'),
refresh=True)
except Exception:
pass
else:
_tick_scan = None
scan_method = str(cfg_calib.get('scan_method', 'lm')).lower()
if scan_method == 'nnls':
scan_metric = str(cfg_calib.get('scan_metric', 'sum_snr')).lower()
scan_top_k = int(cfg_calib.get('scan_top_k', 100) or 0)
chosen_fac, scan_records = _scan_dao_fwhm_factor_nnls(
sersic_residual, new_psf, psf_oversample, psf_sigma,
grid, metric=scan_metric, top_k=scan_top_k,
log=log, on_progress=_tick_scan, mask=center_mask)
else:
chosen_fac, scan_records = _scan_dao_fwhm_factor(
sersic_residual, new_psf, psf_oversample, psf_sigma,
grid, log=log, on_progress=_tick_scan,
mask=center_mask)
if chosen_fac is None:
chosen_fac = float(prev_fac if prev_fac is not None else 2.33)
# 5. publish dao_fwhm_factor for the next iPSF call
config['psf']['dao_fwhm_factor'] = float(chosen_fac)
# 6. persist state on cutoutdata
cutoutdata.kernel_family = family
cutoutdata.kernel_params = dict(fit['params'])
cutoutdata.dao_fwhm_factor = float(chosen_fac)
cutoutdata._calibrate_psf_prev_kernel = np.asarray(
fit['kernel_data'], dtype=float).copy()
# diagnostic n_pass: pull from the chosen scan record if scan ran
n_pass_after = n_pass_input
if scan_records:
for rec in scan_records:
if abs(rec['fwhm_factor'] - chosen_fac) < 1e-6:
n_pass_after = int(rec['n_pass'])
break
out = {
'kernel_family': family,
'kernel_params': dict(fit['params']),
'dao_fwhm_factor': float(chosen_fac),
'n_passing_input': n_pass_input,
'n_passing_post_scan': n_pass_after,
'fit_residual_l2': float(fit['residual_l2']),
'skipped_scan': bool(skip_scan and prev_fac is not None),
'wall_s': float(time.time() - t0),
}
log(f'[calibrate_psf_step] done in {out["wall_s"]:.1f}s '
f'(skipped_scan={out["skipped_scan"]}, '
f'fwhm_factor={chosen_fac:.2f})')
if progress is not None and calib_task is not None:
try:
progress.remove_task(calib_task)
except Exception:
pass
return out