Dagster is a Python-native data orchestration platform built around the concept of software-defined assets: data artefacts that know their own lineage, can be materialised on demand or on a schedule, and expose rich metadata and observability out of the box. For macro data workflows — pulling indicator series, detecting release events, storing snapshots, and alerting on anomalies — Dagster is an excellent fit. This guide walks through building a pipeline that ingests FXMacroData indicators into a local SQLite store, surfaces upcoming release calendar events, and runs automatically on a daily schedule.
What you will build
- A FXMacroData Dagster resource — a reusable, configurable API client shared across all assets
- Five software-defined assets — Fed and ECB policy rates, US CPI, the EUR/USD spot series, and the USD release calendar
- A daily job and schedule — runs every weekday morning before the Frankfurt open to refresh the latest readings
- An anomaly sensor — watches for unexpected inflation prints and fires an alert via a webhook
Prerequisites
- Python 3.10+ — all snippets use modern type hints and match statements
- FXMacroData API key — sign up at /subscribe; USD indicator data is publicly accessible without a key
- Basic Dagster familiarity — you should know what assets and jobs are; the Dagster quickstart covers the basics in ten minutes
Step 1 — Install Dagster and project dependencies
Create a fresh virtual environment and install Dagster alongside the small set of libraries this pipeline uses.
Pinning dagster and dagster-webserver to the same version avoids the most common
installation pitfall.
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
    requests pandas tabulate  # tabulate backs DataFrame.to_markdown in Step 3
Initialise a minimal project layout. Dagster's scaffold command creates the directory structure and a
pyproject.toml that lets the local UI discover your definitions automatically.
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
Store your API key as an environment variable. Never hard-code credentials in source files.
export FXMACRO_API_KEY="YOUR_API_KEY"
On production hosts, inject the key via your scheduler's secret management (GitHub Actions secrets, Kubernetes secrets, Dagster Cloud environment variables). The pipeline reads it from the environment at runtime — no changes needed to the code.
Step 2 — Define a reusable FXMacroData resource
A Dagster resource is a shared, injectable dependency — analogous to a database connection or an HTTP session. Wrapping the FXMacroData REST API in a resource means every asset can call it without duplicating authentication logic or timeout handling.
Create fxmacro_pipeline/resources.py:
import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""  # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])
Why ConfigurableResource?
ConfigurableResource lets you swap credentials and timeouts between dev and prod without
touching asset code. You configure the resource once in Definitions and Dagster injects it
wherever it is declared as a parameter.
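The _get helper raises on HTTP errors but makes no retry attempt. If transient network failures are a concern, a small backoff wrapper can be layered around the call. The sketch below is illustrative only — with_retries is a hypothetical helper, not part of Dagster or the FXMacroData client:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.01) -> T:
    """Call fn(), retrying with exponential backoff on any exception."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")

# Demo: a callable that fails twice, then succeeds on the third call
calls = {"n": 0}
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(with_retries(flaky), calls["n"])  # ok 3
```

Inside the resource you would wrap the requests.get call rather than the whole method, so a persistent 4xx (bad key, bad path) still fails fast via raise_for_status.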
Step 3 — Define macro data assets
Each Dagster asset represents one logical data product. Materialising an asset fetches the latest data from FXMacroData and persists it locally. We define four assets: the Fed policy rate, the US CPI, the ECB policy rate, and the EUR/USD spot series. A fifth asset then reads the release calendar so the pipeline knows when the next high-impact announcements are due.
Create fxmacro_pipeline/assets.py:
import os
import sqlite3

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    # (DataFrame.to_markdown requires the tabulate package)
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )
Asset metadata in the Dagster UI
Every MaterializeResult returns structured metadata that Dagster renders in the asset detail
pane — row counts, latest dates, preview tables. This means you can check at a glance whether a
materialisation actually fetched fresh data without digging into logs.
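The INSERT OR REPLACE statement in _upsert, keyed on date, is what makes re-materialisation idempotent: re-running the same day overwrites a revised reading rather than duplicating the row. The semantics are easy to verify with an in-memory SQLite database using only the standard library:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE usd_cpi (date TEXT PRIMARY KEY, val REAL, announcement_datetime TEXT)"
)
rows = [("2025-12-11", 2.7, None), ("2025-11-13", 2.6, None)]
conn.executemany("INSERT OR REPLACE INTO usd_cpi VALUES (?,?,?)", rows)

# Re-inserting the latest date with a revised value replaces the row,
# it does not duplicate it
conn.execute("INSERT OR REPLACE INTO usd_cpi VALUES (?,?,?)", ("2025-12-11", 2.8, None))

count, = conn.execute("SELECT COUNT(*) FROM usd_cpi").fetchone()
latest, = conn.execute("SELECT val FROM usd_cpi WHERE date='2025-12-11'").fetchone()
print(count, latest)  # 2 2.8
```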
Step 4 — Create a job and a daily schedule
A Dagster job selects which assets to materialise in one run. A schedule drives that job on a cron expression. We want fresh macro readings every weekday morning at 06:30 UTC — before the Frankfurt open — so any indicator released overnight is captured before European trading begins.
Create fxmacro_pipeline/jobs.py:
from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)
Two separate jobs — one for indicator series, one for the release calendar — let you observe and re-run them independently. If the release calendar fetch fails (for example during a public holiday when the endpoint returns an empty payload), the indicator refresh job is not affected.
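The cron string 30 6 * * 1-5 encodes minute, hour, day-of-month, month, and day-of-week. Dagster's daemon does the real evaluation, but a quick stdlib sanity check of which instants the expression matches can catch an off-by-one in the weekday field (matches_macro_schedule is a throwaway helper written for this check):

```python
from datetime import datetime, timezone

def matches_macro_schedule(ts: datetime) -> bool:
    """True when ts falls on a '30 6 * * 1-5' cron tick: weekdays at 06:30 UTC."""
    return ts.minute == 30 and ts.hour == 6 and ts.weekday() < 5

print(matches_macro_schedule(datetime(2026, 1, 5, 6, 30, tzinfo=timezone.utc)))  # Monday → True
print(matches_macro_schedule(datetime(2026, 1, 4, 6, 30, tzinfo=timezone.utc)))  # Sunday → False
```

Note that cron's day-of-week numbering starts at 0 = Sunday, while Python's weekday() starts at 0 = Monday — hence 1-5 in the cron string but < 5 in the helper.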
Step 5 — Wire everything into a Definitions object
Dagster's Definitions object is the single entry point that ties assets, resources, jobs,
and schedules together. Edit the generated fxmacro_pipeline/__init__.py:
import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
Resource injection at runtime
Any asset that declares fxmacrodata: FXMacroDataResource as a parameter automatically
receives the configured instance at run-time. To swap in a mock for testing, override the resource
in your test Definitions — the asset code does not change.
Step 6 — Add a sensor to alert on surprising inflation prints
Sensors in Dagster poll for external conditions and trigger runs or alert channels when something
changes. The following sensor evaluates after every usd_cpi materialisation: if the latest CPI reading is at least 0.4 percentage points above the prior reading, it fires a webhook alert so your strategy or risk system can react immediately.
Add fxmacro_pipeline/sensors.py:
import os
import sqlite3

import requests as http
from dagster import (
    AssetKey,
    EventLogEntry,
    SensorEvaluationContext,
    SkipReason,
    asset_sensor,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: SensorEvaluationContext,
    asset_event: EventLogEntry,
) -> SkipReason | None:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")
    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")
    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)
    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )
    if surprise < _SURPRISE_THRESHOLD:
        return SkipReason(f"Surprise of {surprise:+.2f} pp is below the alert threshold.")
    if _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
    # asset_sensor manages its own event-log cursor, so there is nothing to return
    return None
Register the sensor in __init__.py by importing it and adding it to the Definitions
sensors list:
from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)
Step 7 — Launch the Dagster UI and run your first materialisation
Start the local development server. Dagster will discover your Definitions through the
pyproject.toml entry point:
dagster dev
Navigate to http://localhost:3000. The Asset Graph shows all five assets organised into their groups (macro_indicators, fx_rates, calendar). Click Materialise All to run the first ingestion pass — within
seconds, the asset pane will show row counts, latest dates, and latest values pulled directly from
the FXMacroData API.
Verify the data in SQLite
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
To activate the schedule, navigate to Overview → Schedules in the Dagster UI and toggle
macro_daily_at_0630_utc to Running. Dagster's daemon process will now wake up
every weekday at 06:30 UTC and materialise the macro indicator assets automatically.
Step 8 — Write unit tests with a mock resource
One of Dagster's key design benefits is that assets are ordinary Python functions — easy to unit-test without running the full orchestration stack. Swap the live resource for a mock to keep tests fast and offline.
# tests/test_assets.py
import pytest
from dagster import materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource

MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    # _DB_PATH is bound when assets.py is imported, so patching the FXMACRO_DB
    # env var here would be too late — patch the module attribute instead.
    monkeypatch.setattr("fxmacro_pipeline.assets._DB_PATH", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
Step 9 — Deploy to production with Dagster+
For teams that want a hosted control plane without managing their own daemon, Dagster+
(the managed offering, formerly Dagster Cloud) requires only one change: instead of running
dagster dev, you run the Dagster+ agent, which reads the same Definitions object
and executes runs inside your infrastructure.
Set your API key as a Dagster+ environment variable (Deployment → Environment Variables) with
the name FXMACRO_API_KEY. The resource will pick it up automatically — no code changes
required. The same pattern applies to any CI/CD environment that injects secrets as environment variables
(GitHub Actions, GitLab CI, CircleCI).
To deploy on a self-hosted Kubernetes cluster, package the pipeline as a Docker image, push it to your
registry, and point the Dagster Helm chart at it. The FXMacroData resource connects outbound to
fxmacrodata.com — ensure your cluster's egress policy allows HTTPS to that host.
What you built
By following this guide you now have a working Dagster pipeline that:
- Defines a reusable, injectable FXMacroDataResource that handles all API auth and error handling in one place
- Exposes five software-defined assets — Fed rate, US CPI, ECB rate, EUR/USD spot, and the USD release calendar — each persisted to SQLite with full Dagster metadata
- Schedules weekday refreshes at 06:30 UTC via a cron-driven Dagster schedule so you always have up-to-date readings before European markets open
- Fires a webhook alert when a CPI print surprises to the upside, giving downstream systems an event-driven trigger rather than a polling loop
- Includes fast offline unit tests using a mock resource, so the CI pipeline never makes live API calls
Next steps
- Extend the pipeline with additional currencies — browse the full indicator catalogue at /api-data-docs and add new assets following the same pattern
- Replace the SQLite store with a Postgres or BigQuery I/O manager for production scale
- Add a downstream asset that reads from the DB, computes a macro divergence score, and writes trading signals to a signals table — keeping your strategy logic separate from the ingestion layer
- Explore COT positioning data as a sentiment overlay to complement the indicator series
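As a starting point for the divergence-score idea above, here is a toy sketch. Both the scoring rule and the input values are entirely hypothetical — a real signal would need careful specification and backtesting:

```python
def divergence_score(usd_rate: float, eur_rate: float,
                     usd_cpi: float, eur_cpi: float) -> float:
    """Toy score: nominal rate differential minus the inflation differential.

    Positive values crudely suggest USD carry looks more attractive in real terms.
    """
    return (usd_rate - eur_rate) - (usd_cpi - eur_cpi)

# Hypothetical inputs, not live readings
print(round(divergence_score(4.5, 3.15, 2.7, 2.4), 2))  # 1.05
```

A downstream asset computing this would read the four SQLite tables the pipeline already maintains and write its output to a signals table, keeping strategy logic cleanly separated from ingestion.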