
iris-gaia-dr3

InterSystems does not provide technical support for this project. Please contact its developer for technical assistance.
IoP application for the Gaia DR3 epoch photometry benchmark

What's new in this version

Initial Release

Gaia DR3 Parallel IoP Benchmark

Watch the benchmark video

This project is an InterSystems IRIS Interoperability On Python (IoP) application for the Gaia DR3 epoch photometry benchmark described in GOAL.md (https://github.com/grongierisc/iris-gaia-dr3/blob/master/GOAL.md).

It processes the first 20 Gaia DR3 epoch photometry archives, computes BP/RP flux variation by source_id, and writes the qualifying results to output/results.csv with a header row.

Goal

The benchmark asks for every astronomical object whose BP or RP flux changed by more than 100 percent over the observation period.

For each source_id, the application:

  1. Reads only the first 20 Gaia DR3 epoch photometry files, from EpochPhotometry_000000-003111.csv.gz through EpochPhotometry_020985-021233.csv.gz.
  2. Parses bp_flux and rp_flux array fields.
  3. Ignores blanks, null-like values, NaN, infinities, and malformed values.
  4. Computes BP min/max and RP min/max across valid values.
  5. Computes percentage change as ((max_flux - min_flux) / min_flux) * 100.
  6. Keeps the larger BP/RP percentage change.
  7. Outputs only rows where percentage_change > 100.
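The per-source rule above can be sketched in plain Python. This is a minimal illustration, not the project's parser: the function names, the set of null-like tokens, and the bracketed comma-separated field layout are assumptions made for the example.

```python
import math
from typing import Optional

# Assumed spellings of null-like tokens; the real importer may accept others.
NULL_TOKENS = {"", "null", "none", "nan", "--"}


def parse_flux_array(field: str) -> list[float]:
    """Return the finite values from a bracketed, comma-separated flux field."""
    values = []
    for token in field.strip().strip("[](){}").split(","):
        token = token.strip().strip('"')
        if token.lower() in NULL_TOKENS:
            continue  # blank or null-like value
        try:
            value = float(token)
        except ValueError:
            continue  # malformed value
        if math.isfinite(value):  # drops inf and -inf
            values.append(value)
    return values


def percentage_change(values: list[float]) -> Optional[float]:
    """((max - min) / min) * 100, or None when it cannot be computed."""
    if not values:
        return None
    lo, hi = min(values), max(values)
    if lo == 0:
        return None  # avoid dividing by a zero minimum
    return (hi - lo) / lo * 100


def larger_change(bp: list[float], rp: list[float]) -> Optional[float]:
    """Keep the larger of the BP and RP changes, ignoring a missing band."""
    changes = [c for c in (percentage_change(bp), percentage_change(rp)) if c is not None]
    return max(changes) if changes else None


bp = parse_flux_array("[7.0, NaN, 30.0, , inf, abc]")
rp = parse_flux_array("[4.0, 9.0]")
change = larger_change(bp, rp)
print(bp, rp, change is not None and change > 100)
```

A source qualifies for the output only when the returned value is greater than 100.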

Output columns:

source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change

The CSV starts with that header row, followed by one row per qualifying source_id.
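As a rough illustration of that layout, the following sketch writes the header and one data row with the standard csv module. The helper name and the sample row are hypothetical; the project's own exporter lives in gaia.exporting and may format numbers differently.

```python
import csv
import io

HEADER = [
    "source_id",
    "bp_min_flux",
    "bp_max_flux",
    "rp_min_flux",
    "rp_max_flux",
    "percentage_change",
]


def write_results(rows, stream) -> None:
    """Write the header row, then one row per qualifying source_id."""
    writer = csv.writer(stream, lineterminator="\n")
    writer.writerow(HEADER)
    writer.writerows(rows)


# Hypothetical sample row, written to an in-memory buffer instead of a file.
buffer = io.StringIO()
write_results([(101, 7.0, 30.0, 4.0, 9.0, 328.5714)], buffer)
print(buffer.getvalue(), end="")
```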

Quick Run

Requirements:

  • Docker with Docker Compose
  • Network access to the Gaia CDN

Run the benchmark:

./RunChallenge.sh

RunChallenge.sh rebuilds and recreates the IRIS container, starts the IoP production, waits for completion, and prints output/results.csv to stdout.

The generated files are:

  • output/downloads/*.csv.gz: downloaded Gaia source files
  • output/results.csv: final benchmark output
  • output/results.done: success marker
  • output/results.err: failure marker, if the production fails
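A caller that wants to wait for a run to finish can poll for the two marker files. The sketch below is an assumption about how such a wait might look, not a copy of RunChallenge.sh; the function name, timeout, and polling interval are illustrative.

```python
import time
from pathlib import Path


def wait_for_result(output_dir, timeout_seconds: float = 1800.0, poll_seconds: float = 1.0) -> bool:
    """Return True on results.done, False on results.err, raise on timeout."""
    done = Path(output_dir) / "results.done"
    err = Path(output_dir) / "results.err"
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if err.exists():
            return False  # the production reported a failure
        if done.exists():
            return True  # results.csv is complete
        time.sleep(poll_seconds)
    raise TimeoutError(f"no marker appeared in {output_dir} within {timeout_seconds}s")
```

Checking the failure marker first means a stale results.done from an earlier run cannot mask a newer failure.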

Local Development

Create and use a virtual environment if one is not already present:

python -m venv .venv
. .venv/bin/activate
pip install -r requirements.txt

Run tests:

.venv/bin/python -m pytest

Validate the IoP production without writing to IRIS:

.venv/bin/iop --migrate settings.py --dry-run

Run the full Docker workflow:

docker compose up --build

Architecture

The application is an IoP production declared in settings.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py) and gaia/production.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/production.py).

Business roles:

  • GaiaBenchmarkService: polling entrypoint. It starts one benchmark run when neither results.done nor results.err exists.
  • GaiaBenchmarkProcess: orchestrates the workflow and writes success/failure markers.
  • GaiaDownloadOperation: downloads or reuses one local gzip file per request.
  • GaiaDbOperation: owns the IRIS DBAPI connection lifecycle and routes DB work by message type:
    prepare run, import aggregate rows, compute final rows, and export the CSV.
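Routing by message type can be illustrated in plain Python with functools.singledispatchmethod. This is only a sketch of the idea: IoP has its own dispatch mechanism, the DbRouter class is hypothetical, and the message fields shown here are assumed (only the message names PrepareRunRequest and ComputeRequest come from the workflow description).

```python
from dataclasses import dataclass
from functools import singledispatchmethod


@dataclass
class PrepareRunRequest:
    run_name: str  # assumed field


@dataclass
class ComputeRequest:
    run_name: str  # assumed field


class DbRouter:
    """Hypothetical stand-in for one component that handles several DB messages."""

    @singledispatchmethod
    def handle(self, message):
        raise TypeError(f"unsupported message type: {type(message).__name__}")

    @handle.register
    def _(self, message: PrepareRunRequest):
        return f"prepare {message.run_name}"

    @handle.register
    def _(self, message: ComputeRequest):
        return f"compute {message.run_name}"


router = DbRouter()
print(router.handle(PrepareRunRequest("demo")))
print(router.handle(ComputeRequest("demo")))
```

Keeping every handler on one class is what lets a single component own the database connection while still exposing distinct request types.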

IoP Mermaid graph generated from the production object:

flowchart LR
  %% Production: GaiaDR3.Production
  subgraph group_service["Services"]
    direction TB
    node_GaiaBenchmarkService["GaiaBenchmarkService
Python.gaia.services.GaiaBenchmarkService"]
  end
  subgraph group_process["Processes"]
    direction TB
    node_GaiaBenchmarkProcess["GaiaBenchmarkProcess
Python.gaia.processes.GaiaBenchmarkProcess"]
  end
  subgraph group_operation["Operations"]
    direction TB
    node_GaiaDownloadOperation["GaiaDownloadOperation
Python.gaia.operations.GaiaDownloadOperation"]
    node_GaiaDbOperation["GaiaDbOperation
Python.gaia.operations.GaiaDbOperation"]
  end
  node_GaiaBenchmarkService ~~~ node_GaiaBenchmarkProcess
  node_GaiaBenchmarkProcess -- "ComputeOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkProcess -- "DownloadOperation" --> node_GaiaDownloadOperation
  node_GaiaBenchmarkProcess -- "ExportOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkProcess -- "ImportOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkProcess -- "PrepareOperation" --> node_GaiaDbOperation
  node_GaiaBenchmarkService -- "Output" --> node_GaiaBenchmarkProcess

Demo Screenshots

These screenshots show the same production running in the IRIS interoperability UI.

Production

Gaia DR3 IoP production view

The production screenshot comes from the graph declared in gaia/production.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/production.py).
The code creates one polling service, one orchestrating process, one download operation, and one DB operation.
Several process targets are intentionally routed to the same DB operation so that a single component owns the DBAPI connection lifecycle.

prod = Production("GaiaDR3.Production", testing_enabled=True, actor_pool_size=actor_pool_size)

service = prod.service("GaiaBenchmarkService", GaiaBenchmarkService, settings=gaia_settings)
process = prod.process("GaiaBenchmarkProcess", GaiaBenchmarkProcess, settings=gaia_settings)
download = prod.operation(
    "GaiaDownloadOperation",
    GaiaDownloadOperation,
    settings=gaia_settings,
    pool_size=download_pool_size,
)
db = prod.operation(
    "GaiaDbOperation",
    GaiaDbOperation,
    settings=gaia_settings,
    pool_size=import_pool_size,
)

service.connect(GaiaBenchmarkService.Output, process)
process.connect(GaiaBenchmarkProcess.DownloadOperation, download)
process.connect(GaiaBenchmarkProcess.PrepareOperation, db)
process.connect(GaiaBenchmarkProcess.ImportOperation, db)
process.connect(GaiaBenchmarkProcess.ComputeOperation, db)
process.connect(GaiaBenchmarkProcess.ExportOperation, db)

Settings

Gaia DR3 production settings examples

The settings screenshot shows the Gaia settings exposed on the IoP components.
The typed settings are declared once in gaia/runtime.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/runtime.py); settings.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py) then supplies environment-backed defaults.

class GaiaSettings:
    ArchiveUrlTemplate: Annotated[str, Setting(data_type=str, required=True, category="Gaia")]
    FileBoundaries: Annotated[str, Setting(data_type=str, required=True, category="Gaia")]
    OutputDir: Annotated[str, Setting(data_type=str, required=True, category="Gaia", control=controls.directory())]
    RequestTimeoutSeconds: Annotated[int, Setting(data_type=int, required=True, category="Gaia")]
    HttpTimeoutSeconds: Annotated[int, Setting(data_type=int, required=True, category="Gaia")]
    DbBatchSize: Annotated[int, Setting(data_type=int, required=True, category="Gaia")]

# settings.py: environment-backed defaults for the settings declared above
GAIA_SETTINGS = {
    "ArchiveUrlTemplate": ARCHIVE_URL_TEMPLATE,
    "FileBoundaries": ",".join(FIRST_20_FILE_BOUNDARIES),
    "OutputDir": os.getenv("GAIA_OUTPUT_DIR", "/irisdev/app/output"),
    "RequestTimeoutSeconds": int(os.getenv("GAIA_REQUEST_TIMEOUT_SECONDS", "1800")),
    "HttpTimeoutSeconds": int(os.getenv("GAIA_HTTP_TIMEOUT_SECONDS", "120")),
    "DbBatchSize": int(os.getenv("GAIA_DB_BATCH_SIZE", "10000")),
}

Async Traces

Gaia DR3 async download and import traces

The trace screenshot shows the two fan-out phases in gaia/processes.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/gaia/processes.py).
Prepare, compute, and export are synchronous because they are single sequential steps.
Download and import use send_request_async_ng so the process can log each completed file as soon as its response returns.

downloads = asyncio.run(
    self._send_parallel(
        self.DownloadOperation,
        [
            DownloadFileRequest(request.run_name, file_range, self.archive_url(file_range))
            for file_range in self.file_ranges
        ],
        DownloadFileResult,
        "Download Gaia DR3 files",
    )
)

async def _send_parallel(self, target, requests, expected, description: str):
    # Start every request at once; each task resolves when its response returns.
    tasks = [
        asyncio.create_task(
            self.send_request_async_ng(
                target,
                request,
                timeout=self.request_timeout,
                description=f"{description}: {request.file_range}",
            )
        )
        for request in requests
    ]

    # Collect responses in completion order so progress is logged per file.
    results = []
    for done in asyncio.as_completed(tasks):
        result = self._expect(await done, expected, target)
        results.append(result)
        self.log_info(f"{description}: {len(results)}/{len(tasks)} {result.file_range}")
    return results
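The same fan-out pattern can be tried outside IRIS with nothing but asyncio. In this sketch fake_request is a hypothetical stand-in for send_request_async_ng, and the delays are arbitrary; it only demonstrates that asyncio.as_completed yields results in completion order rather than submission order.

```python
import asyncio


async def fake_request(file_range: str, delay: float) -> str:
    """Hypothetical stand-in for an async request to an operation."""
    await asyncio.sleep(delay)
    return file_range


async def send_parallel(requests):
    # Start all requests immediately.
    tasks = [asyncio.create_task(fake_request(name, delay)) for name, delay in requests]
    results = []
    # Handle each response as soon as it is available.
    for done in asyncio.as_completed(tasks):
        results.append(await done)
        print(f"{len(results)}/{len(tasks)} {results[-1]}")
    return results


completed = asyncio.run(
    send_parallel([("000000-003111", 0.05), ("003112-005263", 0.01)])
)
```

The second file finishes first here because its simulated delay is shorter, which is why the per-file log lines in the trace do not follow the request order.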

Workflow

  1. GaiaBenchmarkService polls once per second.
  2. It creates output/results.lock and sends a GaiaBenchmarkRequest to the process.
  3. GaiaBenchmarkProcess sends PrepareRunRequest to GaiaDbOperation to clear prior persistent rows and output markers.
  4. The process fans out 20 DownloadFileRequest messages to GaiaDownloadOperation with send_request_async_ng.
  5. Downloads are stored in output/downloads/. Existing readable gzip files are reused.
  6. The process logs each completed download, then fans out 20 ImportFileRequest messages to GaiaDbOperation with send_request_async_ng.
  7. Each import request parses one gzip file, computes per-row BP/RP min/max values, and inserts aggregate rows with executemany.
  8. The process logs each completed import.
  9. GaiaDbOperation handles ComputeRequest with SQL to combine all per-file aggregates by source_id, calculate percentage change, and persist only results over 100 percent.
  10. GaiaDbOperation handles ExportCsvRequest by selecting final rows ordered by source_id; gaia.exporting writes them to output/results.csv.
  11. The process writes output/results.done.

If any step fails, the process writes output/results.err, then re-raises the failure.
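The marker handling can be pictured as a small wrapper around the workflow. This is a sketch under assumptions: run_with_markers and the marker file contents are hypothetical, and the real process performs this inside the IoP message handler.

```python
from pathlib import Path


def run_with_markers(output_dir, workflow) -> None:
    """Run workflow(), then record success or failure as a marker file."""
    out = Path(output_dir)
    try:
        workflow()
    except Exception as exc:
        # Record the failure, then let the caller see the original exception.
        (out / "results.err").write_text(f"{type(exc).__name__}: {exc}\n", encoding="utf-8")
        raise
    (out / "results.done").write_text("ok\n", encoding="utf-8")
```

Re-raising after writing results.err keeps the failure visible to the caller instead of hiding it behind the marker.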

Persistent Data

The application uses iris-persistence for class-backed persistent tables:

  • GaiaDR3.SourceFluxAggregate: per-run, per-file, per-source BP/RP min/max aggregates
  • GaiaDR3.PhotometryChange: final per-source qualifying results

Persistent indexes:

| Table | Index | Properties | Purpose |
| --- | --- | --- | --- |
| GaiaDR3.SourceFluxAggregate | SourceAggregateRunIdx | run_name | Delete or scan one benchmark run during preparation and compute. |
| GaiaDR3.SourceFluxAggregate | SourceAggregateSourceIdx | run_name,source_id | Group and aggregate per-source rows for a run. |
| GaiaDR3.PhotometryChange | PhotometryChangeRunIdx | run_name | Delete or scan final results for one run. |
| GaiaDR3.PhotometryChange | PhotometryChangeSourceIdx | run_name,source_id | Read final results ordered by source for export and inspection. |

The application intentionally does not persist every raw observation value. The benchmark output only requires per-source min/max and percentage change, so persisting aggregate rows keeps IRIS storage smaller while preserving the data needed for final SQL analytics.

Downloaded files are persisted on disk under output/downloads/; the presence of a readable gzip file is the download handoff state.
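One way to decide whether an existing download can be reused is to decompress it end to end and treat any error as "not readable". The helper below is a hypothetical sketch of such a check, not the project's implementation, which may use a cheaper test.

```python
import gzip
import zlib
from pathlib import Path


def is_readable_gzip(path) -> bool:
    """Return True if the file exists and decompresses without errors."""
    path = Path(path)
    if not path.is_file():
        return False
    try:
        with gzip.open(path, "rb") as handle:
            # Read in 1 MiB chunks so large archives are never held in memory.
            while handle.read(1024 * 1024):
                pass
    except (OSError, EOFError, zlib.error):
        # Covers a bad gzip header, a truncated download, or a corrupt stream.
        return False
    return True
```

Reading the whole stream costs a full decompression, but it also catches partially written files left behind by an interrupted download.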

Compute SQL

The compute step uses two persistent tables:

  • Source table: GaiaDR3.SourceFluxAggregate
  • Change table: GaiaDR3.PhotometryChange

SourceFluxAggregate is the input to compute. Each row represents one imported source aggregate from one downloaded file:

run_name,file_range,source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux

PhotometryChange is the materialized final result table. It stores only rows that satisfy the benchmark rule:

run_name,source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change

The compute route in GaiaDbOperation inserts into PhotometryChange with one set-oriented SQL statement:

  1. Delete prior final rows for the same run_name.
  2. Read SourceFluxAggregate rows for that run_name.
  3. Group by source_id.
  4. Compute global min/max BP and RP flux values for each source.
  5. Compute BP and RP percentage changes.
  6. Keep the bigger BP/RP change as percentage_change.
  7. Insert only rows where percentage_change > 100.

The core grouping is:

SELECT
  source_id,
  MIN(bp_min_flux) AS bp_min_flux,
  MAX(bp_max_flux) AS bp_max_flux,
  MIN(rp_min_flux) AS rp_min_flux,
  MAX(rp_max_flux) AS rp_max_flux
FROM GaiaDR3.SourceFluxAggregate
WHERE run_name = ?
GROUP BY source_id

Then the compute query applies the benchmark formula:

CASE
  WHEN bp_min_flux IS NULL OR bp_min_flux = 0
    THEN NULL
  ELSE ((bp_max_flux - bp_min_flux) / bp_min_flux) * 100
END AS bp_change

The same logic is applied to RP. The final percentage_change is the larger non-null value between bp_change and rp_change.

Example with one source appearing in multiple files:

| file_range | source_id | bp_min_flux | bp_max_flux | rp_min_flux | rp_max_flux |
| --- | --- | --- | --- | --- | --- |
| 000000-003111 | 101 | 10 | 20 | 5 | 8 |
| 003112-005263 | 101 | 7 | 30 | 4 | 9 |

After SQL grouping:

| source_id | bp_min_flux | bp_max_flux | rp_min_flux | rp_max_flux |
| --- | --- | --- | --- | --- |
| 101 | 7 | 30 | 4 | 9 |

Percentage changes:

BP = ((30 - 7) / 7) * 100 = 328.5714
RP = ((9 - 4) / 4) * 100 = 125
percentage_change = 328.5714

That row is inserted into PhotometryChange because 328.5714 > 100.

Example where a zero minimum is ignored for one band:

| source_id | bp_min_flux | bp_max_flux | rp_min_flux | rp_max_flux |
| --- | --- | --- | --- | --- |
| 202 | 0 | 10 | 4 | 12 |

BP change is NULL because the minimum is zero. RP change is 200, so the final percentage_change is 200.
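Both worked examples can be reproduced with an in-memory SQLite database standing in for IRIS. This is a sketch only: the table is created without the GaiaDR3 schema prefix, source 303 is an extra non-qualifying row added for illustration, and the COALESCE/MAX expression is a SQLite-specific way to pick the larger non-null change, so the project's actual IRIS SQL may be written differently.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE SourceFluxAggregate (
        run_name TEXT, file_range TEXT, source_id INTEGER,
        bp_min_flux REAL, bp_max_flux REAL, rp_min_flux REAL, rp_max_flux REAL)"""
)
conn.executemany(
    "INSERT INTO SourceFluxAggregate VALUES (?, ?, ?, ?, ?, ?, ?)",
    [
        ("demo", "000000-003111", 101, 10, 20, 5, 8),
        ("demo", "003112-005263", 101, 7, 30, 4, 9),
        ("demo", "000000-003111", 202, 0, 10, 4, 12),
        ("demo", "000000-003111", 303, 10, 15, 8, 9),  # illustrative row, below 100 percent
    ],
)

QUERY = """
WITH grouped AS (
  SELECT source_id,
         MIN(bp_min_flux) AS bp_min_flux, MAX(bp_max_flux) AS bp_max_flux,
         MIN(rp_min_flux) AS rp_min_flux, MAX(rp_max_flux) AS rp_max_flux
  FROM SourceFluxAggregate
  WHERE run_name = ?
  GROUP BY source_id
),
changes AS (
  SELECT source_id,
         CASE WHEN bp_min_flux IS NULL OR bp_min_flux = 0 THEN NULL
              ELSE ((bp_max_flux - bp_min_flux) / bp_min_flux) * 100 END AS bp_change,
         CASE WHEN rp_min_flux IS NULL OR rp_min_flux = 0 THEN NULL
              ELSE ((rp_max_flux - rp_min_flux) / rp_min_flux) * 100 END AS rp_change
  FROM grouped
),
final AS (
  -- Two-argument MAX is scalar in SQLite; COALESCE keeps a lone non-null band.
  SELECT source_id,
         MAX(COALESCE(bp_change, rp_change), COALESCE(rp_change, bp_change)) AS percentage_change
  FROM changes
)
SELECT source_id, percentage_change
FROM final
WHERE percentage_change > 100
ORDER BY source_id
"""

rows = conn.execute(QUERY, ("demo",)).fetchall()
for source_id, change in rows:
    print(source_id, round(change, 4))
```

Source 101 qualifies through its BP change, source 202 through RP alone because its BP minimum is zero, and source 303 is filtered out.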

On the current benchmark dataset, the first 20 Gaia files produced 75064 SourceFluxAggregate rows and 75064 distinct source_id values, so there were no duplicate source_id values across files after import. The SQL still groups by source_id because it is the correct benchmark operation and keeps the production robust if a future dataset contains the same source in multiple files.

SQL is a good fit here because the data is already in IRIS:

  • It avoids pulling all aggregate rows back into Python.
  • It lets IRIS perform grouping, min/max, filtering, and insertion as one set operation.
  • It uses the run_name,source_id indexes to target one run and organize per-source aggregation.
  • It materializes final results in PhotometryChange, making export and inspection simple.

Configuration

Runtime configuration lives in settings.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py) and is passed to every IoP component as production settings.

Environment overrides:

| Variable | Default | Purpose |
| --- | --- | --- |
| GAIA_OUTPUT_DIR | /irisdev/app/output | Output directory inside the container |
| GAIA_REQUEST_TIMEOUT_SECONDS | 1800 | Sync request timeout for long-running workflow calls |
| GAIA_HTTP_TIMEOUT_SECONDS | 120 | Per-download HTTP timeout |
| GAIA_DB_BATCH_SIZE | 10000 | DBAPI aggregate insert batch size |
| GAIA_ACTOR_POOL | 8 | Production actor pool size |
| GAIA_DOWNLOAD_POOL | 4 | Download operation pool size |
| GAIA_IMPORT_POOL | 4 | DB operation pool size used by import fan-out |
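To show what an insert batch size means in practice, the sketch below chunks rows and calls the DBAPI executemany method once per chunk. It uses sqlite3 as a stand-in driver, and the aggregates table, its columns, and both helper names are hypothetical.

```python
import sqlite3
from itertools import islice


def batched(rows, batch_size: int):
    """Yield lists of at most batch_size rows from any iterable."""
    iterator = iter(rows)
    while batch := list(islice(iterator, batch_size)):
        yield batch


def insert_in_batches(conn, rows, batch_size: int = 10000) -> int:
    """Insert rows with one executemany call per batch; return the batch count."""
    sql = "INSERT INTO aggregates (source_id, bp_min_flux, bp_max_flux) VALUES (?, ?, ?)"
    cursor = conn.cursor()
    batches = 0
    for batch in batched(rows, batch_size):
        cursor.executemany(sql, batch)
        batches += 1
    conn.commit()
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE aggregates (source_id INTEGER, bp_min_flux REAL, bp_max_flux REAL)")
sample = ((i, 1.0, 2.0) for i in range(25))
print(insert_in_batches(conn, sample, batch_size=10))
```

A larger batch size means fewer round trips to the database at the cost of more rows held in memory per call.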

The 20 benchmark files are configured as compact numeric file boundaries in FIRST_20_FILE_BOUNDARIES; components expand those boundaries into Gaia archive file ranges at runtime.
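The exact encoding of FIRST_20_FILE_BOUNDARIES is not shown here, so the following sketch assumes one plausible layout: a comma-separated list of range start values followed by a final exclusive end. The function name, that layout, and the placeholder URL template are all assumptions; only the zero-padded range names and the EpochPhotometry_ file name pattern come from the file names quoted earlier.

```python
def expand_boundaries(boundaries: str) -> list[str]:
    """Turn '0,3112,5264' into ['000000-003111', '003112-005263'] (assumed layout)."""
    points = [int(part) for part in boundaries.split(",")]
    return [
        f"{start:06d}-{end - 1:06d}"  # inclusive end, zero-padded to six digits
        for start, end in zip(points, points[1:])
    ]


# Placeholder template; the real ARCHIVE_URL_TEMPLATE is defined in settings.py.
URL_TEMPLATE = "https://example.invalid/EpochPhotometry_{file_range}.csv.gz"

for file_range in expand_boundaries("0,3112,5264"):
    print(URL_TEMPLATE.format(file_range=file_range))
```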

How It Meets GOAL.md

  • Fully functional: RunChallenge.sh starts IRIS, runs the production, waits for completion, and prints the CSV result file.
  • Scalable ingestion: downloads and imports are async fan-out operations with configurable pool sizes and per-file completion logs.
  • InterSystems-first implementation: orchestration runs through an IRIS IoP production; aggregate and final result data are stored in IRIS persistent tables.
  • Correct dataset: settings.py (https://github.com/grongierisc/iris-gaia-dr3/blob/master/settings.py) targets exactly the first 20 Gaia DR3 epoch photometry archives required by the benchmark.
  • Correct output: final CSV contains source_id,bp_min_flux,bp_max_flux,rp_min_flux,rp_max_flux,percentage_change as the header, followed by one object per row.
  • Correct filtering: SQL persists only rows where the computed maximum BP/RP percentage change is greater than 100.
  • Automated execution: RunChallenge.sh is suitable for CI because it requires no manual input and prints only the CSV result file on success.

Verification

Fast local checks:

.venv/bin/python -m pytest
.venv/bin/python -m compileall -q gaia tests settings.py
.venv/bin/iop --migrate settings.py --dry-run

Full benchmark check:

./RunChallenge.sh > /tmp/gaia-results.csv
wc -l /tmp/gaia-results.csv

On the current implementation, the full run produces 57099 data rows for the benchmark dataset, plus the header row.

Version: 1.0.0 (03 Jul, 2026)
Category: Frameworks
Works with: InterSystems IRIS
First published: 03 Jul, 2026
Last edited: 03 Jul, 2026