Commit beb869df by sanshi

自动发现

parent f9bced93
......@@ -10,9 +10,17 @@
<w>businessids</w>
<w>contextname</w>
<w>correlationid</w>
<w>dcheckid</w>
<w>dchecks</w>
<w>departmentid</w>
<w>dhostids</w>
<w>dingding</w>
<w>discoveryids</w>
<w>drule</w>
<w>druleid</w>
<w>druleids</w>
<w>drules</w>
<w>dserviceids</w>
<w>evaltype</w>
<w>eventid</w>
<w>eventids</w>
......@@ -33,11 +41,13 @@
<w>hosttype</w>
<w>hosttypeids</w>
<w>hstgrp</w>
<w>icmp</w>
<w>icmpping</w>
<w>interfaceid</w>
<w>interfaceids</w>
<w>iplist</w>
<w>ipmi</w>
<w>iprange</w>
<w>itemid</w>
<w>itemids</w>
<w>itsm</w>
......@@ -56,6 +66,7 @@
<w>monitorid</w>
<w>mtime</w>
<w>networkmonitor</w>
<w>nextcheck</w>
<w>nodata</w>
<w>objectid</w>
<w>objectids</w>
......@@ -92,6 +103,8 @@
<w>triggerid</w>
<w>triggerids</w>
<w>triggerprototype</w>
<w>uname</w>
<w>uniq</w>
<w>useip</w>
<w>userid</w>
<w>valuemapid</w>
......
# -*- coding: utf-8 -*-
# 创建发现规则
# 作者: 陈磊
# 时间: 2019-12-06
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class DRuleCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建发现规则")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/drule/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.druleid = None
self.iprange = None
self.name = None
self.delay = None
self.nextcheck = None
self.proxy_hostid = None
self.status = None
self.dchecks = None
self.dchecks_type = None
self.dchecks_key_ = None
self.dchecks_snmp_community = None
self.dchecks_ports = None
self.dchecks_host_source = None
self.dchecks_name_source = None
self.api = UtilsRequest()
def get_response(self):
if self.dchecks_type is not None:
self.dchecks = []
for x, y in enumerate(self.dchecks_type):
base = ApiBase()
base.dict_add_key(_key="type", value=self.dchecks_type, num=x)
base.dict_add_key(_key="key_", value=self.dchecks_key_, num=x)
base.dict_add_key(_key="snmp_community", value=self.dchecks_snmp_community, num=x)
base.dict_add_key(_key="ports", value=self.dchecks_ports, num=x)
base.dict_add_key(_key="host_source", value=self.dchecks_host_source, num=x)
base.dict_add_key(_key="name_source", value=self.dchecks_name_source, num=x)
self.dchecks.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="druleid", value=self.druleid)
base.dict_add_key(_key="iprange", value=self.iprange)
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="delay", value=self.delay)
base.dict_add_key(_key="nextcheck", value=self.nextcheck)
base.dict_add_key(_key="proxy_hostid", value=self.proxy_hostid)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="dchecks", value=self.dchecks)
self._json = base._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除发现规则
# 作者: 陈磊
# 时间: 2019-12-06
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class DRuleDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除发现规则")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/drule/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.hostids = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="hostids", value=self.hostids)
self._json = base._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询发现规则
# 作者: 陈磊
# 时间: 2019-12-06
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class DRuleGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询发现规则")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/drule/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.dhostids = None
self.druleids = None
self.dserviceids = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="dhostids", value=self.dhostids)
base.dict_add_key(_key="druleids", value=self.druleids)
base.dict_add_key(_key="dserviceids", value=self.dserviceids)
self.local_json = base._json
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新发现规则
# 作者: 陈磊
# 时间: 2019-12-06
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class DRuleUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新发现规则")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/drule/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.druleid = None
self.iprange = None
self.name = None
self.delay = None
self.nextcheck = None
self.proxy_hostid = None
self.status = None
self.dchecks = None
self.dchecks_type = None
self.dchecks_key_ = None
self.dchecks_snmp_community = None
self.dchecks_ports = None
self.dchecks_host_source = None
self.dchecks_name_source = None
self.api = UtilsRequest()
def get_response(self):
if self.dchecks_type is not None:
self.dchecks = []
for x, y in enumerate(self.dchecks_type):
base = ApiBase()
base.dict_add_key(_key="type", value=self.dchecks_type, num=x)
base.dict_add_key(_key="key_", value=self.dchecks_key_, num=x)
base.dict_add_key(_key="snmp_community", value=self.dchecks_snmp_community, num=x)
base.dict_add_key(_key="ports", value=self.dchecks_ports, num=x)
base.dict_add_key(_key="host_source", value=self.dchecks_host_source, num=x)
base.dict_add_key(_key="name_source", value=self.dchecks_name_source, num=x)
self.dchecks.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="druleid", value=self.druleid)
base.dict_add_key(_key="iprange", value=self.iprange)
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="delay", value=self.delay)
base.dict_add_key(_key="nextcheck", value=self.nextcheck)
base.dict_add_key(_key="proxy_hostid", value=self.proxy_hostid)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="dchecks", value=self.dchecks)
self._json = base._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf_8 -*-
# 表名: dchecks
# 作者: 陈磊
# 时间: 2019-12-06
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, VARCHAR, BIGINT, SMALLINT, DATETIME, DECIMAL, BigInteger, FLOAT, NUMERIC, TEXT
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy import or_
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
import time
Base = declarative_base()
class Dchecks(Base):
"""
发现规则检测表
"""
__tablename__ = "dchecks"
dcheckid = Column(BigInteger, primary_key=True)
druleid = Column(BigInteger)
type = Column(Integer)
key_ = Column(VARCHAR(255))
snmp_community = Column(VARCHAR(255))
ports = Column(VARCHAR(255))
snmpv3_securityname = Column(VARCHAR(64))
snmpv3_securitylevel = Column(Integer)
snmpv3_authpassphrase = Column(VARCHAR(64))
snmpv3_privpassphrase = Column(VARCHAR(64))
uniq = Column(Integer)
snmpv3_authprotocol = Column(Integer)
snmpv3_privprotocol = Column(Integer)
snmpv3_contextname = Column(VARCHAR(255))
# 1:DNS 2:IP
host_source = Column(Integer)
name_source = Column(Integer)
class DataDchecks(object):
def __init__(self):
self.log = UtilsLog()
self.log.info(self.__class__)
self.table = Dchecks
def select_all_from_allKeys(self, session, dcheckid=None, druleid=None):
"""
:param session: 指针
:param dcheckid:
:param druleid:
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("dcheckid: %s" % dcheckid)
self.log.debug("druleid: %s" % druleid)
try:
base = UtilsDataBase()
base.add_param(_key="dcheckid", value=dcheckid)
base.add_param(_key="druleid", value=druleid)
sql_rep = session.query(self.table).filter_by(**base.param).all()
self.log.debug(sql_rep)
session.close()
return sql_rep
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
# -*- coding: utf_8 -*-
# 表名: drules
# 作者: 陈磊
# 时间: 2019-12-06
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, VARCHAR, BIGINT, SMALLINT, DATETIME, DECIMAL, BigInteger, FLOAT, NUMERIC, TEXT
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy import or_
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
import time
Base = declarative_base()
class Drules(Base):
"""
发现规则表
"""
__tablename__ = "drules"
druleid = Column(BigInteger, primary_key=True)
proxy_hostid = Column(BigInteger)
name = Column(VARCHAR(255))
iprange = Column(VARCHAR(2048))
delay = Column(VARCHAR(255))
nextcheck = Column(Integer)
status = Column(Integer)
class DataDrules(object):
def __init__(self):
self.log = UtilsLog()
self.log.info(self.__class__)
self.table = Drules
def select_all_from_allKeys(self, session, druleid=None, name=None, iprange=None):
"""
:param session: 指针
:param druleid:
:param name:
:param iprange:
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("druleid: %s" % druleid)
self.log.debug("name: %s" % name)
self.log.debug("iprange: %s" % iprange)
try:
base = UtilsDataBase()
base.add_param(_key="druleid", value=druleid)
base.add_param(_key="name", value=name)
base.add_param(_key="iprange", value=iprange)
sql_rep = session.query(self.table).filter_by(**base.param).all()
self.log.debug(sql_rep)
session.close()
return sql_rep
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
def delete_like_name(self, session, name):
"""
:param session: 指针
:param name:
:return:
"""
self.log.debug("查询数据库:")
try:
self.log.debug("删除相关测试数据")
session.query(self.table).filter(self.table.name.like(name + "%")).delete(synchronize_session=False)
# session.delete(sql)
session.commit()
session.close()
self.log.debug("删除成功")
return True
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment