Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-42576: mpSkyEphemerisQuery #227

Merged
merged 2 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/lsst/ap/association/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@
from .transformDiaSourceCatalog import *
from .utils import *
from .version import *
from .mpSkyEphemerisQuery import *
20 changes: 13 additions & 7 deletions python/lsst/ap/association/diaPipe.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't do umbrella "response to review" commits. Commits get their value from being logically coherent and individually correct, so review responses should be done either by tweaking the existing commits (edit, squash, or fixup commands in git rebase -i) or, for a genuinely new change, creating new commits that are as well-organized and -documented as the original ones.

In this case, since you had one giant commit to begin with, squashing the changes onto it is probably the least of the available evils. I'll continue the review on the combined diff.

Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ class DiaPipelineConnections(
solarSystemObjectTable = connTypes.Input(
doc="Catalog of SolarSolarSystem objects expected to be observable in "
"this detectorVisit.",
name="visitSsObjects",
name="preloaded_SsObjects",
kfindeisen marked this conversation as resolved.
Show resolved Hide resolved
storageClass="DataFrame",
dimensions=("instrument", "visit"),
dimensions=("instrument", "group", "detector"),
minimum=0,
)
diffIm = connTypes.Input(
doc="Difference image on which the DiaSources were detected.",
Expand Down Expand Up @@ -375,6 +376,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
inputs["band"] = butlerQC.quantum.dataId["band"]
inputs["legacySolarSystemTable"] = None
if not self.config.doSolarSystemAssociation:
inputs["solarSystemObjectTable"] = None

Expand All @@ -385,15 +387,16 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
@timeMethod
def run(self,
diaSourceTable,
solarSystemObjectTable,
legacySolarSystemTable,
diffIm,
exposure,
template,
preloadedDiaObjects,
preloadedDiaSources,
preloadedDiaForcedSources,
band,
idGenerator):
idGenerator,
solarSystemObjectTable=None):
"""Process DiaSources and DiaObjects.

Load previous DiaObjects and their DiaSource history. Calibrate the
Expand All @@ -405,8 +408,8 @@ def run(self,
----------
diaSourceTable : `pandas.DataFrame`
Newly detected DiaSources.
solarSystemObjectTable : `pandas.DataFrame`
Preloaded Solar System objects expected to be visible in the image.
legacySolarSystemTable : `pandas.DataFrame`
Not used
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the reason for including a "virtual" input that is never used but must be specified. What is this for, and is there a plan for phasing it out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to maintain the interface's backwards compatibility -- removing the argument in the place of legacySolarSystemTable would break every place where diaPipe.run is currently being called. I do not believe there is a plan for phasing this out.

Copy link
Member

@kfindeisen kfindeisen Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except that what you have here is not actually backward-compatible -- a hypothetical user who tries to call run with the old signature, expecting to get solar system associations, will silently get nothing. If you do want backward compatibility, you'd need to convert the old-style input into the new-style input.

Phasing out could be handled by raising a FutureWarning if the argument isn't None (there's no specific procedure for deprecating parameters, so you'd have to do it manually, but see that page for recommended text) and filing a Jira issue to remove it later.

All that said, though, I doubt that run is called anywhere except from runQuantum or unit tests, so I think you could get away with unceremoniously deleting it.

Copy link
Member

@kfindeisen kfindeisen Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with @Gerenjie and @isullivan separately, we've agreed to move forward as follows:

  • legacySolarSystemTable will be used in place of solarSystemObjectTable when the latter is undefined. That should be enough to give backwards-compatibility whether run is called with positional args or keywords.
  • After merging this PR, we'll file an RFC to deprecate and remove legacySolarSystemTable per the usual process. I doubt there will be any pushback.

diffIm : `lsst.afw.image.ExposureF`
Difference image exposure in which the sources in ``diaSourceCat``
were detected.
Expand Down Expand Up @@ -445,6 +448,9 @@ def run(self,
RuntimeError
Raised if duplicate DiaObjects or duplicate DiaSources are found.
"""
# Accept either legacySolarSystemTable or optional solarSystemObjectTable.
if legacySolarSystemTable is not None and solarSystemObjectTable is None:
solarSystemObjectTable = legacySolarSystemTable
if not preloadedDiaObjects.empty:
diaObjects = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects,
buffer=self.config.imagePixelMargin)
Expand All @@ -453,7 +459,7 @@ def run(self,
# Associate new DiaSources with existing DiaObjects.
assocResults = self.associator.run(diaSourceTable, diaObjects)

if self.config.doSolarSystemAssociation:
if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None:
ssoAssocResult = self.solarSystemAssociator.run(
assocResults.unAssocDiaSources,
solarSystemObjectTable,
Expand Down
223 changes: 223 additions & 0 deletions python/lsst/ap/association/mpSkyEphemerisQuery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
# This file is part of ap_association.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Solar System Object Query to MPSky in place of a internal Rubin solar
system object caching/retrieval code.

Will compute the location for of known SSObjects within a visit-detector combination.
"""
kfindeisen marked this conversation as resolved.
Show resolved Hide resolved

__all__ = ["MPSkyEphemerisQueryConfig", "MPSkyEphemerisQueryTask"]


import os
import pandas as pd
import pyarrow as pa
import requests

from lsst.ap.association.utils import getMidpointFromTimespan
from lsst.geom import SpherePoint
import lsst.pex.config as pexConfig
from lsst.utils.timer import timeMethod

from lsst.pipe.base import connectionTypes, NoWorkFound, PipelineTask, \
PipelineTaskConfig, PipelineTaskConnections, Struct


class MPSkyEphemerisQueryConnections(PipelineTaskConnections,
dimensions=("instrument",
"group", "detector")):

predictedRegionTime = connectionTypes.Input(
doc="The predicted exposure region and time",
name="regionTimeInfo",
storageClass="RegionTimeInfo",
dimensions={"instrument", "group", "detector"},
)

ssObjects = connectionTypes.Output(
doc="MPSky-provided Solar System objects observable in this detector-visit",
name="preloaded_SsObjects",
storageClass="DataFrame",
dimensions=("instrument", "group", "detector"),
)


class MPSkyEphemerisQueryConfig(
PipelineTaskConfig,
pipelineConnections=MPSkyEphemerisQueryConnections):
observerCode = pexConfig.Field(
dtype=str,
doc="IAU Minor Planet Center observer code for queries "
"(default is X05 for Rubin Obs./LSST)",
default='X05'
)
queryBufferRadiusDegrees = pexConfig.Field(
dtype=float,
doc="Buffer radius in degrees added to detector bounding circle for ephemeris "
"cone search. Defaults to 10 deg/day * 30 minutes",
default=0.208
)
mpSkyRequestTimeoutSeconds = pexConfig.Field(
dtype=float,
doc="Time in seconds to wait for mpSky request before failing ",
default=1.0
)


class MPSkyEphemerisQueryTask(PipelineTask):
"""Task to query the MPSky service and retrieve the solar system objects
that are observable within the input visit.
"""
ConfigClass = MPSkyEphemerisQueryConfig
_DefaultName = "mpSkyEphemerisQuery"

@timeMethod
def run(self, predictedRegionTime):
"""Parse the information on the current visit and retrieve the
observable solar system objects from MPSky.

Parameters
----------
predictedRegionTime : `lsst.pipe.base.utils.RegionTimeInfo`
Predicted footprint and timespan of the exposure

Returns
-------
result : `lsst.pipe.base.Struct`
Results struct with components:

- ``ssObjects`` : `pandas.DataFrame`
DataFrame containing Solar System Objects near the detector
footprint as retrieved by MPSky. The columns are as follows:

``Name``
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure there needs to be an empty line before starting the nested list. Does package-docs give you errors?

object name (`str`)
``ra``
RA in decimal degrees (`float`)
``dec``
DEC in decimal degrees (`float`)
``obj_poly``
DO NOT USE until DM-46069 is resolved
``obs_poly``
DO NOT USE until DM-46069 is resolved
"""
# Get detector center and timespan midpoint from predictedRegionTime.
region = predictedRegionTime.region
timespan = predictedRegionTime.timespan
expCenter = SpherePoint(region.getBoundingCircle().getCenter())
kfindeisen marked this conversation as resolved.
Show resolved Hide resolved
expRadius = region.getBoundingCircle().getOpeningAngle().asDegrees()
expMidPointEPOCH = getMidpointFromTimespan(timespan, allowUnbounded=False).utc.mjd

# MPSky service query
mpSkyURL = os.environ.get('MP_SKY_URL', '')
mpSkySsObjects = self._mpSkyConeSearch(expCenter, expMidPointEPOCH,
expRadius + self.config.queryBufferRadiusDegrees, mpSkyURL)

return Struct(
ssObjects=mpSkySsObjects,
)

def read_mp_sky_response(self, response):
"""Extract ephemerides from an MPSky web request

Parameters
----------
response : `requests.Response`
MPSky message

Returns
-------
objID : `np.ndarray`
Designations of nearby objects
ra : `np.ndarray`
Array of object right ascensions
dec : `np.ndarray`
Array of object declinations
object_polynomial : `np.ndarray`, (N,M)
Array of object cartesian position polynomials
observer_polynomial : `np.ndarray`, (N,M)
Array of observer cartesian position polynomials
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding explicit dimensions, at least for the multidimensional arrays (a reader can probably figure out that objID, ra, and dec are N-vectors).

"""
with pa.input_stream(memoryview(response.content)) as stream:
stream.seek(0)
object_polynomial = pa.ipc.read_tensor(stream)
observer_polynomial = pa.ipc.read_tensor(stream)
with pa.ipc.open_stream(stream) as reader:
columns = next(reader)
objID = columns["name"].to_numpy(zero_copy_only=False)
ra = columns["ra"].to_numpy()
dec = columns["dec"].to_numpy()
return objID, ra, dec, object_polynomial, observer_polynomial

def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL):
"""Query MPSky ephemeris service for objects near the expected detector position

Parameters
----------
expCenter : `lsst.geom.SpherePoint`
Center of search cone
epochMJD : `float`
Epoch of cone search, (MJD in UTC).
queryRadius : `float`
Radius of the cone search in degrees.
mpSkyURL : `str`
URL to query for MPSky.

Returns
-------
mpSkySsObjects : `pandas.DataFrame`
DataFrame with Solar System Object information and RA/DEC position
within the visit.
"""
fieldRA = expCenter.getRa().asDegrees()
fieldDec = expCenter.getDec().asDegrees()

params = {
"t": epochMJD,
"ra": fieldRA,
"dec": fieldDec,
"radius": queryRadius
}

try:
response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds)
response.raise_for_status()
objID, ra, dec, object_polynomial, observer_polynomial = self.read_mp_sky_response(response)

mpSkySsObjects = pd.DataFrame()
mpSkySsObjects['ObjID'] = objID
mpSkySsObjects['ra'] = ra
mpSkySsObjects['obj_poly'] = object_polynomial
mpSkySsObjects['obs_poly'] = observer_polynomial
mpSkySsObjects['dec'] = dec
mpSkySsObjects['Err(arcsec)'] = 2
mpSkySsObjects['ssObjectId'] = [abs(hash(v)) for v in mpSkySsObjects['ObjID'].values]
nFound = len(mpSkySsObjects)

if nFound == 0:
self.log.info("No Solar System objects found for visit.")
else:
self.log.info("%d Solar System Objects in visit", nFound)
except requests.RequestException as e:
raise NoWorkFound(f"Failed to connect to the remote ephemerides service: {e}") from e

return mpSkySsObjects
6 changes: 4 additions & 2 deletions python/lsst/ap/association/ssoAssociation.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file seems to mix formatting fixes, bug workarounds, and changes actually related to mpSkyEphemerisQuery. It's tricky to fix after the fact, but in the future, please do such background fixes on their own commits.

Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ def run(self, diaSourceCatalog, solarSystemObjects, exposure):
# Query the KDtree for DIA nearest neighbors to SSOs. Currently only
# picks the DiaSource with the shortest distance. We can do something
# fancier later.
diaSourceCatalog["ssObjectId"] = 0
for index, ssObject in maskedObjects.iterrows():

ssoVect = self._radec_to_xyz(ssObject["ra"], ssObject["dec"])

# Which DIA Sources fall within r?
dist, idx = tree.query(ssoVect, distance_upper_bound=maxRadius)
if np.isfinite(dist[0]):
nFound += 1
diaSourceCatalog.loc[idx[0], "ssObjectId"] = ssObject["ssObjectId"]
diaSourceCatalog.loc[diaSourceCatalog.index[idx[0]], "ssObjectId"] = ssObject["ssObjectId"]

self.log.info("Successfully associated %d SolarSystemObjects.", nFound)
assocMask = diaSourceCatalog["ssObjectId"] != 0
Expand Down Expand Up @@ -159,6 +159,8 @@ def _maskToCcdRegion(self, solarSystemObjects, exposure, marginArcsec):
maskedSolarSystemObjects : `pandas.DataFrame`
Set of SolarSystemObjects contained within the exposure bounds.
"""
if len(solarSystemObjects) == 0:
return solarSystemObjects
wcs = exposure.getWcs()
padding = min(
int(np.ceil(marginArcsec / wcs.getPixelScale(exposure.getBBox().getCenter()).asArcseconds())),
Expand Down
68 changes: 68 additions & 0 deletions python/lsst/ap/association/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,71 @@ def getMidpointFromTimespan(timespan, allowUnbounded=True):
raise ValueError("Cannot compute midpoint: unbounded timespan.") from e
else:
raise ValueError("Cannot compute midpoint: unbounded timespan.") from e


def objID_to_ssObjectID(objID, flags=0):
"""Convert from Minor Planet Center packed provisional object ID to
Rubin ssObjectID.

Parameters
----------
objID : `str`
Minor Planet Center packed provisional designation for a small solar
system object. Must be fewer than eight characters.
flags : `int`, optional
Eight free bits to enable future decoupling between Minor Planet Center
and Rubin. Zero by default, should not be changed unless we need to
move away from a 1:1 mapping with the MPC. Must be within [0, 255].

Returns
-------
ssObjectID : `int`
Rubin ssObjectID

Raises
------
ValueError
Raised if either objID is longer than 7 characters or flags is greater
than 255 or less than 0.
"""
if len(objID) > 7:
raise ValueError(f'objID longer than 7 characters: "{objID}"')
if len(objID) < 7:
raise ValueError(f'objID shorter than 7 characters: "{objID}"')
if flags < 0 or flags > 255:
raise ValueError(f'Flags ({flags}) outside [0, 255].')
if any([ord(c) > 255 for c in objID]):
raise ValueError(f'{[c for c in objID if ord(c) > 255]} not legal objID characters (ascii [1, 255])')

ssObjectID = flags
for character in objID:
ssObjectID <<= 8
ssObjectID += ord(character)
return ssObjectID
Copy link
Member

@kfindeisen kfindeisen Aug 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two functions should have unit tests. Since the behavior is self-contained and the output is deterministic, it's possible to test them much more thoroughly than most of the code in ap_association:

  • ObjIDs of different lengths (including empty string)
  • ObjIDs containing characters for which ord returns a number greater than 255 (I think this causes multiple strings to map to the same ID, and can corrupt the flags if it's the first character)
  • Flags with different values, including -1, 0, 255, and 256.



def ssObjectID_to_objID(ssObjectID):
"""Convert from Rubin ssObjectID to Minor Planet Center packed provisional
object ID.

Parameters
----------
ssObjectID : `int`
Rubin ssObjectID

Returns
-------
objID : `str`
Minor Planet Center packed provisional designation.

flags : `int`
Rubin flags (not yet defined, but usable in case we decouple from MPC).

Raises
------
"""
if ssObjectID < 0 or ssObjectID >= (1 << 64):
raise ValueError(f'ssObjectID ({ssObjectID}) outside [0, 2^64 - 1].')

objID = ''.join([chr((ssObjectID >> (8 * i)) % 256) for i in reversed(range(0, 7))])
return objID, ssObjectID >> (8 * 7) % 256
Loading
Loading