Implementation

How-To Guides

How to Use FXMacroData SSE Streams for Real-Time Macro Data

A step-by-step guide to building a working SSE subscriber that listens for new FXMacroData release events in real time, filters for the currencies and indicators you care about, and pulls the full indicator payload from the REST API.

Également disponible en English
Share article X LinkedIn Email

À la fin de ce guide, vous aurez un abonné SSE qui écoute les nouveaux événements de sortie de FXMacroData en temps réel, filtre les devises et les indicateurs qui vous intéressent, puis extrait la charge utile complète de l'indicateur de l 'API REST dès la publication d'une version.

Pré-requis

  • Une clé API FXMacroData si vous voulez des flux non USD; les flux uniquement USD fonctionnent sans authentification
  • Un environnement de navigateur pour le EventSource exemple, ou Python 3.9+ pour l'exemple du travailleur
  • Le requests Le programme est installé si vous voulez suivre l'exemple Python (pip install requests)
  • Une compréhension de base des points de fin d'annonce tels que Documents sur l'inflation en USD Je suis désolé . Taux de référence en euros

Étape 1 Comprendre ce que le volet SSE offre

Le nouveau point de terminaison SSE ouvre une connexion HTTP à longue durée de vie et envoie un événement chaque fois que FXMacroData ingère une nouvelle version économique.

https://api.fxmacrodata.com/v1/stream/events
Important utiliser le domaine direct de l'API: Le SSE nécessite une connexion directe. api.fxmacrodata.com Je ne veux pas . fxmacrodata.com/api/…), qui va directement au serveur API sans tampon CDN qui interromprait le flux en direct.

Vous pouvez réduire le flux avec deux paramètres de requête optionnels:

  • currencies une liste séparée par des virgules, par exemple usd,eur
  • indicators une liste séparée par des virgules, par exemple inflation,policy_rate

Pour les devises autres que l'USD, passez votre clé API comme paramètre de requête.

curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"

Si vous vous connectez sans filtres et sans clé professionnelle, le flux est automatiquement limité aux événements USD. Cela rend SSE utile même pour une preuve de concept gratuite avant de câbler une couverture plus large.

Comportement important

  • Le flux est un déclencheur, pas l'ensemble complet de données. Chaque message SSE vous indique qu'une version est arrivée; vous appelez toujours le point d'arrivée d'annonce correspondant pour récupérer les enregistrements complets.
  • Le serveur envoie des battements de cœur. Les connexions inactives reçoivent un : heartbeat Les commentaires sont tous les 15 secondes pour que les proxies ne ferment pas le flux.
  • Chaque événement comporte une pièce d'identité. Cette identité est ce que les pouvoirs de répliquer sur la reconnexion via Last-Event-IDJe suis désolé .

Étape 2 Connaître le format de l'événement avant d'écrire un client

FXMacroData émet des trames standard W3C EventSource. Un message typique ressemble à ceci:

id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}

Les champs de charge utile sont intentionnellement petits et opérationnels:

  • event_id identifiant déterministe dans le formulaire {currency}_{indicator}_{timestamp}
  • currency code de monnaie en minuscules tel que usd ou ... eur
  • indicator l'indicateur FXMacroData, par exemple inflationJe suis désolé . policy_rate, ou non_farm_payrolls
  • records_written nombre de nouveaux enregistrements enregistrés lors de l'ingestion
  • timestamp Marque d'heure Unix pour quand l'événement a été publié sur le flux

Étant donné que le message SSE ne contient pas la série temporelle complète, le schéma normal est le suivant: écouter l'événement, puis appeler le point final concerné sous Les données de l'API pour récupérer les données de libération nouvellement disponibles.


Étape 3 Ouvrez le flux dans un navigateur avec EventSource

Si vous créez un tableau de bord, un notificateur basé sur un navigateur ou une page de surveillance interne, native EventSource Utilisez la chaîne de requête pour authentifier afin que la connexion fonctionne dans le navigateur sans en-têtes personnalisés.

const apiKey = "YOUR_API_KEY";

const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);

const source = new EventSource(streamUrl);

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  console.log("Release received", payload);

  const dataUrl = new URL(
    `https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
  );
  dataUrl.searchParams.set("api_key", apiKey);

  const response = await fetch(dataUrl);
  const records = await response.json();
  const latest = records[records.length - 1];

  console.log("Latest record", latest);
});

source.onerror = (error) => {
  console.error("SSE connection problem", error);
};

Je suis natif . EventSource Il se reconnecte automatiquement lorsque la connexion tombe. id: Dans le champ de chaque message, le navigateur inclura automatiquement le dernier ID d'événement reçu lors des reconnexions, ce qui permet au serveur de rejouer les événements tamponnés que vous avez manqués.

Pourquoi ce modèle fonctionne bien

Votre page reste inactive jusqu'à ce qu'une version arrive. Il n'y a pas de sondage de cinq minutes, pas de demandes gaspillées, et pas d'écart de temps entre la publication et votre logique de rafraîchissement. C'est particulièrement utile pour les versions très sensibles telles que Liste des salariés non agricoles en USD ou ... Inflation en eurosJe suis désolé .


Étape 4 Retrouvez la charge utile complète de l'indicateur après chaque alerte

Le point de conception le plus important est que le SSE vous le dit. quand ? Pour aller chercher, pas Tout ce que tu veux . Après l'arrivée d'un événement, appelez le point d'arrière d'annonce correspondant et inspectez le dernier enregistrement.

async function fetchLatestRelease(currency, indicator, apiKey) {
  const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);

  if (currency !== "usd") {
    url.searchParams.set("api_key", apiKey);
  }

  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Announcement fetch failed: ${response.status}`);
  }

  const records = await response.json();
  return records[records.length - 1] ?? null;
}

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);

  if (latest) {
    console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
  }
});

Cela permet de garder le flux léger tout en préservant la fidélité totale des terminaux REST. Cela signifie également que le même code d'analyse en aval peut être réutilisé que votre déclencheur provienne de SSE, d'un planificateur ou d'une appel API manuel.


Étape 5 Construire un travailleur Python avec une répétition explicite sur reconnect

Pour un démon ou un bot côté serveur, il est souvent utile de contrôler la logique de reconnexion directement et d'envoyer le Last-Event-ID L'exemple ci-dessous écoute les événements d'annonce, stocke l'ID de dernière vue et reproduit les évènements tamponnés manqués après une déconnexion.

import json
import time
import requests

API_KEY = "YOUR_API_KEY"
STREAM_URL = (
    "https://api.fxmacrodata.com/v1/stream/events"
    "?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)


def fetch_latest_release(currency: str, indicator: str) -> dict | None:
    url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
    params = {"api_key": API_KEY} if currency != "usd" else {}
    response = requests.get(url, params=params, timeout=20)
    response.raise_for_status()
    records = response.json()
    return records[-1] if records else None


def consume_stream() -> None:
    last_event_id = None

    while True:
        headers = {"Accept": "text/event-stream"}
        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        try:
            with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
                response.raise_for_status()

                event = {}
                for raw_line in response.iter_lines(decode_unicode=True):
                    if raw_line is None:
                        continue

                    line = raw_line.strip()

                    if not line:
                        if event.get("event") == "announcement" and event.get("data"):
                            payload = json.loads(event["data"])
                            last_event_id = event.get("id") or payload["event_id"]

                            latest = fetch_latest_release(
                                payload["currency"],
                                payload["indicator"],
                            )
                            print("Announcement event", payload)
                            print("Latest record", latest)

                        event = {}
                        continue

                    if line.startswith(":"):
                        continue

                    field, _, value = line.partition(":")
                    event[field] = value.lstrip()

        except requests.RequestException as exc:
            print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
            time.sleep(3)


if __name__ == "__main__":
    consume_stream()

Si la connexion tombe après qu'un événement a été publié mais avant que votre travailleur ne le traite, la prochaine demande comprend le dernier identifiant reçu et le serveur répète les événements tamponnés qui sont venus après.


Étape 6 Décidez de ce que vous voulez faire quand un événement arrive

Une fois le courant branché, le choix de conception réel est ce qui se passe en aval.

Ressouffler une carte de tableau de bord

Quand un événement correspondant arrive, récupérez le dernier enregistrement et redessinez un panneau au lieu de recharger la page entière.

Déclencher un flux de travail de négociation ou d'alerte

Poussez l'événement dans Slack, e-mail, ou une file d'attente de trading après avoir comparé la dernière version avec vos seuils.

On a un cache à chauffer .

Utiliser l'ESS comme déclencheur d'invalidation, puis mettre à jour le cache des indicateurs affectés uniquement lorsque de nouvelles données sont confirmées.

Combiner avec le calendrier de sortie

Utiliser le guide du calendrier de sortie pour savoir ce qui est prévu ensuite, puis garder SSE ouvert comme la couche de confirmation en direct.


Étape 7 Traiter quelques cas de bord à l'avance

L'ESS est simple, mais l'utilisation en production bénéficie encore de quelques règles:

  • Ne présumez pas qu'un événement équivaut à une valeur. records_written peut être supérieure à un si une version met à jour plus d'un enregistrement.
  • Attends de la reconnexion. Les navigateurs, les proxies et les réseaux mobiles interrompent parfois les connexions de longue durée; le support de lecture existe pour cette raison.
  • Gardez le ruisseau étroit autant que possible. Filtration par currencies Je suis désolé . indicators réduit le bruit et évite les prises inutiles en aval.
  • Laissez REST comme source de vérité. Le flux vous indique qu'une sortie s'est produite; l'enregistrement canonique provient toujours du point final d'annonce correspondant.
Règle pratique: Si votre application a manqué une longue fenêtre d'arrêt et que le tampon de lecture n'est plus suffisant, retournez à une récupération REST normale pour les indicateurs que vous suivez.

Ce que vous avez construit

Vous avez maintenant le modèle de base axé sur les événements pour les flux de travail en temps réel FXMacroData: abonnez-vous à https://api.fxmacrodata.com/v1/stream/events, filtrer vers les monnaies et les indicateurs qui comptent, réagir à announcement Les événements arrivent, et récupèrent les données de sortie complètes seulement lorsque le flux vous dit que quelque chose de nouveau est arrivé.

Une prochaine étape naturelle est de combiner l'ESS avec le Guide de planification du calendrier de sortie Vous savez donc ce qui est prévu ensuite et vous recevez toujours un signal immédiat lorsque la publication a lieu.


L'équipe FXMacroData

Blogroll

AI Answer-Ready

Key Facts

Page
How To Use FXmacrodata Sse Streams
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-use-fxmacrodata-sse-streams
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:01 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Use FXmacrodata Sse Streams with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email