通过此指南,您将有一个可用的SSE订阅者,它可以实时收听新FXMacroData发布事件,对您感兴趣的货币和指标进行过,然后在发布发布时从REST API中提取全部指标有效负载.
预先要求
了解SSE流程所提供的内容
新的SSE终点打开一个长期的HTTP连接,并在FXMacroData每次摄入新经济版本时推送一个事件.终点是:
https://api.fxmacrodata.com/v1/stream/events
api.fxmacrodata.com (不是 fxmacrodata.com/api/…直接传输到API服务器, 没有CDN缓冲,您可以通过两个可选查询参数缩小输入范围:
currencies像是 像 像usd,eurindicators像是 像 像inflation,policy_rate
对于非美元货币,将您的API键传输为查询参数. 例如:
curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"
如果您连接没有过器和没有专业密钥,流自动限制在美元事件. 这使SSE甚至在您在更广泛的覆盖范围内进行免费概念验证时也很有用.
重要行为
- 流量是一个触发器,而不是整个数据集. 每个SSE消息都告诉你发布了;你仍然需要调用匹配的公告终点来获取完整的记录.
- 服务器发送心跳. 无效连接接收到一个
: heartbeat评论大约每15秒就会出现, - 每个事件都包含一个身份证. 通过重新连接,
Last-Event-ID现在我们要做什么?
步骤2 写客户端之前,要知道事件格式
FXMacroData发出标准的W3C事件源框架.典型的消息看起来像这样:
id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}
有效载荷场是故意小的,可操作的:
event_id确定性标识符{currency}_{indicator}_{timestamp}currency小写货币代码,如usd没有eurindicator例如FXMacroData指标inflation没有人知道.policy_rate没有non_farm_payrollsrecords_written在摄入过程中保存的新记录数量timestamp时刻显示事件在流中发布时
由于SSE消息不包含完整的时间序列,正常模式是:听取事件,然后在 API数据文件 获取新获取的发布数据.
步骤3 用 EventSource 在浏览器中打开流
您正在建立一个仪表板,基于浏览器的通知或内部监控页面,
EventSource 是消耗流的最简单方法. 使用查询字符串为 auth,以便连接在浏览器中工作,而不需要自定义头.
const apiKey = "YOUR_API_KEY";
const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);
const source = new EventSource(streamUrl);
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
console.log("Release received", payload);
const dataUrl = new URL(
`https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
);
dataUrl.searchParams.set("api_key", apiKey);
const response = await fetch(dataUrl);
const records = await response.json();
const latest = records[records.length - 1];
console.log("Latest record", latest);
});
source.onerror = (error) => {
console.error("SSE connection problem", error);
};
美国人 EventSource 由于FXMacroData会发送一个 id: 在重新连接时,浏览器会自动包含最后接收的事件ID,
这种模式为什么很有效
您的页面将停留在空中,直到发布实际登陆. 没有五分钟的投票,没有浪费的请求,以及发布和更新逻辑之间的时间差距. 这对于高敏感性发布特别有用. 美元非农业工资 没有 欧元通货膨胀现在我们要做什么?
步骤4 在每个报警后获取全指标有效载荷
设计中最重要的一点是SSE告诉你 什么时候 为了找,而不是 一切都没有 显示. 事件到达后,调用匹配公告终点,检查最新记录.
async function fetchLatestRelease(currency, indicator, apiKey) {
const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);
if (currency !== "usd") {
url.searchParams.set("api_key", apiKey);
}
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Announcement fetch failed: ${response.status}`);
}
const records = await response.json();
return records[records.length - 1] ?? null;
}
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);
if (latest) {
console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
}
});
这使流量保持轻量化,同时保持REST终端的完整保真性.这也意味着无论您的触发来自SSE,调度器还是手动API调用,都可以重复使用相同的下游解析代码.
步骤 5 构建一个Python工作者,在重新连接时具有明确的重播
对于服务器端的机器人或机器人的来说, 经常有用的就是直接控制重连逻辑,
Last-Event-ID 下面的示例会听取公告事件,存储最后一次看到的ID,并在断开连接后重播错过的缓冲事件.
import json
import time
import requests
API_KEY = "YOUR_API_KEY"
STREAM_URL = (
"https://api.fxmacrodata.com/v1/stream/events"
"?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)
def fetch_latest_release(currency: str, indicator: str) -> dict | None:
url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
params = {"api_key": API_KEY} if currency != "usd" else {}
response = requests.get(url, params=params, timeout=20)
response.raise_for_status()
records = response.json()
return records[-1] if records else None
def consume_stream() -> None:
last_event_id = None
while True:
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
try:
with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
response.raise_for_status()
event = {}
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if event.get("event") == "announcement" and event.get("data"):
payload = json.loads(event["data"])
last_event_id = event.get("id") or payload["event_id"]
latest = fetch_latest_release(
payload["currency"],
payload["indicator"],
)
print("Announcement event", payload)
print("Latest record", latest)
event = {}
continue
if line.startswith(":"):
continue
field, _, value = line.partition(":")
event[field] = value.lstrip()
except requests.RequestException as exc:
print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
time.sleep(3)
if __name__ == "__main__":
consume_stream()
如果在发布事件后,但在您的员工处理事件之前,连接下降,下一个请求包括最后接收的ID,服务器将继此之后的缓冲事件重播.
步骤6 决定当一个事件发生时,你想做什么
一旦流线连接到,真正的设计选择是下游发生的事情.
更新仪表板卡
当一个匹配事件到达时,取出最新记录,重新绘制一个面板,而不是重新加载整个页面.
触发交易或警报工作流程
在您将最新版本与您的门进行比较后, 将事件推到Slack,电子邮件或交易队列中.
热一个缓存
使用SSE作为无效触发,然后只有在新数据得到确认时才更新受影响的指标缓存.
结合与发布日历
使用 发布日程指南 知道接下来计划什么,然后将SSE作为实时确认层保持开放.
步骤7 处理前面的一些边缘案例
虽然SSE很简单,但生产使用仍然受益于几个规则:
- 不要假设一个事件等于一个值.
records_written如果一个版本更新了不止一个记录,则可能大于一个. - 预计会重新连接. 浏览器,代理程序和移动网络偶尔会断绝长期连接;重播支持存在的原因正是这个.
- 尽可能保持小溪. 通过
currencies现在我indicators减少噪音,避免不必要的下游采集. - 让REST成为真理的来源. 流向告诉你发布发生了; 定制记录仍然来自匹配的公告终点.
你所建立的
现在您已经有了FXMacroData实时工作流的核心事件驱动模式:订阅
https://api.fxmacrodata.com/v1/stream/events过到重要货币和指标,对其作出反应
announcement 只有当流向告诉你新东西已经降落时, 才能获取完整的发布数据.
接下来的自然步骤是将SSE与 发布日历计划指南 您可以知道接下来会发生什么,
汇率数据团队