-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-42576: mpSkyEphemerisQuery #227
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,9 +67,10 @@ class DiaPipelineConnections( | |
solarSystemObjectTable = connTypes.Input( | ||
doc="Catalog of SolarSolarSystem objects expected to be observable in " | ||
"this detectorVisit.", | ||
name="visitSsObjects", | ||
name="preloaded_SsObjects", | ||
kfindeisen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
storageClass="DataFrame", | ||
dimensions=("instrument", "visit"), | ||
dimensions=("instrument", "group", "detector"), | ||
minimum=0, | ||
) | ||
diffIm = connTypes.Input( | ||
doc="Difference image on which the DiaSources were detected.", | ||
|
@@ -375,6 +376,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): | |
inputs = butlerQC.get(inputRefs) | ||
inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId) | ||
inputs["band"] = butlerQC.quantum.dataId["band"] | ||
inputs["legacySolarSystemTable"] = None | ||
if not self.config.doSolarSystemAssociation: | ||
inputs["solarSystemObjectTable"] = None | ||
|
||
|
@@ -385,15 +387,16 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs): | |
@timeMethod | ||
def run(self, | ||
diaSourceTable, | ||
solarSystemObjectTable, | ||
legacySolarSystemTable, | ||
diffIm, | ||
exposure, | ||
template, | ||
preloadedDiaObjects, | ||
preloadedDiaSources, | ||
preloadedDiaForcedSources, | ||
band, | ||
idGenerator): | ||
idGenerator, | ||
solarSystemObjectTable=None): | ||
"""Process DiaSources and DiaObjects. | ||
|
||
Load previous DiaObjects and their DiaSource history. Calibrate the | ||
|
@@ -405,8 +408,8 @@ def run(self, | |
---------- | ||
diaSourceTable : `pandas.DataFrame` | ||
Newly detected DiaSources. | ||
solarSystemObjectTable : `pandas.DataFrame` | ||
Preloaded Solar System objects expected to be visible in the image. | ||
legacySolarSystemTable : `pandas.DataFrame` | ||
Not used | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand the reason for including a "virtual" input that is never used but must be specified. What is this for, and is there a plan for phasing it out? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea is to maintain the interface's backwards compatibility -- removing the argument in the place of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Except that what you have here is not actually backward-compatible -- a hypothetical user who tries to call
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After discussing with @Gerenjie and @isullivan separately, we've agreed to move forward as follows:
|
||
diffIm : `lsst.afw.image.ExposureF` | ||
Difference image exposure in which the sources in ``diaSourceCat`` | ||
were detected. | ||
|
@@ -445,6 +448,9 @@ def run(self, | |
RuntimeError | ||
Raised if duplicate DiaObjects or duplicate DiaSources are found. | ||
""" | ||
# Accept either legacySolarSystemTable or optional solarSystemObjectTable. | ||
if legacySolarSystemTable is not None and solarSystemObjectTable is None: | ||
solarSystemObjectTable = legacySolarSystemTable | ||
if not preloadedDiaObjects.empty: | ||
diaObjects = self.purgeDiaObjects(diffIm.getBBox(), diffIm.getWcs(), preloadedDiaObjects, | ||
buffer=self.config.imagePixelMargin) | ||
|
@@ -453,7 +459,7 @@ def run(self, | |
# Associate new DiaSources with existing DiaObjects. | ||
assocResults = self.associator.run(diaSourceTable, diaObjects) | ||
|
||
if self.config.doSolarSystemAssociation: | ||
if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None: | ||
ssoAssocResult = self.solarSystemAssociator.run( | ||
assocResults.unAssocDiaSources, | ||
solarSystemObjectTable, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
# This file is part of ap_association. | ||
# | ||
# Developed for the LSST Data Management System. | ||
# This product includes software developed by the LSST Project | ||
# (https://www.lsst.org). | ||
# See the COPYRIGHT file at the top-level directory of this distribution | ||
# for details of code ownership. | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU General Public License as published by | ||
# the Free Software Foundation, either version 3 of the License, or | ||
# (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU General Public License | ||
# along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
"""Solar System Object Query to MPSky in place of a internal Rubin solar | ||
system object caching/retrieval code. | ||
|
||
Will compute the location for of known SSObjects within a visit-detector combination. | ||
""" | ||
kfindeisen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
__all__ = ["MPSkyEphemerisQueryConfig", "MPSkyEphemerisQueryTask"] | ||
|
||
|
||
import os | ||
import pandas as pd | ||
import pyarrow as pa | ||
import requests | ||
|
||
from lsst.ap.association.utils import getMidpointFromTimespan | ||
from lsst.geom import SpherePoint | ||
import lsst.pex.config as pexConfig | ||
from lsst.utils.timer import timeMethod | ||
|
||
from lsst.pipe.base import connectionTypes, NoWorkFound, PipelineTask, \ | ||
PipelineTaskConfig, PipelineTaskConnections, Struct | ||
|
||
|
||
class MPSkyEphemerisQueryConnections(PipelineTaskConnections, | ||
dimensions=("instrument", | ||
"group", "detector")): | ||
|
||
predictedRegionTime = connectionTypes.Input( | ||
doc="The predicted exposure region and time", | ||
name="regionTimeInfo", | ||
storageClass="RegionTimeInfo", | ||
dimensions={"instrument", "group", "detector"}, | ||
) | ||
|
||
ssObjects = connectionTypes.Output( | ||
doc="MPSky-provided Solar System objects observable in this detector-visit", | ||
name="preloaded_SsObjects", | ||
storageClass="DataFrame", | ||
dimensions=("instrument", "group", "detector"), | ||
) | ||
|
||
|
||
class MPSkyEphemerisQueryConfig( | ||
PipelineTaskConfig, | ||
pipelineConnections=MPSkyEphemerisQueryConnections): | ||
observerCode = pexConfig.Field( | ||
dtype=str, | ||
doc="IAU Minor Planet Center observer code for queries " | ||
"(default is X05 for Rubin Obs./LSST)", | ||
default='X05' | ||
) | ||
queryBufferRadiusDegrees = pexConfig.Field( | ||
dtype=float, | ||
doc="Buffer radius in degrees added to detector bounding circle for ephemeris " | ||
"cone search. Defaults to 10 deg/day * 30 minutes", | ||
default=0.208 | ||
) | ||
mpSkyRequestTimeoutSeconds = pexConfig.Field( | ||
dtype=float, | ||
doc="Time in seconds to wait for mpSky request before failing ", | ||
default=1.0 | ||
) | ||
|
||
|
||
class MPSkyEphemerisQueryTask(PipelineTask): | ||
"""Task to query the MPSky service and retrieve the solar system objects | ||
that are observable within the input visit. | ||
""" | ||
ConfigClass = MPSkyEphemerisQueryConfig | ||
_DefaultName = "mpSkyEphemerisQuery" | ||
|
||
@timeMethod | ||
def run(self, predictedRegionTime): | ||
"""Parse the information on the current visit and retrieve the | ||
observable solar system objects from MPSky. | ||
|
||
Parameters | ||
---------- | ||
predictedRegionTime : `lsst.pipe.base.utils.RegionTimeInfo` | ||
Predicted footprint and timespan of the exposure | ||
|
||
Returns | ||
------- | ||
result : `lsst.pipe.base.Struct` | ||
Results struct with components: | ||
|
||
- ``ssObjects`` : `pandas.DataFrame` | ||
DataFrame containing Solar System Objects near the detector | ||
footprint as retrieved by MPSky. The columns are as follows: | ||
|
||
``Name`` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm pretty sure there needs to be an empty line before starting the nested list. Does |
||
object name (`str`) | ||
``ra`` | ||
RA in decimal degrees (`float`) | ||
``dec`` | ||
DEC in decimal degrees (`float`) | ||
``obj_poly`` | ||
DO NOT USE until DM-46069 is resolved | ||
``obs_poly`` | ||
DO NOT USE until DM-46069 is resolved | ||
""" | ||
# Get detector center and timespan midpoint from predictedRegionTime. | ||
region = predictedRegionTime.region | ||
timespan = predictedRegionTime.timespan | ||
expCenter = SpherePoint(region.getBoundingCircle().getCenter()) | ||
kfindeisen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
expRadius = region.getBoundingCircle().getOpeningAngle().asDegrees() | ||
expMidPointEPOCH = getMidpointFromTimespan(timespan, allowUnbounded=False).utc.mjd | ||
|
||
# MPSky service query | ||
mpSkyURL = os.environ.get('MP_SKY_URL', '') | ||
mpSkySsObjects = self._mpSkyConeSearch(expCenter, expMidPointEPOCH, | ||
expRadius + self.config.queryBufferRadiusDegrees, mpSkyURL) | ||
|
||
return Struct( | ||
ssObjects=mpSkySsObjects, | ||
) | ||
|
||
def read_mp_sky_response(self, response): | ||
"""Extract ephemerides from an MPSky web request | ||
|
||
Parameters | ||
---------- | ||
response : `requests.Response` | ||
MPSky message | ||
|
||
Returns | ||
------- | ||
objID : `np.ndarray` | ||
Designations of nearby objects | ||
ra : `np.ndarray` | ||
Array of object right ascensions | ||
dec : `np.ndarray` | ||
Array of object declinations | ||
object_polynomial : `np.ndarray`, (N,M) | ||
Array of object cartesian position polynomials | ||
observer_polynomial : `np.ndarray`, (N,M) | ||
Array of observer cartesian position polynomials | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider adding explicit dimensions, at least for the multidimensional arrays (a reader can probably figure out that |
||
""" | ||
with pa.input_stream(memoryview(response.content)) as stream: | ||
stream.seek(0) | ||
object_polynomial = pa.ipc.read_tensor(stream) | ||
observer_polynomial = pa.ipc.read_tensor(stream) | ||
with pa.ipc.open_stream(stream) as reader: | ||
columns = next(reader) | ||
objID = columns["name"].to_numpy(zero_copy_only=False) | ||
ra = columns["ra"].to_numpy() | ||
dec = columns["dec"].to_numpy() | ||
return objID, ra, dec, object_polynomial, observer_polynomial | ||
|
||
def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL): | ||
"""Query MPSky ephemeris service for objects near the expected detector position | ||
|
||
Parameters | ||
---------- | ||
expCenter : `lsst.geom.SpherePoint` | ||
Center of search cone | ||
epochMJD : `float` | ||
Epoch of cone search, (MJD in UTC). | ||
queryRadius : `float` | ||
Radius of the cone search in degrees. | ||
mpSkyURL : `str` | ||
URL to query for MPSky. | ||
|
||
Returns | ||
------- | ||
mpSkySsObjects : `pandas.DataFrame` | ||
DataFrame with Solar System Object information and RA/DEC position | ||
within the visit. | ||
""" | ||
fieldRA = expCenter.getRa().asDegrees() | ||
fieldDec = expCenter.getDec().asDegrees() | ||
|
||
params = { | ||
"t": epochMJD, | ||
"ra": fieldRA, | ||
"dec": fieldDec, | ||
"radius": queryRadius | ||
} | ||
|
||
try: | ||
response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds) | ||
response.raise_for_status() | ||
objID, ra, dec, object_polynomial, observer_polynomial = self.read_mp_sky_response(response) | ||
|
||
mpSkySsObjects = pd.DataFrame() | ||
mpSkySsObjects['ObjID'] = objID | ||
mpSkySsObjects['ra'] = ra | ||
mpSkySsObjects['obj_poly'] = object_polynomial | ||
mpSkySsObjects['obs_poly'] = observer_polynomial | ||
mpSkySsObjects['dec'] = dec | ||
mpSkySsObjects['Err(arcsec)'] = 2 | ||
mpSkySsObjects['ssObjectId'] = [abs(hash(v)) for v in mpSkySsObjects['ObjID'].values] | ||
nFound = len(mpSkySsObjects) | ||
|
||
if nFound == 0: | ||
self.log.info("No Solar System objects found for visit.") | ||
else: | ||
self.log.info("%d Solar System Objects in visit", nFound) | ||
except requests.RequestException as e: | ||
raise NoWorkFound(f"Failed to connect to the remote ephemerides service: {e}") from e | ||
|
||
return mpSkySsObjects |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This file seems to mix formatting fixes, bug workarounds, and changes actually related to mpSkyEphemerisQuery. It's tricky to fix after the fact, but in the future, please do such background fixes on their own commits. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -261,3 +261,71 @@ def getMidpointFromTimespan(timespan, allowUnbounded=True): | |
raise ValueError("Cannot compute midpoint: unbounded timespan.") from e | ||
else: | ||
raise ValueError("Cannot compute midpoint: unbounded timespan.") from e | ||
|
||
|
||
def objID_to_ssObjectID(objID, flags=0): | ||
"""Convert from Minor Planet Center packed provisional object ID to | ||
Rubin ssObjectID. | ||
|
||
Parameters | ||
---------- | ||
objID : `str` | ||
Minor Planet Center packed provisional designation for a small solar | ||
system object. Must be fewer than eight characters. | ||
flags : `int`, optional | ||
Eight free bits to enable future decoupling between Minor Planet Center | ||
and Rubin. Zero by default, should not be changed unless we need to | ||
move away from a 1:1 mapping with the MPC. Must be within [0, 255]. | ||
|
||
Returns | ||
------- | ||
ssObjectID : `int` | ||
Rubin ssObjectID | ||
|
||
Raises | ||
------ | ||
ValueError | ||
Raised if either objID is longer than 7 characters or flags is greater | ||
than 255 or less than 0. | ||
""" | ||
if len(objID) > 7: | ||
raise ValueError(f'objID longer than 7 characters: "{objID}"') | ||
if len(objID) < 7: | ||
raise ValueError(f'objID shorter than 7 characters: "{objID}"') | ||
if flags < 0 or flags > 255: | ||
raise ValueError(f'Flags ({flags}) outside [0, 255].') | ||
if any([ord(c) > 255 for c in objID]): | ||
raise ValueError(f'{[c for c in objID if ord(c) > 255]} not legal objID characters (ascii [1, 255])') | ||
|
||
ssObjectID = flags | ||
for character in objID: | ||
ssObjectID <<= 8 | ||
ssObjectID += ord(character) | ||
return ssObjectID | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These two functions should have unit tests. Since the behavior is self-contained and the output is deterministic, it's possible to test them much more thoroughly than most of the code in
|
||
|
||
|
||
def ssObjectID_to_objID(ssObjectID): | ||
"""Convert from Rubin ssObjectID to Minor Planet Center packed provisional | ||
object ID. | ||
|
||
Parameters | ||
---------- | ||
ssObjectID : `int` | ||
Rubin ssObjectID | ||
|
||
Returns | ||
------- | ||
objID : `str` | ||
Minor Planet Center packed provisional designation. | ||
|
||
flags : `int` | ||
Rubin flags (not yet defined, but usable in case we decouple from MPC). | ||
|
||
Raises | ||
------ | ||
""" | ||
if ssObjectID < 0 or ssObjectID >= (1 << 64): | ||
raise ValueError(f'ssObjectID ({ssObjectID}) outside [0, 2^64 - 1].') | ||
|
||
objID = ''.join([chr((ssObjectID >> (8 * i)) % 256) for i in reversed(range(0, 7))]) | ||
return objID, ssObjectID >> (8 * 7) % 256 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please don't do umbrella "response to review" commits. Commits get their value from being logically coherent and individually correct, so review responses should be done either by tweaking the existing commits (
edit
,squash
, orfixup
commands ingit rebase -i
) or, for a genuinely new change, creating new commits that are as well-organized and -documented as the original ones.In this case, since you had one giant commit to begin with, squashing the changes onto it is probably the least of the available evils. I'll continue the review on the combined diff.