Dagster est une plateforme de gestion de données Python basée sur le concept de actifs définis par logiciel: les objets de données qui connaissent leur propre lignée, peuvent être matérialisés à la demande ou sur un calendrier, et exposent des métadonnées riches et une observabilité hors de la boîte. Pour les flux de travail de données macro tirer des séries d'indicateurs, détecter des événements de sortie, stocker des instantanés et alerter sur les anomalies Dagster est un excellent ajustement. Ce guide passe par la construction d'un pipeline qui ingère les indicateurs FXMacroData dans un magasin SQLite local, fait surface aux événement de calendrier de sorties à venir et s'exécute automatiquement sur un horaire quotidien.
Ce que vous allez construire
- Une ressource Dagster FXMacroData un client API réutilisable et configurable partagé sur tous les actifs
- Quatre actifs définis par logiciel taux directeurs, IPC, forex spot et calendrier de mise en circulation pour la paire de devises choisie
- Un emploi et un emploi du temps quotidiens s'effectue tous les matins avant l'ouverture du London Open pour vous mettre à jour sur les dernières données
- Un capteur d'anomalie surveille les signaux d'inflation inattendus et lance une alerte via un webhook
Pré-requis
- Python 3.10+ tous les extraits utilisent des indices de type modernes et
matchdéclarations - Clé de l'API FXMacroData inscrivez-vous à / souscrivez; les données de l' indicateur USD sont accessibles au public sans clé
- Familiarité de base avec le Dagster vous devez savoir ce que sont les actifs et les emplois; Le Dagster est en marche rapide. couvre les bases en dix minutes
- Passe 1 - Passez à la première étape
Étape 1 Installer Dagster et les dépendances du projet
Créez un nouvel environnement virtuel et installez Dagster à côté du petit ensemble de bibliothèques utilisées par ce pipeline. dagster Je suis désolé . dagster-webserver La mise en place d'une version unique de la même machine permet d'éviter le piège le plus courant.
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
requests pandas sqlalchemy
La commande "Echafaudage" de Dagster crée la structure du répertoire et un
pyproject.toml qui permet à l'interface locale de découvrir vos définitions automatiquement.
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
Ne jamais mettre les informations d'identification dans les fichiers source.
export FXMACRO_API_KEY="YOUR_API_KEY"
Sur les hôtes de production, injectez la clé via la gestion secrète de votre planificateur (secrets d'actions GitHub, secrets Kubernetes, variables d'environnement Dagster Cloud).
- Passe 2 - Je suis désolé
Étape 2 Définir une ressource FXMacroData réutilisable
Une Dagster . ressources est une dépendance partagée et injectable analogue à une connexion de base de données ou à une session HTTP. Envelopper l'API REST FXMacroData dans une ressource signifie que chaque actif peut l'appeler sans dupliquer la logique d'authentification ou la gestion du temps d'arrêt.
Créer . fxmacro_pipeline/resources.pyJe suis désolé .
import os
from typing import Any
import requests
from dagster import ConfigurableResource, get_dagster_logger
_BASE_URL = "https://fxmacrodata.com/api/v1"
class FXMacroDataResource(ConfigurableResource):
"""Thin wrapper around the FXMacroData REST API."""
api_key: str = "" # overridden at run-time from env
timeout: int = 15
def _get(self, path: str, **params: Any) -> dict:
logger = get_dagster_logger()
key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
resp = requests.get(
f"{_BASE_URL}{path}",
params={"api_key": key, **params},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
logger.debug("GET %s → %d records", path, len(data.get("data", [])))
return data
def get_announcements(
self,
currency: str,
indicator: str,
start: str = "2020-01-01",
) -> list[dict]:
"""Return time-series records for a macro indicator."""
result = self._get(
f"/announcements/{currency.lower()}/{indicator}",
start=start,
)
return result.get("data", [])
def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
"""Return daily FX spot-rate records."""
result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
return result.get("data", [])
def get_calendar(self, currency: str) -> list[dict]:
"""Return the upcoming release calendar for a currency."""
result = self._get(f"/calendar/{currency.lower()}")
return result.get("data", [])
Pourquoi configurableResource ?
ConfigurableResource Vous pouvez échanger les identifiants et les temps d'arrêt entre le développeur et le prod sans toucher le code de l'actif. Definitions et Dagster l'injecte partout où il est déclaré comme paramètre.
- Pas 3 - Je suis désolé
Étape 3 Définition des actifs de données macro
Chaque actif Dagster représente un produit logique de données. La matérialisation d'un actif récupère les dernières données de FXMacroData et les conserve localement. Taux de la Fed- Je suis désolé . Indice de consommation des États-Unis- Je suis désolé . Taux de référence de la BCEUn cinquième actif lit ensuite le calendrier de sortie afin que le pipeline sache quand les prochaines annonces à fort impact sont attendues.
Créer . fxmacro_pipeline/assets.pyJe suis désolé .
import os
import sqlite3
from datetime import date, timedelta
import pandas as pd
from dagster import (
AssetExecutionContext,
MaterializeResult,
MetadataValue,
asset,
)
from .resources import FXMacroDataResource
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
date TEXT PRIMARY KEY,
val REAL,
announcement_datetime TEXT
)
"""
)
conn.commit()
def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
rows = [
(r["date"], r.get("val"), r.get("announcement_datetime"))
for r in records
if r.get("date") and r.get("val") is not None
]
conn.executemany(
f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Latest Fed funds rate series from the FXMacroData announcements endpoint."""
records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_policy_rate")
n = _upsert(conn, "usd_policy_rate", records)
latest = records[0] if records else {}
context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_cpi")
n = _upsert(conn, "usd_cpi", records)
latest = records[0] if records else {}
context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""ECB main refinancing rate series."""
records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eur_policy_rate")
n = _upsert(conn, "eur_policy_rate", records)
latest = records[0] if records else {}
context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Daily EUR/USD closing spot rate."""
records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eurusd_spot")
n = _upsert(conn, "eurusd_spot", records)
latest = records[0] if records else {}
context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Upcoming USD macro release dates and times."""
events = fxmacrodata.get_calendar("usd")
df = pd.DataFrame(events) if events else pd.DataFrame()
context.log.info("Found %d upcoming USD events", len(df))
# Surface a preview in the Dagster UI asset metadata pane
preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
return MaterializeResult(
metadata={
"event_count": MetadataValue.int(len(df)),
"preview": MetadataValue.md(preview),
}
)
Métadonnées d'actifs dans l'interface utilisateur Dagster
Toutes les fois . MaterializeResult renvoie des métadonnées structurées que Dagster affiche dans le volet détails des actifs nombre de lignes, dates les plus récentes, tableaux d'aperçu. Cela signifie que vous pouvez vérifier en un coup d'œil si une matérialisation a réellement récupéré des données fraîches sans creuser dans les journaux.
- Passe 4 - Passez à la première étape
Étape 4 Créer un emploi et un emplois du jour
Une Dagster . emploi Il choisit les actifs à réaliser en une seule course. le calendrier Nous voulons des lectures macro fraîches tous les jours de semaine à 06:30 UTC avant l'ouverture de Francfort afin que tout indicateur publié pendant la nuit soit capturé avant le début des transactions européennes.
Créer . fxmacro_pipeline/jobs.pyJe suis désolé .
from dagster import (
AssetSelection,
ScheduleDefinition,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
name="macro_refresh",
description="Refresh all FXMacroData indicator and FX rate assets.",
selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)
calendar_refresh_job = define_asset_job(
name="calendar_refresh",
description="Pull the latest USD release calendar.",
selection=AssetSelection.groups("calendar"),
)
# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
name="macro_daily_at_0630_utc",
job=macro_refresh_job,
cron_schedule="30 6 * * 1-5",
execution_timezone="UTC",
)
# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
name="calendar_weekly_on_sunday",
job=calendar_refresh_job,
cron_schedule="0 18 * * 0",
execution_timezone="UTC",
)
Deux tâches distinctes une pour la série d'indicateurs, une pour le calendrier de sortie vous permettent de les observer et de les exécuter de nouveau indépendamment.
- Passe 5 - Je suis désolé
Étape 5 Connectez tout dans un objet Définitions
Il est à Dagster. Definitions L'objet est le point d'entrée unique qui lie les actifs, les ressources, les emplois et les horaires. fxmacro_pipeline/__init__.pyJe suis désolé .
import os
from dagster import Definitions
from .assets import (
eur_policy_rate,
eurusd_spot,
usd_cpi,
usd_policy_rate,
usd_release_calendar,
)
from .jobs import (
calendar_refresh_job,
calendar_weekly_schedule,
macro_daily_schedule,
macro_refresh_job,
)
from .resources import FXMacroDataResource
defs = Definitions(
assets=[
usd_policy_rate,
usd_cpi,
eur_policy_rate,
eurusd_spot,
usd_release_calendar,
],
resources={
"fxmacrodata": FXMacroDataResource(
# Falls back to the FXMACRO_API_KEY env var inside the resource
api_key=os.environ.get("FXMACRO_API_KEY", ""),
),
},
jobs=[macro_refresh_job, calendar_refresh_job],
schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
Injection de ressources en temps d'exécution
Tout actif qui déclare fxmacrodata: FXMacroDataResource Pour échanger une simulation pour le test, écraser la ressource dans votre test Definitions le code des actifs ne change pas.
- Pas 6 - Je suis désolé
Étape 6 Ajouter un capteur pour alerter sur les empreintes d'inflation surprenantes
Les capteurs de Dagster surveillent les conditions extérieures et déclenchent des signaux ou des canaux d'alerte lorsque quelque chose change. usd_cpi La mise en œuvre: si la dernière lecture de l'IPC est supérieure de plus de 0,4 point de pourcentage à la lecture précédente, elle déclenche une alerte webhook afin que votre stratégie ou votre système de risque puisse réagir immédiatement.
Ajoutez . fxmacro_pipeline/sensors.pyJe suis désolé .
import os
import sqlite3
import requests as http
from dagster import (
RunStatusSensorContext,
asset_sensor,
AssetKey,
EventLogEntry,
SensorResult,
SkipReason,
)
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4 # percentage points
@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
context: RunStatusSensorContext,
asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
"""Alert when the latest US CPI print is significantly above the prior reading."""
try:
conn = sqlite3.connect(_DB_PATH)
rows = conn.execute(
"SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
).fetchall()
conn.close()
except Exception as exc:
return SkipReason(f"DB read failed: {exc}")
if len(rows) < 2:
return SkipReason("Not enough CPI history yet.")
latest_date, latest_val = rows[0]
prior_date, prior_val = rows[1]
surprise = (latest_val or 0) - (prior_val or 0)
context.log.info(
"CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
latest_date, latest_val, prior_date, prior_val, surprise,
)
if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
payload = {
"text": (
f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
f"vs prior {prior_date}: {prior_val:.1f}% "
f"(surprise: *{surprise:+.2f} pp*)"
)
}
http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
return SensorResult(cursor=str(latest_date))
Enregistrez le capteur . __init__.py en l'important et en l ' ajoutant à la Definitions
sensors Liste:
from .sensors import cpi_surprise_alert_sensor
defs = Definitions(
# ... existing fields ...
sensors=[cpi_surprise_alert_sensor],
)
- Pas 7 - Je suis désolé
Étape 7 Démarrez l'interface utilisateur Dagster et exécutez votre première matérialisation
Démarrez le serveur de développement local. Definitions à travers le
pyproject.toml point d'entrée:
dagster dev
Navigation à http://localhost:3000Vous verrez la Graphique des actifs Les cinq actifs sont regroupés dans leurs groupes logiques (macro_indicatorsJe suis désolé . fx_ratesJe suis désolé .
calendarCliquez sur Matérialiser tout Pour exécuter le premier passage d'ingestion en quelques secondes, le volet des actifs affichera le nombre de lignes, les dernières dates et les dernières valeurs tirées directement de l'API FXMacroData.
Vérifier les données dans SQLite
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
Pour activer le calendrier, accédez à Résumé → Calendrier dans l'interface utilisateur Dagster et bascule
macro_daily_at_0630_utc Je suis là . Je cours .Le processus démon de Dagster se réveillera maintenant tous les jours de la semaine à 06h30 UTC et matérialisera les actifs d'indicateur macro automatiquement.
- ÉTAPE 8: Tests - - -
Étape 8 Rédiger des tests unitaires avec une ressource simulée
L'un des principaux avantages de la conception de Dagster est que les actifs sont des fonctions Python ordinaires faciles à tester en unité sans exécuter la pile d'orchestration complète.
# tests/test_assets.py
import sqlite3
import tempfile
import os
from unittest.mock import MagicMock
import pytest
from dagster import build_asset_context, materialize
from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource
MOCK_RATE_DATA = [
{"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
{"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
{"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
{"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]
class MockFXMacroData(FXMacroDataResource):
def get_announcements(self, currency, indicator, start="2020-01-01"):
if indicator == "policy_rate":
return MOCK_RATE_DATA
if indicator == "inflation":
return MOCK_CPI_DATA
return []
def get_forex(self, base, quote, start="2023-01-01"):
return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]
def get_calendar(self, currency):
return []
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))
def test_usd_policy_rate_materialises():
result = materialize(
[usd_policy_rate],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_policy_rate")[0]
assert mat.metadata["row_count"].value == 2
assert mat.metadata["latest_val"].value == pytest.approx(4.5)
def test_usd_cpi_materialises():
result = materialize(
[usd_cpi],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_cpi")[0]
assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
- Étape 9: la production
Étape 9 Déploiement en production avec Dagster+
Pour les équipes qui veulent un plan de contrôle hébergé sans gérer leur propre démon, Dagster+
(l'offre de cloud géré) ne nécessite qu'une seule étape de configuration supplémentaire: remplacer dagster dev
avec l'agent Dagster Cloud, qui est le même. Definitions Objet et exécute des exécutions dans votre infrastructure.
Définissez votre clé API comme une variable d'environnement Dagster+ (Déploiement → Variables environnementalesAvec le nom FXMACRO_API_KEY. La ressource le récupérera automatiquement sans changement de code requis. Le même schéma s'applique à tout environnement CI/CD qui injecte des secrets en tant que variables d'environnement (GitHub Actions, GitLab CI, CircleCI).
Pour déployer sur un cluster Kubernetes auto-hébergé, emballer le pipeline comme une image Docker, le pousser à votre registre, et de pointer le diagramme Dagster Helm à elle.
fxmacrodata.com assurez-vous que la politique de sortie de votre cluster autorise HTTPS vers cet hôte.
- Résumé -
Ce que vous avez construit
En suivant ce guide , vous avez maintenant un pipeline Dagster qui fonctionne:
- Définit un produit réutilisable, injectable
FXMacroDataResourcequi gère toutes les API authentification et le traitement des erreurs en un seul endroit - Expose cinq actifs définis par logiciel taux de la Fed, IPC américain, taux de BCE, EUR/USD spot et calendrier de sortie USD chacun persisté à SQLite avec les métadonnées Dagster complètes
- Planifie les mises à jour des jours de semaine à 06:30 UTC via un calendrier Dagster basé sur le cron afin que vous ayez toujours des lectures à jour avant l'ouverture des marchés européens
- Déclenche une alerte webhook lorsqu'un indice de consommation imprime des surprises à la hausse, donnant aux systèmes en aval un déclencheur basé sur des événements plutôt qu'une boucle de sondage
- Inclut des tests unitaires hors ligne rapides à l'aide d'une ressource simulée, de sorte que le pipeline CI ne fait jamais d'appels API en direct
Les prochaines étapes
- Étendre le pipeline avec des devises supplémentaires consultez le catalogue complet des indicateurs à /api-data-docs Il est temps de le faire. et ajouter de nouveaux actifs suivant le même schéma
- Remplacer le stockage SQLite par un gestionnaire d'E/S Postgres ou BigQuery pour une échelle de production
- Ajouter un actif en aval qui lit de la base de données, calcule un score de divergence macro, et écrit des signaux de trading à une table de signaux garder votre logique de stratégie séparée de la couche d'ingestion
- Je vais explorer . Données de positionnement COT comme superposition du sentiment pour compléter la série d'indicateurs