Initial Release
This project is an InterSystems IRIS Interoperability On Python, or IoP, application for the Gaia DR3 epoch photometry benchmark described in https://github.com/grongierisc/iris-gaia-dr3/blob/master/GOAL.md.
It processes the first 20 Gaia DR3 epoch photometry archives, computes BP/RP flux variation by source_id, and writes the qualifying results to output/results.csv with a header row.
The benchmark asks for every astronomical object whose BP or RP flux changed by more than 100 percent over the observation period.
For each source_id, the application:
- Reads EpochPhotometry_000000-003111.csv.gz through EpochPhotometry_020985-021233.csv.gz.
- Parses the bp_flux and rp_flux array fields.
- Skips NaN, infinities, and malformed values.
- Computes ((max_flux - min_flux) / min_flux) * 100.
- Keeps rows where percentage_change > 100.

Output columns:
source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change
The CSV starts with that header row, followed by one row per qualifying source_id.
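The parsing and filtering steps can be sketched in plain Python. This is a minimal illustration, not the project's parser: the function names `parse_flux_array` and `flux_range` are hypothetical, and the bracketed, comma-separated array format is an assumption about how the flux fields are serialized.

```python
import math


def parse_flux_array(raw: str) -> list[float]:
    """Parse a flux array string and keep only finite values.

    Assumption: the field looks like "[1.0, 2.5, NaN]" (brackets or
    parentheses, comma-separated). Adjust if the archive format differs.
    """
    values = []
    for token in raw.strip().strip("[]()").split(","):
        token = token.strip().strip('"')
        if not token:
            continue
        try:
            value = float(token)
        except ValueError:
            continue  # malformed entry: skip it
        if math.isfinite(value):  # drops NaN and +/- infinity
            values.append(value)
    return values


def flux_range(raw: str):
    """Return (min_flux, max_flux) for one band, or None if nothing usable remains."""
    values = parse_flux_array(raw)
    if not values:
        return None
    return min(values), max(values)
```

For example, `flux_range("[10.0, NaN, 20.0, inf, oops]")` keeps only 10.0 and 20.0, so the band contributes a minimum of 10.0 and a maximum of 20.0.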
Requirements:
Run the benchmark:
./RunChallenge.sh
RunChallenge.sh rebuilds and recreates the IRIS container, starts the IoP production, waits for completion, and prints output/results.csv to stdout.
The generated files are:
- output/downloads/*.csv.gz: downloaded Gaia source files
- output/results.csv: final benchmark output
- output/results.done: success marker
- output/results.err: failure marker, if the production fails

Create and use a virtual environment if one is not already present:
python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt
Run tests:
.venv/bin/python -m pytest
Validate the IoP production without writing to IRIS:
.venv/bin/iop --migrate settings.py --dry-run
Run the full Docker workflow:
docker compose up --build
The application is an IoP production declared in https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py and https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/production.py.
Business roles:
- GaiaBenchmarkService: polling entrypoint. It starts one benchmark run when neither results.done nor results.err exists.
- GaiaBenchmarkProcess: orchestrates the workflow and writes success/failure markers.
- GaiaDownloadOperation: downloads or reuses one local gzip file per request.
- GaiaDbOperation: owns the IRIS DBAPI connection lifecycle and routes DB work by message type.

IoP Mermaid graph generated from the production object:
flowchart LR
  %% Production: GaiaDR3.Production
  subgraph group_service["Services"]
    direction TB
    node_GaiaBenchmarkService["GaiaBenchmarkService<br/>Python.gaia.services.GaiaBenchmarkService"]
  end
  subgraph group_process["Processes"]
    direction TB
    node_GaiaBenchmarkProcess["GaiaBenchmarkProcess<br/>Python.gaia.processes.GaiaBenchmarkProcess"]
  end
  subgraph group_operation["Operations"]
    direction TB
    node_GaiaDownloadOperation["GaiaDownloadOperation<br/>Python.gaia.operations.GaiaDownloadOperation"]
    node_GaiaDbOperation["GaiaDbOperation<br/>Python.gaia.operations.GaiaDbOperation"]
  end
  node_GaiaBenchmarkService ~~~ node_GaiaBenchmarkProcess
  node_GaiaBenchmarkProcess -- "ComputeOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkProcess -- "DownloadOperation" --> node_GaiaDownloadOperation
  node_GaiaBenchmarkProcess -- "ExportOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkProcess -- "ImportOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkProcess -- "PrepareOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkService -- "Output" --> node_GaiaBenchmarkProcess
These screenshots show the same production running in the IRIS interoperability UI.

The production screenshot comes from the graph declared in https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/production.py.
The code creates one polling service, one orchestrating process, one download operation, and one DB operation.
Several process targets are intentionally routed to the same DB operation so one component owns DBAPI lifecycle.
prod = Production("GaiaDR3.Production", testing_enabled=True, actor_pool_size=actor_pool_size)

service = prod.service("GaiaBenchmarkService", GaiaBenchmarkService, settings=gaia_settings)
process = prod.process("GaiaBenchmarkProcess", GaiaBenchmarkProcess, settings=gaia_settings)
download = prod.operation(
    "GaiaDownloadOperation",
    GaiaDownloadOperation,
    settings=gaia_settings,
    pool_size=download_pool_size,
)
db = prod.operation(
    "GaiaDbOperation",
    GaiaDbOperation,
    settings=gaia_settings,
    pool_size=import_pool_size,
)

service.connect(GaiaBenchmarkService.Output, process)
process.connect(GaiaBenchmarkProcess.DownloadOperation, download)
process.connect(GaiaBenchmarkProcess.PrepareOperation, db)
process.connect(GaiaBenchmarkProcess.ImportOperation, db)
process.connect(GaiaBenchmarkProcess.ComputeOperation, db)
process.connect(GaiaBenchmarkProcess.ExportOperation, db)
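The idea of one component owning the connection while serving several targets can be illustrated with a plain-Python dispatcher. This sketch does not use the IoP API: `DbRouter` and its handlers are hypothetical, the message fields are assumed, and the `connection` attribute is only a placeholder for a real DBAPI connection.

```python
from dataclasses import dataclass


@dataclass
class PrepareRunRequest:
    run_name: str  # assumed field, for illustration only


@dataclass
class ComputeRequest:
    run_name: str  # assumed field, for illustration only


class DbRouter:
    """Hypothetical stand-in for an operation that owns one DB connection."""

    def __init__(self):
        # Placeholder for the single connection shared by every handler.
        self.connection = object()
        self._handlers = {
            PrepareRunRequest: self._prepare,
            ComputeRequest: self._compute,
        }

    def handle(self, message):
        """Route a message to its handler based on the message type."""
        handler = self._handlers.get(type(message))
        if handler is None:
            raise TypeError(f"Unsupported message type: {type(message).__name__}")
        return handler(message)

    def _prepare(self, message):
        return f"prepared {message.run_name}"

    def _compute(self, message):
        return f"computed {message.run_name}"
```

Because every handler lives on the same object, opening and closing the connection happens in exactly one place regardless of which target the process used to send the message.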

The settings screenshot shows the Gaia settings exposed on the IoP components.
The typed settings are declared once in https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/runtime.py, then https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py supplies environment-backed defaults.
class GaiaSettings:
    ArchiveUrlTemplate: Annotated[str, Setting(data_type=str, required=True, category="Gaia")]
    FileBoundaries: Annotated[str, Setting(data_type=str, required=True, category="Gaia")]
    OutputDir: Annotated[str, Setting(data_type=str, required=True, category="Gaia", control=controls.directory())]
    RequestTimeoutSeconds: Annotated[int, Setting(data_type=int, required=True, category="Gaia")]
    HttpTimeoutSeconds: Annotated[int, Setting(data_type=int, required=True, category="Gaia")]
    DbBatchSize: Annotated[int, Setting(data_type=int, required=True, category="Gaia")]

GAIA_SETTINGS = {
    "ArchiveUrlTemplate": ARCHIVE_URL_TEMPLATE,
    "FileBoundaries": ",".join(FIRST_20_FILE_BOUNDARIES),
    "OutputDir": os.getenv("GAIA_OUTPUT_DIR", "/irisdev/app/output"),
    "RequestTimeoutSeconds": int(os.getenv("GAIA_REQUEST_TIMEOUT_SECONDS", "1800")),
    "HttpTimeoutSeconds": int(os.getenv("GAIA_HTTP_TIMEOUT_SECONDS", "120")),
    "DbBatchSize": int(os.getenv("GAIA_DB_BATCH_SIZE", "10000")),
}
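The environment-backed defaults above rely on a bare `int(os.getenv(...))`, which raises a generic error on bad input. A slightly stricter variant could look like the sketch below; the helper name `int_setting` and the positive-value check are assumptions added for illustration and are not part of the project.

```python
import os


def int_setting(name: str, default: int, environ=os.environ) -> int:
    """Read a positive integer from the environment, falling back to a default.

    Hypothetical helper: the project itself calls int(os.getenv(...)) directly.
    """
    raw = environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        value = int(raw)
    except ValueError as exc:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from exc
    if value <= 0:
        raise ValueError(f"{name} must be positive, got {value}")
    return value
```

Passing the mapping explicitly, as in `int_setting("GAIA_DB_BATCH_SIZE", 10000, {"GAIA_DB_BATCH_SIZE": "500"})`, makes the override behavior easy to test without touching the real environment.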

The trace screenshot shows the two fan-out phases in https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/processes.py.
Prepare, compute, and export are synchronous because they are single sequential steps.
Download and import use send_request_async_ng so the process can log each completed file as soon as its response returns.
downloads = asyncio.run(
    self._send_parallel(
        self.DownloadOperation,
        [
            DownloadFileRequest(request.run_name, file_range, self.archive_url(file_range))
            for file_range in self.file_ranges
        ],
        DownloadFileResult,
        "Download Gaia DR3 files",
    )
)

async def _send_parallel(self, target, requests, expected, description: str):
    tasks = [
        asyncio.create_task(
            self.send_request_async_ng(
                target,
                request,
                timeout=self.request_timeout,
                description=f"{description}: {request.file_range}",
            )
        )
        for request in requests
    ]

    results = []
    for done in asyncio.as_completed(tasks):
        result = self._expect(await done, expected, target)
        results.append(result)
        self.log_info(f"{description}: {len(results)}/{len(tasks)} {result.file_range}")
    return results
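The completion-order behavior of `asyncio.as_completed` can be seen in isolation with a small standalone sketch. Here `fake_request` is a hypothetical stand-in for `send_request_async_ng`, and the delays simulate files of different sizes.

```python
import asyncio


async def fake_request(file_range: str, delay: float) -> str:
    """Hypothetical stand-in for an async request; sleeps, then echoes the range."""
    await asyncio.sleep(delay)
    return file_range


async def run_parallel(requests):
    """Start all requests at once and collect results in completion order."""
    tasks = [asyncio.create_task(fake_request(name, delay)) for name, delay in requests]
    completed = []
    for done in asyncio.as_completed(tasks):
        completed.append(await done)
    return completed


def demo():
    # The second request finishes first because its simulated delay is shorter.
    return asyncio.run(
        run_parallel([("000000-003111", 0.2), ("003112-005263", 0.01)])
    )
```

Results arrive in the order the requests finish rather than the order they were submitted, which is what lets the process log progress per file instead of waiting for the slowest one.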
1. GaiaBenchmarkService polls once per second.
2. When a run is due, it creates output/results.lock and sends a GaiaBenchmarkRequest to the process.
3. GaiaBenchmarkProcess sends PrepareRunRequest to GaiaDbOperation to clear prior persistent rows and output markers.
4. The process fans out DownloadFileRequest messages to GaiaDownloadOperation with send_request_async_ng.
5. Each download is stored under output/downloads/. Existing readable gzip files are reused.
6. The process fans out ImportFileRequest messages to GaiaDbOperation with send_request_async_ng.
7. Each import inserts per-source aggregate rows in batches with executemany.
8. GaiaDbOperation handles ComputeRequest with SQL to combine all per-file aggregates by source_id, calculate percentage change, and persist only results over 100 percent.
9. GaiaDbOperation handles ExportCsvRequest by selecting final rows ordered by source_id; gaia.exporting writes them to output/results.csv.
10. The process writes output/results.done.

If any step fails, the process writes output/results.err, then re-raises the failure.
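The success and failure markers follow a common pattern: run the steps, record the outcome as a file, and let the exception propagate. A minimal sketch, assuming a hypothetical `run_with_markers` helper and marker file contents chosen only for illustration:

```python
from pathlib import Path


def run_with_markers(output_dir: Path, steps) -> None:
    """Run each step in order, then write results.done or results.err.

    Hypothetical helper: the marker file names come from the README,
    the marker contents are an assumption.
    """
    done_marker = output_dir / "results.done"
    error_marker = output_dir / "results.err"
    try:
        for step in steps:
            step()
    except Exception as exc:
        # Record the failure first, then re-raise so the caller still sees it.
        error_marker.write_text(f"{type(exc).__name__}: {exc}\n", encoding="utf-8")
        raise
    done_marker.write_text("ok\n", encoding="utf-8")
```

Writing the marker before re-raising means an external watcher such as a shell script can detect the failure from the filesystem even if it never sees the exception.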
The application uses iris-persistence for class-backed persistent tables:
- GaiaDR3.SourceFluxAggregate: per-run, per-file, per-source BP/RP min/max aggregates
- GaiaDR3.PhotometryChange: final per-source qualifying results

Persistent indexes:

| Table | Index | Properties | Purpose |
|---|---|---|---|
| GaiaDR3.SourceFluxAggregate | SourceAggregateRunIdx | run_name | Delete or scan one benchmark run during preparation and compute. |
| GaiaDR3.SourceFluxAggregate | SourceAggregateSourceIdx | run_name,source_id | Group and aggregate per-source rows for a run. |
| GaiaDR3.PhotometryChange | PhotometryChangeRunIdx | run_name | Delete or scan final results for one run. |
| GaiaDR3.PhotometryChange | PhotometryChangeSourceIdx | run_name,source_id | Read final results ordered by source for export and inspection. |
The application intentionally does not persist every raw observation value. The benchmark output only requires per-source min/max and percentage change, so persisting aggregate rows keeps IRIS storage smaller while preserving the data needed for final SQL analytics.
Downloaded files are persisted on disk under output/downloads/; the presence of a readable gzip file is the download handoff state.
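One way to decide whether an existing download can be reused is to decompress it end to end and treat any error as "not readable". The helper below is a sketch under that assumption; `is_readable_gzip` is a hypothetical name and the project's actual check may differ.

```python
import gzip
import zlib
from pathlib import Path


def is_readable_gzip(path: Path, chunk_size: int = 1 << 20) -> bool:
    """Return True if the file exists, is non-empty, and decompresses fully."""
    if not path.is_file() or path.stat().st_size == 0:
        return False
    try:
        with gzip.open(path, "rb") as handle:
            # Read in chunks so large archives are never held in memory at once.
            while handle.read(chunk_size):
                pass
    except (OSError, EOFError, zlib.error):
        # Covers bad magic bytes, truncated streams, and corrupt data.
        return False
    return True
```

Reading the whole stream costs more than checking the magic bytes, but it also catches truncated downloads, which matter when an interrupted run is restarted.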
The compute step uses two persistent tables:
- GaiaDR3.SourceFluxAggregate
- GaiaDR3.PhotometryChange

SourceFluxAggregate is the input to compute. Each row represents one imported source aggregate from one downloaded file:
run_name,file_range,source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux
PhotometryChange is the materialized final result table. It stores only rows that satisfy the benchmark rule:
run_name,source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change
The compute route in GaiaDbOperation inserts into PhotometryChange with one set-oriented SQL statement:
1. Take the run_name.
2. Select the SourceFluxAggregate rows for that run_name.
3. Group by source_id.
4. Compute percentage_change.
5. Keep rows where percentage_change > 100.

The core grouping is:
SELECT
    source_id,
    MIN(bp_min_flux) AS bp_min_flux,
    MAX(bp_max_flux) AS bp_max_flux,
    MIN(rp_min_flux) AS rp_min_flux,
    MAX(rp_max_flux) AS rp_max_flux
FROM GaiaDR3.SourceFluxAggregate
WHERE run_name = ?
GROUP BY source_id
Then the compute query applies the benchmark formula:
CASE
    WHEN bp_min_flux IS NULL OR bp_min_flux = 0
        THEN NULL
    ELSE ((bp_max_flux - bp_min_flux) / bp_min_flux) * 100
END AS bp_change
The same logic is applied to RP. The final percentage_change is the larger non-null value between bp_change and rp_change.
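The same rule can be written in Python to make the NULL handling explicit. This is an illustrative sketch that mirrors the SQL, with `None` standing in for NULL; the function names are hypothetical.

```python
from typing import Optional


def band_change(min_flux: Optional[float], max_flux: Optional[float]) -> Optional[float]:
    """Percentage change for one band, or None when it cannot be computed."""
    if min_flux is None or max_flux is None or min_flux == 0:
        return None
    return ((max_flux - min_flux) / min_flux) * 100


def percentage_change(bp_min, bp_max, rp_min, rp_max) -> Optional[float]:
    """Larger non-null change across BP and RP, or None if both are null."""
    changes = [
        change
        for change in (band_change(bp_min, bp_max), band_change(rp_min, rp_max))
        if change is not None
    ]
    return max(changes) if changes else None
```

A band with a zero or missing minimum simply drops out of the comparison, so a source can still qualify on the other band alone.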
Example with one source appearing in multiple files:
| file_range | source_id | bp_min_flux | bp_max_flux | rp_min_flux | rp_max_flux |
|---|---|---|---|---|---|
| 000000-003111 | 101 | 10 | 20 | 5 | 8 |
| 003112-005263 | 101 | 7 | 30 | 4 | 9 |
After SQL grouping:
| source_id | bp_min_flux | bp_max_flux | rp_min_flux | rp_max_flux |
|---|---|---|---|---|
| 101 | 7 | 30 | 4 | 9 |
Percentage changes:
BP = ((30 - 7) / 7) * 100 = 328.5714
RP = ((9 - 4) / 4) * 100 = 125
percentage_change = 328.5714
That row is inserted into PhotometryChange because 328.5714 > 100.
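The grouping step of this example can be reproduced locally. The sketch below uses Python's built-in SQLite as a stand-in for IRIS, so the table is created without the GaiaDR3 schema prefix and the column types are assumptions; only the grouping query follows the README.

```python
import sqlite3


def group_example():
    """Load the two example rows for source 101 and run the grouping query."""
    conn = sqlite3.connect(":memory:")  # SQLite stand-in, not an IRIS connection
    conn.execute(
        """
        CREATE TABLE SourceFluxAggregate (
            run_name TEXT, file_range TEXT, source_id INTEGER,
            bp_min_flux REAL, bp_max_flux REAL,
            rp_min_flux REAL, rp_max_flux REAL
        )
        """
    )
    conn.executemany(
        "INSERT INTO SourceFluxAggregate VALUES (?, ?, ?, ?, ?, ?, ?)",
        [
            ("demo", "000000-003111", 101, 10, 20, 5, 8),
            ("demo", "003112-005263", 101, 7, 30, 4, 9),
        ],
    )
    row = conn.execute(
        """
        SELECT source_id,
               MIN(bp_min_flux), MAX(bp_max_flux),
               MIN(rp_min_flux), MAX(rp_max_flux)
        FROM SourceFluxAggregate
        WHERE run_name = ?
        GROUP BY source_id
        """,
        ("demo",),
    ).fetchone()
    conn.close()
    return row
```

Calling `group_example()` collapses the two per-file rows into a single row for source 101 with BP bounds 7 and 30 and RP bounds 4 and 9, matching the grouped table above.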
Example where a zero minimum is ignored for one band:
| source_id | bp_min_flux | bp_max_flux | rp_min_flux | rp_max_flux |
|---|---|---|---|---|
| 202 | 0 | 10 | 4 | 12 |
BP change is NULL because the minimum is zero. RP change is 200, so the final percentage_change is 200.
On the current benchmark dataset, the first 20 Gaia files produced 75064 SourceFluxAggregate rows and 75064 distinct source_id values, so there were no duplicate source_id values across files after import. The SQL still groups by source_id because it is the correct benchmark operation and keeps the production robust if a future dataset contains the same source in multiple files.
SQL is a good fit here because the data is already in IRIS:
- The query uses the run_name,source_id indexes to target one run and organize per-source aggregation.
- The results are materialized in PhotometryChange, making export and inspection simple.

Runtime configuration is in https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py and is passed to every IoP component as production settings.
Environment overrides:
| Variable | Default | Purpose |
|---|---|---|
| GAIA_OUTPUT_DIR | /irisdev/app/output | Output directory inside the container |
| GAIA_REQUEST_TIMEOUT_SECONDS | 1800 | Sync request timeout for long-running workflow calls |
| GAIA_HTTP_TIMEOUT_SECONDS | 120 | Per-download HTTP timeout |
| GAIA_DB_BATCH_SIZE | 10000 | DBAPI aggregate insert batch size |
| GAIA_ACTOR_POOL | 8 | Production actor pool size |
| GAIA_DOWNLOAD_POOL | 4 | Download operation pool size |
| GAIA_IMPORT_POOL | 4 | DB operation pool size used by import fan-out |
The 20 benchmark files are configured as compact numeric file boundaries in FIRST_20_FILE_BOUNDARIES; components expand those boundaries into Gaia archive file ranges at runtime.
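The expansion from boundaries to file ranges might work along the following lines. This is a sketch under explicit assumptions, since the README does not show the boundary format: each boundary is taken to be the inclusive upper bound of a range, the first range starts at 0, and identifiers are zero-padded to six digits. The function names are hypothetical.

```python
def expand_boundaries(boundaries, start: int = 0, width: int = 6):
    """Turn upper bounds like ["003111", "005263"] into "lower-upper" ranges.

    Assumption: each boundary closes one range and the next range starts
    at the following integer.
    """
    ranges = []
    lower = start
    for boundary in boundaries:
        upper = int(boundary)
        if upper < lower:
            raise ValueError(f"Boundary {boundary!r} is below the current lower bound {lower}")
        ranges.append(f"{lower:0{width}d}-{upper:0{width}d}")
        lower = upper + 1
    return ranges


def archive_name(file_range: str) -> str:
    """Build the archive file name used in the README for a given range."""
    return f"EpochPhotometry_{file_range}.csv.gz"
```

With these assumptions, the boundaries "003111" and "005263" expand to the ranges 000000-003111 and 003112-005263, which are the first two file ranges used in the examples above.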
- RunChallenge.sh starts IRIS, runs the production, waits for completion, and prints the CSV result file.
- https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py targets exactly the first 20 Gaia DR3 epoch photometry archives required by the benchmark.
- The output uses source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change as the header, followed by one object per row.
- RunChallenge.sh is suitable for CI because it requires no manual input and prints only the CSV result file on success.

Fast local checks:
.venv/bin/python -m pytest
.venv/bin/python -m compileall -q gaia tests settings.py
.venv/bin/iop --migrate settings.py --dry-run
Full benchmark check:
./RunChallenge.sh > /tmp/gaia-results.csv
wc -l /tmp/gaia-results.csv
On the current implementation, the full run produces 57099 data rows for the benchmark dataset, plus the header row.
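Beyond counting lines, the captured output can be checked against the documented header and the benchmark rule. The sketch below is an optional extra check, not part of the project; `summarize_results` is a hypothetical helper that accepts any text stream.

```python
import csv
import io

EXPECTED_HEADER = [
    "source_id",
    "bp_min_flux",
    "bp_max_flux",
    "rp_min_flux",
    "rp_max_flux",
    "percentage_change",
]


def summarize_results(stream) -> int:
    """Validate the header and the > 100 rule, then return the data row count."""
    reader = csv.reader(stream)
    header = next(reader, None)
    if header != EXPECTED_HEADER:
        raise ValueError(f"Unexpected header: {header}")
    rows = 0
    for row in reader:
        if float(row[-1]) <= 100:
            raise ValueError(f"Row for source {row[0]} does not exceed 100 percent")
        rows += 1
    return rows


def demo() -> int:
    # Small in-memory sample; in practice open /tmp/gaia-results.csv instead.
    sample = io.StringIO(
        "source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change\n"
        "101,7,30,4,9,328.5714\n"
        "202,0,10,4,12,200\n"
    )
    return summarize_results(sample)
```

To check a real run, pass an open file handle, for example `summarize_results(open("/tmp/gaia-results.csv", newline=""))`, and compare the returned count with the expected number of data rows.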