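How to Integrate FXMacroData with Dagster

Build a production-ready Dagster data pipeline that pulls macro indicators from FXMacroData on a schedule, stores the results in a local database, and surfaces release-calendar events for smarter execution timing.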


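Dagster is a Python-based data orchestration platform built around software-defined assets: data artifacts that know their own lineage, can be materialized on demand or on a schedule, and expose rich metadata and observability out of the box. That model suits macro data workflows, which pull indicator series, detect release events, store snapshots, and alert on anomalies.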

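What you will build

  • An FXMacroData Dagster resource: a reusable, configurable API client shared across all assets
  • Five software-defined assets: policy rates, CPI, FX spot, and the release calendar for the chosen currency pair
  • A daily job and schedule: runs every weekday morning before the London open to refresh the latest readings
  • An anomaly sensor: watches for inflation surprises and sends an alert through a webhook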

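Prerequisites

  • Python 3.10+: the snippets use modern type hints such as list[dict] and X | Y unions
  • An FXMacroData API key: register on the subscription page. USD indicator data is publicly accessible without a key
  • Basic Dagster familiarity: you should know what assets and jobs are; the Dagster quickstart covers the basics in 10 minutes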


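Step 1: Install Dagster and project dependencies

Create a new virtual environment and install Dagster along with the small set of libraries this pipeline uses. Pinning dagster and dagster-webserver to the same version avoids the most common installation pitfall.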

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy tabulate   # tabulate is required by DataFrame.to_markdown

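Scaffold a minimal project layout. Dagster's scaffold command creates the directory structure and a pyproject.toml that lets the local UI discover your definitions automatically.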

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

Store your API key as an environment variable. Never hard-code credentials in source files.

export FXMACRO_API_KEY="YOUR_API_KEY"

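On production hosts, inject the key through your scheduler's secret management (GitHub Actions secrets, Kubernetes Secrets, Dagster Cloud environment variables). The pipeline reads it from the environment at run time, so no code changes are needed.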
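
If your assets rely on keyed endpoints, it can help to fail fast when the variable is missing instead of discovering it on the first API call. The following is a minimal sketch, assuming the FXMACRO_API_KEY variable name used above; the helper name load_api_key and the placeholder check are illustrative and not part of Dagster or FXMacroData.

```python
import os


def load_api_key(env=None, var_name="FXMACRO_API_KEY"):
    """Return the API key from the environment, or raise a clear error.

    `env` defaults to os.environ; passing a plain dict makes the helper
    easy to exercise in tests. The helper name is hypothetical.
    """
    env = os.environ if env is None else env
    key = env.get(var_name, "").strip()
    # Treat the tutorial placeholder as "not configured".
    if not key or key == "YOUR_API_KEY":
        raise RuntimeError(f"{var_name} is not set; export it before starting Dagster")
    return key


if __name__ == "__main__":
    print(load_api_key({"FXMACRO_API_KEY": "demo-key"}))
```

Calling this once at start-up, for example where the resource is constructed, surfaces a misconfigured host before any schedule fires.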



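Step 2: Define a reusable FXMacroData resource

A Dagster resource is an injectable shared dependency, similar to a database connection or an HTTP session. Wrapping the FXMacroData REST API in a resource means every asset can call it without duplicating authentication logic or timeout handling.

Create fxmacro_pipeline/resources.py: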

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])
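
To see which URL each helper ends up requesting, the path and query-string composition can be reproduced without making a network call. This is a rough sketch that mirrors the _get and get_announcements logic above using only the standard library; build_url and announcements_path are illustrative names, and the exact encoding that requests applies may differ in edge cases.

```python
from urllib.parse import urlencode

BASE_URL = "https://fxmacrodata.com/api/v1"  # same base URL as resources.py


def announcements_path(currency, indicator):
    # Mirrors get_announcements: the currency is lower-cased, the indicator is not.
    return f"/announcements/{currency.lower()}/{indicator}"


def build_url(path, api_key="", **params):
    """Compose the URL that a GET to `path` with these query params would target."""
    query = {"api_key": api_key, **params}
    return f"{BASE_URL}{path}?{urlencode(query)}"


if __name__ == "__main__":
    print(build_url(announcements_path("USD", "policy_rate"), "demo", start="2015-01-01"))
```

Printing the URL this way is a quick check that a currency code or indicator name is spelled the way the endpoint expects before wiring it into an asset.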

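Why a ConfigurableResource?

ConfigurableResource lets you swap credentials and timeouts between development and production without touching asset code. Register it once in Definitions, and Dagster injects it wherever it is declared as a parameter.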



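Step 3: Define the macro data assets

Each Dagster asset represents one logical data product. Materializing an asset fetches the latest data from FXMacroData and persists it locally. We define four series assets: the Fed policy rate, US CPI, the ECB policy rate, and EUR/USD spot. A fifth asset reads the release calendar so the pipeline knows when the next high-impact announcement is due.

Create fxmacro_pipeline/assets.py: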

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )
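
The persistence helpers rely on the date column being the primary key, so INSERT OR REPLACE makes every materialization idempotent: re-running an asset overwrites rows instead of duplicating them. A minimal sketch of that behaviour against an in-memory database, using a simplified two-column table and made-up values:

```python
import sqlite3

ROWS = [("2025-12-11", 2.7), ("2025-11-13", 2.6)]  # illustrative values only


def upsert(conn, rows):
    """Upsert (date, val) rows and return the resulting table size."""
    conn.execute("CREATE TABLE IF NOT EXISTS usd_cpi (date TEXT PRIMARY KEY, val REAL)")
    conn.executemany("INSERT OR REPLACE INTO usd_cpi (date, val) VALUES (?, ?)", rows)
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM usd_cpi").fetchone()[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    print(upsert(conn, ROWS))                    # first materialization
    print(upsert(conn, ROWS))                    # re-run with the same payload
    print(upsert(conn, [("2025-12-11", 2.8)]))   # a revised value replaces the old row
    conn.close()
```

One detail worth knowing: using a sqlite3 connection as a context manager commits or rolls back the transaction but does not close the connection, so long-running processes may want an explicit close().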

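Asset metadata in the Dagster UI

Every MaterializeResult returns structured metadata that Dagster renders in the asset details pane: row count, latest date, and a preview table. You can see at a glance whether fresh data actually arrived, without digging through logs.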
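
The assets take records[0] as the latest reading, which assumes the API returns the newest record first; the source does not confirm that ordering. A hedged alternative is to pick the newest record by date when building the metadata. The summarize helper below is an illustrative name, not part of the pipeline code.

```python
def summarize(records):
    """Build the metadata fields the assets report, picking the newest record by date."""
    valid = [r for r in records if r.get("date") and r.get("val") is not None]
    if not valid:
        return {"row_count": 0, "latest_date": "n/a", "latest_val": 0.0}
    # ISO-8601 date strings sort chronologically, so max() on the string is enough.
    latest = max(valid, key=lambda r: r["date"])
    return {
        "row_count": len(valid),
        "latest_date": latest["date"],
        "latest_val": float(latest["val"]),
    }


if __name__ == "__main__":
    sample = [
        {"date": "2025-11-13", "val": 2.6},
        {"date": "2025-12-11", "val": 2.7},
    ]
    print(summarize(sample))
```

The returned dictionary maps directly onto the MetadataValue.int, MetadataValue.text, and MetadataValue.float entries used in the assets.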



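Step 4: Build a job and a schedule

A Dagster job selects which assets to materialize in a single run. A schedule triggers that job on a cron expression. We want fresh macro readings every weekday morning at 06:30 UTC.

Create fxmacro_pipeline/jobs.py: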

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

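Two separate jobs, one for the indicator series and one for the release calendar, let you observe and re-run them independently. If the calendar fetch fails (for example, when the endpoint returns an empty payload over a public holiday), the indicator refresh job is unaffected.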
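
To sanity-check what the cron expression 30 6 * * 1-5 means in practice, the next firing time can be computed with the standard library. This sketch hard-codes that one expression rather than parsing cron syntax, and next_macro_run is an illustrative name; Dagster performs its own schedule evaluation.

```python
from datetime import datetime, time, timedelta, timezone


def next_macro_run(now):
    """Next firing of cron '30 6 * * 1-5' (UTC) strictly after `now`.

    `now` must be timezone-aware.
    """
    now = now.astimezone(timezone.utc)
    candidate = datetime.combine(now.date(), time(6, 30), tzinfo=timezone.utc)
    # Step forward one day at a time until the slot is in the future and on Mon-Fri.
    while candidate <= now or candidate.weekday() > 4:  # weekday(): Mon=0 ... Sun=6
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    friday_after_run = datetime(2025, 1, 3, 7, 0, tzinfo=timezone.utc)
    print(next_macro_run(friday_after_run))  # the following Monday at 06:30 UTC
```

A Friday run that has already fired therefore rolls over to Monday, which is why the calendar job runs on Sunday evening.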



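Step 5: Wire everything together in a Definitions object

Dagster's Definitions object is the single entry point that ties assets, resources, jobs, and schedules together. Put it in fxmacro_pipeline/__init__.py: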

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

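Resource injection at run time

Any asset that declares fxmacrodata: FXMacroDataResource as a parameter automatically receives the configured instance at run time. To test with a mock, override the resource in a test Definitions; the asset code does not change.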



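Step 6: Add a sensor that alerts you to inflation surprises

Sensors in Dagster poll for external conditions and trigger runs or alert channels when something changes. This sensor watches for usd_cpi materializations: if the latest CPI reading is at least 0.4 percentage points above the prior one, it fires a webhook alert so your strategy or risk system can react immediately.

Add fxmacro_pipeline/sensors.py: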

import os
import sqlite3

import requests as http
from dagster import (
    AssetKey,
    EventLogEntry,
    SensorEvaluationContext,
    SkipReason,
    asset_sensor,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: SensorEvaluationContext,  # asset sensors receive the standard sensor context
    asset_event: EventLogEntry,
) -> SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        try:
            rows = conn.execute(
                "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
            ).fetchall()
        finally:
            conn.close()  # close even if the query raises
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    # Alert-only sensor: it never requests a run. The asset sensor advances its
    # own cursor after each evaluation, so no cursor is set here.
    return SkipReason(f"CPI checked for {latest_date}; no run requested.")

Register the sensor in __init__.py by importing it and adding it to the Definitions sensors list:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)
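
The alert rule itself can be pulled out into a pure function, which makes it testable without a database or a Dagster instance. One thing to watch is binary floating point: 3.0 - 2.6 evaluates to 0.3999999999999999, so a raw >= 0.4 comparison would miss a print that is exactly 0.4 percentage points higher. The sketch below rounds the difference to two decimals first; is_surprise is an illustrative name, and the rounding precision is an assumption about how CPI values are quoted.

```python
SURPRISE_THRESHOLD = 0.4  # percentage points, same value as in sensors.py


def is_surprise(latest, prior, threshold=SURPRISE_THRESHOLD):
    """True when `latest` exceeds `prior` by at least `threshold` percentage points."""
    # Round to two decimals so binary float error cannot flip the comparison.
    return round(latest - prior, 2) >= threshold


if __name__ == "__main__":
    print(is_surprise(3.0, 2.6))  # exactly 0.4 pp above the prior reading
    print(is_surprise(2.9, 2.6))  # only 0.3 pp above
```

Only upside surprises trigger the rule, matching the sensor's behaviour; a symmetric check would compare the absolute difference instead.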


Step 7: Launch the Dagster UI and run your first materialization

Start the local development server. It discovers your Definitions through the pyproject.toml entry point:

dagster dev

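Navigate to http://localhost:3000. You will see the asset graph showing all five assets arranged in their logical groups (macro_indicators, fx_rates, calendar). Click Materialize all to run the first ingestion pass. Within a few seconds, the asset pane shows row counts, latest dates, and latest values pulled directly from the FXMacroData API.

Verify the data in SQLite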

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

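To activate the schedule, navigate to Overview → Schedules in the Dagster UI and toggle macro_daily_at_0630_utc to Running. From then on, the Dagster daemon wakes up every weekday at 06:30 UTC and launches the macro_refresh job.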



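Step 8: Write unit tests with a mock resource

A major design advantage of Dagster is that assets are plain Python functions, so they are easy to unit test without running the full orchestration stack. Swap the real resource for a mock to keep tests fast and offline.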

# tests/test_assets.py
import pytest
from dagster import materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


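@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    # assets.py reads FXMACRO_DB once at import time, so setting the env var
    # here would be too late; patch the module-level path directly instead.
    monkeypatch.setattr("fxmacro_pipeline.assets._DB_PATH", str(tmp_path / "test.db"))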


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)

Run the tests:

pip install pytest
pytest tests/ -v


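Step 9: Move to production with Dagster+

For teams that want a hosted control plane rather than running their own daemon, the managed cloud service needs only one additional configuration step. It loads the same Definitions object that dagster dev reads, while your code runs in your own infrastructure.

Set your API key as a Dagster+ environment variable (Deployment → Environment variables) named FXMACRO_API_KEY. The resource picks it up automatically, with no code changes. The same pattern applies in any CI/CD environment that injects secrets as environment variables (GitHub Actions, GitLab CI, CircleCI).

To deploy on a self-hosted Kubernetes cluster, package the pipeline as a Docker image, push it to a registry, and point the Dagster Helm chart at it. The FXMacroData resource connects to fxmacrodata.com, so make sure your cluster's egress policy allows HTTPS to that host.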



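What you built

By following this guide, you now have a working Dagster pipeline that:

  • Defines a reusable, injectable FXMacroDataResource that handles all API authentication and error handling in one place
  • Exposes five software-defined assets: the Fed rate, US CPI, the ECB rate, and EUR/USD spot, each persisted to SQLite with full Dagster metadata, plus the USD release calendar
  • Schedules weekday refreshes at 06:30 UTC through a cron-driven Dagster schedule, so you always have the latest readings before European markets open
  • Fires a webhook alert when CPI prints a surprise, giving downstream systems an event-driven trigger instead of a polling loop
  • Includes fast, offline unit tests using a mock resource, so CI pipelines never make live API calls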

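Next steps

  • Extend the pipeline to more currencies: browse the full indicator catalogue and add new assets following the same pattern
  • Replace the SQLite storage with a Postgres or BigQuery I/O manager for production scale
  • Add a downstream asset that reads from the DB, computes a macro divergence score, and writes trade signals to a signals table, keeping your strategy logic separate from the ingestion layer
  • Explore positioning data as a sentiment overlay to complement the indicator series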
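
As a starting point for the downstream asset idea, here is a rough sketch of one possible divergence measure. The guide does not define a "macro divergence score", so the policy-rate differential below is only a stand-in, and the function names are hypothetical. It reads the two policy-rate tables created in Step 3.

```python
import sqlite3

_ALLOWED_TABLES = {"usd_policy_rate", "eur_policy_rate"}


def latest_val(conn, table):
    """Most recent value in `table`, or None when the table is empty."""
    # Table names cannot be bound as SQL parameters, so only allow known tables.
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"unexpected table: {table}")
    row = conn.execute(f"SELECT val FROM {table} ORDER BY date DESC LIMIT 1").fetchone()
    return None if row is None else row[0]


def policy_rate_differential(conn):
    """USD minus EUR policy rate in percentage points; None if either is missing."""
    usd = latest_val(conn, "usd_policy_rate")
    eur = latest_val(conn, "eur_policy_rate")
    if usd is None or eur is None:
        return None
    return round(usd - eur, 2)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    for table, val in (("usd_policy_rate", 4.5), ("eur_policy_rate", 3.15)):  # made-up values
        conn.execute(f"CREATE TABLE {table} (date TEXT PRIMARY KEY, val REAL)")
        conn.execute(f"INSERT INTO {table} VALUES ('2025-12-18', ?)", (val,))
    print(policy_rate_differential(conn))
    conn.close()
```

Wrapping a function like this in its own asset that depends on the two policy-rate assets would keep the signal logic out of the ingestion code.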


Key Facts

Page: How To Integrate FXmacrodata With Dagster
Section: Articles
Canonical URL: https://fxmacrodata.com/articles/how-to-integrate-fxmacrodata-with-dagster
Source: FXMacroData editorial and official publisher references
Last Updated: 2026-04-22 12:36 UTC
