Macro data releases are among the most reliable catalysts in FX markets. A CPI print 0.2% above consensus can send EUR/USD sliding 60 pips before the news headline even loads. If your alert system relies on manual calendar checks or broad timer-based polling, you are already behind. This guide shows you how to build a precise, webhook-driven alert pipeline that detects new FXMacroData releases the moment they appear and delivers notifications to Slack, Discord, or any HTTP endpoint — all in one small Python script.
What you will build
- A polling loop that checks the FXMacroData announcements endpoint on a configurable interval and detects newly published values
- A webhook dispatcher that fires an HTTP POST to Slack, Discord, or a custom endpoint whenever a new release is detected
- A release-calendar pre-alert that warns you minutes before a high-impact event is due — so you are ready before the number prints
- State persistence using a lightweight JSON file so the bot never double-fires the same alert across restarts
Prerequisites
- Python 3.10+ (the code uses X | Y union type annotations)
- FXMacroData API key — sign up at /subscribe and copy your key from the dashboard
- A webhook URL — create a Slack incoming webhook at api.slack.com/apps, or grab a Discord channel webhook under Settings → Integrations → Webhooks
- Python packages:
requests, schedule
pip install requests schedule
Store all credentials as environment variables — never hard-code keys in source files:
export FXMACRO_API_KEY="YOUR_FXMACRODATA_KEY"
export WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
Polling vs. push: choosing the right pattern
Pure webhook subscriptions require a public server endpoint and a data provider that sends outbound notifications. Most macro data APIs — including FXMacroData — are pull-based: you query the endpoint and receive the latest value. The practical pattern therefore combines a lightweight polling loop (your code asks the API periodically) with a webhook dispatcher (your code pushes the result downstream the moment something new appears).
This gives you the reliability of pull-based data (no missed pushes if your server is down) with the immediacy of webhook delivery to the tools your team already uses — Slack, Discord, PagerDuty, n8n, or any HTTP target.
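The combined pattern can be sketched generically before touching the real API — a toy poll-then-push loop with stand-in fetch and notify callables (poll_and_dispatch and the fake feed are invented for illustration; Steps 1–4 build the real versions):

```python
import time

# Generic poll-then-push skeleton: pull from a source on an interval,
# push downstream only when the observed value changes.
def poll_and_dispatch(fetch, notify, interval_s: float, rounds: int) -> None:
    last = None
    for _ in range(rounds):
        current = fetch()
        if current is not None and current != last:
            notify(current)  # fire the downstream webhook only on change
            last = current
        time.sleep(interval_s)

# Simulated feed: two identical values, then a new print
feed = iter(["4.10", "4.10", "4.35"])
sent = []
poll_and_dispatch(lambda: next(feed), sent.append, interval_s=0, rounds=3)
print(sent)  # only the changes are pushed: ['4.10', '4.35']
```

The deduplication on `last` is the core idea — everything that follows is this skeleton with a real HTTP fetch, a real webhook dispatch, and persistent state.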
Step 1 — Fetch the latest announcement value
The announcements endpoint returns the most recent released value for any indicator and currency. The key field is announcement_datetime — a second-precision UTC timestamp that tells you exactly when this value was published. You will use this timestamp as the deduplication key: if it has changed since your last check, a new release has printed.
import os
import requests
BASE_URL = "https://fxmacrodata.com/api/v1"
API_KEY = os.environ["FXMACRO_API_KEY"]
def fetch_latest(currency: str, indicator: str) -> dict | None:
"""Return the most recent announcement record, or None on failure."""
try:
resp = requests.get(
f"{BASE_URL}/announcements/{currency}/{indicator}",
params={"api_key": API_KEY},
timeout=10,
)
resp.raise_for_status()
data = resp.json().get("data", [])
return data[0] if data else None
except requests.RequestException as exc:
print(f"[WARN] fetch failed for {currency}/{indicator}: {exc}")
return None
A sample response looks like:
{
"date": "2026-04-02",
"val": 4.35,
"prior": 4.10,
"announcement_datetime": "2026-04-02T02:30:00Z",
"currency": "aud",
"indicator": "policy_rate"
}
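If you need that timestamp as an aware datetime — say, to measure how old a release is — it parses with the standard library alone. A small sketch using the sample record above (note the Z shim: fromisoformat() only accepts a trailing "Z" from Python 3.11 onward):

```python
from datetime import datetime, timezone

# Mirror of the sample response's timestamp field
record = {"announcement_datetime": "2026-04-02T02:30:00Z"}

# Replace the trailing "Z" with an explicit UTC offset for fromisoformat()
released = datetime.fromisoformat(
    record["announcement_datetime"].replace("Z", "+00:00")
)
print(released)  # 2026-04-02 02:30:00+00:00
age = datetime.now(tz=timezone.utc) - released
```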
Step 2 — Track state to avoid duplicate alerts
A restart of your process should not re-fire alerts for releases you have already seen. Persist the last
known announcement_datetime for each watched indicator in a simple JSON file. On startup
the bot loads this file; after each new alert it writes the updated timestamp back.
import json
from pathlib import Path
STATE_FILE = Path("alert_state.json")
def load_state() -> dict:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text())
return {}
def save_state(state: dict) -> None:
STATE_FILE.write_text(json.dumps(state, indent=2))
def is_new_release(state: dict, key: str, record: dict) -> bool:
"""Return True if the announcement_datetime is newer than what we last saw."""
last_seen = state.get(key)
current = record.get("announcement_datetime")
return current is not None and current != last_seen
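A quick standalone sanity check of the dedup predicate — the function is restated verbatim so the snippet runs on its own, and the timestamps are made up:

```python
# Restated from the step above so this snippet is self-contained
def is_new_release(state: dict, key: str, record: dict) -> bool:
    last_seen = state.get(key)
    current = record.get("announcement_datetime")
    return current is not None and current != last_seen

state = {"usd/policy_rate": "2026-03-18T18:00:00Z"}
seen = {"announcement_datetime": "2026-03-18T18:00:00Z"}
fresh = {"announcement_datetime": "2026-05-06T18:00:00Z"}

print(is_new_release(state, "usd/policy_rate", seen))   # False — already alerted
print(is_new_release(state, "usd/policy_rate", fresh))  # True — new print
```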
Step 3 — Send a webhook notification
Both Slack incoming webhooks and Discord webhooks accept an HTTP POST with a JSON payload. The function below builds a formatted message and dispatches it. Slack reads a text field; Discord reads content. The helper inspects the webhook URL and picks the right field automatically.
def send_webhook(webhook_url: str, record: dict, currency: str, indicator: str) -> None:
"""POST a formatted macro-alert message to a Slack or Discord webhook."""
value = record.get("val")
prior = record.get("prior")
dt = record.get("announcement_datetime", "")
ccy = currency.upper()
ind = indicator.replace("_", " ").title()
lines = [
f"📣 *{ccy} {ind}* just printed",
f" Value : *{value}* | Prior: {prior}",
f" Released: {dt}",
]
message = "\n".join(lines)
# Discord uses "content", Slack uses "text"
if "discord.com" in webhook_url:
payload = {"content": message.replace("*", "**")}
else:
payload = {"text": message}
try:
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
print(f"[OK] alert sent for {ccy} {ind}")
except requests.RequestException as exc:
print(f"[ERROR] webhook delivery failed: {exc}")
Tip: custom HTTP targets
The same send_webhook function works with any HTTP endpoint that accepts POST — n8n
automation workflows, Make (Integromat) scenarios, PagerDuty event APIs, or your own Flask/FastAPI
receiver. Just set WEBHOOK_URL to the target URL and adjust the payload shape.
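As a concrete illustration of such a custom receiver, here is a stdlib-only sketch (no Flask or FastAPI required); the class name, port, and log format are invented for the example:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

# Minimal local receiver for experimenting with the bot's webhook POSTs.
class AlertReceiver(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length) or b"{}")
        # The bot sends {"text": ...} (Slack shape) or {"content": ...} (Discord shape)
        print(f"[ALERT] {payload.get('text') or payload.get('content', '<empty>')}")
        self.send_response(200)
        self.end_headers()

def run(port: int = 8000) -> None:
    HTTPServer(("", port), AlertReceiver).serve_forever()
```

Start it with run() in a separate terminal and point the bot at it via export WEBHOOK_URL=http://localhost:8000/ — the URL contains no discord.com, so the bot sends the Slack-shaped payload.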
Step 4 — Watch multiple indicators in a polling loop
Define the currency/indicator pairs you care about and run the check on a regular interval. The
schedule library makes this straightforward without a cron daemon.
import schedule
import time
WEBHOOK_URL = os.environ["WEBHOOK_URL"]
# Pairs to monitor — add or remove as needed
WATCHLIST = [
("usd", "policy_rate"),
("usd", "inflation"),
("usd", "non_farm_payrolls"),
("eur", "policy_rate"),
("eur", "inflation"),
("aud", "policy_rate"),
("gbp", "policy_rate"),
]
def check_all(state: dict) -> None:
    for currency, indicator in WATCHLIST:
        key = f"{currency}/{indicator}"
        record = fetch_latest(currency, indicator)
        if not record:
            continue
        if key not in state:
            # First sighting of this indicator: seed the state silently so a
            # fresh start does not flood the channel with stale releases
            state[key] = record["announcement_datetime"]
            save_state(state)
        elif is_new_release(state, key, record):
            send_webhook(WEBHOOK_URL, record, currency, indicator)
            state[key] = record["announcement_datetime"]
            save_state(state)
def main():
state = load_state()
# Run immediately on start, then every 5 minutes
check_all(state)
schedule.every(5).minutes.do(check_all, state=state)
while True:
schedule.run_pending()
time.sleep(30)
if __name__ == "__main__":
main()
Run the bot from your terminal:
python macro_alert_bot.py
The first run populates alert_state.json with the current latest values so future runs
only fire when something genuinely new appears.
Step 5 — Add pre-release countdown alerts
Knowing a release happened is useful; knowing it is about to happen is valuable. The release calendar endpoint exposes the scheduled announcement_datetime for future events, including expected consensus values. Use this to fire a pre-alert a configurable number of minutes before the print.
from datetime import datetime, timezone, timedelta
LEAD_MINUTES = 10 # fire a pre-alert this many minutes before the release
def fetch_calendar(currency: str) -> list[dict]:
try:
resp = requests.get(
f"{BASE_URL}/calendar/{currency}",
params={"api_key": API_KEY},
timeout=10,
)
resp.raise_for_status()
return resp.json().get("data", [])
except requests.RequestException as exc:
print(f"[WARN] calendar fetch failed for {currency}: {exc}")
return []
def check_upcoming(state: dict) -> None:
now = datetime.now(tz=timezone.utc)
for currency, _ in WATCHLIST:
for event in fetch_calendar(currency):
scheduled = event.get("announcement_datetime")
if not scheduled:
continue
try:
event_dt = datetime.fromisoformat(scheduled.replace("Z", "+00:00"))
except ValueError:
continue
# Fire pre-alert if the event is within the lead window and not yet fired
pre_key = f"pre:{currency}/{event.get('indicator')}:{scheduled}"
delta = event_dt - now
if timedelta(0) < delta <= timedelta(minutes=LEAD_MINUTES):
if pre_key not in state:
send_pre_alert(WEBHOOK_URL, event, currency, delta)
state[pre_key] = True
save_state(state)
def send_pre_alert(webhook_url: str, event: dict, currency: str, delta: timedelta) -> None:
ccy = currency.upper()
ind = event.get("indicator", "").replace("_", " ").title()
expected = event.get("expected")
prior = event.get("prior")
mins = int(delta.total_seconds() / 60)
lines = [
f"⏰ *{ccy} {ind}* due in ~{mins} min",
f" Expected: {expected} | Prior: {prior}",
]
message = "\n".join(lines)
if "discord.com" in webhook_url:
payload = {"content": message.replace("*", "**")}
else:
payload = {"text": message}
try:
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
print(f"[OK] pre-alert sent for {ccy} {ind}")
except requests.RequestException as exc:
print(f"[ERROR] pre-alert delivery failed: {exc}")
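The timing test inside check_upcoming is the part most worth understanding. Isolated as a standalone predicate (with illustrative times), it reads:

```python
from datetime import datetime, timedelta, timezone

LEAD_MINUTES = 10

# Fire only when the event is in the future AND within LEAD_MINUTES of now —
# strictly greater than zero excludes events that have already printed.
def in_lead_window(event_dt: datetime, now: datetime,
                   lead_minutes: int = LEAD_MINUTES) -> bool:
    delta = event_dt - now
    return timedelta(0) < delta <= timedelta(minutes=lead_minutes)

now = datetime(2026, 4, 2, 2, 0, tzinfo=timezone.utc)
print(in_lead_window(now + timedelta(minutes=5), now))   # True — inside window
print(in_lead_window(now + timedelta(minutes=30), now))  # False — too early
print(in_lead_window(now - timedelta(minutes=1), now))   # False — already passed
```

Because the polling interval (5 minutes) is shorter than the lead window (10 minutes), at least one poll is guaranteed to land inside the window for every scheduled event.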
Add check_upcoming to the polling schedule alongside check_all:
def main():
state = load_state()
check_all(state)
check_upcoming(state)
schedule.every(5).minutes.do(check_all, state=state)
schedule.every(5).minutes.do(check_upcoming, state=state)
while True:
schedule.run_pending()
time.sleep(30)
Step 6 — Deploy as a long-running service
For production use you will want the bot to run continuously without a terminal session. Below are the two most common deployment patterns.
Option A — systemd unit (Linux server / VPS)
# /etc/systemd/system/macro-alert-bot.service
[Unit]
Description=FXMacroData Alert Bot
After=network.target
[Service]
Type=simple
User=youruser
WorkingDirectory=/opt/macro-alert-bot
ExecStart=/opt/macro-alert-bot/.venv/bin/python macro_alert_bot.py
Restart=always
RestartSec=30
Environment="FXMACRO_API_KEY=YOUR_FXMACRODATA_KEY"
Environment="WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable --now macro-alert-bot
Option B — Docker container
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY macro_alert_bot.py .
CMD ["python", "macro_alert_bot.py"]
docker build -t macro-alert-bot .
docker run -d --name macro-alert-bot --restart unless-stopped \
-e FXMACRO_API_KEY="YOUR_FXMACRODATA_KEY" \
-e WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" \
macro-alert-bot
Full requirements.txt
requests>=2.31
schedule>=1.2
Summary
You now have a working macro alert pipeline. The complete system:
- ✅ Polls the FXMacroData announcements endpoint every 5 minutes for new releases across your watchlist
- ✅ Persists state to avoid duplicate alerts across restarts
- ✅ Dispatches formatted webhook messages to Slack, Discord, or any HTTP target
- ✅ Fires pre-release countdown alerts using the release calendar with configurable lead time
- ✅ Runs continuously as a systemd service or Docker container
Next steps
- → Extend the watchlist with additional indicators from the API docs — CPI, employment, trade balance, and more
- → Add a surprise filter: only alert when val deviates from expected by more than a threshold, to reduce noise
- → Combine with COT positioning data to contextualise whether the market is already positioned for the surprise
- → Forward alerts to an AI agent (see the OpenClaw integration guide) so it can interpret the release in context
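For the surprise filter mentioned above, a minimal sketch — the field names follow the records shown earlier, and the default threshold is an arbitrary placeholder you would tune per indicator:

```python
# Only treat a release as alert-worthy if it deviates from consensus
# by more than a per-indicator threshold.
def is_surprise(record: dict, threshold: float = 0.1) -> bool:
    val = record.get("val")
    expected = record.get("expected")
    if val is None or expected is None:
        return True  # no consensus to compare against: alert anyway
    return abs(val - expected) > threshold

print(is_surprise({"val": 4.35, "expected": 4.10}))  # True — 0.25 beats the threshold
print(is_surprise({"val": 4.12, "expected": 4.10}))  # False — within tolerance
```

Wire it into check_all by guarding the send_webhook call with this predicate.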