Implementation

How-To Guides

How to Use FXMacroData SSE Streams for Real-Time Macro Data

A step-by-step guide to building a working SSE subscriber that listens for new FXMacroData release events in real time, filters for the currencies and indicators you care about, and pulls the full indicator payload from the REST API.

其他语言版本 English
Share article X LinkedIn Email

通过此指南,您将有一个可用的SSE订阅者,它可以实时收听新FXMacroData发布事件,对您感兴趣的货币和指标进行过,然后在发布发布时从REST API中提取全部指标有效负载.

预先要求

  • 如果您想要非美元流,则需要FXMacroData API 密钥;只有美元流在没有身份验证的情况下工作
  • 浏览器环境 EventSource 工作者示例,或Python 3.9+
  • 没有什么. requests 如果您想遵循Python示例 (pip install requests) 没有
  • 基本了解公告终点,如 美元通货膨胀数据 现在我 欧元政策利率文件

了解SSE流程所提供的内容

新的SSE终点打开一个长期的HTTP连接,并在FXMacroData每次摄入新经济版本时推送一个事件.终点是:

https://api.fxmacrodata.com/v1/stream/events
重要的是使用直接API域: 需要直接连接. api.fxmacrodata.com (不是 fxmacrodata.com/api/…直接传输到API服务器, 没有CDN缓冲,

您可以通过两个可选查询参数缩小输入范围:

  • currencies 像是 像 像 usd,eur
  • indicators 像是 像 像 inflation,policy_rate

对于非美元货币,将您的API键传输为查询参数. 例如:

curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"

如果您连接没有过器和没有专业密钥,流自动限制在美元事件. 这使SSE甚至在您在更广泛的覆盖范围内进行免费概念验证时也很有用.

重要行为

  • 流量是一个触发器,而不是整个数据集. 每个SSE消息都告诉你发布了;你仍然需要调用匹配的公告终点来获取完整的记录.
  • 服务器发送心跳. 无效连接接收到一个 : heartbeat 评论大约每15秒就会出现,
  • 每个事件都包含一个身份证. 通过重新连接, Last-Event-ID现在我们要做什么?

步骤2 写客户端之前,要知道事件格式

FXMacroData发出标准的W3C事件源框架.典型的消息看起来像这样:

id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}

有效载荷场是故意小的,可操作的:

  • event_id 确定性标识符 {currency}_{indicator}_{timestamp}
  • currency 小写货币代码,如 usd 没有 eur
  • indicator 例如FXMacroData指标 inflation没有人知道. policy_rate没有 non_farm_payrolls
  • records_written 在摄入过程中保存的新记录数量
  • timestamp 时刻显示事件在流中发布时

由于SSE消息不包含完整的时间序列,正常模式是:听取事件,然后在 API数据文件 获取新获取的发布数据.


步骤3 用 EventSource 在浏览器中打开流

您正在建立一个仪表板,基于浏览器的通知或内部监控页面, EventSource 是消耗流的最简单方法. 使用查询字符串为 auth,以便连接在浏览器中工作,而不需要自定义头.

const apiKey = "YOUR_API_KEY";

const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);

const source = new EventSource(streamUrl);

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  console.log("Release received", payload);

  const dataUrl = new URL(
    `https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
  );
  dataUrl.searchParams.set("api_key", apiKey);

  const response = await fetch(dataUrl);
  const records = await response.json();
  const latest = records[records.length - 1];

  console.log("Latest record", latest);
});

source.onerror = (error) => {
  console.error("SSE connection problem", error);
};

美国人 EventSource 由于FXMacroData会发送一个 id: 在重新连接时,浏览器会自动包含最后接收的事件ID,

这种模式为什么很有效

您的页面将停留在空中,直到发布实际登陆. 没有五分钟的投票,没有浪费的请求,以及发布和更新逻辑之间的时间差距. 这对于高敏感性发布特别有用. 美元非农业工资 没有 欧元通货膨胀现在我们要做什么?


步骤4 在每个报警后获取全指标有效载荷

设计中最重要的一点是SSE告诉你 什么时候 为了找,而不是 一切都没有 显示. 事件到达后,调用匹配公告终点,检查最新记录.

async function fetchLatestRelease(currency, indicator, apiKey) {
  const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);

  if (currency !== "usd") {
    url.searchParams.set("api_key", apiKey);
  }

  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Announcement fetch failed: ${response.status}`);
  }

  const records = await response.json();
  return records[records.length - 1] ?? null;
}

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);

  if (latest) {
    console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
  }
});

这使流量保持轻量化,同时保持REST终端的完整保真性.这也意味着无论您的触发来自SSE,调度器还是手动API调用,都可以重复使用相同的下游解析代码.


步骤 5 构建一个Python工作者,在重新连接时具有明确的重播

对于服务器端的机器人或机器人的来说, 经常有用的就是直接控制重连逻辑, Last-Event-ID 下面的示例会听取公告事件,存储最后一次看到的ID,并在断开连接后重播错过的缓冲事件.

import json
import time
import requests

API_KEY = "YOUR_API_KEY"
STREAM_URL = (
    "https://api.fxmacrodata.com/v1/stream/events"
    "?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)


def fetch_latest_release(currency: str, indicator: str) -> dict | None:
    url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
    params = {"api_key": API_KEY} if currency != "usd" else {}
    response = requests.get(url, params=params, timeout=20)
    response.raise_for_status()
    records = response.json()
    return records[-1] if records else None


def consume_stream() -> None:
    last_event_id = None

    while True:
        headers = {"Accept": "text/event-stream"}
        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        try:
            with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
                response.raise_for_status()

                event = {}
                for raw_line in response.iter_lines(decode_unicode=True):
                    if raw_line is None:
                        continue

                    line = raw_line.strip()

                    if not line:
                        if event.get("event") == "announcement" and event.get("data"):
                            payload = json.loads(event["data"])
                            last_event_id = event.get("id") or payload["event_id"]

                            latest = fetch_latest_release(
                                payload["currency"],
                                payload["indicator"],
                            )
                            print("Announcement event", payload)
                            print("Latest record", latest)

                        event = {}
                        continue

                    if line.startswith(":"):
                        continue

                    field, _, value = line.partition(":")
                    event[field] = value.lstrip()

        except requests.RequestException as exc:
            print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
            time.sleep(3)


if __name__ == "__main__":
    consume_stream()

如果在发布事件后,但在您的员工处理事件之前,连接下降,下一个请求包括最后接收的ID,服务器将继此之后的缓冲事件重播.


步骤6 决定当一个事件发生时,你想做什么

一旦流线连接到,真正的设计选择是下游发生的事情.

更新仪表板卡

当一个匹配事件到达时,取出最新记录,重新绘制一个面板,而不是重新加载整个页面.

触发交易或警报工作流程

在您将最新版本与您的门进行比较后, 将事件推到Slack,电子邮件或交易队列中.

热一个缓存

使用SSE作为无效触发,然后只有在新数据得到确认时才更新受影响的指标缓存.

结合与发布日历

使用 发布日程指南 知道接下来计划什么,然后将SSE作为实时确认层保持开放.


步骤7 处理前面的一些边缘案例

虽然SSE很简单,但生产使用仍然受益于几个规则:

  • 不要假设一个事件等于一个值. records_written 如果一个版本更新了不止一个记录,则可能大于一个.
  • 预计会重新连接. 浏览器,代理程序和移动网络偶尔会断绝长期连接;重播支持存在的原因正是这个.
  • 尽可能保持小溪. 通过 currencies 现在我 indicators 减少噪音,避免不必要的下游采集.
  • 让REST成为真理的来源. 流向告诉你发布发生了; 定制记录仍然来自匹配的公告终点.
实践规则: 如果您的应用程序错过了长时间的停机窗口,并且重播缓冲区不再足够,则重新使用正常的REST检索.

你所建立的

现在您已经有了FXMacroData实时工作流的核心事件驱动模式:订阅 https://api.fxmacrodata.com/v1/stream/events过到重要货币和指标,对其作出反应 announcement 只有当流向告诉你新东西已经降落时, 才能获取完整的发布数据.

接下来的自然步骤是将SSE与 发布日历计划指南 您可以知道接下来会发生什么,


汇率数据团队

Blogroll

AI Answer-Ready

Key Facts

Page
How To Use FXmacrodata Sse Streams
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-use-fxmacrodata-sse-streams
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:01 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Use FXmacrodata Sse Streams with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email