
iris-airflow-provider

InterSystems does not provide technical support for this project. Please contact its developer for technical assistance.
Apache Airflow provider for InterSystems IRIS.

What's new in this version

Initial Release

InterSystems IRIS provider for Apache Airflow


The InterSystems IRIS Provider for Apache Airflow enables seamless integration between Airflow workflows and the InterSystems IRIS data platform. It provides native connection support and operators for executing IRIS SQL and automating IRIS-driven tasks within modern ETL/ELT pipelines.

Designed for reliability and ease of use, this provider helps data engineers and developers build scalable, production-ready workflows for healthcare, interoperability, analytics, and enterprise data processing—powered by InterSystems IRIS.

About Apache Airflow

Apache Airflow is the leading open-source platform to programmatically author, schedule, and monitor data pipelines and workflows using Python. Workflows are defined as code (DAGs), making them version-controlled, testable, and reusable. With a rich UI, 100+ built-in operators, dynamic task generation, and native support for cloud providers, Airflow powers ETL/ELT, ML pipelines, and batch jobs at companies like Airbnb, Netflix, and Spotify.
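
For a flavor of the workflows-as-code model, here is a minimal, generic DAG; the file name, dag_id, and task are illustrative only and are not part of this provider:

# dags/hello_airflow.py – a minimal, illustrative DAG
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def say_hello():
    # Plain Python callable executed by the task
    print("Hello from Airflow")

with DAG(
    dag_id="hello_airflow",           # illustrative name
    start_date=datetime(2025, 12, 1),
    schedule=None,                    # run only when triggered manually
    catchup=False,
) as dag:
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)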

Application Layout


Features

  • IrisSQLOperator – Execute raw SQL/ObjectScript with Jinja templating
  • IrisHook – SQLAlchemy-compatible hook for pandas, ORM, and custom logic (see the sketch after this list)
  • Connection management with Airflow Connections
  • Reliable bulk data loading via SQLAlchemy + pandas
  • Zero external dependencies beyond standard Airflow & IRIS Python drivers
  • Comprehensive examples for real-world ETL patterns
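
As a quick illustration of the IrisHook feature above, the sketch below reads IRIS data into a pandas DataFrame. It assumes the hook's get_engine() method and the Test.AirflowDemo table, both of which appear in the sample DAGs later on this page:

import pandas as pd

from airflow_provider_iris.hooks.iris_hook import IrisHook

# The hook exposes a SQLAlchemy engine (see the ORM example below)
hook = IrisHook()
engine = hook.get_engine()

# Read an IRIS table straight into a DataFrame
df = pd.read_sql("SELECT TOP 10 ID, Message, RunDate FROM Test.AirflowDemo", engine)
print(df)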

Installation

Docker (e.g., for development purposes)

Clone or pull the repository into a local directory:

$ git clone https://github.com/mwaseem75/iris-fhir-lab.git

Open a terminal in this directory and run the commands below.

Initialize the Airflow metadata database:

$ docker compose up airflow-init

Start IRIS and the entire Airflow platform:

$ docker compose up -d

Run the Application

Navigate to http://localhost:8080/ to open the Airflow UI.

View/Run Sample DAGs

The application ships with three sample DAGs. Click Dags in the Airflow UI.
Use the toggle button to enable or disable a DAG.
Click the trigger arrow to run a DAG manually.
Double-click 01-IRIS-demo to view the DAG.
This DAG has three tasks: Create Table, Insert Data, and Retrieve Data.
Select a task and click its box to view the task details. Click Code to view the task's code, and Logs to view its log output.

About the airflow-provider-iris package


Add IRIS connection

Go to Admin → Connections → Add Connection
Click the Save button to add the connection.

Use your InterSystems IRIS connection by setting the iris_conn_id parameter in any of the provided operators.

In the example below, the IrisSQLOperator uses the iris_conn_id parameter to connect to the IRIS instance when the task runs:

from datetime import datetime

from airflow import DAG
from airflow_provider_iris.operators.iris_operator import IrisSQLOperator

with DAG(
    dag_id="01_IRIS_Raw_SQL_Demo_Local",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest"],
) as dag:

    create_table = IrisSQLOperator(
        task_id="create_table",
        iris_conn_id="ContainerInstance",
        sql="""CREATE TABLE IF NOT EXISTS Test.AirflowDemo (
               ID INTEGER IDENTITY PRIMARY KEY,
               Message VARCHAR(200),
               RunDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )""",
    )
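
Since the operator supports Jinja templating (see Features), the sql argument can also reference Airflow template variables. A minimal sketch using the built-in {{ ds }} (logical date) variable; the task name and message are illustrative:

    # Inside the same `with DAG(...)` block as above
    log_run = IrisSQLOperator(
        task_id="log_run",
        iris_conn_id="ContainerInstance",
        sql="INSERT INTO Test.AirflowDemo (Message) VALUES ('Run for {{ ds }}')",
    )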

Add a new DAG

A DAG (Directed Acyclic Graph) is a Python script that defines a workflow as a collection of tasks with dependencies and a schedule in Apache Airflow.
Airflow automatically picks up DAGs from the dags folder, so to add a new DAG, add a new Python file to that folder, as sketched below.
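
For example, a new file such as dags/my_first_iris_dag.py (the file name is arbitrary) could hold a single-task DAG following the same pattern as the bundled examples; this is a sketch assuming the ContainerInstance connection defined earlier, not bundled code:

# dags/my_first_iris_dag.py – illustrative file name
from datetime import datetime

from airflow import DAG
from airflow_provider_iris.operators.iris_operator import IrisSQLOperator

with DAG(
    dag_id="my_first_iris_dag",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
) as dag:
    # A trivial connectivity check against IRIS
    ping = IrisSQLOperator(
        task_id="ping_iris",
        iris_conn_id="ContainerInstance",
        sql="SELECT 1",
    )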

Example DAGs (Included in examples/)

  1. Raw SQL Operator – Simple & Powerful
# dags/01_IRIS_Raw_SQL_Demo.py
from datetime import datetime
from airflow import DAG
from airflow_provider_iris.operators.iris_operator import IrisSQLOperator

with DAG(
    dag_id="01_IRIS_Raw_SQL_Demo",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest"],
) as dag:

    create_table = IrisSQLOperator(
        task_id="create_table",
        sql="""CREATE TABLE IF NOT EXISTS Test.AirflowDemo (
               ID INTEGER IDENTITY PRIMARY KEY,
               Message VARCHAR(200),
               RunDate TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )""",
    )

    insert = IrisSQLOperator(
        task_id="insert_row",
        sql="INSERT INTO Test.AirflowDemo (Message) VALUES ('Hello from raw SQL operator')",
    )

    select = IrisSQLOperator(
        task_id="select_rows",
        sql="SELECT ID, Message, RunDate FROM Test.AirflowDemo ORDER BY ID DESC",
    )

    create_table >> insert >> select
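
The final line uses Airflow's >> operator to chain the tasks, so create_table runs before insert_row, which in turn runs before select_rows.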

  2. ORM + Pandas Integration (Real-World ETL)
    Uses SQLAlchemy + pandas with the only known reliable method for bulk inserts into IRIS.
# dags/example_sqlalchemy_dag.py

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd

# Import your hook and model
from airflow_provider_iris.hooks.iris_hook import IrisHook
from sqlalchemy import Column, Integer, String, DateTime, Float
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class SalesRecord(Base):
    __tablename__ = "SalesRecord"
    __table_args__ = {"schema": "Test"}

    id        = Column(Integer, primary_key=True)
    region    = Column(String(50))
    amount    = Column(Float)
    sale_date = Column(DateTime)

def create_and_insert_orm(**context):
    hook = IrisHook()
    engine = hook.get_engine()

    # Create table if not exists
    Base.metadata.create_all(engine)

    # THIS IS THE ONLY METHOD THAT WORKS RELIABLY WITH IRIS RIGHT NOW
    data = [
        {"region": "Europe",        "amount": 12500.50, "sale_date": "2025-12-01"},
        {"region": "Asia",          "amount": 8900.00,  "sale_date": "2025-12-02"},
        {"region": "North America", "amount": 56700.00, "sale_date": "2025-12-03"},
        {"region": "Africa",        "amount": 34200.00, "sale_date": "2025-12-03"},
    ]
    df = pd.DataFrame(data)
    df["sale_date"] = pd.to_datetime(df["sale_date"])

    # pandas.to_sql with single-row inserts → IRIS accepts this perfectly
    df.to_sql(
        name="SalesRecord",
        con=engine,
        schema="Test",
        if_exists="append",
        index=False,
        method="multi",           # still fast
        chunksize=1               # ← THIS IS THE MAGIC LINE
    )
    print(f"Successfully inserted {len(df)} rows using pandas.to_sql() (chunksize=1)")

def query_orm(**context):
    hook = IrisHook()
    engine = hook.get_engine()
    df = pd.read_sql("SELECT * FROM Test.SalesRecord ORDER BY id", engine)
    for _, r in df.iterrows():
        print(f"ORM → {int(r.id):>3} | {r.region:<15} | ${r.amount:>10,.2f} | {r.sale_date.date()}")

with DAG(
    dag_id="02_IRIS_ORM_Demo",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest", "orm"],
) as dag:

    orm_create = PythonOperator(task_id="orm_create_and_insert", python_callable=create_and_insert_orm)
    orm_read   = PythonOperator(task_id="orm_read",              python_callable=query_orm)

    orm_create >> orm_read
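
Note: with chunksize=1, pandas sends one row per INSERT statement, so method="multi" effectively emits single-row statements; as the comments above indicate, this trades some speed for inserts that IRIS accepts reliably.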

  3. Synthetic Data Generator → Bulk Load
    Generate realistic sales data and load it efficiently.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import numpy as np
from airflow_provider_iris.hooks.iris_hook import IrisHook
from sqlalchemy import Column, Integer, String, DateTime, Float
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class SalesRecord(Base):
    __tablename__ = "SalesRecord"
    __table_args__ = {"schema": "Test"}

    id        = Column(Integer, primary_key=True)
    region    = Column(String(50))
    amount    = Column(Float)
    sale_date = Column(DateTime)

# ----------- SYNTHETIC DATA GENERATION -----------

def generate_synthetic_sales(num_rows=500):
    """Create synthetic sales data for testing."""

    regions = [
        "North America", "South America", "Europe",
        "Asia-Pacific", "Middle East", "Africa"
    ]

    # Randomly pick regions
    region_data = np.random.choice(regions, size=num_rows)

    # Generate synthetic amounts between 10k and 120k
    amounts = np.random.uniform(10000, 120000, size=num_rows).round(2)

    # Generate random dates within the last 30 days
    start_date = datetime(2025, 11, 1)
    sale_dates = [start_date + timedelta(days=int(x)) for x in np.random.randint(0, 30, size=num_rows)]

    df = pd.DataFrame({
        "region": region_data,
        "amount": amounts,
        "sale_date": sale_dates
    })

    return df

# ----------- AIRFLOW TASK FUNCTION -----------

def bulk_load_from_csv(**context):

    df = generate_synthetic_sales(num_rows=200)   # Change number as needed

    hook = IrisHook()
    engine = hook.get_engine()

    Base.metadata.create_all(engine)

    df.to_sql("SalesRecord", con=engine, schema="Test", if_exists="append", index=False)
    print(f"Bulk loaded {len(df)} synthetic rows via pandas.to_sql()")

# ----------- DAG DEFINITION -----------

with DAG(
    dag_id="03_IRIS_Load_CSV_Synthetic_Demo",
    start_date=datetime(2025, 12, 1),
    schedule=None,
    catchup=False,
    tags=["iris-contest", "etl", "synthetic"],
) as dag:

    bulk_task = PythonOperator(
        task_id="bulk_load_synthetic_to_iris",
        python_callable=bulk_load_from_csv
    )
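
This DAG contains a single task, so no dependency chaining is needed; enable it and trigger it from the Airflow UI as described above.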

Version: 1.0.0 (07 Dec, 2025)
Python package: airflow-provider-iris
Ideas portal: https://ideas.intersystems.com/ideas/DPI-I-386
Category: Solutions
Works with: InterSystems IRIS for Health, InterSystems IRIS
First published: 07 Dec, 2025
Last edited: 07 Dec, 2025