Skip to content

Commit

Permalink
Responses to Krzysztof's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerenjie committed Aug 28, 2024
1 parent 5d589c3 commit 1e5823b
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 41 deletions.
15 changes: 9 additions & 6 deletions python/lsst/ap/association/diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ class DiaPipelineConnections(
solarSystemObjectTable = connTypes.Input(
doc="Catalog of SolarSolarSystem objects expected to be observable in "
"this detectorVisit.",
name="expected_ssObjects",
name="preloaded_SsObjects",
storageClass="DataFrame",
dimensions=("instrument", "group", "detector"),
minimum=0,
)
diffIm = connTypes.Input(
doc="Difference image on which the DiaSources were detected.",
Expand Down Expand Up @@ -375,6 +376,7 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
inputs["idGenerator"] = self.config.idGenerator.apply(butlerQC.quantum.dataId)
inputs["band"] = butlerQC.quantum.dataId["band"]
inputs["legacySolarSystemTable"] = None
if not self.config.doSolarSystemAssociation:
inputs["solarSystemObjectTable"] = None

Expand All @@ -385,15 +387,16 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
@timeMethod
def run(self,
diaSourceTable,
solarSystemObjectTable,
legacySolarSystemTable,
diffIm,
exposure,
template,
preloadedDiaObjects,
preloadedDiaSources,
preloadedDiaForcedSources,
band,
idGenerator):
idGenerator,
solarSystemObjectTable=None):
"""Process DiaSources and DiaObjects.
Load previous DiaObjects and their DiaSource history. Calibrate the
Expand All @@ -405,8 +408,8 @@ def run(self,
----------
diaSourceTable : `pandas.DataFrame`
Newly detected DiaSources.
solarSystemObjectTable : `pandas.DataFrame`
Preloaded Solar System objects expected to be visible in the image.
legacySolarSystemTable : `pandas.DataFrame`
Not used
diffIm : `lsst.afw.image.ExposureF`
Difference image exposure in which the sources in ``diaSourceCat``
were detected.
Expand Down Expand Up @@ -453,7 +456,7 @@ def run(self,
# Associate new DiaSources with existing DiaObjects.
assocResults = self.associator.run(diaSourceTable, diaObjects)

if self.config.doSolarSystemAssociation:
if self.config.doSolarSystemAssociation and solarSystemObjectTable is not None:
ssoAssocResult = self.solarSystemAssociator.run(
assocResults.unAssocDiaSources,
solarSystemObjectTable,
Expand Down
65 changes: 33 additions & 32 deletions python/lsst/ap/association/mpSkyEphemerisQuery.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,18 @@


import pandas as pd
import numpy as np
import pyarrow as pa
import requests
import sys
import os

import lsst.pex.config as pexConfig
from lsst.utils.timer import timeMethod
from lsst.ap.association.utils import getMidpointFromTimespan
from lsst.geom import SpherePoint

from lsst.pipe.base import PipelineTask, PipelineTaskConfig, \
PipelineTaskConnections, connectionTypes, Struct
PipelineTaskConnections, connectionTypes, Struct, NoWorkFound


class MPSkyEphemerisQueryConnections(PipelineTaskConnections,
dimensions=("instrument",
Expand All @@ -54,7 +55,7 @@ class MPSkyEphemerisQueryConnections(PipelineTaskConnections,

ssObjects = connectionTypes.Output(
doc="MPSky-provided Solar System objects observable in this detector-visit",
name="expected_ssObjects",
name="preloaded_SsObjects",
storageClass="DataFrame",
dimensions=("instrument", "group", "detector"),
)
Expand All @@ -69,16 +70,16 @@ class MPSkyEphemerisQueryConfig(
"(Rubin Obs./LSST default is X05)",
default='X05'
)
queryRadiusDegrees = pexConfig.Field(
queryBufferRadiusDegrees = pexConfig.Field(
dtype=float,
doc="On sky radius for Ephemeris cone search. Defaults "
"to the radius of Rubin Obs FoV in degrees",
default=1.75
doc="Buffer radius added to camera bounding circle for Ephemeris "
"cone search. Defaults to 10 deg/day * 30 minutes",
default=0.208
)
mpSkyURL = pexConfig.Field(
dtype=str,
doc="URL to query mpsky service",
default="http://sdfrome001.sdf.slac.stanford.edu:3666"
mpSkyRequestTimeoutSeconds = pexConfig.Field(
dtype=float,
doc="Time to wait for mpSky request before failing ",
default=1.0
)


Expand All @@ -89,11 +90,6 @@ class MPSkyEphemerisQueryTask(PipelineTask):
ConfigClass = MPSkyEphemerisQueryConfig
_DefaultName = "MPSkyEphemerisQuery"

#def runQuantum(self, butlerQC, inputRefs, outputRefs):
#inputs = butlerQC.get(inputRefs)
#outputs = self.run(**inputs)
#butlerQC.put(outputs, outputRefs)

@timeMethod
def run(self, predictedRegionTime):
"""Parse the information on the current visit and retrieve the
Expand Down Expand Up @@ -128,16 +124,19 @@ def run(self, predictedRegionTime):
region = predictedRegionTime.region
timespan = predictedRegionTime.timespan
expCenter = SpherePoint(region.getBoundingCircle().getCenter())
expMidPointEPOCH = (timespan.begin.mjd + timespan.end.mjd)/2
expRadius = region.getBoundingCircle().getOpeningAngle().asDegrees()
expMidPointEPOCH = getMidpointFromTimespan(timespan, allowUnbounded=False).utc.mjd

# MPSky service query
mpSkySsObjects = self._mpSkyConeSearch(expCenter, expMidPointEPOCH, self.config.queryRadiusDegrees)
mpSkyURL = os.environ.get('MP_SKY_URL', '')
mpSkySsObjects = self._mpSkyConeSearch(expCenter, expMidPointEPOCH,
expRadius + self.config.queryBufferRadiusDegrees, mpSkyURL)

return Struct(
ssObjects=mpSkySsObjects,
)

def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius):
def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius, mpSkyURL):
"""Query MPSky ephemeris service for objects near the expected detector position
Parameters
Expand All @@ -148,6 +147,8 @@ def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius):
Epoch of cone search, (MJD in UTC).
queryRadius : `float`
Radius of the cone search in degrees.
mpSkyURL : `str`
URL to query for MPSky.
Returns
-------
Expand All @@ -167,25 +168,24 @@ def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius):
}

try:
response = requests.get(self.config.mpSkyURL, params=params)
response = requests.get(mpSkyURL, params=params, timeout=self.config.mpSkyRequestTimeoutSeconds)
response.raise_for_status()
with pa.input_stream(memoryview(response.content)) as fp:
fp.seek(0)
p = pa.ipc.read_tensor(fp)
op = pa.ipc.read_tensor(fp)
object_polynomial = pa.ipc.read_tensor(fp)
observer_polynomial = pa.ipc.read_tensor(fp)
with pa.ipc.open_stream(fp) as reader:
r = next(reader)
row = next(reader)

ObjID = r["name"].to_numpy(zero_copy_only=False)
ra = r["ra"].to_numpy()
dec = r["dec"].to_numpy()
ObjID = row["name"].to_numpy(zero_copy_only=False)
ra = row["ra"].to_numpy()
dec = row["dec"].to_numpy()

mpSkySsObjects = pd.DataFrame()
mpSkySsObjects['ObjID'] = ObjID
mpSkySsObjects['ra'] = ra
# TODO: accept MPSky polynomials when they are well-formed.
mpSkySsObjects['obs_poly'] = list(np.zeros((len(mpSkySsObjects), 5)))
mpSkySsObjects['obj_poly'] = list(np.zeros((len(mpSkySsObjects), 5)))
mpSkySsObjects['obj_poly'] = object_polynomial
mpSkySsObjects['obs_poly'] = observer_polynomial
mpSkySsObjects['dec'] = dec
mpSkySsObjects['Err(arcsec)'] = 2
mpSkySsObjects['ssObjectId'] = [abs(hash(v)) for v in mpSkySsObjects['ObjID'].values]
Expand All @@ -195,10 +195,11 @@ def _mpSkyConeSearch(self, expCenter, epochMJD, queryRadius):
self.log.info("No Solar System objects found for visit.")
else:
self.log.info("%d Solar System Objects in visit", nFound)
except requests.exceptions.ConnectionError as e:
self.log.exception("Failed to connect to the remote ephemerides service.")
except Exception:
self.log.info("Failed to connect to the remote ephemerides service.")
mpSkySsObjects = pd.DataFrame(
columns=['ObjID', 'ra', 'dec', 'obj_poly', 'obs_poly',
'Err(arcsec)', 'ssObjectId'])
raise NoWorkFound

return mpSkySsObjects
3 changes: 2 additions & 1 deletion python/lsst/ap/association/ssoAssociation.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ def run(self, diaSourceCatalog, solarSystemObjects, exposure):
maxRadius = np.deg2rad(self.config.maxDistArcSeconds / 3600)

# Transform DIA RADEC coordinates to unit sphere xyz for tree building.
diaSourceCatalog = diaSourceCatalog[np.isfinite(diaSourceCatalog["ra"]).values]
vectors = self._radec_to_xyz(diaSourceCatalog["ra"],
diaSourceCatalog["dec"])

Expand Down Expand Up @@ -161,6 +160,8 @@ def _maskToCcdRegion(self, solarSystemObjects, exposure, marginArcsec):
maskedSolarSystemObjects : `pandas.DataFrame`
Set of SolarSystemObjects contained within the exposure bounds.
"""
if len(solarSystemObjects) == 0:
return solarSystemObjects
wcs = exposure.getWcs()
padding = min(
int(np.ceil(marginArcsec / wcs.getPixelScale(exposure.getBBox().getCenter()).asArcseconds())),
Expand Down
68 changes: 68 additions & 0 deletions python/lsst/ap/association/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,71 @@ def getMidpointFromTimespan(timespan, allowUnbounded=True):
raise ValueError("Cannot compute midpoint: unbounded timespan.") from e
else:
raise ValueError("Cannot compute midpoint: unbounded timespan.") from e


def ObjID_to_ssObjectID(ObjID, flags=0):
"""Convert from Minor Planet Center packed provisional object IDs and
Rubin ssObjectID.
Parameters
----------
ObjID : `str`
Minor Planet Center packed provisional designation for a small solar
system object. Must be fewer than eight characters.
flags : `int`
Eight free bits to enable future decoupling between Minor Planet Center
and Rubin. Zero by default, should not be changed unless we need to
move away from a 1:1 mapping with the MPC. Must be within [0, 255].
Returns
-------
ssObjectID : `int`
Rubin ssObjectID
Raises
------
ValueError
Raised if either ObjID is longer than 7 characters or flags is greater
than 255 or less than 0.
"""
if len(ObjID) > 7:
raise ValueError(f'ObjID longer than 7 characters: "{ObjID}"')
if flags < 0 or flags > 255:
raise ValueError(f'Flags ({flags}) outside [0, 255].')

ssObjectID = flags
for character in ObjID:
ssObjectID *= 256
ssObjectID += ord(character)
ssObjectID *= 256 ** (7 - len(ObjID))
return ssObjectID


def ssObjectID_to_ObjID(ssObjectID):
"""Convert from Minor Planet Center packed provisional object IDs and
Rubin ssObjectID.
Parameters
----------
ssObjectID : `int`
Rubin ssObjectID
Returns
-------
ObjID : `str`
Minor Planet Center packed provisional designation for a small solar
system object. Must be fewer than eight characters.
flags : `int`
Eight free bits to enable future decoupling between Minor Planet Center
and Rubin. Zero by default, should not be changed unless we need to
move away from a 1:1 mapping with the MPC. Must be within [0, 255].
Raises
------
"""
ObjID = ''.join([chr(ssObjectID // (256 ** i) % 256) for i in reversed(range(0, 7))])
ObjID = ObjID.replace('\x00', '')
return ObjID, ssObjectID//(256 ** 7)%256
5 changes: 3 additions & 2 deletions tests/test_diaPipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,16 @@ def associator_run(table, diaObjects):
side_effect=solarSystemAssociator_run) as ssRun:

result = task.run(diaSrc,
ssObjects,
None,
diffIm,
exposure,
template,
self.diaObjects,
self.diaSources,
self.diaForcedSources,
"g",
IdGenerator())
IdGenerator(),
solarSystemObjectTable=ssObjects)
for subtaskName in subtasksToMock:
getattr(task, subtaskName).run.assert_called_once()
assertValidOutput(task, result)
Expand Down

0 comments on commit 1e5823b

Please sign in to comment.