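Dagster is a Python-based data orchestration platform built around software-defined assets: data artifacts that know their own lineage, can be materialized on demand or on a schedule, and expose rich metadata and observability out of the box. That makes it a good fit for macro data workflows that pull indicator series, detect release events, store snapshots, and alert on anomalies.
What you will build
- An FXMacroData Dagster resource: a reusable, configurable API client shared across all assets
- Five software-defined assets: policy rates, CPI, FX spot, and the release calendar for the chosen currency pair
- A daily job and schedule: runs every weekday morning before the London open to refresh the latest readings
- An anomaly sensor: watches for surprise inflation prints and sends an alert through a webhook
Prerequisites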
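Step 1: Install Dagster and project dependencies
Create a new virtual environment and install Dagster along with the small set of libraries this pipeline uses. Pinning dagster and dagster-webserver to the same release avoids the most common installation pitfall.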
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
# tabulate is needed by pandas' DataFrame.to_markdown, which the calendar asset uses
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
    requests pandas sqlalchemy tabulate
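Scaffold a minimal project layout. Dagster's scaffold command creates the directory structure and a pyproject.toml that lets the local UI discover your definitions automatically.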
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
Store your API key as an environment variable. Never hard-code credentials in source files.
export FXMACRO_API_KEY="YOUR_API_KEY"
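On production hosts, inject the key through your scheduler's secret management (GitHub Actions secrets, Kubernetes Secrets, Dagster Cloud environment variables). The pipeline reads it from the environment at run time, so no code changes are needed.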
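A fail-fast check at startup makes a missing key obvious before any asset runs. The sketch below is one possible way to do that; the helper name load_api_key is hypothetical and is not part of the pipeline built in this guide.

```python
import os
from typing import Mapping, Optional


def load_api_key(
    env: Optional[Mapping[str, str]] = None,
    name: str = "FXMACRO_API_KEY",
) -> str:
    """Return the API key from the environment, or raise if it is missing.

    Hypothetical helper for illustration; pass `env` to test without
    touching the real process environment.
    """
    source = os.environ if env is None else env
    key = source.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set; export it before starting Dagster")
    return key
```

Calling it once when the process starts turns a silent empty-key request into an immediate, readable error.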
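Step 2: Define a reusable FXMacroData resource
A Dagster resource is an injectable shared dependency, similar to a database connection or an HTTP session. Wrapping the FXMacroData REST API in a resource means every asset can call it without duplicating authentication logic or timeout handling.
Create fxmacro_pipeline/resources.py: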
import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""  # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])
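To see what _get sends without making a network call, the request URL can be rebuilt with the standard library. This is only a sketch: preview_url is a hypothetical helper, and it assumes the API takes api_key and start as plain query parameters, which is what the resource code implies.

```python
from urllib.parse import urlencode

_BASE_URL = "https://fxmacrodata.com/api/v1"


def preview_url(path: str, api_key: str = "YOUR_API_KEY", **params: str) -> str:
    """Build the URL the resource would request (hypothetical helper, no I/O)."""
    query = urlencode({"api_key": api_key, **params})
    return f"{_BASE_URL}{path}?{query}"


# Same path and start date that the usd_policy_rate asset passes in
print(preview_url("/announcements/usd/policy_rate", start="2015-01-01"))
```

Printing the URL this way is handy when debugging a 4xx response, because it shows the exact path and parameters each asset produces.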
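Why a configurable resource?
ConfigurableResource lets you swap credentials and timeouts between development and production without touching asset code. Register the resource once in Definitions and Dagster injects it wherever it is declared as a parameter.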
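Step 3: Define the macro data assets
Each Dagster asset represents one logical data product. Materializing an asset fetches the latest data from FXMacroData and persists it locally. We define four series assets: the Fed policy rate, US CPI, the ECB policy rate, and EUR/USD spot. A fifth asset reads the release calendar so the pipeline knows when the next high-impact announcement is due.
Create fxmacro_pipeline/assets.py: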
import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    # Treats the first record as the newest one (assumes newest-first ordering)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    # (DataFrame.to_markdown needs the optional tabulate package)
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )
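Asset metadata in the Dagster UI
Every MaterializeResult returns structured metadata (row count, latest date, preview table) that Dagster renders in the asset detail pane. You can see at a glance whether a run actually picked up new data, without digging through logs.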
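Because the tables are keyed on date and written with INSERT OR REPLACE, re-running an asset should overwrite revised values rather than duplicate rows. The sketch below illustrates that behaviour against an in-memory database; the sample values are made up and the standalone upsert function is a simplified stand-in for the _upsert helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE usd_cpi (date TEXT PRIMARY KEY, val REAL, announcement_datetime TEXT)"
)


def upsert(records):
    """Simplified stand-in for _upsert: one row per date, last write wins."""
    conn.executemany(
        "INSERT OR REPLACE INTO usd_cpi (date, val, announcement_datetime) VALUES (?,?,?)",
        [(r["date"], r["val"], r.get("announcement_datetime")) for r in records],
    )
    conn.commit()


# First run loads two (made-up) prints
upsert([{"date": "2025-11-13", "val": 2.6}, {"date": "2025-12-11", "val": 2.7}])
# Second run returns an overlapping date with a revised value
upsert([{"date": "2025-12-11", "val": 2.8}])

rows = conn.execute("SELECT date, val FROM usd_cpi ORDER BY date").fetchall()
print(rows)  # still two rows; the December value is now 2.8
```

This is why the daily schedule can safely re-fetch the full history on every run: the primary key keeps the table idempotent.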
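Step 4: Create jobs and schedules
A Dagster job selects which assets are materialized in a single run. A schedule launches that job on a cron cadence. We want fresh macro readings every weekday morning at 06:30 UTC, plus a weekly refresh of the release calendar.
Create fxmacro_pipeline/jobs.py: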
from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays (Mon-Fri) at 06:30 UTC, ahead of the European open
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)
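Two separate jobs, one for the indicator series and one for the release calendar, let you observe and re-run them independently. If the calendar fetch fails (for example, when the endpoint returns an empty payload over a public holiday), the indicator refresh job is unaffected.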
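The cron expression 30 6 * * 1-5 fires at 06:30 on Monday through Friday. As a rough way to reason about when the next tick lands, the sketch below computes it with the standard library. It is not how Dagster evaluates cron strings; next_weekday_run is a hypothetical helper that hard-codes the weekday rule, and it expects a timezone-aware datetime.

```python
from datetime import datetime, time, timedelta, timezone


def next_weekday_run(now: datetime, at: time = time(6, 30)) -> datetime:
    """Next Mon-Fri occurrence of `at` (UTC) strictly after `now`.

    Hypothetical helper mirroring the "30 6 * * 1-5" schedule.
    `now` must be timezone-aware.
    """
    now = now.astimezone(timezone.utc)
    candidate = datetime.combine(now.date(), at, tzinfo=timezone.utc)
    if candidate <= now:
        candidate += timedelta(days=1)
    while candidate.weekday() > 4:  # 5 = Saturday, 6 = Sunday
        candidate += timedelta(days=1)
    return candidate


# Friday 12 Dec 2025, after that day's run: the next tick is Monday
friday = datetime(2025, 12, 12, 7, 0, tzinfo=timezone.utc)
print(next_weekday_run(friday))
```

A check like this is useful in tests or dashboards that need to flag a schedule as overdue.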
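Step 5: Wire everything together in a Definitions object
The Dagster Definitions object is the single entry point that ties assets, resources, jobs, and schedules together. Put it in fxmacro_pipeline/__init__.py: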
import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
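Resource injection at run time
Any asset that declares fxmacrodata: FXMacroDataResource as a parameter automatically receives the configured instance at run time. To test against a mock, override the resource in the Definitions used by your tests; the asset code does not change.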
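Step 6: Add a sensor that alerts on surprise inflation prints
Sensors in Dagster poll external conditions and trigger runs or alert channels when something changes. This one watches usd_cpi materializations: if the latest CPI reading is at least 0.4 percentage points above the previous one, it fires a webhook alert so your strategy or risk systems can react immediately.
Add fxmacro_pipeline/sensors.py: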
import os
import sqlite3
from contextlib import closing

import requests as http
from dagster import (
    AssetKey,
    EventLogEntry,
    SensorEvaluationContext,
    SkipReason,
    asset_sensor,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: SensorEvaluationContext,
    asset_event: EventLogEntry,
) -> SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        # closing() releases the connection even if the query raises
        with closing(sqlite3.connect(_DB_PATH)) as conn:
            rows = conn.execute(
                "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
            ).fetchall()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")
    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")
    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)
    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )
    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise*: latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
    # This sensor only alerts and never requests a run. An asset sensor tracks
    # its own cursor, so no cursor is returned here.
    return SkipReason(f"Checked CPI print for {latest_date}; no run requested.")
Register the sensor in __init__.py by importing it and adding it to the sensors list of Definitions:
from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)
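One detail worth knowing about the threshold comparison: CPI values are stored as floats, and a move of exactly 0.4 points can compute to slightly less than 0.4 in binary floating point. The sketch below shows one way to guard against that by rounding before comparing. The function cpi_surprise is a hypothetical helper, the sample values are made up, and rounding to two decimals is an assumption about the precision of the published figures.

```python
def cpi_surprise(latest: float, prior: float, threshold: float = 0.4) -> tuple[float, bool]:
    """Return (surprise in percentage points, whether it meets the threshold).

    Rounds to two decimals first so that floating-point error does not
    push a borderline move just under the threshold.
    """
    surprise = round(latest - prior, 2)
    return surprise, surprise >= threshold


print(3.1 - 2.7 >= 0.4)        # False: the raw difference is 0.3999999999999999
print(cpi_surprise(3.1, 2.7))  # (0.4, True)
print(cpi_surprise(2.7, 2.6))  # (0.1, False)
```

Keeping the comparison in a small pure function also makes the alert rule easy to unit test without a database or a running sensor.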
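Step 7: Launch the Dagster UI and run your first materialization
Start the local development server. Dagster discovers your Definitions through the pyproject.toml entry point: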
dagster dev
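Navigate to http://localhost:3000. You will see the asset graph showing all five assets arranged in their logical groups (macro_indicators, fx_rates, calendar). Click Materialize all to run the first ingestion pass. Within seconds, the asset panes show the row count, latest date, and latest value pulled straight from the FXMacroData API.
Verify the data in SQLite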
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
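To activate the schedule, navigate to Overview → Schedules in the Dagster UI and toggle macro_daily_at_0630_utc to Running. The Dagster daemon will then wake at 06:30 UTC every weekday and launch the macro_refresh job.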
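Step 8: Write unit tests with a mock resource
A key design advantage of Dagster is that assets are plain Python functions, so they are easy to unit test without running the full orchestration stack. Swap the real resource for a mock to keep tests fast and offline.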
# tests/test_assets.py
import pytest
from dagster import materialize

from fxmacro_pipeline.assets import usd_cpi, usd_policy_rate
from fxmacro_pipeline.resources import FXMacroDataResource

MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    """Offline stand-in that returns canned records instead of calling the API."""

    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    # assets.py reads FXMACRO_DB once at import time, so patch the module
    # constant directly; setting the env var here would come too late.
    monkeypatch.setattr("fxmacro_pipeline.assets._DB_PATH", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
Step 9: Move to production with Dagster+
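For teams that want a hosted control plane rather than managing their own daemon, Dagster+, the managed cloud service, needs only one extra configuration step: it reads the same Definitions that dagster dev does, while your code keeps running in your own infrastructure.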
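Set your API key as a Dagster+ environment variable (Deployment → Environment variables) named FXMACRO_API_KEY. The resource picks it up automatically, with no code changes. The same pattern applies to any CI/CD environment that injects secrets as environment variables (GitHub Actions, GitLab CI, CircleCI).
To deploy on a self-hosted Kubernetes cluster, package the pipeline as a Docker image, push it to a registry, and point the Dagster Helm chart at it. The FXMacroData resource connects to fxmacrodata.com, so make sure your cluster's egress policy allows HTTPS to that host.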
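Summary
What you built
By following this guide, you now have a working Dagster pipeline that:
- Defines a reusable, injectable FXMacroDataResource that handles all API authentication and error handling in one place
- Exposes five software-defined assets: the Fed rate, US CPI, ECB rate, and EUR/USD spot series, each persisted to SQLite, plus the USD release calendar, all with Dagster metadata
- Schedules weekday refreshes at 06:30 UTC through a cron-driven Dagster schedule, so you have the latest readings before European markets open
- Fires a webhook alert when a CPI print surprises, giving downstream systems an event-driven trigger instead of a polling loop
- Includes fast, offline unit tests built on a mock resource, so the CI pipeline never makes live API calls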