Os lançamentos de dados macro estão entre os catalisadores mais confiáveis nos mercados de câmbio. Uma impressão do IPC 0,2% acima do consenso pode enviar o EUR/USD deslizando 60 pips antes mesmo do cabeçalho das notícias carregar. Se o seu sistema de alerta depende de verificações manuais do calendário ou de pesquisas baseadas em cronômetro amplo, você já está atrás. Este guia mostra como construir um pipeline de alertas preciso, baseado em webhook, que detecta novos lançamento FXMacroData no momento em que eles aparecem e entrega notificações para Slack, Discord ou qualquer endpoint HTTP tudo em menos de 80 linhas de Python.
O que você vai construir
- Um ciclo de votação que verifica o ponto final de anúncios FXMacroData num intervalo configurável e detecta valores publicados recentemente
- Um despachante de webhook que dispara um POST HTTP para Slack, Discord ou um endpoint personalizado sempre que uma nova versão é detectada
- Um alerta prévio do calendário de lançamento que avisa minutos antes de um evento de alto impacto é devido para que você esteja pronto antes que o número impressos
- Persistência do estado usando um arquivo JSON leve para que o bot nunca faça o mesmo alerta duas vezes em reinicializações
Requisitos
- Python 3.9+
- Chave da API do FXMacroData Inscreva-se em / subscrever e copiar a chave do painel
- URL de webhook criar um Slack webhook de entrada em Aplicações de softwareOu pegue um canal do Discord por baixo . Configurações → Integrações → Webhooks
- Pacotes Python- Não .
requests- Não .schedule
pip install requests schedule
Armazenar todas as credenciais como variáveis de ambiente nunca chaves de código rígido em arquivos de origem:
export FXMACRO_API_KEY="YOUR_FXMACRODATA_KEY"
export WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
Pesquisa vs. empurrão: escolher o padrão certo
As assinaturas de webhook puro exigem um endpoint de servidor público e um provedor de dados que envia notificações de saída. circuito de votação leve (o seu código pede a API periodicamente) com um Dispatcher de webhook (o seu código empurra o resultado para baixo no momento em que algo novo aparece).
Isso lhe dá a confiabilidade dos dados baseados em pull (sem pushes perdidos se o seu servidor estiver fora de funcionamento) com a imediatização da entrega de webhook para as ferramentas que sua equipe já usa Slack, Discord, PagerDuty, n8n ou qualquer alvo HTTP.
- Passo 1 -
Passo 1 Obter o valor de anúncio mais recente
O ... Anúncios ponto final
Retorna o valor mais recente de qualquer indicador e moeda.
announcement_datetime um selo de tempo UTC de segundo nível que informa exatamente quando este valor foi publicado. Você usará este selo como a chave de deduplicação: se ele tiver mudado desde a sua última verificação, uma nova versão foi impressa.
import os
import requests
BASE_URL = "https://fxmacrodata.com/api/v1"
API_KEY = os.environ["FXMACRO_API_KEY"]
def fetch_latest(currency: str, indicator: str) -> dict | None:
"""Return the most recent announcement record, or None on failure."""
try:
resp = requests.get(
f"{BASE_URL}/announcements/{currency}/{indicator}",
params={"api_key": API_KEY},
timeout=10,
)
resp.raise_for_status()
data = resp.json().get("data", [])
return data[0] if data else None
except requests.RequestException as exc:
print(f"[WARN] fetch failed for {currency}/{indicator}: {exc}")
return None
Uma amostra de resposta parece:
{
"date": "2026-04-02",
"val": 4.35,
"prior": 4.10,
"announcement_datetime": "2026-04-02T02:30:00Z",
"currency": "aud",
"indicator": "policy_rate"
}
- O passo 2 .
Passo 2 Estado de rastreamento para evitar alertas duplicados
Uma reinicialização do seu processo não deve reiniciar alertas para libertações que você já viu. announcement_datetime Para cada indicador observado em um simples arquivo JSON. No início o bot carrega este arquiwo; após cada novo alerta, ele escreve o carimbo de tempo atualizado de volta.
import json
from pathlib import Path
STATE_FILE = Path("alert_state.json")
def load_state() -> dict:
if STATE_FILE.exists():
return json.loads(STATE_FILE.read_text())
return {}
def save_state(state: dict) -> None:
STATE_FILE.write_text(json.dumps(state, indent=2))
def is_new_release(state: dict, key: str, record: dict) -> bool:
"""Return True if the announcement_datetime is newer than what we last saw."""
last_seen = state.get(key)
current = record.get("announcement_datetime")
return current is not None and current != last_seen
- Passo 3 -
Passo 3 Enviar uma notificação de webhook
Tanto os webhooks de entrada do Slack quanto os web hooks do Discord aceitam um POST HTTP com uma carga útil JSON. A função abaixo constrói uma mensagem formatada e a envia. text Discord usa
contentO auxiliar adapta-se automaticamente com base no prefixo URL.
def send_webhook(webhook_url: str, record: dict, currency: str, indicator: str) -> None:
"""POST a formatted macro-alert message to a Slack or Discord webhook."""
value = record.get("val")
prior = record.get("prior")
dt = record.get("announcement_datetime", "")
ccy = currency.upper()
ind = indicator.replace("_", " ").title()
lines = [
f"📣 *{ccy} {ind}* just printed",
f" Value : *{value}* | Prior: {prior}",
f" Released: {dt}",
]
message = "\n".join(lines)
# Discord uses "content", Slack uses "text"
if "discord.com" in webhook_url:
payload = {"content": message.replace("*", "**")}
else:
payload = {"text": message}
try:
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
print(f"[OK] alert sent for {ccy} {ind}")
except requests.RequestException as exc:
print(f"[ERROR] webhook delivery failed: {exc}")
Dica: alvos HTTP personalizados
O mesmo . send_webhook Função funciona com qualquer endpoint HTTP que aceite POST n8n fluxos de trabalho de automação, cenários Make (Integromat), APIs de eventos PagerDuty, ou o seu próprio receptor Flask/FastAPI. WEBHOOK_URL para o URL de destino e ajustar a forma da carga útil.
Passo 4 Observar vários indicadores num ciclo de sondagem
Defina os pares de moedas/indicadores que lhe interessam e execute a verificação em intervalos regulares.
schedule A biblioteca torna isto simples sem um cron daemon.
import schedule
import time
WEBHOOK_URL = os.environ["WEBHOOK_URL"]
# Pairs to monitor — add or remove as needed
WATCHLIST = [
("usd", "policy_rate"),
("usd", "inflation"),
("usd", "non_farm_payrolls"),
("eur", "policy_rate"),
("eur", "inflation"),
("aud", "policy_rate"),
("gbp", "policy_rate"),
]
def check_all(state: dict) -> None:
for currency, indicator in WATCHLIST:
key = f"{currency}/{indicator}"
record = fetch_latest(currency, indicator)
if record and is_new_release(state, key, record):
send_webhook(WEBHOOK_URL, record, currency, indicator)
state[key] = record["announcement_datetime"]
save_state(state)
def main():
state = load_state()
# Run immediately on start, then every 5 minutes
check_all(state)
schedule.every(5).minutes.do(check_all, state=state)
while True:
schedule.run_pending()
time.sleep(30)
if __name__ == "__main__":
main()
Execute o bot do seu terminal:
python macro_alert_bot.py
A primeira corrida é cheia . alert_state.json com os valores atuais mais recentes, por isso o futuro só corre fogo quando algo genuinamente novo aparece.
Passo 5 Adicionar alertas de contagem regressiva antes da liberação
Saber que uma libertação aconteceu é útil; saber que é Está prestes a acontecer . É valioso.
ponto final do calendário de liberação
expõe o planeado . announcement_datetime Para eventos futuros, incluindo os valores de consenso esperados.
from datetime import datetime, timezone, timedelta
LEAD_MINUTES = 10 # fire a pre-alert this many minutes before the release
def fetch_calendar(currency: str) -> list[dict]:
try:
resp = requests.get(
f"{BASE_URL}/calendar/{currency}",
params={"api_key": API_KEY},
timeout=10,
)
resp.raise_for_status()
return resp.json().get("data", [])
except requests.RequestException as exc:
print(f"[WARN] calendar fetch failed for {currency}: {exc}")
return []
def check_upcoming(state: dict) -> None:
now = datetime.now(tz=timezone.utc)
for currency, _ in WATCHLIST:
for event in fetch_calendar(currency):
scheduled = event.get("announcement_datetime")
if not scheduled:
continue
try:
event_dt = datetime.fromisoformat(scheduled.replace("Z", "+00:00"))
except ValueError:
continue
# Fire pre-alert if the event is within the lead window and not yet fired
pre_key = f"pre:{currency}/{event.get('indicator')}:{scheduled}"
delta = event_dt - now
if timedelta(0) < delta <= timedelta(minutes=LEAD_MINUTES):
if pre_key not in state:
send_pre_alert(WEBHOOK_URL, event, currency, delta)
state[pre_key] = True
save_state(state)
def send_pre_alert(webhook_url: str, event: dict, currency: str, delta: timedelta) -> None:
ccy = currency.upper()
ind = event.get("indicator", "").replace("_", " ").title()
expected = event.get("expected")
prior = event.get("prior")
mins = int(delta.total_seconds() / 60)
lines = [
f"⏰ *{ccy} {ind}* due in ~{mins} min",
f" Expected: {expected} | Prior: {prior}",
]
message = "\n".join(lines)
if "discord.com" in webhook_url:
payload = {"content": message.replace("*", "**")}
else:
payload = {"text": message}
try:
resp = requests.post(webhook_url, json=payload, timeout=10)
resp.raise_for_status()
print(f"[OK] pre-alert sent for {ccy} {ind}")
except requests.RequestException as exc:
print(f"[ERROR] pre-alert delivery failed: {exc}")
Adicione . check_upcoming para o calendário de votação ao lado check_all- Não .
def main():
state = load_state()
check_all(state)
check_upcoming(state)
schedule.every(5).minutes.do(check_all, state=state)
schedule.every(5).minutes.do(check_upcoming, state=state)
while True:
schedule.run_pending()
time.sleep(30)
- O passo 6 .
Etapa 6 Implementação como serviço de longa duração
Para uso em produção, você vai querer que o bot seja executado continuamente sem uma sessão de terminal. Abaixo estão os dois padrões de implantação mais comuns.
Opção A unidade de sistema (servidor Linux / VPS)
# /etc/systemd/system/macro-alert-bot.service
[Unit]
Description=FXMacroData Alert Bot
After=network.target
[Service]
Type=simple
User=youruser
WorkingDirectory=/opt/macro-alert-bot
ExecStart=/opt/macro-alert-bot/.venv/bin/python macro_alert_bot.py
Restart=always
RestartSec=30
Environment="FXMACRO_API_KEY=YOUR_FXMACRODATA_KEY"
Environment="WEBHOOK_URL=https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable --now macro-alert-bot
Opção B Container Docker
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY macro_alert_bot.py .
CMD ["python", "macro_alert_bot.py"]
docker build -t macro-alert-bot .
docker run -d --name macro-alert-bot --restart unless-stopped \
-e FXMACRO_API_KEY="YOUR_FXMACRODATA_KEY" \
-e WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" \
macro-alert-bot
Cheio . requirements.txt
requests>=2.31
schedule>=1.2
Resumo
Agora tem um sistema de alerta macro funcional.
- ✅ Pesquisas do FXMacroData Anúncios ponto final a cada 5 minutos para novas versões na sua lista de vigilância
- ✅ Estado persistente para evitar alertas duplicados em reinicializações
- ✅ Envia mensagens de webhook formatadas para Slack, Discord ou qualquer alvo HTTP
- ✅ Fires pré-lançamento alertas de contagem regressiva usando o Calendário de lançamento com tempo de execução configurável
- ✅ Executa-se continuamente como um serviço systemd ou um contêiner Docker
Próximos passos
- → Aumentar a lista de vigilância com indicadores adicionais do Documentação da API IPC, emprego, balança comercial e muito mais
- → Adicionar um filtro surpresa: apenas alerta quando
valdesvia-se deexpectedPor mais de um limiar para reduzir o ruído - → Combinar com Dados de posicionamento COT Para contextualizar se o mercado já está posicionado para a surpresa
- → Alertas para um agente de IA (ver o Guia de integração do OpenClaw) para que possa interpretar a liberação no contexto