Commit d5bcc358 by zmops

远景能源采集平台代码

parent 04b295ba
import json
import logging
import requests
from meraki_Interface_forward.redis_utils import get_json, set_json
# Module-level logger for the Juniper service layer.
logger = logging.getLogger("meraki_Interface_forward.services.juniper_service")
# Redis cache keys for Juniper inventory data (read/written via redis_utils.get_json/set_json).
JUNIPER_DEVICES_CACHE_KEY = "juniper:devices"
JUNIPER_SITES_CACHE_KEY = "juniper:sites"
def fetch_juniper_devices(api_url, org_id, api_token, limit=3000):
    """
    Fetch switch/AP devices from the Juniper (Mist) API and enrich them
    with human-readable site names.

    :param api_url: Base URL of the Juniper API (e.g. "https://api.mist.com")
    :param org_id: Organization ID used in the REST paths
    :param api_token: API token sent as "Authorization: Token <token>"
    :param limit: Maximum number of inventory entries requested in a single
        call. Pagination is not implemented; raise this value if the org's
        inventory grows beyond it. Defaults to 3000 (previous hard-coded value).
    :return: List of device dicts (only type "switch" or "ap"), each with an
        added "site_name" key; [] on missing parameters or any error.
    """
    if not api_url or not org_id or not api_token:
        logger.error("Missing required Juniper API parameters")
        return []

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Token {api_token}",
    }
    try:
        # 1. Fetch sites first so devices can be enriched with a readable site name.
        sites_url = f"{api_url}/api/v1/orgs/{org_id}/sites"
        logger.info(f"Fetching Juniper sites from {sites_url}")
        resp_sites = requests.get(sites_url, headers=headers, timeout=30)
        resp_sites.raise_for_status()
        sites = resp_sites.json()
        site_map = {s["id"]: s["name"] for s in sites if "id" in s and "name" in s}
        logger.info(f"Fetched {len(sites)} sites.")

        # 2. Fetch the device inventory in one large page (no pagination).
        devices_url = f"{api_url}/api/v1/orgs/{org_id}/inventory?limit={limit}"
        logger.info(f"Fetching Juniper devices from {devices_url}")
        resp_devices = requests.get(devices_url, headers=headers, timeout=60)
        resp_devices.raise_for_status()
        devices = resp_devices.json()
        logger.info(f"Fetched {len(devices)} devices.")

        # 3. Keep only switches and wireless APs; attach the site name
        #    (empty string when the site_id is unknown).
        processed_devices = []
        for dev in devices:
            if dev.get("type") not in ("switch", "ap"):
                continue
            dev["site_name"] = site_map.get(dev.get("site_id"), "")
            processed_devices.append(dev)
        logger.info(f"Processed {len(processed_devices)} valid Juniper devices (switch/ap).")
        return processed_devices
    except Exception as e:
        # Best-effort contract: callers treat [] as "no devices"; log and swallow.
        logger.error(f"Error fetching Juniper data: {e}")
        return []
import logging
import requests
import json
import threading
from django.conf import settings
logger = logging.getLogger("meraki_Interface_forward.services.zabbix_service")
class ZabbixService:
    """
    Thin JSON-RPC client for the Zabbix API.

    Logs in lazily and caches the auth token in ``auth_token``. Mutable
    client state (token, request id counter) is guarded by an RLock so a
    single instance can be shared across worker threads; HTTP connections
    are pooled via a mounted HTTPAdapter.
    """

    def __init__(self):
        # Connection settings come from Django settings; authentication is
        # deferred until the first API call (see login()).
        self.url = settings.ZABBIX_API_URL
        self.user = settings.ZABBIX_USER
        self.password = settings.ZABBIX_PASSWORD
        self.auth_token = None
        self.request_id = 1  # JSON-RPC "id", incremented per request
        self._lock = threading.RLock()
        self.session = requests.Session()
        # Pooled adapter so concurrent threads reuse TCP connections.
        adapter = requests.adapters.HTTPAdapter(pool_connections=20, pool_maxsize=20)
        self.session.mount('http://', adapter)
        self.session.mount('https://', adapter)

    def _request(self, method, params=None):
        """
        Send a single JSON-RPC call to Zabbix and return its 'result' payload.

        :param method: Zabbix API method name, e.g. "host.get"
        :param params: Dict of method parameters (defaults to {})
        :raises ValueError: if no Zabbix URL is configured
        :raises Exception: on transport failure or a JSON-RPC 'error' response
        """
        if not self.url:
            raise ValueError("Zabbix URL not configured")
        with self._lock:
            # Snapshot the token and claim a unique request id under the lock.
            auth = self.auth_token
            current_id = self.request_id
            self.request_id += 1
        # user.login must not carry an auth field
        if method == "user.login":
            auth = None
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": current_id,
            "auth": auth
        }
        headers = {'Content-Type': 'application/json-rpc'}
        try:
            response = self.session.post(self.url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            result = response.json()
            if 'error' in result:
                raise Exception(f"Zabbix API Error: {result['error']}")
            return result.get('result')
        except Exception as e:
            logger.error(f"Zabbix request failed: {method}, error: {e}")
            raise

    def login(self):
        """
        Authenticate against Zabbix and cache the auth token.

        Uses double-checked locking so concurrent callers log in only once.
        Tries the 'user' login parameter first (Zabbix 5.x/6.0 style); if the
        server rejects it as an unexpected parameter, retries with 'username'
        (used by other versions).
        """
        if self.auth_token:
            return
        with self._lock:
            # Re-check inside the lock: another thread may have logged in.
            if self.auth_token:
                return
            try:
                try:
                    self.auth_token = self._request("user.login", {
                        "user": self.user,
                        "password": self.password
                    })
                except Exception as e:
                    # If "unexpected parameter "user"" occurs, retry with "username".
                    if "unexpected parameter" in str(e) and "user" in str(e):
                        logger.warning("Zabbix login with 'user' failed, retrying with 'username'...")
                        self.auth_token = self._request("user.login", {
                            "username": self.user,
                            "password": self.password
                        })
                    else:
                        raise e
                logger.info("Zabbix login successful")
            except Exception as e:
                logger.error(f"Zabbix login failed: {e}")
                raise

    def get_hostgroups(self):
        """
        Get all host groups.

        :return: {group_name: group_id}
        """
        self.login()
        groups = self._request("hostgroup.get", {
            "output": ["groupid", "name"],
        })
        group_map = {}
        if isinstance(groups, list):
            for group in groups:
                group_map[group['name']] = group['groupid']
        return group_map

    def get_template_id_by_name(self, template_name):
        """
        Query a template ID by name.

        :param template_name: Template technical name. Matched via the 'host'
            filter field, which Zabbix uses for a template's technical name.
        :return: templateid string, or None if not found
        """
        self.login()
        templates = self._request("template.get", {
            "output": ["templateid", "name"],
            "filter": {
                "host": [template_name]
            }
        })
        if templates and isinstance(templates, list):
            return templates[0].get("templateid")
        return None

    def disable_host(self, host_name):
        """
        Disable a host by its technical name (the 'host' field in Zabbix).

        :return: dict whose "status" is "disabled", "failed_disable" or "not_found"
        """
        self.login()
        hosts = self._request("host.get", {
            "filter": {"host": [host_name]},
            "output": ["hostid"]
        })
        if hosts and isinstance(hosts, list):
            host_id = hosts[0]['hostid']
            try:
                # status: 1 = unmonitored (disabled)
                result = self._request("host.update", {"hostid": host_id, "status": 1})
                logger.info(f"Host {host_name} disabled: {result}")
                return {"status": "disabled", "host": host_name}
            except Exception as e:
                logger.error(f"Failed to disable host {host_name}: {e}")
                return {"status": "failed_disable", "host": host_name, "error": str(e)}
        return {"status": "not_found", "host": host_name}

    def update_host_group(self, host_id, new_group_id, append=False):
        """
        Update host group membership.

        :param host_id: Zabbix Host ID
        :param new_group_id: Target Group ID
        :param append: True to append the group, False to replace ALL groups
            with the new one
        :return: dict whose "status" is "updated", "skipped" or "failed"
        """
        self.login()
        groups_payload = [{"groupid": new_group_id}]
        if append:
            # Get current groups so the update doesn't drop existing membership.
            current_host = self._request("host.get", {
                "hostids": host_id,
                "selectGroups": ["groupid"]
            })
            if current_host and isinstance(current_host, list):
                existing_groups = current_host[0].get("groups", [])
                current_group_ids = {g["groupid"] for g in existing_groups}
                if str(new_group_id) in current_group_ids:
                    return {"status": "skipped", "reason": "Already in group"}
                # Merge existing groups with the new one.
                for g in existing_groups:
                    groups_payload.append({"groupid": g["groupid"]})
        try:
            # host.update replaces the 'groups' list
            self._request("host.update", {
                "hostid": host_id,
                "groups": groups_payload
            })
            return {"status": "updated", "hostid": host_id, "groups": groups_payload}
        except Exception as e:
            logger.error(f"Failed to update host {host_id} group: {e}")
            return {"status": "failed", "error": str(e)}

    def update_host_details(self, host_name, description=None, tags=None, status=None):
        """
        Update a host's description, tags and/or status.

        :param host_name: Host technical name
        :param description: New description text (str)
        :param tags: List of tag dicts, e.g. [{"tag": "Role", "value": "Switch"}].
            Note: Zabbix host.update REPLACES existing tags; to append, query
            the current tags first.
        :param status: 0 = enabled, 1 = disabled
        :return: dict whose "status" is "updated", "skipped", "not_found" or "failed"
        """
        self.login()
        # 1. Resolve the host ID from the technical name
        hosts = self._request("host.get", {
            "filter": {"host": [host_name]},
            "output": ["hostid"]
        })
        if not hosts or not isinstance(hosts, list):
            return {"status": "not_found", "host": host_name}
        host_id = hosts[0]['hostid']
        # 2. Build the update parameters (only fields explicitly provided)
        params = {"hostid": host_id}
        if description is not None:
            params["description"] = description
        if tags is not None:
            params["tags"] = tags
        if status is not None:
            params["status"] = int(status)  # ensure integer
        # Nothing to update beyond the hostid -> skip the API call
        if len(params) <= 1:
            return {"status": "skipped", "reason": "No fields to update", "host": host_name}
        try:
            # 3. Call host.update
            result = self._request("host.update", params)
            logger.info(f"Host {host_name} updated details: {params.keys()}")
            return {"status": "updated", "host": host_name, "hostid": result['hostids'][0]}
        except Exception as e:
            logger.error(f"Failed to update host {host_name}: {e}")
            return {"status": "failed", "host": host_name, "error": str(e)}

    def update_host_templates(self, host_id, new_template_ids, append=True):
        """
        Update host template links.

        :param host_id: Zabbix Host ID
        :param new_template_ids: List of new template IDs (strings or ints)
        :param append: True to append templates (merge with existing links),
            False to replace ALL templates
        :return: dict whose "status" is "updated", "skipped" or "failed"
        """
        self.login()
        templates_payload = [{"templateid": str(tid)} for tid in new_template_ids]
        if append:
            # Get currently linked templates so the update keeps them.
            current_host = self._request("host.get", {
                "hostids": host_id,
                "selectParentTemplates": ["templateid"]
            })
            if current_host and isinstance(current_host, list):
                existing_templates = current_host[0].get("parentTemplates", [])
                current_template_ids = {t["templateid"] for t in existing_templates}
                # Filter out templates that are already linked
                to_add = []
                for tid in new_template_ids:
                    if str(tid) not in current_template_ids:
                        to_add.append({"templateid": str(tid)})
                if not to_add:
                    return {"status": "skipped", "reason": "All templates already linked"}
                # Merge existing with new ones
                templates_payload = [{"templateid": t["templateid"]} for t in existing_templates]
                templates_payload.extend(to_add)
        try:
            # host.update replaces the 'templates' list
            self._request("host.update", {
                "hostid": host_id,
                "templates": templates_payload
            })
            return {"status": "updated", "hostid": host_id, "templates": templates_payload}
        except Exception as e:
            logger.error(f"Failed to update host {host_id} templates: {e}")
            return {"status": "failed", "error": str(e)}

    def update_host_inventory_mode(self, host_id, mode=1):
        """
        Update a host's inventory mode.

        :param host_id: Zabbix Host ID
        :param mode: 1 = Automatic, 0 = Manual, -1 = Disabled
        :return: dict whose "status" is "updated" or "failed"
        """
        self.login()
        try:
            self._request("host.update", {
                "hostid": host_id,
                "inventory_mode": int(mode)
            })
            return {"status": "updated", "hostid": host_id, "inventory_mode": mode}
        except Exception as e:
            logger.error(f"Failed to update host {host_id} inventory mode: {e}")
            return {"status": "failed", "error": str(e)}

    def create_host(self, host_technical_name, visible_name, group_id, interfaces, templates=None, macros=None, tags=None):
        """
        Create a host in Zabbix with advanced options.

        If the host already exists (matched by technical name), it is NOT
        re-created; instead its group is replaced, templates are appended,
        and inventory mode is forced to Automatic. A disabled existing host
        is deliberately left disabled.

        :param host_technical_name: Technical name (must be unique, e.g. serial)
        :param visible_name: Visible name (may be non-unique, e.g. device name)
        :param group_id: Target host group ID
        :param interfaces: List of interface dicts as expected by host.create
        :param templates: List of template refs, e.g. [{"templateid": "10001"}]
        :param macros: List of macros, e.g. [{"macro": "{$MY_MACRO}", "value": "val"}]
        :param tags: List of tags, e.g. [{"tag": "Role", "value": "Switch"}]
        :return: dict describing the outcome ("created", "skipped",
            "updated_group", "updated_templates", "updated_all" or "failed")
        """
        self.login()
        # Check if host exists (by technical name)
        existing = self._request("host.get", {
            "filter": {"host": [host_technical_name]},
            "output": ["hostid", "status"]
        })
        if existing:
            host_id = existing[0]['hostid']
            # current_status = existing[0]['status']
            status = "skipped"
            msg_parts = ["Already exists"]
            # 0. Re-activation logic intentionally REMOVED per user request:
            #    a disabled host (status=1) stays disabled; only group,
            #    templates and inventory mode are updated below.
            # 1. Update Group (Replace mode)
            group_update_res = self.update_host_group(host_id, group_id, append=False)
            if group_update_res.get("status") == "updated":
                status = "updated_group"
                msg_parts.append("Group updated")
            elif group_update_res.get("status") == "failed":
                msg_parts.append(f"Group update failed: {group_update_res.get('error')}")
            # 2. Update Templates (Append mode)
            if templates:
                template_ids = [t['templateid'] for t in templates if 'templateid' in t]
                if template_ids:
                    tmpl_update_res = self.update_host_templates(host_id, template_ids, append=True)
                    if tmpl_update_res.get("status") == "updated":
                        status = "updated_templates" if status == "skipped" else "updated_all"
                        msg_parts.append("Templates updated")
                    elif tmpl_update_res.get("status") == "failed":
                        msg_parts.append(f"Templates update failed: {tmpl_update_res.get('error')}")
            # 3. Update Inventory Mode (ensure it is Automatic)
            inv_res = self.update_host_inventory_mode(host_id, mode=1)
            if inv_res.get("status") == "updated":
                # Success is the expected case; only failures are worth reporting.
                pass
            elif inv_res.get("status") == "failed":
                msg_parts.append(f"Inventory update failed: {inv_res.get('error')}")
            msg = "; ".join(msg_parts)
            logger.info(f"Host {host_technical_name} processed: {msg}")
            return {"status": status, "host": host_technical_name, "name": visible_name, "reason": msg, "hostid": host_id}
        params = {
            "host": host_technical_name,
            "name": visible_name,
            "interfaces": interfaces,
            "groups": [{"groupid": group_id}],
            "templates": templates or [],
            "macros": macros or [],
            "tags": tags or [],
            "inventory_mode": 1  # 1 = Automatic inventory mode
        }
        # Clean empty lists so host.create doesn't receive empty parameters.
        if not params["templates"]: del params["templates"]
        if not params["macros"]: del params["macros"]
        if not params["tags"]: del params["tags"]
        try:
            result = self._request("host.create", params)
            logger.info(f"Host {host_technical_name} ({visible_name}) created: {result}")
            return {"status": "created", "host": host_technical_name, "name": visible_name, "hostid": result['hostids'][0]}
        except Exception as e:
            logger.error(f"Failed to create host {host_technical_name}: {e}")
            return {"status": "failed", "host": host_technical_name, "name": visible_name, "error": str(e)}
import json
import logging
import os
import concurrent.futures
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from meraki_Interface_forward.services.juniper_service import fetch_juniper_devices
from meraki_Interface_forward.services.zabbix_service import ZabbixService
from meraki_Interface_forward.redis_utils import get_json, set_json
# Module-level logger for the Juniper sync view.
logger = logging.getLogger("meraki_Interface_forward.views.juniper_views")
# Redis key for storing synced host serials (stored as a JSON list).
ZABBIX_JUNIPER_SYNCED_HOSTS_KEY = "zabbix:juniper_synced_hosts"
# Max workers for concurrent Zabbix API calls
MAX_WORKERS = 20
def get_default_juniper_group_name(dev_type):
    """Return the fallback Zabbix host-group name for a Juniper device type."""
    type_to_group = {
        "ap": "Juniper/Wireless",  # wireless access points
        "switch": "Juniper/Switch",
    }
    return type_to_group.get(dev_type, "Juniper/Other")
@csrf_exempt
def sync_juniper_to_zabbix(request):
    """
    Sync Juniper devices to Zabbix.

    POST body (JSON): juniperApi, orgId, apiToken, siteGroupMap (optional
    {site_name: zabbix_group_name}), disableHostTag (optional tag dict
    applied to hosts that disappeared from Juniper).

    Flow: fetch Juniper devices -> diff against the serial list cached in
    Redis -> disable hosts that vanished -> create/update current hosts
    concurrently -> persist the new serial list to Redis and a backup file.
    Returns a JSON summary of created/skipped/failed/disabled hosts.
    """
    if request.method != "POST":
        return JsonResponse({"message": "Method Not Allowed"}, status=405)
    try:
        logger.info("Starting Juniper to Zabbix synchronization task...")
        body = {}
        if request.body:
            body = json.loads(request.body.decode("utf-8"))
        juniper_api = body.get("juniperApi")
        org_id = body.get("orgId")
        api_token = body.get("apiToken")
        site_group_map = body.get("siteGroupMap") or {}
        disable_host_tag = body.get("disableHostTag")
        logger.info(f"Request parameters - OrgID: {org_id}, Site Map Keys: {list(site_group_map.keys())}")
        # 1. Get Devices (switch/ap only, already enriched with site_name)
        devices = fetch_juniper_devices(juniper_api, org_id, api_token)
        current_device_map = {}  # serial -> device
        if devices:
            for d in devices:
                if isinstance(d, dict) and d.get("serial"):
                    current_device_map[d.get("serial")] = d
        current_serials = set(current_device_map.keys())
        logger.info(f"Identified {len(current_serials)} unique valid Juniper devices.")
        # 2. Init Zabbix Service
        try:
            zabbix = ZabbixService()
            zabbix_groups = zabbix.get_hostgroups()  # {name: id}
        except Exception as e:
            return JsonResponse({"error": f"Zabbix connection failed: {str(e)}"}, status=500)
        # 3. Redis & Sync Logic: serials cached last run but absent now get disabled.
        cached_serials_list = get_json(ZABBIX_JUNIPER_SYNCED_HOSTS_KEY) or []
        cached_serials = set(cached_serials_list)
        to_disable = cached_serials - current_serials
        logger.info(f"Diff calculation: {len(to_disable)} hosts to disable, {len(current_serials)} hosts to add/update.")
        results = {
            "created": [],
            "skipped": [],
            "failed": [],
            "disabled": [],
            "failed_disable": []
        }
        # 3.1 Disable removed hosts
        if to_disable:
            logger.info(f"Processing {len(to_disable)} disabled hosts with {MAX_WORKERS} threads...")
            def _disable_task(s_serial):
                # Mark the host disabled (status=1) with an explanatory description.
                desc = "该设备从Juniper设备接口 无法发现, 设备已经从Juniper平台删除!"
                tags = None
                if disable_host_tag and isinstance(disable_host_tag, dict):
                    tags = [disable_host_tag]
                return zabbix.update_host_details(
                    host_name=s_serial,
                    description=desc,
                    tags=tags,
                    status=1
                )
            with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                future_to_serial = {executor.submit(_disable_task, s): s for s in to_disable}
                for future in concurrent.futures.as_completed(future_to_serial):
                    s = future_to_serial[future]
                    try:
                        res = future.result()
                        if res["status"] == "updated":
                            results["disabled"].append(res)
                        elif res["status"] == "not_found":
                            # Host was removed from Zabbix manually; nothing to do.
                            pass
                        else:
                            results["failed_disable"].append(res)
                    except Exception as exc:
                        logger.error(f"Disable task generated an exception for {s}: {exc}")
                        results["failed_disable"].append({"host": s, "error": str(exc)})
        # Pre-fetch template IDs once so worker threads don't query per device.
        logger.info("Pre-fetching Zabbix template IDs...")
        template_cache = {}
        # Juniper templates + common ICMP ping template
        target_templates = ["Env_Juniper_Wireless", "Env_Juniper_Switch", "Env_Meraki_ICMP_Ping"]
        for tmpl_name in target_templates:
            tid = zabbix.get_template_id_by_name(tmpl_name)
            if tid:
                template_cache[tmpl_name] = tid
            else:
                logger.warning(f"Template not found: {tmpl_name}")
        # 3.2 Process current devices (create or update each one)
        logger.info(f"Processing {len(current_device_map)} devices with {MAX_WORKERS} threads...")
        def _create_update_task(dev_info):
            # Build group/template/macro payloads for one device, then create_host.
            serial = dev_info.get("serial")
            name = dev_info.get("name")  # used as the Zabbix visible name
            dev_type = dev_info.get("type")  # "switch" or "ap"
            site_name = dev_info.get("site_name")
            mac = dev_info.get("mac")
            dev_id = dev_info.get("id")
            site_id = dev_info.get("site_id")
            # The Juniper inventory endpoint does not return interface/IP info,
            # so the host is created with no interfaces.
            interfaces = []
            # Group resolution priority: explicit map -> same-named Zabbix group
            # -> type-based default group.
            target_group_name = None
            if site_name and site_name in site_group_map:
                target_group_name = site_group_map[site_name]
            if not target_group_name and site_name and site_name in zabbix_groups:
                target_group_name = site_name
            if not target_group_name:
                target_group_name = get_default_juniper_group_name(dev_type)
            group_id = zabbix_groups.get(target_group_name)
            if not group_id:
                return {
                    "status": "failed",
                    "host": serial,
                    "name": name,
                    "reason": f"Target group '{target_group_name}' not found in Zabbix"
                }
            # Templates: per-type Juniper template plus the shared ICMP ping one.
            templates = []
            target_tmpl_name = None
            if dev_type == "ap":
                target_tmpl_name = "Env_Juniper_Wireless"
            elif dev_type == "switch":
                target_tmpl_name = "Env_Juniper_Switch"
            if target_tmpl_name and target_tmpl_name in template_cache:
                templates.append({"templateid": template_cache[target_tmpl_name]})
            if "Env_Meraki_ICMP_Ping" in template_cache:
                templates.append({"templateid": template_cache["Env_Meraki_ICMP_Ping"]})
            # Macros consumed by the linked templates' items.
            macros = [
                {"macro": "{$SERIAL}", "value": serial or ""},
                {"macro": "{$TYPE}", "value": dev_type or ""},
                {"macro": "{$MAC}", "value": mac or ""},
                {"macro": "{$ID}", "value": dev_id or ""},
                {"macro": "{$SITE_ID}", "value": site_id or ""},
                {"macro": "{$JUNIPER.API.ORGID}", "value": org_id or ""},
                {"macro": "{$SNMP_COMMUNITY}", "value": "public"}
            ]
            if api_token:
                # NOTE(review): "TOEKN" looks like a typo for "TOKEN" — confirm the
                # Zabbix templates reference the same spelling before renaming it.
                macros.append({"macro": "{$JUNIPER.API.TOEKN}", "value": api_token})
            if juniper_api:
                macros.append({"macro": "{$JUNIPER.API.URL}", "value": juniper_api})
            # Tags: intentionally empty (no tags are to be added at creation).
            tags = []
            # Create host; on a visible-name conflict retry once as "<name>_<serial>".
            res = zabbix.create_host(serial, name, group_id, interfaces, templates=templates, macros=macros, tags=tags)
            if res.get("status") == "failed" and "visible name" in str(res.get("error", "")).lower() and "already exists" in str(res.get("error", "")).lower():
                new_name = f"{name}_{serial}"
                logger.warning(f"Visible name conflict for {name} ({serial}). Retrying with {new_name}...")
                res = zabbix.create_host(serial, new_name, group_id, interfaces, templates=templates, macros=macros, tags=tags)
            res["group"] = target_group_name
            return res
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_dev = {executor.submit(_create_update_task, d): d for d in current_device_map.values()}
            for future in concurrent.futures.as_completed(future_to_dev):
                d_info = future_to_dev[future]
                s_serial = d_info.get("serial")
                try:
                    res = future.result()
                    if res["status"] == "created":
                        results["created"].append(res)
                    elif res["status"] in ["skipped", "updated_group", "updated_templates", "updated_all", "reactivated", "reactivated_and_updated"]:
                        results["skipped"].append(res)
                    else:
                        results["failed"].append(res)
                        logger.error(f"Failed to create/update Juniper host {s_serial}: {res}")
                except Exception as exc:
                    logger.error(f"Juniper task exception for {s_serial}: {exc}")
                    results["failed"].append({"host": s_serial, "error": str(exc)})
        logger.info(f"Juniper Sync complete. Created: {len(results['created'])}, Skipped/Updated: {len(results['skipped'])}")
        # 4. Update cache & backup: current serials become next run's baseline.
        new_synced_list = list(current_serials)
        set_json(ZABBIX_JUNIPER_SYNCED_HOSTS_KEY, new_synced_list)
        logger.info("Updated Juniper Redis sync cache.")
        # Backup to a text file (best-effort; failure does not abort the sync).
        try:
            log_dir = settings.LOG_DIR
            backup_file = os.path.join(log_dir, "zabbix_juniper_synced_hosts.txt")
            with open(backup_file, "w", encoding="utf-8") as f:
                for s in new_synced_list:
                    f.write(f"{s}\n")
            logger.info(f"Backed up Juniper synced hosts list to {backup_file}")
        except Exception as e:
            logger.error(f"Failed to backup Juniper synced hosts to file: {e}")
        return JsonResponse(results, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("sync_juniper_to_zabbix failed")
        return JsonResponse({"error": str(e)}, status=500)
import json
import logging
import os
import concurrent.futures
from django.conf import settings
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from meraki_Interface_forward.services.meraki_service import get_filtered_devices
from meraki_Interface_forward.services.zabbix_service import ZabbixService
from meraki_Interface_forward.redis_utils import get_json, set_json
# Module-level logger for the Meraki sync view.
logger = logging.getLogger("meraki_Interface_forward.views.zabbix_views")
# Redis key for storing synced host serials (stored as a JSON list).
ZABBIX_SYNCED_HOSTS_KEY = "zabbix:synced_hosts"
# Max workers for concurrent Zabbix API calls
MAX_WORKERS = 20
# Built-in mapping from a Meraki network name to the Zabbix host-group name
# a device in that network should land in. A request may override/extend
# entries via the "networkGroupMap" body field (custom values win).
DEFAULT_NETWORK_GROUP_MAP = {
    "东莞办公室": "东莞办公室",
    "丰宁二期-工厂": "丰宁工厂",
    "乌兰二期-工厂": "乌兰察布工厂",
    "乌兰察布工厂IT网络": "乌兰察布工厂",
    "乐亭-工厂": "乐亭工厂",
    "五河-工厂": "五河工厂",
    "包头主机工厂": "包头主机工厂",
    "包头储能工厂": "包头储能工厂",
    "包头合金办公室": "包头合金工厂",
    "包头合金工厂IT网络": "包头合金工厂",
    "北京-办公室": "北京办公室",
    "博荟广场C座5F": "上海博荟广场",
    "博荟广场C座6F": "上海博荟广场",
    "博荟广场C座15F": "上海博荟广场",
    "博荟广场C座22F": "上海博荟广场",
    "台前云中心": "台前工厂",
    "呼和浩特办公室": "呼和浩特办公室",
    "商都叶片工厂": "商都叶片工厂",
    "大连-庄河工厂": "庄河工厂",
    "天津-办公室": "天津办公室",
    "如东叶片-工厂": "如东叶片工厂",
    "山东-单县塔基工厂": "单县塔基工厂",
    "巴彦淖尔二期-工厂": "巴彦淖尔工厂",
    "广西南宁-工厂": "南宁工厂",
    # "广西合浦-辅房": "停产",  # site discontinued ("停产") — mapping intentionally disabled
    "广西桂平-工厂": "桂平工厂",
    "庆阳-工厂": "庆阳工厂",
    "新疆-吉木乃工厂": "吉木乃工厂",
    "新疆乌鲁木齐办公室": "乌鲁木齐办公室",
    "武威储能": "武威工厂",
    "武威叶片-工厂": "武威工厂",
    "武威叶片宿舍食堂": "武威工厂",
    "江阴RDC外仓": "江阴RDC外仓",
    "江阴一期堆场": "江阴一期工厂",
    "江阴一期篮球场": "江阴一期工厂",
    "江阴一期车间": "江阴一期工厂",
    "江阴一期辅房": "江阴一期工厂",
    "江阴国家级供应商外仓": "江阴二期工厂",
    "江阴宝湾国际物流园": "江阴二期工厂",
    "江阴渔光会馆C区": "江阴二期工厂",
    "江阴二期车间": "江阴二期工厂",
    "江阴二期辅房": "江阴二期工厂",
    "江阴-口罩厂": "江阴二期工厂",
    "江阴三期": "江阴三期工厂",
    "江阴四期-仓库": "江阴四期工厂",
    "江阴四期变压办公室": "江阴四期工厂",
    "江阴石庄仓库": "江阴齿轮箱厂",
    "江阴齿轮办公室1F": "江阴齿轮箱厂",
    "江阴传动链-工厂": "江阴齿轮箱厂",
    "江阴制氢办公室": "江阴制氢办公室",
    "江阴氢能星球工厂2#厂房": "江阴制氢办公室",
    "江阴-小湖工厂": "江阴小湖工厂",
    "沈阳工厂": "沈阳工厂",
    "沙尔沁-工厂": "沙尔沁工厂",
    "沙尔沁-辅房": "沙尔沁工厂",
    "沙尔沁储能": "沙尔沁储能",
    "河北沧州塔基工厂": "沧州塔基工厂",
    "海兴工厂": "海兴工厂",
    "海兴工厂二期": "海兴工厂",
    "淮安-盱眙工厂": "盱眙工厂",
    "濮阳工厂": "濮阳工厂",
    "白城-工厂": "白城工厂",
    "翁牛特旗-工厂": "翁牛特旗工厂",
    "翁牛特旗-辅房": "翁牛特旗工厂",
    "苍南轴承工厂": "苍南轴承工厂",
    "若羌工厂": "若羌工厂",
    "襄阳工厂": "襄阳工厂",
    "赤峰元宝山P1期": "赤峰制氢工厂",
    "赤峰元宝山制氢工厂": "赤峰制氢工厂",
    "赤峰元宝山氢能3#": "赤峰制氢工厂",
    "郎溪工厂": "郎溪工厂",
    "酒泉工厂": "酒泉工厂",
    "钦州-三期叶片工厂": "钦州工厂",
    "陕西榆林云中心": "榆林工厂",
    "高安-工厂": "高安工厂",
    "高安-辅房": "高安工厂",
    "魏县-工厂": "魏县工厂",
    # ===== Overseas (海外) =====
    "Bangalore Office": "印度办公室",
    "Bangalore Office 11F": "印度办公室",
    "Boston-Office": "波士顿办公室",
    "Boulder-Office": "博尔德办公室",
    "Brazil-Office": "巴西办公室",
    "Denmark Office": "丹麦办公室",
    "DuBai-Office": "迪拜办公室",
    "GIC-WorkShop": "GIC-WorkShop",
    "London-Office": "伦敦办公室",
    "Melbourne-Office": "墨尔本办公室",
    "Menlo Park": "Menlo Park",
    "Singapore Office": "新加坡办公室",
    "Spain-Office": "西班牙办公室"
}
def get_default_zabbix_group_name(product_type):
    """Return the fallback Zabbix host-group name for a Meraki product type."""
    defaults = {
        "wireless": "思科设备/无线设备AP",
        "switch": "思科设备/交换机",
    }
    return defaults.get(product_type, "思科设备/其他")
@csrf_exempt
def sync_meraki_to_zabbix(request):
    """
    Sync Meraki devices to Zabbix hosts.

    POST body (JSON): {
        "filter_networks": [...],   # optional network filter passed to Meraki
        "networkGroupMap": {...},   # optional {network_name: zabbix_group}, overrides defaults
        "merakiApi": "...",         # optional base URL stored as the {$MERAKI_URL} macro
        "disableHostTag": {...}     # optional tag dict applied to vanished hosts
    }

    Flow: fetch filtered Meraki devices -> diff against the serial list cached
    in Redis -> disable hosts that vanished -> create/update current
    wireless/switch devices concurrently -> persist the new serial list to
    Redis and a backup file. Returns a JSON summary of the outcome.
    """
    if request.method != "POST":
        return JsonResponse({"message": "Method Not Allowed"}, status=405)
    try:
        logger.info("Starting Meraki to Zabbix synchronization task...")
        body = {}
        if request.body:
            body = json.loads(request.body.decode("utf-8"))
        filter_networks = body.get("filter_networks")
        custom_map = body.get("networkGroupMap") or {}
        meraki_url_base = body.get("merakiApi")
        disable_host_tag = body.get("disableHostTag")
        logger.info(f"Request parameters - Filter Networks: {len(filter_networks) if filter_networks else 0}, Custom Map Keys: {list(custom_map.keys())}, Meraki API: {meraki_url_base}")
        # Merge maps: custom overrides default
        network_group_map = DEFAULT_NETWORK_GROUP_MAP.copy()
        if isinstance(custom_map, dict):
            network_group_map.update(custom_map)
        # 1. Get Devices
        logger.info("Fetching Meraki devices with filters...")
        devices = get_filtered_devices(filter_networks)
        logger.info(f"Fetched {len(devices)} devices from Meraki service.")
        # Even if devices is empty we continue: hosts may still need disabling.
        # get_filtered_devices returns [] both when empty and on error.
        current_device_map = {}  # serial -> device
        if devices:
            for d in devices:
                if isinstance(d, dict) and d.get("serial"):
                    current_device_map[d.get("serial")] = d
        current_serials = set(current_device_map.keys())
        logger.info(f"Identified {len(current_serials)} unique valid devices.")
        # 2. Init Zabbix Service
        try:
            logger.info("Initializing Zabbix service and fetching host groups...")
            zabbix = ZabbixService()
            zabbix_groups = zabbix.get_hostgroups()  # {name: id}
            logger.info(f"Fetched {len(zabbix_groups)} host groups from Zabbix.")
        except Exception as e:
            logger.error(f"Failed to initialize Zabbix service: {e}")
            return JsonResponse({"error": f"Zabbix connection failed: {str(e)}"}, status=500)
        # 3. Redis & Sync Logic
        # Get cached serials (previous run's state)
        cached_serials_list = get_json(ZABBIX_SYNCED_HOSTS_KEY) or []
        cached_serials = set(cached_serials_list)
        logger.info(f"Loaded {len(cached_serials)} cached serials from Redis.")
        # Diff: serials seen last run but absent now get disabled; all current
        # devices are (re)processed to ensure they exist in Zabbix.
        to_disable = cached_serials - current_serials
        logger.info(f"Diff calculation: {len(to_disable)} hosts to disable, {len(current_serials)} hosts to add/update.")
        results = {
            "created": [],
            "skipped": [],
            "failed": [],
            "disabled": [],
            "failed_disable": []
        }
        # 3.1 Disable removed hosts
        if to_disable:
            logger.info(f"Processing {len(to_disable)} disabled hosts with {MAX_WORKERS} threads...")
            def _disable_task(s_serial):
                # Mark the host disabled (status=1) with an explanatory description.
                desc = "该设备从meraki设备接口 无法发现, 设备已经从meraki平台删除!"
                tags = None
                if disable_host_tag and isinstance(disable_host_tag, dict):
                    # Note: this REPLACES existing tags. Appending would require a
                    # fetch first; for disabled hosts a single marker tag is fine.
                    # Zabbix expects tags as a list of objects.
                    tags = [disable_host_tag]
                # Update host details (status=1 means disable)
                return zabbix.update_host_details(
                    host_name=s_serial,
                    description=desc,
                    tags=tags,
                    status=1
                )
            with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
                future_to_serial = {executor.submit(_disable_task, s): s for s in to_disable}
                for future in concurrent.futures.as_completed(future_to_serial):
                    s = future_to_serial[future]
                    try:
                        res = future.result()
                        # update_host_details returns "updated" on success
                        if res["status"] == "updated":
                            results["disabled"].append(res)
                        elif res["status"] == "not_found":
                            # It might have been deleted from Zabbix manually.
                            pass
                        else:
                            results["failed_disable"].append(res)
                            logger.warning(f"Failed to disable host {s}: {res}")
                    except Exception as exc:
                        logger.error(f"Disable task generated an exception for {s}: {exc}")
                        results["failed_disable"].append({"host": s, "error": str(exc)})
            logger.info(f"Disabled processing complete. Success: {len(results['disabled'])}, Failed: {len(results['failed_disable'])}")
        # Pre-fetch template IDs once so worker threads don't query per device.
        logger.info("Pre-fetching Zabbix template IDs...")
        template_cache = {}
        # Per-type Meraki templates plus the shared ICMP ping template.
        for tmpl_name in ["Env_Meraki_Wireless_Template", "Env_Meraki_Switch_Template", "Env_Meraki_ICMP_Ping"]:
            tid = zabbix.get_template_id_by_name(tmpl_name)
            if tid:
                template_cache[tmpl_name] = tid
                logger.info(f"Template '{tmpl_name}' found, ID: {tid}")
            else:
                logger.warning(f"Template not found: {tmpl_name}")
        # 3.2 Process current devices (add/update)
        # Only wireless and switch product types are synced.
        valid_devices_to_sync = []
        for serial, dev in current_device_map.items():
            p_type = dev.get("productType")
            if p_type not in ["wireless", "switch"]:
                # Skip non-supported types
                continue
            valid_devices_to_sync.append(dev)
        logger.info(f"Processing {len(valid_devices_to_sync)} valid devices (wireless/switch) with {MAX_WORKERS} threads...")
        def _create_update_task(dev_info):
            # Build interface/group/template/macro payloads for one device.
            serial = dev_info.get("serial")
            name = dev_info.get("name")  # used as the Zabbix visible name
            product_type = dev_info.get("productType")
            network_name = dev_info.get("networkName")
            meraki_tags = dev_info.get("tags") or []
            # Determine IP (priority: lanIp -> publicIp -> None)
            ip = dev_info.get("lanIp")
            if not ip:
                ip = dev_info.get("publicIp")
            # Construct the SNMP interface (type 2 = SNMP, port 161).
            interfaces = []
            if ip:
                interfaces.append({
                    "type": 2,  # SNMP
                    "main": 1,
                    "useip": 1,
                    "ip": ip,
                    "dns": "",
                    "port": "161",
                    "details": {
                        "version": 2,
                        "community": "{$SNMP_COMMUNITY}"
                    }
                })
            else:
                # Without a known IP, use a 127.0.0.1 placeholder so the host
                # still gets a valid SNMP interface.
                interfaces.append({
                    "type": 2,
                    "main": 1,
                    "useip": 1,
                    "ip": "127.0.0.1",
                    "dns": "",
                    "port": "161",
                    "details": {
                        "version": 2,
                        "community": "{$SNMP_COMMUNITY}"
                    }
                })
            # Group resolution priority:
            target_group_name = None
            # Priority 1: explicit network -> group map
            if network_name and network_name in network_group_map:
                target_group_name = network_group_map[network_name]
            # Priority 2: a Zabbix group named exactly like the network
            if not target_group_name and network_name and network_name in zabbix_groups:
                target_group_name = network_name
            # Priority 3: product-type default group
            if not target_group_name:
                target_group_name = get_default_zabbix_group_name(product_type)
            # Resolve the group name to its Zabbix ID
            group_id = zabbix_groups.get(target_group_name)
            if not group_id:
                return {
                    "status": "failed",
                    "host": serial,
                    "name": name,
                    "reason": f"Target group '{target_group_name}' not found in Zabbix"
                }
            # Templates: per-type template plus the shared ICMP ping template.
            templates = []
            target_template_name = None
            if product_type == "wireless":
                target_template_name = "Env_Meraki_Wireless_Template"
            elif product_type == "switch":
                target_template_name = "Env_Meraki_Switch_Template"
            if target_template_name and target_template_name in template_cache:
                templates.append({"templateid": template_cache[target_template_name]})
            # Always add the ICMP ping template when available
            if "Env_Meraki_ICMP_Ping" in template_cache:
                templates.append({"templateid": template_cache["Env_Meraki_ICMP_Ping"]})
            # Macros consumed by the linked templates' items.
            macros = [
                {"macro": "{$SERIAL}", "value": serial or ""},
                {"macro": "{$TYPE}", "value": product_type or ""},
                {"macro": "{$SNMP_COMMUNITY}", "value": "public"}  # default community
            ]
            if meraki_url_base:
                macros.append({"macro": "{$MERAKI_URL}", "value": meraki_url_base})
            # Tags: mirror the device's Meraki tags under the "meraki" tag name.
            tags = []
            for t in meraki_tags:
                tags.append({"tag": "meraki", "value": t})
            # Create (or update-in-place, see ZabbixService.create_host) the host.
            res = zabbix.create_host(serial, name, group_id, interfaces, templates=templates, macros=macros, tags=tags)
            # Inject the resolved group name into the result for logging/response.
            res["group"] = target_group_name
            return res
        with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            future_to_dev = {executor.submit(_create_update_task, d): d for d in valid_devices_to_sync}
            for future in concurrent.futures.as_completed(future_to_dev):
                d_info = future_to_dev[future]
                s_serial = d_info.get("serial")
                s_name = d_info.get("name")
                try:
                    res = future.result()
                    if res["status"] == "created":
                        results["created"].append(res)
                    elif res["status"] in ["skipped", "updated_group"]:
                        results["skipped"].append(res)
                    else:
                        results["failed"].append(res)
                        logger.error(f"Failed to create/update host {s_serial}: {res}")
                except Exception as exc:
                    logger.error(f"Create/Update task generated an exception for {s_serial}: {exc}")
                    results["failed"].append({"host": s_serial, "name": s_name, "error": str(exc)})
        logger.info(f"Sync complete. Created: {len(results['created'])}, Skipped/Updated: {len(results['skipped'])}, Failed: {len(results['failed'])}")
        # 4. Update cache & backup.
        # The cache always reflects the current state (current_serials); when
        # current_serials is empty but the cache wasn't, everything above was
        # disabled and the cache is cleared accordingly.
        new_synced_list = list(current_serials)
        set_json(ZABBIX_SYNCED_HOSTS_KEY, new_synced_list)  # no expiry (permanent)
        logger.info("Updated Redis sync cache.")
        # Backup to a text file (best-effort; failure does not abort the sync).
        try:
            log_dir = settings.LOG_DIR
            backup_file = os.path.join(log_dir, "zabbix_synced_hosts.txt")
            with open(backup_file, "w", encoding="utf-8") as f:
                for s in new_synced_list:
                    f.write(f"{s}\n")
            logger.info(f"Backed up synced hosts list to {backup_file}")
        except Exception as e:
            logger.error(f"Failed to backup synced hosts to file: {e}")
        return JsonResponse(results, safe=False, json_dumps_params={'indent': 2, 'ensure_ascii': False})
    except Exception as e:
        logger.exception("sync_meraki_to_zabbix failed")
        return JsonResponse({"error": str(e)}, status=500)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment