Live release feed
Sub-second macro releases for FX backtests
Point-in-time history
Official CPI, jobs, GDP, and central-bank events with point-in-time history.
$25/month 14-day free trial
Start Free Trial
How To Integrate Fxmacrodata With Dagster image
Share headline card X LinkedIn Email
Download

Implementation

How-To Guides

How To Integrate Fxmacrodata With Dagster

Build a production-ready Dagster data pipeline that pulls macro indicators from FXMacroData on a schedule, stores results in a local database, and surfaces release calendar events for smarter execution timing.

Auch verfügbar auf English
Share article X LinkedIn Email

Dagster ist eine Python-native Datenorchestrierungsplattform, die auf dem Konzept Softwaredefinierte Vermögenswerte: Datenartefakte, die ihre eigene Abstammung kennen, können auf Anfrage oder nach einem Zeitplan materialisiert werden und reiche Metadaten und Beobachtbarkeit aus der Box freisetzen. Für Makrodaten-Workflows Indikatorreihen ziehen, Release-Ereignisse erkennen, Momentaufnahmen speichern und Anomalien warnen Dagster ist eine hervorragende Passform. Dieser Leitfaden führt durch den Aufbau einer Pipeline, die FXMacroData-Indikatoren in einen lokalen SQLite-Speicher eintritt, bevorstehende Release Kalenderereignisse aufzeigt und automatisch in einem täglichen Zeitplan läuft.

Was du bauen wirst

  • Eine FXMacroData Dagster-Ressource ein wiederverwendbarer, konfigurierbarer API-Client, der für alle Assets gemeinsam genutzt wird
  • Vier Software-definierte Vermögenswerte Kursentwicklung, KPI, Forex-Spot und Veröffentlichungskalender für das gewählte Währungspaar
  • Tägliche Arbeit und Zeitplan läuft jeden Wochentag vor der Londoner Opening, um die neuesten Messwerte zu erhalten
  • Ein Anomaliensensor Aufmerksamkeit auf unerwartete Inflation und Alarm durch Webhook

Voraussetzungen

  • Python 3.10+ alle Snippets verwenden moderne Schrift-Hinweise und match Ausführungen
  • FXMacroData-API-Schlüssel melden Sie sich an /abonnieren; USD-Indikatordaten sind öffentlich zugänglich ohne Schlüssel
  • Grundlegende Kenntnisse der Dagster Sie sollten wissen, was Vermögenswerte und Arbeitsplätze sind; Dagster-Schnellstart Das ist in zehn Minuten alles.

- Schritt 1 -

Schritt 1 Installation von Dagster und Projektabhängigkeiten

Erstellen Sie eine neue virtuelle Umgebung und installieren Sie Dagster neben den kleinen Bibliotheken, die diese Pipeline verwendet. dagster Und ... dagster-webserver Die Einführung der gleichen Version vermeidet den häufigsten Installationsfall.

python -m venv .venv
source .venv/bin/activate            # Windows: .venv\Scripts\activate

pip install "dagster==1.9.*" "dagster-webserver==1.9.*" \
            requests pandas sqlalchemy

Das Programm wird mit dem Kommando "Schaffold" gestaltet. pyproject.toml Das erlaubt der lokalen Benutzeroberfläche, Ihre Definitionen automatisch zu entdecken.

dagster project scaffold --name fxmacro_pipeline
cd fxmacro_pipeline

Speichern Sie Ihren API-Schlüssel als Umgebungsvariable.

export FXMACRO_API_KEY="YOUR_API_KEY"

Bei Produktionshosts wird der Schlüssel über die geheime Verwaltung des Schedulers (GitHub Actions-Geheimnisse, Kubernetes-Geheime, Dagster Cloud-Umgebungsvariablen) eingespeist.


- Schritt 2 -

Schritt 2 Definition einer wiederverwendbaren FXMacroData-Ressource

Ein Dolch . Ressource ist eine geteilte, injizierbare Abhängigkeit analog zu einer Datenbankverbindung oder einer HTTP-Sitzung. Das Wickeln der FXMacroData REST API in eine Ressource bedeutet, dass jedes Asset sie anrufen kann, ohne Authentifizierungslogik oder Timeout-Handling zu duplizieren.

Erstellen . fxmacro_pipeline/resources.py- Ich weiß .

import os
from typing import Any

import requests
from dagster import ConfigurableResource, get_dagster_logger

_BASE_URL = "https://fxmacrodata.com/api/v1"


class FXMacroDataResource(ConfigurableResource):
    """Thin wrapper around the FXMacroData REST API."""

    api_key: str = ""           # overridden at run-time from env
    timeout: int = 15

    def _get(self, path: str, **params: Any) -> dict:
        logger = get_dagster_logger()
        key = self.api_key or os.environ.get("FXMACRO_API_KEY", "")
        resp = requests.get(
            f"{_BASE_URL}{path}",
            params={"api_key": key, **params},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        logger.debug("GET %s → %d records", path, len(data.get("data", [])))
        return data

    def get_announcements(
        self,
        currency: str,
        indicator: str,
        start: str = "2020-01-01",
    ) -> list[dict]:
        """Return time-series records for a macro indicator."""
        result = self._get(
            f"/announcements/{currency.lower()}/{indicator}",
            start=start,
        )
        return result.get("data", [])

    def get_forex(self, base: str, quote: str, start: str = "2023-01-01") -> list[dict]:
        """Return daily FX spot-rate records."""
        result = self._get(f"/forex/{base.lower()}/{quote.lower()}", start=start)
        return result.get("data", [])

    def get_calendar(self, currency: str) -> list[dict]:
        """Return the upcoming release calendar for a currency."""
        result = self._get(f"/calendar/{currency.lower()}")
        return result.get("data", [])

Warum ConfigurableResource?

ConfigurableResource Sie können die Anmeldeinformationen und Zeitlimits zwischen Entwickler und Prod wechseln, ohne den Assetcode zu berühren. Definitions Und Dagster injiziert es, wo immer es als Parameter deklariert wird.


- Schritt 3 -

Schritt 3 Definition von Makrodatenanlagen

Jedes Dagster-Asset stellt ein logisches Datenprodukt dar. Die Materialisierung eines Assets holt die neuesten Daten von FXMacroData ab und speichert sie lokal. Zinssatz der Fed- Das ist ... US-amerikanischer KPI- Das ist ... Leitzins der EZBEin fünfter Vermögenswert liest dann den Veröffentlichungskalender, damit die Pipeline weiß, wann die nächsten wichtigen Ankündigungen fällig sind.

Erstellen . fxmacro_pipeline/assets.py- Ich weiß .

import os
import sqlite3
from datetime import date, timedelta

import pandas as pd
from dagster import (
    AssetExecutionContext,
    MaterializeResult,
    MetadataValue,
    asset,
)

from .resources import FXMacroDataResource

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")


def _ensure_table(conn: sqlite3.Connection, table: str) -> None:
    conn.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {table} (
            date TEXT PRIMARY KEY,
            val  REAL,
            announcement_datetime TEXT
        )
        """
    )
    conn.commit()


def _upsert(conn: sqlite3.Connection, table: str, records: list[dict]) -> int:
    rows = [
        (r["date"], r.get("val"), r.get("announcement_datetime"))
        for r in records
        if r.get("date") and r.get("val") is not None
    ]
    conn.executemany(
        f"INSERT OR REPLACE INTO {table} (date, val, announcement_datetime) VALUES (?,?,?)",
        rows,
    )
    conn.commit()
    return len(rows)


@asset(group_name="macro_indicators", compute_kind="api")
def usd_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Latest Fed funds rate series from the FXMacroData announcements endpoint."""
    records = fxmacrodata.get_announcements("usd", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_policy_rate")
        n = _upsert(conn, "usd_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("Fed rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def usd_cpi(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """US CPI (year-on-year) series. Tracks whether inflation is rising or falling."""
    records = fxmacrodata.get_announcements("usd", "inflation", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "usd_cpi")
        n = _upsert(conn, "usd_cpi", records)
    latest = records[0] if records else {}
    context.log.info("US CPI: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="macro_indicators", compute_kind="api")
def eur_policy_rate(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """ECB main refinancing rate series."""
    records = fxmacrodata.get_announcements("eur", "policy_rate", start="2015-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eur_policy_rate")
        n = _upsert(conn, "eur_policy_rate", records)
    latest = records[0] if records else {}
    context.log.info("ECB rate: %s%% (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="fx_rates", compute_kind="api")
def eurusd_spot(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Daily EUR/USD closing spot rate."""
    records = fxmacrodata.get_forex("eur", "usd", start="2023-01-01")
    with sqlite3.connect(_DB_PATH) as conn:
        _ensure_table(conn, "eurusd_spot")
        n = _upsert(conn, "eurusd_spot", records)
    latest = records[0] if records else {}
    context.log.info("EUR/USD: %s (as of %s)", latest.get("val"), latest.get("date"))
    return MaterializeResult(
        metadata={
            "row_count": MetadataValue.int(n),
            "latest_date": MetadataValue.text(latest.get("date", "n/a")),
            "latest_val": MetadataValue.float(latest.get("val") or 0.0),
        }
    )


@asset(group_name="calendar", compute_kind="api")
def usd_release_calendar(
    context: AssetExecutionContext,
    fxmacrodata: FXMacroDataResource,
) -> MaterializeResult:
    """Upcoming USD macro release dates and times."""
    events = fxmacrodata.get_calendar("usd")
    df = pd.DataFrame(events) if events else pd.DataFrame()
    context.log.info("Found %d upcoming USD events", len(df))
    # Surface a preview in the Dagster UI asset metadata pane
    preview = df.head(5).to_markdown(index=False) if not df.empty else "No events found"
    return MaterializeResult(
        metadata={
            "event_count": MetadataValue.int(len(df)),
            "preview": MetadataValue.md(preview),
        }
    )

Metadaten von Vermögenswerten in der Dagster-Benutzeroberfläche

Jedes Mal . MaterializeResult gibt strukturierte Metadaten zurück, die Dagster im Asset-Detailbereich Zeilenzahlen, letzte Daten, Vorschautabellen wiedergibt. Dies bedeutet, dass Sie auf einen Blick überprüfen können, ob eine Materialisierung tatsächlich frische Daten abruft, ohne in Protokolle zu graben.


- Schritt 4 -

Schritt 4 Schaffen Sie sich einen Arbeitsplatz und einen Tagesablauf

Ein Dolch . Arbeit Die Kommission hat die Kommission aufgefordert, die in diesem Zusammenhang zu ergreifenden Maßnahmen zu prüfen. Zeitplan Wir wollen jeden Wochentag früh um 06:30 Uhr UTC neue Makro-Werte vor der Frankfurter Öffnung, damit jeder Indikator, der über Nacht veröffentlicht wird, vor Beginn des europäischen Handels erfasst wird.

Erstellen . fxmacro_pipeline/jobs.py- Ich weiß .

from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
)

# ── Jobs ──────────────────────────────────────────────────────────────────────

macro_refresh_job = define_asset_job(
    name="macro_refresh",
    description="Refresh all FXMacroData indicator and FX rate assets.",
    selection=AssetSelection.groups("macro_indicators", "fx_rates"),
)

calendar_refresh_job = define_asset_job(
    name="calendar_refresh",
    description="Pull the latest USD release calendar.",
    selection=AssetSelection.groups("calendar"),
)

# ── Schedules ─────────────────────────────────────────────────────────────────

# Weekdays at 06:30 UTC — 30 min before Frankfurt opens
macro_daily_schedule = ScheduleDefinition(
    name="macro_daily_at_0630_utc",
    job=macro_refresh_job,
    cron_schedule="30 6 * * 1-5",
    execution_timezone="UTC",
)

# Refresh the release calendar every Sunday evening so the week's
# upcoming events are loaded before Monday's first run.
calendar_weekly_schedule = ScheduleDefinition(
    name="calendar_weekly_on_sunday",
    job=calendar_refresh_job,
    cron_schedule="0 18 * * 0",
    execution_timezone="UTC",
)

Wenn die Abgabe des Kalenders nicht funktioniert (z. B. an einem Feiertag, wenn der Endpunkt eine leere Nutzlast zurückgibt), ist die Aktualisierung des Indikators nicht betroffen.


- Schritt 5 -

Schritt 5 Alles in ein Definitionsobjekt einführen

Das ist Dagster. Definitions Objekt ist der einzige Eingangspunkt, der Assets, Ressourcen, Jobs und Zeitpläne miteinander verbindet. fxmacro_pipeline/__init__.py- Ich weiß .

import os

from dagster import Definitions

from .assets import (
    eur_policy_rate,
    eurusd_spot,
    usd_cpi,
    usd_policy_rate,
    usd_release_calendar,
)
from .jobs import (
    calendar_refresh_job,
    calendar_weekly_schedule,
    macro_daily_schedule,
    macro_refresh_job,
)
from .resources import FXMacroDataResource

defs = Definitions(
    assets=[
        usd_policy_rate,
        usd_cpi,
        eur_policy_rate,
        eurusd_spot,
        usd_release_calendar,
    ],
    resources={
        "fxmacrodata": FXMacroDataResource(
            # Falls back to the FXMACRO_API_KEY env var inside the resource
            api_key=os.environ.get("FXMACRO_API_KEY", ""),
        ),
    },
    jobs=[macro_refresh_job, calendar_refresh_job],
    schedules=[macro_daily_schedule, calendar_weekly_schedule],
)

Ressourcenzufuhr zur Laufzeit

Jeder Vermögenswert , der sich meldet . fxmacrodata: FXMacroDataResource Als Parameter erhält automatisch die konfigurierte Instanz bei Laufzeit. Definitions der Vermögenswertcode ändert sich nicht.


- Schritt 6 -

Schritt 6 Hinzufügen eines Sensors, um überraschende Inflationsausdrücke zu alarmieren

Die Sensoren in Dagster ermitteln die äußeren Bedingungen und lösen oder alarmieren, wenn sich etwas ändert. usd_cpi Verwirklichung: Wenn der letzte Preisindex mehr als 0,4 Prozentpunkte über dem vorherigen liegt, wird eine Webhook-Warnung ausgelöst, damit Ihre Strategie oder Ihr Risikosystem sofort reagieren kann.

Fügen Sie hinzu . fxmacro_pipeline/sensors.py- Ich weiß .

import os
import sqlite3

import requests as http
from dagster import (
    RunStatusSensorContext,
    asset_sensor,
    AssetKey,
    EventLogEntry,
    SensorResult,
    SkipReason,
)

_DB_PATH = os.environ.get("FXMACRO_DB", "fxmacro_data.db")
_ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK_URL", "")
_SURPRISE_THRESHOLD = 0.4  # percentage points


@asset_sensor(asset_key=AssetKey("usd_cpi"), job_name="macro_refresh")
def cpi_surprise_alert_sensor(
    context: RunStatusSensorContext,
    asset_event: EventLogEntry,
) -> SensorResult | SkipReason:
    """Alert when the latest US CPI print is significantly above the prior reading."""
    try:
        conn = sqlite3.connect(_DB_PATH)
        rows = conn.execute(
            "SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 2"
        ).fetchall()
        conn.close()
    except Exception as exc:
        return SkipReason(f"DB read failed: {exc}")

    if len(rows) < 2:
        return SkipReason("Not enough CPI history yet.")

    latest_date, latest_val = rows[0]
    prior_date, prior_val = rows[1]
    surprise = (latest_val or 0) - (prior_val or 0)

    context.log.info(
        "CPI %s: %.2f%% (prior %s: %.2f%%, surprise: %+.2f pp)",
        latest_date, latest_val, prior_date, prior_val, surprise,
    )

    if surprise >= _SURPRISE_THRESHOLD and _ALERT_WEBHOOK:
        payload = {
            "text": (
                f":fire: *USD CPI Surprise* — latest {latest_date}: *{latest_val:.1f}%* "
                f"vs prior {prior_date}: {prior_val:.1f}% "
                f"(surprise: *{surprise:+.2f} pp*)"
            )
        }
        http.post(_ALERT_WEBHOOK, json=payload, timeout=8)
        context.log.warning("CPI surprise alert fired: %+.2f pp", surprise)

    return SensorResult(cursor=str(latest_date))

Registrieren Sie den Sensor ein . __init__.py Sie werden dann in die Definitions sensors Liste:

from .sensors import cpi_surprise_alert_sensor

defs = Definitions(
    # ... existing fields ...
    sensors=[cpi_surprise_alert_sensor],
)

- Schritt 7 -

Schritt 7 Starten Sie die Dagster-Benutzeroberfläche und starten Sie Ihre erste Verwirklichung

Starten Sie den lokalen Entwicklerserver. Definitions Durch die ... pyproject.toml Eintrittspunkt:

dagster dev

Navigieren Sie zu http://localhost:3000Du wirst sehen , wie ich ... Vermögenswertdiagramm Die Daten sind in einem Bild der fünf Vermögenswerte in ihre logischen Gruppen zusammengefasst (macro_indicators- Ich weiß . fx_rates- Ich weiß . calendarKlicken Sie auf Alles materialisieren um den ersten Einnahmepass innerhalb von Sekunden auszuführen, wird der Asset-Fenster Zeilenzahlen, neueste Daten und neueste Werte direkt aus der FXMacroData API anzeigen.

Überprüfen der Daten in SQLite

sqlite3 fxmacro_data.db
sqlite> SELECT date, val FROM usd_policy_rate ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM usd_cpi ORDER BY date DESC LIMIT 5;
sqlite> SELECT date, val FROM eurusd_spot ORDER BY date DESC LIMIT 3;

Um den Zeitplan zu aktivieren, navigieren Sie zu Übersicht → Zeitpläne in der Dagster Benutzeroberfläche und schalten macro_daily_at_0630_utc - Ich bin hier . Ich laufe .Dagsters Daemon-Prozess wird nun jeden Wochentag um 06:30 UTC aufwachen und die Makro-Indikatoren-Assets automatisch verwirklichen.


- Schritt 8: Prüfung

Schritt 8 Schreiben von Einheitstests mit einer Scheinressource

Einer der wichtigsten Vorteile von Dagster ist, dass Assets normale Python-Funktionen sind einfach zu testen, ohne den gesamten Orchestrierungstapel auszuführen.

# tests/test_assets.py
import sqlite3
import tempfile
import os

from unittest.mock import MagicMock

import pytest
from dagster import build_asset_context, materialize

from fxmacro_pipeline.assets import usd_policy_rate, usd_cpi
from fxmacro_pipeline.resources import FXMacroDataResource


MOCK_RATE_DATA = [
    {"date": "2025-12-18", "val": 4.5, "announcement_datetime": "2025-12-18T19:00:00Z"},
    {"date": "2025-11-07", "val": 4.75, "announcement_datetime": "2025-11-07T19:00:00Z"},
]

MOCK_CPI_DATA = [
    {"date": "2025-12-11", "val": 2.7, "announcement_datetime": "2025-12-11T13:30:00Z"},
    {"date": "2025-11-13", "val": 2.6, "announcement_datetime": "2025-11-13T13:30:00Z"},
]


class MockFXMacroData(FXMacroDataResource):
    def get_announcements(self, currency, indicator, start="2020-01-01"):
        if indicator == "policy_rate":
            return MOCK_RATE_DATA
        if indicator == "inflation":
            return MOCK_CPI_DATA
        return []

    def get_forex(self, base, quote, start="2023-01-01"):
        return [{"date": "2025-12-31", "val": 1.0482, "announcement_datetime": None}]

    def get_calendar(self, currency):
        return []


@pytest.fixture(autouse=True)
def temp_db(tmp_path, monkeypatch):
    monkeypatch.setenv("FXMACRO_DB", str(tmp_path / "test.db"))


def test_usd_policy_rate_materialises():
    result = materialize(
        [usd_policy_rate],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_policy_rate")[0]
    assert mat.metadata["row_count"].value == 2
    assert mat.metadata["latest_val"].value == pytest.approx(4.5)


def test_usd_cpi_materialises():
    result = materialize(
        [usd_cpi],
        resources={"fxmacrodata": MockFXMacroData()},
    )
    assert result.success
    mat = result.asset_materializations_for_node("usd_cpi")[0]
    assert mat.metadata["latest_val"].value == pytest.approx(2.7)
pip install pytest
pytest tests/ -v

- Schritt 9: Produktion

Schritt 9 Einsatz in der Produktion mit Dagster+

Für Teams, die ein gehostetes Kontrollflugzeug ohne Verwaltung ihres eigenen Daemons wollen, Dagster+ (das Managed Cloud-Angebot) erfordert nur einen einzigen zusätzlichen Konfigurationsschritt: dagster dev mit dem Dagster Cloud Agent, der das gleiche liest. Definitions Objekt und führt Laufvorgänge in Ihrer Infrastruktur aus.

Setzen Sie Ihren API-Schlüssel als Umgebungsvariable von Dagster+ (Einsatz → Umgebungsvariablen) mit dem Namen FXMACRO_API_KEY. Die Ressource wird sie automatisch abholen keine Codeänderungen erforderlich. Das gleiche Muster gilt für jede CI/CD-Umgebung, die Geheimnisse als Umgebungsvariablen injiziert (GitHub Actions, GitLab CI, CircleCI).

Um auf einem selbst gehosteten Kubernetes-Cluster zu implementieren, packen Sie die Pipeline als Docker-Image ein, schieben Sie sie in Ihre Registrierung und zeigen Sie das Dagster Helm-Diagramm darauf. fxmacrodata.com stellen Sie sicher, dass die Ausgangsrichtlinie Ihres Clusters HTTPS zu diesem Host zulässt.


Zusammenfassung

Was du gebaut hast

Durch die Einhaltung dieser Anleitung haben Sie jetzt eine funktionierende Dagster-Pipeline, die:

  • Definition eines wiederverwendbaren, injizierbaren FXMacroDataResource Das verarbeitet alle API-Authentifizierung und Fehlerbearbeitung an einem Ort
  • Die Daten werden in einem System mit einem Datenverzeichnis von SQLite und einem Datenbankverzeichnissen erfasst.
  • Planen der Wochentag aktualisiert um 06:30 UTC über einen cron-gesteuerten Dagster Zeitplan, so dass Sie immer up-to-date Messwerte vor den europäischen Märkten öffnen
  • Wird mit einem Webhook-Alarm ausgelöst, wenn ein CPI-Druck überrascht, und gibt den nachgelagerten Systemen einen ereignisgesteuerten Auslöser anstelle einer Umfrageschleife
  • Enthält schnelle Offline-Einheitstests mit einer Scheinressource, so dass die CI-Pipeline nie Live-API-Aufrufe macht

Nächste Schritte

  • Erweitern Sie die Pipeline um weitere Währungen Siehe vollständigen Indikatorkatalog unter /api-daten-docs und neue Vermögenswerte nach dem gleichen Muster hinzufügen
  • Ersetzen Sie den SQLite-Speicher durch einen Postgres- oder BigQuery-E/A-Manager für die Produktionsskala
  • Fügen Sie einen nachgelagerten Vermögenswert hinzu, der aus der DB liest, einen Makro-Divergenz-Score berechnet und Handelssignale in eine Signaltabelle schreibt Ihre Strategielogik von der Einnahme-Schicht getrennt hält
  • Erforschen . COT-Positionsdaten als Stimmungsüberlagerung zur Ergänzung der Indikatorenreihe

Blogroll

AI Answer-Ready

Key Facts

Page
How To Integrate FXmacrodata With Dagster
Section
Articles
Canonical URL
https://fxmacrodata.com/de/artikel/how-to-integrate-fxmacrodata-with-dagster
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:06 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Integrate FXmacrodata With Dagster with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.