डैगस्टर एक पायथन मूल डेटा ऑर्केस्ट्रेशन प्लेटफॉर्म है जो की अवधारणा के आसपास बनाया गया है सॉफ्टवेयर-परिभाषित संपत्ति: डेटा कलाकृतियां जो अपनी वंशावली जानते हैं, उन्हें मांग पर या एक कार्यक्रम पर भौतिक बनाया जा सकता है, और बॉक्स से बाहर समृद्ध मेटाडेटा और अवलोकनशीलता को उजागर कर सकते हैं। मैक्रो डेटा वर्कफ़्लो के लिए सूचक श्रृंखला खींचना, रिलीज घटनाओं का पता लगाना, स्नैपशॉट संग्रहीत करना, और विसंगतियों पर चेतावनी देना डैगस्टर एक उत्कृष्ट फिट है। यह गाइड एक पाइपलाइन के निर्माण के माध्यम से चलता है जो स्थानीय एसक्यूएलइट स्टोर में एफएक्समैक्रोडाटा संकेतकों को निगलता है, आगामी रिलीज कैलेंडर घटनाओं को सतह देता है, तथा दैनिक कार्यक्रम पर स्वचालित रूप से चलता हैं।
आप क्या बनाएंगे
- एक FXMacroData डैगस्टर संसाधन एक पुनः प्रयोज्य, विन्यास योग्य एपीआई क्लाइंट सभी परिसंपत्तियों में साझा किया गया
- चार सॉफ्टवेयर-परिभाषित संपत्ति नीतिगत दरें, सीपीआई, विदेशी मुद्रा स्पॉट और चयनित मुद्रा जोड़ी के लिए रिलीज़ कैलेंडर
- रोजमर्रा की नौकरी और कार्यक्रम नवीनतम रीडिंग को ताज़ा करने के लिए लंदन के उद्घाटन से पहले हर सप्ताह की सुबह चलता है
- विसंगति सेंसर अप्रत्याशित मुद्रास्फीति के लिए घड़ी प्रिंट और एक वेबहुक के माध्यम से एक अलर्ट फायर करता है
पूर्व शर्तें
- पायथन 3.10+ सभी स्निपेट्स आधुनिक टाइप संकेतों का उपयोग करते हैं और
matchबयान - FXMacroData एपीआई कुंजी पर साइन अप करें /अपना नाम लिखें; अमरीकी डालर सूचक डेटा सार्वजनिक रूप से कुंजी के बिना सुलभ है
- मूल डैगस्टर परिचितता आपको पता होना चाहिए कि संपत्ति और नौकरी क्या हैं; डैगस्टर त्वरित प्रारंभ दस मिनट में मूल बातें बताता है
चरण 1
चरण 1 डैगस्टर और परियोजना निर्भरताएँ स्थापित करें
एक नया आभासी वातावरण बनाएँ और इस पाइपलाइन का उपयोग करता है पुस्तकालयों के छोटे सेट के साथ Dagster स्थापित करें. dagster और dagster-webserver एक ही संस्करण के लिए सबसे आम स्थापना जाल से बचता है।
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
requests pandas sqlalchemy
एक न्यूनतम परियोजना लेआउट आरंभ करें. डैगस्टर के मचान आदेश निर्देशिका संरचना बनाता है और एक
pyproject.toml जो स्थानीय यूआई को आपकी परिभाषाओं को स्वचालित रूप से पता लगाने देता है।
dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline
अपनी एपीआई कुंजी को पर्यावरण चर के रूप में स्टोर करें. स्रोत फ़ाइलों में हार्ड-कोड क्रेडेंशियल्स कभी नहीं.
export FXMACRO_API_KEY="YOUR_API_KEY"
उत्पादन मेजबानों पर, अपने शेड्यूलर के गुप्त प्रबंधन (GitHub क्रिया रहस्य, Kubernetes रहस्य, Dagster क्लाउड पर्यावरण चर) के माध्यम से कुंजी इंजेक्ट करें। पाइपलाइन इसे रनटाइम पर पर्यावरण से पढ़ती है कोड में कोई बदलाव की आवश्यकता नहीं है।
- चरण 2
चरण 2 एक पुनः प्रयोज्य FXMacroData संसाधन को परिभाषित करें
एक डैगस्टर संसाधन एक साझा, इंजेक्शन निर्भरता है एक डेटाबेस कनेक्शन या HTTP सत्र के अनुरूप। एक संसाधन में FXMacroData REST एपीआई को लपेटने का मतलब है कि प्रत्येक संपत्ति प्रमाणीकरण तर्क या टाइमआउट हैंडलिंग को दोहराए बिना इसे कॉल कर सकती है।
सृजन करना fxmacro_pipeline/resources.py
import os
from typing import Any
import requests
from dagster import ConfigurableResource, get_dagster_logger
_BASE_URL = "https://fxmacrodata.com/api/v1"
class FXMacroDataResource(ConfigurableResource):
"""Thin wrapper around the FXMacroData REST API."""
api_key: str = "" # overridden at run-time from env
timeout: int = 15
def _get(self, path: str, **params: Any) -> dict:
logger = get_dagster_logger()
key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
resp = requests.get(
f"{_BASE_URL}{path}",
params={"api_key": key, **params},
timeout=self.timeout,
)
resp.raise_for_status()
data = resp.json()
logger.debug("GET %s → %d records", path, len(data.get("data", [])))
return data
def get_announcements(
self,
currency: str,
indicator: str,
start: str = "2020-01-01",
) -> list[dict]:
"""Return time-series records for a macro indicator."""
result = self._get(
f"/announcements/{currency.lower()}/{indicator}",
start=start,
)
return result.get("data", [])
def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
"""Return daily FX spot-rate records."""
result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
return result.get("data", [])
def get_calendar(self, currency: str) -> list[dict]:
"""Return the upcoming release calendar for a currency."""
result = self._get(f"/calendar/{currency.lower()}")
return result.get("data", [])
कॉन्फ़िगर करने योग्य संसाधन क्यों?
ConfigurableResource आप संसाधन एक बार में विन्यस्त करता है Definitions और Dagster इसे इंजेक्ट करता है जहां भी यह एक पैरामीटर के रूप में घोषित किया जाता है।
चरण 3
चरण 3 मैक्रो डेटा परिसंपत्तियों को परिभाषित करें
प्रत्येक डैगस्टर संपत्ति एक तार्किक डेटा उत्पाद का प्रतिनिधित्व करती है। एक संपत्ति को भौतिक बनाना FXMacroData से नवीनतम डेटा प्राप्त करता है और इसे स्थानीय रूप से बनाए रखता है। हम चार संपत्ति को परिभाषित करते हैंः फेड नीतिगत दर, को अमेरिकी सीपीआई, को ईसीबी नीतिगत दर, और EUR/USD स्पॉट श्रृंखला। फिर एक पांचवीं संपत्ति रिलीज़ कैलेंडर पढ़ती है ताकि पाइपलाइन को पता चले कि अगली उच्च प्रभाव वाली घोषणाएं कब होने वाली हैं।
सृजन करना fxmacro_pipeline/assets.py
import os
import sqlite3
from datetime import date, timedelta
import pandas as pd
from dagster import (
AssetExecutionContext,
MaterializeResult,
MetadataValue,
asset,
)
from .resources import FXMacroDataResource
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {table} (
date TEXT PRIMARY KEY,
val REAL,
announcement_datetime TEXT
)
"""
)
conn.commit()
def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
rows = [
(r["date"], r.get("val"), r.get("announcement_datetime"))
for r in records
if r.get("date") and r.get("val") is not None
]
conn.executemany(
f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
rows,
)
conn.commit()
return len(rows)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Latest Fed funds rate series from the FXMacroData announcements endpoint."""
records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_policy_rate")
n = _upsert(conn, "usd_policy_rate", records)
latest = records[0] if records else {}
context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "usd_cpi")
n = _upsert(conn, "usd_cpi", records)
latest = records[0] if records else {}
context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""ECB main refinancing rate series."""
records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eur_policy_rate")
n = _upsert(conn, "eur_policy_rate", records)
latest = records[0] if records else {}
context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Daily EUR/USD closing spot rate."""
records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
with sqlite3.connect(_DB_PATH) as conn:
_ensure_table(conn, "eurusd_spot")
n = _upsert(conn, "eurusd_spot", records)
latest = records[0] if records else {}
context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
return MaterializeResult(
metadata={
"row_count": MetadataValue.int(n),
"latest_date": MetadataValue.text(latest.get("date", "n/a")),
"latest_val": MetadataValue.float(latest.get("val") or 0.0),
}
)
@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
context: AssetExecutionContext,
fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
"""Upcoming USD macro release dates and times."""
events = fxmacrodata.get_calendar("usd")
df = pd.DataFrame(events) if events else pd.DataFrame()
context.log.info("Found %d upcoming USD events", len(df))
# Surface a preview in the Dagster UI asset metadata pane
preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
return MaterializeResult(
metadata={
"event_count": MetadataValue.int(len(df)),
"preview": MetadataValue.md(preview),
}
)
डैगस्टर यूआई में परिसंपत्ति मेटाडेटा
हर MaterializeResult संरचनात्मक मेटाडेटा देता है जो कि Dagster संपत्ति विवरण फलक में प्रस्तुत करता है पंक्ति गणना, नवीनतम तिथियां, पूर्वावलोकन तालिकाएं। इसका मतलब है कि आप एक नज़र में जांच सकते हैं कि क्या एक भौतिककरण ने वास्तव में लॉग में खोदने के बिना ताजा डेटा प्राप्त किया है।
चरण 4
चरण 4 एक नौकरी और एक दैनिक कार्यक्रम बनाएं
एक डैगस्टर काम एक बार में कौन सी संपत्ति का निर्माण किया जाना है, इसका चयन करता है। अनुसूची हम ताजा मैक्रो रीडिंग हर कार्यदिवस सुबह 06:30 UTC फ्रैंकफर्ट खोलने से पहले चाहते हैं तो किसी भी संकेतक रात भर जारी किया यूरोपीय व्यापार शुरू होने से पहले कब्जा कर लिया है।
सृजन करना fxmacro_pipeline/jobs.py
from dagster import (
AssetSelection,
ScheduleDefinition,
define_asset_job,
)
# ── Jobs ──────────────────────────────────────────────────────────────────────
macro_refresh_job = define_asset_job(
name="macro_refresh",
description="Refresh all FXMacroData indicator and FX rate assets.",
selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)
calendar_refresh_job = define_asset_job(
name="calendar_refresh",
description="Pull the latest USD release calendar.",
selection=AssetSelection.groups("calendar"),
)
# ── Schedules ─────────────────────────────────────────────────────────────────
# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
name="macro_daily_at_0630_utc",
job=macro_refresh_job,
cron_schedule="30 6 * * 1-5",
execution_timezone="UTC",
)
# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
name="calendar_weekly_on_sunday",
job=calendar_refresh_job,
cron_schedule="0 18 * * 0",
execution_timezone="UTC",
)
दो अलग-अलग कार्य एक सूचक श्रृंखला के लिए, एक रिलीज कैलेंडर के लिए आप स्वतंत्र रूप से उन्हें अवलोकन और फिर से चलाने के लिए अनुमति देते हैं। यदि रिलीज कैलेण्डर फ़ॉच विफल हो जाता है (उदाहरण के लिए एक सार्वजनिक अवकाश के दौरान जब एंडपॉइंट खाली पेलोड वापस करता है), तो सूचक ताज़ा कार्य प्रभावित नहीं होता है।
चरण 5
चरण 5 सब कुछ परिभाषाओं वस्तु में तार
डैगस्टर का Definitions वस्तु एक एकल प्रवेश बिंदु है जो संपत्ति, संसाधन, नौकरियों और कार्यक्रमों को एक साथ जोड़ता है। fxmacro_pipeline/__init__.py
import os
from dagster import Definitions
from .assets import (
eur_policy_rate,
eurusd_spot,
usd_cpi,
usd_policy_rate,
usd_release_calendar,
)
from .jobs import (
calendar_refresh_job,
calendar_weekly_schedule,
macro_daily_schedule,
macro_refresh_job,
)
from .resources import FXMacroDataResource
defs = Definitions(
assets=[
usd_policy_rate,
usd_cpi,
eur_policy_rate,
eurusd_spot,
usd_release_calendar,
],
resources={
"fxmacrodata": FXMacroDataResource(
# Falls back to the FXMACRO_API_KEY env var inside the resource
api_key=os.environ.get("FXMACRO_API_KEY", ""),
),
},
jobs=[macro_refresh_job, calendar_refresh_job],
schedules=[macro_daily_schedule, calendar_weekly_schedule],
)
रनटाइम पर संसाधन इंजेक्शन
किसी भी संपत्ति जो घोषणा करता है fxmacrodata: FXMacroDataResource एक पैरामीटर के रूप में स्वचालित रूप से रन-टाइम पर विन्यस्त उदाहरण प्राप्त करता है. परीक्षण के लिए एक नकली में स्वैप करने के लिए, अपने परीक्षण में संसाधन को ओवरराइट करें Definitions परिसंपत्ति कोड में कोई परिवर्तन नहीं होता है।
चरण 6
चरण 6 आश्चर्यजनक मुद्रास्फीति के निशान पर चेतावनी देने के लिए एक सेंसर जोड़ें
सेंसर में Dagster सर्वेक्षण बाहरी परिस्थितियों के लिए और ट्रिगर चलता है या चेतावनी चैनलों जब कुछ बदल जाता है। निम्नलिखित सेंसर हर बाद चलता है usd_cpi वास्तविकताः यदि नवीनतम सीपीआई रीडिंग पिछले रीडिंग से 0.4 प्रतिशत अंक से अधिक है, तो यह एक वेबहुक अलर्ट चलाता है ताकि आपकी रणनीति या जोखिम प्रणाली तुरंत प्रतिक्रिया कर सके।
जोड़ें fxmacro_pipeline/sensors.py
import os
import sqlite3
import requests as http
from dagster import (
RunStatusSensorContext,
asset_sensor,
AssetKey,
EventLogEntry,
SensorResult,
SkipReason,
)
_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4 # percentage points
@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
context: RunStatusSensorContext,
asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
"""Alert when the latest US CPI print is significantly above the prior reading."""
try:
conn = sqlite3.connect(_DB_PATH)
rows = conn.execute(
"SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
).fetchall()
conn.close()
except Exception as exc:
return SkipReason(f"DB read failed: {exc}")
if len(rows) < 2:
return SkipReason("Not enough CPI history yet.")
latest_date, latest_val = rows[0]
prior_date, prior_val = rows[1]
surprise = (latest_val or 0) - (prior_val or 0)
context.log.info(
"CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
latest_date, latest_val, prior_date, prior_val, surprise,
)
if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
payload = {
"text": (
f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
f"vs prior {prior_date}: {prior_val:.1f}% "
f"(surprise: *{surprise:+.2f} pp*)"
)
}
http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)
return SensorResult(cursor=str(latest_date))
सेंसर में पंजीकृत करें __init__.py इसे आयात करके और इसे Definitions
sensors सूची:
from .sensors import cpi_surprise_alert_sensor
defs = Definitions(
# ... existing fields ...
sensors=[cpi_surprise_alert_sensor],
)
चरण 7
चरण 7 डैगस्टर यूआई लॉन्च करें और अपना पहला कार्यान्वयन चलाएं
स्थानीय विकास सर्वर शुरू करें. Definitions के माध्यम से
pyproject.toml प्रवेश बिंदु:
dagster dev
नेविगेट करें http://localhost:3000तुम देखोगे परिसंपत्ति ग्राफ सभी पांच परिसंपत्तियों को उनके तार्किक समूहों में समूहीकृत करके दिखाता है (macro_indicators, fx_rates,
calendarक्लिक करें सब कुछ भौतिक बनाओ सेकंड के भीतर पहला सेवन पास चलाने के लिए, संपत्ति फलक पंक्ति गिनती, नवीनतम तिथियों, और नवीनतम मूल्यों सीधे FXMacroData एपीआई से खींचा दिखाएगा।
SQLite में डेटा सत्यापित करें
sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;
कार्यक्रम को सक्रिय करने के लिए, नेविगेट करें अवलोकन → कार्यक्रम डैगस्टर यूआई में और टॉगल करें
macro_daily_at_0630_utc दौड़ना. डैगस्टर की डेमॉन प्रक्रिया अब हर कार्यदिवस 06:30 UTC पर जाग जाएगी और मैक्रो संकेतक संपत्ति को स्वचालित रूप से वास्तविकता में लाएगी.
चरण 8: परीक्षण
चरण 8 एक नकली संसाधन के साथ इकाई परीक्षण लिखें
डैगस्टर के मुख्य डिजाइन लाभों में से एक यह है कि संपत्ति साधारण पायथन फ़ंक्शन हैं पूरे ऑर्केस्ट्रेशन स्टैक को चलाए बिना यूनिट-टेस्ट करना आसान है। परीक्षणों को तेज़ और ऑफ़लाइन रखने के लिए लाइव संसाधन को मॉक के लिए स्वैप करें।
# tests/test_assets.py
import sqlite3
import tempfile
import os
from unittest.mock import MagicMock
import pytest
from dagster import build_asset_context, materialize
from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource
MOCK_RATE_DATA = [
{"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
{"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]
MOCK_CPI_DATA = [
{"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
{"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]
class MockFXMacroData(FXMacroDataResource):
def get_announcements(self, currency, indicator, start="2020-01-01"):
if indicator == "policy_rate":
return MOCK_RATE_DATA
if indicator == "inflation":
return MOCK_CPI_DATA
return []
def get_forex(self, base, quote, start="2023-01-01"):
return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]
def get_calendar(self, currency):
return []
@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))
def test_usd_policy_rate_materialises():
result = materialize(
[usd_policy_rate],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_policy_rate")[0]
assert mat.metadata["row_count"].value == 2
assert mat.metadata["latest_val"].value == pytest.approx(4.5)
def test_usd_cpi_materialises():
result = materialize(
[usd_cpi],
resources={"fxmacrodata": MockFXMacroData()},
)
assert result.success
mat = result.asset_materializations_for_node("usd_cpi")[0]
assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v
चरण 9: उत्पादन
चरण 9 डैगस्टर+ के साथ उत्पादन में तैनात करें
टीमों के लिए जो अपने स्वयं के डेमॉन का प्रबंधन किए बिना एक होस्ट नियंत्रण विमान चाहते हैं, डैगस्टर+
(प्रबंधित क्लाउड की पेशकश) केवल एक अतिरिक्त कॉन्फ़िगरेशन चरण की आवश्यकता हैः प्रतिस्थापित करें dagster dev
डैगस्टर क्लाउड एजेंट के साथ, जो एक ही पढ़ता है Definitions वस्तु और अपने बुनियादी ढांचे में चलाता है निष्पादित करता है.
अपनी एपीआई कुंजी को एक डैगस्टर+ पर्यावरण चर के रूप में सेट करें (तैनाती → पर्यावरण चर) के साथ नाम FXMACRO_API_KEY. संसाधन इसे स्वचालित रूप से उठाएगा कोई कोड परिवर्तन की आवश्यकता नहीं है. एक ही पैटर्न किसी भी CI/CD वातावरण पर लागू होता है जो पर्यावरण चर (GitHub क्रियाओं, GitLab CI, CircleCI) के रूप में रहस्य इंजेक्ट करता है.
एक स्व-होस्टेड Kubernetes क्लस्टर पर तैनात करने के लिए, एक डॉकर छवि के रूप में पाइपलाइन पैकेज, इसे अपने रजिस्ट्री में धक्का, और उस पर डैगस्टर हेलम चार्ट इंगित करें। FXMacroData संसाधन आउटबाउंड से कनेक्ट होता है
fxmacrodata.com सुनिश्चित करें कि आपके समूह की निकास नीति उस होस्ट के लिए HTTPS की अनुमति देती है.
संक्षेप में
आपने जो बनाया
इस गाइड का पालन करके अब आपके पास एक कामकाजी डैगस्टर पाइपलाइन है जोः
- पुनः प्रयोज्य, इंजेक्शन योग्य
FXMacroDataResourceजो एक ही स्थान पर सभी एपीआई सत्यापन और त्रुटि हैंडलिंग संभालता है - पांच सॉफ्टवेयर-परिभाषित परिसंपत्तियों को उजागर करता है फेड दर, यूएस सीपीआई, ईसीबी दर, EUR/USD स्पॉट और USD रिलीज़ कैलेंडर प्रत्येक पूर्ण डैगस्टर मेटाडेटा के साथ SQLite के लिए दृढ़ता से
- एक क्रोन-संचालित डैगस्टर कार्यक्रम के माध्यम से 06:30 UTC पर कार्यदिवस ताज़ा करने की योजना बनाता है ताकि यूरोपीय बाजारों के खुलने से पहले आपके पास हमेशा अद्यतित रीडिंग हो
- जब सीपीआई प्रिंट अपसाइड में आश्चर्यचकित होता है तो वेबहूक अलर्ट चलाता है, जिससे डाउनस्ट्रीम सिस्टम को पोलिंग लूप के बजाय इवेंट-ड्राइव ट्रिगर मिलता है
- एक नकली संसाधन का उपयोग कर तेजी से ऑफ़लाइन इकाई परीक्षण शामिल है, तो आईसी पाइपलाइन कभी भी लाइव एपीआई कॉल नहीं करता है
अगला कदम
- अतिरिक्त मुद्राओं के साथ पाइपलाइन का विस्तार करें पूर्ण सूचक सूची को देखें /api-data-docs और उसी पैटर्न के अनुसार नई संपत्ति जोड़ें
- उत्पादन पैमाने के लिए एक Postgres या BigQuery I/O प्रबंधक के साथ SQLite भंडारण की जगह
- एक डाउनस्ट्रीम संपत्ति जो डीबी से पढ़ता है जोड़ें, एक मैक्रो विचलन स्कोर की गणना, और एक संकेत तालिका के लिए व्यापार संकेत लिखता है अपनी रणनीति तर्क निगल परत से अलग रखने
- खोजें सीओटी पोजिशनिंग डेटा संकेतक श्रृंखला के पूरक के रूप में एक भावना ओवरले के रूप