Pada akhir panduan ini Anda akan memiliki pelanggan SSE yang bekerja yang mendengarkan acara rilis FXMacroData baru secara real time, filter untuk mata uang dan indikator yang Anda pedulikan, dan kemudian menarik muatan indikator penuh dari REST API saat rilis diterbitkan.
Persyaratan
- Kunci API FXMacroData jika Anda ingin aliran non-USD; aliran hanya USD bekerja tanpa otentikasi
- Lingkungan browser untuk
EventSourcecontoh, atau Python 3.9+ untuk contoh pekerja - - Apa?
requestspaket yang diinstal jika Anda ingin mengikuti contoh Python (pip install requests) - Pemahaman dasar tentang titik akhir pengumuman seperti Dokumen inflasi USD Dan Docs suku bunga kebijakan EUR
Langkah 1 Memahami apa yang disampaikan oleh aliran SSE
Endpoint SSE baru membuka koneksi HTTP yang tahan lama dan mendorong sebuah event setiap kali FXMacroData mengonsumsi rilis ekonomi baru.
https://api.fxmacrodata.com/v1/stream/events
api.fxmacrodata.com (tidak fxmacrodata.com/api/…), yang langsung ke server API tanpa CDN buffering yang akan mengganggu live stream.Anda dapat mempersempit umpan dengan dua parameter kueri opsional:
currenciesdaftar yang dipisahkan dengan koma sepertiusd,eurindicatorsdaftar yang dipisahkan dengan koma sepertiinflation,policy_rate
Untuk mata uang non-USD, berikan kunci API Anda sebagai parameter kueri.
curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"
Jika Anda terhubung tanpa filter dan tanpa kunci Profesional, aliran secara otomatis dibatasi untuk acara USD. Itu membuat SSE berguna bahkan untuk bukti konsep gratis sebelum Anda kabel dalam cakupan yang lebih luas.
Perilaku penting
- Aliran adalah pemicu, bukan seluruh dataset. Setiap pesan SSE memberitahu Anda bahwa rilis mendarat; Anda masih memanggil titik akhir pengumuman yang cocok untuk mengambil catatan lengkap.
- Server mengirim detak jantung. Koneksi yang tidak aktif menerima
: heartbeatkomentar kira-kira setiap 15 detik sehingga proxy tidak menutup aliran. - Setiap acara termasuk ID. ID itu adalah apa yang kekuatan playback pada kembali terhubung melalui
Last-Event-IDAku tidak tahu.
Langkah 2 Tahu format acara sebelum Anda menulis klien
FXMacroData mengeluarkan frame W3C EventSource standar.
id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}
Bidang muatan berguna sengaja kecil dan operasional:
event_idpengidentifikasi deterministik dalam bentuk{currency}_{indicator}_{timestamp}currencykode mata uang huruf kecil sepertiusdataueurindicatorindikator FXMacroData slug, misalnyainflationAku akan pergi.policy_rate, ataunon_farm_payrollsrecords_writtenjumlah catatan baru yang disimpan selama konsumsitimestampUnix timestamp untuk ketika acara dipublikasikan ke aliran
Karena pesan SSE tidak membawa serangkaian waktu penuh, pola normal adalah: mendengarkan untuk peristiwa, kemudian memanggil titik akhir yang relevan di bawah Dokumen data API Untuk mengambil data rilis yang baru tersedia.
Langkah 3 Buka stream di browser dengan EventSource
Jika Anda membangun dasbor, browser berbasis pemberitahuan, atau halaman pemantauan internal, asli
EventSource adalah cara termudah untuk mengkonsumsi aliran. Gunakan string query untuk auth sehingga koneksi bekerja di browser tanpa header khusus.
const apiKey = "YOUR_API_KEY";
const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);
const source = new EventSource(streamUrl);
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
console.log("Release received", payload);
const dataUrl = new URL(
`https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
);
dataUrl.searchParams.set("api_key", apiKey);
const response = await fetch(dataUrl);
const records = await response.json();
const latest = records[records.length - 1];
console.log("Latest record", latest);
});
source.onerror = (error) => {
console.error("SSE connection problem", error);
};
Orang asli. EventSource otomatis terhubung kembali ketika koneksi jatuh. id: Dalam setiap pesan, browser akan secara otomatis memasukkan ID acara terakhir yang diterima selama koneksi kembali, yang memungkinkan server memutar ulang acara yang Anda lewatkan.
Mengapa pola ini bekerja dengan baik
Halaman Anda tetap tidak aktif sampai rilis benar-benar mendarat. Tidak ada pemungutan suara lima menit, tidak ada permintaan yang sia-sia, dan tidak ada celah waktu antara publikasi dan logika refresh Anda. USD non-perhutanan gaji atau Inflasi EUR.
Langkah 4 Ambil muatan indikator penuh setelah setiap peringatan
Titik desain yang paling penting adalah bahwa SSE memberitahu Anda kapan untuk mengambil, tidak Semua Setelah suatu acara tiba, hubungi titik akhir pengumuman yang cocok dan periksa catatan terbaru.
async function fetchLatestRelease(currency, indicator, apiKey) {
const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);
if (currency !== "usd") {
url.searchParams.set("api_key", apiKey);
}
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Announcement fetch failed: ${response.status}`);
}
const records = await response.json();
return records[records.length - 1] ?? null;
}
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);
if (latest) {
console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
}
});
Ini membuat aliran ringan sambil menjaga kesetiaan penuh dari titik akhir REST. Ini juga berarti kode parsing hilir yang sama dapat digunakan kembali apakah pemicu Anda berasal dari SSE, penjadwal, atau panggilan API manual.
Langkah 5 Membangun Python Worker dengan eksplisit replay pada reconnect
Untuk server-side daemon atau bot, sering berguna untuk mengontrol logik sambung kembali secara langsung dan mengirim
Last-Event-ID Contoh di bawah ini mendengarkan acara pengumuman, menyimpan ID terakhir yang terlihat, dan memutar ulang acara yang terbuffer yang hilang setelah pemutusan.
import json
import time
import requests
API_KEY = "YOUR_API_KEY"
STREAM_URL = (
"https://api.fxmacrodata.com/v1/stream/events"
"?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)
def fetch_latest_release(currency: str, indicator: str) -> dict | None:
url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
params = {"api_key": API_KEY} if currency != "usd" else {}
response = requests.get(url, params=params, timeout=20)
response.raise_for_status()
records = response.json()
return records[-1] if records else None
def consume_stream() -> None:
last_event_id = None
while True:
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
try:
with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
response.raise_for_status()
event = {}
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if event.get("event") == "announcement" and event.get("data"):
payload = json.loads(event["data"])
last_event_id = event.get("id") or payload["event_id"]
latest = fetch_latest_release(
payload["currency"],
payload["indicator"],
)
print("Announcement event", payload)
print("Latest record", latest)
event = {}
continue
if line.startswith(":"):
continue
field, _, value = line.partition(":")
event[field] = value.lstrip()
except requests.RequestException as exc:
print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
time.sleep(3)
if __name__ == "__main__":
consume_stream()
Ini adalah pola yang tepat ketika Anda membutuhkan pemutaran ulang deterministik di seluruh pemadaman sementara. Jika koneksi turun setelah sebuah acara dipublikasikan tetapi sebelum pekerja Anda memprosesnya, permintaan berikutnya termasuk ID terakhir yang diterima dan server memutar ulang acara yang disimpen yang datang setelahnya.
Langkah 6 Tentukan apa yang ingin Anda lakukan ketika suatu peristiwa terjadi
Setelah aliran dihubungkan, pilihan desain yang sebenarnya adalah apa yang terjadi di bawah aliran.
Perbarui kartu dashboard
Ketika acara yang cocok tiba, mengambil catatan terbaru dan menggambar ulang satu panel alih-alih memuat ulang seluruh halaman.
Trigger alert atau alert workflow
Push event ke Slack, email, atau antrian trading setelah Anda membandingkan rilis terbaru dengan ambang batas Anda.
Panaskan cache
Gunakan SSE sebagai pemicu pembatalan, kemudian perbarui cache indikator yang terkena hanya ketika data baru dikonfirmasi.
Gabungkan dengan kalender rilis
Gunakan panduan kalender rilis untuk mengetahui apa yang dijadwalkan selanjutnya, kemudian tetap SSE terbuka sebagai lapisan konfirmasi langsung.
Langkah 7 Mengatasi beberapa kasus tepi di depan
SSE sederhana, tetapi penggunaan produksi masih mendapat manfaat dari beberapa aturan:
- Jangan menganggap satu peristiwa sama dengan satu nilai.
records_writtendapat lebih besar dari satu jika rilis memperbarui lebih dari satu catatan. - Harapkan koneksi kembali. Browser, proxy, dan jaringan seluler kadang-kadang akan merusak koneksi yang tahan lama; dukungan replay ada untuk alasan ini.
- Jauhkan aliran sempit jika memungkinkan. Menyaring dengan
currenciesDanindicatorsmengurangi kebisingan dan menghindari pengambilan yang tidak perlu di hulu sungai. - Tinggalkan REST sebagai sumber kebenaran. Aliran memberitahu Anda rilis terjadi; catatan kanonik masih berasal dari titik akhir pengumuman yang cocok.
Apa yang kau bangun
Anda sekarang memiliki pola inti acara-disetir untuk FXMacroData realtime alur kerja: berlangganan
https://api.fxmacrodata.com/v1/stream/events, filter ke mata uang dan indikator yang penting, bereaksi terhadap
announcement peristiwa saat mereka tiba, dan mengambil data rilis penuh hanya ketika aliran memberitahu Anda sesuatu yang baru telah mendarat.
Langkah selanjutnya adalah menggabungkan SSE dengan Panduan penjadwalan kalender rilis Jadi Anda tahu apa yang akan terjadi selanjutnya dan masih menerima sinyal segera ketika publikasi benar-benar terjadi.
Tim FXMacroData