Commit 04b295ba by zmops

远景能源采集平台代码

parent d8401abf
# 这是 Meraki API Key 不可为空 # 这是 Meraki API Key 不可为空
#MERAKI_API_KEY=bbaa461f371d9eb323ba478028a9585624f2500c MERAKI_API_KEY=bbaa461f371d9eb323ba478028a9585624f2500c
MERAKI_API_KEY=715183336c6eecfa3924a95e07256c464ac9516d #MERAKI_API_KEY=715183336c6eecfa3924a95e07256c464ac9516d
# 组织 ID 可为空 # 组织 ID 可为空
ORGANIZATION_ID= ORGANIZATION_ID=
# Meraki API 基础 URL 可为空 # Meraki API 基础 URL 可为空
#MERAKI_API_BASE=https://n3.dashboard.meraki.cn/api/v1 MERAKI_API_BASE=https://n3.dashboard.meraki.cn/api/v1
MERAKI_API_BASE=https://n190.dashboard.meraki.com/api/v1 #MERAKI_API_BASE=https://n190.dashboard.meraki.com/api/v1
# 是否开启debug模式 # 是否开启debug模式
DJANGO_DEBUG=True DJANGO_DEBUG=False
#redis 相关配置 #redis 相关配置
...@@ -18,6 +18,11 @@ DJANGO_DEBUG=True ...@@ -18,6 +18,11 @@ DJANGO_DEBUG=True
REDIS_HOST=14.103.242.161 REDIS_HOST=14.103.242.161
#REDIS_HOST=127.0.0.1 #REDIS_HOST=127.0.0.1
REDIS_PORT=6379 REDIS_PORT=6379
REDIS_DB=9 REDIS_DB=8
REDIS_USERNAME= REDIS_USERNAME=
REDIS_PASSWORD=redis@123 REDIS_PASSWORD=redis@123
ZABBIX_API_URL=https://netmonitor.envision-energy.com/api_jsonrpc.php
ZABBIX_USER=weiguan
ZABBIX_PASSWORD=Test@123
\ No newline at end of file
...@@ -6,6 +6,7 @@ import logging ...@@ -6,6 +6,7 @@ import logging
from meraki_Interface_forward.redis_utils import ( from meraki_Interface_forward.redis_utils import (
set_json, set_json,
CacheKey, CacheKey,
get_redis_client,
) )
logger = logging.getLogger("meraki_Interface_forward.services.cache_service") logger = logging.getLogger("meraki_Interface_forward.services.cache_service")
...@@ -19,22 +20,36 @@ CHANNEL_UTILIZATION_CACHE_PREFIX = "channel_utilization:" ...@@ -19,22 +20,36 @@ CHANNEL_UTILIZATION_CACHE_PREFIX = "channel_utilization:"
def cache_devices_by_serial(devices, ttl: int = 60 * 60 * 12) -> None: def cache_devices_by_serial(devices, ttl: int = 60 * 60 * 12) -> None:
""" """
将 Meraki 设备列表按 serial 维度拆分,逐个写入 Redis。 将 Meraki 设备列表按 serial 维度拆分,批量写入 Redis。
- key: device:<serial> 使用 Pipeline 优化,避免 N+1 次网络请求。
- value: 单个设备 JSON
""" """
if not isinstance(devices, list): if not isinstance(devices, list) or not devices:
return return
for dev in devices:
if not isinstance(dev, dict): client = get_redis_client()
continue if not client:
serial = dev.get("serial") return
if not serial:
continue try:
try: pipeline = client.pipeline()
set_json(f"{DEVICE_CACHE_PREFIX}{serial}", dev, ex=ttl) import json
except Exception: for dev in devices:
continue if not isinstance(dev, dict):
continue
serial = dev.get("serial")
if not serial:
continue
# 直接使用 pipeline.set,手动序列化 json
# 注意:set_json 内部是调用的 client.set,这里为了 pipeline 效率直接操作
key = f"{DEVICE_CACHE_PREFIX}{serial}"
val = json.dumps(dev, ensure_ascii=False)
pipeline.set(key, val, ex=ttl)
pipeline.execute()
except Exception as e:
logger.error(f"cache_devices_by_serial pipeline error: {e}")
def cache_uplinks_by_serial(uplinks, ttl: int = 60 * 60 * 6) -> None: def cache_uplinks_by_serial(uplinks, ttl: int = 60 * 60 * 6) -> None:
......
...@@ -4,8 +4,10 @@ Meraki API 服务层:封装所有 Meraki API 调用 ...@@ -4,8 +4,10 @@ Meraki API 服务层:封装所有 Meraki API 调用
import os import os
import meraki import meraki
import logging import logging
import json
from meraki_Interface_forward.redis_utils import CacheKey, set_json from meraki_Interface_forward.redis_utils import CacheKey, set_json, get_json
from meraki_Interface_forward.services.cache_service import cache_devices_by_serial
logger = logging.getLogger("meraki_Interface_forward.services.meraki_service") logger = logging.getLogger("meraki_Interface_forward.services.meraki_service")
...@@ -44,7 +46,7 @@ def get_organization_networks(): ...@@ -44,7 +46,7 @@ def get_organization_networks():
total_pages='all' total_pages='all'
) )
if len(networks): if len(networks):
set_json(CacheKey.NETWORKS.value, networks, ex=60*60*6) set_json(CacheKey.NETWORKS.value, networks)
return networks return networks
except Exception as e: except Exception as e:
msg = str(e) msg = str(e)
...@@ -132,3 +134,98 @@ def get_device_switch_ports_status(serial): ...@@ -132,3 +134,98 @@ def get_device_switch_ports_status(serial):
timespan=3600, timespan=3600,
) )
def get_filtered_devices(filter_networks=None):
"""
获取经过过滤和处理的设备列表
:param filter_networks: list of network names to exclude
:return: list of device dicts
"""
filter_names = set(filter_networks) if isinstance(filter_networks, list) else set()
# 获取网络列表(缓存优先)
networks = get_json(CacheKey.NETWORKS.value) or []
if isinstance(networks, str):
try:
networks = json.loads(networks)
except Exception:
networks = []
if not networks:
networks = get_organization_networks() or []
network_map = {}
if isinstance(networks, list):
network_map = {net.get("id"): net.get("name") for net in networks if isinstance(net, dict)}
# 网络名 -> id 映射,用于过滤
exclude_network_ids = set()
if filter_names and isinstance(networks, list):
for net in networks:
if isinstance(net, dict) and net.get("name") in filter_names:
nid = net.get("id")
if nid:
exclude_network_ids.add(nid)
# 获取设备(缓存优先,未命中则回源)
devices = get_json(CacheKey.DEVICES.value)
if not devices:
logger.info("get_filtered_devices: Redis miss for ALL devices. Fetching from Meraki API...")
devices = get_organization_devices()
if devices:
set_json(CacheKey.DEVICES.value, devices, ex=60 * 60)
logger.info(f"get_filtered_devices: Fetched {len(devices)} devices. Caching by serial...")
cache_devices_by_serial(devices, ttl=60 * 60)
logger.info("get_filtered_devices: Caching complete.")
else:
logger.info(f"get_filtered_devices: Redis hit. {len(devices)} devices found.")
if not devices:
return []
# 过滤:networkId 命中需要排除的网络则丢弃
if exclude_network_ids:
filtered = []
for dev in devices:
if not isinstance(dev, dict):
continue
nid = dev.get("networkId")
if nid in exclude_network_ids:
continue
filtered.append(dev)
devices = filtered
if devices:
processed = []
for dev in devices:
if not isinstance(dev, dict):
continue
name = dev.get("name")
serial = dev.get("serial")
mac = dev.get("mac")
if not name or (isinstance(name, str) and not name.strip()):
dev["name"] = serial or mac or "UNKNOWN"
# 追加 networkName 字段
net_id = dev.get("networkId")
if network_map and net_id:
dev["networkName"] = network_map.get(net_id)
processed.append(dev)
# 重名处理
name_counts = {}
for dev in processed:
n = dev.get("name")
if not isinstance(n, str):
n = str(n) if n is not None else ""
name_counts[n] = name_counts.get(n, 0) + 1
for dev in processed:
n = dev.get("name")
if name_counts.get(n, 0) > 1:
s = dev.get("serial") or ""
dev["name"] = f"{n}{s}"
devices = processed
return devices
...@@ -24,6 +24,12 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ...@@ -24,6 +24,12 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
load_dotenv(os.path.join(BASE_DIR, '.env')) load_dotenv(os.path.join(BASE_DIR, '.env'))
# Zabbix Configuration
ZABBIX_API_URL = os.getenv("ZABBIX_API_URL", "")
ZABBIX_USER = os.getenv("ZABBIX_USER", "")
ZABBIX_PASSWORD = os.getenv("ZABBIX_PASSWORD", "")
REDIS_URL = os.getenv("REDIS_URL") REDIS_URL = os.getenv("REDIS_URL")
REDIS_HOST = os.getenv("REDIS_HOST") REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PORT = os.getenv("REDIS_PORT") REDIS_PORT = os.getenv("REDIS_PORT")
...@@ -153,7 +159,7 @@ try: ...@@ -153,7 +159,7 @@ try:
if networks: if networks:
# 缓存进入redis # 缓存进入redis
res = json.dumps(networks) res = json.dumps(networks)
set(CacheKey.NETWORKS.value, res, ex=60*60*6) set(CacheKey.NETWORKS.value, res)
except Exception as e: except Exception as e:
print(f"缓存 networks 失败: {e}") print(f"缓存 networks 失败: {e}")
...@@ -315,6 +321,20 @@ class CustomTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler): ...@@ -315,6 +321,20 @@ class CustomTimedRotatingFileHandler(logging.handlers.TimedRotatingFileHandler):
new_name = f"meraki.{date_str}.log" new_name = f"meraki.{date_str}.log"
return os.path.join(base_dir, new_name) return os.path.join(base_dir, new_name)
return name return name
def getFilesToDelete(self):
import re
base_dir = os.path.dirname(self.baseFilename)
base_name = os.path.splitext(os.path.basename(self.baseFilename))[0]
pattern = re.compile(rf"{re.escape(base_name)}\.(\d{{8}})\.log$")
dated = []
for fn in os.listdir(base_dir):
m = pattern.match(fn)
if m:
dated.append((m.group(1), os.path.join(base_dir, fn)))
dated.sort(key=lambda x: x[0])
if len(dated) <= self.backupCount:
return []
return [p for _, p in dated[:-self.backupCount]]
LOGGING = { LOGGING = {
...@@ -339,7 +359,7 @@ LOGGING = { ...@@ -339,7 +359,7 @@ LOGGING = {
"()": CustomTimedRotatingFileHandler, "()": CustomTimedRotatingFileHandler,
"filename": LOG_FILE_PATH, "filename": LOG_FILE_PATH,
"when": "midnight", "when": "midnight",
"backupCount": 7, "backupCount": 6,
"encoding": "utf-8", "encoding": "utf-8",
}, },
}, },
......
...@@ -4,8 +4,6 @@ ...@@ -4,8 +4,6 @@
import threading import threading
import time import time
import logging import logging
from datetime import datetime
from zoneinfo import ZoneInfo
from django.http import JsonResponse from django.http import JsonResponse
from meraki_Interface_forward.resultAnalysis import logger from meraki_Interface_forward.resultAnalysis import logger
...@@ -48,7 +46,7 @@ def _execute_synchronization(): ...@@ -48,7 +46,7 @@ def _execute_synchronization():
try: try:
networks = get_organization_networks() networks = get_organization_networks()
if networks: if networks:
set_json(CacheKey.NETWORKS.value, networks, ex=60 * 60 * 6) set_json(CacheKey.NETWORKS.value, networks)
logger.info("网络数据同步完成") logger.info("网络数据同步完成")
except Exception as e: except Exception as e:
logger.error(f"Failed to cache networks: {e}") logger.error(f"Failed to cache networks: {e}")
...@@ -75,7 +73,7 @@ def _execute_synchronization(): ...@@ -75,7 +73,7 @@ def _execute_synchronization():
try: try:
alerts = get_organization_alert() alerts = get_organization_alert()
if alerts: if alerts:
set_json(CacheKey.ASSURANCE_ALERTS.value, alerts, ex=60) set_json(CacheKey.ASSURANCE_ALERTS.value, alerts, ex= 60 * 5 )
logger.info("告警数据同步完成") logger.info("告警数据同步完成")
except Exception as e: except Exception as e:
logger.error(f"Failed to cache alert: {e}") logger.error(f"Failed to cache alert: {e}")
...@@ -84,18 +82,17 @@ def _execute_synchronization(): ...@@ -84,18 +82,17 @@ def _execute_synchronization():
try: try:
devices = get_organization_devices() devices = get_organization_devices()
if devices: if devices:
set_json(CacheKey.DEVICES.value, devices, ex=60 * 60 * 12) set_json(CacheKey.DEVICES.value, devices, ex=60 * 60)
# 同步按 serial 建立单设备缓存,便于高并发单设备查询 # 同步按 serial 建立单设备缓存,便于高并发单设备查询
cache_devices_by_serial(devices, ttl=60 * 60 * 12) cache_devices_by_serial(devices, ttl=60 * 60)
logger.info(f"设备列表同步完成,共 {len(devices) if devices else 0} 台设备") logger.info(f"设备列表同步完成,共 {len(devices) if devices else 0} 台设备")
except Exception as e: except Exception as e:
logger.error(f"Failed to cache devices: {e}") logger.error(f"Failed to cache devices: {e}")
# 任务完成 # 任务完成
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
finish_ts = time.time()
set_json(TASK_STATUS_KEY, TASK_STATUS_COMPLETED, ex=3600) set_json(TASK_STATUS_KEY, TASK_STATUS_COMPLETED, ex=3600)
set_json(TASK_FINISH_TIME_KEY, finish_ts, ex=3600) set_json(TASK_FINISH_TIME_KEY, time.time(), ex=3600)
logger.info(f"数据同步任务完成,耗时 {elapsed_time:.2f} 秒") logger.info(f"数据同步任务完成,耗时 {elapsed_time:.2f} 秒")
except Exception as e: except Exception as e:
...@@ -104,17 +101,6 @@ def _execute_synchronization(): ...@@ -104,17 +101,6 @@ def _execute_synchronization():
set_json(TASK_FINISH_TIME_KEY, time.time(), ex=3600) set_json(TASK_FINISH_TIME_KEY, time.time(), ex=3600)
def _format_ts_shanghai(ts: float | None) -> str | None:
"""将时间戳格式化为上海时间字符串"""
if ts is None:
return None
try:
dt = datetime.fromtimestamp(float(ts), tz=ZoneInfo("Asia/Shanghai"))
return dt.strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return None
def synchronization_data(request): def synchronization_data(request):
""" """
数据同步接口(异步执行) 数据同步接口(异步执行)
...@@ -132,7 +118,6 @@ def synchronization_data(request): ...@@ -132,7 +118,6 @@ def synchronization_data(request):
current_status = get_json(TASK_STATUS_KEY) or TASK_STATUS_IDLE current_status = get_json(TASK_STATUS_KEY) or TASK_STATUS_IDLE
start_time = get_json(TASK_START_TIME_KEY) start_time = get_json(TASK_START_TIME_KEY)
finish_time = get_json(TASK_FINISH_TIME_KEY) finish_time = get_json(TASK_FINISH_TIME_KEY)
finish_time_str = _format_ts_shanghai(finish_time)
# 如果任务正在执行中,返回当前状态 # 如果任务正在执行中,返回当前状态
if current_status == TASK_STATUS_RUNNING: if current_status == TASK_STATUS_RUNNING:
...@@ -146,7 +131,6 @@ def synchronization_data(request): ...@@ -146,7 +131,6 @@ def synchronization_data(request):
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"message": "数据同步任务正在执行中,请稍后查询", "message": "数据同步任务正在执行中,请稍后查询",
"last_finished_at": finish_time, "last_finished_at": finish_time,
"last_finished_at_shanghai": finish_time_str,
}, },
safe=False, safe=False,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
...@@ -164,7 +148,6 @@ def synchronization_data(request): ...@@ -164,7 +148,6 @@ def synchronization_data(request):
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"message": f"当前任务状态: {current_status}", "message": f"当前任务状态: {current_status}",
"last_finished_at": finish_time, "last_finished_at": finish_time,
"last_finished_at_shanghai": finish_time_str,
}, },
safe=False, safe=False,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
...@@ -181,7 +164,6 @@ def synchronization_data(request): ...@@ -181,7 +164,6 @@ def synchronization_data(request):
"elapsed_time": 0, "elapsed_time": 0,
"message": "数据同步任务已创建并在后台执行", "message": "数据同步任务已创建并在后台执行",
"last_finished_at": finish_time, "last_finished_at": finish_time,
"last_finished_at_shanghai": finish_time_str,
}, },
safe=False, safe=False,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
...@@ -198,7 +180,6 @@ def synchronization_data(request): ...@@ -198,7 +180,6 @@ def synchronization_data(request):
"elapsed_time": elapsed_time, "elapsed_time": elapsed_time,
"message": f"当前任务状态: {current_status}", "message": f"当前任务状态: {current_status}",
"last_finished_at": finish_time, "last_finished_at": finish_time,
"last_finished_at_shanghai": finish_time_str,
}, },
safe=False, safe=False,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
......
import json
from django.test import SimpleTestCase, RequestFactory
from meraki_Interface_forward.redis_utils import CacheKey
import meraki_Interface_forward.views.device_views as views
class DiscoveryFilterNameFixTests(SimpleTestCase):
def setUp(self):
self.factory = RequestFactory()
self._orig_get_json = views.get_json
self._orig_get_org_devices = views.get_organization_devices
def tearDown(self):
views.get_json = self._orig_get_json
views.get_organization_devices = self._orig_get_org_devices
def _stub_get_json(self, key):
if key == CacheKey.NETWORKS.value:
return None
if key == CacheKey.DEVICES.value:
return [
{"name": "", "serial": "Q3AL-2A5G-3AF5", "mac": "aa:bb:cc:dd:ee:ff", "networkId": "N1", "productType": "switch"},
{"name": "AS01", "serial": "Q4AV-TBV5-C2XL", "mac": "14:9f:43:47:f2:38", "networkId": "N2", "productType": "switch"},
{"name": "AS01", "serial": "Q5BX-XXXX-YYYY", "mac": "14:9f:43:47:f2:39", "networkId": "N3", "productType": "switch"},
]
return None
def test_empty_name_fixed_and_duplicates_deduped(self):
views.get_json = self._stub_get_json
views.get_organization_devices = lambda: []
req = self.factory.post(
"/discovery/host_prototype/filter",
data=json.dumps({"filter_networks": []}),
content_type="application/json",
)
res = views.discovery_host_prototype_filter(req)
data = json.loads(res.content.decode("utf-8"))
names = {d["serial"]: d.get("name") for d in data}
assert names["Q3AL-2A5G-3AF5"] == "Q3AL-2A5G-3AF5"
assert names["Q4AV-TBV5-C2XL"] == "AS01Q4AV-TBV5-C2XL"
assert names["Q5BX-XXXX-YYYY"] == "AS01Q5BX-XXXX-YYYY"
...@@ -18,12 +18,14 @@ from django.contrib import admin ...@@ -18,12 +18,14 @@ from django.contrib import admin
from django.urls import path from django.urls import path
from . import task from . import task
# 直接从子模块导入,避免 views.py 和 views/ 目录的命名冲突 # 直接从子模块导入,避免 views.py 和 views/ 目录的命名冲突
from .views import device_views, status_views, cache_views from .views import device_views, status_views, cache_views, zabbix_views, juniper_views
urlpatterns = [ urlpatterns = [
# path('admin/', admin.site.urls), # path('admin/', admin.site.urls),
path('discovery/host_prototype', device_views.discovery_host_prototype), path('discovery/host_prototype', device_views.discovery_host_prototype),
path('discovery/host_prototype/filter', device_views.discovery_host_prototype_filter), path('discovery/host_prototype/filter', device_views.discovery_host_prototype_filter),
path('zabbix/sync_hosts', zabbix_views.sync_meraki_to_zabbix),
path('zabbix/sync_juniper_hosts', juniper_views.sync_juniper_to_zabbix),
path('all_host', device_views.get_device_by_serial_or_product_type), path('all_host', device_views.get_device_by_serial_or_product_type),
path('device/status', status_views.get_device_status), path('device/status', status_views.get_device_status),
path('device/status/overview', status_views.get_device_status_overview), path('device/status/overview', status_views.get_device_status_overview),
......
...@@ -4,40 +4,44 @@ from typing import Dict ...@@ -4,40 +4,44 @@ from typing import Dict
from meraki_Interface_forward.redis_utils import get_redis_client from meraki_Interface_forward.redis_utils import get_redis_client
def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys: int = 200) -> Dict[str, object]: def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys: int = 50, quick_check: bool = True) -> Dict[str, object]:
""" """
收集指定前缀的 key 数量与 TTL 统计。 收集指定前缀的 key 数量与 TTL 统计(极速优化版)。
- 精确 count:全量 scan 计数(仅保存前 max_scan_keys 个用于采样) - 空前缀:只扫描 10 个 key,如果没找到立即返回(< 10ms)
- TTL 采样:最多 sample_size 个,使用 pipeline 批量查询 - 有数据前缀:限制扫描范围,不进行全量计数,只统计已扫描的 key
- 批量查询:使用 pipeline 批量获取 TTL
""" """
client = get_redis_client() client = get_redis_client()
if not client: if not client:
return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None} return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None}
keys = [] keys = []
count = 0
cursor = 0
try: try:
while True: # 极速检查:空前缀只扫描 10 个 key,如果没找到立即返回
cursor, batch = client.scan(cursor=cursor, match=pattern, count=500) cursor, batch = client.scan(cursor=0, match=pattern, count=10 if quick_check else 50)
batch = batch or [] batch = batch or []
count += len(batch) if not batch:
# 仅保留前 max_scan_keys 个用于采样 # 空前缀:立即返回,不继续扫描
for k in batch: return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None}
if len(keys) < max_scan_keys:
keys.append(k) keys.extend(batch)
if cursor == 0:
break # 如果找到了数据,继续扫描但严格限制总量(不再进行全量计数)
if cursor != 0 and len(keys) < max_scan_keys:
# 只再扫描一次,获取更多 key 用于采样
cursor, batch = client.scan(cursor=cursor, match=pattern, count=max_scan_keys - len(keys))
if batch:
keys.extend(batch[:max_scan_keys - len(keys)])
except Exception: except Exception:
return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None} return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None}
if count == 0 or not keys: if not keys:
return {"count": count, "sampled": 0, "minTtl": None, "avgTtl": None} return {"count": 0, "sampled": 0, "minTtl": None, "avgTtl": None}
# 随机采样(进一步减少 TTL 查询) # 限制采样数量(进一步减少 TTL 查询)
actual_sample_size = min(len(keys), sample_size) actual_sample_size = min(len(keys), sample_size)
sampled_keys = random.sample(keys, actual_sample_size) sampled_keys = random.sample(keys, actual_sample_size)
# 使用 pipeline 批量查询 TTL(一次性查询所有采样 key) # 使用 pipeline 批量查询 TTL(一次性查询所有采样 key)
ttls = [] ttls = []
try: try:
...@@ -58,8 +62,10 @@ def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys: ...@@ -58,8 +62,10 @@ def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys:
except Exception: except Exception:
continue continue
# 对于 device 这种可能有大量 key 的情况,count 只返回实际扫描到的数量
# 不再进行全量计数,避免性能问题
return { return {
"count": count, "count": len(keys), # 只返回实际扫描到的数量,不再全量计数
"sampled": len(sampled_keys), "sampled": len(sampled_keys),
"minTtl": min(ttls) if ttls else None, "minTtl": min(ttls) if ttls else None,
"avgTtl": round(sum(ttls) / len(ttls), 2) if ttls else None, "avgTtl": round(sum(ttls) / len(ttls), 2) if ttls else None,
...@@ -68,16 +74,18 @@ def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys: ...@@ -68,16 +74,18 @@ def collect_prefix_ttl_stats(pattern: str, sample_size: int = 10, max_scan_keys:
def build_prefix_stats(prefix_patterns: Dict[str, str], sample_size: int = 10) -> Dict[str, dict]: def build_prefix_stats(prefix_patterns: Dict[str, str], sample_size: int = 10) -> Dict[str, dict]:
""" """
生成按前缀的 TTL 统计结果。 生成按前缀的 TTL 统计结果(极速优化版)
- 精确 count(全量 scan - 空前缀:极速检查(只扫描 10 个 key
- device 前缀:采样 5,保留 200 个键用于采样 - device 前缀:限制扫描 50 个 key,采样 5 个
- 其他前缀:采样 5,保留 100 个键用于采样 - 其他前缀:限制扫描 30 个 key,采样 5 个
""" """
result = {} result = {}
for name, pattern in prefix_patterns.items(): for name, pattern in prefix_patterns.items():
if name == "device": if name == "device":
result[name] = collect_prefix_ttl_stats(pattern, sample_size=5, max_scan_keys=200) # device 前缀:扫描 50 个 key,采样 5 个
result[name] = collect_prefix_ttl_stats(pattern, sample_size=5, max_scan_keys=50, quick_check=True)
else: else:
result[name] = collect_prefix_ttl_stats(pattern, sample_size=5, max_scan_keys=100) # 其他前缀:极速检查,扫描 30 个 key,采样 5 个
result[name] = collect_prefix_ttl_stats(pattern, sample_size=5, max_scan_keys=30, quick_check=True)
return result return result
...@@ -17,6 +17,7 @@ from meraki_Interface_forward.services.meraki_service import ( ...@@ -17,6 +17,7 @@ from meraki_Interface_forward.services.meraki_service import (
get_organization_uplinks, get_organization_uplinks,
get_organization_channel_utilization, get_organization_channel_utilization,
get_device_switch_ports_status as fetch_device_switch_ports_status, get_device_switch_ports_status as fetch_device_switch_ports_status,
get_filtered_devices,
) )
from meraki_Interface_forward.services.cache_service import ( from meraki_Interface_forward.services.cache_service import (
DEVICE_CACHE_PREFIX, DEVICE_CACHE_PREFIX,
...@@ -42,8 +43,8 @@ def discovery_host_prototype(request): ...@@ -42,8 +43,8 @@ def discovery_host_prototype(request):
# 缓存未命中,回退 Meraki API,并顺便按 serial 建立索引 # 缓存未命中,回退 Meraki API,并顺便按 serial 建立索引
devices = get_organization_devices() devices = get_organization_devices()
if devices: if devices:
set_json(CacheKey.DEVICES.value, devices, ex=43200) set_json(CacheKey.DEVICES.value, devices, ex=60 * 60)
cache_devices_by_serial(devices, ttl=43200) cache_devices_by_serial(devices, ttl=60 * 60)
return JsonResponse(devices, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False}) return JsonResponse(devices, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
return JsonResponse([], safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False}) return JsonResponse([], safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
...@@ -69,47 +70,8 @@ def discovery_host_prototype_filter(request): ...@@ -69,47 +70,8 @@ def discovery_host_prototype_filter(request):
if request.body: if request.body:
body = json.loads(request.body.decode("utf-8")) body = json.loads(request.body.decode("utf-8"))
filter_networks = body.get("filter_networks") or [] filter_networks = body.get("filter_networks") or []
filter_names = set(filter_networks) if isinstance(filter_networks, list) else set()
devices = get_filtered_devices(filter_networks)
# 获取网络列表(缓存优先)
networks = get_json(CacheKey.NETWORKS.value) or []
if isinstance(networks, str):
try:
networks = json.loads(networks)
except Exception:
networks = []
# 网络名 -> id 映射,用于过滤
exclude_network_ids = set()
if filter_names and isinstance(networks, list):
for net in networks:
if isinstance(net, dict) and net.get("name") in filter_names:
nid = net.get("id")
if nid:
exclude_network_ids.add(nid)
# 获取设备(缓存优先,未命中则回源)
devices = get_json(CacheKey.DEVICES.value)
if not devices:
devices = get_organization_devices()
if devices:
set_json(CacheKey.DEVICES.value, devices, ex=43200)
cache_devices_by_serial(devices, ttl=43200)
if not devices:
return JsonResponse([], safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
# 过滤:networkId 命中需要排除的网络则丢弃
if exclude_network_ids:
filtered = []
for dev in devices:
if not isinstance(dev, dict):
continue
nid = dev.get("networkId")
if nid in exclude_network_ids:
continue
filtered.append(dev)
devices = filtered
return JsonResponse(devices, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False}) return JsonResponse(devices, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
...@@ -171,7 +133,7 @@ def get_device_by_serial_or_product_type(request): ...@@ -171,7 +133,7 @@ def get_device_by_serial_or_product_type(request):
and dev.get("productType") == product_type and dev.get("productType") == product_type
): ):
# 命中后,顺便写回单设备缓存 # 命中后,顺便写回单设备缓存
cache_devices_by_serial([dev], ttl=43200) cache_devices_by_serial([dev], ttl=60 * 60)
net_id = dev.get("networkId") net_id = dev.get("networkId")
if network_map and net_id: if network_map and net_id:
dev["networkName"] = network_map.get(net_id) dev["networkName"] = network_map.get(net_id)
...@@ -187,14 +149,14 @@ def get_device_by_serial_or_product_type(request): ...@@ -187,14 +149,14 @@ def get_device_by_serial_or_product_type(request):
logger.info("单设备缓存未命中,准备触发全量设备加载: serial=%s", serial) logger.info("单设备缓存未命中,准备触发全量设备加载: serial=%s", serial)
devs = get_organization_devices() devs = get_organization_devices()
if devs: if devs:
set_json(CacheKey.DEVICES.value, devs, ex=43200) set_json(CacheKey.DEVICES.value, devs, ex=60 * 60)
cache_devices_by_serial(devs, ttl=43200) cache_devices_by_serial(devs, ttl=60 * 60)
return devs return devs
_, _ = get_or_set_json_with_lock( _, _ = get_or_set_json_with_lock(
CacheKey.DEVICES.value, CacheKey.DEVICES.value,
loader=loader, loader=loader,
ex=43200, ex=60 * 60,
lock_ttl=30, lock_ttl=30,
wait_timeout=5.0, wait_timeout=5.0,
wait_interval=0.2, wait_interval=0.2,
...@@ -249,13 +211,13 @@ def get_device_uplink(request): ...@@ -249,13 +211,13 @@ def get_device_uplink(request):
def loader(): def loader():
data = get_organization_uplinks() data = get_organization_uplinks()
if data: if data:
cache_uplinks_by_serial(data, ttl=43200) cache_uplinks_by_serial(data, ttl=60 * 60)
return data return data
res, _ = get_or_set_json_with_lock( res, _ = get_or_set_json_with_lock(
CacheKey.DEVICES_UPLINKS_BY_DEVICE.value, CacheKey.DEVICES_UPLINKS_BY_DEVICE.value,
loader=loader, loader=loader,
ex=43200, ex=60 * 60,
) )
if not res: if not res:
...@@ -272,7 +234,7 @@ def get_device_uplink(request): ...@@ -272,7 +234,7 @@ def get_device_uplink(request):
and device.get('serial') == serial and device.get('serial') == serial
and device.get('productType') == product_type and device.get('productType') == product_type
): ):
cache_uplinks_by_serial([device], ttl=43200) cache_uplinks_by_serial([device], ttl=60 * 60)
return JsonResponse( return JsonResponse(
device, device,
safe=False, safe=False,
...@@ -334,14 +296,13 @@ def get_device_ap_channelUtilization(request): ...@@ -334,14 +296,13 @@ def get_device_ap_channelUtilization(request):
res, _ = get_or_set_json_with_lock( res, _ = get_or_set_json_with_lock(
CacheKey.WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE.value, CacheKey.WIRELESS_CHANNEL_UTILIZATION_BY_DEVICE.value,
loader=loader, loader=loader,
ex=43200, ex=60 * 5,
) )
if not res: if not res:
# 冷启动/源数据为空,返回 503 提示稍后重试
return JsonResponse( return JsonResponse(
{"error": "信道利用率数据尚未准备好,请稍后重试"}, {"error": "无可用的信道利用率数据"},
status=503, status=404,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
) )
...@@ -356,8 +317,8 @@ def get_device_ap_channelUtilization(request): ...@@ -356,8 +317,8 @@ def get_device_ap_channelUtilization(request):
# 5. 依然未找到设备信道利用率数据 # 5. 依然未找到设备信道利用率数据
return JsonResponse( return JsonResponse(
{"error": "未找到指定设备的信道利用率数据", "serial": serial}, { "error": "未找到指定设备的信道利用率数据", "serial": serial},
status=404, status = 404,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
) )
......
...@@ -80,7 +80,7 @@ def get_device_status(request): ...@@ -80,7 +80,7 @@ def get_device_status(request):
res, _ = get_or_set_json_with_lock( res, _ = get_or_set_json_with_lock(
CacheKey.DEVICES_AVAILABILITIES.value, CacheKey.DEVICES_AVAILABILITIES.value,
loader=loader, loader=loader,
ex=43200, ex=65,
) )
if not res: if not res:
...@@ -101,8 +101,8 @@ def get_device_status(request): ...@@ -101,8 +101,8 @@ def get_device_status(request):
# 5. 依然未找到设备状态 # 5. 依然未找到设备状态
return JsonResponse( return JsonResponse(
{"error": "未找到指定设备的状态信息", "serial": serial, "productType": product_type}, { "error": "未找到指定设备的状态信息", "serial": serial, "productType": product_type},
status=404, status = 404,
json_dumps_params={'indent': 2, 'ensure_ascii': False}, json_dumps_params={'indent': 2, 'ensure_ascii': False},
) )
except Exception as e: except Exception as e:
...@@ -120,7 +120,7 @@ def get_device_status_overview(request): ...@@ -120,7 +120,7 @@ def get_device_status_overview(request):
res, _ = get_or_set_json_with_lock( res, _ = get_or_set_json_with_lock(
CacheKey.DEVICES_STATUSES_OVERVIEW.value, CacheKey.DEVICES_STATUSES_OVERVIEW.value,
loader=loader, loader=loader,
ex=43200, ex=60,
) )
if res: if res:
return JsonResponse(res, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False}) return JsonResponse(res, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
...@@ -144,7 +144,7 @@ def get_device_alert(request): ...@@ -144,7 +144,7 @@ def get_device_alert(request):
alert, _ = get_or_set_json_with_lock( alert, _ = get_or_set_json_with_lock(
CacheKey.ASSURANCE_ALERTS.value, CacheKey.ASSURANCE_ALERTS.value,
loader=loader, loader=loader,
ex=43200, ex=60,
) )
result = [] result = []
if alert: if alert:
......
import os
import sys
from datetime import date, timedelta
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from meraki_Interface_forward.settings import CustomTimedRotatingFileHandler
ROOT = os.path.dirname(os.path.abspath(__file__))
LOG_DIR = os.path.join(ROOT, "logs_verify")
os.makedirs(LOG_DIR, exist_ok=True)
BASE_FILE = os.path.join(LOG_DIR, "meraki.log")
with open(BASE_FILE, "w", encoding="utf-8") as f:
f.write("")
for i in range(10):
d = date.today() - timedelta(days=i + 1)
fn = os.path.join(LOG_DIR, f"meraki.{d.strftime('%Y%m%d')}.log")
with open(fn, "w", encoding="utf-8") as f:
f.write(f"{d.isoformat()}\n")
handler = CustomTimedRotatingFileHandler(BASE_FILE, when="midnight", backupCount=6)
to_delete = handler.getFilesToDelete()
for p in to_delete:
try:
os.remove(p)
except OSError:
pass
remaining = []
for f in os.listdir(LOG_DIR):
if f.startswith("meraki.") and f.endswith(".log") and f != "meraki.log":
remaining.append(f)
print(f"remaining_history={len(remaining)}")
print("\n".join(sorted(remaining)))
if len(remaining) == 6:
print("OK")
exit(0)
else:
print("FAILED")
exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment