Live release feed
Sub-second macro releases for FX backtests
Point-in-time history
Official CPI, jobs, GDP, and central-bank events with point-in-time history.
$25/month 14-day free trial
Start Free Trial

Implementation

How-To Guides

FXMacroData와 Dagster를 통합하는 방법

생산 준비가 된 Dagster 데이터 파이프 라인을 구축하여 FXMacroData에서 매크로 지표를 일정으로 가져와 결과를 로컬 데이터베이스에 저장하고 더 똑똑한 실행 시기를 위해 표면 달력 이벤트를 출시합니다.

다른 언어로도 제공 English
Share article X LinkedIn Email

다거스터는 파이썬 기반의 데이터 오케스트레이션 플랫폼입니다. 소프트웨어 정의 자산: 자신의 계통을 알고 있는 데이터 유물, 요구 또는 일정으로 실현될 수 있으며, 풍부한 메타데이터와 관찰성을 상자 밖으로 노출. 매크로 데이터 워크플로우 인디케이터 시리즈를 뽑는, 릴리스 이벤트를 감지, 스냅샷을 저장, 그리고 이상에 대한 경고 Dagster는 우수한 적합성입니다. 이 가이드는 FXMacroData 인디켓을 로컬 SQLite 스토어에 섭취하는 파이프 라인을 구축하는 과정을 걸으며, 다가오는 릴리즈 캘린더 이벤트를 표면화하고 매일 일정에 자동으로 실행합니다.

당신이 무엇을 만들 것인가

  • FXMacroData Dagster 리소스 모든 자산에 공유되는 재사용 가능한 구성 가능한 API 클라이언트
  • 소프트웨어 정의 자산 4개 정책금리, CPI, 외환 스팟, 선택된 통화 쌍의 출시 일정은
  • 일일 업무와 일정이 런던 오픈 전 아침 매일 진행되며 최신 자료를 업데이트합니다.
  • 이상 감지기 예상치 못한 인플레이션을 감시하고 웹 을 통해 경고를 발사합니다.

필수 조건

  • 파이썬 3.10+ 모든 스니펙트에는 현대적인 타입 힌트가 사용되고 match 진술
  • FXMacroData API 키 등록하세요 / 가입; USD 지표 데이터는 키 없이 공개적으로 접근할 수 있습니다.
  • 기본 Dagster 친숙성 자산과 일자리가 무엇인지 알아야 합니다. 대거스터 빠른 시작 10분 안에 기본 내용을 다룰 수 있습니다.

- 1단계

단계 1 Dagster 및 프로젝트 의존성을 설치

새로운 가상 환경을 만들고 이 파이프라인이 사용하는 작은 라이브러리들과 함께 Dagster를 설치합니다. dagster 그리고 dagster-webserver 같은 버전으로 가장 일반적인 설치 함정을 피합니다.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

최소 프로젝트 레이아웃을 초기화합니다. Dagster의 scaffold 명령어는 디렉토리 구조를 만들고 pyproject.toml 로컬 UI가 자동으로 정의를 발견할 수 있게 해줍니다.

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

환경 변수로 API 키를 저장합니다. 소스 파일에서 하드 코드 자격 증명을 절대 하지 마십시오.

export FXMACRO_API_KEY="YOUR_API_KEY"

생산 호스트에서 키를 스케줄러의 비밀 관리 (GitHub Actions 비밀, Kubernetes 비밀, Dagster Cloud 환경 변수) 를 통해 주입합니다. 파이프 라인은 실행 시간에 환경으로부터 읽습니다. 코드 변경이 필요하지 않습니다.


- 2단계

단계 2 재사용 가능한 FXMacroData 리소스를 정의

다거스터 자원 공유, 주입 가능한 의존성입니다. 데이터베이스 연결 또는 HTTP 세션과 유사합니다. FXMacroData REST API를 리소스에 엮는 것은 모든 자산이 인증 논리 또는 타임 아웃 처리 중복 없이 호출 할 수 있음을 의미합니다.

창조해 fxmacro_pipeline/resources.py

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

왜 "ConfigurableResource"가 필요하죠?

ConfigurableResource 개발자와 프로드 사이로 인증서와 타임오웃을 교환할 수 있습니다. Definitions 그리고 Dagster는 매개 변수로 선언된 곳에 주입합니다.


- 3단계

단계 3 매크로 데이터 자산을 정의

각 Dagster 자산은 하나의 논리적 데이터 제품을 나타냅니다. 자산을 물질화하면 FXMacroData에서 최신 데이터를 가져오고 로컬로 유지됩니다. 우리는 네 가지 자산을 정의합니다: 미국 연방준비제도, 그 미국 CPI, 그 ECB 정책금리5번째 자산은 출시 일정을 읽고, 파이프 라인이 다음의 큰 영향력을 가진 발표가 언제 될지 알 수 있습니다.

창조해 fxmacro_pipeline/assets.py

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Dagster UI의 자산 메타데이터

모든 MaterializeResult 재산 세부 행 수, 최신 날짜, 미리보기 테이블에서 Dagster가 표시하는 구조화된 메타데이터를 반환합니다. 이것은 로그에 파고들지 않고 실제로 새로운 데이터를 가져온지 한눈에 확인할 수 있음을 의미합니다.


4단계

단계 4 일 일과 일정을 정해두세요

다거스터 직업 어떤 자산이 한 번에 실현될지를 선택합니다. 일정은 크론 표현식으로 작동합니다. 우리는 매일 아침 6시 30분 UTC에서 새로운 매크로 판독을 원합니다.

창조해 fxmacro_pipeline/jobs.py

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

두 개의 별도의 작업 하나 표시기 시리즈, 하나 릴리스 캘린더 당신이 관찰하고 독립적으로 다시 실행 할 수 있습니다. 출시 캘린딩 검색 실패 (예를 들어 공휴일 동안 최종 지점이 빈 페이로드를 반환 할 때), 표시기 갱신 작업은 영향을받지 않습니다.


- 5단계

단계 5 모든 것을 정의 객체로 연결

다거스터의 Definitions 이 객체는 자산, 자원, 일자리, 스케줄을 연결하는 단일 입구점입니다. fxmacro_pipeline/__init__.py

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

실행시 리소스 주입

어떤 자산도 신고합니다 fxmacrodata: FXMacroDataResource 매개 변수로 실행 시 자동으로 구성된 인스턴스를 수신합니다. 테스트를 위해 모의 인스턴스 교환을 위해 테스트에서 리소스를 오버라이드합니다. Definitions 자산 코드는 변하지 않습니다.


- 6단계 -

단계 6 놀라울 정도로 팽창한 표지에 대한 경고를 위한 센서를 추가합니다

외부 조건에 대 한 Dagster 조회 센서 및 트리거 실행 또는 뭔가 변경되면 경고 채널 다음 센서는 모든 다음을 실행 usd_cpi 실제화: 최신 CPI 가치가 이전보다 0.4 퍼센트 포인트 이상 높으면 웹 알림을 발사하여 전략 또는 위험 시스템이 즉시 반응 할 수 있습니다.

추가해 fxmacro_pipeline/sensors.py

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

센서를 입력하세요 __init__.py 이 정보를 가져와서 Definitions sensors 목록:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

7단계

단계 7 Dagster UI를 실행하고 첫 번째 구현을 실행

로컬 개발 서버를 시작해 Definitions 을 통해 pyproject.toml 입국 지점:

dagster dev

네비게이션 http://localhost:3000- 당신은 을 보게 될거야 자산 그래프 이 다섯 가지 자산 모두 논리 그룹으로 그룹화되어 있습니다 (macro_indicators fx_rates calendar클릭합니다. 모든 것 을 물질화 하라 첫 번째 흡입 패스를 실행하려면, 자산 은 몇 초 이내에, 라인 카운트, 최신 날짜, FXMacroData API에서 직접 뽑은 최신 값을 표시합니다.

SQLite에서 데이터를 확인

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

스케줄을 활성화하려면, 개요 → 일정은 스터 UI에서 그룹을 니다 macro_daily_at_0630_utc 달리고. 더거스터의 대몬 프로세스는 이제 매주 평일 6시 30분 UTC에서 일어나 자동으로 매크로 지표 자산을 구현합니다.


단계 8: 테스트

단계 8 모의 자원을 사용하여 단위 테스트를 작성

다거스터의 주요 디자인 장점 중 하나는 자산이 일반적인 파이썬 함수라는 것입니다. 전체 오케스트레이션 스택을 실행하지 않고도 단위 테스트가 쉽습니다. 테스트를 빠르고 오프라인으로 유지하기 위해 라이브 리소스를 모형으로 교환하십시오.

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

단계 9: 생산

단계 9 Dagster+로 생산에 도입

자신의 대몬을 관리하지 않고 호스트 제어 비행기를 원하는 팀에 대한, 다거스터+ (관리된 클라우드 제공) 는 단지 하나의 추가 구성 단계만 필요합니다. dagster dev 다거스터 클라우드 에이전트와 같은 글씨입니다 Definitions 객체와 실행 실행에 당신의 인프라.

당신의 API 키를 Dagster+ 환경 변수로 설정 (배포 → 환경 변수이름과 함께 FXMACRO_API_KEY. 자원은 자동으로 코드를 변경할 필요가 없습니다. 동일한 패턴은 환경 변수 (GitHub Actions, GitLab CI, CircleCI) 로 비밀을 주입하는 모든 CI/CD 환경에 적용됩니다.

자체 호스팅 Kubernetes 클러스터에 배포하려면 파이프 라인을 Docker 이미지로 패키지하고 레지스트리에 밀어 넣고 Dagster Helm 차트를 가리키십시오. FXMacroData 리소스는 출력으로 연결됩니다 fxmacrodata.com 클러스터의 출입 정책이 해당 호스트로 HTTPS를 허용하는지 확인합니다.


── 요약 ──

당신이 만든

이 가이드를 따라가면 이제 작동하는 Dagster 파이프라인을 갖게 됩니다.

  • 재사용 가능한 주사용으로 정의합니다. FXMacroDataResource 모든 API 인증과 오류 처리 기능을 한 곳에서 처리합니다.
  • 소프트웨어 정의 자산 5개 FED 금리, 미국 CPI, ECB 금리 EUR/USD 스팟, 그리고 USD 출시 달력 각각은 완전한 Dagster 메타데이터를 SQLite에 지속
  • 크론 구동 Dagster 스케줄을 통해 06:30 UTC에서 평일 업데이트를 스케줄하여 유럽 시장이 열리기 전에 항상 최신 판독을 할 수 있습니다.
  • CPI가 인상되는 경우 웹후크 알림을 발사하여 아래줄기 시스템에 투표 루프가 아닌 이벤트에 의한 트리거를 제공합니다.
  • 모의 리소스를 사용하여 빠른 오프라인 단위 테스트를 포함합니다. 그래서 CI 파이프 라인은 라이브 API 호출을 하지 않습니다.

다음 단계

  • 추가 통화로 파이프 라인을 확장하십시오 /api-data-docs 그리고 같은 패턴을 따라 새로운 자산을 추가
  • 생산용 규모를 위한 Postgres 또는 BigQuery I/O 관리자로 SQLite 저장소를 대체
  • DB에서 읽고 매크로 디버전스 점수를 계산하고 신호 테이블에 거래 신호를 작성하는 하류 자산을 추가하십시오
  • 탐험해 COT 위치 데이터 인디케이터 시리즈를 보완하는 감정 덮개로

Blogroll

AI Answer-Ready

Key Facts

Page
How To Integrate FXmacrodata With Dagster
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-integrate-fxmacrodata-with-dagster
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:06 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Integrate FXmacrodata With Dagster with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email