ダイグスターは Pythonベースのデータオーケストレーションプラットフォームで 概念を元に構築されています ソフトウェアで定義された資産:独自の血統を知っているデータアーテファクトは,要求に応じてまたはスケジュールで実現され,豊富なメタデータと観察可能性を箱から暴露することができます. マクロデータワークフローのために,指標シリーズを引っ張って,リリースイベントを検出し,スナップショットを保存し,異常を警告します. Dagsterは優れたフィットです. このガイドは,FXMacroData指標をローカルSQLiteストアに吸収し,即日リリースカレンダーイベントを表面化し,毎日のスケジュールに自動的に実行するパイプラインを構築します.
建設する
- FXMacroData Dagster リソース すべての資産で共有できる再利用可能で構成可能な API クライアント
- Four software-defined assets 政策金利,CPI,フォックススポット,選択した通貨ペアのリリースカレンダー
- 日々の仕事とスケジュール ロンドンオープン前毎週朝,最新情報を更新します
- 異常センサー 予想外のインフレを監視し,Webフックで警告を発します
条件
ステップ 1
ステップ 1 Dagster とプロジェクト依存関係をインストール
デジタルライブラリをインストールします. ファイルファイルは,このパイプラインで使用されます. dagster ほら dagster-webserver 設置の最も一般的な失敗を回避します.
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
requests pandas sqlalchemy
プロジェクトを初期化します.ダグスターのスキャフォルドコマンドはディレクトリ構造と
pyproject.toml 定義を自動的に発見できます. 定義は,
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
ソースファイルにハードコード認証を入れない.
export FXMACRO_API_KEY="YOUR_API_KEY"
生産ホストでは,スケジューラーの秘密管理 (GitHub Actions 秘密,Kubernetes 秘密. Dagster Cloud 環境変数) を介して鍵をインジェクトします.パイプラインは実行時に環境から読みます.コードに変更は必要ありません.
ステップ2
Step 2 — Define a reusable FXMacroData resource
ナイフ 資源 データベース接続またはHTTPセッションに類似する共有可能なインジェクタブル依存関係である. FXMacroData REST APIをリソースに包むことは,認証論理やタイムアウト処理を重複せずにすべての資産が呼び出すことができることを意味します.
創る fxmacro_pipeline/resources.pyありがとうございました
import os
from typing import Any
import requests
from dagster import ConfigurableResource, get_dagster_logger
_BASE_URL = "https://fxmacrodata.com/api/v1"
class FXMacroDataResource(ConfigurableResource):
"""Thin wrapper around the FXMacroData REST API."""
api_key: str = "" # overridden at run-time from env
timeout: int = 15
def _get(self, path: str, **params: Any) -> dict:
logger = get_dagster_logger()
key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
resp = requests.get(
f"{_BASE_URL}{path}",
params={"api_key": key, **params},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
logger.debug("GET %s → %d records", path, len(data.get("data", [])))
return data
def get_announcements(
self,
currency: str,
indicator: str,
start: str = "2020-01-01",
) -> list[dict]:
"""Return time-series records for a macro indicator."""
result = self._get(
f"/announcements/{currency.lower()}/{indicator}",
start=start,
)
return result.get("data", [])
def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
"""Return daily FX spot-rate records."""
result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
return result.get("data", [])
def get_calendar(self, currency: str) -> list[dict]:
"""Return the upcoming release calendar for a currency."""
result = self._get(f"/calendar/{currency.lower()}")
return result.get("data", [])
なぜConfigurableResourceなのか?
ConfigurableResource 開発者とプロッドの間の認証とタイムアウトを交換できます. 資産コードに触らない限り. リソースを一度設定します. Definitions パーマターとして宣言される場所へインジェクトします.
ステップ3
ステップ3 マクロデータ資産の定義
Each Dagster asset represents one logical data product. Materialising an asset fetches the latest data from FXMacroData and persists it locally. We define four assets: the 政策金利ほら アメリカCPIほら 政策金利, and the EUR/USD spot series. A fifth asset then reads the release calendar so the pipeline knows when the next high-impact announcements are due.
創る fxmacro_pipeline/assets.pyありがとうございました
import os
import sqlite3
from datetime import date, timedelta
import pandas as pd
from dagster import (
AssetExecutionContext,
MaterializeResult,
MetadataValue,
asset,
)
from .resources import FXMacroDataResource
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
date TEXT PRIMARY KEY,
val REAL,
announcement_datetime TEXT
)
"""
)
conn.commit()
def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
rows = [
(r["date"], r.get("val"), r.get("announcement_datetime"))
for r in records
if r.get("date") and r.get("val") is not None
]
conn.executemany(
f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Latest Fed funds rate series from the FXMacroData announcements endpoint."""
records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_policy_rate")
n = _upsert(conn, "usd_policy_rate", records)
latest = records[0] if records else {}
context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_cpi")
n = _upsert(conn, "usd_cpi", records)
latest = records[0] if records else {}
context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""ECB main refinancing rate series."""
records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eur_policy_rate")
n = _upsert(conn, "eur_policy_rate", records)
latest = records[0] if records else {}
context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Daily EUR/USD closing spot rate."""
records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eurusd_spot")
n = _upsert(conn, "eurusd_spot", records)
latest = records[0] if records else {}
context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Upcoming USD macro release dates and times."""
events = fxmacrodata.get_calendar("usd")
df = pd.DataFrame(events) if events else pd.DataFrame()
context.log.info("Found %d upcoming USD events", len(df))
# Surface a preview in the Dagster UI asset metadata pane
preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
return MaterializeResult(
metadata={
"event_count": MetadataValue.int(len(df)),
"preview": MetadataValue.md(preview),
}
)
Dagster UI の資産メタデータ
どこにでも MaterializeResult 資産詳細パネルで表示される構造化メタデータを返します. 行数,最新の日付,プレビューテーブル. これは,ログを掘らないで,実物化が実際に新しいデータを取得したか一目で確認できるということです.
ステップ4
ステップ4 職と日程を設定する
ナイフ 職種 実行する資産を選択します. 予定表 drives that job on a cron expression. We want fresh macro readings every weekday morning at 06:30 UTC — before the Frankfurt open — so any indicator released overnight is captured before European trading begins.
創る fxmacro_pipeline/jobs.pyありがとうございました
from dagster import (
AssetSelection,
ScheduleDefinition,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
name="macro_refresh",
description="Refresh all FXMacroData indicator and FX rate assets.",
selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)
calendar_refresh_job = define_asset_job(
name="calendar_refresh",
description="Pull the latest USD release calendar.",
selection=AssetSelection.groups("calendar"),
)
# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
name="macro_daily_at_0630_utc",
job=macro_refresh_job,
cron_schedule="30 6 * * 1-5",
execution_timezone="UTC",
)
# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
name="calendar_weekly_on_sunday",
job=calendar_refresh_job,
cron_schedule="0 18 * * 0",
execution_timezone="UTC",
)
Two separate jobs — one for indicator series, one for the release calendar — let you observe and re-run them independently. If the release calendar fetch fails (for example during a public holiday when the endpoint returns an empty payload), the indicator refresh job is not affected.
ステップ5
ステップ5 すべてを定義オブジェクトに接続する
ダグスターの Definitions 資産,リソース,仕事,スケジュールを結びつける単一のエントリーポイントです. fxmacro_pipeline/__init__.pyありがとうございました
import os
from dagster import Definitions
from .assets import (
eur_policy_rate,
eurusd_spot,
usd_cpi,
usd_policy_rate,
usd_release_calendar,
)
from .jobs import (
calendar_refresh_job,
calendar_weekly_schedule,
macro_daily_schedule,
macro_refresh_job,
)
from .resources import FXMacroDataResource
defs = Definitions(
assets=[
usd_policy_rate,
usd_cpi,
eur_policy_rate,
eurusd_spot,
usd_release_calendar,
],
resources={
"fxmacrodata": FXMacroDataResource(
# Falls back to the FXMACRO_API_KEY env var inside the resource
api_key=os.environ.get("FXMACRO_API_KEY", ""),
),
},
jobs=[macro_refresh_job, calendar_refresh_job],
schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
実行時にリソースインジェクション
報告する資産 fxmacrodata: FXMacroDataResource テスト用の模擬で交換するには,テスト中のリソースをオーバーライドします. Definitions 資産コードは変わらない
ステップ6
ステップ6 驚異的な膨張の印を警告するセンサーを追加
センサーは,外部の条件を調査し,何か変化したときのトリガーランまたは警告チャンネルを実行します. 次のセンサーは毎回実行します. usd_cpi 実現:最新のCPI値が前回の値より0.4パーセントポイント以上上がると,Webフックアラートが発信され,戦略やリスクシステムが即座に反応します.
追加する fxmacro_pipeline/sensors.pyありがとうございました
import os
import sqlite3
import requests as http
from dagster import (
RunStatusSensorContext,
asset_sensor,
AssetKey,
EventLogEntry,
SensorResult,
SkipReason,
)
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4 # percentage points
@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
context: RunStatusSensorContext,
asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
"""Alert when the latest US CPI print is significantly above the prior reading."""
try:
conn = sqlite3.connect(_DB_PATH)
rows = conn.execute(
"SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
).fetchall()
conn.close()
except Exception as exc:
return SkipReason(f"DB read failed: {exc}")
if len(rows) < 2:
return SkipReason("Not enough CPI history yet.")
latest_date, latest_val = rows[0]
prior_date, prior_val = rows[1]
surprise = (latest_val or 0) - (prior_val or 0)
context.log.info(
"CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
latest_date, latest_val, prior_date, prior_val, surprise,
)
if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
payload = {
"text": (
f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
f"vs prior {prior_date}: {prior_val:.1f}% "
f"(surprise: *{surprise:+.2f} pp*)"
)
}
http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
return SensorResult(cursor=str(latest_date))
センサーを登録する __init__.py 輸入して 追加して Definitions
sensors リスト:
from .sensors import cpi_surprise_alert_sensor
defs = Definitions(
# ... existing fields ...
sensors=[cpi_surprise_alert_sensor],
)
ステップ7
ステップ7 Dagster UI を起動して最初のマテリアライゼーションを実行します
開発サーバーを起動する Definitions を抜いて
pyproject.toml 入力ポイント:
dagster dev
移動する http://localhost:3000じゃあ じゃあ 資産グラフ showing
all five assets grouped into their logical groups (macro_indicatorsほら fx_ratesほら
calendarクリックします すべて を 物質 に する 秒以内に最初のインジェステーションパスを実行するには,資産パネルは行数,最新の日付,FXMacroData APIから直接抽出した最新の値を表示します.
SQLite でデータを検証する
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
プログラムを起動するには, 概要 → 日程 ダイグスターのUIに切り替える
macro_daily_at_0630_utc やってる 走ってる. Dagster's daemon process will now wake up
every weekday at 06:30 UTC and materialise the macro indicator assets automatically.
── ステップ8 テスト ──
ステップ8 模擬リソースでユニットテストを書く
One of Dagster's key design benefits is that assets are ordinary Python functions — easy to unit-test without running the full orchestration stack. Swap the live resource for a mock to keep tests fast and offline.
# tests/test_assets.py
import sqlite3
import tempfile
import os
from unittest.mock import MagicMock
import pytest
from dagster import build_asset_context, materialize
from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource
MOCK_RATE_DATA = [
{"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
{"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
{"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
{"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]
class MockFXMacroData(FXMacroDataResource):
def get_announcements(self, currency, indicator, start="2020-01-01"):
if indicator == "policy_rate":
return MOCK_RATE_DATA
if indicator == "inflation":
return MOCK_CPI_DATA
return []
def get_forex(self, base, quote, start="2023-01-01"):
return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]
def get_calendar(self, currency):
return []
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))
def test_usd_policy_rate_materialises():
result = materialize(
[usd_policy_rate],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_policy_rate")[0]
assert mat.metadata["row_count"].value == 2
assert mat.metadata["latest_val"].value == pytest.approx(4.5)
def test_usd_cpi_materialises():
result = materialize(
[usd_cpi],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_cpi")[0]
assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
── ステップ9 生産 ──
ステップ9 生産への導入
チームに自分のデモン管理なしでホスト制御機が欲しいチームのために ダグスター+
(the managed cloud offering) requires only a single additional configuration step: replace dagster dev
ダイグスター・クラウドのエージェントと 同じ文字が書かれています Definitions 実行します. 実行する
設定する (配備 → 環境変数名前で FXMACRO_API_KEY. リソースは自動的にそれをピックアップします.コード変更は必要ありません.同じパターンは,環境変数 (GitHub Actions, GitLab CI, CircleCI) として秘密をインジェクトするすべての CI/CD 環境に適用されます.
自動ホストされた Kubernetes クラスターに展開するには,パイプラインを Docker イメージとしてパッケージ化し,レジストリに押し,それに Dagster Helm チャートを指します. FXMacroData リソースは,出力接続します
fxmacrodata.com クラスタの出口ポリシーが,そのホストへの HTTPS を許可することを確認します.
概要 概説
あなたが作ったもの
このガイドに従って,あなたは今,作業するダグスターパイプラインを持っている:
- 定義します 繰り返し使用可能な,注射可能な
FXMacroDataResourcethat handles all API auth and error handling in one place - Exposes five software-defined assets — Fed rate, US CPI, ECB rate, EUR/USD spot, and the USD release calendar — each persisted to SQLite with full Dagster metadata
- クロン駆動のダグスタースケジュールで,欧州市場が開く前に常に最新値読をできるように,平日の更新を06:30 UTCでスケジュールします.
- CPIが上昇するときに Webhook アラートを発動し,下流システムに投票ループではなくイベント駆動トリガーを与えます
- 偽のリソースを使用して高速オフラインユニットテストを含みます.
次のステップ
- 追加通貨のパイプラインを拡張する 詳細の指標カタログを閲覧 /api-data-docs ファイルファイル 同じパターンで新しい資産を追加します
- 生産スケール用の Postgres または BigQuery I/O マネージャーで SQLite ストアを置き換える
- DBから読み,マクロディバージェンススコアを計算し,シグナルテーブルに取引信号を書き込むダウンストリーム資産を追加します 戦略ロジックを摂取層から分離します
- 探検する COT位置情報 指標シリーズを補完する 感情の重なりとして