Commit d8401abf by zmops

初步提交思科采集平台代码

parents
# 这是 Meraki API Key 不可为空
#MERAKI_API_KEY=bbaa461f371d9eb323ba478028a9585624f2500c
MERAKI_API_KEY=715183336c6eecfa3924a95e07256c464ac9516d
# 组织 ID 可为空
ORGANIZATION_ID=
# Meraki API 基础 URL 可为空
#MERAKI_API_BASE=https://n3.dashboard.meraki.cn/api/v1
MERAKI_API_BASE=https://n190.dashboard.meraki.com/api/v1
# 是否开启debug模式
DJANGO_DEBUG=True
#redis 相关配置
#REDIS_URL=redis://14.103.242.161:6379/8
REDIS_HOST=14.103.242.161
#REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_DB=9
REDIS_USERNAME=
REDIS_PASSWORD=redis@123
.venv
.trae
.logs
.idea/*
\ No newline at end of file
File added
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
    """Run administrative tasks."""
    # Make sure Django can locate this project's settings before importing it.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'meraki_Interface_forward.settings')
    try:
        from django.core.management import execute_from_command_line
    except ImportError as exc:
        raise ImportError(
            "Couldn't import Django. Are you sure it's installed and "
            "available on your PYTHONPATH environment variable? Did you "
            "forget to activate a virtual environment?"
        ) from exc
    # Hand raw argv straight to Django's management-command dispatcher.
    execute_from_command_line(sys.argv)


if __name__ == '__main__':
    main()
"""
ASGI config for meraki_Interface_forward project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.2/howto/deployment/asgi/
"""
import os

from django.core.asgi import get_asgi_application

# Point Django at this project's settings before building the ASGI callable.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'meraki_Interface_forward.settings')

# Module-level ASGI entry point consumed by the ASGI server (uvicorn/daphne).
application = get_asgi_application()
This source diff could not be displayed because it is too large. You can view the blob instead.
Not Found: /
Not Found: /favicon.ico
import os
import json
import time
import random
from typing import Any, Callable, Tuple, Optional
try:
import redis
_REDIS_AVAILABLE = True
except Exception:
redis = None
_REDIS_AVAILABLE = False
def _env_url():
return os.getenv("REDIS_URL")
def _env_host():
return os.getenv("REDIS_HOST", "127.0.0.1")
def _env_port():
return int(os.getenv("REDIS_PORT", "6379"))
def _env_db():
return int(os.getenv("REDIS_DB", "8"))
def _env_password():
return os.getenv("REDIS_PASSWORD")
def _env_username():
return os.getenv("REDIS_USERNAME")
def get_redis_client():
    """Build a StrictRedis client from environment settings.

    Returns None when the redis library cannot be imported. Precedence:
    explicit REDIS_HOST/PORT/DB/USERNAME/PASSWORD variables win over
    REDIS_URL; with neither present, fall back to localhost defaults.
    """
    if not _REDIS_AVAILABLE:
        return None
    username = _env_username()
    password = _env_password()
    explicit = any((
        os.getenv("REDIS_HOST"),
        os.getenv("REDIS_PORT"),
        os.getenv("REDIS_DB"),
        username,
        password,
    ))
    if explicit:
        return redis.StrictRedis(
            host=_env_host(),
            port=_env_port(),
            db=_env_db(),
            username=username,
            password=password,
            decode_responses=True,
        )
    url = _env_url()
    if url:
        return redis.StrictRedis.from_url(url, decode_responses=True)
    # No explicit settings and no URL: connect with the documented defaults.
    return redis.StrictRedis(
        host=_env_host(),
        port=_env_port(),
        db=_env_db(),
        password=password,
        username=username,
        decode_responses=True,
    )
def redis_ping():
    """Probe Redis connectivity; returns {'connected': bool, ['error': str]}."""
    client = get_redis_client()
    if not client:
        return {"connected": False, "error": "redis library not available"}
    try:
        return {"connected": True} if client.ping() else {"connected": False}
    except Exception as e:
        return {"connected": False, "error": str(e)}
def set(key, value, ex=None):
    """Write ``value`` under ``key`` with optional TTL; False if Redis is down.

    NOTE: deliberately shadows the builtin ``set`` — this module is consumed
    as a namespaced cache API (redis_utils.set / .get / ...).
    """
    client = get_redis_client()
    return client.set(key, value, ex=ex) if client else False
def get(key):
    """Read ``key``; None when Redis is unavailable (or the key is missing)."""
    client = get_redis_client()
    return client.get(key) if client else None
def delete(key):
    """Delete ``key``; returns the number removed (0 when Redis is down)."""
    client = get_redis_client()
    return client.delete(key) if client else 0
def exists(key):
    """Return how many of the given keys exist (0 when Redis is down)."""
    client = get_redis_client()
    return client.exists(key) if client else 0
def expire(key, seconds):
    """Set a TTL on ``key``; False when Redis is unavailable."""
    client = get_redis_client()
    return client.expire(key, seconds) if client else False
def hset(name, key, value):
    """Set one hash field; returns the count of new fields (0 if Redis is down)."""
    client = get_redis_client()
    return client.hset(name, key, value) if client else 0
def hget(name, key):
    """Read one hash field; None when Redis is unavailable."""
    client = get_redis_client()
    return client.hget(name, key) if client else None
def set_json(key, obj, ex=None):
    """Serialize ``obj`` to JSON (non-ASCII kept intact) and store under ``key``."""
    payload = json.dumps(obj, ensure_ascii=False)
    return set(key, payload, ex=ex)
def get_json(key):
    """Fetch ``key`` and decode it as JSON; None on miss or malformed payload."""
    raw = get(key)
    if not raw:
        return None
    try:
        return json.loads(raw)
    except Exception:
        return None
def ttl(key):
    """Return remaining TTL seconds for key, or None if redis unavailable."""
    client = get_redis_client()
    if not client:
        return None
    try:
        return client.ttl(key)
    except Exception:
        return None
def rate_limit_allow(name: str, limit: int = 10, window_seconds: int = 1):
    """
    Fixed-window rate limiter built on Redis INCR + EXPIRE.

    (The previous docstring called this "sliding-window"; it is actually a
    fixed window — the counter resets only when the key expires.)

    Returns (allowed: bool, remaining: int).
    - name: unique key for the rate limit bucket (e.g., rl:switch_ports:<serial>)
    - limit: max number of requests allowed within window
    - window_seconds: window duration in seconds
    When Redis is unavailable or errors out, the request is allowed (best-effort).
    """
    c = get_redis_client()
    # If Redis not available, allow request (best-effort fallback)
    if not c:
        return True, limit
    try:
        count = c.incr(name)
        if count == 1:
            # First hit in this window: start the expiry clock.
            c.expire(name, window_seconds)
        elif c.ttl(name) == -1:
            # Defensive fix: if a previous EXPIRE failed, the key would persist
            # forever and the bucket would never reset — re-arm the TTL.
            c.expire(name, window_seconds)
        allowed = count <= limit
        remaining = max(limit - count, 0)
        return allowed, remaining
    except Exception:
        # On redis error, allow to avoid hard failure
        return True, limit
def scan_delete(pattern: str, batch_size: int = 500) -> int:
    """
    Delete Redis keys matching ``pattern`` in batches; returns how many were removed.

    Uses SCAN (never KEYS) to avoid blocking the server. Intended for ops-style
    cleanup such as 'device:*', 'uplink:*' or 'lock:*'. Errors are swallowed and
    the count deleted so far is returned.
    """
    client = get_redis_client()
    if not client:
        return 0
    removed = 0
    try:
        cur = 0
        while True:
            cur, batch = client.scan(cursor=cur, match=pattern, count=batch_size)
            if batch:
                removed += client.delete(*batch)
            if cur == 0:
                break
    except Exception:
        # Best-effort cleanup: report whatever we managed to delete.
        return removed
    return removed
def _lock_key_for(cache_key: str) -> str:
return f"lock:{cache_key}"
def acquire_lock(name: str, ttl_seconds: int = 10) -> bool:
    """
    Try to take a simple distributed lock via Redis SET NX + EX.

    - name: lock key
    - ttl_seconds: expiry to guard against deadlocks if the holder dies
    When Redis is unavailable, the lock is reported as NOT acquired
    (fail-closed, keeps calling logic simple).
    """
    client = get_redis_client()
    if not client:
        return False
    try:
        # The stored value is just a timestamp; ownership is not verified on
        # release — key presence alone means "locked".
        acquired = client.set(name, str(time.time()), nx=True, ex=ttl_seconds)
        return bool(acquired)
    except Exception:
        return False
def release_lock(name: str) -> None:
    """Release a lock; failures (and missing Redis) are silently ignored."""
    client = get_redis_client()
    if client:
        try:
            client.delete(name)
        except Exception:
            pass
def get_or_set_json_with_lock(
    cache_key: str,
    loader: Callable[[], Any],
    ex: int,
    lock_ttl: int = 10,
    wait_timeout: float = 5.0,
    wait_interval: float = 0.2,
) -> Tuple[Optional[Any], bool]:
    """
    Lock-guarded cache read that protects against cache stampede/penetration.

    Returns (value, loaded_from_source):
      - value: the resulting value (may be None)
      - loaded_from_source: True when THIS call invoked loader() against the origin

    Behavior:
      1. Read the cache first; return immediately on a hit.
      2. On a miss, compete for the distributed lock:
         - acquired: call loader(), write the result to the cache, release the lock.
         - not acquired: poll the cache within wait_timeout until a value appears.
      3. If the wait times out with no value, return (None, False); the caller
         decides whether to error out or return an empty structure.
    """
    # Step 1: check the cache first
    val = get_json(cache_key)
    if val is not None:
        return val, False
    # Step 2: compete for the lock
    lock_key = _lock_key_for(cache_key)
    if acquire_lock(lock_key, ttl_seconds=lock_ttl):
        try:
            loaded = loader()
            # Only cache non-None results; None is returned but never stored.
            if loaded is not None:
                set_json(cache_key, loaded, ex=ex)
            return loaded, True
        finally:
            release_lock(lock_key)
    # Step 3: lock not acquired -> poll while another worker fills the cache.
    # Random jitter keeps waiters from waking in lockstep.
    deadline = time.time() + wait_timeout
    while time.time() < deadline:
        time.sleep(wait_interval + random.uniform(0, wait_interval / 2))
        val = get_json(cache_key)
        if val is not None:
            return val, False
    # Timed out: degrade gracefully and let the caller decide the response.
    return None, False
from enum import Enum
class CacheKey(Enum):
    """Central registry of Redis cache key names shared by tasks/services/views."""
    ORGANIZATIONS = "organizations"          # first organization visible to the API key
    NETWORKS = "networks"                    # all networks of the organization
    DEVICES = "meraki_devices"               # full device inventory
    DEVICES_AVAILABILITIES = "devices_availabilities"
    DEVICES_STATUSES_OVERVIEW = "devices_statuses_overview"
    DEVICES_STATUSES = "devices_statuses"
    DEVICES_UPLINKS_BY_DEVICE = "devices_uplinks_by_device"
    WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE = "wireless_channel_utilization_by_device"
    ASSURANCE_ALERTS = "assurance_alerts"
\ No newline at end of file
# Input: a response object / dict / string
# Normalize it into a uniform {success, code, message, data} result dict.
import logging
import json

logger = logging.getLogger(__name__)


def resultAnalysis(response):
    """Normalize an API response into {success, code, message, data}.

    Accepts:
      - a requests.Response-like object (anything with ``status_code``),
      - an already-parsed dict,
      - a plain string (treated as an error message).

    Returns a dict:
      - success: bool
      - code: HTTP or business status code (None when undeterminable)
      - message: human-readable description (Chinese, user-facing)
      - data: parsed payload when available
    """
    result = {
        "success": False,
        "code": None,
        "message": "",
        "data": None
    }
    try:
        # --- Step 1: detect the response type ---
        if hasattr(response, "status_code"):  # requests.Response-like object
            result["code"] = response.status_code
            # Prefer the JSON body; fall back to the raw text.
            try:
                body = response.json()
            except Exception:
                body = response.text
            # --- Step 2: branch on the HTTP status code ---
            if 200 <= response.status_code < 300:
                result["success"] = True
                result["message"] = "请求成功"
                result["data"] = body
            elif response.status_code == 401:
                result["message"] = "认证失败或API Key无效"
            elif response.status_code == 403:
                result["message"] = "访问被拒绝,权限不足"
            elif response.status_code == 404:
                result["message"] = "请求的资源不存在"
            elif response.status_code >= 500:
                result["message"] = "服务器内部错误"
            else:
                result["message"] = f"请求失败,HTTP状态码 {response.status_code}"
        elif isinstance(response, dict):  # already-parsed JSON object
            # BUGFIX: the previous `response.get("code") or response.get("status")
            # or 200` chain treated a legitimate business code of 0 as missing
            # (0 is falsy) and silently replaced it with 200. Use explicit None
            # checks so code 0 survives intact.
            code = response.get("code")
            if code is None:
                code = response.get("status")
            if code is None:
                code = 200
            result["code"] = code
            result["data"] = response
            # --- Step 3: evaluate the business status ---
            if code in [200, 0]:
                result["success"] = True
                result["message"] = response.get("message", "请求成功")
            else:
                result["message"] = response.get("message", "业务处理失败")
        elif isinstance(response, str):
            # A bare string is assumed to be an error message.
            result["message"] = response
        else:
            result["message"] = "无法识别的响应类型"
    except Exception as e:
        logger.error(f"解析响应失败: {e}")
        result["message"] = f"响应解析异常: {e}"
    logger.info(f"解析结果 => success={result['success']}, code={result['code']}, message={result['message']}")
    return result
"""
Services 模块:业务逻辑层
- meraki_service: Meraki API 调用服务
- cache_service: 缓存操作服务
"""
"""
缓存服务层:封装所有缓存相关操作
"""
import logging
from meraki_Interface_forward.redis_utils import (
set_json,
CacheKey,
)
logger = logging.getLogger("meraki_Interface_forward.services.cache_service")
# 单设备缓存前缀
DEVICE_CACHE_PREFIX = "device:"
UPLINK_CACHE_PREFIX = "uplink:"
STATUS_CACHE_PREFIX = "status:"
CHANNEL_UTILIZATION_CACHE_PREFIX = "channel_utilization:"
def cache_devices_by_serial(devices, ttl: int = 60 * 60 * 12) -> None:
    """
    Fan out a Meraki device list into per-serial Redis entries.

    - key: device:<serial>
    - value: the single device dict as JSON
    Non-list input, non-dict items and items lacking a serial are skipped;
    individual write failures are swallowed (best-effort caching).
    """
    if not isinstance(devices, list):
        return
    for entry in devices:
        if not isinstance(entry, dict):
            continue
        sn = entry.get("serial")
        if not sn:
            continue
        try:
            set_json(f"{DEVICE_CACHE_PREFIX}{sn}", entry, ex=ttl)
        except Exception:
            continue
def cache_uplinks_by_serial(uplinks, ttl: int = 60 * 60 * 6) -> None:
    """Split an organization-wide uplink list into per-serial cache entries
    (key: uplink:<serial>); malformed items and write errors are skipped."""
    if not isinstance(uplinks, list):
        return
    for entry in uplinks:
        if not isinstance(entry, dict):
            continue
        sn = entry.get("serial")
        if not sn:
            continue
        try:
            set_json(f"{UPLINK_CACHE_PREFIX}{sn}", entry, ex=ttl)
        except Exception:
            continue
def cache_status_by_serial(status_list, ttl: int = 65) -> None:
    """
    Split an organization-wide device status list into per-serial entries,
    storing only the "status" field value.

    - key: status:<serial>
    - value: the item's status string (e.g. "online", "offline"); a missing
      status field is cached as JSON null.
    """
    if not isinstance(status_list, list):
        return
    for record in status_list:
        if not isinstance(record, dict):
            continue
        sn = record.get("serial")
        if not sn:
            continue
        try:
            set_json(f"{STATUS_CACHE_PREFIX}{sn}", record.get("status"), ex=ttl)
        except Exception:
            continue
def cache_channel_utilization_by_serial(utilization_list, ttl: int = 300) -> None:
    """Split an organization-wide channel-utilization list into per-serial
    entries (key: channel_utilization:<serial>); bad items are skipped."""
    if not isinstance(utilization_list, list):
        return
    for entry in utilization_list:
        if not isinstance(entry, dict):
            continue
        sn = entry.get("serial")
        if not sn:
            continue
        try:
            set_json(f"{CHANNEL_UTILIZATION_CACHE_PREFIX}{sn}", entry, ex=ttl)
        except Exception:
            continue
"""
Meraki API 服务层:封装所有 Meraki API 调用
"""
import os
import meraki
import logging
from meraki_Interface_forward.redis_utils import CacheKey, set_json
logger = logging.getLogger("meraki_Interface_forward.services.meraki_service")
# Meraki API configuration (from environment; settings.py populates
# ORGANIZATION_ID at startup when it is absent from .env).
API_KEY = os.getenv("MERAKI_API_KEY")
ORGANIZATION_ID = os.getenv("ORGANIZATION_ID")
MERAKI_API_BASE = os.getenv("MERAKI_API_BASE")

# Shared Dashboard client: waits on 429 rate limits and retries up to 3 times.
dashboard = meraki.DashboardAPI(
    api_key=API_KEY,
    base_url=MERAKI_API_BASE,
    output_log=True,
    print_console=True,
    suppress_logging=True,
    wait_on_rate_limit=True,
    nginx_429_retry_wait_time=5,
    maximum_retries=3,
)
def get_organization_devices():
    """Return every device in the organization (paginated, 1000 per page, all pages)."""
    return dashboard.organizations.getOrganizationDevices(
        ORGANIZATION_ID,
        perPage=1000,
        total_pages='all'
    )
def get_organization_networks():
    """Fetch all networks of the organization and cache them for 6 hours.

    Returns the network list on success, [] when Meraki replies with an empty
    (non-JSON) body, and None on any other error.
    """
    try:
        networks = dashboard.organizations.getOrganizationNetworks(
            ORGANIZATION_ID,
            perPage=1000,
            total_pages='all'
        )
        # `if networks:` replaces the previous `if len(networks):` — same
        # behavior for a list, but also safe if the SDK ever returns None.
        if networks:
            set_json(CacheKey.NETWORKS.value, networks, ex=60*60*6)
        return networks
    except Exception as e:
        msg = str(e)
        # The SDK raises a JSON decode error when the API returns an empty
        # body; treat that as "no networks" rather than a failure.
        if "Expecting value: line 1 column 1" in msg:
            return []
        logger.error(f"获取组织网络失败: {e}")
        return None
    # NOTE: the original had an unreachable trailing `return None` here
    # (every path above already returns); it has been removed.
def get_organization_status():
    """Return availability records for all devices in the organization (all pages)."""
    return dashboard.organizations.getOrganizationDevicesAvailabilities(
        ORGANIZATION_ID,
        perPage=1000,
        total_pages='all'
    )
def get_organization_status_overview():
    """Return the organization-wide device status overview (aggregate counts)."""
    return dashboard.organizations.getOrganizationDevicesStatusesOverview(ORGANIZATION_ID)
def get_organization_alert():
    """Fetch all assurance alerts for the organization, flattened per device.

    Each alert whose scope lists devices is expanded into one record per
    device, carrying the alert metadata plus that device's serial/mac/name/port.
    NOTE(review): alerts whose scope contains NO devices are skipped entirely —
    confirm that dropping device-less alerts is intentional.
    """
    alerts = dashboard.organizations.getOrganizationAssuranceAlerts(
        ORGANIZATION_ID, total_pages="all"
    )
    processed = []
    if alerts:
        for item in alerts or []:
            scope = item.get("scope", {})
            devices = scope.get("devices", [])
            # Multiple devices -> emit one flattened record per device
            if devices:
                for dev in devices:
                    processed.append({
                        "id": item.get("id"),
                        "categoryType": item.get("categoryType"),
                        "networkId": item.get("network", {}).get("id"),
                        "networkName": item.get("network", {}).get("name"),
                        "type": item.get("type"),
                        "title": item.get("title"),
                        "severity": item.get("severity"),
                        "deviceType": item.get("deviceType"),
                        "startedAt": item.get("startedAt"),
                        "resolvedAt": item.get("resolvedAt"),
                        "dismissedAt": item.get("dismissedAt"),
                        # per-device key fields
                        "serial": dev.get("serial"),
                        "mac": dev.get("mac"),
                        "deviceName": dev.get("name"),
                        "port": dev.get("portIdentifier"),
                        "scope": scope
                    })
    return processed
def get_organization_uplinks():
    """Return uplink addresses for all devices in the organization (all pages)."""
    return dashboard.organizations.getOrganizationDevicesUplinksAddressesByDevice(
        ORGANIZATION_ID,
        perPage=1000,
        total_pages='all',
    )
def get_organization_channel_utilization():
    """Return per-device wireless channel utilization over the last hour (all pages)."""
    return dashboard.wireless.getOrganizationWirelessDevicesChannelUtilizationByDevice(
        ORGANIZATION_ID,
        perPage=1000,
        total_pages='all',
        timespan=3600
    )
def get_device_switch_ports_status(serial):
    """Return port statuses of one switch (by serial) over the last hour."""
    return dashboard.switch.getDeviceSwitchPortsStatuses(
        serial,
        timespan=3600,
    )
"""
Django settings for meraki_Interface_forward project.
Generated by 'django-admin startproject' using Django 5.2.8.
For more information on this file, see
https://docs.djangoproject.com/en/5.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/5.2/ref/settings/
"""
from pathlib import Path
import os
import json
import logging
from logging.handlers import TimedRotatingFileHandler
from dotenv import load_dotenv
import meraki
from .redis_utils import redis_ping, set, get , CacheKey
# 加载项目根目录下的 .env 文件
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
load_dotenv(os.path.join(BASE_DIR, '.env'))
REDIS_URL = os.getenv("REDIS_URL")
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT")
REDIS_DB = os.getenv("REDIS_DB")
REDIS_USERNAME = os.getenv("REDIS_USERNAME")
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
try:
import django_redis # type: ignore
except Exception:
raise RuntimeError("启动失败:未安装 django-redis,已将 Redis 设为强制依赖")
def _build_redis_location():
    """Compose the django-redis LOCATION URL from environment variables.

    Explicit REDIS_HOST/PORT/DB/USERNAME/PASSWORD settings — or the absence of
    REDIS_URL — take precedence over REDIS_URL; defaults are 127.0.0.1:6379/8.
    """
    host = REDIS_HOST or "127.0.0.1"
    port = int(REDIS_PORT or "6379")
    db = int(REDIS_DB or "8")
    if REDIS_HOST or REDIS_PORT or REDIS_DB or REDIS_USERNAME or REDIS_PASSWORD or not REDIS_URL:
        # Build the userinfo part: "user:pass@", ":pass@", or "user:@".
        auth = ""
        if REDIS_USERNAME and REDIS_PASSWORD:
            auth = f"{REDIS_USERNAME}:{REDIS_PASSWORD}@"
        elif REDIS_PASSWORD and not REDIS_USERNAME:
            auth = f":{REDIS_PASSWORD}@"
        elif REDIS_USERNAME and not REDIS_PASSWORD:
            auth = f"{REDIS_USERNAME}:@"
        return f"redis://{auth}{host}:{port}/{db}"
    return REDIS_URL
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': _build_redis_location(),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'PASSWORD': REDIS_PASSWORD,
'USERNAME': REDIS_USERNAME,
}
}
}
redis_status = redis_ping()
if not redis_status.get("connected"):
raise RuntimeError("启动失败:Redis 连接失败" + (": " + str(redis_status.get("error")) if redis_status.get("error") else ""))
print("Redis 连接成功")
MERAKI_API_KEY = os.getenv("MERAKI_API_KEY")
ORGANIZATION_ID = os.getenv("ORGANIZATION_ID")
MERAKI_API_BASE = os.getenv("MERAKI_API_BASE", "https://n3.dashboard.meraki.cn/api/v1")
if not MERAKI_API_KEY:
    raise RuntimeError("启动失败:未检测到 Meraki API Key,请在 .env 中设置 MERAKI_API_KEY")
else:
    # SECURITY FIX: the previous debug print leaked the complete API key to
    # stdout/logs. Print a masked prefix only — enough to confirm which key
    # was loaded without exposing the secret.
    print("MERAKI_API_KEY loaded: " + MERAKI_API_KEY[:4] + "****")
import warnings

response = []
MERAKI_DEFAULT_ORG_ID = None
try:
    dashboard = meraki.DashboardAPI(api_key=MERAKI_API_KEY, base_url=MERAKI_API_BASE, suppress_logging=True)
    response = dashboard.organizations.getOrganizations()
    if response:
        if len(response) > 1:
            # More than one organization is visible to this key: warn and use the first.
            warning = '查询到的组织id数量为: " ' + str(len(response)) + '"默认取第一个'
            # BUGFIX: `RuntimeWarning(warning)` only constructed a warning object
            # and discarded it — nothing was ever emitted. Actually warn now.
            warnings.warn(warning)
        first = response[0]
        if isinstance(first, dict):
            MERAKI_DEFAULT_ORG_ID = first.get("id")
        # Cache the first organization in Redis for 12 hours.
        set(CacheKey.ORGANIZATIONS.value, json.dumps(first), ex=60*60*12)
except Exception as e:
    status = getattr(e, "status", None)
    if status == 401:
        raise RuntimeError("启动失败:MERAKI_API_KEY 无效或无权限访问 Meraki API")
    raise RuntimeError(f"启动失败:调用 Meraki API 失败:{e}")
if ORGANIZATION_ID:
    # Validate the configured org id against what the key can actually see.
    ids = {str(org.get("id")) for org in response if isinstance(org, dict) and "id" in org}
    if str(ORGANIZATION_ID) not in ids:
        raise RuntimeWarning("启动失败:.env 中的 ORGANIZATION_ID 与 MERAKI_API_KEY 可访问组织不一致")
else:
    # No org configured: adopt the first org discovered via the API key.
    # NOTE(review): if no organization was found, this writes the string "None"
    # into the environment — confirm whether startup should fail instead.
    ORGANIZATION_ID = MERAKI_DEFAULT_ORG_ID
    os.environ["ORGANIZATION_ID"] = str(MERAKI_DEFAULT_ORG_ID)
    # BUGFIX: same discarded-RuntimeWarning pattern as above; emit it properly.
    warnings.warn("已将MERAKI_API_KEY 查询到的组织id赋值到env")
"""
简便写法
orgs = dashboard.organizations.getOrganizations()
print("所有组织信息: " + json.dumps(orgs, indent=2, ensure_ascii=False))
return JsonResponse(orgs[0],safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
"""
print("项目启动 检测参数成功 ")
# === Banner 打印 ===
print(r"""
------------------------------------------------------------
____ _ ____ _
| _ \(_)_ __ __ _ ___ _ __ | _ \ ___ | |_
| | | | | '_ \ / _` |/ _ \| '_ \ | | | |/ _ \| __|
| |_| | | | | | (_| | (_) | | | || |_| | (_) | |_
|____/|_|_| |_|\__, |\___/|_| |_||____/ \___/ \__|
|___/
------------------------------------------------------------
Django 启动成功!配置加载完成。
Redis 链接成功 组织、网络信息 缓存成功
MERAKI_API_KEY: 已检测
------------------------------------------------------------
""")
#开启临时缓存 [一级]
# CACHES = {
# 'default': {
# 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
# 'LOCATION': 'meraki-local-cache'
# }
# }
# 开始缓存 networks
try:
networks = dashboard.organizations.getOrganizationNetworks(
ORGANIZATION_ID, total_pages='all'
)
print(len(networks))
if networks:
# 缓存进入redis
res = json.dumps(networks)
set(CacheKey.NETWORKS.value, res, ex=60*60*6)
except Exception as e:
print(f"缓存 networks 失败: {e}")
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/5.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'django-insecure-n=zdl)_2yc=f6zdx(!jry4%w1o&47dyj3qxz_6h_nl3#t9i4+1'
# SECURITY WARNING: don't run with debug turned on in production!
# BUGFIX: DEBUG was hard-coded to True even though .env provides DJANGO_DEBUG.
# Honor the env toggle; the default stays "True" to preserve previous behavior
# when the variable is absent.
DEBUG = os.getenv("DJANGO_DEBUG", "True").strip().lower() in ("1", "true", "yes")
# 方式一:添加具体 IP(推荐生产环境)
#ALLOWED_HOSTS = ['127.0.0.1', 'localhost', '172.16.3.203']
# 方式二:允许所有主机(仅开发环境)
ALLOWED_HOSTS = ['*']
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'meraki_Interface_forward.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'meraki_Interface_forward.wsgi.application'
# Database
# https://docs.djangoproject.com/en/5.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': BASE_DIR / 'db.sqlite3',
}
}
# Password validation
# https://docs.djangoproject.com/en/5.2/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/5.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/5.2/howto/static-files/
STATIC_URL = 'static/'
# Default primary key field type
# https://docs.djangoproject.com/en/5.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# Logging configuration
# 日志目录:meraki_Interface_forward 包目录下 logs,按天滚动,保留 7 天
# 当天日志:meraki.log,历史日志:meraki.YYYYMMDD.log
import logging.handlers
from datetime import datetime
# 日志目录:meraki_Interface_forward/logs
LOG_DIR = os.path.join(BASE_DIR, "meraki_Interface_forward", "logs")
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE_PATH = os.path.join(LOG_DIR, "meraki.log")
class CustomTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
    """
    Daily-rolling log handler whose files are named:
      - current day: meraki.log
      - history:     meraki.YYYYMMDD.log
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # TimedRotatingFileHandler appends this suffix on rotation (-> .%Y%m%d).
        self.suffix = "%Y%m%d"

    def namer(self, name):
        """
        Rename rotated files: meraki.log.YYYYMMDD -> meraki.YYYYMMDD.log.
        """
        # The live file (ending in .log) keeps its name untouched.
        if name.endswith('.log'):
            return name
        # Rotated file: <dir>/meraki.log.YYYYMMDD
        directory, filename = os.path.split(name)
        if '.log.' in filename:
            pieces = filename.split('.log.')
            if len(pieces) == 2:
                stamp = pieces[1]
                return os.path.join(directory, f"meraki.{stamp}.log")
        # Unrecognized pattern: leave it alone.
        return name
# Django logging config: console + daily-rotating file, 7 days of history.
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "verbose": {
            "format": "[{levelname}] {asctime} {name} {message}",
            "style": "{",
        },
        "simple": {
            "format": "[{levelname}] {name} {message}",
            "style": "{",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "verbose",
        },
        # File handler built via the CustomTimedRotatingFileHandler factory.
        # NOTE(review): no "formatter" is set here, so file records use the
        # bare default format — confirm whether "verbose" was intended.
        "file": {
            "()": CustomTimedRotatingFileHandler,
            "filename": LOG_FILE_PATH,
            "when": "midnight",
            "backupCount": 7,
            "encoding": "utf-8",
        },
    },
    "root": {
        "handlers": ["console", "file"],
        "level": "INFO",
    },
    "loggers": {
        # Project code logs under the meraki_Interface_forward.* namespace.
        "meraki_Interface_forward": {
            "handlers": ["console", "file"],
            "level": "INFO",
            "propagate": False,
        },
    },
}
"""
任务模块:数据同步任务(异步执行)
"""
import threading
import time
import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from django.http import JsonResponse
from meraki_Interface_forward.resultAnalysis import logger
from meraki_Interface_forward.services.meraki_service import (
get_organization_networks,
get_organization_status_overview,
get_organization_status,
get_organization_alert,
get_organization_devices,
)
from meraki_Interface_forward.services.cache_service import cache_devices_by_serial
from meraki_Interface_forward.redis_utils import (
set_json,
get_json,
CacheKey,
)
# 任务状态 key
TASK_STATUS_KEY = "task:synchronization:status"
TASK_START_TIME_KEY = "task:synchronization:start_time"
TASK_FINISH_TIME_KEY = "task:synchronization:finish_time"
# 任务状态枚举
TASK_STATUS_RUNNING = "执行中"
TASK_STATUS_COMPLETED = "已完成"
TASK_STATUS_FAILED = "执行失败"
TASK_STATUS_IDLE = "空闲"
def _execute_synchronization():
    """
    Run the data-sync job (internal; executed on a background thread).

    Pulls networks, status overview, device statuses, alerts and the device
    list from Meraki and writes each into Redis with its own TTL. Every step
    is individually fault-isolated so a single failure does not abort the
    rest. Progress is recorded under the task:* keys with a 1-hour expiry.
    """
    start_time = time.time()
    set_json(TASK_START_TIME_KEY, start_time, ex=3600)  # expires after 1 hour
    set_json(TASK_STATUS_KEY, TASK_STATUS_RUNNING, ex=3600)
    try:
        # 1. Sync networks (6-hour TTL)
        try:
            networks = get_organization_networks()
            if networks:
                set_json(CacheKey.NETWORKS.value, networks, ex=60 * 60 * 6)
            logger.info("网络数据同步完成")
        except Exception as e:
            logger.error(f"Failed to cache networks: {e}")
        # 2. Sync the device status overview (5-minute TTL)
        try:
            status_overview = get_organization_status_overview()
            if status_overview:
                set_json(CacheKey.DEVICES_STATUSES_OVERVIEW.value, status_overview, ex=60 * 5)
            logger.info("设备状态概览同步完成")
        except Exception as e:
            logger.error(f"Failed to cache status overview: {e}")
        # 3. Sync per-device statuses (5-minute TTL)
        try:
            status = get_organization_status()
            if status:
                set_json(CacheKey.DEVICES_STATUSES.value, status, ex=60 * 5)
            logger.info("设备状态同步完成")
        except Exception as e:
            logger.error(f"Failed to cache status: {e}")
        # 4. Sync alerts (60-second TTL)
        try:
            alerts = get_organization_alert()
            if alerts:
                set_json(CacheKey.ASSURANCE_ALERTS.value, alerts, ex=60)
            logger.info("告警数据同步完成")
        except Exception as e:
            logger.error(f"Failed to cache alert: {e}")
        # 5. Sync the device list (slowest step; may exceed 60 seconds)
        try:
            devices = get_organization_devices()
            if devices:
                set_json(CacheKey.DEVICES.value, devices, ex=60 * 60 * 12)
                # Also build per-serial entries for cheap single-device lookups.
                cache_devices_by_serial(devices, ttl=60 * 60 * 12)
            logger.info(f"设备列表同步完成,共 {len(devices) if devices else 0} 台设备")
        except Exception as e:
            logger.error(f"Failed to cache devices: {e}")
        # All steps attempted: record completion and timing.
        elapsed_time = time.time() - start_time
        finish_ts = time.time()
        set_json(TASK_STATUS_KEY, TASK_STATUS_COMPLETED, ex=3600)
        set_json(TASK_FINISH_TIME_KEY, finish_ts, ex=3600)
        logger.info(f"数据同步任务完成,耗时 {elapsed_time:.2f} 秒")
    except Exception as e:
        logger.exception("数据同步任务执行失败")
        set_json(TASK_STATUS_KEY, TASK_STATUS_FAILED, ex=3600)
        set_json(TASK_FINISH_TIME_KEY, time.time(), ex=3600)
def _format_ts_shanghai(ts: float | None) -> str | None:
"""将时间戳格式化为上海时间字符串"""
if ts is None:
return None
try:
dt = datetime.fromtimestamp(float(ts), tz=ZoneInfo("Asia/Shanghai"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return None
def synchronization_data(request):
    """
    Data-sync endpoint (runs the job asynchronously on a daemon thread).

    Response fields:
      - status: 执行中 / 已完成 / 执行失败 / 空闲 (or 已启动 when a run begins)
      - elapsed_time: seconds the current run has taken (None when not running)
      - message: human-readable description
      - last_finished_at / last_finished_at_shanghai: previous completion time
    Query mode: with ?execute=1 only the status is reported; no task is started.
    """
    try:
        execute_only = request.GET.get("execute") == "1"
        # Current task bookkeeping pulled from Redis.
        current_status = get_json(TASK_STATUS_KEY) or TASK_STATUS_IDLE
        start_time = get_json(TASK_START_TIME_KEY)
        finish_time = get_json(TASK_FINISH_TIME_KEY)
        finish_time_str = _format_ts_shanghai(finish_time)
        # A run is already in flight: report progress, never start another.
        if current_status == TASK_STATUS_RUNNING:
            elapsed_time = None
            if start_time:
                elapsed_time = round(time.time() - start_time, 2)
            return JsonResponse(
                {
                    "status": TASK_STATUS_RUNNING,
                    "elapsed_time": elapsed_time,
                    "message": "数据同步任务正在执行中,请稍后查询",
                    "last_finished_at": finish_time,
                    "last_finished_at_shanghai": finish_time_str,
                },
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # Query-only mode: report the status without triggering a new task.
        if execute_only:
            elapsed_time = None
            # NOTE(review): this condition can never be True here — the RUNNING
            # case already returned above, so elapsed_time always stays None.
            if start_time and current_status == TASK_STATUS_RUNNING:
                elapsed_time = round(time.time() - start_time, 2)
            return JsonResponse(
                {
                    "status": current_status,
                    "elapsed_time": elapsed_time,
                    "message": f"当前任务状态: {current_status}",
                    "last_finished_at": finish_time,
                    "last_finished_at_shanghai": finish_time_str,
                },
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # Idle/completed/failed: spawn a fresh background run and acknowledge,
        # echoing the previous completion time.
        if current_status in [TASK_STATUS_COMPLETED, TASK_STATUS_FAILED, TASK_STATUS_IDLE]:
            thread = threading.Thread(target=_execute_synchronization, daemon=True)
            thread.start()
            return JsonResponse(
                {
                    "status": "已启动",
                    "elapsed_time": 0,
                    "message": "数据同步任务已创建并在后台执行",
                    "last_finished_at": finish_time,
                    "last_finished_at_shanghai": finish_time_str,
                },
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # Any other (unexpected) status value: just report it.
        elapsed_time = None
        if start_time:
            elapsed_time = round(time.time() - start_time, 2)
        return JsonResponse(
            {
                "status": current_status,
                "elapsed_time": elapsed_time,
                "message": f"当前任务状态: {current_status}",
                "last_finished_at": finish_time,
                "last_finished_at_shanghai": finish_time_str,
            },
            safe=False,
            json_dumps_params={'indent': 2, 'ensure_ascii': False},
        )
    except Exception as e:
        logger.exception("synchronization_data 调用失败")
        return JsonResponse(
            {"error": str(e), "status": TASK_STATUS_FAILED},
            status=500,
            safe=False,
            json_dumps_params={'indent': 2, 'ensure_ascii': False},
        )
"""
URL configuration for meraki_Interface_forward project.
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/5.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.contrib import admin
from django.urls import path
from . import task
# 直接从子模块导入,避免 views.py 和 views/ 目录的命名冲突
from .views import device_views, status_views, cache_views
urlpatterns = [
    # Admin site is disabled for this forwarding service.
    # path('admin/', admin.site.urls),
    # Zabbix-style host discovery endpoints
    path('discovery/host_prototype', device_views.discovery_host_prototype),
    path('discovery/host_prototype/filter', device_views.discovery_host_prototype_filter),
    # Device inventory / status queries
    path('all_host', device_views.get_device_by_serial_or_product_type),
    path('device/status', status_views.get_device_status),
    path('device/status/overview', status_views.get_device_status_overview),
    path('device/ap/channel_utilization', device_views.get_device_ap_channelUtilization),
    path('device/switch/ports/status', device_views.get_device_switch_ports_status),
    path('device/uplinks', device_views.get_device_uplink),
    path('device/alert', status_views.get_device_alert),
    # Background sync task trigger/status
    path('task', task.synchronization_data),
    # Cache inspection / maintenance
    path('cache', cache_views.cache_get_device),
    path('cache/clear', cache_views.clear_cache),
]
import random
from typing import Dict
from meraki_Interface_forward.redis_utils import get_redis_client
def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys: int = 200) -> Dict[str, object]:
    """
    Count keys matching ``pattern`` and sample their TTLs.

    - exact count: full SCAN pass (only the first ``max_scan_keys`` keys are
      retained as sampling candidates)
    - TTL sampling: at most ``sample_size`` keys, queried via a pipeline
    """
    client = get_redis_client()
    if not client:
        return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None}
    candidates = []
    total = 0
    cur = 0
    try:
        while True:
            cur, page = client.scan(cursor=cur, match=pattern, count=500)
            page = page or []
            total += len(page)
            # Keep only the first max_scan_keys keys as sampling candidates.
            for key in page:
                if len(candidates) < max_scan_keys:
                    candidates.append(key)
            if cur == 0:
                break
    except Exception:
        return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None}
    if total == 0 or not candidates:
        return {"count": total, "sampled": 0, "minTtl": None, "avgTtl": None}
    # Random sub-sample to further cap the number of TTL queries.
    picked = random.sample(candidates, min(len(candidates), sample_size))
    ttls = []
    try:
        # One pipeline round-trip for all sampled keys.
        pipe = client.pipeline()
        for key in picked:
            pipe.ttl(key)
        for remaining in pipe.execute():
            if isinstance(remaining, int) and remaining >= 0:
                ttls.append(remaining)
    except Exception:
        # Pipeline failed: fall back to a handful of individual TTL calls
        # (capped at 5 to avoid timeouts).
        for key in picked[:5]:
            try:
                remaining = client.ttl(key)
                if isinstance(remaining, int) and remaining >= 0:
                    ttls.append(remaining)
            except Exception:
                continue
    return {
        "count": total,
        "sampled": len(picked),
        "minTtl": min(ttls) if ttls else None,
        "avgTtl": round(sum(ttls) / len(ttls), 2) if ttls else None,
    }
def build_prefix_stats(prefix_patterns: Dict[str, str], sample_size: int = 5) -> Dict[str, dict]:
    """Build per-prefix TTL statistics.

    Args:
        prefix_patterns: mapping of a logical name -> Redis MATCH pattern.
        sample_size: number of keys to sample per prefix for TTL stats.
            This argument was previously accepted but silently ignored
            (both branches hard-coded 5); it is now honored, with the
            default matching the old effective value of 5.

    Returns:
        Mapping of logical name -> stats dict from collect_prefix_ttl_stats
        (exact ``count`` via full SCAN; the "device" prefix retains up to
        200 keys for sampling, every other prefix up to 100).
    """
    result = {}
    for name, pattern in prefix_patterns.items():
        # "device" keyspaces tend to be larger, so keep a bigger sampling pool.
        max_keys = 200 if name == "device" else 100
        result[name] = collect_prefix_ttl_stats(
            pattern, sample_size=sample_size, max_scan_keys=max_keys
        )
    return result
# from django.http import JsonResponse
# import meraki
# import os
# import traceback
# from django.core.cache import cache
# import time
# from django.dispatch import Signal, receiver
# from concurrent.futures import ThreadPoolExecutor, as_completed
# from xml.sax import parse
# import random
# import uuid
# import json
#
# # sike 设备类型 'appliance', 'camera', 'campusGateway', 'cellularGateway', 'secureConnect', 'sensor', 'switch', 'systemsManager', 'wireless' or 'wirelessController'
# from meraki_Interface_forward.resultAnalysis import resultAnalysis
# from meraki_Interface_forward.settings import response
#
# API_KEY = os.getenv("MERAKI_API_KEY")
# ORGANIZATION_ID = os.getenv("ORGANIZATION_ID")
# MERAKI_API_BASE = os.getenv("MERAKI_API_BASE")
# dashboard = meraki.DashboardAPI(
# api_key=API_KEY,
# base_url=MERAKI_API_BASE,
# output_log=True,
# suppress_logging=True,
# wait_on_rate_limit=True,
# nginx_429_retry_wait_time=3,
# maximum_retries=3 # 最大重试次数
# )
# # 定义信号
# data_fetched = Signal()
#
# # 创建线程池
# executor = ThreadPoolExecutor(max_workers=5) # 同时处理5个任务
#
#
# # @receiver(data_fetched)
# def handle_data_fetched(sender, **kwargs):
# data = kwargs.get("data")
# print("接收带事件 开始执行 获取设备的接口信息 : " + str(data))
#
# # 异步提交给线程池处理
# executor.submit(process_all_tasks, data)
#
#
# def process_all_tasks(data):
# futures = []
# combined_result = {}
#
# # 三个任务全部异步提交
# futures.append(executor.submit(get_organization_devices))
# futures.append(executor.submit(get_device_ports, data))
# futures.append(executor.submit(get_device_availabilities))
#
# print("执行到这")
# # 按完成顺序汇总(阻塞等待全部完成)
# for future in as_completed(futures):
# result = future.result()
# combined_result.update(result)
# # 你可以在此存储结果,例如存数据库或缓存
# # print("三个任务已全部完成,最终组装结果:")
#
#
# def discovery_host_prototype(request):
# try:
# print("获取到的组织ID: " + ORGANIZATION_ID)
# devices = get_organization_devices()
#
# print("获取到的设备数量 " + str(len(devices)))
# if len(devices):
#
# # 缓存数据
# cache.set("meraki_devices", devices, timeout=300)
# serial_array = []
# for device in devices:
# print(str(device['serial']))
# serial_array.append(device['serial'])
# # 发布事件
# data_fetched.send(sender=discovery_host_prototype, data=serial_array)
# return JsonResponse(devices, safe=False,
# json_dumps_params={'indent': 2, 'ensure_ascii': False})
#
# except Exception as e:
# print("获取到的设备信息; 发生异常:", e)
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
#
#
# def get_organization_devices():
# devices = dashboard.organizations.getOrganizationDevices(
# ORGANIZATION_ID,
# perPage=1000,
# total_pages='all'
# )
# return devices
#
#
# def get_organization_networks():
# try:
# networks = dashboard.organizations.getOrganizationNetworks(
# ORGANIZATION_ID,
# perPage=1000,
# total_pages='all'
# )
# print("networks 属性 ", type(networks))
# if len(networks):
# cache.set("meraki_networks", networks, timeout=3600)
# return networks
# except Exception as e:
# msg = str(e)
# if "Expecting value: line 1 column 1" in msg:
# return []
# traceback.print_exc()
# print('缓存组织下的networks失败' + msg)
# return []
#
#
# def cache_get_device(request):
# res = cache.get("meraki_devices")
# data_is_valid = cache.touch("meraki_devices", timeout=None)
#
# return JsonResponse(
# {"device": res[0], "dataIsValid": data_is_valid} if data_is_valid else {"error": "当前缓存数据已超时 !!"},
# safe=False,
# json_dumps_params={'indent': 2, 'ensure_ascii': False})
#
#
# """ async 批量获取设备的端口信息 """
#
#
# def get_device_ports(serial_array):
# res_serial_array = []
# res_serial_dict = {}
# if not serial_array:
# return None
# else:
# for serial in serial_array[:3]:
# response = dashboard.switch.getDeviceSwitchPorts(serial)
# if response:
# res_serial_dict.update({serial: response})
# # time.sleep(0.05)
# # print(str(response))
# return res_serial_dict
#
#
# def get_device_availabilities():
# try:
# device = {}
# res = dashboard.organizations.getOrganizationDevicesAvailabilities(ORGANIZATION_ID)
# if not res:
# return {}
# else:
# for dev in res:
# device.update({dev['serial']: dev['status']})
# return device
#
# except Exception as e:
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
#
#
# def get_device_by_serial_or_product_type(request):
# if request.method != 'GET':
# return JsonResponse({"message": "Method Not Allowed"}, status=405)
#
# try:
# serial = request.GET.get('serial')
# product_type = request.GET.get('productType')
#
# if not serial or not product_type:
# return JsonResponse({"message": "缺少查询参数 serial 或 productType"}, status=400)
#
# print(f"请求参数:serial={serial}, productType={product_type}")
# network_map = {}
# # 1. 获取 Network 映射
# networks = get_organization_networks()
# if networks:
# print("获取到的网络数量:", len(networks))
# network_map = {net["id"]: net["name"] for net in networks}
#
# # 2. 获取缓存中 Meraki 设备
# meraki_devices = cache.get("meraki_devices")
#
# # 3. 若缓存有数据,从缓存中查
# if meraki_devices:
# for device in meraki_devices:
#
# # 安全获取 networkId
# net_id = device['networkId']
# if network_map:
# device["networkName"] = network_map.get(net_id)
#
# # 匹配设备
# if (
# device.get("serial") == serial
# and device.get("productType") == product_type
# ):
# return JsonResponse(
# {"device": device},
# safe=False,
# json_dumps_params={"indent": 2, "ensure_ascii": False},
# )
#
# # 4. 缓存未命中 → 调 Meraki API 查询单设备
# print("缓存未命中,调用 Meraki API 查询设备:", serial)
# try:
# device = dashboard.devices.getDevice(serial)
# except Exception as api_err:
# print("Meraki API 查询失败:", api_err)
# return JsonResponse({"message": "Meraki API 查询失败或设备不存在"}, status=404)
#
# if device:
# net_id = device.get("networkId")
# device["networkName"] = network_map.get(net_id)
#
# return JsonResponse(
# {"device": device},
# safe=False,
# json_dumps_params={"indent": 2, "ensure_ascii": False},
# )
#
# # 5. 找不到设备
# return JsonResponse(
# {"error": "未找到匹配的设备"},
# status=404
# )
#
# except Exception as e:
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
#
#
# def get_device_status_overview(request):
# try:
# res = dashboard.organizations.getOrganizationDevicesStatusesOverview(ORGANIZATION_ID)
# if res:
# return JsonResponse(res, safe=False,
# json_dumps_params={'indent': 2, 'ensure_ascii': False})
# except Exception as e:
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
#
#
# def get_device_status(request):
# serial = request.GET.get('serial')
# product_type = request.GET.get('productType')
#
# try:
# res = dashboard.organizations.getOrganizationDevicesAvailabilities(ORGANIZATION_ID, perPage=1000,
# total_pages='all')
#
# cache.set("devices_status", res, timeout=30)
# if res:
# for device in res:
# if device['serial'] == serial and device['productType'] == product_type:
# return JsonResponse({"status": device['status']}, safe=False,
# json_dumps_params={'indent': 2, 'ensure_ascii': False})
# except Exception as e:
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
#
#
# def get_device_uplink(request):
# serial = request.GET.get('serial')
# product_type = request.GET.get('productType')
# try:
#
# res = dashboard.organizations.getOrganizationDevicesUplinksAddressesByDevice(
# ORGANIZATION_ID,
# perPage=1000,
# total_pages='all',
# )
#
# if res:
# for device in res:
# if device['serial'] == serial and device['productType'] == product_type:
# return JsonResponse(device, safe=False,
# json_dumps_params={'indent': 2, 'ensure_ascii': False})
# except Exception as e:
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
#
#
# def get_device_ap_channelUtilization(request):
# try:
# serial = request.GET.get('serial')
# if not serial:
# return JsonResponse({"message": "缺少查询参数 serial"}, status=400)
# res = dashboard.wireless.getOrganizationWirelessDevicesChannelUtilizationByDevice(
# ORGANIZATION_ID,
# perPage=1000,
# total_pages='all',
# timespan=3600
# )
# if isinstance(res, list) and res:
# for device in res:
# if device.get('serial') == serial:
# return JsonResponse(device, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
# else:
# return JsonResponse({"error": "无可用的信道利用率数据"}, status=404)
#
# except Exception as e:
# msg = str(e)
# if "Expecting value: line 1 column 1" in msg:
# return JsonResponse({"error": "Meraki 返回空响应或非 JSON,可能为 204/HTML 页面。请检查 base_url、组织访问权限,或稍后重试。"}, status=502)
# traceback.print_exc()
# return JsonResponse({"error": msg}, status=500)
#
#
# def get_device_switch_ports_status(request):
# try:
# serial = request.GET.get('serial')
# if not serial:
# return JsonResponse({"message": "缺少查询参数 serial"}, status=400)
# res = dashboard.switch.getDeviceSwitchPortsStatuses(
# serial,
# timespan=3600
# )
# if res:
# return JsonResponse(res, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
#
# except Exception as e:
# traceback.print_exc()
# return JsonResponse({"error": str(e)}, status=500)
\ No newline at end of file
"""
Views 模块:视图层
- device_views: 设备相关视图
- status_views: 状态相关视图
- cache_views: 缓存管理视图
"""
"""
缓存管理视图
"""
import logging
from django.http import JsonResponse
from meraki_Interface_forward.redis_utils import (
get_json,
ttl,
delete,
CacheKey,
scan_delete,
get_redis_client,
)
from meraki_Interface_forward.utils import build_prefix_stats
from meraki_Interface_forward.services.cache_service import (
DEVICE_CACHE_PREFIX,
UPLINK_CACHE_PREFIX,
STATUS_CACHE_PREFIX,
CHANNEL_UTILIZATION_CACHE_PREFIX,
)
logger = logging.getLogger("meraki_Interface_forward.views.cache_views")
def cache_get_device(request):
    """
    Report cache health (fast probe):
    - remaining TTL of every centralized cache key (CacheKey.*)
    - per-prefix stats (key count + a 5-key TTL sample) for the
      serial-scoped caches (device/uplink/status/channel/switch_ports/rl/lock),
      with a very cheap check for empty prefixes
    - a sample device from the full device cache, for quick verification
    """
    # Name -> centralized cache key table drives all three TTL branches.
    named_keys = [
        ("organizations", CacheKey.ORGANIZATIONS.value),
        ("networks", CacheKey.NETWORKS.value),
        ("devices", CacheKey.DEVICES.value),
        ("devices_availabilities", CacheKey.DEVICES_AVAILABILITIES.value),
        ("devices_statuses_overview", CacheKey.DEVICES_STATUSES_OVERVIEW.value),
        ("devices_statuses", CacheKey.DEVICES_STATUSES.value),
        ("devices_uplinks_by_device", CacheKey.DEVICES_UPLINKS_BY_DEVICE.value),
        ("wireless_channel_utilization_by_device", CacheKey.WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE.value),
        ("assurance_alerts", CacheKey.ASSURANCE_ALERTS.value),
    ]
    client = get_redis_client()
    if not client:
        # Redis unavailable: report every key as missing (-2, Redis convention).
        cache_keys_ttl = {name: -2 for name, _ in named_keys}
    else:
        try:
            # One pipeline round-trip for all TTL queries.
            pipe = client.pipeline()
            for _, key in named_keys:
                pipe.ttl(key)
            results = pipe.execute()
            cache_keys_ttl = {
                name: results[idx] if idx < len(results) else -2
                for idx, (name, _) in enumerate(named_keys)
            }
        except Exception:
            # Pipeline failed: fall back to one TTL query per key.
            cache_keys_ttl = {name: ttl(key) for name, key in named_keys}

    # Prefix-scoped cache statistics (empty prefixes are cheap to probe).
    prefix_stats = build_prefix_stats(
        {
            "device": f"{DEVICE_CACHE_PREFIX}*",
            "uplink": f"{UPLINK_CACHE_PREFIX}*",
            "status": f"{STATUS_CACHE_PREFIX}*",
            "channel_utilization": f"{CHANNEL_UTILIZATION_CACHE_PREFIX}*",
            "switch_ports_status": "switch_ports_status:*",
            "rate_limit": "rl:*",
            "lock": "lock:*",
        }
    )

    # Sample device for quick verification; only fetched while the devices
    # cache is still alive (positive remaining TTL).
    sample_device = None
    devices_ttl = cache_keys_ttl.get("devices")
    if isinstance(devices_ttl, int) and devices_ttl > 0:
        devices = get_json(CacheKey.DEVICES.value)
        if isinstance(devices, list) and devices:
            sample_device = devices[0]

    return JsonResponse(
        {
            "dataIsValid": bool(sample_device),
            "sampleDevice": sample_device,
            "cacheKeysTtl": cache_keys_ttl,
            "prefixStats": prefix_stats,
        },
        safe=False,
        json_dumps_params={"indent": 2, "ensure_ascii": False},
    )
def clear_cache(request):
    """
    Purge this project's own cache keys from Redis, mainly for test/ops
    scenarios where stale data must be cleared.
    Only keys following the project's conventions (CacheKey values and the
    known cache prefixes) are touched; unrelated keys are left alone.
    """
    try:
        # 1. Delete the centralized CacheKey-based entries.
        centralized = [
            CacheKey.ORGANIZATIONS.value,
            CacheKey.NETWORKS.value,
            CacheKey.DEVICES.value,
            CacheKey.DEVICES_AVAILABILITIES.value,
            CacheKey.DEVICES_STATUSES_OVERVIEW.value,
            CacheKey.DEVICES_STATUSES.value,
            CacheKey.DEVICES_UPLINKS_BY_DEVICE.value,
            CacheKey.WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE.value,
            CacheKey.ASSURANCE_ALERTS.value,
        ]
        removed_central = 0
        for key in centralized:
            try:
                removed_central += delete(key) or 0
            except Exception:
                # Best-effort cleanup: keep going even if one delete fails.
                continue

        # 2. Delete per-serial device caches plus rate-limit / lock keys.
        prefix_patterns = [
            f"{DEVICE_CACHE_PREFIX}*",
            f"{UPLINK_CACHE_PREFIX}*",
            f"{STATUS_CACHE_PREFIX}*",
            f"{CHANNEL_UTILIZATION_CACHE_PREFIX}*",
            "rl:*",    # rate-limit keys
            "lock:*",  # distributed-lock keys
        ]
        removed_by_pattern = {pattern: scan_delete(pattern) for pattern in prefix_patterns}

        return JsonResponse(
            {
                "message": "缓存清理完成",
                "deleted_cache_keys": removed_central,
                "deleted_by_pattern": removed_by_pattern,
            },
            safe=False,
            json_dumps_params={'indent': 2, 'ensure_ascii': False},
        )
    except Exception as e:
        logger.exception("clear_cache 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
"""
设备相关视图
"""
import logging
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from meraki_Interface_forward.redis_utils import (
get_json,
set_json,
CacheKey,
get_or_set_json_with_lock,
)
from meraki_Interface_forward.services.meraki_service import (
get_organization_devices,
get_organization_uplinks,
get_organization_channel_utilization,
get_device_switch_ports_status as fetch_device_switch_ports_status,
)
from meraki_Interface_forward.services.cache_service import (
DEVICE_CACHE_PREFIX,
UPLINK_CACHE_PREFIX,
CHANNEL_UTILIZATION_CACHE_PREFIX,
cache_devices_by_serial,
cache_uplinks_by_serial,
cache_channel_utilization_by_serial,
)
from meraki_Interface_forward.redis_utils import rate_limit_allow
logger = logging.getLogger("meraki_Interface_forward.views.device_views")
def discovery_host_prototype(request):
    """Device discovery endpoint: cache-first, with Meraki API fallback."""
    try:
        # Serve straight from Redis when the full device list is cached.
        cached = get_json(CacheKey.DEVICES.value)
        if cached:
            return JsonResponse(cached, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})

        # Cache miss: query Meraki and populate both the full-list cache
        # and the per-serial index.
        fetched = get_organization_devices()
        if not fetched:
            return JsonResponse([], safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
        set_json(CacheKey.DEVICES.value, fetched, ex=43200)
        cache_devices_by_serial(fetched, ttl=43200)
        return JsonResponse(fetched, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("discovery_host_prototype 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
@csrf_exempt
def discovery_host_prototype_filter(request):
    """
    POST endpoint: device discovery with optional filtering by network name
    (devices belonging to any matched network are removed from the result).

    Body example:
    {
        "filter_networks": ["网络1", "网络2"]
    }

    Returns the (possibly filtered) device list; 400 on a malformed JSON
    body, 405 for non-POST requests, 500 on unexpected errors.
    """
    if request.method != "POST":
        return JsonResponse({"message": "Method Not Allowed"}, status=405)
    try:
        # Parse the request body. A malformed body is a client error (400),
        # not a server error (previously it fell through to the 500 handler).
        body = {}
        if request.body:
            try:
                body = json.loads(request.body.decode("utf-8"))
            except (ValueError, UnicodeDecodeError):
                return JsonResponse({"message": "请求体不是合法的 JSON"}, status=400)
        if not isinstance(body, dict):
            return JsonResponse({"message": "请求体不是合法的 JSON"}, status=400)
        filter_networks = body.get("filter_networks") or []
        filter_names = set(filter_networks) if isinstance(filter_networks, list) else set()
        # Get the network list (cache-first).
        networks = get_json(CacheKey.NETWORKS.value) or []
        if isinstance(networks, str):
            try:
                networks = json.loads(networks)
            except Exception:
                networks = []
        # Build the set of network ids to exclude, from the matched names.
        exclude_network_ids = set()
        if filter_names and isinstance(networks, list):
            for net in networks:
                if isinstance(net, dict) and net.get("name") in filter_names:
                    nid = net.get("id")
                    if nid:
                        exclude_network_ids.add(nid)
        # Get devices (cache-first; fall back to the Meraki API on a miss).
        devices = get_json(CacheKey.DEVICES.value)
        if not devices:
            devices = get_organization_devices()
            if devices:
                set_json(CacheKey.DEVICES.value, devices, ex=43200)
                cache_devices_by_serial(devices, ttl=43200)
        if not devices:
            return JsonResponse([], safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
        # Filter: drop any device whose networkId is in the excluded set.
        if exclude_network_ids:
            filtered = []
            for dev in devices:
                if not isinstance(dev, dict):
                    continue
                nid = dev.get("networkId")
                if nid in exclude_network_ids:
                    continue
                filtered.append(dev)
            devices = filtered
        return JsonResponse(devices, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("discovery_host_prototype_filter 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
def get_device_by_serial_or_product_type(request):
    """
    Look up a single device by serial + productType.

    Optimizations:
    - Per-serial device cache (device:<serial>) gives an O(1) lookup.
    - On a per-serial cache miss, the full device list is fetched once
      under a distributed lock and split into per-serial entries.

    Returns 400 for missing parameters, 405 for non-GET, 404 when no
    device matches, 500 on unexpected errors.
    """
    if request.method != 'GET':
        return JsonResponse({"message": "Method Not Allowed"}, status=405)
    try:
        serial = request.GET.get('serial')
        product_type = request.GET.get('productType')
        if not serial or not product_type:
            return JsonResponse({"message": "缺少查询参数 serial 或 productType"}, status=400)
        logger.info("get_device_by_serial_or_product_type 请求: serial=%s, productType=%s", serial, product_type)
        network_map = {}
        # 1. Build the network-id -> network-name mapping from the cache.
        networks = get_json(CacheKey.NETWORKS.value)
        if networks:
            network_map = {net["id"]: net["name"] for net in networks}
        # 2. Fast path: per-serial device cache (device:<serial>), O(1).
        cache_key = f"{DEVICE_CACHE_PREFIX}{serial}"
        device = get_json(cache_key)
        if device and device.get("productType") == product_type:
            net_id = device.get("networkId")
            if network_map and net_id:
                device["networkName"] = network_map.get(net_id)
            logger.debug("单设备缓存命中: serial=%s", serial)
            return JsonResponse(
                {"device": device},
                safe=False,
                json_dumps_params={"indent": 2, "ensure_ascii": False},
            )
        # 3. Per-serial cache miss -> look for the device in the cached full
        #    list and backfill its per-serial entry on a hit.
        meraki_devices = get_json(CacheKey.DEVICES.value)
        if isinstance(meraki_devices, list) and meraki_devices:
            logger.debug(
                "单设备缓存未命中,尝试在全量缓存中查找并回填: serial=%s, 当前全量缓存条数=%d",
                serial,
                len(meraki_devices),
            )
            for dev in meraki_devices:
                if (
                    isinstance(dev, dict)
                    and dev.get("serial") == serial
                    and dev.get("productType") == product_type
                ):
                    # On a hit, also write the per-serial cache entry back.
                    cache_devices_by_serial([dev], ttl=43200)
                    net_id = dev.get("networkId")
                    if network_map and net_id:
                        dev["networkName"] = network_map.get(net_id)
                    logger.info("在全量缓存中命中设备并已回填: serial=%s", serial)
                    return JsonResponse(
                        {"device": dev},
                        safe=False,
                        json_dumps_params={"indent": 2, "ensure_ascii": False},
                    )
        # 4. Neither cache has the device -> refresh the full list under a
        #    distributed lock (prevents a cache stampede against Meraki).
        def loader():
            logger.info("单设备缓存未命中,准备触发全量设备加载: serial=%s", serial)
            devs = get_organization_devices()
            if devs:
                set_json(CacheKey.DEVICES.value, devs, ex=43200)
                cache_devices_by_serial(devs, ttl=43200)
            return devs
        _, _ = get_or_set_json_with_lock(
            CacheKey.DEVICES.value,
            loader=loader,
            ex=43200,
            lock_ttl=30,
            wait_timeout=5.0,
            wait_interval=0.2,
        )
        # 5. Retry the per-serial cache after the full refresh.
        device = get_json(cache_key)
        if device and device.get("productType") == product_type:
            net_id = device.get("networkId")
            if network_map and net_id:
                device["networkName"] = network_map.get(net_id)
            logger.info("通过全量加载后在缓存中找到设备: serial=%s", serial)
            return JsonResponse(
                {"device": device},
                safe=False,
                json_dumps_params={"indent": 2, "ensure_ascii": False},
            )
        # 6. Still not found anywhere.
        logger.warning("未找到匹配的设备: serial=%s, productType=%s", serial, product_type)
        return JsonResponse(
            {"error": "未找到匹配的设备"},
            status=404,
            safe=False,
            json_dumps_params={"indent": 2, "ensure_ascii": False},
        )
    except Exception as e:
        logger.exception("get_device_by_serial_or_product_type 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
def get_device_uplink(request):
    """Return uplink information for a single device (cache-first)."""
    serial = request.GET.get('serial')
    product_type = request.GET.get('productType')
    try:
        if not serial or not product_type:
            return JsonResponse({"message": "缺少查询参数 serial 或 productType"}, status=400)

        # Fast path: per-serial uplink cache, O(1) on a hit.
        per_device_key = f"{UPLINK_CACHE_PREFIX}{serial}"
        cached = get_json(per_device_key)
        if cached and cached.get("productType") == product_type:
            return JsonResponse(
                cached,
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )

        # Miss: read the organization-wide uplink list, protected by a
        # distributed lock against cache stampedes, and split it into
        # per-serial entries.
        def load_uplinks():
            uplinks = get_organization_uplinks()
            if uplinks:
                cache_uplinks_by_serial(uplinks, ttl=43200)
            return uplinks

        org_uplinks, _ = get_or_set_json_with_lock(
            CacheKey.DEVICES_UPLINKS_BY_DEVICE.value,
            loader=load_uplinks,
            ex=43200,
        )
        if not org_uplinks:
            return JsonResponse(
                {},
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )

        # Find this device in the organization list and backfill its
        # per-serial cache entry on a hit.
        for entry in org_uplinks:
            if not isinstance(entry, dict):
                continue
            if entry.get('serial') == serial and entry.get('productType') == product_type:
                cache_uplinks_by_serial([entry], ttl=43200)
                return JsonResponse(
                    entry,
                    safe=False,
                    json_dumps_params={'indent': 2, 'ensure_ascii': False},
                )

        # No matching uplink found.
        return JsonResponse(
            {"error": "未找到设备 uplink 信息", "serial": serial, "productType": product_type},
            status=404,
            json_dumps_params={'indent': 2, 'ensure_ascii': False},
        )
    except Exception as e:
        logger.exception("get_device_uplink 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
def get_device_ap_channelUtilization(request):
    """
    Return channel-utilization data for a single wireless AP.

    Cache cascade: per-serial entry -> cached organization-wide list ->
    lock-protected full refresh -> per-serial retry. Returns 400 for a
    missing serial, 404 when no data exists for the device, 503 when the
    source data is not ready yet, 502 on an empty/non-JSON Meraki reply.
    """
    try:
        serial = request.GET.get('serial')
        if not serial:
            return JsonResponse({"message": "缺少查询参数 serial"}, status=400)
        # 1. Fast path: per-serial channel-utilization cache.
        channel_cache_key = f"{CHANNEL_UTILIZATION_CACHE_PREFIX}{serial}"
        channel_data = get_json(channel_cache_key)
        if channel_data:
            return JsonResponse(
                channel_data,
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 2. Per-serial miss -> look the device up in the cached full list
        #    and backfill its per-serial entry.
        res = get_json(CacheKey.WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE.value)
        if isinstance(res, list) and res:
            logger.debug(
                "单设备信道利用率缓存未命中,尝试在全量缓存中查找并回填: serial=%s, 当前全量缓存条数=%d",
                serial,
                len(res),
            )
            for device in res:
                if isinstance(device, dict) and device.get('serial') == serial:
                    cache_channel_utilization_by_serial([device], ttl=300)
                    return JsonResponse(
                        device,
                        safe=False,
                        json_dumps_params={'indent': 2, 'ensure_ascii': False},
                    )
        # 3. Full-list cache also missed -> lock-protected full refresh.
        #    NOTE(review): per-serial entries get a 300s TTL while the full
        #    list is cached for 43200s — confirm this asymmetry is intended.
        def loader():
            data = get_organization_channel_utilization()
            if data:
                cache_channel_utilization_by_serial(data, ttl=300)
            return data
        res, _ = get_or_set_json_with_lock(
            CacheKey.WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE.value,
            loader=loader,
            ex=43200,
        )
        if not res:
            # Cold start / empty source data: ask the caller to retry later.
            return JsonResponse(
                {"error": "信道利用率数据尚未准备好,请稍后重试"},
                status=503,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 4. After the full refresh, retry the per-serial cache.
        channel_data = get_json(channel_cache_key)
        if channel_data:
            return JsonResponse(
                channel_data,
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 5. Still no data for this device.
        return JsonResponse(
            {"error": "未找到指定设备的信道利用率数据", "serial": serial},
            status=404,
            json_dumps_params={'indent': 2, 'ensure_ascii': False},
        )
    except Exception as e:
        msg = str(e)
        if "Expecting value: line 1 column 1" in msg:
            return JsonResponse(
                {"error": "Meraki 返回空响应或非 JSON,可能为 204/HTML 页面。请检查 base_url、组织访问权限,或稍后重试。"},
                status=502,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        logger.exception("get_device_ap_channelUtilization 调用失败")
        return JsonResponse({"error": msg}, status=500)
def get_device_switch_ports_status(request):
    """
    Return port statuses for a single switch.

    Design:
    - Cache-first (60s TTL per serial) so bursts of concurrent requests do
      not hit the Meraki API directly.
    - A Redis distributed lock prevents cache stampedes on expiry.
    - A per-device rate limit (8 req/s) protects both the backend and the
      Meraki API.

    Returns 400 for a missing serial, 503 when rate-limited or the backend
    is temporarily unavailable, 500 on unexpected errors.
    """
    try:
        serial = request.GET.get('serial')
        if not serial:
            return JsonResponse({"message": "缺少查询参数 serial"}, status=400)
        cache_key = f"switch_ports_status:{serial}"
        def loader():
            # Per-device rate limit: 8 requests per second.
            allowed, remaining = rate_limit_allow(f"rl:switch_ports:{serial}", limit=8, window_seconds=1)
            if not allowed:
                logger.warning(
                    "switch_ports_status 限流触发: serial=%s limit=%d remaining=%d",
                    serial, 8, remaining
                )
                return None
            # Actual Meraki API call.
            res = fetch_device_switch_ports_status(serial)
            # Meraki may return an empty result; normalize to a list so
            # the value is always cacheable.
            return res if res is not None else []
        # Lock-protected cached read, 60-second TTL.
        # (The "loaded from source" flag is intentionally discarded.)
        data, _ = get_or_set_json_with_lock(
            cache_key,
            loader=loader,
            ex=60,
            lock_ttl=10,
            wait_timeout=5.0,
            wait_interval=0.2,
        )
        if data is None:
            return JsonResponse(
                {"error": "当前请求过多或后端暂时不可用,请稍后重试"},
                status=503,
            )
        # data is at least a (possibly empty) list, keeping the endpoint idempotent.
        return JsonResponse(data, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("get_device_switch_ports_status 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
"""
状态相关视图
"""
import logging
from django.http import JsonResponse
from meraki_Interface_forward.redis_utils import (
get_json,
CacheKey,
get_or_set_json_with_lock,
)
from meraki_Interface_forward.services.meraki_service import (
get_organization_status,
get_organization_status_overview,
get_organization_alert,
)
from meraki_Interface_forward.services.cache_service import (
STATUS_CACHE_PREFIX,
cache_status_by_serial,
)
logger = logging.getLogger("meraki_Interface_forward.views.status_views")
def get_device_status(request):
    """
    Return the availability status of a single device.

    Cache cascade: per-serial status entry (65s TTL) -> cached full
    availability list -> lock-protected full refresh -> per-serial retry.
    Returns 400 for missing parameters, 404 when no status is found,
    503 when the source data cannot be fetched.
    """
    serial = request.GET.get('serial')
    product_type = request.GET.get('productType')
    try:
        if not serial or not product_type:
            return JsonResponse(
                {"message": "缺少查询参数 serial 或 productType"},
                status=400,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 1. Fast path: per-serial status cache (status:<serial>), O(1),
        #    65-second TTL.
        status_cache_key = f"{STATUS_CACHE_PREFIX}{serial}"
        status_val = get_json(status_cache_key)
        if status_val is not None:
            # Cache hit: return immediately.
            return JsonResponse(
                {"status": status_val},
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 2. Per-serial miss -> look the device up in the cached full
        #    availability list and backfill its per-serial entry.
        res = get_json(CacheKey.DEVICES_AVAILABILITIES.value)
        if isinstance(res, list) and res:
            logger.debug(
                "单设备状态缓存未命中,尝试在全量状态缓存中查找并回填: serial=%s, 当前全量缓存条数=%d",
                serial,
                len(res),
            )
            for device in res:
                if (
                    isinstance(device, dict)
                    and device.get("serial") == serial
                    and device.get("productType") == product_type
                ):
                    status_val = device.get("status")
                    # Backfill the per-serial status cache (stores only the
                    # status value).
                    cache_status_by_serial([device], ttl=65)
                    return JsonResponse(
                        {"status": status_val},
                        safe=False,
                        json_dumps_params={'indent': 2, 'ensure_ascii': False},
                    )
        # 3. Full-list cache also missed -> lock-protected full refresh that
        #    bulk-saves every device's status.
        def loader():
            data = get_organization_status()
            if data:
                # Bulk-save all device statuses into per-serial cache
                # entries in one pass (status values only).
                cache_status_by_serial(data, ttl=65)
            return data
        res, _ = get_or_set_json_with_lock(
            CacheKey.DEVICES_AVAILABILITIES.value,
            loader=loader,
            ex=43200,
        )
        if not res:
            return JsonResponse(
                {"error": "当前无法获取设备状态,请稍后重试"},
                status=503,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 4. After the full refresh, retry the per-serial cache.
        status_val = get_json(status_cache_key)
        if status_val is not None:
            return JsonResponse(
                {"status": status_val},
                safe=False,
                json_dumps_params={'indent': 2, 'ensure_ascii': False},
            )
        # 5. Still no status for this device.
        return JsonResponse(
            {"error": "未找到指定设备的状态信息", "serial": serial, "productType": product_type},
            status=404,
            json_dumps_params={'indent': 2, 'ensure_ascii': False},
        )
    except Exception as e:
        logger.exception("get_device_status 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
def get_device_status_overview(request):
    """Return the organization-wide device status overview (cache-first)."""
    try:
        # Lock-protected read-through cache, so an expired key is not
        # hammered by a burst of concurrent requests.
        overview, _ = get_or_set_json_with_lock(
            CacheKey.DEVICES_STATUSES_OVERVIEW.value,
            loader=get_organization_status_overview,
            ex=43200,
        )
        payload = overview if overview else []
        return JsonResponse(payload, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("get_device_status_overview 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
def get_device_alert(request):
    """
    Return assurance alerts for a single device.

    Reads the organization-wide alert list through a lock-protected cache
    (prevents cache stampedes) and filters it to the requested serial.
    Non-dict entries in the cached list are skipped defensively, consistent
    with the isinstance checks used elsewhere in the views (previously a
    malformed entry raised AttributeError and surfaced as a 500).
    """
    try:
        serial = request.GET.get('serial')
        if not serial:
            return JsonResponse({"message": "缺少查询参数 serial"}, status=400)
        # Read-through cache to avoid cache penetration under high load.
        def loader():
            return get_organization_alert()
        alert, _ = get_or_set_json_with_lock(
            CacheKey.ASSURANCE_ALERTS.value,
            loader=loader,
            ex=43200,
        )
        # Keep only alerts belonging to this device, tolerating malformed
        # (non-dict) entries.
        result = [
            item for item in (alert or [])
            if isinstance(item, dict) and item.get('serial') == serial
        ]
        return JsonResponse(result, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("get_device_alert 调用失败")
        return JsonResponse({"error": str(e)}, status=500)
"""
WSGI config for meraki_Interface_forward project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/5.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'meraki_Interface_forward.settings')
application = get_wsgi_application()
django>=5.0,<6.0
python-dotenv>=1.0.0
meraki>=1.52.0
redis>=5.0.0
django-redis>=5.4.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment