Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 89 additions & 8 deletions python/lsst/pipe/tasks/functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"ComputePixelScale", "ConvertPixelToArcseconds",
"ConvertPixelSqToArcsecondsSq",
"ConvertDetectorAngleToPositionAngle",
"ConvertDetectorAngleErrToPositionAngleErr",
"ReferenceBand", "Photometry",
"NanoJansky", "NanoJanskyErr", "LocalPhotometry", "LocalNanojansky",
"LocalNanojanskyErr", "LocalDipoleMeanFlux",
Expand Down Expand Up @@ -1394,6 +1395,46 @@ def getPositionAngleFromDetectorAngle(self, theta, cd11, cd12, cd21, cd22):
# Position angle of vector from (RA1, Dec1) to (RA2, Dec2)
return np.rad2deg(self.computePositionAngle(ra1, dec1, ra2, dec2))

def getPositionAngleErrFromDetectorAngleErr(self, theta, theta_err, cd11,
cd12, cd21, cd22):
"""Compute position angle error (E of N) from detector angle error.

Parameters
----------
theta : `float`
detector angle [radian]
theta_err : `float`
detector angle err [radian]
cd11 : `float`
[1, 1] element of the local Wcs affine transform.
cd12 : `float`
[1, 2] element of the local Wcs affine transform.
cd21 : `float`
[2, 1] element of the local Wcs affine transform.
cd22 : `float`
[2, 2] element of the local Wcs affine transform.

Returns
-------
Position Angle Error: `~pandas.Series`
Position angle error in degrees

Notes
-----
The error is estimated by evaluating the position angle at
``theta ± theta_err`` and taking half the absolute difference.
"""
pa_plus = self.getPositionAngleFromDetectorAngle(
theta + theta_err, cd11, cd12, cd21, cd22
)
pa_minus = self.getPositionAngleFromDetectorAngle(
theta - theta_err, cd11, cd12, cd21, cd22
)
# Wrap difference into (-180, 180] to handle the ±180 deg boundary.
delta = pa_plus - pa_minus
delta = (delta + 180) % 360 - 180
return np.abs(delta) / 2


class ComputePixelScale(LocalWcs):
"""Compute the local pixel scale from the stored CDMatrix.
Expand Down Expand Up @@ -1554,6 +1595,53 @@ def _func(self, df):
)


class ConvertDetectorAngleErrToPositionAngleErr(LocalWcs):
"""Compute a position angle error from a detector angle error and the
stored CDMatrix.

Returns
-------
position angle error : degrees
"""

name = "PositionAngleErr"

def __init__(
self,
theta_col,
theta_err_col,
colCD_1_1,
colCD_1_2,
colCD_2_1,
colCD_2_2,
**kwargs
):
self.theta_col = theta_col
self.theta_err_col = theta_err_col
super().__init__(colCD_1_1, colCD_1_2, colCD_2_1, colCD_2_2, **kwargs)

@property
def columns(self):
return [
self.theta_col,
self.theta_err_col,
self.colCD_1_1,
self.colCD_1_2,
self.colCD_2_1,
self.colCD_2_2
]

def _func(self, df):
return self.getPositionAngleErrFromDetectorAngleErr(
df[self.theta_col],
df[self.theta_err_col],
df[self.colCD_1_1],
df[self.colCD_1_2],
df[self.colCD_2_1],
df[self.colCD_2_2]
)


class ReferenceBand(Functor):
"""Return the band used to seed multiband forced photometry.

Expand Down Expand Up @@ -1992,14 +2080,7 @@ def _func(self, df):


class MomentsBase(Functor):
"""Base class for functors that use shape moments and localWCS

Attributes
----------
is_covariance : bool
Whether the shape columns are terms of a covariance matrix. If False,
they will be assumed to be terms of a correlation matrix instead.
"""
"""Base class for functors that use shape moments and localWCS"""

is_covariance: bool = True

Expand Down
200 changes: 198 additions & 2 deletions tests/test_functors.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
MomentsG1Sky, MomentsG2Sky, MomentsTraceSky,
CorrelationIuuSky, CorrelationIvvSky, CorrelationIuvSky,
SemimajorAxisFromCorrelation, SemiminorAxisFromCorrelation,
PositionAngleFromCorrelation
)
PositionAngleFromCorrelation,
ConvertDetectorAngleErrToPositionAngleErr)

ROOT = os.path.abspath(os.path.dirname(__file__))

Expand Down Expand Up @@ -860,6 +860,202 @@ def testConvertPixelToArcseconds(self):
atol=1e-16,
rtol=1e-16))

def testConvertDetectorAngleErrToPositionAngleErr(self):
"""Test conversion of position angle err in detector degrees to
position angle err on sky.

Requires a similar setup to testConvertDetectorAngleToPositionAngle.
"""
dipoleSep = 10
ixx = 10
testPixelDeltas = np.random.uniform(-100, 100, size=(self.nRecords, 2))

for dec in np.linspace(-80, 80, 10):
for theta in (-35, 0, 90):
for x, y in zip(
np.random.uniform(2 * 1109.99981456774, size=10),
np.random.uniform(2 * 560.018167811613, size=10)):
wcs = self._makeWcs(dec=dec, theta=theta)
cd_matrix = wcs.getCdMatrix()

self.dataDict["dipoleSep"] = np.full(self.nRecords,
dipoleSep)
self.dataDict["ixx"] = np.full(self.nRecords, ixx)
self.dataDict["slot_Centroid_x"] = np.full(self.nRecords,
x)
self.dataDict["slot_Centroid_y"] = np.full(self.nRecords,
y)
self.dataDict["someCentroid_x"] = x + testPixelDeltas[:, 0]
self.dataDict["someCentroid_y"] = y + testPixelDeltas[:, 1]
self.dataDict["orientation"] = np.arctan2(
self.dataDict["someCentroid_y"] - self.dataDict[
"slot_Centroid_y"],
self.dataDict["someCentroid_x"] - self.dataDict[
"slot_Centroid_x"],
)
self.dataDict["orientation_err"] = np.abs(np.arctan2(
self.dataDict["someCentroid_y"] - self.dataDict[
"slot_Centroid_y"],
self.dataDict["someCentroid_x"] - self.dataDict[
"slot_Centroid_x"],
)) * 0.001

self.dataDict["base_LocalWcs_CDMatrix_1_1"] = np.full(
self.nRecords,
cd_matrix[0, 0])
self.dataDict["base_LocalWcs_CDMatrix_1_2"] = np.full(
self.nRecords,
cd_matrix[0, 1])
self.dataDict["base_LocalWcs_CDMatrix_2_1"] = np.full(
self.nRecords,
cd_matrix[1, 0])
self.dataDict["base_LocalWcs_CDMatrix_2_2"] = np.full(
self.nRecords,
cd_matrix[1, 1])
df = self.getMultiIndexDataFrame(self.dataDict)

# Test detector angle err to position angle err conversion
func = ConvertDetectorAngleErrToPositionAngleErr(
"orientation",
"orientation_err",
"base_LocalWcs_CDMatrix_1_1",
"base_LocalWcs_CDMatrix_1_2",
"base_LocalWcs_CDMatrix_2_1",
"base_LocalWcs_CDMatrix_2_2"
)
val = self._funcVal(func, df)

# Errors must be non-negative and finite across all
# WCS configurations, declinations, and pixel positions.
self.assertTrue(np.all(val.to_numpy() >= 0),
"PA errors must be non-negative")
self.assertTrue(np.all(np.isfinite(val.to_numpy())),
"PA errors must be finite")

def testPositionAngleErrPureRotationWcs(self):
"""PA error for a pure scaling WCS is exactly rad2deg(theta_err).

For CD = diag(s, -s) the sky direction of a pixel unit vector at
angle theta is PA(theta) = theta + pi/2 (flat-sky), so
|dPA/dtheta| = 1 everywhere and the propagated error is simply
np.rad2deg(theta_err) regardless of theta. This gives an
independent analytical expectation that does not depend on the
implementation under test.
"""
s = 5e-5 # degrees/pixel
cd11, cd12, cd21, cd22 = s, 0.0, 0.0, -s
local_wcs = LocalWcs("cd11", "cd12", "cd21", "cd22")

thetas = pd.Series([0.0, np.pi / 4, np.pi / 3, -np.pi / 6, 1.2, -0.8])
theta_err = 0.01 # radians
n = len(thetas)
cd11_s = pd.Series(np.full(n, cd11))
cd12_s = pd.Series(np.full(n, cd12))
cd21_s = pd.Series(np.full(n, cd21))
cd22_s = pd.Series(np.full(n, cd22))

pa_err = local_wcs.getPositionAngleErrFromDetectorAngleErr(
thetas, theta_err, cd11_s, cd12_s, cd21_s, cd22_s
)

expected_pa_err_deg = np.rad2deg(theta_err)
np.testing.assert_allclose(
pa_err.to_numpy(), expected_pa_err_deg, rtol=1e-4, atol=0
)

def testPositionAngleErrWrapAround(self):
"""PA error is correct when PA crosses the ±180 deg boundary.

For CD = diag(s, -s) and theta = pi/2, PA(theta) = pi deg exactly
(the wrap boundary). With a small theta_err:
pa_plus = PA(pi/2 + theta_err) wraps to -180 + rad2deg(theta_err)
pa_minus = PA(pi/2 - theta_err) = +180 - rad2deg(theta_err)

Naive subtraction gives ~-360 deg → apparent error ~180 deg (wrong).
The wrap-corrected implementation should return rad2deg(theta_err).
"""
s = 5e-5 # degrees/pixel
cd11, cd12, cd21, cd22 = s, 0.0, 0.0, -s
local_wcs = LocalWcs("cd11", "cd12", "cd21", "cd22")

theta_center = np.pi / 2 # PA(theta_center) = 180 deg → wrap boundary
theta_err = 0.01 # radians

cd_s = lambda v: pd.Series([v]) # noqa: E731
pa_err = local_wcs.getPositionAngleErrFromDetectorAngleErr(
pd.Series([theta_center]),
theta_err,
cd_s(cd11), cd_s(cd12), cd_s(cd21), cd_s(cd22),
)

expected_pa_err_deg = np.rad2deg(theta_err)
# Correct result is ~0.57 deg; naive (unwrapped) result would be ~180 deg.
np.testing.assert_allclose(
pa_err.to_numpy(), expected_pa_err_deg, rtol=1e-4, atol=0
)
self.assertLess(float(pa_err.iloc[0]), 1.0)

def testPositionAngleErrAccuracy(self):
"""Validate PA error accuracy by comparing to a Monte Carlo distribution.

Draw n_samples values of theta from N(theta_center, theta_err), compute
the position angle for each sample using getPositionAngleFromDetectorAngle,
and compare the circular standard deviation of the resulting PA
distribution to the functor output.

Two CD matrices are used to test normal scaling vs shearing:
- Pure scaling (CD = diag(s, -s)): PA(theta) = theta + pi/2, Jacobian = 1.
- Sheared WCS: Jacobian varies with theta, exercising the non-linear path.
"""
rng = np.random.default_rng(42)
n_samples = 2000
theta_err = 0.01
tolerance = 0.05 # allow 5 % relative error

s = 5.5e-5 # degrees/pixel. Do I need to be super accurate?
# Need to test against both?? Probably???
cd_matrices = {
"pure_scaling": np.array([[s, 0.0], [0.0, -s]]),
"sheared": np.array([[s, 0.4 * s], [-0.2 * s, -s]]),
}

for label, cd in cd_matrices.items():
cd11, cd12, cd21, cd22 = cd[0, 0], cd[0, 1], cd[1, 0], cd[1, 1]
local_wcs = LocalWcs("cd11", "cd12", "cd21", "cd22")

for theta_center in [0.0, 0.5, -1.2]:
theta_samples = rng.normal(theta_center, theta_err, size=n_samples)
n = len(theta_samples)

pa_samples_deg = local_wcs.getPositionAngleFromDetectorAngle(
pd.Series(theta_samples),
pd.Series(np.full(n, cd11)),
pd.Series(np.full(n, cd12)),
pd.Series(np.full(n, cd21)),
pd.Series(np.full(n, cd22)),
)
pa_rad = np.deg2rad(pa_samples_deg.to_numpy())
# Make sure circular standard deviation handles the +/-180 deg
# wrap correctly.
R = np.abs(np.mean(np.exp(1j * pa_rad)))
mc_std_deg = np.rad2deg(np.sqrt(-2.0 * np.log(R)))

pa_err = local_wcs.getPositionAngleErrFromDetectorAngleErr(
pd.Series([theta_center]),
theta_err,
pd.Series([cd11]),
pd.Series([cd12]),
pd.Series([cd21]),
pd.Series([cd22]),
)

self.assertAlmostEqual(
float(pa_err.iloc[0]),
mc_std_deg,
delta=mc_std_deg * tolerance,
msg=f"MC vs functor mismatch for {label}, theta_center={theta_center}",
)

def _makeWcs(self, dec=53.1595451514076, theta=0):
"""Create a wcs from real CFHT values, rotated by an optional theta.

Expand Down
Loading