elpresidente* писал(а): ↑Сб апр 18, 2026 1:18 pm
Думаю написать скрипт чтобы включал Cloudflare Non-Interactive JS Challenge автоматом при >1.5К/5мин активных пользователей. Не совсем ясно как определить что пора выключать.
Untitled.jpg
Untitled1.jpg
Супермозг все сразу сделал
Самый практичный вариант — не пытаться искать у Cloudflare отдельный API-тумблер именно для “Non-Interactive JS Challenge”, а автоматически включать Under Attack mode через Configuration Rules API. В актуальной документации Cloudflare Under Attack включается через security_level = "under_attack" в фазе http_config_settings. Cloudflare также разделяет JavaScript Detections и обычные Challenge Pages: JavaScript Detections — это отдельный невидимый JS-механизм для HTML-страниц, он не действует на API/mobile трафик. Для custom rules Cloudflare поддерживает, в том числе, Managed Challenge.
Ниже готовый Python-скрипт. Он:
раз в запуск читает ваш метрик-endpoint с числом активных пользователей за 5 минут;
если значение > 1500, включает правило Cloudflare с security_level=under_attack;
если значение < 1200, выключает правило обратно;
использует гистерезис, чтобы защита не “дёргалась” туда-сюда около порога.
Что нужно заранее
1. Cloudflare API token с правом Zone > Config Rules > Edit. Это прямо указано в документации для Configuration Rules API.
2. ZONE_ID
3. hostname, который хотите защищать, например example.com
4. ваш endpoint метрики, который возвращает JSON вроде:
{"active_users_5m": 1684}
---
Скрипт
#!/usr/bin/env python3
import os
import sys
import json
import requests
from typing import Dict, Any, List
CF_API_BASE = "
https://api.cloudflare.com/client/v4"
CF_API_TOKEN = os.environ["CF_API_TOKEN"]
CF_ZONE_ID = os.environ["CF_ZONE_ID"]
CF_HOSTNAME = os.environ["CF_HOSTNAME"] # example.com
METRIC_URL = os.environ["METRIC_URL"] #
http://127.0.0.1:9000/metrics/active-users-5m
# Порог включения / выключения
ENABLE_THRESHOLD = int(os.getenv("ENABLE_THRESHOLD", "1500"))
DISABLE_THRESHOLD = int(os.getenv("DISABLE_THRESHOLD", "1200"))
# Стабильный ref, чтобы правило можно было безопасно находить и обновлять
RULE_REF = "auto_under_attack_on_peak_users"
RULE_DESCRIPTION = "Auto-enable Under Attack when active_users_5m is high"
session = requests.Session()
session.headers.update({
"Authorization": f"Bearer {CF_API_TOKEN}",
"Content-Type": "application/json",
})
def cf_get(path: str) -> Dict[str, Any]:
r = session.get(f"{CF_API_BASE}{path}", timeout=30)
r.raise_for_status()
data = r.json()
if not data.get("success", False):
raise RuntimeError(f"Cloudflare GET failed: {json.dumps(data, ensure_ascii=False)}")
return data
def cf_put(path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
r = session.put(f"{CF_API_BASE}{path}", json=payload, timeout=30)
r.raise_for_status()
data = r.json()
if not data.get("success", False):
raise RuntimeError(f"Cloudflare PUT failed: {json.dumps(data, ensure_ascii=False)}")
return data
def get_active_users_5m() -> int:
r = requests.get(METRIC_URL, timeout=15)
r.raise_for_status()
data = r.json()
value = data.get("active_users_5m")
if value is None:
raise RuntimeError("Metric response must contain 'active_users_5m'")
return int(value)
def get_config_entrypoint() -> Dict[str, Any]:
# Актуальный entrypoint API для phase ruleset
# GET /zones/{zone_id}/rulesets/phases/http_config_settings/entrypoint
return cf_get(f"/zones/{CF_ZONE_ID}/rulesets/phases/http_config_settings/entrypoint")["result"]
def build_rule(enabled: bool) -> Dict[str, Any]:
return {
"ref": RULE_REF,
"description": RULE_DESCRIPTION,
"expression": f'http.host eq "{CF_HOSTNAME}"',
"action": "set_config",
"enabled": enabled,
"action_parameters": {
"security_level": "under_attack"
}
}
def upsert_rule(ruleset: Dict[str, Any], enable_rule: bool) -> bool:
"""
Возвращает True, если были изменения в Cloudflare.
"""
existing_rules: List[Dict[str, Any]] = ruleset.get("rules", [])
changed = False
found = False
new_rules: List[Dict[str, Any]] = []
for rule in existing_rules:
if rule.get("ref") == RULE_REF:
found = True
current_enabled = bool(rule.get("enabled", True))
desired_enabled = enable_rule
# Обновляем только если действительно нужно
desired_rule = dict(rule)
desired_rule["description"] = RULE_DESCRIPTION
desired_rule["expression"] = f'http.host eq "{CF_HOSTNAME}"'
desired_rule["action"] = "set_config"
desired_rule["enabled"] = desired_enabled
desired_rule["action_parameters"] = {"security_level": "under_attack"}
if (
current_enabled != desired_enabled
or rule.get("expression") != desired_rule["expression"]
or rule.get("action") != "set_config"
or rule.get("action_parameters", {}).get("security_level") != "under_attack"
or rule.get("description") != RULE_DESCRIPTION
):
changed = True
new_rules.append(desired_rule)
else:
new_rules.append(rule)
else:
new_rules.append(rule)
if not found:
new_rules.append(build_rule(enable_rule))
changed = True
if not changed:
return False
cf_put(
f"/zones/{CF_ZONE_ID}/rulesets/phases/http_config_settings/entrypoint",
{
"rules": new_rules
}
)
return True
def current_rule_enabled(ruleset: Dict[str, Any]) -> bool:
for rule in ruleset.get("rules", []):
if rule.get("ref") == RULE_REF:
return bool(rule.get("enabled", True))
return False
def decide(current_enabled: bool, active_users_5m: int) -> bool:
"""
Гистерезис:
- если уже включено, выключаем только ниже DISABLE_THRESHOLD
- если выключено, включаем только выше ENABLE_THRESHOLD
"""
if current_enabled:
if active_users_5m < DISABLE_THRESHOLD:
return False
return True
else:
if active_users_5m > ENABLE_THRESHOLD:
return True
return False
def main() -> int:
active_users = get_active_users_5m()
ruleset = get_config_entrypoint()
current_enabled = current_rule_enabled(ruleset)
desired_enabled = decide(current_enabled, active_users)
changed = upsert_rule(ruleset, desired_enabled)
print(json.dumps({
"active_users_5m": active_users,
"current_enabled": current_enabled,
"desired_enabled": desired_enabled,
"changed": changed
}, ensure_ascii=False))
return 0
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
raise SystemExit(1)
---
Как запускать
Через cron раз в минуту
* * * * * /usr/bin/env CF_API_TOKEN=... CF_ZONE_ID=... CF_HOSTNAME=example.com METRIC_URL=
http://127.0.0.1:9000/metrics/active-users-5m /usr/bin/python3 /opt/cf-auto-uam.py >> /var/log/cf-auto-uam.log 2>&1
Или через systemd timer
Это лучше, чем cron, если хотите нормальные логи и перезапуски.
---
Почему я предложил именно так
Cloudflare документирует, что:
Under Attack mode сейчас управляется через security_level в API/config rules; значения включают under_attack.
Configuration Rules надо деплоить в фазу http_config_settings, с действием set_config.
JavaScript Detections — отдельный механизм, работающий через невидимый injected JS для HTML/page views, а не универсальный on/off-переключатель “challenge all traffic”.
Для WAF custom rules доступны challenge-действия, включая Managed Challenge.
---
Если нужен именно challenge, а не Under Attack
Тогда лучше сделать второй вариант: не set_config/security_level, а custom rule в фазе http_request_firewall_custom с действием Managed Challenge и просто включать/выключать это правило по тому же порогу. Cloudflare явно указывает, что custom rules работают в фазе http_request_firewall_custom, а action challenge/managed challenge там поддерживается.
Могу сразу дать и вторую версию скрипта — именно под Managed Challenge вместо Under Attack.