Makrodatenveröffentlichungen gehören zu den zuverlässigsten Katalysatoren auf den Devisenmärkten. Ein CPI-Druck von 0,2% über dem Konsens kann EUR/USD 60 Pips schieben, bevor die Nachrichtenüberschrift sogar geladen wird. Wenn Ihr Alarmsystem auf manuelle Kalenderprüfungen oder breite zeitgebundene Umfragen beruht, sind Sie bereits zurück. Diese Anleitung zeigt Ihnen, wie Sie eine präzise, webhook-gesteuerte Warnpipeline erstellen, die neue FXMacroData-Veröffentlichung erkennt, sobald sie erscheint, und Benachrichtigungen an Slack, Discord oder einen HTTP-Endpunkt in weniger als 80 Zeilen Python liefert.
Was du bauen wirst
- Eine Wahlschleife der den Endpunkt der Ankündigungen von FXMacroData auf einem konfigurierbaren Intervall überprüft und neu veröffentlichte Werte erkennt
- Ein Webhook-Dispatcher Das schießt eine HTTP POST an Slack, Discord oder einen benutzerdefinierten Endpunkt, wenn eine neue Version erkannt wird
- Eine Vorwarnung für den Kalender der Veröffentlichung Das warnt Sie Minuten vor einem Ereignis mit hoher Wirkung so sind Sie bereit, bevor die Zahl druckt
- Persistenz des Staates mit einer leichten JSON-Datei, so dass der Bot nie doppelt die gleiche Warnung über Neustarts
Voraussetzungen
- Python 3.9+
- FXMacroData-API-Schlüssel melden Sie sich an /abonnieren und kopieren Sie Ihren Schlüssel vom Armaturenbrett
- Eine Webhook-URL ein Slack-Eingangs-Webhook erstellen Die Daten werden von den zuständigen Behörden der Mitgliedstaaten übermittelt.Oder hol dir einen Discord-Kanal unter dem Link . Einstellungen → Integrationen → Webhooks
- Python-Pakete- Ich weiß .
requests- Ich weiß .schedule
pip install requests schedule
Alle Anmeldeinformationen als Umgebungsvariablen speichern niemals Hardcode-Schlüssel in Quelldateien:
export FXMACRO_API_KEY="YOUR_FXMACRODATA_KEY"
export WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
Umfragen gegen Push: Wahl des richtigen Musters
Für reine Webhook-Abonnements sind ein öffentlicher Server-Endpunkt und ein Datenanbieter erforderlich, der ausgehende Benachrichtigungen sendet. Leichtgewicht der Wahlschleife (Ihr Code fragt die API regelmäßig) mit einem Webhook-Dispatcher (Ihr Code schiebt das Ergebnis nach unten, sobald etwas Neues erscheint).
Dies gibt Ihnen die Zuverlässigkeit von pull-basierten Daten (keine verpassten Pushes, wenn Ihr Server nicht funktioniert) mit der Sofortigkeit der Webhook-Lieferung an die Tools, die Ihr Team bereits verwendet Slack, Discord, PagerDuty, n8n oder ein HTTP-Ziel.
- Schritt 1 -
Schritt 1 Holen Sie den letzten Ankündigungswert ab
Die ... Ankündigungen Endpunkt
gibt den letzten veröffentlichten Wert für jeden Indikator und jede Währung zurück.
announcement_datetime ein UTC-Zeitstempel zweiter Ebene, der genau anzeigt, wann dieser Wert veröffentlicht wurde.
import os
import requests
BASE_URL = "https://fxmacrodata.com/api/v1"
API_KEY = os.environ["FXMACRO_API_KEY"]
def fetch_latest(currency: str, indicator: str) -> dict | None:
"""Return the most recent announcement record, or None on failure."""
try:
resp = requests.get(
f"{BASE_URL}/announcements/{currency}/{indicator}",
params={"api_key": API_KEY},
timeout=10,
)
resp.raise_for_status()
data = resp.json().get("data", [])
return data[0] if data else None
except requests.RequestException as exc:
print(f"[WARN] fetch failed for {currency}/{indicator}: {exc}")
return None
Eine Probenreaktion sieht so aus:
{
"date": "2026-04-02",
"val": 4.35,
"prior": 4.10,
"announcement_datetime": "2026-04-02T02:30:00Z",
"currency": "aud",
"indicator": "policy_rate"
}
- Schritt 2 -
Schritt 2 Verfolgungsstatus zur Vermeidung von doppelten Warnmeldungen
Ein Neustart Ihres Prozesses sollte keine erneuten Warnungen für bereits angezeigte Releases auslösen. announcement_datetime Bei dem Start lädt der Bot diese Datei; nach jeder neuen Warnung schreibt er den aktualisierten Zeitstempel zurück.
import json
from pathlib import Path
STATE_FILE = Path("alert_state.json")
def load_state() -> dict:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text())
return {}
def save_state(state: dict) -> None:
STATE_FILE.write_text(json.dumps(state, indent=2))
def is_new_release(state: dict, key: str, record: dict) -> bool:
"""Return True if the announcement_datetime is newer than what we last saw."""
last_seen = state.get(key)
current = record.get("announcement_datetime")
return current is not None and current != last_seen
- Schritt 3 -
Schritt 3 Senden einer Webhook-Benachrichtigung
Sowohl Slack-Eingangs-Webhooks als auch Discord-Web-Hooks akzeptieren eine HTTP-POST mit einer JSON-Nutzlast. Die folgende Funktion erstellt eine formatierte Nachricht und versendet sie. Slack verwendet eine text Feld; Discord verwendet
contentDer Helfer passt sich automatisch an das URL-Präfix an.
def send_webhook(webhook_url: str, record: dict, currency: str, indicator: str) -> None:
"""POST a formatted macro-alert message to a Slack or Discord webhook."""
value = record.get("val")
prior = record.get("prior")
dt = record.get("announcement_datetime", "")
ccy = currency.upper()
ind = indicator.replace("_", " ").title()
lines = [
f"📣 *{ccy} {ind}* just printed",
f" Value : *{value}* | Prior: {prior}",
f" Released: {dt}",
]
message = "\n".join(lines)
# Discord uses "content", Slack uses "text"
if "discord.com" in webhook_url:
payload = {"content": message.replace("*", "**")}
else:
payload = {"text": message}
try:
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
print(f"[OK] alert sent for {ccy} {ind}")
except requests.RequestException as exc:
print(f"[ERROR] webhook delivery failed: {exc}")
Tipp: benutzerdefinierte HTTP-Ziele
Das gleiche . send_webhook Funktion funktioniert mit jedem HTTP-Endpunkt, der POST n8n Automatisierung Workflows, Make (Integromat) Szenarien, PagerDuty Ereignis-APIs oder Ihren eigenen Flask/FastAPI-Empfänger akzeptiert. WEBHOOK_URL Die Daten werden dann an die Ziel-URL angepasst.
Schritt 4 Beobachten Sie mehrere Indikatoren in einer Umfrageschleife
Die - - - - - - - - -
schedule Die Bibliothek macht das ohne einen Cron-Daemon einfach.
import schedule
import time
WEBHOOK_URL = os.environ["WEBHOOK_URL"]
# Pairs to monitor — add or remove as needed
WATCHLIST = [
("usd", "policy_rate"),
("usd", "inflation"),
("usd", "non_farm_payrolls"),
("eur", "policy_rate"),
("eur", "inflation"),
("aud", "policy_rate"),
("gbp", "policy_rate"),
]
def check_all(state: dict) -> None:
for currency, indicator in WATCHLIST:
key = f"{currency}/{indicator}"
record = fetch_latest(currency, indicator)
if record and is_new_release(state, key, record):
send_webhook(WEBHOOK_URL, record, currency, indicator)
state[key] = record["announcement_datetime"]
save_state(state)
def main():
state = load_state()
# Run immediately on start, then every 5 minutes
check_all(state)
schedule.every(5).minutes.do(check_all, state=state)
while True:
schedule.run_pending()
time.sleep(30)
if __name__ == "__main__":
main()
Führen Sie den Bot von Ihrem Terminal aus:
python macro_alert_bot.py
Die erste Ausgabe wird belegt . alert_state.json Die Zukunft läuft nur dann auf Feuer, wenn etwas wirklich Neues erscheint.
Schritt 5 Hinzufügen von Benachrichtigungen vor der Veröffentlichung
Zu wissen , dass eine Freilassung passiert ist , ist nützlich; zu wissen , ist Es wird bald passieren. - Das ist wertvoll.
Endpunkt des Release-Kalenders
Das ist ein Schlagzeug . announcement_datetime Sie können die Daten für zukünftige Ereignisse, einschließlich der erwarteten Konsenswerte, verwenden, um eine vordefinierte Anzahl von Minuten vor dem Druck eine Vorwarnung auszulösen.
from datetime import datetime, timezone, timedelta
LEAD_MINUTES = 10 # fire a pre-alert this many minutes before the release
def fetch_calendar(currency: str) -> list[dict]:
try:
resp = requests.get(
f"{BASE_URL}/calendar/{currency}",
params={"api_key": API_KEY},
timeout=10,
)
resp.raise_for_status()
return resp.json().get("data", [])
except requests.RequestException as exc:
print(f"[WARN] calendar fetch failed for {currency}: {exc}")
return []
def check_upcoming(state: dict) -> None:
now = datetime.now(tz=timezone.utc)
for currency, _ in WATCHLIST:
for event in fetch_calendar(currency):
scheduled = event.get("announcement_datetime")
if not scheduled:
continue
try:
event_dt = datetime.fromisoformat(scheduled.replace("Z", "+00:00"))
except ValueError:
continue
# Fire pre-alert if the event is within the lead window and not yet fired
pre_key = f"pre:{currency}/{event.get('indicator')}:{scheduled}"
delta = event_dt - now
if timedelta(0) < delta <= timedelta(minutes=LEAD_MINUTES):
if pre_key not in state:
send_pre_alert(WEBHOOK_URL, event, currency, delta)
state[pre_key] = True
save_state(state)
def send_pre_alert(webhook_url: str, event: dict, currency: str, delta: timedelta) -> None:
ccy = currency.upper()
ind = event.get("indicator", "").replace("_", " ").title()
expected = event.get("expected")
prior = event.get("prior")
mins = int(delta.total_seconds() / 60)
lines = [
f"⏰ *{ccy} {ind}* due in ~{mins} min",
f" Expected: {expected} | Prior: {prior}",
]
message = "\n".join(lines)
if "discord.com" in webhook_url:
payload = {"content": message.replace("*", "**")}
else:
payload = {"text": message}
try:
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
print(f"[OK] pre-alert sent for {ccy} {ind}")
except requests.RequestException as exc:
print(f"[ERROR] pre-alert delivery failed: {exc}")
Fügen Sie hinzu . check_upcoming Die Wahlplanung ist gleichzeitig mit der Wahlplanungen . check_all- Ich weiß .
def main():
state = load_state()
check_all(state)
check_upcoming(state)
schedule.every(5).minutes.do(check_all, state=state)
schedule.every(5).minutes.do(check_upcoming, state=state)
while True:
schedule.run_pending()
time.sleep(30)
- Schritt 6 -
Schritt 6 Einsatz als langfristiger Dienst
Für die Produktion möchten Sie, dass der Bot ohne Terminal-Session kontinuierlich ausgeführt wird.
Option A Systemd-Einheit (Linux-Server / VPS)
# /etc/systemd/system/macro-alert-bot.service
[Unit]
Description=FXMacroData Alert Bot
After=network.target
[Service]
Type=simple
User=youruser
WorkingDirectory=/opt/macro-alert-bot
ExecStart=/opt/macro-alert-bot/.venv/bin/python macro_alert_bot.py
Restart=always
RestartSec=30
Environment="FXMACRO_API_KEY=YOUR_FXMACRODATA_KEY"
Environment="WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable --now macro-alert-bot
Option B Docker-Container
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY macro_alert_bot.py .
CMD ["python", "macro_alert_bot.py"]
docker build -t macro-alert-bot .
docker run -d --name macro-alert-bot --restart unless-stopped \
-e FXMACRO_API_KEY="YOUR_FXMACRODATA_KEY" \
-e WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" \
macro-alert-bot
Voll . requirements.txt
requests>=2.31
schedule>=1.2
Zusammenfassung
Sie haben jetzt eine funktionierende Makro-Alarm-Pipeline.
- ✅ Umfragen der FXMacroData Ankündigungen Endpunkt alle 5 Minuten für neue Veröffentlichungen auf Ihrer Beobachtungsliste
- ✅ Aufrechterhaltener Zustand, um doppelte Warnungen bei Neustarts zu vermeiden
- ✅ Versendet formatierte Webhook-Nachrichten an Slack, Discord oder ein anderes HTTP-Ziel
- ✅ Feuer erzeugen mit Hilfe der Veröffentlichungskalender mit konfigurierbarer Vorlaufzeit
- ✅ Laufen kontinuierlich als systemd-Dienst oder Docker-Container
Nächste Schritte
- → Erweitern Sie die Beobachtungsliste um weitere Indikatoren aus dem API-Dokumentation KPI, Beschäftigung, Handelsbilanz und mehr
- → Hinzufügen eines Überraschungsfilters: nur Alarmieren, wenn
valabweicht vonexpectedum mehr als einen Schwellenwert zur Lärmreduzierung - → Kombinieren mit COT-Positionsdaten Um zu sehen, ob der Markt bereits für die Überraschung positioniert ist
- → Weiterleitung von Warnungen an einen KI-Agent (siehe OpenClaw-Integrationsführer) so kann sie die Veröffentlichung im Kontext interpretieren