Dagster ist eine Python-native Datenorchestrierungsplattform, die auf dem Konzept Softwaredefinierte Vermögenswerte: Datenartefakte, die ihre eigene Abstammung kennen, können auf Anfrage oder nach einem Zeitplan materialisiert werden und reiche Metadaten und Beobachtbarkeit aus der Box freisetzen. Für Makrodaten-Workflows Indikatorreihen ziehen, Release-Ereignisse erkennen, Momentaufnahmen speichern und Anomalien warnen Dagster ist eine hervorragende Passform. Dieser Leitfaden führt durch den Aufbau einer Pipeline, die FXMacroData-Indikatoren in einen lokalen SQLite-Speicher eintritt, bevorstehende Release Kalenderereignisse aufzeigt und automatisch in einem täglichen Zeitplan läuft.
Was du bauen wirst
- Eine FXMacroData Dagster-Ressource ein wiederverwendbarer, konfigurierbarer API-Client, der für alle Assets gemeinsam genutzt wird
- Vier Software-definierte Vermögenswerte Kursentwicklung, KPI, Forex-Spot und Veröffentlichungskalender für das gewählte Währungspaar
- Tägliche Arbeit und Zeitplan läuft jeden Wochentag vor der Londoner Opening, um die neuesten Messwerte zu erhalten
- Ein Anomaliensensor Aufmerksamkeit auf unerwartete Inflation und Alarm durch Webhook
Voraussetzungen
- Python 3.10+ alle Snippets verwenden moderne Schrift-Hinweise und
matchAusführungen - FXMacroData-API-Schlüssel melden Sie sich an /abonnieren; USD-Indikatordaten sind öffentlich zugänglich ohne Schlüssel
- Grundlegende Kenntnisse der Dagster Sie sollten wissen, was Vermögenswerte und Arbeitsplätze sind; Dagster-Schnellstart Das ist in zehn Minuten alles.
- Schritt 1 -
Schritt 1 Installation von Dagster und Projektabhängigkeiten
Erstellen Sie eine neue virtuelle Umgebung und installieren Sie Dagster neben den kleinen Bibliotheken, die diese Pipeline verwendet. dagster Und ... dagster-webserver Die Einführung der gleichen Version vermeidet den häufigsten Installationsfall.
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
requests pandas sqlalchemy
Das Programm wird mit dem Kommando "Schaffold" gestaltet.
pyproject.toml Das erlaubt der lokalen Benutzeroberfläche, Ihre Definitionen automatisch zu entdecken.
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
Speichern Sie Ihren API-Schlüssel als Umgebungsvariable.
export FXMACRO_API_KEY="YOUR_API_KEY"
Bei Produktionshosts wird der Schlüssel über die geheime Verwaltung des Schedulers (GitHub Actions-Geheimnisse, Kubernetes-Geheime, Dagster Cloud-Umgebungsvariablen) eingespeist.
- Schritt 2 -
Schritt 2 Definition einer wiederverwendbaren FXMacroData-Ressource
Ein Dolch . Ressource ist eine geteilte, injizierbare Abhängigkeit analog zu einer Datenbankverbindung oder einer HTTP-Sitzung. Das Wickeln der FXMacroData REST API in eine Ressource bedeutet, dass jedes Asset sie anrufen kann, ohne Authentifizierungslogik oder Timeout-Handling zu duplizieren.
Erstellen . fxmacro_pipeline/resources.py- Ich weiß .
import os
from typing import Any
import requests
from dagster import ConfigurableResource, get_dagster_logger
_BASE_URL = "https://fxmacrodata.com/api/v1"
class FXMacroDataResource(ConfigurableResource):
"""Thin wrapper around the FXMacroData REST API."""
api_key: str = "" # overridden at run-time from env
timeout: int = 15
def _get(self, path: str, **params: Any) -> dict:
logger = get_dagster_logger()
key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
resp = requests.get(
f"{_BASE_URL}{path}",
params={"api_key": key, **params},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
logger.debug("GET %s → %d records", path, len(data.get("data", [])))
return data
def get_announcements(
self,
currency: str,
indicator: str,
start: str = "2020-01-01",
) -> list[dict]:
"""Return time-series records for a macro indicator."""
result = self._get(
f"/announcements/{currency.lower()}/{indicator}",
start=start,
)
return result.get("data", [])
def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
"""Return daily FX spot-rate records."""
result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
return result.get("data", [])
def get_calendar(self, currency: str) -> list[dict]:
"""Return the upcoming release calendar for a currency."""
result = self._get(f"/calendar/{currency.lower()}")
return result.get("data", [])
Warum ConfigurableResource?
ConfigurableResource Sie können die Anmeldeinformationen und Zeitlimits zwischen Entwickler und Prod wechseln, ohne den Assetcode zu berühren. Definitions Und Dagster injiziert es, wo immer es als Parameter deklariert wird.
- Schritt 3 -
Schritt 3 Definition von Makrodatenanlagen
Jedes Dagster-Asset stellt ein logisches Datenprodukt dar. Die Materialisierung eines Assets holt die neuesten Daten von FXMacroData ab und speichert sie lokal. Zinssatz der Fed- Das ist ... US-amerikanischer KPI- Das ist ... Leitzins der EZBEin fünfter Vermögenswert liest dann den Veröffentlichungskalender, damit die Pipeline weiß, wann die nächsten wichtigen Ankündigungen fällig sind.
Erstellen . fxmacro_pipeline/assets.py- Ich weiß .
import os
import sqlite3
from datetime import date, timedelta
import pandas as pd
from dagster import (
AssetExecutionContext,
MaterializeResult,
MetadataValue,
asset,
)
from .resources import FXMacroDataResource
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
date TEXT PRIMARY KEY,
val REAL,
announcement_datetime TEXT
)
"""
)
conn.commit()
def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
rows = [
(r["date"], r.get("val"), r.get("announcement_datetime"))
for r in records
if r.get("date") and r.get("val") is not None
]
conn.executemany(
f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Latest Fed funds rate series from the FXMacroData announcements endpoint."""
records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_policy_rate")
n = _upsert(conn, "usd_policy_rate", records)
latest = records[0] if records else {}
context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_cpi")
n = _upsert(conn, "usd_cpi", records)
latest = records[0] if records else {}
context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""ECB main refinancing rate series."""
records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eur_policy_rate")
n = _upsert(conn, "eur_policy_rate", records)
latest = records[0] if records else {}
context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Daily EUR/USD closing spot rate."""
records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eurusd_spot")
n = _upsert(conn, "eurusd_spot", records)
latest = records[0] if records else {}
context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Upcoming USD macro release dates and times."""
events = fxmacrodata.get_calendar("usd")
df = pd.DataFrame(events) if events else pd.DataFrame()
context.log.info("Found %d upcoming USD events", len(df))
# Surface a preview in the Dagster UI asset metadata pane
preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
return MaterializeResult(
metadata={
"event_count": MetadataValue.int(len(df)),
"preview": MetadataValue.md(preview),
}
)
Metadaten von Vermögenswerten in der Dagster-Benutzeroberfläche
Jedes Mal . MaterializeResult gibt strukturierte Metadaten zurück, die Dagster im Asset-Detailbereich Zeilenzahlen, letzte Daten, Vorschautabellen wiedergibt. Dies bedeutet, dass Sie auf einen Blick überprüfen können, ob eine Materialisierung tatsächlich frische Daten abruft, ohne in Protokolle zu graben.
- Schritt 4 -
Schritt 4 Schaffen Sie sich einen Arbeitsplatz und einen Tagesablauf
Ein Dolch . Arbeit Die Kommission hat die Kommission aufgefordert, die in diesem Zusammenhang zu ergreifenden Maßnahmen zu prüfen. Zeitplan Wir wollen jeden Wochentag früh um 06:30 Uhr UTC neue Makro-Werte vor der Frankfurter Öffnung, damit jeder Indikator, der über Nacht veröffentlicht wird, vor Beginn des europäischen Handels erfasst wird.
Erstellen . fxmacro_pipeline/jobs.py- Ich weiß .
from dagster import (
AssetSelection,
ScheduleDefinition,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
name="macro_refresh",
description="Refresh all FXMacroData indicator and FX rate assets.",
selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)
calendar_refresh_job = define_asset_job(
name="calendar_refresh",
description="Pull the latest USD release calendar.",
selection=AssetSelection.groups("calendar"),
)
# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
name="macro_daily_at_0630_utc",
job=macro_refresh_job,
cron_schedule="30 6 * * 1-5",
execution_timezone="UTC",
)
# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
name="calendar_weekly_on_sunday",
job=calendar_refresh_job,
cron_schedule="0 18 * * 0",
execution_timezone="UTC",
)
Wenn die Abgabe des Kalenders nicht funktioniert (z. B. an einem Feiertag, wenn der Endpunkt eine leere Nutzlast zurückgibt), ist die Aktualisierung des Indikators nicht betroffen.
- Schritt 5 -
Schritt 5 Alles in ein Definitionsobjekt einführen
Das ist Dagster. Definitions Objekt ist der einzige Eingangspunkt, der Assets, Ressourcen, Jobs und Zeitpläne miteinander verbindet. fxmacro_pipeline/__init__.py- Ich weiß .
import os
from dagster import Definitions
from .assets import (
eur_policy_rate,
eurusd_spot,
usd_cpi,
usd_policy_rate,
usd_release_calendar,
)
from .jobs import (
calendar_refresh_job,
calendar_weekly_schedule,
macro_daily_schedule,
macro_refresh_job,
)
from .resources import FXMacroDataResource
defs = Definitions(
assets=[
usd_policy_rate,
usd_cpi,
eur_policy_rate,
eurusd_spot,
usd_release_calendar,
],
resources={
"fxmacrodata": FXMacroDataResource(
# Falls back to the FXMACRO_API_KEY env var inside the resource
api_key=os.environ.get("FXMACRO_API_KEY", ""),
),
},
jobs=[macro_refresh_job, calendar_refresh_job],
schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
Ressourcenzufuhr zur Laufzeit
Jeder Vermögenswert , der sich meldet . fxmacrodata: FXMacroDataResource Als Parameter erhält automatisch die konfigurierte Instanz bei Laufzeit. Definitions der Vermögenswertcode ändert sich nicht.
- Schritt 6 -
Schritt 6 Hinzufügen eines Sensors, um überraschende Inflationsausdrücke zu alarmieren
Die Sensoren in Dagster ermitteln die äußeren Bedingungen und lösen oder alarmieren, wenn sich etwas ändert. usd_cpi Verwirklichung: Wenn der letzte Preisindex mehr als 0,4 Prozentpunkte über dem vorherigen liegt, wird eine Webhook-Warnung ausgelöst, damit Ihre Strategie oder Ihr Risikosystem sofort reagieren kann.
Fügen Sie hinzu . fxmacro_pipeline/sensors.py- Ich weiß .
import os
import sqlite3
import requests as http
from dagster import (
RunStatusSensorContext,
asset_sensor,
AssetKey,
EventLogEntry,
SensorResult,
SkipReason,
)
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4 # percentage points
@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
context: RunStatusSensorContext,
asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
"""Alert when the latest US CPI print is significantly above the prior reading."""
try:
conn = sqlite3.connect(_DB_PATH)
rows = conn.execute(
"SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
).fetchall()
conn.close()
except Exception as exc:
return SkipReason(f"DB read failed: {exc}")
if len(rows) < 2:
return SkipReason("Not enough CPI history yet.")
latest_date, latest_val = rows[0]
prior_date, prior_val = rows[1]
surprise = (latest_val or 0) - (prior_val or 0)
context.log.info(
"CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
latest_date, latest_val, prior_date, prior_val, surprise,
)
if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
payload = {
"text": (
f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
f"vs prior {prior_date}: {prior_val:.1f}% "
f"(surprise: *{surprise:+.2f} pp*)"
)
}
http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
return SensorResult(cursor=str(latest_date))
Registrieren Sie den Sensor ein . __init__.py Sie werden dann in die Definitions
sensors Liste:
from .sensors import cpi_surprise_alert_sensor
defs = Definitions(
# ... existing fields ...
sensors=[cpi_surprise_alert_sensor],
)
- Schritt 7 -
Schritt 7 Starten Sie die Dagster-Benutzeroberfläche und starten Sie Ihre erste Verwirklichung
Starten Sie den lokalen Entwicklerserver. Definitions Durch die ...
pyproject.toml Eintrittspunkt:
dagster dev
Navigieren Sie zu http://localhost:3000Du wirst sehen , wie ich ... Vermögenswertdiagramm Die Daten sind in einem Bild der fünf Vermögenswerte in ihre logischen Gruppen zusammengefasst (macro_indicators- Ich weiß . fx_rates- Ich weiß .
calendarKlicken Sie auf Alles materialisieren um den ersten Einnahmepass innerhalb von Sekunden auszuführen, wird der Asset-Fenster Zeilenzahlen, neueste Daten und neueste Werte direkt aus der FXMacroData API anzeigen.
Überprüfen der Daten in SQLite
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
Um den Zeitplan zu aktivieren, navigieren Sie zu Übersicht → Zeitpläne in der Dagster Benutzeroberfläche und schalten
macro_daily_at_0630_utc - Ich bin hier . Ich laufe .Dagsters Daemon-Prozess wird nun jeden Wochentag um 06:30 UTC aufwachen und die Makro-Indikatoren-Assets automatisch verwirklichen.
- Schritt 8: Prüfung
Schritt 8 Schreiben von Einheitstests mit einer Scheinressource
Einer der wichtigsten Vorteile von Dagster ist, dass Assets normale Python-Funktionen sind einfach zu testen, ohne den gesamten Orchestrierungstapel auszuführen.
# tests/test_assets.py
import sqlite3
import tempfile
import os
from unittest.mock import MagicMock
import pytest
from dagster import build_asset_context, materialize
from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource
MOCK_RATE_DATA = [
{"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
{"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
{"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
{"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]
class MockFXMacroData(FXMacroDataResource):
def get_announcements(self, currency, indicator, start="2020-01-01"):
if indicator == "policy_rate":
return MOCK_RATE_DATA
if indicator == "inflation":
return MOCK_CPI_DATA
return []
def get_forex(self, base, quote, start="2023-01-01"):
return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]
def get_calendar(self, currency):
return []
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))
def test_usd_policy_rate_materialises():
result = materialize(
[usd_policy_rate],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_policy_rate")[0]
assert mat.metadata["row_count"].value == 2
assert mat.metadata["latest_val"].value == pytest.approx(4.5)
def test_usd_cpi_materialises():
result = materialize(
[usd_cpi],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_cpi")[0]
assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
- Schritt 9: Produktion
Schritt 9 Einsatz in der Produktion mit Dagster+
Für Teams, die ein gehostetes Kontrollflugzeug ohne Verwaltung ihres eigenen Daemons wollen, Dagster+
(das Managed Cloud-Angebot) erfordert nur einen einzigen zusätzlichen Konfigurationsschritt: dagster dev
mit dem Dagster Cloud Agent, der das gleiche liest. Definitions Objekt und führt Laufvorgänge in Ihrer Infrastruktur aus.
Setzen Sie Ihren API-Schlüssel als Umgebungsvariable von Dagster+ (Einsatz → Umgebungsvariablen) mit dem Namen FXMACRO_API_KEY. Die Ressource wird sie automatisch abholen keine Codeänderungen erforderlich. Das gleiche Muster gilt für jede CI/CD-Umgebung, die Geheimnisse als Umgebungsvariablen injiziert (GitHub Actions, GitLab CI, CircleCI).
Um auf einem selbst gehosteten Kubernetes-Cluster zu implementieren, packen Sie die Pipeline als Docker-Image ein, schieben Sie sie in Ihre Registrierung und zeigen Sie das Dagster Helm-Diagramm darauf.
fxmacrodata.com stellen Sie sicher, dass die Ausgangsrichtlinie Ihres Clusters HTTPS zu diesem Host zulässt.
Zusammenfassung
Was du gebaut hast
Durch die Einhaltung dieser Anleitung haben Sie jetzt eine funktionierende Dagster-Pipeline, die:
- Definition eines wiederverwendbaren, injizierbaren
FXMacroDataResourceDas verarbeitet alle API-Authentifizierung und Fehlerbearbeitung an einem Ort - Die Daten werden in einem System mit einem Datenverzeichnis von SQLite und einem Datenbankverzeichnissen erfasst.
- Planen der Wochentag aktualisiert um 06:30 UTC über einen cron-gesteuerten Dagster Zeitplan, so dass Sie immer up-to-date Messwerte vor den europäischen Märkten öffnen
- Wird mit einem Webhook-Alarm ausgelöst, wenn ein CPI-Druck überrascht, und gibt den nachgelagerten Systemen einen ereignisgesteuerten Auslöser anstelle einer Umfrageschleife
- Enthält schnelle Offline-Einheitstests mit einer Scheinressource, so dass die CI-Pipeline nie Live-API-Aufrufe macht
Nächste Schritte
- Erweitern Sie die Pipeline um weitere Währungen Siehe vollständigen Indikatorkatalog unter /api-daten-docs und neue Vermögenswerte nach dem gleichen Muster hinzufügen
- Ersetzen Sie den SQLite-Speicher durch einen Postgres- oder BigQuery-E/A-Manager für die Produktionsskala
- Fügen Sie einen nachgelagerten Vermögenswert hinzu, der aus der DB liest, einen Makro-Divergenz-Score berechnet und Handelssignale in eine Signaltabelle schreibt Ihre Strategielogik von der Einnahme-Schicht getrennt hält
- Erforschen . COT-Positionsdaten als Stimmungsüberlagerung zur Ergänzung der Indikatorenreihe