Implementation

How-To Guides

How to Use FXMacroData SSE Streams for Real-Time Macro Data

A step-by-step guide to building a working SSE subscriber that listens for new FXMacroData release events in real time, filters for the currencies and indicators you care about, and pulls the full indicator payload from the REST API.

Também disponível em English
Share article X LinkedIn Email

No final deste guia, você terá um assinante SSE que ouve novos eventos de lançamento do FXMacroData em tempo real, filtros para as moedas e indicadores que lhe interessam e, em seguida, extrai a carga útil completa do indicador da API REST no momento em que um lançamiento é publicado.

Requisitos prévios

  • Uma chave de API FXMacroData se você quiser fluxos não-USD; fluxos somente USD funcionam sem autenticação
  • Um ambiente de navegador para o EventSource exemplo, ou Python 3.9+ para o exemplo do trabalhador
  • O ... requests O pacote está instalado se quiser seguir o exemplo Python (pip install requests)
  • Uma compreensão básica dos pontos finais de anúncio, tais como Docs de inflação em USD E ... Docs de taxas de juro de política monetária

Passo 1 Entender o que o fluxo SSE oferece

O novo endpoint SSE abre uma conexão HTTP de longa duração e empurra um evento toda vez que o FXMacroData ingere uma versão econômica fresca.

https://api.fxmacrodata.com/v1/stream/events
Importante utilizar o domínio direto da API: O SSE requer uma ligação directa. api.fxmacrodata.com Não . fxmacrodata.com/api/…), que vai direto para o servidor API sem CDN buffering que iria interromper o fluxo ao vivo.

Pode reduzir o feed com dois parâmetros opcionais de consulta:

  • currencies uma lista separada por vírgulas, como usd,eur
  • indicators uma lista separada por vírgulas, como inflation,policy_rate

Para moedas que não sejam USD, passe sua chave API como um parâmetro de consulta.

curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"

Se você se conectar sem filtros e sem uma chave Profissional, o fluxo é automaticamente restrito a eventos USD. Isso torna o SSE útil mesmo para uma prova de conceito gratuita antes de conectar uma cobertura mais ampla.

Comportamento importante

  • O fluxo é um gatilho, não o conjunto de dados completo. Cada mensagem SSE diz que uma versão foi lançada; você ainda chama o ponto final de anúncio correspondente para obter os registros completos.
  • O servidor envia batimentos cardíacos. As ligações inactivas recebem um : heartbeat Comentam aproximadamente a cada 15 segundos para que os proxies não fechem o fluxo.
  • Cada evento inclui uma identificação. Essa identificação é o que os poderes reproduzem quando se reconectar através do Last-Event-ID- Não .

Passo 2 Conheça o formato do evento antes de escrever um cliente

A FXMacroData emite quadros padrão do W3C EventSource. Uma mensagem típica parece com isto:

id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}

Os campos de carga útil são intencionalmente pequenos e operacionais:

  • event_id identificador determinístico no formulário {currency}_{indicator}_{timestamp}
  • currency código de moeda em minúsculas, como usd Ou ... eur
  • indicator o indicador FXMacroData slug, por exemplo inflation- Não . policy_rateOu ... non_farm_payrolls
  • records_written número de novos registos salvos durante a ingestão
  • timestamp Marca de tempo Unix para quando o evento foi publicado no stream

Uma vez que a mensagem SSE não contém a série temporal completa, o padrão normal é: ouvir o evento, depois chamar o ponto final relevante em Documentação de dados da API para recuperar os dados de liberação recém-disponíveis.


Passo 3 Abra o fluxo em um navegador com o EventSource

Se você está criando um painel, notificador baseado em navegador, ou página de monitoramento interno, nativa EventSource Use a cadeia de consulta para auth para que a conexão funcione no navegador sem cabeçalhos personalizados.

const apiKey = "YOUR_API_KEY";

const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);

const source = new EventSource(streamUrl);

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  console.log("Release received", payload);

  const dataUrl = new URL(
    `https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
  );
  dataUrl.searchParams.set("api_key", apiKey);

  const response = await fetch(dataUrl);
  const records = await response.json();
  const latest = records[records.length - 1];

  console.log("Latest record", latest);
});

source.onerror = (error) => {
  console.error("SSE connection problem", error);
};

Nativo . EventSource A rede de dados de FXMacroData é automaticamente religada quando a ligação é interrompida. id: campo com cada mensagem, o navegador incluirá automaticamente o ID do último evento recebido durante as reconnectas, o que permite ao servidor reproduzir eventos tamponados que você perdeu.

Por que este padrão funciona bem

A sua página fica inactiva até que uma versão realmente aterrisse. Não há poller de cinco minutos, não há pedidos desperdiçados e não há diferença de tempo entre a publicação e a sua lógica de atualização. Isso é especialmente útil para versões de alta sensibilidade como Lista de salários não agrícolas em USD Ou ... Inflação em euros- Não .


Passo 4 Obter a carga útil do indicador completa após cada alerta

O ponto mais importante do projeto é que o SSE diz-lhe . Quando? Para ir buscar, não. Tudo . Depois de um evento chegar, ligue para o ponto final de anúncio correspondente e inspecione o registro mais recente.

async function fetchLatestRelease(currency, indicator, apiKey) {
  const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);

  if (currency !== "usd") {
    url.searchParams.set("api_key", apiKey);
  }

  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Announcement fetch failed: ${response.status}`);
  }

  const records = await response.json();
  return records[records.length - 1] ?? null;
}

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);

  if (latest) {
    console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
  }
});

Isso mantém o fluxo leve, preservando a total fidelidade dos endpoints REST. Isso também significa que o mesmo código de análise a jusante pode ser reutilizado, independentemente de seu gatilho vir de SSE, um agendador ou uma chamada manual de API.


Passo 5 Construir um Python worker com replay explícito no reconnect

Para um daemon ou bot do lado do servidor, é muitas vezes útil controlar a lógica de reconexão diretamente e enviar o Last-Event-ID O exemplo abaixo ouve eventos de anúncio, armazena o ID visto pela última vez e reproduz eventos perdidos em buffer após uma desconexão.

import json
import time
import requests

API_KEY = "YOUR_API_KEY"
STREAM_URL = (
    "https://api.fxmacrodata.com/v1/stream/events"
    "?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)


def fetch_latest_release(currency: str, indicator: str) -> dict | None:
    url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
    params = {"api_key": API_KEY} if currency != "usd" else {}
    response = requests.get(url, params=params, timeout=20)
    response.raise_for_status()
    records = response.json()
    return records[-1] if records else None


def consume_stream() -> None:
    last_event_id = None

    while True:
        headers = {"Accept": "text/event-stream"}
        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        try:
            with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
                response.raise_for_status()

                event = {}
                for raw_line in response.iter_lines(decode_unicode=True):
                    if raw_line is None:
                        continue

                    line = raw_line.strip()

                    if not line:
                        if event.get("event") == "announcement" and event.get("data"):
                            payload = json.loads(event["data"])
                            last_event_id = event.get("id") or payload["event_id"]

                            latest = fetch_latest_release(
                                payload["currency"],
                                payload["indicator"],
                            )
                            print("Announcement event", payload)
                            print("Latest record", latest)

                        event = {}
                        continue

                    if line.startswith(":"):
                        continue

                    field, _, value = line.partition(":")
                    event[field] = value.lstrip()

        except requests.RequestException as exc:
            print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
            time.sleep(3)


if __name__ == "__main__":
    consume_stream()

Este é o padrão certo quando você precisa de repetição determinista através de interrupções temporárias. Se a conexão cair após um evento ser publicado, mas antes de seu trabalhador processá-lo, a próxima solicitação inclui o último ID recebido e o servidor repete eventos tamponados que vieram depois dele.


Passo 6 Decida o que quer fazer quando um evento acontecer

Uma vez que o fluxo é ligado, a escolha de design real é o que acontece a jusante.

Refazer um cartão do painel

Quando um evento de correspondência chegar, busque o último registro e redesenhe um painel em vez de recarregar a página inteira.

Ativar um fluxo de trabalho de negociação ou alerta

Empurre o evento para o Slack, e-mail ou fila de negociação depois de comparar a versão mais recente com seus limiares.

Aqueça um cache.

Utilize o SSE como gatilho de invalidação e atualize o cache do indicador afetado apenas quando novos dados forem confirmados.

Combinar com o calendário de lançamento

Utilize O guia do calendário de lançamento para saber o que está programado a seguir, mantenha o SSE aberto como a camada de confirmação em tempo real.


Passo 7 Manter alguns casos de borda na frente

A EES é simples, mas a utilização na produção ainda beneficia de algumas regras:

  • Não assuma que um evento é igual a um valor. records_written Pode ser maior que um se uma versão atualizar mais de um registro.
  • Esperem que se reconecte. Os navegadores, proxies e redes móveis ocasionalmente quebram conexões de longa duração; o suporte de repetição existe exatamente por essa razão.
  • Mantenha o riacho estreito sempre que possível. Filtragem por currencies E ... indicators reduz o ruído e evita a recolha desnecessária a jusante.
  • Deixe o REST como a fonte da verdade. O fluxo diz que uma liberação aconteceu; o registro canônico ainda vem do ponto final de anúncio correspondente.
Regra prática: Se o seu aplicativo perdeu uma janela de interrupção longa e o buffer de repetição não é mais suficiente, volte a uma busca REST normal para os indicadores que você rastreia.

O que construíste

Agora tem o padrão de eventos baseados em fluxos de trabalho em tempo real do FXMacroData: subscreva https://api.fxmacrodata.com/v1/stream/events, filtrar para as moedas e indicadores que importam, reagir a announcement eventos à medida que chegam, e obter os dados de lançamento completo apenas quando o fluxo diz-lhe algo novo aterrissou.

O próximo passo natural é combinar a SSE com o Guia de programação do calendário de lançamento Assim, você sabe o que está previsto a seguir e ainda recebe um sinal imediato quando a publicação realmente acontece.


A equipa FXMacroData

Blogroll

AI Answer-Ready

Key Facts

Page
How To Use FXmacrodata Sse Streams
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-use-fxmacrodata-sse-streams
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:01 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Use FXmacrodata Sse Streams with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email