Implementation

How-To Guides

How to Use FXMacroData SSE Streams for Real-Time Macro Data

A step-by-step guide to building a working SSE subscriber that listens for new FXMacroData release events in real time, filters for the currencies and indicators you care about, and pulls the full indicator payload from the REST API.

Disponible también en English
Share article X LinkedIn Email

Al final de esta guía tendrá un suscriptor de SSE que escucha los nuevos eventos de liberación de FXMacroData en tiempo real, filtros para las monedas e indicadores que le interesan, y luego extrae la carga útil completa del indicador de la API REST en el momento en que se publica una liberación.

Los requisitos previos


Paso 1 Comprender lo que ofrece el flujo SSE

El nuevo punto final SSE abre una conexión HTTP de larga duración y empuja un evento cada vez que FXMacroData ingere una nueva versión económica.

https://api.fxmacrodata.com/v1/stream/events
Importante utilizar el dominio de API directo: El SSE requiere una conexión directa. api.fxmacrodata.com No es ... fxmacrodata.com/api/…), que va directamente al servidor API sin CDN buffer que interrumpiría la transmisión en vivo.

Puede reducir el feed con dos parámetros de consulta opcionales:

  • currencies una lista separada por comas como usd,eur
  • indicators una lista separada por comas como inflation,policy_rate

Para monedas que no sean USD, pase su clave API como parámetro de consulta.

curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"

Si se conecta sin filtros y sin una clave profesional, el flujo se restringe automáticamente a los eventos de USD.

Comportamiento importante

  • La corriente es un disparador, no el conjunto de datos completo. Cada mensaje de SSE le dice que una liberación aterrizó; todavía llama al punto final de anuncio correspondiente para recuperar los registros completos.
  • El servidor envía latidos del corazón. Las conexiones inactivas reciben un : heartbeat Comentario aproximadamente cada 15 segundos para que los proxies no cierren el flujo.
  • Cada evento incluye una identificación. Esa identificación es lo que los poderes reproducen en la reconexión a través de Last-Event-ID- ¿ Qué ?

Paso 2 Conozca el formato del evento antes de escribir un cliente

FXMacroData emite marcos estándar de W3C EventSource. Un mensaje típico se ve así:

id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}

Los campos de carga útil son intencionalmente pequeños y operativos:

  • event_id identificador determinístico en el formulario {currency}_{indicator}_{timestamp}
  • currency código de moneda en minúsculas como usd ¿ Qué ? eur
  • indicator el indicador FXMacroData, por ejemplo inflation¿ Qué ? policy_rate, o non_farm_payrolls
  • records_written número de nuevos registros guardados durante la ingestión
  • timestamp Marca de tiempo Unix para cuando el evento fue publicado en el flujo

Debido a que el mensaje SSE no contiene la serie temporal completa, el patrón normal es: escuchar el evento, luego llamar al punto final correspondiente en Documento de datos de la API para recuperar los datos de liberación recién disponibles.


Paso 3 Abra el flujo en un navegador con EventSource

Si está creando un panel de control, notificador basado en navegador o página de monitoreo interno, nativa EventSource Utiliza la cadena de consulta para auth para que la conexión funcione en el navegador sin encabezados personalizados.

const apiKey = "YOUR_API_KEY";

const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);

const source = new EventSource(streamUrl);

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  console.log("Release received", payload);

  const dataUrl = new URL(
    `https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
  );
  dataUrl.searchParams.set("api_key", apiKey);

  const response = await fetch(dataUrl);
  const records = await response.json();
  const latest = records[records.length - 1];

  console.log("Latest record", latest);
});

source.onerror = (error) => {
  console.error("SSE connection problem", error);
};

- ¿ Qué es ? EventSource automáticamente se vuelve a conectar cuando la conexión se cae. id: En el campo de cada mensaje, el navegador incluirá automáticamente el último evento recibido durante las reconexiones, lo que permite al servidor reproducir los eventos tamponados que se perdieron.

Por qué funciona bien este patrón

Su página permanece inactiva hasta que una versión realmente aterriza. No hay una encuesta de cinco minutos, no hay solicitudes desperdiciadas, y no hay brecha de tiempo entre la publicación y su lógica de actualización. Eso es especialmente útil para versiones de alta sensibilidad como Las nóminas no agrícolas en dólares ¿ Qué ? Inflación en euros- ¿ Qué ?


Paso 4 Obtener la carga útil del indicador completa después de cada alerta

El punto más importante del diseño es que SSE te dice ¿Cuándo? para ir a buscar, no Todo lo que sea. Después de que un evento llega, llame al punto final de anuncio correspondiente e inspeccione el registro más reciente.

async function fetchLatestRelease(currency, indicator, apiKey) {
  const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);

  if (currency !== "usd") {
    url.searchParams.set("api_key", apiKey);
  }

  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Announcement fetch failed: ${response.status}`);
  }

  const records = await response.json();
  return records[records.length - 1] ?? null;
}

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);

  if (latest) {
    console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
  }
});

Esto mantiene el flujo ligero mientras se preserva la fidelidad total de los puntos finales REST. También significa que el mismo código de análisis aguas abajo puede reutilizarse si su disparador proviene de SSE, un programador o una llamada manual de API.


Paso 5 Construir un trabajador de Python con repetición explícita en la reconexión

Para un daemon o bot del lado del servidor, a menudo es útil controlar la lógica de reconexión directamente y enviar el Last-Event-ID El ejemplo a continuación escucha los eventos de anuncio, almacena el ID visto por última vez y reproduce los eventos perdidos en el búfer después de una desconexión.

import json
import time
import requests

API_KEY = "YOUR_API_KEY"
STREAM_URL = (
    "https://api.fxmacrodata.com/v1/stream/events"
    "?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)


def fetch_latest_release(currency: str, indicator: str) -> dict | None:
    url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
    params = {"api_key": API_KEY} if currency != "usd" else {}
    response = requests.get(url, params=params, timeout=20)
    response.raise_for_status()
    records = response.json()
    return records[-1] if records else None


def consume_stream() -> None:
    last_event_id = None

    while True:
        headers = {"Accept": "text/event-stream"}
        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        try:
            with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
                response.raise_for_status()

                event = {}
                for raw_line in response.iter_lines(decode_unicode=True):
                    if raw_line is None:
                        continue

                    line = raw_line.strip()

                    if not line:
                        if event.get("event") == "announcement" and event.get("data"):
                            payload = json.loads(event["data"])
                            last_event_id = event.get("id") or payload["event_id"]

                            latest = fetch_latest_release(
                                payload["currency"],
                                payload["indicator"],
                            )
                            print("Announcement event", payload)
                            print("Latest record", latest)

                        event = {}
                        continue

                    if line.startswith(":"):
                        continue

                    field, _, value = line.partition(":")
                    event[field] = value.lstrip()

        except requests.RequestException as exc:
            print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
            time.sleep(3)


if __name__ == "__main__":
    consume_stream()

Este es el patrón correcto cuando se necesita una repetición determinista a través de interrupciones temporales. Si la conexión se cae después de que se publicó un evento pero antes de que su trabajador lo procesara, la siguiente solicitud incluye el último ID recibido y el servidor repite los eventos tamponados que vinieron después.


Paso 6 Decide qué quieres hacer cuando te ocurra un evento

Una vez que el flujo está conectado, la elección de diseño real es lo que sucede aguas abajo.

Actualizar una tarjeta del panel

Cuando llegue un evento de coincidencia, busque el registro más reciente y vuelva a dibujar un panel en lugar de volver a cargar toda la página.

Activar un flujo de trabajo de negociación o alerta

Empuje el evento a Slack, correo electrónico o una cola de negociación después de comparar la versión más reciente con sus umbrales.

Calentar un caché

Utilice SSE como activador de invalidación y, a continuación, actualice la caché del indicador afectado solo cuando se confirmen nuevos datos.

Combinar con el calendario de lanzamiento

¿ Cómo ? la guía del calendario de liberación para saber qué se programará a continuación, mantenga el SSE abierto como la capa de confirmación en vivo.


Paso 7 Manejar algunos casos de borde por adelantado

La EES es sencilla, pero su uso en la producción sigue beneficiándose de algunas reglas:

  • No asuma que un evento es igual a un valor. records_written puede ser mayor que uno si una versión actualiza más de un registro.
  • Esperen una nueva conexión. Los navegadores, proxies y redes móviles ocasionalmente rompen conexiones de larga duración; el soporte de repetición existe exactamente por esta razón.
  • Mantenga el arroyo estrecho cuando sea posible. Filtrado por currencies ¿ Qué ? indicators reduce el ruido y evita las extracciones no necesarias aguas abajo.
  • Deja REST como la fuente de la verdad. El flujo le dice que se produjo una liberación; el registro canónico todavía proviene del punto final de anuncio correspondiente.
Regla práctica: Si su aplicación no ha tenido una ventana de interrupción larga y el búfer de repetición ya no es suficiente, vuelva a una búsqueda REST normal para los indicadores que rastrea.

Lo que construiste

Ahora tiene el patrón de eventos basados en el núcleo para los flujos de trabajo en tiempo real de FXMacroData: suscríbete a https://api.fxmacrodata.com/v1/stream/events, filtrar a las monedas e indicadores que importan, reaccionar a announcement eventos a medida que llegan, y recuperar los datos de liberación completa sólo cuando la corriente le dice algo nuevo ha aterrizado.

Un paso natural es combinar la SSE con el Guía de programación del calendario de lanzamiento Así que usted sabe lo que está por venir y todavía recibir una señal inmediata cuando la publicación realmente ocurre.


El equipo de FXMacroData

Blogroll

AI Answer-Ready

Key Facts

Page
How To Use FXmacrodata Sse Streams
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-use-fxmacrodata-sse-streams
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:01 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Use FXmacrodata Sse Streams with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email