Cómo integrar FXMacroData con Dagster banner image

Implementation

How-To Guides

Cómo integrar FXMacroData con Dagster

Construir una tubería de datos Dagster lista para la producción que extraiga los indicadores macro de FXMacroData en un cronograma, almacene los resultados en una base de datos local y libere eventos de calendario de superficies para un cronometraje de ejecución más inteligente.

Disponible también en English

Dagster es una plataforma de orquestación de datos nativa de Python construida alrededor del concepto de activos definidos por software: artefactos de datos que conocen su propio linaje, pueden materializarse bajo demanda o en un cronograma, y exponer metadatos ricos y observabilidad fuera de la caja. Para los flujos de trabajo de datos macro tirar de las series de indicadores, detectar eventos de liberación, almacenar instantáneas y alertar sobre anomalías Dagster es un ajuste excelente. Esta guía recorre la construcción de una tubería que ingiere los indicadores FXMacroData en un almacén local de SQLite, muestra los próximos eventos del calendario de liberaciones y se ejecuta automáticamente en un horario diario.

Lo que construirás

  • Un recurso Dagster de FXMacroData un cliente API reutilizable y configurable compartido entre todos los activos
  • Cuatro activos definidos por software tipos de interés, IPC, forex spot y calendario de liberación del par de divisas elegido
  • Trabajo diario y horario se ejecuta todas las mañanas de la semana antes de la apertura de Londres para actualizar las últimas lecturas
  • Un sensor de anomalías vigila las impresiones de inflación inesperadas y dispara una alerta a través de un webhook

Los requisitos previos

  • Python 3.10+ todos los fragmentos utilizan pistas de tipo modernas y match declaraciones
  • La clave de la API de FXMacroData inscribirse en / suscribirse; los datos del indicador USD son accesibles al público sin necesidad de una clave
  • Familiaridad básica con Dagster debe saber qué son los activos y los puestos de trabajo; El Dagster arranca rápido cubre los conceptos básicos en diez minutos

- ¿ Qué pasa ?

Paso 1 Instalar Dagster y las dependencias del proyecto

Crear un nuevo entorno virtual e instalar Dagster junto con el pequeño conjunto de bibliotecas que utiliza este pipeline. dagster ¿ Qué ? dagster-webserver El uso de la misma versión evita el error de instalación más común.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

Inicializa un diseño de proyecto mínimo. pyproject.toml que permite que la interfaz de usuario local descubra sus definiciones automáticamente.

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

Almacene su clave de API como una variable de entorno. Nunca codifique las credenciales en los archivos fuente.

export FXMACRO_API_KEY="YOUR_API_KEY"

En los hosts de producción, inyecte la clave a través de la gestión secreta de su programador (secretos de acciones de GitHub, secretos de Kubernetes, variables de entorno de Dagster Cloud).


- ¿ Qué pasa ?

Paso 2 Definir un recurso FXMacroData reutilizable

Un Dagster . recursos es una dependencia compartida e inyectable análoga a una conexión a una base de datos o una sesión HTTP. Envuelver la API REST FXMacroData en un recurso significa que todos los activos pueden llamarla sin duplicar la lógica de autenticación o el manejo del tiempo de espera.

Crear . fxmacro_pipeline/resources.py¿ Qué ?

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

¿Por qué ConfigurableResource?

ConfigurableResource permite intercambiar credenciales y tiempos de espera entre dev y prod sin tocar el código del activo. Definitions y Dagster lo inyecta donde sea declarado como un parámetro.


- ¿ Qué pasa ?

Paso 3 Definición de los activos de datos macro

Cada activo Dagster representa un producto de datos lógicos. La materialización de un activo recupera los datos más recientes de FXMacroData y los persiste localmente. Tasa de política de la Fed, el Indicador de precios de los Estados Unidos, el Tipo de interés de la BCEUn quinto activo lee el calendario de lanzamiento para que el canal sepa cuándo se deben hacer los próximos anuncios de alto impacto.

Crear . fxmacro_pipeline/assets.py¿ Qué ?

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Metadatos de activos en la interfaz de usuario de Dagster

Cada uno . MaterializeResult devuelve metadatos estructurados que Dagster muestra en el panel de detalles de activos recuentos de filas, fechas más recientes, tablas de vista previa. Esto significa que puede comprobar de un vistazo si una materialización realmente obtuvo datos nuevos sin tener que buscar en los registros.


- ¿ Qué pasa ?

Paso 4 Establezca un trabajo y un horario diario

Un Dagster . trabajo Selecciona los activos que se materializarán en una sola ejecución. el calendario Queremos nuevas lecturas de macro cada mañana de los días laborables a las 06:30 UTC antes de la apertura de Frankfurt para que cualquier indicador publicado durante la noche se capture antes de que comience el comercio europeo.

Crear . fxmacro_pipeline/jobs.py¿ Qué ?

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

Dos trabajos separados uno para la serie de indicadores, otro para el calendario de lanzamiento le permiten observarlos y volver a ejecutarlos de forma independiente. Si la búsqueda del calendario no funciona (por ejemplo, durante un día festivo cuando el punto final devuelve una carga útil vacía), el trabajo de actualización del indicador no se ve afectado.


- ¿ Qué pasa ?

Paso 5 Conecte todo en un objeto de definiciones

Es de Dagster. Definitions Objeto es el punto de entrada único que une activos, recursos, trabajos y horarios juntos. fxmacro_pipeline/__init__.py¿ Qué ?

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

Inyección de recursos en tiempo de ejecución

Cualquier activo que declare fxmacrodata: FXMacroDataResource Para cambiar en un simulacro para pruebas, anule el recurso en su prueba Definitions el código del activo no cambia.


- ¿ Qué pasa ?

Paso 6 Añadir un sensor para alertar sobre las impresiones de inflación sorprendentes

Los sensores en Dagster encuestamos las condiciones externas y activamos los canales de alerta cuando algo cambia. usd_cpi materialización: si la última lectura del IPC es superior en más de 0,4 puntos porcentuales a la anterior, se dispara una alerta webhook para que su estrategia o sistema de riesgo pueda reaccionar inmediatamente.

Añadir fxmacro_pipeline/sensors.py¿ Qué ?

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

Registra el sensor en __init__.py importándolo y añadiéndolo a la Definitions sensors lista:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

- ¿ Qué pasa ?

Paso 7 Lanza la interfaz de usuario Dagster y ejecuta tu primera materialización

Enciende el servidor de desarrollo local. Definitions a través de la pyproject.toml punto de entrada:

dagster dev

Navegación a http://localhost:3000Verás el Gráfico de activos En el cuadro se indican los cinco activos agrupados en sus grupos lógicos (macro_indicators¿ Qué ? fx_rates¿ Qué ? calendarHaga clic Materializad todo Para ejecutar el primer pase de ingestión en cuestión de segundos, el panel de activos mostrará recuentos de filas, últimas fechas y últimos valores extraídos directamente de la API FXMacroData.

Verificar los datos en SQLite

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

Para activar el horario, navegar a Resumen → Horarios en la interfaz de usuario Dagster y cambiar macro_daily_at_0630_utc ¿ Qué ? CorriendoEl proceso daemon de Dagster ahora se despertará todos los días laborables a las 06:30 UTC y materializará los activos de los indicadores macro automáticamente.


- Paso 8: Prueba

Paso 8 Escribir pruebas unitarias con un recurso simulado

Uno de los principales beneficios de diseño de Dagster es que los activos son funciones Python ordinarias fáciles de probar en unidad sin ejecutar la pila de orquestación completa.

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

- Paso 9: Producción

Paso 9 Implementación en la producción con Dagster+

Para equipos que quieren un avión de control alojado sin administrar su propio daemon, El Dagster+ (la oferta de nube gestionada) requiere sólo un paso de configuración adicional: sustituir dagster dev con el agente Dagster Cloud, que dice lo mismo. Definitions Objeto y ejecuta ejecuta en su infraestructura.

Establezca su clave de API como una variable de entorno Dagster + (Despliegue → Variables del entorno- ¿ Qué ? FXMACRO_API_KEYEl recurso lo recogerá automáticamente sin cambios de código necesarios. El mismo patrón se aplica a cualquier entorno CI/CD que inyecte secretos como variables de entorno (GitHub Actions, GitLab CI, CircleCI).

Para implementar en un clúster Kubernetes auto-alojado, empaquete la tubería como una imagen Docker, empuje a su registro y apunte el diagrama Dagster Helm a él. fxmacrodata.com asegúrese de que la política de salida de su grupo permite HTTPS a ese host.


- ¿ Qué es lo que está pasando ?

Lo que construiste

Siguiendo esta guía ahora tiene una tubería Dagster que funciona:

  • Define un producto reutilizable e inyectable. FXMacroDataResource que maneja toda la autenticación de API y manejo de errores en un solo lugar
  • Expone cinco activos definidos por software tasa de la Fed, el IPC de EE.UU., la tasa del BCE, el spot EUR/USD y el calendario de liberación del USD cada uno persistió en SQLite con los metadatos completos de Dagster
  • Programa actualizaciones de días laborables a las 06:30 UTC a través de un cron-driven Dagster programación para que siempre tenga lecturas actualizadas antes de que los mercados europeos abran
  • Se dispara una alerta webhook cuando un IPC imprime sorpresas al alza, dando a los sistemas aguas abajo un desencadenante impulsado por eventos en lugar de un bucle de votación
  • Incluye pruebas de unidades fuera de línea rápidas utilizando un recurso simulado, por lo que el canal de CI nunca hace llamadas de API en vivo

Los siguientes pasos

  • Ampliar la cartera con monedas adicionales consulte el catálogo completo de indicadores en /api-datos-doc y añadir nuevos activos siguiendo el mismo patrón
  • Reemplazar el almacén SQLite con un administrador de E/S de Postgres o BigQuery para escala de producción
  • Añadir un activo descendente que lee desde el DB, calcula una puntuación de divergencia macro, y escribe señales de comercio a una tabla de señales manteniendo su lógica de estrategia separada de la capa de ingestión
  • Explorar . Datos de posicionamiento del COT como superposición de sentimiento para complementar la serie de indicadores

Blogroll