Live release feed
Sub-second macro releases for FX backtests
Point-in-time history
Official CPI, jobs, GDP, and central-bank events with point-in-time history.
$25/month 14-day free trial
Start Free Trial

Implementation

How-To Guides

डैगस्टर के साथ FXMacroData को कैसे एकीकृत करें

एक उत्पादन के लिए तैयार डैगस्टर डेटा पाइपलाइन बनाएं जो एक अनुसूची पर FXMacroData से मैक्रो संकेतक खींचता है, स्थानीय डेटाबेस में परिणाम संग्रहीत करता है, और स्मार्ट निष्पादन समय के लिए कैलेंडर की घटनाओं को रिलीज़ करता है।

इसमें भी उपलब्ध है English
Share article X LinkedIn Email

डैगस्टर एक पायथन मूल डेटा ऑर्केस्ट्रेशन प्लेटफॉर्म है जो की अवधारणा के आसपास बनाया गया है सॉफ्टवेयर-परिभाषित संपत्ति: डेटा कलाकृतियां जो अपनी वंशावली जानते हैं, उन्हें मांग पर या एक कार्यक्रम पर भौतिक बनाया जा सकता है, और बॉक्स से बाहर समृद्ध मेटाडेटा और अवलोकनशीलता को उजागर कर सकते हैं। मैक्रो डेटा वर्कफ़्लो के लिए सूचक श्रृंखला खींचना, रिलीज घटनाओं का पता लगाना, स्नैपशॉट संग्रहीत करना, और विसंगतियों पर चेतावनी देना डैगस्टर एक उत्कृष्ट फिट है। यह गाइड एक पाइपलाइन के निर्माण के माध्यम से चलता है जो स्थानीय एसक्यूएलइट स्टोर में एफएक्समैक्रोडाटा संकेतकों को निगलता है, आगामी रिलीज कैलेंडर घटनाओं को सतह देता है, तथा दैनिक कार्यक्रम पर स्वचालित रूप से चलता हैं।

आप क्या बनाएंगे

  • एक FXMacroData डैगस्टर संसाधन एक पुनः प्रयोज्य, विन्यास योग्य एपीआई क्लाइंट सभी परिसंपत्तियों में साझा किया गया
  • चार सॉफ्टवेयर-परिभाषित संपत्ति नीतिगत दरें, सीपीआई, विदेशी मुद्रा स्पॉट और चयनित मुद्रा जोड़ी के लिए रिलीज़ कैलेंडर
  • रोजमर्रा की नौकरी और कार्यक्रम नवीनतम रीडिंग को ताज़ा करने के लिए लंदन के उद्घाटन से पहले हर सप्ताह की सुबह चलता है
  • विसंगति सेंसर अप्रत्याशित मुद्रास्फीति के लिए घड़ी प्रिंट और एक वेबहुक के माध्यम से एक अलर्ट फायर करता है

पूर्व शर्तें

  • पायथन 3.10+ सभी स्निपेट्स आधुनिक टाइप संकेतों का उपयोग करते हैं और match बयान
  • FXMacroData एपीआई कुंजी पर साइन अप करें /अपना नाम लिखें; अमरीकी डालर सूचक डेटा सार्वजनिक रूप से कुंजी के बिना सुलभ है
  • मूल डैगस्टर परिचितता आपको पता होना चाहिए कि संपत्ति और नौकरी क्या हैं; डैगस्टर त्वरित प्रारंभ दस मिनट में मूल बातें बताता है

चरण 1

चरण 1 डैगस्टर और परियोजना निर्भरताएँ स्थापित करें

एक नया आभासी वातावरण बनाएँ और इस पाइपलाइन का उपयोग करता है पुस्तकालयों के छोटे सेट के साथ Dagster स्थापित करें. dagster और dagster-webserver एक ही संस्करण के लिए सबसे आम स्थापना जाल से बचता है।

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

एक न्यूनतम परियोजना लेआउट आरंभ करें. डैगस्टर के मचान आदेश निर्देशिका संरचना बनाता है और एक pyproject.toml जो स्थानीय यूआई को आपकी परिभाषाओं को स्वचालित रूप से पता लगाने देता है।

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

अपनी एपीआई कुंजी को पर्यावरण चर के रूप में स्टोर करें. स्रोत फ़ाइलों में हार्ड-कोड क्रेडेंशियल्स कभी नहीं.

export FXMACRO_API_KEY="YOUR_API_KEY"

उत्पादन मेजबानों पर, अपने शेड्यूलर के गुप्त प्रबंधन (GitHub क्रिया रहस्य, Kubernetes रहस्य, Dagster क्लाउड पर्यावरण चर) के माध्यम से कुंजी इंजेक्ट करें। पाइपलाइन इसे रनटाइम पर पर्यावरण से पढ़ती है कोड में कोई बदलाव की आवश्यकता नहीं है।


- चरण 2

चरण 2 एक पुनः प्रयोज्य FXMacroData संसाधन को परिभाषित करें

एक डैगस्टर संसाधन एक साझा, इंजेक्शन निर्भरता है एक डेटाबेस कनेक्शन या HTTP सत्र के अनुरूप। एक संसाधन में FXMacroData REST एपीआई को लपेटने का मतलब है कि प्रत्येक संपत्ति प्रमाणीकरण तर्क या टाइमआउट हैंडलिंग को दोहराए बिना इसे कॉल कर सकती है।

सृजन करना fxmacro_pipeline/resources.py

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

कॉन्फ़िगर करने योग्य संसाधन क्यों?

ConfigurableResource आप संसाधन एक बार में विन्यस्त करता है Definitions और Dagster इसे इंजेक्ट करता है जहां भी यह एक पैरामीटर के रूप में घोषित किया जाता है।


चरण 3

चरण 3 मैक्रो डेटा परिसंपत्तियों को परिभाषित करें

प्रत्येक डैगस्टर संपत्ति एक तार्किक डेटा उत्पाद का प्रतिनिधित्व करती है। एक संपत्ति को भौतिक बनाना FXMacroData से नवीनतम डेटा प्राप्त करता है और इसे स्थानीय रूप से बनाए रखता है। हम चार संपत्ति को परिभाषित करते हैंः फेड नीतिगत दर, को अमेरिकी सीपीआई, को ईसीबी नीतिगत दर, और EUR/USD स्पॉट श्रृंखला। फिर एक पांचवीं संपत्ति रिलीज़ कैलेंडर पढ़ती है ताकि पाइपलाइन को पता चले कि अगली उच्च प्रभाव वाली घोषणाएं कब होने वाली हैं।

सृजन करना fxmacro_pipeline/assets.py

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

डैगस्टर यूआई में परिसंपत्ति मेटाडेटा

हर MaterializeResult संरचनात्मक मेटाडेटा देता है जो कि Dagster संपत्ति विवरण फलक में प्रस्तुत करता है पंक्ति गणना, नवीनतम तिथियां, पूर्वावलोकन तालिकाएं। इसका मतलब है कि आप एक नज़र में जांच सकते हैं कि क्या एक भौतिककरण ने वास्तव में लॉग में खोदने के बिना ताजा डेटा प्राप्त किया है।


चरण 4

चरण 4 एक नौकरी और एक दैनिक कार्यक्रम बनाएं

एक डैगस्टर काम एक बार में कौन सी संपत्ति का निर्माण किया जाना है, इसका चयन करता है। अनुसूची हम ताजा मैक्रो रीडिंग हर कार्यदिवस सुबह 06:30 UTC फ्रैंकफर्ट खोलने से पहले चाहते हैं तो किसी भी संकेतक रात भर जारी किया यूरोपीय व्यापार शुरू होने से पहले कब्जा कर लिया है।

सृजन करना fxmacro_pipeline/jobs.py

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

दो अलग-अलग कार्य एक सूचक श्रृंखला के लिए, एक रिलीज कैलेंडर के लिए आप स्वतंत्र रूप से उन्हें अवलोकन और फिर से चलाने के लिए अनुमति देते हैं। यदि रिलीज कैलेण्डर फ़ॉच विफल हो जाता है (उदाहरण के लिए एक सार्वजनिक अवकाश के दौरान जब एंडपॉइंट खाली पेलोड वापस करता है), तो सूचक ताज़ा कार्य प्रभावित नहीं होता है।


चरण 5

चरण 5 सब कुछ परिभाषाओं वस्तु में तार

डैगस्टर का Definitions वस्तु एक एकल प्रवेश बिंदु है जो संपत्ति, संसाधन, नौकरियों और कार्यक्रमों को एक साथ जोड़ता है। fxmacro_pipeline/__init__.py

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

रनटाइम पर संसाधन इंजेक्शन

किसी भी संपत्ति जो घोषणा करता है fxmacrodata: FXMacroDataResource एक पैरामीटर के रूप में स्वचालित रूप से रन-टाइम पर विन्यस्त उदाहरण प्राप्त करता है. परीक्षण के लिए एक नकली में स्वैप करने के लिए, अपने परीक्षण में संसाधन को ओवरराइट करें Definitions परिसंपत्ति कोड में कोई परिवर्तन नहीं होता है।


चरण 6

चरण 6 आश्चर्यजनक मुद्रास्फीति के निशान पर चेतावनी देने के लिए एक सेंसर जोड़ें

सेंसर में Dagster सर्वेक्षण बाहरी परिस्थितियों के लिए और ट्रिगर चलता है या चेतावनी चैनलों जब कुछ बदल जाता है। निम्नलिखित सेंसर हर बाद चलता है usd_cpi वास्तविकताः यदि नवीनतम सीपीआई रीडिंग पिछले रीडिंग से 0.4 प्रतिशत अंक से अधिक है, तो यह एक वेबहुक अलर्ट चलाता है ताकि आपकी रणनीति या जोखिम प्रणाली तुरंत प्रतिक्रिया कर सके।

जोड़ें fxmacro_pipeline/sensors.py

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

सेंसर में पंजीकृत करें __init__.py इसे आयात करके और इसे Definitions sensors सूची:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

चरण 7

चरण 7 डैगस्टर यूआई लॉन्च करें और अपना पहला कार्यान्वयन चलाएं

स्थानीय विकास सर्वर शुरू करें. Definitions के माध्यम से pyproject.toml प्रवेश बिंदु:

dagster dev

नेविगेट करें http://localhost:3000तुम देखोगे परिसंपत्ति ग्राफ सभी पांच परिसंपत्तियों को उनके तार्किक समूहों में समूहीकृत करके दिखाता है (macro_indicators, fx_rates, calendarक्लिक करें सब कुछ भौतिक बनाओ सेकंड के भीतर पहला सेवन पास चलाने के लिए, संपत्ति फलक पंक्ति गिनती, नवीनतम तिथियों, और नवीनतम मूल्यों सीधे FXMacroData एपीआई से खींचा दिखाएगा।

SQLite में डेटा सत्यापित करें

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

कार्यक्रम को सक्रिय करने के लिए, नेविगेट करें अवलोकन → कार्यक्रम डैगस्टर यूआई में और टॉगल करें macro_daily_at_0630_utc दौड़ना. डैगस्टर की डेमॉन प्रक्रिया अब हर कार्यदिवस 06:30 UTC पर जाग जाएगी और मैक्रो संकेतक संपत्ति को स्वचालित रूप से वास्तविकता में लाएगी.


चरण 8: परीक्षण

चरण 8 एक नकली संसाधन के साथ इकाई परीक्षण लिखें

डैगस्टर के मुख्य डिजाइन लाभों में से एक यह है कि संपत्ति साधारण पायथन फ़ंक्शन हैं पूरे ऑर्केस्ट्रेशन स्टैक को चलाए बिना यूनिट-टेस्ट करना आसान है। परीक्षणों को तेज़ और ऑफ़लाइन रखने के लिए लाइव संसाधन को मॉक के लिए स्वैप करें।

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

चरण 9: उत्पादन

चरण 9 डैगस्टर+ के साथ उत्पादन में तैनात करें

टीमों के लिए जो अपने स्वयं के डेमॉन का प्रबंधन किए बिना एक होस्ट नियंत्रण विमान चाहते हैं, डैगस्टर+ (प्रबंधित क्लाउड की पेशकश) केवल एक अतिरिक्त कॉन्फ़िगरेशन चरण की आवश्यकता हैः प्रतिस्थापित करें dagster dev डैगस्टर क्लाउड एजेंट के साथ, जो एक ही पढ़ता है Definitions वस्तु और अपने बुनियादी ढांचे में चलाता है निष्पादित करता है.

अपनी एपीआई कुंजी को एक डैगस्टर+ पर्यावरण चर के रूप में सेट करें (तैनाती → पर्यावरण चर) के साथ नाम FXMACRO_API_KEY. संसाधन इसे स्वचालित रूप से उठाएगा कोई कोड परिवर्तन की आवश्यकता नहीं है. एक ही पैटर्न किसी भी CI/CD वातावरण पर लागू होता है जो पर्यावरण चर (GitHub क्रियाओं, GitLab CI, CircleCI) के रूप में रहस्य इंजेक्ट करता है.

एक स्व-होस्टेड Kubernetes क्लस्टर पर तैनात करने के लिए, एक डॉकर छवि के रूप में पाइपलाइन पैकेज, इसे अपने रजिस्ट्री में धक्का, और उस पर डैगस्टर हेलम चार्ट इंगित करें। FXMacroData संसाधन आउटबाउंड से कनेक्ट होता है fxmacrodata.com सुनिश्चित करें कि आपके समूह की निकास नीति उस होस्ट के लिए HTTPS की अनुमति देती है.


संक्षेप में

आपने जो बनाया

इस गाइड का पालन करके अब आपके पास एक कामकाजी डैगस्टर पाइपलाइन है जोः

  • पुनः प्रयोज्य, इंजेक्शन योग्य FXMacroDataResource जो एक ही स्थान पर सभी एपीआई सत्यापन और त्रुटि हैंडलिंग संभालता है
  • पांच सॉफ्टवेयर-परिभाषित परिसंपत्तियों को उजागर करता है फेड दर, यूएस सीपीआई, ईसीबी दर, EUR/USD स्पॉट और USD रिलीज़ कैलेंडर प्रत्येक पूर्ण डैगस्टर मेटाडेटा के साथ SQLite के लिए दृढ़ता से
  • एक क्रोन-संचालित डैगस्टर कार्यक्रम के माध्यम से 06:30 UTC पर कार्यदिवस ताज़ा करने की योजना बनाता है ताकि यूरोपीय बाजारों के खुलने से पहले आपके पास हमेशा अद्यतित रीडिंग हो
  • जब सीपीआई प्रिंट अपसाइड में आश्चर्यचकित होता है तो वेबहूक अलर्ट चलाता है, जिससे डाउनस्ट्रीम सिस्टम को पोलिंग लूप के बजाय इवेंट-ड्राइव ट्रिगर मिलता है
  • एक नकली संसाधन का उपयोग कर तेजी से ऑफ़लाइन इकाई परीक्षण शामिल है, तो आईसी पाइपलाइन कभी भी लाइव एपीआई कॉल नहीं करता है

अगला कदम

  • अतिरिक्त मुद्राओं के साथ पाइपलाइन का विस्तार करें पूर्ण सूचक सूची को देखें /api-data-docs और उसी पैटर्न के अनुसार नई संपत्ति जोड़ें
  • उत्पादन पैमाने के लिए एक Postgres या BigQuery I/O प्रबंधक के साथ SQLite भंडारण की जगह
  • एक डाउनस्ट्रीम संपत्ति जो डीबी से पढ़ता है जोड़ें, एक मैक्रो विचलन स्कोर की गणना, और एक संकेत तालिका के लिए व्यापार संकेत लिखता है अपनी रणनीति तर्क निगल परत से अलग रखने
  • खोजें सीओटी पोजिशनिंग डेटा संकेतक श्रृंखला के पूरक के रूप में एक भावना ओवरले के रूप

Blogroll

AI Answer-Ready

Key Facts

Page
How To Integrate FXmacrodata With Dagster
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-integrate-fxmacrodata-with-dagster
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:06 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Integrate FXmacrodata With Dagster with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email