Implementation

How-To Guides

How to Use FXMacroData SSE Streams for Real-Time Macro Data

A step-by-step guide to building a working SSE subscriber that listens for new FXMacroData release events in real time, filters for the currencies and indicators you care about, and pulls the full indicator payload from the REST API.

متوفر أيضًا في English
Share article X LinkedIn Email

بحلول نهاية هذا الدليل سيكون لديك اشتراك في SSE يعمل الذي يستمع إلى أحداث إصدار FXMacroData الجديدة في الوقت الحقيقي، مرشحات للعملات والمؤشرات التي تهتم بها، ثم يسحب الحمل الفائد الكامل للمؤشر من REST API في اللحظة التي يتم فيها نشر الإصدار.

الشروط المسبقة

  • مفتاح FXMacroData API إذا كنت تريد تدفقات غير دولار أمريكي ؛ تدفق USD فقط تعمل دون مصادقة
  • بيئة متصفح لـ EventSource مثال، أو بايثون 3.9+ لمثال العامل
  • - ... requests حزمة مثبتة إذا كنت تريد أن تتبع مثال بايثون (pip install requests)
  • فهم أساسي لمؤشرات الإعلانات مثل وثائق التضخم بالدولار و أسعار سعر العملة في اليورو

الخطوة 1 فهم ما يقدمه سلسلة SSE

نقطة نهاية SSE الجديدة تفتح اتصال HTTP طويل الأمد وتدفع حدثًا في كل مرة تستهلك فيها FXMacroData إصدارًا اقتصاديًا جديدًا. النقطة النهائية هي:

https://api.fxmacrodata.com/v1/stream/events
مهم استخدم نطاق API المباشر: الـ SSE تتطلب اتصال مباشر دائماً استخدم api.fxmacrodata.com (ليس fxmacrodata.com/api/…), الذي يذهب مباشرة إلى خادم API دون CDN تخزين المؤقت الذي سيكسر التيار المباشر.

يمكنك تضييق نطاق المعلومات باستخدام معينين اختياريين:

  • currencies قائمة منفصلة عن الفاصلة مثل usd,eur
  • indicators قائمة منفصلة عن الفاصلة مثل inflation,policy_rate

بالنسبة للعملات غير الدولار الأمريكي، مرر مفتاح API الخاص بك كمعلم استفسار. على سبيل المثال:

curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"

إذا قمت بالاتصال دون مرشحات ودون مفتاح مهني، يتم تقييد التدفق تلقائيًا إلى أحداث الدولار الأمريكي. وهذا يجعل SSE مفيدًا حتى لإثبات مفهوم مجاني قبل أن تقوم بالأسلاك في تغطية أوسع.

سلوك مهم

  • التيار هو محفز، وليس مجموعة البيانات الكاملة. كل رسالة SSE تخبرك أن الإصدار قد وصل، لا تزال تدعو نقطة نهاية الإعلان المتطابقة لاسترداد السجلات الكاملة.
  • الخادم يرسل نبضات القلب الاتصالات العاطلة تتلقى : heartbeat يعلقون كل 15 ثانية تقريباً حتى لا تغلق الوكلاء التيار
  • كل حدث يتضمن هوية تلك الهوية هي ما تمكّن من إعادة تشغيلها عند إعادة الاتصال عبر Last-Event-ID. .

الخطوة 2 معرفة شكل الحدث قبل كتابة عميل

تقوم FXMacroData بإصدار إطارات W3C EventSource القياسية. تبدو رسالة نموذجية كهذه:

id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}

مجالات الحمولة المفيدة صغيرة وفعالة عمداً:

  • event_id معرف محدد في النموذج {currency}_{indicator}_{timestamp}
  • currency رمز العملة الحرفي الصغير مثل usd أو eur
  • indicator مؤشر FXMacroData slug، على سبيل المثال inflation- لا policy_rateأو non_farm_payrolls
  • records_written عدد السجلات الجديدة التي تم حفظها أثناء الإبتلاع
  • timestamp طابع وقت يونيكس عندما تم نشر الحدث على التيار

لأن رسالة SSE لا تحمل سلسلة زمنية كاملة، والنمط الطبيعي هو: الاستماع للحدث، ثم استدعاء نقطة النهاية ذات الصلة تحت مستندات بيانات واجهة برمجة التطبيقات لاسترداد بيانات الإصدار المتاحة حديثاً.


الخطوة 3 افتح التيار في المتصفح مع EventSource

إذا كنت تقوم ببناء لوحة التحكم، أو إشعار قائم على المتصفح، أو صفحة المراقبة الداخلية، EventSource هي أبسط طريقة لاستهلاك التدفق. استخدم سلسلة الاستفسار لـ auth حتى يعمل الاتصال في المتصفح دون رؤوس مخصصة.

const apiKey = "YOUR_API_KEY";

const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);

const source = new EventSource(streamUrl);

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  console.log("Release received", payload);

  const dataUrl = new URL(
    `https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
  );
  dataUrl.searchParams.set("api_key", apiKey);

  const response = await fetch(dataUrl);
  const records = await response.json();
  const latest = records[records.length - 1];

  console.log("Latest record", latest);
});

source.onerror = (error) => {
  console.error("SSE connection problem", error);
};

الوطني EventSource تلقائياً عندما تسقط الاتصال. لأن FXMacroData يرسل id: في كل رسالة، سيشمل المتصفح تلقائيًا آخر معرف حدث تم استلامه أثناء إعادة الاتصال، مما يسمح للخادم بإعادة عرض الأحداث التي فاتتك.

لماذا هذا النمط يعمل بشكل جيد

تبقى صفحتك غير فعالة حتى يصل الإصدار فعليًا. لا يوجد استطلاع لمدة خمس دقائق ، ولا طلبات ضائعة ، ولا يوجد فجوة زمنية بين النشر ومنطق التحديث الخاص بك. هذا مفيد بشكل خاص للإصدارات عالية الحساسية مثل الرواتب غير الزراعية بالدولار أو التضخم في اليورو.


الخطوة 4 احصل على الحمولة المفيدة الكاملة للمؤشر بعد كل إنذار

أهم نقطة تصميم هي أن SSE يخبرك متى لا، لا، ليس كل شيء بعد وصول الحدث، اتصل بمركز الإعلان المتطابق وافحص أحدث سجل.

async function fetchLatestRelease(currency, indicator, apiKey) {
  const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);

  if (currency !== "usd") {
    url.searchParams.set("api_key", apiKey);
  }

  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Announcement fetch failed: ${response.status}`);
  }

  const records = await response.json();
  return records[records.length - 1] ?? null;
}

source.addEventListener("announcement", async (event) => {
  const payload = JSON.parse(event.data);
  const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);

  if (latest) {
    console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
  }
});

هذا يبقي على وزن التدفق خفيفًا مع الحفاظ على الوفاء الكامل لمواقع النهاية REST. وهذا يعني أيضًا أنه يمكن إعادة استخدام نفس رمز تحليل التدريبات في الأسفل سواء كان الزناد الخاص بك يأتي من SSE أو جدولة أو دعوة API يدوية.


الخطوة 5 بناء عامل بايثون مع إعادة تشغيل صريحة على إعادة الاتصال

بالنسبة إلى الديمون أو الروبوت من جانب الخادم، فإنه من المفيد في كثير من الأحيان للتحكم في منطق إعادة الاتصال مباشرة وإرسال Last-Event-ID المثال أدناه يستمع إلى أحداث الإعلان، ويخزن آخر معرف تم رؤيته، ويعيد تشغيل الأحداث المفقودة المخبأة بعد فصل.

import json
import time
import requests

API_KEY = "YOUR_API_KEY"
STREAM_URL = (
    "https://api.fxmacrodata.com/v1/stream/events"
    "?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)


def fetch_latest_release(currency: str, indicator: str) -> dict | None:
    url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
    params = {"api_key": API_KEY} if currency != "usd" else {}
    response = requests.get(url, params=params, timeout=20)
    response.raise_for_status()
    records = response.json()
    return records[-1] if records else None


def consume_stream() -> None:
    last_event_id = None

    while True:
        headers = {"Accept": "text/event-stream"}
        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        try:
            with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
                response.raise_for_status()

                event = {}
                for raw_line in response.iter_lines(decode_unicode=True):
                    if raw_line is None:
                        continue

                    line = raw_line.strip()

                    if not line:
                        if event.get("event") == "announcement" and event.get("data"):
                            payload = json.loads(event["data"])
                            last_event_id = event.get("id") or payload["event_id"]

                            latest = fetch_latest_release(
                                payload["currency"],
                                payload["indicator"],
                            )
                            print("Announcement event", payload)
                            print("Latest record", latest)

                        event = {}
                        continue

                    if line.startswith(":"):
                        continue

                    field, _, value = line.partition(":")
                    event[field] = value.lstrip()

        except requests.RequestException as exc:
            print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
            time.sleep(3)


if __name__ == "__main__":
    consume_stream()

هذا هو النمط الصحيح عندما تحتاج إلى إعادة تحديد خلال انقطاع مؤقت. إذا انخفض الاتصال بعد نشر حدث ولكن قبل أن يعالجها عاملك ، فإن الطلب التالي يتضمن آخر معرف تم استلامه ويستعيد الخادم الأحداث المخزنة التي جاءت بعد ذلك.


الخطوة 6 قرر ما تريد فعله عندما يحصل حدث

بمجرد توصيل التيار ، فإن اختيار التصميم الحقيقي هو ما يحدث أسفل النهر. وتشمل الأنماط الشائعة:

احديث بطاقة لوحة القيادة

عندما يصل حدث مطابق، احصل على آخر سجل وإعادة رسم لوحة واحدة بدلاً من إعادة تحميل الصفحة بأكملها.

إطلاق سير العمل للتداول أو التنبيه

قم بتحويل الحدث إلى Slack أو البريد الإلكتروني أو صف التداول بعد مقارنة أحدث إصدار مع عتباتك.

احرص على الاحتفاظ

استخدم SSE كشعلة لإبطال، ثم قم بتحديث ذاكرة المؤشر المتأثرة فقط عندما يتم تأكيد بيانات جديدة.

دمج مع تقويم الإصدار

استخدم دليل تقويم الإصدار لمعرفة ما هو المقرر بعد ذلك، ثم إبقاء SSE مفتوحة كطبقة تأكيد حية.


الخطوة 7 التعامل مع بعض الحواف في المقدمة

إنّه بسيط، لكن الاستخدام في الإنتاج لا يزال يستفيد من قواعد قليلة:

  • لا تفترض أن حدث واحد يساوي قيمة واحدة. records_written قد يكون أكبر من واحد إذا كان الإصدار يحديث أكثر من سجل واحد.
  • أتوقع إعادة الاتصال المتصفحات والوكلاء وشبكات الهاتف المحمول سوف تقطع أحيانا اتصالات طويلة الأمد؛ دعم إعادة تشغيل موجود لهذا السبب بالضبط.
  • أبقِ الجدول ضيقاً قدر المستطاع تصفية بواسطة currencies و indicators يقلل من الضوضاء ويتجنب عمليات الاستيلاء غير الضرورية في الأسفل.
  • اتركوا الراحة كمصدر للحق يخبرك التيار بأن هناك إصدار حدث، السجل القانوني لا يزال يأتي من نقطة نهاية الإعلان المتطابقة.
قاعدة عملية: إذا لم تتمكن تطبيقك من إيقاف تشغيل نافذة طويلة ولم يعد بوفر إعادة تشغيلة كافياً، فاستمر في استرداد REST العادي للمؤشرات التي تتبعها. يجب أن يجعل SSE نظامك أسرع، وليس أكثر هشاشة.

ما بنيت

لديك الآن نمط الحدث القائم على الأساس لـ FXMacroData سير العمل في الوقت الحقيقي: الاشتراك في https://api.fxmacrodata.com/v1/stream/events، تصفية إلى العملات والمؤشرات التي تهم، والرد على announcement الأحداث كما تصل، والحصول على بيانات الإصدار الكامل فقط عندما يخبرك التيار شيء جديد هبط.

الخطوة التالية الطبيعية هي الجمع بين SSE و دليل جدولة تقويم الإصدار حتى تعرف ما هو المقرر بعد ذلك ولا تزال تتلقى إشارة فورية عندما يحدث النشر في الواقع.


فريق FXMacroData

Blogroll

AI Answer-Ready

Key Facts

Page
How To Use FXmacrodata Sse Streams
Section
Articles
Canonical URL
https://fxmacrodata.com/articles/how-to-use-fxmacrodata-sse-streams
Source
FXMacroData editorial and official publisher references
Last Updated
2026-06-15 11:01 UTC

Provenance And Trust

Cite the canonical URL and source field above. Where available, this page maps to official publisher releases and timestamped updates.

Quick Q&A

What is this page about? This page explains How To Use FXmacrodata Sse Streams with directly usable context for trading, research, and API workflows.

What source should be cited? Use the canonical URL and the listed source field; cite official publisher references when available.

How fresh is this content? The last updated value above reflects the page metadata or latest available data timestamp.

Can this be used in AI assistants? Yes. This section is intentionally structured for retrieval and citation in chat assistants.

Prompt Packs

Use these in ChatGPT, Claude, Gemini, Mistral, Perplexity, or Grok for consistent source-aware outputs.

Share page X LinkedIn Email