How To Integrate Fxmacrodata With Dagster

Build a production-ready Dagster data pipeline that pulls macro indicators from FXMacroData on a schedule, stores results in a local database, and surfaces release calendar events for smarter execution timing.

Dagster is a Python-native data orchestration platform built around the concept of software-defined assets: data artefacts that know their own lineage, can be materialised on demand or on a schedule, and expose rich metadata and observability out of the box. For macro data workflows — pulling indicator series, detecting release events, storing snapshots, and alerting on anomalies — Dagster is an excellent fit. This guide walks through building a pipeline that ingests FXMacroData indicators into a local SQLite store, surfaces upcoming release calendar events, and runs automatically on a daily schedule.

What you will build

  • An FXMacroData Dagster resource — a reusable, configurable API client shared across all assets
  • Five software-defined assets — USD and EUR policy rates, US CPI, EUR/USD spot, and the USD release calendar
  • A daily job and schedule — runs every weekday morning before the Frankfurt open to refresh the latest readings
  • An anomaly sensor — watches for unexpected inflation prints and fires an alert via a webhook

Prerequisites

  • Python 3.10+ — the snippets use modern type hints such as list[dict] and X | Y unions
  • FXMacroData API key — sign up at /subscribe; USD indicator data is publicly accessible without a key
  • Basic Dagster familiarity — you should know what assets and jobs are; the Dagster quickstart covers the basics in ten minutes

Step 1 — Install Dagster and project dependencies

Create a fresh virtual environment and install Dagster alongside the small set of libraries this pipeline uses. Pinning dagster and dagster-webserver to the same version avoids version-mismatch errors between the core library and the web UI — the most common installation pitfall.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

Initialise a minimal project layout. Dagster's scaffold command creates the directory structure and a pyproject.toml that lets the local UI discover your definitions automatically.

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

Store your API key as an environment variable. Never hard-code credentials in source files.

export FXMACRO_API_KEY="YOUR_API_KEY"

On production hosts, inject the key via your scheduler's secret management (GitHub Actions secrets, Kubernetes secrets, Dagster Cloud environment variables). The pipeline reads it from the environment at runtime — no changes needed to the code.
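If you prefer to fail fast rather than discover a missing key via a 401 mid-run, a small startup check works; this helper is an illustrative pattern, not part of Dagster or FXMacroData:

```python
import os


def require_api_key(env_var: str = "FXMACRO_API_KEY") -> str:
    """Return the API key, or raise immediately if it is not configured."""
    key = os.environ.get(env_var, "")
    if not key:
        raise RuntimeError(f"{env_var} is not set; export it before starting Dagster")
    return key
```

Call it once in your Definitions module so misconfigured deployments fail at load time instead of at the first scheduled run.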


Step 2 — Define a reusable FXMacroData resource

A Dagster resource is a shared, injectable dependency — analogous to a database connection or an HTTP session. Wrapping the FXMacroData REST API in a resource means every asset can call it without duplicating authentication logic or timeout handling.

Create fxmacro_pipeline/resources.py:

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

Why ConfigurableResource?

ConfigurableResource lets you swap credentials and timeouts between dev and prod without touching asset code. You configure the resource once in Definitions and Dagster injects it wherever it is declared as a parameter.


Step 3 — Define macro data assets

Each Dagster asset represents one logical data product. Materialising an asset fetches the latest data from FXMacroData and persists it locally. We define four assets: the Fed policy rate, the US CPI, the ECB policy rate, and the EUR/USD spot series. A fifth asset then reads the release calendar so the pipeline knows when the next high-impact announcements are due.

Create fxmacro_pipeline/assets.py:

import os
import sqlite3

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    # (DataFrame.to_markdown requires the tabulate package)
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Asset metadata in the Dagster UI

Every MaterializeResult returns structured metadata that Dagster renders in the asset detail pane — row counts, latest dates, preview tables. This means you can check at a glance whether a materialisation actually fetched fresh data without digging into logs.


Step 4 — Create a job and a daily schedule

A Dagster job selects which assets to materialise in one run. A schedule drives that job on a cron expression. We want fresh macro readings every weekday morning at 06:30 UTC — before the Frankfurt open — so any indicator released overnight is captured before European trading begins.

Create fxmacro_pipeline/jobs.py:

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

Two separate jobs — one for indicator series, one for the release calendar — let you observe and re-run them independently. If the release calendar fetch fails (for example during a public holiday when the endpoint returns an empty payload), the indicator refresh job is not affected.
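To sanity-check what the cron expression "30 6 * * 1-5" actually matches, here is a stdlib-only sketch of the same rule — purely illustrative, since Dagster's daemon evaluates the cron expression for you:

```python
from datetime import datetime


def fires_at(dt: datetime) -> bool:
    """True when dt matches the cron expression '30 6 * * 1-5' (UTC assumed)."""
    # minute == 30, hour == 6, and ISO weekday 1-5 (Monday through Friday)
    return dt.minute == 30 and dt.hour == 6 and dt.isoweekday() <= 5
```

For example, Monday 2025-01-06 06:30 matches, while the same time on Saturday 2025-01-04 does not.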


Step 5 — Wire everything into a Definitions object

Dagster's Definitions object is the single entry point that ties assets, resources, jobs, and schedules together. Edit the generated fxmacro_pipeline/__init__.py:

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

Resource injection at runtime

Any asset that declares fxmacrodata: FXMacroDataResource as a parameter automatically receives the configured instance at run-time. To swap in a mock for testing, override the resource in your test Definitions — the asset code does not change.


Step 6 — Add a sensor to alert on surprising inflation prints

Sensors in Dagster poll for external conditions and trigger runs or alert channels when something changes. The following sensor runs after every usd_cpi materialisation: if the latest CPI reading is more than 0.4 percentage points above the prior reading, it fires a webhook alert so your strategy or risk system can react immediately.

Add fxmacro_pipeline/sensors.py:

import os
import sqlite3

import requests as http
from dagster import (
    AssetKey,
    EventLogEntry,
    SensorEvaluationContext,
    SensorResult,
    SkipReason,
    asset_sensor,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: SensorEvaluationContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

Register the sensor in __init__.py by importing it and adding it to the Definitions sensors list:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

Step 7 — Launch the Dagster UI and run your first materialisation

Start the local development server. Dagster will discover your Definitions through the pyproject.toml entry point:

dagster dev

Navigate to http://localhost:3000. You will see the Asset Graph with all five assets organised by group (macro_indicators, fx_rates, calendar). Click Materialise All to run the first ingestion pass — within seconds, the asset pane will show row counts, latest dates, and latest values pulled directly from the FXMacroData API.

Verify the data in SQLite

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

To activate the schedule, navigate to Overview → Schedules in the Dagster UI and toggle macro_daily_at_0630_utc to Running. Dagster's daemon process will now wake up every weekday at 06:30 UTC and materialise the macro indicator assets automatically.


Step 8 — Write unit tests with a mock resource

One of Dagster's key design benefits is that assets are ordinary Python functions — easy to unit-test without running the full orchestration stack. Swap the live resource for a mock to keep tests fast and offline.

# tests/test_assets.py
import pytest
from dagster import materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    # assets.py reads FXMACRO_DB at import time, so patch the module
    # attribute rather than the environment variable
    monkeypatch.setattr("fxmacro_pipeline.assets._DB_PATH", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)

Install pytest and run the suite:

pip install pytest
pytest tests/ -v

Step 9 — Deploy to production with Dagster+

For teams that want a hosted control plane without managing their own daemon, Dagster+ (the managed cloud offering) requires only a single additional configuration step: replace dagster dev with the Dagster Cloud agent, which reads the same Definitions object and executes runs in your infrastructure.

Set your API key as a Dagster+ environment variable (Deployment → Environment Variables) with the name FXMACRO_API_KEY. The resource will pick it up automatically — no code changes required. The same pattern applies to any CI/CD environment that injects secrets as environment variables (GitHub Actions, GitLab CI, CircleCI).

To deploy on a self-hosted Kubernetes cluster, package the pipeline as a Docker image, push it to your registry, and point the Dagster Helm chart at it. The FXMacroData resource connects outbound to fxmacrodata.com — ensure your cluster's egress policy allows HTTPS to that host.


What you built

By following this guide you now have a working Dagster pipeline that:

  • Defines a reusable, injectable FXMacroDataResource that handles all API auth and error handling in one place
  • Exposes five software-defined assets — Fed rate, US CPI, ECB rate, EUR/USD spot, and the USD release calendar — each persisted to SQLite with full Dagster metadata
  • Schedules weekday refreshes at 06:30 UTC via a cron-driven Dagster schedule so you always have up-to-date readings before European markets open
  • Fires a webhook alert when a CPI print surprises to the upside, giving downstream systems an event-driven trigger rather than a polling loop
  • Includes fast offline unit tests using a mock resource, so the CI pipeline never makes live API calls

Next steps

  • Extend the pipeline with additional currencies — browse the full indicator catalogue at /api-data-docs and add new assets following the same pattern
  • Replace the SQLite store with a Postgres or BigQuery I/O manager for production scale
  • Add a downstream asset that reads from the DB, computes a macro divergence score, and writes trading signals to a signals table — keeping your strategy logic separate from the ingestion layer
  • Explore COT positioning data as a sentiment overlay to complement the indicator series