بحلول نهاية هذا الدليل سيكون لديك اشتراك في SSE يعمل الذي يستمع إلى أحداث إصدار FXMacroData الجديدة في الوقت الحقيقي، مرشحات للعملات والمؤشرات التي تهتم بها، ثم يسحب الحمل الفائد الكامل للمؤشر من REST API في اللحظة التي يتم فيها نشر الإصدار.
الشروط المسبقة
- مفتاح FXMacroData API إذا كنت تريد تدفقات غير دولار أمريكي ؛ تدفق USD فقط تعمل دون مصادقة
- بيئة متصفح لـ
EventSourceمثال، أو بايثون 3.9+ لمثال العامل - - ...
requestsحزمة مثبتة إذا كنت تريد أن تتبع مثال بايثون (pip install requests) - فهم أساسي لمؤشرات الإعلانات مثل وثائق التضخم بالدولار و أسعار سعر العملة في اليورو
الخطوة 1 فهم ما يقدمه سلسلة SSE
نقطة نهاية SSE الجديدة تفتح اتصال HTTP طويل الأمد وتدفع حدثًا في كل مرة تستهلك فيها FXMacroData إصدارًا اقتصاديًا جديدًا. النقطة النهائية هي:
https://api.fxmacrodata.com/v1/stream/events
api.fxmacrodata.com (ليس fxmacrodata.com/api/…), الذي يذهب مباشرة إلى خادم API دون CDN تخزين المؤقت الذي سيكسر التيار المباشر.يمكنك تضييق نطاق المعلومات باستخدام معينين اختياريين:
currenciesقائمة منفصلة عن الفاصلة مثلusd,eurindicatorsقائمة منفصلة عن الفاصلة مثلinflation,policy_rate
بالنسبة للعملات غير الدولار الأمريكي، مرر مفتاح API الخاص بك كمعلم استفسار. على سبيل المثال:
curl -N "https://api.fxmacrodata.com/v1/stream/events?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=YOUR_API_KEY"
إذا قمت بالاتصال دون مرشحات ودون مفتاح مهني، يتم تقييد التدفق تلقائيًا إلى أحداث الدولار الأمريكي. وهذا يجعل SSE مفيدًا حتى لإثبات مفهوم مجاني قبل أن تقوم بالأسلاك في تغطية أوسع.
سلوك مهم
- التيار هو محفز، وليس مجموعة البيانات الكاملة. كل رسالة SSE تخبرك أن الإصدار قد وصل، لا تزال تدعو نقطة نهاية الإعلان المتطابقة لاسترداد السجلات الكاملة.
- الخادم يرسل نبضات القلب الاتصالات العاطلة تتلقى
: heartbeatيعلقون كل 15 ثانية تقريباً حتى لا تغلق الوكلاء التيار - كل حدث يتضمن هوية تلك الهوية هي ما تمكّن من إعادة تشغيلها عند إعادة الاتصال عبر
Last-Event-ID. .
الخطوة 2 معرفة شكل الحدث قبل كتابة عميل
تقوم FXMacroData بإصدار إطارات W3C EventSource القياسية. تبدو رسالة نموذجية كهذه:
id: usd_inflation_1772109000
event: announcement
data: {"event_id": "usd_inflation_1772109000", "currency": "usd", "indicator": "inflation", "records_written": 1, "timestamp": 1772109002}
مجالات الحمولة المفيدة صغيرة وفعالة عمداً:
event_idمعرف محدد في النموذج{currency}_{indicator}_{timestamp}currencyرمز العملة الحرفي الصغير مثلusdأوeurindicatorمؤشر FXMacroData slug، على سبيل المثالinflation- لاpolicy_rateأوnon_farm_payrollsrecords_writtenعدد السجلات الجديدة التي تم حفظها أثناء الإبتلاعtimestampطابع وقت يونيكس عندما تم نشر الحدث على التيار
لأن رسالة SSE لا تحمل سلسلة زمنية كاملة، والنمط الطبيعي هو: الاستماع للحدث، ثم استدعاء نقطة النهاية ذات الصلة تحت مستندات بيانات واجهة برمجة التطبيقات لاسترداد بيانات الإصدار المتاحة حديثاً.
الخطوة 3 افتح التيار في المتصفح مع EventSource
إذا كنت تقوم ببناء لوحة التحكم، أو إشعار قائم على المتصفح، أو صفحة المراقبة الداخلية،
EventSource هي أبسط طريقة لاستهلاك التدفق. استخدم سلسلة الاستفسار لـ auth حتى يعمل الاتصال في المتصفح دون رؤوس مخصصة.
const apiKey = "YOUR_API_KEY";
const streamUrl = new URL("https://api.fxmacrodata.com/v1/stream/events");
streamUrl.searchParams.set("currencies", "eur,gbp");
streamUrl.searchParams.set("indicators", "inflation,policy_rate");
streamUrl.searchParams.set("api_key", apiKey);
const source = new EventSource(streamUrl);
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
console.log("Release received", payload);
const dataUrl = new URL(
`https://api.fxmacrodata.com/v1/announcements/${payload.currency}/${payload.indicator}`
);
dataUrl.searchParams.set("api_key", apiKey);
const response = await fetch(dataUrl);
const records = await response.json();
const latest = records[records.length - 1];
console.log("Latest record", latest);
});
source.onerror = (error) => {
console.error("SSE connection problem", error);
};
الوطني EventSource تلقائياً عندما تسقط الاتصال. لأن FXMacroData يرسل id: في كل رسالة، سيشمل المتصفح تلقائيًا آخر معرف حدث تم استلامه أثناء إعادة الاتصال، مما يسمح للخادم بإعادة عرض الأحداث التي فاتتك.
لماذا هذا النمط يعمل بشكل جيد
تبقى صفحتك غير فعالة حتى يصل الإصدار فعليًا. لا يوجد استطلاع لمدة خمس دقائق ، ولا طلبات ضائعة ، ولا يوجد فجوة زمنية بين النشر ومنطق التحديث الخاص بك. هذا مفيد بشكل خاص للإصدارات عالية الحساسية مثل الرواتب غير الزراعية بالدولار أو التضخم في اليورو.
الخطوة 4 احصل على الحمولة المفيدة الكاملة للمؤشر بعد كل إنذار
أهم نقطة تصميم هي أن SSE يخبرك متى لا، لا، ليس كل شيء بعد وصول الحدث، اتصل بمركز الإعلان المتطابق وافحص أحدث سجل.
async function fetchLatestRelease(currency, indicator, apiKey) {
const url = new URL(`https://api.fxmacrodata.com/v1/announcements/${currency}/${indicator}`);
if (currency !== "usd") {
url.searchParams.set("api_key", apiKey);
}
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Announcement fetch failed: ${response.status}`);
}
const records = await response.json();
return records[records.length - 1] ?? null;
}
source.addEventListener("announcement", async (event) => {
const payload = JSON.parse(event.data);
const latest = await fetchLatestRelease(payload.currency, payload.indicator, apiKey);
if (latest) {
console.log(`New ${payload.currency.toUpperCase()} ${payload.indicator}`, latest);
}
});
هذا يبقي على وزن التدفق خفيفًا مع الحفاظ على الوفاء الكامل لمواقع النهاية REST. وهذا يعني أيضًا أنه يمكن إعادة استخدام نفس رمز تحليل التدريبات في الأسفل سواء كان الزناد الخاص بك يأتي من SSE أو جدولة أو دعوة API يدوية.
الخطوة 5 بناء عامل بايثون مع إعادة تشغيل صريحة على إعادة الاتصال
بالنسبة إلى الديمون أو الروبوت من جانب الخادم، فإنه من المفيد في كثير من الأحيان للتحكم في منطق إعادة الاتصال مباشرة وإرسال
Last-Event-ID المثال أدناه يستمع إلى أحداث الإعلان، ويخزن آخر معرف تم رؤيته، ويعيد تشغيل الأحداث المفقودة المخبأة بعد فصل.
import json
import time
import requests
API_KEY = "YOUR_API_KEY"
STREAM_URL = (
"https://api.fxmacrodata.com/v1/stream/events"
"?currencies=eur,gbp&indicators=inflation,policy_rate&api_key=" + API_KEY
)
def fetch_latest_release(currency: str, indicator: str) -> dict | None:
url = f"https://api.fxmacrodata.com/v1/announcements/{currency}/{indicator}"
params = {"api_key": API_KEY} if currency != "usd" else {}
response = requests.get(url, params=params, timeout=20)
response.raise_for_status()
records = response.json()
return records[-1] if records else None
def consume_stream() -> None:
last_event_id = None
while True:
headers = {"Accept": "text/event-stream"}
if last_event_id:
headers["Last-Event-ID"] = last_event_id
try:
with requests.get(STREAM_URL, headers=headers, stream=True, timeout=90) as response:
response.raise_for_status()
event = {}
for raw_line in response.iter_lines(decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if event.get("event") == "announcement" and event.get("data"):
payload = json.loads(event["data"])
last_event_id = event.get("id") or payload["event_id"]
latest = fetch_latest_release(
payload["currency"],
payload["indicator"],
)
print("Announcement event", payload)
print("Latest record", latest)
event = {}
continue
if line.startswith(":"):
continue
field, _, value = line.partition(":")
event[field] = value.lstrip()
except requests.RequestException as exc:
print(f"Stream disconnected: {exc}. Reconnecting in 3 seconds...")
time.sleep(3)
if __name__ == "__main__":
consume_stream()
هذا هو النمط الصحيح عندما تحتاج إلى إعادة تحديد خلال انقطاع مؤقت. إذا انخفض الاتصال بعد نشر حدث ولكن قبل أن يعالجها عاملك ، فإن الطلب التالي يتضمن آخر معرف تم استلامه ويستعيد الخادم الأحداث المخزنة التي جاءت بعد ذلك.
الخطوة 6 قرر ما تريد فعله عندما يحصل حدث
بمجرد توصيل التيار ، فإن اختيار التصميم الحقيقي هو ما يحدث أسفل النهر. وتشمل الأنماط الشائعة:
احديث بطاقة لوحة القيادة
عندما يصل حدث مطابق، احصل على آخر سجل وإعادة رسم لوحة واحدة بدلاً من إعادة تحميل الصفحة بأكملها.
إطلاق سير العمل للتداول أو التنبيه
قم بتحويل الحدث إلى Slack أو البريد الإلكتروني أو صف التداول بعد مقارنة أحدث إصدار مع عتباتك.
احرص على الاحتفاظ
استخدم SSE كشعلة لإبطال، ثم قم بتحديث ذاكرة المؤشر المتأثرة فقط عندما يتم تأكيد بيانات جديدة.
دمج مع تقويم الإصدار
استخدم دليل تقويم الإصدار لمعرفة ما هو المقرر بعد ذلك، ثم إبقاء SSE مفتوحة كطبقة تأكيد حية.
الخطوة 7 التعامل مع بعض الحواف في المقدمة
إنّه بسيط، لكن الاستخدام في الإنتاج لا يزال يستفيد من قواعد قليلة:
- لا تفترض أن حدث واحد يساوي قيمة واحدة.
records_writtenقد يكون أكبر من واحد إذا كان الإصدار يحديث أكثر من سجل واحد. - أتوقع إعادة الاتصال المتصفحات والوكلاء وشبكات الهاتف المحمول سوف تقطع أحيانا اتصالات طويلة الأمد؛ دعم إعادة تشغيل موجود لهذا السبب بالضبط.
- أبقِ الجدول ضيقاً قدر المستطاع تصفية بواسطة
currenciesوindicatorsيقلل من الضوضاء ويتجنب عمليات الاستيلاء غير الضرورية في الأسفل. - اتركوا الراحة كمصدر للحق يخبرك التيار بأن هناك إصدار حدث، السجل القانوني لا يزال يأتي من نقطة نهاية الإعلان المتطابقة.
ما بنيت
لديك الآن نمط الحدث القائم على الأساس لـ FXMacroData سير العمل في الوقت الحقيقي: الاشتراك في
https://api.fxmacrodata.com/v1/stream/events، تصفية إلى العملات والمؤشرات التي تهم، والرد على
announcement الأحداث كما تصل، والحصول على بيانات الإصدار الكامل فقط عندما يخبرك التيار شيء جديد هبط.
الخطوة التالية الطبيعية هي الجمع بين SSE و دليل جدولة تقويم الإصدار حتى تعرف ما هو المقرر بعد ذلك ولا تزال تتلقى إشارة فورية عندما يحدث النشر في الواقع.
فريق FXMacroData