Live release feed
Sub-second macro releases for FX backtests
Point-in-time history
Official CPI, jobs, GDP, and central-bank events with point-in-time history.
$25/month 14-day free trial
Start Free Trial
Bagaimana Mengintegrasikan FXMacroData dengan Dagster image
Share headline card X LinkedIn Email
Download

Implementation

How-To Guides

Bagaimana Mengintegrasikan FXMacroData dengan Dagster

Membangun pipa data Dagster siap produksi yang menarik indikator makro dari FXMacroData pada jadwal, menyimpan hasil dalam database lokal, dan permukaan rilis acara kalender untuk waktu eksekusi yang lebih cerdas.

Juga tersedia dalam English
Share article X LinkedIn Email

Dagster adalah platform orkestrasi data asli Python yang dibangun di sekitar konsep aset software-defined: artefak data yang tahu garis keturunan mereka sendiri, dapat dimaterialisasi sesuai permintaan atau jadwal, dan mengekspos metadata kaya dan pengamatan keluar dari kotak. Untuk alur kerja data makro menarik seri indikator, mendeteksi peristiwa rilis, menyimpan snapshot, dan memperingatkan tentang anomali Dagster sangat cocok. Panduan ini berjalan melalui membangun pipa yang menelan indikator FXMacroData ke dalam toko SQLite lokal, permukaan acara kalender rilis mendatang, dan berjalan secara otomatis pada jadwal harian.

Apa yang akan Anda bangun

  • Sumber FXMacroData Dagster klien API yang dapat digunakan kembali dan dapat dikonfigurasi yang dibagikan di semua aset
  • Empat aset software-defined suku bunga kebijakan, CPI, forex spot, dan kalender rilis untuk pasangan mata uang yang dipilih
  • Pekerjaan dan jadwal harian berjalan setiap hari kerja pagi sebelum London terbuka untuk menyegarkan pembacaan terbaru
  • Sensor anomali menatap inflasi tak terduga dan menembakkan peringatan melalui webhook

Persyaratan

  • Python 3.10+ semua cuplikan menggunakan tip tip modern dan match pernyataan
  • Kunci API FXMacroData daftar di /langganan; Data indikator USD dapat diakses publik tanpa kunci
  • Keterampilan dasar Dagster Anda harus tahu apa aset dan pekerjaan; Dagster cepat memulai mencakup dasar-dasar dalam sepuluh menit

- Langkah 1 -

Langkah 1 Menginstal Dagster dan ketergantungan proyek

Buatlah lingkungan virtual baru dan install Dagster di samping kumpulan kecil dari perpustakaan pipa ini menggunakan. dagster Dan dagster-webserver Untuk versi yang sama menghindari perangkap instalasi yang paling umum.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

Inisialisasi tata letak proyek minimal. perintah perancah Dagster menciptakan struktur direktori dan pyproject.toml yang memungkinkan UI lokal menemukan definisi Anda secara otomatis.

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

Simpan kunci API Anda sebagai variabel lingkungan. Jangan pernah hard-code kredensial dalam file sumber.

export FXMACRO_API_KEY="YOUR_API_KEY"

Pada host produksi, suntikkan kunci melalui manajemen rahasia penjadwal Anda (rahasia tindakan GitHub, rahasia Kubernetes, variabel lingkungan Dagster Cloud).


- Langkah 2 -

Langkah 2 Mendefinisikan sumber FXMacroData yang dapat digunakan kembali

Dagster. sumber daya adalah ketergantungan yang dapat diinjeksikan yang dapat dibagi mirip dengan koneksi database atau sesi HTTP. Membungkus FXMacroData REST API dalam sumber daya berarti setiap aset dapat memanggilnya tanpa menduplikasi logika otentikasi atau penanganan timeout.

Ciptakan fxmacro_pipeline/resources.pyAku tidak tahu.

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

Mengapa ConfigurableResource?

ConfigurableResource memungkinkan Anda untuk menukar kredensial dan waktu antara dev dan prod tanpa menyentuh kode aset. Anda mengkonfigurasi sumber daya sekali dalam Definitions dan Dagster menyuntikkannya di mana pun yang dinyatakan sebagai parameter.


- Langkah 3 -

Langkah 3 Mendefinisikan aset data makro

Setiap aset Dagster mewakili satu produk data logis. Mematerialisasi aset mengambil data terbaru dari FXMacroData dan bertahan secara lokal. Tingkat kebijakan Fed, yang US CPI, yang Nilai tukar mata uang, dan seri spot EUR/USD. Aset kelima kemudian membaca kalender rilis sehingga pipa tahu kapan pengumuman berdampak tinggi berikutnya.

Ciptakan fxmacro_pipeline/assets.pyAku tidak tahu.

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Metadata aset di UI Dagster

Setiap MaterializeResult mengembalikan metadata terstruktur yang digambarkan Dagster di panel detail aset jumlah baris, tanggal terakhir, tabel pratinjau.


- Langkah 4 -

Langkah 4 Buat pekerjaan dan jadwal harian

Dagster. pekerjaan memilih aset mana yang akan terwujud dalam satu run. jadwal Kami ingin pembacaan makro segar setiap hari kerja pagi pukul 06:30 UTC sebelum Frankfurt terbuka sehingga setiap indikator yang dirilis semalam ditangkap sebelum perdagangan Eropa dimulai.

Ciptakan fxmacro_pipeline/jobs.pyAku tidak tahu.

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

Dua pekerjaan terpisah satu untuk seri indikator, satu untuk kalender rilis memungkinkan Anda untuk mengamati dan menjalankan ulang secara independen. Jika pengambilan kalender rilan gagal (misalnya selama hari libur umum ketika titik akhir mengembalikan muatan kosong), pekerjaan pembaruan indikator tidak terpengaruh.


- Langkah 5 -

Langkah 5 Kabel semua ke dalam objek Definisi

Dagster's. Definitions objek adalah titik masuk tunggal yang menghubungkan aset, sumber daya, pekerjaan, dan jadwal bersama-sama. fxmacro_pipeline/__init__.pyAku tidak tahu.

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

Injeksi sumber daya pada saat runtime

Setiap aset yang menyatakan fxmacrodata: FXMacroDataResource sebagai parameter secara otomatis menerima contoh yang dikonfigurasi pada saat runtime. untuk menukarkan dalam simulasi untuk pengujian, override sumber daya dalam tes Anda Definitions kode aset tidak berubah.


- Langkah 6 -

Langkah 6 Tambahkan sensor untuk memperingatkan pada tanda-tanda inflasi yang mengejutkan

Sensor di Dagster jajak pendapat untuk kondisi eksternal dan pemicu berjalan atau saluran peringatan ketika sesuatu berubah. usd_cpi materialisasi: jika pembacaan CPI terbaru lebih dari 0,4 poin persentase di atas pembacaan sebelumnya, itu memicu peringatan webhook sehingga strategi atau sistem risiko Anda dapat bereaksi segera.

Tambahkan fxmacro_pipeline/sensors.pyAku tidak tahu.

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

Masukkan sensornya. __init__.py dengan mengimpornya dan menambahkannya ke Definitions sensors Daftar:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

- Langkah 7 -

Langkah 7 Luncurkan Dagster UI dan jalankan materialisasi pertama Anda

Mulai server pengembangan lokal. Definitions melalui pyproject.toml Tempat masuk:

dagster dev

Navigasi ke http://localhost:3000Kau akan melihat Grafik Aset menunjukkan semua lima aset dikelompokkan ke dalam kelompok logis mereka (macro_indicatorsAku akan pergi. fx_ratesAku akan pergi. calendarKlik Mematerialkan Semuanya Untuk menjalankan pertama intake pass dalam hitungan detik, panel aset akan menunjukkan jumlah baris, tanggal terbaru, dan nilai terbaru ditarik langsung dari FXMacroData API.

Memverifikasi data di SQLite

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

Untuk mengaktifkan jadwal, navigasi ke Tinjauan → Jadwal di UI Dagster dan beralih macro_daily_at_0630_utc Untuk Berlari. Proses Daemon Dagster sekarang akan bangun setiap hari kerja pada 06:30 UTC dan mematerialisasi aset indikator makro secara otomatis.


Langkah 8: Uji

Langkah 8 Menulis tes unit dengan sumber daya simulasi

Salah satu manfaat desain Dagster adalah bahwa aset adalah fungsi Python biasa mudah untuk unit-test tanpa menjalankan tumpukan orkestrasi penuh.

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

Langkah 9: Produksi

Langkah 9 Mengerahkan ke produksi dengan Dagster+

Untuk tim yang ingin pesawat kontrol yang dihosting tanpa mengelola daemon mereka sendiri, Dagster+ (managed cloud offering) hanya membutuhkan satu langkah konfigurasi tambahan: mengganti dagster dev dengan agen Dagster Cloud, yang berbunyi sama. Definitions objek dan menjalankan berjalan di infrastruktur Anda.

Atur kunci API Anda sebagai variabel lingkungan Dagster + (Pengiriman → Variabel LingkunganDengan nama FXMACRO_API_KEY. Sumber akan mengambilnya secara otomatis tidak ada perubahan kode yang diperlukan. Pola yang sama berlaku untuk setiap lingkungan CI / CD yang menyuntikkan rahasia sebagai variabel lingkungan (GitHub Actions, GitLab CI, CircleCI).

Untuk menyebarkan pada kelompok Kubernetes yang dihosting sendiri, paket pipa sebagai gambar Docker, mendorong ke registri Anda, dan mengarahkan bagan Dagster Helm ke dalamnya. fxmacrodata.com pastikan kebijakan keluar cluster Anda memungkinkan HTTPS ke host itu.


- Ringkasan -

Apa yang kau bangun

Dengan mengikuti panduan ini Anda sekarang memiliki pipa Dagster yang bekerja yang:

  • Mengidentifikasi bahan yang dapat digunakan kembali, injeksi FXMacroDataResource yang menangani semua API auth dan pengelolaan kesalahan di satu tempat
  • Mengeksposisi lima aset yang ditentukan perangkat lunak suku bunga Fed, US CPI, suku bunga ECB, EUR/USD spot dan kalender rilis USD masing-masing bertahan ke SQLite dengan metadata Dagster lengkap
  • Jadwal hari kerja memperbarui pada 06:30 UTC melalui jadwal Dagster yang didorong cron sehingga Anda selalu memiliki pembacaan terbaru sebelum pasar Eropa dibuka
  • Menembakkan peringatan webhook ketika CPI cetak kejutan ke atas, memberikan sistem hilir pemicu yang didorong oleh peristiwa daripada lingkaran jajak pendapat
  • Termasuk tes unit offline cepat menggunakan sumber daya simulasi, sehingga pipa CI tidak pernah melakukan panggilan API langsung

Langkah selanjutnya

  • Perluas pipa dengan mata uang tambahan melihat katalog indikator penuh di /api-data-docs dan menambahkan aset baru mengikuti pola yang sama
  • Ganti penyimpanan SQLite dengan Postgres atau BigQuery I / O manajer untuk skala produksi
  • Tambahkan aset hilir yang membaca dari DB, menghitung skor divergensi makro, dan menulis sinyal perdagangan ke tabel sinyal menjaga logika strategi Anda terpisah dari lapisan konsumsi
  • Menjelajahi. Data posisi COT sebagai sentiment overlay untuk melengkapi seri indikator

Blogroll

AI Answer-Ready

Key Facts

Page
How To Integrate FXmacrodata With Dagster
Section
Articles
Canonical URL
https://fxmacrodata.com/id/articles/how-to-integrate-fxmacrodata-with-dagster
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:06 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Integrate FXmacrodata With Dagster with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.