Dagster é uma plataforma de orquestração de dados nativa Python construída em torno do conceito de Ativos definidos por software: artefatos de dados que conhecem sua própria linhagem, podem ser materializados sob demanda ou em um cronograma, e expor metadados ricos e observabilidade fora da caixa. Para fluxos de trabalho de dados macro puxando séries de indicadores, detectando eventos de lançamento, armazenando instantâneos e alertando sobre anomalias Dagster é um excelente ajuste. Este guia percorre a construção de um pipeline que ingere indicadores FXMacroData em um armazém local do SQLite, surfa eventos de calendário de lancamento futuros e é executado automaticamente em um calendário diário.
O que você vai construir
- Um recurso FXMacroData Dagster um cliente API reutilizável e configurável partilhado entre todos os activos
- Quatro activos definidos por software taxas de juro, IPC, forex spot e calendário de lançamento do par de moedas escolhido
- Trabalho e horário diários é transmitido todas as manhãs de semana antes da abertura do Londres para actualizar as últimas leituras
- Um sensor de anomalias vigia as impressões de inflação inesperada e dispara um alerta através de um webhook
Requisitos
- Python 3.10+ todos os fragmentos usam sugestões de tipos modernos e
matchdeclarações - Chave da API do FXMacroData Inscreva-se em / subscrever; os dados do indicador USD são acessíveis ao público sem uma chave
- Familiarização básica com Dagster deve saber quais são os activos e os empregos; - O Dagster começa rápido . Abre os fundamentos em dez minutos.
- Passo 1 -
Passo 1 Instalar o Dagster e as dependências do projeto
Crie um novo ambiente virtual e instale o Dagster ao lado do pequeno conjunto de bibliotecas que este pipeline usa. dagster E ... dagster-webserver A utilização de uma versão mais simples do mesmo sistema evita o problema mais comum de instalação.
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
requests pandas sqlalchemy
Inicialmente um layout de projeto mínimo.
pyproject.toml que permite que a interface de usuário local descubra suas definições automaticamente.
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
Armazenar sua chave de API como uma variável de ambiente. Nunca credenciais de código rígido em arquivos de origem.
export FXMACRO_API_KEY="YOUR_API_KEY"
Em hosts de produção, injete a chave através da gestão secreta do seu agendador (segredos de ações do GitHub, segredos do Kubernetes, variáveis de ambiente da Dagster Cloud).
- O passo 2 .
Passo 2 Definir um recurso FXMacroData reutilizável
Um Dagster . recursos é uma dependência compartilhada e injetável análoga a uma conexão de banco de dados ou uma sessão HTTP. Envolver a API REST FXMacroData em um recurso significa que todos os ativos podem chamá-lo sem duplicar a lógica de autenticação ou o manuseio de timeout.
Criar . fxmacro_pipeline/resources.py- Não .
import os
from typing import Any
import requests
from dagster import ConfigurableResource, get_dagster_logger
_BASE_URL = "https://fxmacrodata.com/api/v1"
class FXMacroDataResource(ConfigurableResource):
"""Thin wrapper around the FXMacroData REST API."""
api_key: str = "" # overridden at run-time from env
timeout: int = 15
def _get(self, path: str, **params: Any) -> dict:
logger = get_dagster_logger()
key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
resp = requests.get(
f"{_BASE_URL}{path}",
params={"api_key": key, **params},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
logger.debug("GET %s → %d records", path, len(data.get("data", [])))
return data
def get_announcements(
self,
currency: str,
indicator: str,
start: str = "2020-01-01",
) -> list[dict]:
"""Return time-series records for a macro indicator."""
result = self._get(
f"/announcements/{currency.lower()}/{indicator}",
start=start,
)
return result.get("data", [])
def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
"""Return daily FX spot-rate records."""
result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
return result.get("data", [])
def get_calendar(self, currency: str) -> list[dict]:
"""Return the upcoming release calendar for a currency."""
result = self._get(f"/calendar/{currency.lower()}")
return result.get("data", [])
Porquê ConfigurableResource?
ConfigurableResource permite trocar credenciais e timeouts entre dev e prod sem tocar no código do ativo. Definitions e Dagster injeta onde quer que seja declarado como um parâmetro.
- Passo 3 -
Passo 3 Definição dos ativos de dados macro
Cada ativo Dagster representa um produto de dados lógicos. Materializando um ativo, obtém os dados mais recentes do FXMacroData e persiste localmente. Taxa de política monetária- O quê ? IPC dos EUA- O quê ? Taxa de juro do BCE, e a série spot EUR/USD. Um quinto activo lê então o calendário de lançamento para que o pipeline saiba quando os próximos anúncios de alto impacto devem ser feitos.
Criar . fxmacro_pipeline/assets.py- Não .
import os
import sqlite3
from datetime import date, timedelta
import pandas as pd
from dagster import (
AssetExecutionContext,
MaterializeResult,
MetadataValue,
asset,
)
from .resources import FXMacroDataResource
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
date TEXT PRIMARY KEY,
val REAL,
announcement_datetime TEXT
)
"""
)
conn.commit()
def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
rows = [
(r["date"], r.get("val"), r.get("announcement_datetime"))
for r in records
if r.get("date") and r.get("val") is not None
]
conn.executemany(
f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Latest Fed funds rate series from the FXMacroData announcements endpoint."""
records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_policy_rate")
n = _upsert(conn, "usd_policy_rate", records)
latest = records[0] if records else {}
context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_cpi")
n = _upsert(conn, "usd_cpi", records)
latest = records[0] if records else {}
context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""ECB main refinancing rate series."""
records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eur_policy_rate")
n = _upsert(conn, "eur_policy_rate", records)
latest = records[0] if records else {}
context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Daily EUR/USD closing spot rate."""
records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eurusd_spot")
n = _upsert(conn, "eurusd_spot", records)
latest = records[0] if records else {}
context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Upcoming USD macro release dates and times."""
events = fxmacrodata.get_calendar("usd")
df = pd.DataFrame(events) if events else pd.DataFrame()
context.log.info("Found %d upcoming USD events", len(df))
# Surface a preview in the Dagster UI asset metadata pane
preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
return MaterializeResult(
metadata={
"event_count": MetadataValue.int(len(df)),
"preview": MetadataValue.md(preview),
}
)
Metadados de ativos na UI do Dagster
Todos os MaterializeResult Retorna metadados estruturados que o Dagster retrata no painel de detalhes do ativo contagens de linhas, datas mais recentes, tabelas de visualização.
- Passo 4 -
Passo 4 Criar um emprego e um horário diário
Um Dagster . trabalho A. Seleciona os activos a concretizar numa única corrida. programação Queremos leituras de macro frescas todas as manhãs de semana às 06:30 UTC antes da abertura de Frankfurt para que qualquer indicador lançado durante a noite seja capturado antes do início das negociações europeias.
Criar . fxmacro_pipeline/jobs.py- Não .
from dagster import (
AssetSelection,
ScheduleDefinition,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
name="macro_refresh",
description="Refresh all FXMacroData indicator and FX rate assets.",
selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)
calendar_refresh_job = define_asset_job(
name="calendar_refresh",
description="Pull the latest USD release calendar.",
selection=AssetSelection.groups("calendar"),
)
# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
name="macro_daily_at_0630_utc",
job=macro_refresh_job,
cron_schedule="30 6 * * 1-5",
execution_timezone="UTC",
)
# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
name="calendar_weekly_on_sunday",
job=calendar_refresh_job,
cron_schedule="0 18 * * 0",
execution_timezone="UTC",
)
Se a busca do calendário de lançamento falhar (por exemplo, durante um feriado público quando o ponto final retorna uma carga útil vazia), o trabalho de atualização do indicador não é afetado.
- Passo 5 -
Passo 5 Conecte tudo num objeto Definitions
- O Dagster. Definitions Objeto é o único ponto de entrada que liga ativos, recursos, empregos e agendas juntos. fxmacro_pipeline/__init__.py- Não .
import os
from dagster import Definitions
from .assets import (
eur_policy_rate,
eurusd_spot,
usd_cpi,
usd_policy_rate,
usd_release_calendar,
)
from .jobs import (
calendar_refresh_job,
calendar_weekly_schedule,
macro_daily_schedule,
macro_refresh_job,
)
from .resources import FXMacroDataResource
defs = Definitions(
assets=[
usd_policy_rate,
usd_cpi,
eur_policy_rate,
eurusd_spot,
usd_release_calendar,
],
resources={
"fxmacrodata": FXMacroDataResource(
# Falls back to the FXMACRO_API_KEY env var inside the resource
api_key=os.environ.get("FXMACRO_API_KEY", ""),
),
},
jobs=[macro_refresh_job, calendar_refresh_job],
schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
Injecção de recursos em tempo de execução
Qualquer activo que declare ... fxmacrodata: FXMacroDataResource Para trocar em um simulador para teste, anule o recurso em seu teste Definitions o código do activo não é alterado.
- O passo 6 .
Passo 6 Adicionar um sensor para alertar sobre impressões de inflação surpreendentes
Os sensores no Dagster pesquisam condições externas e disparam ou alertan canais quando algo muda. usd_cpi materialização: se a última leitura do IPC for superior a 0,4 pontos percentuais à leitura anterior, é emitido um alerta webhook para que a sua estratégia ou sistema de risco possa reagir imediatamente.
Adicione . fxmacro_pipeline/sensors.py- Não .
import os
import sqlite3
import requests as http
from dagster import (
RunStatusSensorContext,
asset_sensor,
AssetKey,
EventLogEntry,
SensorResult,
SkipReason,
)
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4 # percentage points
@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
context: RunStatusSensorContext,
asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
"""Alert when the latest US CPI print is significantly above the prior reading."""
try:
conn = sqlite3.connect(_DB_PATH)
rows = conn.execute(
"SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
).fetchall()
conn.close()
except Exception as exc:
return SkipReason(f"DB read failed: {exc}")
if len(rows) < 2:
return SkipReason("Not enough CPI history yet.")
latest_date, latest_val = rows[0]
prior_date, prior_val = rows[1]
surprise = (latest_val or 0) - (prior_val or 0)
context.log.info(
"CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
latest_date, latest_val, prior_date, prior_val, surprise,
)
if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
payload = {
"text": (
f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
f"vs prior {prior_date}: {prior_val:.1f}% "
f"(surprise: *{surprise:+.2f} pp*)"
)
}
http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
return SensorResult(cursor=str(latest_date))
Regista o sensor . __init__.py importando-o e adicionando-a ao Definitions
sensors lista:
from .sensors import cpi_surprise_alert_sensor
defs = Definitions(
# ... existing fields ...
sensors=[cpi_surprise_alert_sensor],
)
- O passo 7 .
Passo 7 Lançar a UI Dagster e executar a sua primeira materialização
Inicie o servidor de desenvolvimento local. Definitions através do
pyproject.toml ponto de entrada:
dagster dev
Navegação para ... http://localhost:3000- Vais ver o ... Gráfico de activos Apresentação dos cinco activos agrupados nos seus grupos lógicos (macro_indicators- Não . fx_rates- Não .
calendarClique Materializem tudo Para executar o primeiro passo de ingestão em segundos, o painel de ativos mostrará contagens de linhas, últimas datas e valores mais recentes extraídos diretamente da API FXMacroData.
Verificar os dados no SQLite
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
Para activar o calendário, navegar para Visão geral → Calendários na interface do Dagster e alternar
macro_daily_at_0630_utc Para A correr.O processo daemon do Dagster agora acordará todos os dias da semana às 06:30 UTC e materializará os ativos do indicador macro automaticamente.
- PASSO 8: Ensaios
Passo 8 Escrever testes unitários com um recurso simulado
Um dos principais benefícios do design do Dagster é que os ativos são funções Python comuns fáceis de testar sem executar a pilha de orquestração completa. Troque o recurso ao vivo por um simulador para manter os testes rápidos e offline.
# tests/test_assets.py
import sqlite3
import tempfile
import os
from unittest.mock import MagicMock
import pytest
from dagster import build_asset_context, materialize
from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource
MOCK_RATE_DATA = [
{"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
{"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
{"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
{"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]
class MockFXMacroData(FXMacroDataResource):
def get_announcements(self, currency, indicator, start="2020-01-01"):
if indicator == "policy_rate":
return MOCK_RATE_DATA
if indicator == "inflation":
return MOCK_CPI_DATA
return []
def get_forex(self, base, quote, start="2023-01-01"):
return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]
def get_calendar(self, currency):
return []
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))
def test_usd_policy_rate_materialises():
result = materialize(
[usd_policy_rate],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_policy_rate")[0]
assert mat.metadata["row_count"].value == 2
assert mat.metadata["latest_val"].value == pytest.approx(4.5)
def test_usd_cpi_materialises():
result = materialize(
[usd_cpi],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_cpi")[0]
assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
PASSO 9: Produção
Passo 9 Implementação na produção com o Dagster+
Para equipes que querem um avião de controle hospedado sem gerenciar seu próprio demônio, Dagster+
(a oferta de nuvem gerenciada) requer apenas uma única etapa de configuração adicional: substituir dagster dev
com o agente Dagster Cloud, que lê o mesmo. Definitions Objeto e executa executos em sua infraestrutura.
Configure sua chave de API como uma variável de ambiente Dagster + (Implementação → Variaveis ambientaisCom o nome FXMACRO_API_KEYO recurso irá buscá-lo automaticamente sem alterações de código necessárias. O mesmo padrão se aplica a qualquer ambiente CI/CD que injecta segredos como variáveis de ambiente (GitHub Actions, GitLab CI, CircleCI).
Para implantar em um cluster Kubernetes auto-hospedado, empacotar o pipeline como uma imagem Docker, empurrá-lo para o seu registro, e apontar o diagrama Dagster Helm para ele.
fxmacrodata.com certifique-se de que a política de saída do seu cluster permite HTTPS para esse host.
- Resumo - Resumindo
O que construíste
Seguindo este guia , agora tem um pipeline Dagster funcional que:
- Define um reutilizável, injetável
FXMacroDataResourceque lida com todas as autenticações e erros da API em um só lugar - Expose cinco ativos definidos por software taxa da Fed, IPC dos EUA, taxa do BCE, EUR/USD spot e calendário de lançamento do USD cada um persistindo no SQLite com metadados completos do Dagster
- Agende atualizações de dias úteis às 06:30 UTC através de um cron-driven Dagster agendamento para que você sempre tenha leituras atualizadas antes de mercados europeus abrir
- Aterrar um alerta webhook quando um IPC imprimir surpresas para cima, dando aos sistemas a jusante um gatilho impulsionado por eventos em vez de um ciclo de votação
- Inclui testes rápidos de unidade offline usando um recurso simulado, para que o pipeline CI nunca faça chamadas ao vivo da API
Próximos passos
- Aumentar o número de moedas consultar o catálogo completo dos indicadores em /api-data-docs e adicionar novos activos seguindo o mesmo padrão
- Substitua o armazém SQLite por um gerenciador de E/S de Postgres ou BigQuery para escala de produção
- Adicionar um ativo a jusante que lê a partir do DB, calcula uma pontuação de divergência macro, e escreve sinais de negociação para uma tabela de sinais mantendo a sua lógica estratégica separada da camada de ingestão
- Explorar . Dados de posicionamento COT como uma sobreposição de sentimento para complementar a série de indicadores