Dagster adalah platform orkestrasi data asli Python yang dibangun di sekitar konsep aset software-defined: artefak data yang tahu garis keturunan mereka sendiri, dapat dimaterialisasi sesuai permintaan atau jadwal, dan mengekspos metadata kaya dan pengamatan keluar dari kotak. Untuk alur kerja data makro menarik seri indikator, mendeteksi peristiwa rilis, menyimpan snapshot, dan memperingatkan tentang anomali Dagster sangat cocok. Panduan ini berjalan melalui membangun pipa yang menelan indikator FXMacroData ke dalam toko SQLite lokal, permukaan acara kalender rilis mendatang, dan berjalan secara otomatis pada jadwal harian.
Apa yang akan Anda bangun
- Sumber FXMacroData Dagster klien API yang dapat digunakan kembali dan dapat dikonfigurasi yang dibagikan di semua aset
- Empat aset software-defined suku bunga kebijakan, CPI, forex spot, dan kalender rilis untuk pasangan mata uang yang dipilih
- Pekerjaan dan jadwal harian berjalan setiap hari kerja pagi sebelum London terbuka untuk menyegarkan pembacaan terbaru
- Sensor anomali menatap inflasi tak terduga dan menembakkan peringatan melalui webhook
Persyaratan
- Python 3.10+ semua cuplikan menggunakan tip tip modern dan
matchpernyataan - Kunci API FXMacroData daftar di /langganan; Data indikator USD dapat diakses publik tanpa kunci
- Keterampilan dasar Dagster Anda harus tahu apa aset dan pekerjaan; Dagster cepat memulai mencakup dasar-dasar dalam sepuluh menit
- Langkah 1 -
Langkah 1 Menginstal Dagster dan ketergantungan proyek
Buatlah lingkungan virtual baru dan install Dagster di samping kumpulan kecil dari perpustakaan pipa ini menggunakan. dagster Dan dagster-webserver Untuk versi yang sama menghindari perangkap instalasi yang paling umum.
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
requests pandas sqlalchemy
Inisialisasi tata letak proyek minimal. perintah perancah Dagster menciptakan struktur direktori dan
pyproject.toml yang memungkinkan UI lokal menemukan definisi Anda secara otomatis.
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
Simpan kunci API Anda sebagai variabel lingkungan. Jangan pernah hard-code kredensial dalam file sumber.
export FXMACRO_API_KEY="YOUR_API_KEY"
Pada host produksi, suntikkan kunci melalui manajemen rahasia penjadwal Anda (rahasia tindakan GitHub, rahasia Kubernetes, variabel lingkungan Dagster Cloud).
- Langkah 2 -
Langkah 2 Mendefinisikan sumber FXMacroData yang dapat digunakan kembali
Dagster. sumber daya adalah ketergantungan yang dapat diinjeksikan yang dapat dibagi mirip dengan koneksi database atau sesi HTTP. Membungkus FXMacroData REST API dalam sumber daya berarti setiap aset dapat memanggilnya tanpa menduplikasi logika otentikasi atau penanganan timeout.
Ciptakan fxmacro_pipeline/resources.pyAku tidak tahu.
import os
from typing import Any
import requests
from dagster import ConfigurableResource, get_dagster_logger
_BASE_URL = "https://fxmacrodata.com/api/v1"
class FXMacroDataResource(ConfigurableResource):
"""Thin wrapper around the FXMacroData REST API."""
api_key: str = "" # overridden at run-time from env
timeout: int = 15
def _get(self, path: str, **params: Any) -> dict:
logger = get_dagster_logger()
key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
resp = requests.get(
f"{_BASE_URL}{path}",
params={"api_key": key, **params},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
logger.debug("GET %s → %d records", path, len(data.get("data", [])))
return data
def get_announcements(
self,
currency: str,
indicator: str,
start: str = "2020-01-01",
) -> list[dict]:
"""Return time-series records for a macro indicator."""
result = self._get(
f"/announcements/{currency.lower()}/{indicator}",
start=start,
)
return result.get("data", [])
def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
"""Return daily FX spot-rate records."""
result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
return result.get("data", [])
def get_calendar(self, currency: str) -> list[dict]:
"""Return the upcoming release calendar for a currency."""
result = self._get(f"/calendar/{currency.lower()}")
return result.get("data", [])
Mengapa ConfigurableResource?
ConfigurableResource memungkinkan Anda untuk menukar kredensial dan waktu antara dev dan prod tanpa menyentuh kode aset. Anda mengkonfigurasi sumber daya sekali dalam Definitions dan Dagster menyuntikkannya di mana pun yang dinyatakan sebagai parameter.
- Langkah 3 -
Langkah 3 Mendefinisikan aset data makro
Setiap aset Dagster mewakili satu produk data logis. Mematerialisasi aset mengambil data terbaru dari FXMacroData dan bertahan secara lokal. Tingkat kebijakan Fed, yang US CPI, yang Nilai tukar mata uang, dan seri spot EUR/USD. Aset kelima kemudian membaca kalender rilis sehingga pipa tahu kapan pengumuman berdampak tinggi berikutnya.
Ciptakan fxmacro_pipeline/assets.pyAku tidak tahu.
import os
import sqlite3
from datetime import date, timedelta
import pandas as pd
from dagster import (
AssetExecutionContext,
MaterializeResult,
MetadataValue,
asset,
)
from .resources import FXMacroDataResource
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
date TEXT PRIMARY KEY,
val REAL,
announcement_datetime TEXT
)
"""
)
conn.commit()
def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
rows = [
(r["date"], r.get("val"), r.get("announcement_datetime"))
for r in records
if r.get("date") and r.get("val") is not None
]
conn.executemany(
f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Latest Fed funds rate series from the FXMacroData announcements endpoint."""
records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_policy_rate")
n = _upsert(conn, "usd_policy_rate", records)
latest = records[0] if records else {}
context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_cpi")
n = _upsert(conn, "usd_cpi", records)
latest = records[0] if records else {}
context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""ECB main refinancing rate series."""
records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eur_policy_rate")
n = _upsert(conn, "eur_policy_rate", records)
latest = records[0] if records else {}
context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Daily EUR/USD closing spot rate."""
records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eurusd_spot")
n = _upsert(conn, "eurusd_spot", records)
latest = records[0] if records else {}
context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Upcoming USD macro release dates and times."""
events = fxmacrodata.get_calendar("usd")
df = pd.DataFrame(events) if events else pd.DataFrame()
context.log.info("Found %d upcoming USD events", len(df))
# Surface a preview in the Dagster UI asset metadata pane
preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
return MaterializeResult(
metadata={
"event_count": MetadataValue.int(len(df)),
"preview": MetadataValue.md(preview),
}
)
Metadata aset di UI Dagster
Setiap MaterializeResult mengembalikan metadata terstruktur yang digambarkan Dagster di panel detail aset jumlah baris, tanggal terakhir, tabel pratinjau.
- Langkah 4 -
Langkah 4 Buat pekerjaan dan jadwal harian
Dagster. pekerjaan memilih aset mana yang akan terwujud dalam satu run. jadwal Kami ingin pembacaan makro segar setiap hari kerja pagi pukul 06:30 UTC sebelum Frankfurt terbuka sehingga setiap indikator yang dirilis semalam ditangkap sebelum perdagangan Eropa dimulai.
Ciptakan fxmacro_pipeline/jobs.pyAku tidak tahu.
from dagster import (
AssetSelection,
ScheduleDefinition,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
name="macro_refresh",
description="Refresh all FXMacroData indicator and FX rate assets.",
selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)
calendar_refresh_job = define_asset_job(
name="calendar_refresh",
description="Pull the latest USD release calendar.",
selection=AssetSelection.groups("calendar"),
)
# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
name="macro_daily_at_0630_utc",
job=macro_refresh_job,
cron_schedule="30 6 * * 1-5",
execution_timezone="UTC",
)
# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
name="calendar_weekly_on_sunday",
job=calendar_refresh_job,
cron_schedule="0 18 * * 0",
execution_timezone="UTC",
)
Dua pekerjaan terpisah satu untuk seri indikator, satu untuk kalender rilis memungkinkan Anda untuk mengamati dan menjalankan ulang secara independen. Jika pengambilan kalender rilan gagal (misalnya selama hari libur umum ketika titik akhir mengembalikan muatan kosong), pekerjaan pembaruan indikator tidak terpengaruh.
- Langkah 5 -
Langkah 5 Kabel semua ke dalam objek Definisi
Dagster's. Definitions objek adalah titik masuk tunggal yang menghubungkan aset, sumber daya, pekerjaan, dan jadwal bersama-sama. fxmacro_pipeline/__init__.pyAku tidak tahu.
import os
from dagster import Definitions
from .assets import (
eur_policy_rate,
eurusd_spot,
usd_cpi,
usd_policy_rate,
usd_release_calendar,
)
from .jobs import (
calendar_refresh_job,
calendar_weekly_schedule,
macro_daily_schedule,
macro_refresh_job,
)
from .resources import FXMacroDataResource
defs = Definitions(
assets=[
usd_policy_rate,
usd_cpi,
eur_policy_rate,
eurusd_spot,
usd_release_calendar,
],
resources={
"fxmacrodata": FXMacroDataResource(
# Falls back to the FXMACRO_API_KEY env var inside the resource
api_key=os.environ.get("FXMACRO_API_KEY", ""),
),
},
jobs=[macro_refresh_job, calendar_refresh_job],
schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
Injeksi sumber daya pada saat runtime
Setiap aset yang menyatakan fxmacrodata: FXMacroDataResource sebagai parameter secara otomatis menerima contoh yang dikonfigurasi pada saat runtime. untuk menukarkan dalam simulasi untuk pengujian, override sumber daya dalam tes Anda Definitions kode aset tidak berubah.
- Langkah 6 -
Langkah 6 Tambahkan sensor untuk memperingatkan pada tanda-tanda inflasi yang mengejutkan
Sensor di Dagster jajak pendapat untuk kondisi eksternal dan pemicu berjalan atau saluran peringatan ketika sesuatu berubah. usd_cpi materialisasi: jika pembacaan CPI terbaru lebih dari 0,4 poin persentase di atas pembacaan sebelumnya, itu memicu peringatan webhook sehingga strategi atau sistem risiko Anda dapat bereaksi segera.
Tambahkan fxmacro_pipeline/sensors.pyAku tidak tahu.
import os
import sqlite3
import requests as http
from dagster import (
RunStatusSensorContext,
asset_sensor,
AssetKey,
EventLogEntry,
SensorResult,
SkipReason,
)
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4 # percentage points
@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
context: RunStatusSensorContext,
asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
"""Alert when the latest US CPI print is significantly above the prior reading."""
try:
conn = sqlite3.connect(_DB_PATH)
rows = conn.execute(
"SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
).fetchall()
conn.close()
except Exception as exc:
return SkipReason(f"DB read failed: {exc}")
if len(rows) < 2:
return SkipReason("Not enough CPI history yet.")
latest_date, latest_val = rows[0]
prior_date, prior_val = rows[1]
surprise = (latest_val or 0) - (prior_val or 0)
context.log.info(
"CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
latest_date, latest_val, prior_date, prior_val, surprise,
)
if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
payload = {
"text": (
f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
f"vs prior {prior_date}: {prior_val:.1f}% "
f"(surprise: *{surprise:+.2f} pp*)"
)
}
http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
return SensorResult(cursor=str(latest_date))
Masukkan sensornya. __init__.py dengan mengimpornya dan menambahkannya ke Definitions
sensors Daftar:
from .sensors import cpi_surprise_alert_sensor
defs = Definitions(
# ... existing fields ...
sensors=[cpi_surprise_alert_sensor],
)
- Langkah 7 -
Langkah 7 Luncurkan Dagster UI dan jalankan materialisasi pertama Anda
Mulai server pengembangan lokal. Definitions melalui
pyproject.toml Tempat masuk:
dagster dev
Navigasi ke http://localhost:3000Kau akan melihat Grafik Aset menunjukkan semua lima aset dikelompokkan ke dalam kelompok logis mereka (macro_indicatorsAku akan pergi. fx_ratesAku akan pergi.
calendarKlik Mematerialkan Semuanya Untuk menjalankan pertama intake pass dalam hitungan detik, panel aset akan menunjukkan jumlah baris, tanggal terbaru, dan nilai terbaru ditarik langsung dari FXMacroData API.
Memverifikasi data di SQLite
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
Untuk mengaktifkan jadwal, navigasi ke Tinjauan → Jadwal di UI Dagster dan beralih
macro_daily_at_0630_utc Untuk Berlari. Proses Daemon Dagster sekarang akan bangun setiap hari kerja pada 06:30 UTC dan mematerialisasi aset indikator makro secara otomatis.
Langkah 8: Uji
Langkah 8 Menulis tes unit dengan sumber daya simulasi
Salah satu manfaat desain Dagster adalah bahwa aset adalah fungsi Python biasa mudah untuk unit-test tanpa menjalankan tumpukan orkestrasi penuh.
# tests/test_assets.py
import sqlite3
import tempfile
import os
from unittest.mock import MagicMock
import pytest
from dagster import build_asset_context, materialize
from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource
MOCK_RATE_DATA = [
{"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
{"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
{"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
{"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]
class MockFXMacroData(FXMacroDataResource):
def get_announcements(self, currency, indicator, start="2020-01-01"):
if indicator == "policy_rate":
return MOCK_RATE_DATA
if indicator == "inflation":
return MOCK_CPI_DATA
return []
def get_forex(self, base, quote, start="2023-01-01"):
return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]
def get_calendar(self, currency):
return []
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))
def test_usd_policy_rate_materialises():
result = materialize(
[usd_policy_rate],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_policy_rate")[0]
assert mat.metadata["row_count"].value == 2
assert mat.metadata["latest_val"].value == pytest.approx(4.5)
def test_usd_cpi_materialises():
result = materialize(
[usd_cpi],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_cpi")[0]
assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
Langkah 9: Produksi
Langkah 9 Mengerahkan ke produksi dengan Dagster+
Untuk tim yang ingin pesawat kontrol yang dihosting tanpa mengelola daemon mereka sendiri, Dagster+
(managed cloud offering) hanya membutuhkan satu langkah konfigurasi tambahan: mengganti dagster dev
dengan agen Dagster Cloud, yang berbunyi sama. Definitions objek dan menjalankan berjalan di infrastruktur Anda.
Atur kunci API Anda sebagai variabel lingkungan Dagster + (Pengiriman → Variabel LingkunganDengan nama FXMACRO_API_KEY. Sumber akan mengambilnya secara otomatis tidak ada perubahan kode yang diperlukan. Pola yang sama berlaku untuk setiap lingkungan CI / CD yang menyuntikkan rahasia sebagai variabel lingkungan (GitHub Actions, GitLab CI, CircleCI).
Untuk menyebarkan pada kelompok Kubernetes yang dihosting sendiri, paket pipa sebagai gambar Docker, mendorong ke registri Anda, dan mengarahkan bagan Dagster Helm ke dalamnya.
fxmacrodata.com pastikan kebijakan keluar cluster Anda memungkinkan HTTPS ke host itu.
- Ringkasan -
Apa yang kau bangun
Dengan mengikuti panduan ini Anda sekarang memiliki pipa Dagster yang bekerja yang:
- Mengidentifikasi bahan yang dapat digunakan kembali, injeksi
FXMacroDataResourceyang menangani semua API auth dan pengelolaan kesalahan di satu tempat - Mengeksposisi lima aset yang ditentukan perangkat lunak suku bunga Fed, US CPI, suku bunga ECB, EUR/USD spot dan kalender rilis USD masing-masing bertahan ke SQLite dengan metadata Dagster lengkap
- Jadwal hari kerja memperbarui pada 06:30 UTC melalui jadwal Dagster yang didorong cron sehingga Anda selalu memiliki pembacaan terbaru sebelum pasar Eropa dibuka
- Menembakkan peringatan webhook ketika CPI cetak kejutan ke atas, memberikan sistem hilir pemicu yang didorong oleh peristiwa daripada lingkaran jajak pendapat
- Termasuk tes unit offline cepat menggunakan sumber daya simulasi, sehingga pipa CI tidak pernah melakukan panggilan API langsung
Langkah selanjutnya
- Perluas pipa dengan mata uang tambahan melihat katalog indikator penuh di /api-data-docs dan menambahkan aset baru mengikuti pola yang sama
- Ganti penyimpanan SQLite dengan Postgres atau BigQuery I / O manajer untuk skala produksi
- Tambahkan aset hilir yang membaca dari DB, menghitung skor divergensi makro, dan menulis sinyal perdagangan ke tabel sinyal menjaga logika strategi Anda terpisah dari lapisan konsumsi
- Menjelajahi. Data posisi COT sebagai sentiment overlay untuk melengkapi seri indikator