À la fin de ce guide, vous aurez un abonné SSE qui écoute les nouveaux événements de sortie de FXMacroData en temps réel, filtre les devises et les indicateurs qui vous intéressent, puis extrait la charge utile complète de l'indicateur de l 'API REST dès la publication d'une version.
Pré-requis
- Une clé API FXMacroData si vous voulez des flux non USD; les flux uniquement USD fonctionnent sans authentification
- Un environnement de navigateur pour le
EventSourceexemple, ou Python 3.9+ pour l'exemple du travailleur - Le
requestsLe programme est installé si vous voulez suivre l'exemple Python (pip install requests) - Une compréhension de base des points de fin d'annonce tels que Documents sur l'inflation en USD Je suis désolé . Taux de référence en euros
Étape 1 Comprendre ce que le volet SSE offre
Le nouveau point de terminaison SSE ouvre une connexion HTTP à longue durée de vie et envoie un événement chaque fois que FXMacroData ingère une nouvelle version économique.
https://api.fxmacrodata.com/v1/stream/events
api.fxmacrodata.com Je ne veux pas . fxmacrodata.com/api/…), qui va directement au serveur API sans tampon CDN qui interromprait le flux en direct.Vous pouvez réduire le flux avec deux paramètres de requête optionnels:
currenciesune liste séparée par des virgules, par exempleusd,eurindicatorsune liste séparée par des virgules, par exempleinflation,policy_rate
Pour les devises autres que l'USD, passez votre clé API comme paramètre de requête.
curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"
Si vous vous connectez sans filtres et sans clé professionnelle, le flux est automatiquement limité aux événements USD. Cela rend SSE utile même pour une preuve de concept gratuite avant de câbler une couverture plus large.
Comportement important
- Le flux est un déclencheur, pas l'ensemble complet de données. Chaque message SSE vous indique qu'une version est arrivée; vous appelez toujours le point d'arrivée d'annonce correspondant pour récupérer les enregistrements complets.
- Le serveur envoie des battements de cœur. Les connexions inactives reçoivent un
: heartbeatLes commentaires sont tous les 15 secondes pour que les proxies ne ferment pas le flux. - Chaque événement comporte une pièce d'identité. Cette identité est ce que les pouvoirs de répliquer sur la reconnexion via
Last-Event-IDJe suis désolé .
Étape 2 Connaître le format de l'événement avant d'écrire un client
FXMacroData émet des trames standard W3C EventSource. Un message typique ressemble à ceci:
id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}
Les champs de charge utile sont intentionnellement petits et opérationnels:
event_ididentifiant déterministe dans le formulaire{currency}_{indicator}_{timestamp}currencycode de monnaie en minuscules tel queusdou ...eurindicatorl'indicateur FXMacroData, par exempleinflationJe suis désolé .policy_rate, ounon_farm_payrollsrecords_writtennombre de nouveaux enregistrements enregistrés lors de l'ingestiontimestampMarque d'heure Unix pour quand l'événement a été publié sur le flux
Étant donné que le message SSE ne contient pas la série temporelle complète, le schéma normal est le suivant: écouter l'événement, puis appeler le point final concerné sous Les données de l'API pour récupérer les données de libération nouvellement disponibles.
Étape 3 Ouvrez le flux dans un navigateur avec EventSource
Si vous créez un tableau de bord, un notificateur basé sur un navigateur ou une page de surveillance interne, native
EventSource Utilisez la chaîne de requête pour authentifier afin que la connexion fonctionne dans le navigateur sans en-têtes personnalisés.
const apiKey = "YOUR_API_KEY";
const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);
const source = new EventSource(streamUrl);
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
console.log("Release received", payload);
const dataUrl = new URL(
`https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
);
dataUrl.searchParams.set("api_key", apiKey);
const response = await fetch(dataUrl);
const records = await response.json();
const latest = records[records.length - 1];
console.log("Latest record", latest);
});
source.onerror = (error) => {
console.error("SSE connection problem", error);
};
Je suis natif . EventSource Il se reconnecte automatiquement lorsque la connexion tombe. id: Dans le champ de chaque message, le navigateur inclura automatiquement le dernier ID d'événement reçu lors des reconnexions, ce qui permet au serveur de rejouer les événements tamponnés que vous avez manqués.
Pourquoi ce modèle fonctionne bien
Votre page reste inactive jusqu'à ce qu'une version arrive. Il n'y a pas de sondage de cinq minutes, pas de demandes gaspillées, et pas d'écart de temps entre la publication et votre logique de rafraîchissement. C'est particulièrement utile pour les versions très sensibles telles que Liste des salariés non agricoles en USD ou ... Inflation en eurosJe suis désolé .
Étape 4 Retrouvez la charge utile complète de l'indicateur après chaque alerte
Le point de conception le plus important est que le SSE vous le dit. quand ? Pour aller chercher, pas Tout ce que tu veux . Après l'arrivée d'un événement, appelez le point d'arrière d'annonce correspondant et inspectez le dernier enregistrement.
async function fetchLatestRelease(currency, indicator, apiKey) {
const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);
if (currency !== "usd") {
url.searchParams.set("api_key", apiKey);
}
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Announcement fetch failed: ${response.status}`);
}
const records = await response.json();
return records[records.length - 1] ?? null;
}
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);
if (latest) {
console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
}
});
Cela permet de garder le flux léger tout en préservant la fidélité totale des terminaux REST. Cela signifie également que le même code d'analyse en aval peut être réutilisé que votre déclencheur provienne de SSE, d'un planificateur ou d'une appel API manuel.
Étape 5 Construire un travailleur Python avec une répétition explicite sur reconnect
Pour un démon ou un bot côté serveur, il est souvent utile de contrôler la logique de reconnexion directement et d'envoyer le
Last-Event-ID L'exemple ci-dessous écoute les événements d'annonce, stocke l'ID de dernière vue et reproduit les évènements tamponnés manqués après une déconnexion.
import json
import time
import requests
API_KEY = "YOUR_API_KEY"
STREAM_URL = (
"https://api.fxmacrodata.com/v1/stream/events"
"?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)
def fetch_latest_release(currency: str, indicator: str) -> dict | None:
url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
params = {"api_key": API_KEY} if currency != "usd" else {}
response = requests.get(url, params=params, timeout=20)
response.raise_for_status()
records = response.json()
return records[-1] if records else None
def consume_stream() -> None:
last_event_id = None
while True:
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
try:
with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
response.raise_for_status()
event = {}
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if event.get("event") == "announcement" and event.get("data"):
payload = json.loads(event["data"])
last_event_id = event.get("id") or payload["event_id"]
latest = fetch_latest_release(
payload["currency"],
payload["indicator"],
)
print("Announcement event", payload)
print("Latest record", latest)
event = {}
continue
if line.startswith(":"):
continue
field, _, value = line.partition(":")
event[field] = value.lstrip()
except requests.RequestException as exc:
print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
time.sleep(3)
if __name__ == "__main__":
consume_stream()
Si la connexion tombe après qu'un événement a été publié mais avant que votre travailleur ne le traite, la prochaine demande comprend le dernier identifiant reçu et le serveur répète les événements tamponnés qui sont venus après.
Étape 6 Décidez de ce que vous voulez faire quand un événement arrive
Une fois le courant branché, le choix de conception réel est ce qui se passe en aval.
Ressouffler une carte de tableau de bord
Quand un événement correspondant arrive, récupérez le dernier enregistrement et redessinez un panneau au lieu de recharger la page entière.
Déclencher un flux de travail de négociation ou d'alerte
Poussez l'événement dans Slack, e-mail, ou une file d'attente de trading après avoir comparé la dernière version avec vos seuils.
On a un cache à chauffer .
Utiliser l'ESS comme déclencheur d'invalidation, puis mettre à jour le cache des indicateurs affectés uniquement lorsque de nouvelles données sont confirmées.
Combiner avec le calendrier de sortie
Utiliser le guide du calendrier de sortie pour savoir ce qui est prévu ensuite, puis garder SSE ouvert comme la couche de confirmation en direct.
Étape 7 Traiter quelques cas de bord à l'avance
L'ESS est simple, mais l'utilisation en production bénéficie encore de quelques règles:
- Ne présumez pas qu'un événement équivaut à une valeur.
records_writtenpeut être supérieure à un si une version met à jour plus d'un enregistrement. - Attends de la reconnexion. Les navigateurs, les proxies et les réseaux mobiles interrompent parfois les connexions de longue durée; le support de lecture existe pour cette raison.
- Gardez le ruisseau étroit autant que possible. Filtration par
currenciesJe suis désolé .indicatorsréduit le bruit et évite les prises inutiles en aval. - Laissez REST comme source de vérité. Le flux vous indique qu'une sortie s'est produite; l'enregistrement canonique provient toujours du point final d'annonce correspondant.
Ce que vous avez construit
Vous avez maintenant le modèle de base axé sur les événements pour les flux de travail en temps réel FXMacroData: abonnez-vous à
https://api.fxmacrodata.com/v1/stream/events, filtrer vers les monnaies et les indicateurs qui comptent, réagir à
announcement Les événements arrivent, et récupèrent les données de sortie complètes seulement lorsque le flux vous dit que quelque chose de nouveau est arrivé.
Une prochaine étape naturelle est de combiner l'ESS avec le Guide de planification du calendrier de sortie Vous savez donc ce qui est prévu ensuite et vous recevez toujours un signal immédiat lorsque la publication a lieu.
L'équipe FXMacroData