Como integrar FXMacroData com Dagster banner image

Implementation

How-To Guides

Como integrar FXMacroData com Dagster

Construir um pipeline de dados Dagster pronto para produção que extraia indicadores macro do FXMacroData em um cronograma, armazena os resultados em um banco de dados local e libera eventos de calendário de superfícies para um cronometragem de execução mais inteligente.

Também disponível em English

Dagster é uma plataforma de orquestração de dados nativa Python construída em torno do conceito de Ativos definidos por software: artefatos de dados que conhecem sua própria linhagem, podem ser materializados sob demanda ou em um cronograma, e expor metadados ricos e observabilidade fora da caixa. Para fluxos de trabalho de dados macro puxando séries de indicadores, detectando eventos de lançamento, armazenando instantâneos e alertando sobre anomalias Dagster é um excelente ajuste. Este guia percorre a construção de um pipeline que ingere indicadores FXMacroData em um armazém local do SQLite, surfa eventos de calendário de lancamento futuros e é executado automaticamente em um calendário diário.

O que você vai construir

  • Um recurso FXMacroData Dagster um cliente API reutilizável e configurável partilhado entre todos os activos
  • Quatro activos definidos por software taxas de juro, IPC, forex spot e calendário de lançamento do par de moedas escolhido
  • Trabalho e horário diários é transmitido todas as manhãs de semana antes da abertura do Londres para actualizar as últimas leituras
  • Um sensor de anomalias vigia as impressões de inflação inesperada e dispara um alerta através de um webhook

Requisitos

  • Python 3.10+ todos os fragmentos usam sugestões de tipos modernos e match declarações
  • Chave da API do FXMacroData Inscreva-se em / subscrever; os dados do indicador USD são acessíveis ao público sem uma chave
  • Familiarização básica com Dagster deve saber quais são os activos e os empregos; - O Dagster começa rápido . Abre os fundamentos em dez minutos.

- Passo 1 -

Passo 1 Instalar o Dagster e as dependências do projeto

Crie um novo ambiente virtual e instale o Dagster ao lado do pequeno conjunto de bibliotecas que este pipeline usa. dagster E ... dagster-webserver A utilização de uma versão mais simples do mesmo sistema evita o problema mais comum de instalação.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

Inicialmente um layout de projeto mínimo. pyproject.toml que permite que a interface de usuário local descubra suas definições automaticamente.

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

Armazenar sua chave de API como uma variável de ambiente. Nunca credenciais de código rígido em arquivos de origem.

export FXMACRO_API_KEY="YOUR_API_KEY"

Em hosts de produção, injete a chave através da gestão secreta do seu agendador (segredos de ações do GitHub, segredos do Kubernetes, variáveis de ambiente da Dagster Cloud).


- O passo 2 .

Passo 2 Definir um recurso FXMacroData reutilizável

Um Dagster . recursos é uma dependência compartilhada e injetável análoga a uma conexão de banco de dados ou uma sessão HTTP. Envolver a API REST FXMacroData em um recurso significa que todos os ativos podem chamá-lo sem duplicar a lógica de autenticação ou o manuseio de timeout.

Criar . fxmacro_pipeline/resources.py- Não .

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

Porquê ConfigurableResource?

ConfigurableResource permite trocar credenciais e timeouts entre dev e prod sem tocar no código do ativo. Definitions e Dagster injeta onde quer que seja declarado como um parâmetro.


- Passo 3 -

Passo 3 Definição dos ativos de dados macro

Cada ativo Dagster representa um produto de dados lógicos. Materializando um ativo, obtém os dados mais recentes do FXMacroData e persiste localmente. Taxa de política monetária- O quê ? IPC dos EUA- O quê ? Taxa de juro do BCE, e a série spot EUR/USD. Um quinto activo lê então o calendário de lançamento para que o pipeline saiba quando os próximos anúncios de alto impacto devem ser feitos.

Criar . fxmacro_pipeline/assets.py- Não .

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Metadados de ativos na UI do Dagster

Todos os MaterializeResult Retorna metadados estruturados que o Dagster retrata no painel de detalhes do ativo contagens de linhas, datas mais recentes, tabelas de visualização.


- Passo 4 -

Passo 4 Criar um emprego e um horário diário

Um Dagster . trabalho A. Seleciona os activos a concretizar numa única corrida. programação Queremos leituras de macro frescas todas as manhãs de semana às 06:30 UTC antes da abertura de Frankfurt para que qualquer indicador lançado durante a noite seja capturado antes do início das negociações europeias.

Criar . fxmacro_pipeline/jobs.py- Não .

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

Se a busca do calendário de lançamento falhar (por exemplo, durante um feriado público quando o ponto final retorna uma carga útil vazia), o trabalho de atualização do indicador não é afetado.


- Passo 5 -

Passo 5 Conecte tudo num objeto Definitions

- O Dagster. Definitions Objeto é o único ponto de entrada que liga ativos, recursos, empregos e agendas juntos. fxmacro_pipeline/__init__.py- Não .

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

Injecção de recursos em tempo de execução

Qualquer activo que declare ... fxmacrodata: FXMacroDataResource Para trocar em um simulador para teste, anule o recurso em seu teste Definitions o código do activo não é alterado.


- O passo 6 .

Passo 6 Adicionar um sensor para alertar sobre impressões de inflação surpreendentes

Os sensores no Dagster pesquisam condições externas e disparam ou alertan canais quando algo muda. usd_cpi materialização: se a última leitura do IPC for superior a 0,4 pontos percentuais à leitura anterior, é emitido um alerta webhook para que a sua estratégia ou sistema de risco possa reagir imediatamente.

Adicione . fxmacro_pipeline/sensors.py- Não .

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

Regista o sensor . __init__.py importando-o e adicionando-a ao Definitions sensors lista:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

- O passo 7 .

Passo 7 Lançar a UI Dagster e executar a sua primeira materialização

Inicie o servidor de desenvolvimento local. Definitions através do pyproject.toml ponto de entrada:

dagster dev

Navegação para ... http://localhost:3000- Vais ver o ... Gráfico de activos Apresentação dos cinco activos agrupados nos seus grupos lógicos (macro_indicators- Não . fx_rates- Não . calendarClique Materializem tudo Para executar o primeiro passo de ingestão em segundos, o painel de ativos mostrará contagens de linhas, últimas datas e valores mais recentes extraídos diretamente da API FXMacroData.

Verificar os dados no SQLite

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

Para activar o calendário, navegar para Visão geral → Calendários na interface do Dagster e alternar macro_daily_at_0630_utc Para A correr.O processo daemon do Dagster agora acordará todos os dias da semana às 06:30 UTC e materializará os ativos do indicador macro automaticamente.


- PASSO 8: Ensaios

Passo 8 Escrever testes unitários com um recurso simulado

Um dos principais benefícios do design do Dagster é que os ativos são funções Python comuns fáceis de testar sem executar a pilha de orquestração completa. Troque o recurso ao vivo por um simulador para manter os testes rápidos e offline.

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

PASSO 9: Produção

Passo 9 Implementação na produção com o Dagster+

Para equipes que querem um avião de controle hospedado sem gerenciar seu próprio demônio, Dagster+ (a oferta de nuvem gerenciada) requer apenas uma única etapa de configuração adicional: substituir dagster dev com o agente Dagster Cloud, que lê o mesmo. Definitions Objeto e executa executos em sua infraestrutura.

Configure sua chave de API como uma variável de ambiente Dagster + (Implementação → Variaveis ambientaisCom o nome FXMACRO_API_KEYO recurso irá buscá-lo automaticamente sem alterações de código necessárias. O mesmo padrão se aplica a qualquer ambiente CI/CD que injecta segredos como variáveis de ambiente (GitHub Actions, GitLab CI, CircleCI).

Para implantar em um cluster Kubernetes auto-hospedado, empacotar o pipeline como uma imagem Docker, empurrá-lo para o seu registro, e apontar o diagrama Dagster Helm para ele. fxmacrodata.com certifique-se de que a política de saída do seu cluster permite HTTPS para esse host.


- Resumo - Resumindo

O que construíste

Seguindo este guia , agora tem um pipeline Dagster funcional que:

  • Define um reutilizável, injetável FXMacroDataResource que lida com todas as autenticações e erros da API em um só lugar
  • Expose cinco ativos definidos por software taxa da Fed, IPC dos EUA, taxa do BCE, EUR/USD spot e calendário de lançamento do USD cada um persistindo no SQLite com metadados completos do Dagster
  • Agende atualizações de dias úteis às 06:30 UTC através de um cron-driven Dagster agendamento para que você sempre tenha leituras atualizadas antes de mercados europeus abrir
  • Aterrar um alerta webhook quando um IPC imprimir surpresas para cima, dando aos sistemas a jusante um gatilho impulsionado por eventos em vez de um ciclo de votação
  • Inclui testes rápidos de unidade offline usando um recurso simulado, para que o pipeline CI nunca faça chamadas ao vivo da API

Próximos passos

  • Aumentar o número de moedas consultar o catálogo completo dos indicadores em /api-data-docs e adicionar novos activos seguindo o mesmo padrão
  • Substitua o armazém SQLite por um gerenciador de E/S de Postgres ou BigQuery para escala de produção
  • Adicionar um ativo a jusante que lê a partir do DB, calcula uma pontuação de divergência macro, e escreve sinais de negociação para uma tabela de sinais mantendo a sua lógica estratégica separada da camada de ingestão
  • Explorar . Dados de posicionamento COT como uma sobreposição de sentimento para complementar a série de indicadores

Blogroll