No final deste guia, você terá um assinante SSE que ouve novos eventos de lançamento do FXMacroData em tempo real, filtros para as moedas e indicadores que lhe interessam e, em seguida, extrai a carga útil completa do indicador da API REST no momento em que um lançamiento é publicado.
Requisitos prévios
- Uma chave de API FXMacroData se você quiser fluxos não-USD; fluxos somente USD funcionam sem autenticação
- Um ambiente de navegador para o
EventSourceexemplo, ou Python 3.9+ para o exemplo do trabalhador - O ...
requestsO pacote está instalado se quiser seguir o exemplo Python (pip install requests) - Uma compreensão básica dos pontos finais de anúncio, tais como Docs de inflação em USD E ... Docs de taxas de juro de política monetária
Passo 1 Entender o que o fluxo SSE oferece
O novo endpoint SSE abre uma conexão HTTP de longa duração e empurra um evento toda vez que o FXMacroData ingere uma versão econômica fresca.
https://api.fxmacrodata.com/v1/stream/events
api.fxmacrodata.com Não . fxmacrodata.com/api/…), que vai direto para o servidor API sem CDN buffering que iria interromper o fluxo ao vivo.Pode reduzir o feed com dois parâmetros opcionais de consulta:
currenciesuma lista separada por vírgulas, comousd,eurindicatorsuma lista separada por vírgulas, comoinflation,policy_rate
Para moedas que não sejam USD, passe sua chave API como um parâmetro de consulta.
curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"
Se você se conectar sem filtros e sem uma chave Profissional, o fluxo é automaticamente restrito a eventos USD. Isso torna o SSE útil mesmo para uma prova de conceito gratuita antes de conectar uma cobertura mais ampla.
Comportamento importante
- O fluxo é um gatilho, não o conjunto de dados completo. Cada mensagem SSE diz que uma versão foi lançada; você ainda chama o ponto final de anúncio correspondente para obter os registros completos.
- O servidor envia batimentos cardíacos. As ligações inactivas recebem um
: heartbeatComentam aproximadamente a cada 15 segundos para que os proxies não fechem o fluxo. - Cada evento inclui uma identificação. Essa identificação é o que os poderes reproduzem quando se reconectar através do
Last-Event-ID- Não .
Passo 2 Conheça o formato do evento antes de escrever um cliente
A FXMacroData emite quadros padrão do W3C EventSource. Uma mensagem típica parece com isto:
id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}
Os campos de carga útil são intencionalmente pequenos e operacionais:
event_ididentificador determinístico no formulário{currency}_{indicator}_{timestamp}currencycódigo de moeda em minúsculas, comousdOu ...eurindicatoro indicador FXMacroData slug, por exemploinflation- Não .policy_rateOu ...non_farm_payrollsrecords_writtennúmero de novos registos salvos durante a ingestãotimestampMarca de tempo Unix para quando o evento foi publicado no stream
Uma vez que a mensagem SSE não contém a série temporal completa, o padrão normal é: ouvir o evento, depois chamar o ponto final relevante em Documentação de dados da API para recuperar os dados de liberação recém-disponíveis.
Passo 3 Abra o fluxo em um navegador com o EventSource
Se você está criando um painel, notificador baseado em navegador, ou página de monitoramento interno, nativa
EventSource Use a cadeia de consulta para auth para que a conexão funcione no navegador sem cabeçalhos personalizados.
const apiKey = "YOUR_API_KEY";
const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);
const source = new EventSource(streamUrl);
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
console.log("Release received", payload);
const dataUrl = new URL(
`https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
);
dataUrl.searchParams.set("api_key", apiKey);
const response = await fetch(dataUrl);
const records = await response.json();
const latest = records[records.length - 1];
console.log("Latest record", latest);
});
source.onerror = (error) => {
console.error("SSE connection problem", error);
};
Nativo . EventSource A rede de dados de FXMacroData é automaticamente religada quando a ligação é interrompida. id: campo com cada mensagem, o navegador incluirá automaticamente o ID do último evento recebido durante as reconnectas, o que permite ao servidor reproduzir eventos tamponados que você perdeu.
Por que este padrão funciona bem
A sua página fica inactiva até que uma versão realmente aterrisse. Não há poller de cinco minutos, não há pedidos desperdiçados e não há diferença de tempo entre a publicação e a sua lógica de atualização. Isso é especialmente útil para versões de alta sensibilidade como Lista de salários não agrícolas em USD Ou ... Inflação em euros- Não .
Passo 4 Obter a carga útil do indicador completa após cada alerta
O ponto mais importante do projeto é que o SSE diz-lhe . Quando? Para ir buscar, não. Tudo . Depois de um evento chegar, ligue para o ponto final de anúncio correspondente e inspecione o registro mais recente.
async function fetchLatestRelease(currency, indicator, apiKey) {
const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);
if (currency !== "usd") {
url.searchParams.set("api_key", apiKey);
}
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Announcement fetch failed: ${response.status}`);
}
const records = await response.json();
return records[records.length - 1] ?? null;
}
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);
if (latest) {
console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
}
});
Isso mantém o fluxo leve, preservando a total fidelidade dos endpoints REST. Isso também significa que o mesmo código de análise a jusante pode ser reutilizado, independentemente de seu gatilho vir de SSE, um agendador ou uma chamada manual de API.
Passo 5 Construir um Python worker com replay explícito no reconnect
Para um daemon ou bot do lado do servidor, é muitas vezes útil controlar a lógica de reconexão diretamente e enviar o
Last-Event-ID O exemplo abaixo ouve eventos de anúncio, armazena o ID visto pela última vez e reproduz eventos perdidos em buffer após uma desconexão.
import json
import time
import requests
API_KEY = "YOUR_API_KEY"
STREAM_URL = (
"https://api.fxmacrodata.com/v1/stream/events"
"?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)
def fetch_latest_release(currency: str, indicator: str) -> dict | None:
url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
params = {"api_key": API_KEY} if currency != "usd" else {}
response = requests.get(url, params=params, timeout=20)
response.raise_for_status()
records = response.json()
return records[-1] if records else None
def consume_stream() -> None:
last_event_id = None
while True:
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
try:
with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
response.raise_for_status()
event = {}
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if event.get("event") == "announcement" and event.get("data"):
payload = json.loads(event["data"])
last_event_id = event.get("id") or payload["event_id"]
latest = fetch_latest_release(
payload["currency"],
payload["indicator"],
)
print("Announcement event", payload)
print("Latest record", latest)
event = {}
continue
if line.startswith(":"):
continue
field, _, value = line.partition(":")
event[field] = value.lstrip()
except requests.RequestException as exc:
print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
time.sleep(3)
if __name__ == "__main__":
consume_stream()
Este é o padrão certo quando você precisa de repetição determinista através de interrupções temporárias. Se a conexão cair após um evento ser publicado, mas antes de seu trabalhador processá-lo, a próxima solicitação inclui o último ID recebido e o servidor repete eventos tamponados que vieram depois dele.
Passo 6 Decida o que quer fazer quando um evento acontecer
Uma vez que o fluxo é ligado, a escolha de design real é o que acontece a jusante.
Refazer um cartão do painel
Quando um evento de correspondência chegar, busque o último registro e redesenhe um painel em vez de recarregar a página inteira.
Ativar um fluxo de trabalho de negociação ou alerta
Empurre o evento para o Slack, e-mail ou fila de negociação depois de comparar a versão mais recente com seus limiares.
Aqueça um cache.
Utilize o SSE como gatilho de invalidação e atualize o cache do indicador afetado apenas quando novos dados forem confirmados.
Combinar com o calendário de lançamento
Utilize O guia do calendário de lançamento para saber o que está programado a seguir, mantenha o SSE aberto como a camada de confirmação em tempo real.
Passo 7 Manter alguns casos de borda na frente
A EES é simples, mas a utilização na produção ainda beneficia de algumas regras:
- Não assuma que um evento é igual a um valor.
records_writtenPode ser maior que um se uma versão atualizar mais de um registro. - Esperem que se reconecte. Os navegadores, proxies e redes móveis ocasionalmente quebram conexões de longa duração; o suporte de repetição existe exatamente por essa razão.
- Mantenha o riacho estreito sempre que possível. Filtragem por
currenciesE ...indicatorsreduz o ruído e evita a recolha desnecessária a jusante. - Deixe o REST como a fonte da verdade. O fluxo diz que uma liberação aconteceu; o registro canônico ainda vem do ponto final de anúncio correspondente.
O que construíste
Agora tem o padrão de eventos baseados em fluxos de trabalho em tempo real do FXMacroData: subscreva
https://api.fxmacrodata.com/v1/stream/events, filtrar para as moedas e indicadores que importam, reagir a
announcement eventos à medida que chegam, e obter os dados de lançamento completo apenas quando o fluxo diz-lhe algo novo aterrissou.
O próximo passo natural é combinar a SSE com o Guia de programação do calendário de lançamento Assim, você sabe o que está previsto a seguir e ainda recebe um sinal imediato quando a publicação realmente acontece.
A equipa FXMacroData