Al final de esta guía tendrá un suscriptor de SSE que escucha los nuevos eventos de liberación de FXMacroData en tiempo real, filtros para las monedas e indicadores que le interesan, y luego extrae la carga útil completa del indicador de la API REST en el momento en que se publica una liberación.
Los requisitos previos
- Una clave de la API de FXMacroData si desea flujos que no sean USD; flujos solo USD funcionan sin autenticación
- Un entorno de navegador para el
EventSourceejemplo, o Python 3.9+ para el ejemplo de trabajador - El
requestspaquete instalado si desea seguir el ejemplo de Python (pip install requests) - Una comprensión básica de los puntos finales de anuncio como Documentación sobre la inflación en dólares ¿ Qué ? Documento sobre las tasas de interés de política monetaria en euros
Paso 1 Comprender lo que ofrece el flujo SSE
El nuevo punto final SSE abre una conexión HTTP de larga duración y empuja un evento cada vez que FXMacroData ingere una nueva versión económica.
https://api.fxmacrodata.com/v1/stream/events
api.fxmacrodata.com No es ... fxmacrodata.com/api/…), que va directamente al servidor API sin CDN buffer que interrumpiría la transmisión en vivo.Puede reducir el feed con dos parámetros de consulta opcionales:
currenciesuna lista separada por comas comousd,eurindicatorsuna lista separada por comas comoinflation,policy_rate
Para monedas que no sean USD, pase su clave API como parámetro de consulta.
curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"
Si se conecta sin filtros y sin una clave profesional, el flujo se restringe automáticamente a los eventos de USD.
Comportamiento importante
- La corriente es un disparador, no el conjunto de datos completo. Cada mensaje de SSE le dice que una liberación aterrizó; todavía llama al punto final de anuncio correspondiente para recuperar los registros completos.
- El servidor envía latidos del corazón. Las conexiones inactivas reciben un
: heartbeatComentario aproximadamente cada 15 segundos para que los proxies no cierren el flujo. - Cada evento incluye una identificación. Esa identificación es lo que los poderes reproducen en la reconexión a través de
Last-Event-ID- ¿ Qué ?
Paso 2 Conozca el formato del evento antes de escribir un cliente
FXMacroData emite marcos estándar de W3C EventSource. Un mensaje típico se ve así:
id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}
Los campos de carga útil son intencionalmente pequeños y operativos:
event_ididentificador determinístico en el formulario{currency}_{indicator}_{timestamp}currencycódigo de moneda en minúsculas comousd¿ Qué ?eurindicatorel indicador FXMacroData, por ejemploinflation¿ Qué ?policy_rate, onon_farm_payrollsrecords_writtennúmero de nuevos registros guardados durante la ingestióntimestampMarca de tiempo Unix para cuando el evento fue publicado en el flujo
Debido a que el mensaje SSE no contiene la serie temporal completa, el patrón normal es: escuchar el evento, luego llamar al punto final correspondiente en Documento de datos de la API para recuperar los datos de liberación recién disponibles.
Paso 3 Abra el flujo en un navegador con EventSource
Si está creando un panel de control, notificador basado en navegador o página de monitoreo interno, nativa
EventSource Utiliza la cadena de consulta para auth para que la conexión funcione en el navegador sin encabezados personalizados.
const apiKey = "YOUR_API_KEY";
const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);
const source = new EventSource(streamUrl);
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
console.log("Release received", payload);
const dataUrl = new URL(
`https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
);
dataUrl.searchParams.set("api_key", apiKey);
const response = await fetch(dataUrl);
const records = await response.json();
const latest = records[records.length - 1];
console.log("Latest record", latest);
});
source.onerror = (error) => {
console.error("SSE connection problem", error);
};
- ¿ Qué es ? EventSource automáticamente se vuelve a conectar cuando la conexión se cae. id: En el campo de cada mensaje, el navegador incluirá automáticamente el último evento recibido durante las reconexiones, lo que permite al servidor reproducir los eventos tamponados que se perdieron.
Por qué funciona bien este patrón
Su página permanece inactiva hasta que una versión realmente aterriza. No hay una encuesta de cinco minutos, no hay solicitudes desperdiciadas, y no hay brecha de tiempo entre la publicación y su lógica de actualización. Eso es especialmente útil para versiones de alta sensibilidad como Las nóminas no agrícolas en dólares ¿ Qué ? Inflación en euros- ¿ Qué ?
Paso 4 Obtener la carga útil del indicador completa después de cada alerta
El punto más importante del diseño es que SSE te dice ¿Cuándo? para ir a buscar, no Todo lo que sea. Después de que un evento llega, llame al punto final de anuncio correspondiente e inspeccione el registro más reciente.
async function fetchLatestRelease(currency, indicator, apiKey) {
const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);
if (currency !== "usd") {
url.searchParams.set("api_key", apiKey);
}
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Announcement fetch failed: ${response.status}`);
}
const records = await response.json();
return records[records.length - 1] ?? null;
}
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);
if (latest) {
console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
}
});
Esto mantiene el flujo ligero mientras se preserva la fidelidad total de los puntos finales REST. También significa que el mismo código de análisis aguas abajo puede reutilizarse si su disparador proviene de SSE, un programador o una llamada manual de API.
Paso 5 Construir un trabajador de Python con repetición explícita en la reconexión
Para un daemon o bot del lado del servidor, a menudo es útil controlar la lógica de reconexión directamente y enviar el
Last-Event-ID El ejemplo a continuación escucha los eventos de anuncio, almacena el ID visto por última vez y reproduce los eventos perdidos en el búfer después de una desconexión.
import json
import time
import requests
API_KEY = "YOUR_API_KEY"
STREAM_URL = (
"https://api.fxmacrodata.com/v1/stream/events"
"?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)
def fetch_latest_release(currency: str, indicator: str) -> dict | None:
url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
params = {"api_key": API_KEY} if currency != "usd" else {}
response = requests.get(url, params=params, timeout=20)
response.raise_for_status()
records = response.json()
return records[-1] if records else None
def consume_stream() -> None:
last_event_id = None
while True:
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
try:
with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
response.raise_for_status()
event = {}
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if event.get("event") == "announcement" and event.get("data"):
payload = json.loads(event["data"])
last_event_id = event.get("id") or payload["event_id"]
latest = fetch_latest_release(
payload["currency"],
payload["indicator"],
)
print("Announcement event", payload)
print("Latest record", latest)
event = {}
continue
if line.startswith(":"):
continue
field, _, value = line.partition(":")
event[field] = value.lstrip()
except requests.RequestException as exc:
print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
time.sleep(3)
if __name__ == "__main__":
consume_stream()
Este es el patrón correcto cuando se necesita una repetición determinista a través de interrupciones temporales. Si la conexión se cae después de que se publicó un evento pero antes de que su trabajador lo procesara, la siguiente solicitud incluye el último ID recibido y el servidor repite los eventos tamponados que vinieron después.
Paso 6 Decide qué quieres hacer cuando te ocurra un evento
Una vez que el flujo está conectado, la elección de diseño real es lo que sucede aguas abajo.
Actualizar una tarjeta del panel
Cuando llegue un evento de coincidencia, busque el registro más reciente y vuelva a dibujar un panel en lugar de volver a cargar toda la página.
Activar un flujo de trabajo de negociación o alerta
Empuje el evento a Slack, correo electrónico o una cola de negociación después de comparar la versión más reciente con sus umbrales.
Calentar un caché
Utilice SSE como activador de invalidación y, a continuación, actualice la caché del indicador afectado solo cuando se confirmen nuevos datos.
Combinar con el calendario de lanzamiento
¿ Cómo ? la guía del calendario de liberación para saber qué se programará a continuación, mantenga el SSE abierto como la capa de confirmación en vivo.
Paso 7 Manejar algunos casos de borde por adelantado
La EES es sencilla, pero su uso en la producción sigue beneficiándose de algunas reglas:
- No asuma que un evento es igual a un valor.
records_writtenpuede ser mayor que uno si una versión actualiza más de un registro. - Esperen una nueva conexión. Los navegadores, proxies y redes móviles ocasionalmente rompen conexiones de larga duración; el soporte de repetición existe exactamente por esta razón.
- Mantenga el arroyo estrecho cuando sea posible. Filtrado por
currencies¿ Qué ?indicatorsreduce el ruido y evita las extracciones no necesarias aguas abajo. - Deja REST como la fuente de la verdad. El flujo le dice que se produjo una liberación; el registro canónico todavía proviene del punto final de anuncio correspondiente.
Lo que construiste
Ahora tiene el patrón de eventos basados en el núcleo para los flujos de trabajo en tiempo real de FXMacroData: suscríbete a
https://api.fxmacrodata.com/v1/stream/events, filtrar a las monedas e indicadores que importan, reaccionar a
announcement eventos a medida que llegan, y recuperar los datos de liberación completa sólo cuando la corriente le dice algo nuevo ha aterrizado.
Un paso natural es combinar la SSE con el Guía de programación del calendario de lanzamiento Así que usted sabe lo que está por venir y todavía recibir una señal inmediata cuando la publicación realmente ocurre.
El equipo de FXMacroData