Live release feed
Sub-second macro releases for FX backtests
Point-in-time history
Official CPI, jobs, GDP, and central-bank events with point-in-time history.
$25/month 14-day free trial
Start Free Trial
How To Integrate Fxmacrodata With Dagster image
Share headline card X LinkedIn Email
Download

Implementation

How-To Guides

How To Integrate Fxmacrodata With Dagster

Build a production-ready Dagster data pipeline that pulls macro indicators from FXMacroData on a schedule, stores results in a local database, and surfaces release calendar events for smarter execution timing.

他言語版 English
Share article X LinkedIn Email

ダイグスターは Pythonベースのデータオーケストレーションプラットフォームで 概念を元に構築されています ソフトウェアで定義された資産:独自の血統を知っているデータアーテファクトは,要求に応じてまたはスケジュールで実現され,豊富なメタデータと観察可能性を箱から暴露することができます. マクロデータワークフローのために,指標シリーズを引っ張って,リリースイベントを検出し,スナップショットを保存し,異常を警告します. Dagsterは優れたフィットです. このガイドは,FXMacroData指標をローカルSQLiteストアに吸収し,即日リリースカレンダーイベントを表面化し,毎日のスケジュールに自動的に実行するパイプラインを構築します.

建設する

  • FXMacroData Dagster リソース すべての資産で共有できる再利用可能で構成可能な API クライアント
  • Four software-defined assets 政策金利,CPI,フォックススポット,選択した通貨ペアのリリースカレンダー
  • 日々の仕事とスケジュール ロンドンオープン前毎週朝,最新情報を更新します
  • 異常センサー 予想外のインフレを監視し,Webフックで警告を発します

条件

  • Python 3.10+ すべてのスニペットは現代的な文字のヒントを使用し match 声明
  • FXMacroData API キーを 登録する / サブスクリプトドル指標データは鍵なしで公開可能である
  • 基本的なダグスターの熟知 資産や雇用の種類を知るべきです ダイグスター 速攻 covers the basics in ten minutes

ステップ 1

ステップ 1 Dagster とプロジェクト依存関係をインストール

デジタルライブラリをインストールします. ファイルファイルは,このパイプラインで使用されます. dagster ほら dagster-webserver 設置の最も一般的な失敗を回避します.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

プロジェクトを初期化します.ダグスターのスキャフォルドコマンドはディレクトリ構造と pyproject.toml 定義を自動的に発見できます. 定義は,

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

ソースファイルにハードコード認証を入れない.

export FXMACRO_API_KEY="YOUR_API_KEY"

生産ホストでは,スケジューラーの秘密管理 (GitHub Actions 秘密,Kubernetes 秘密. Dagster Cloud 環境変数) を介して鍵をインジェクトします.パイプラインは実行時に環境から読みます.コードに変更は必要ありません.


ステップ2

Step 2 — Define a reusable FXMacroData resource

ナイフ 資源 データベース接続またはHTTPセッションに類似する共有可能なインジェクタブル依存関係である. FXMacroData REST APIをリソースに包むことは,認証論理やタイムアウト処理を重複せずにすべての資産が呼び出すことができることを意味します.

創る fxmacro_pipeline/resources.pyありがとうございました

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

なぜConfigurableResourceなのか?

ConfigurableResource 開発者とプロッドの間の認証とタイムアウトを交換できます. 資産コードに触らない限り. リソースを一度設定します. Definitions パーマターとして宣言される場所へインジェクトします.


ステップ3

ステップ3 マクロデータ資産の定義

Each Dagster asset represents one logical data product. Materialising an asset fetches the latest data from FXMacroData and persists it locally. We define four assets: the 政策金利ほら アメリカCPIほら 政策金利, and the EUR/USD spot series. A fifth asset then reads the release calendar so the pipeline knows when the next high-impact announcements are due.

創る fxmacro_pipeline/assets.pyありがとうございました

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Dagster UI の資産メタデータ

どこにでも MaterializeResult 資産詳細パネルで表示される構造化メタデータを返します. 行数,最新の日付,プレビューテーブル. これは,ログを掘らないで,実物化が実際に新しいデータを取得したか一目で確認できるということです.


ステップ4

ステップ4 職と日程を設定する

ナイフ 職種 実行する資産を選択します. 予定表 drives that job on a cron expression. We want fresh macro readings every weekday morning at 06:30 UTC — before the Frankfurt open — so any indicator released overnight is captured before European trading begins.

創る fxmacro_pipeline/jobs.pyありがとうございました

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

Two separate jobs — one for indicator series, one for the release calendar — let you observe and re-run them independently. If the release calendar fetch fails (for example during a public holiday when the endpoint returns an empty payload), the indicator refresh job is not affected.


ステップ5

ステップ5 すべてを定義オブジェクトに接続する

ダグスターの Definitions 資産,リソース,仕事,スケジュールを結びつける単一のエントリーポイントです. fxmacro_pipeline/__init__.pyありがとうございました

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

実行時にリソースインジェクション

報告する資産 fxmacrodata: FXMacroDataResource テスト用の模擬で交換するには,テスト中のリソースをオーバーライドします. Definitions 資産コードは変わらない


ステップ6

ステップ6 驚異的な膨張の印を警告するセンサーを追加

センサーは,外部の条件を調査し,何か変化したときのトリガーランまたは警告チャンネルを実行します. 次のセンサーは毎回実行します. usd_cpi 実現:最新のCPI値が前回の値より0.4パーセントポイント以上上がると,Webフックアラートが発信され,戦略やリスクシステムが即座に反応します.

追加する fxmacro_pipeline/sensors.pyありがとうございました

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

センサーを登録する __init__.py 輸入して 追加して Definitions sensors リスト:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

ステップ7

ステップ7 Dagster UI を起動して最初のマテリアライゼーションを実行します

開発サーバーを起動する Definitions を抜いて pyproject.toml 入力ポイント:

dagster dev

移動する http://localhost:3000じゃあ じゃあ 資産グラフ showing all five assets grouped into their logical groups (macro_indicatorsほら fx_ratesほら calendarクリックします すべて を 物質 に する 秒以内に最初のインジェステーションパスを実行するには,資産パネルは行数,最新の日付,FXMacroData APIから直接抽出した最新の値を表示します.

SQLite でデータを検証する

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

プログラムを起動するには, 概要 → 日程 ダイグスターのUIに切り替える macro_daily_at_0630_utc やってる 走ってる. Dagster's daemon process will now wake up every weekday at 06:30 UTC and materialise the macro indicator assets automatically.


── ステップ8 テスト ──

ステップ8 模擬リソースでユニットテストを書く

One of Dagster's key design benefits is that assets are ordinary Python functions — easy to unit-test without running the full orchestration stack. Swap the live resource for a mock to keep tests fast and offline.

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

── ステップ9 生産 ──

ステップ9 生産への導入

チームに自分のデモン管理なしでホスト制御機が欲しいチームのために ダグスター+ (the managed cloud offering) requires only a single additional configuration step: replace dagster dev ダイグスター・クラウドのエージェントと 同じ文字が書かれています Definitions 実行します. 実行する

設定する (配備 → 環境変数名前で FXMACRO_API_KEY. リソースは自動的にそれをピックアップします.コード変更は必要ありません.同じパターンは,環境変数 (GitHub Actions, GitLab CI, CircleCI) として秘密をインジェクトするすべての CI/CD 環境に適用されます.

自動ホストされた Kubernetes クラスターに展開するには,パイプラインを Docker イメージとしてパッケージ化し,レジストリに押し,それに Dagster Helm チャートを指します. FXMacroData リソースは,出力接続します fxmacrodata.com クラスタの出口ポリシーが,そのホストへの HTTPS を許可することを確認します.


概要 概説

あなたが作ったもの

このガイドに従って,あなたは今,作業するダグスターパイプラインを持っている:

  • 定義します 繰り返し使用可能な,注射可能な FXMacroDataResource that handles all API auth and error handling in one place
  • Exposes five software-defined assets — Fed rate, US CPI, ECB rate, EUR/USD spot, and the USD release calendar — each persisted to SQLite with full Dagster metadata
  • クロン駆動のダグスタースケジュールで,欧州市場が開く前に常に最新値読をできるように,平日の更新を06:30 UTCでスケジュールします.
  • CPIが上昇するときに Webhook アラートを発動し,下流システムに投票ループではなくイベント駆動トリガーを与えます
  • 偽のリソースを使用して高速オフラインユニットテストを含みます.

次のステップ

  • 追加通貨のパイプラインを拡張する 詳細の指標カタログを閲覧 /api-data-docs ファイルファイル 同じパターンで新しい資産を追加します
  • 生産スケール用の Postgres または BigQuery I/O マネージャーで SQLite ストアを置き換える
  • DBから読み,マクロディバージェンススコアを計算し,シグナルテーブルに取引信号を書き込むダウンストリーム資産を追加します 戦略ロジックを摂取層から分離します
  • 探検する COT位置情報 指標シリーズを補完する 感情の重なりとして

Blogroll

AI Answer-Ready

Key Facts

Page
How To Integrate FXmacrodata With Dagster
Section
Articles
Canonical URL
https://fxmacrodata.com/ja/articles/how-to-integrate-fxmacrodata-with-dagster
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:06 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Integrate FXmacrodata With Dagster with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.