Commit 17933444 by sanshi

all

parent b302ff89
# -*- coding: utf-8 -*-
# 创建动作
# 作者: 陈磊
# 时间: 2019-10-31
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ActionCreate(object):
def __init__(self, _host, esc_period=None, name=None, eventsource=None, def_shortdata=None, def_longdata=None):
"""
:param _host: 域名
:param esc_period:
:param name:
:param eventsource:
:param def_shortdata:
:param def_longdata:
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建动作")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/action/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.esc_period = esc_period
self.eventsource = eventsource
self.name = name
self.def_shortdata = def_shortdata
self.def_longdata = def_longdata
self.filter = {}
self.filter_conditions = []
self.operations = []
self.operations_opconditions = []
self.operations_opmessage_grp = []
self.operations_opmessage = []
self.operations_opcommand = []
self.recovery_operations = []
self.recovery_operations_opmessage = []
self.acknowledge_operations = []
self.acknowledge_operations_opmessage = []
self.api = UtilsRequest()
def filter_conditions_add(self, conditiontype=None, operator=None, value=None):
self.filter_conditions.append({
"conditiontype": conditiontype,
"operator": operator,
"value": value
})
def operations_opconditions_add(self, conditiontype=None, operator=None, value=None):
self.operations_opconditions.append({
"conditiontype": conditiontype,
"operator": operator,
"value": value
})
def operations_opmessage_grp_add(self, opcommand_grpid=None, operationid=None, groupid=None):
self.operations_opmessage_grp.append({
"opcommand_grpid": opcommand_grpid,
"operationid": operationid,
"groupid": groupid
})
def operations_opmessage_add(self, operationid=None, default_msg=None, mediatypeid=None, message=None, subject=None):
self.operations_opmessage.append({
"operationid": operationid,
"default_msg": default_msg,
"mediatypeid": mediatypeid,
"message": message,
"subject": subject
})
def get_response(self):
self._json = {
"esc_period": self.esc_period,
"eventsource": self.eventsource,
"name": self.name,
"def_shortdata": self.def_shortdata,
"def_longdata": self.def_longdata,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询动作
# 作者: 陈磊
# 时间: 2019-11-01
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ActionGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询动作")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/action/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.actionids = None
self.groupids = None
self.hostids = None
self.triggerids = None
self.mediatypeids = None
self.usrgrpids = None
self.userids = None
self.scriptids = None
self.selectFilter = None
self.selectOperations = None
self.selectRecoveryOperations = None
self.selectAcknowledgeOperations = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="actionids", value=self.actionids)
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="triggerids", value=self.triggerids)
base.dict_add_key(_key="mediatypeids", value=self.mediatypeids)
base.dict_add_key(_key="usrgrpids", value=self.usrgrpids)
base.dict_add_key(_key="userids", value=self.userids)
base.dict_add_key(_key="scriptids", value=self.scriptids)
base.dict_add_key(_key="selectFilter", value=self.selectFilter)
base.dict_add_key(_key="selectOperations", value=self.selectOperations)
base.dict_add_key(_key="selectRecoveryOperations", value=self.selectRecoveryOperations)
base.dict_add_key(_key="selectAcknowledgeOperations", value=self.selectAcknowledgeOperations)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询版本号
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ApiInfoVersion(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询版本号")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/apiinfo/version"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.api = UtilsRequest()
def get_response(self):
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建应用
# 作者: 陈磊
# 时间: 2019-10-22
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ApplicationCreate(object):
def __init__(self, _host, applicationid=None, hostid=None, name=None, flags=None, templateids=None):
"""
:param _host: 域名
:param applicationid: 应用ID
:param hostid : 主机组的ID
:param name: 应用名称
:param flags: 应用程序的来源
:param templateids: 父模板应用程序的ID
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建应用")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/application/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.applicationid = applicationid
self.hostid = hostid
self.name = name
self.flags = flags
self.templateids = templateids
self.api = UtilsRequest()
def get_response(self):
self._json = {
"applicationid": self.applicationid,
"hostid": self.hostid,
"name": self.name,
"flags": self.flags,
"templateids": self.templateids
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除应用
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ApplicationDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除应用")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/application/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.applicationIds = []
self.api = UtilsRequest()
def applicationIds_add(self, applicationId):
"""
:param applicationId:
:return:
"""
self.applicationIds.append(applicationId)
def get_response(self):
self._json = self.applicationIds
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新应用
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ApplicationUpdate(object):
def __init__(self, _host, applicationid=None, hostid=None, name=None, flags=None, templateids=None):
"""
:param _host: 域名
:param applicationid: 应用ID
:param hostid : 主机组的ID
:param name: 应用名称
:param flags: 应用程序的来源
:param templateids: 父模板应用程序的ID
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新应用")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/application/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.applicationid = applicationid
self.hostid = hostid
self.name = name
self.flags = flags
self.templateids = templateids
self.api = UtilsRequest()
def get_response(self):
self._json = {
"applicationid": self.applicationid,
"hostid": self.hostid,
"name": self.name,
"flags": self.flags,
"templateids": self.templateids
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 确认事件
# 作者: 陈磊
# 时间: 2019-10-24
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class EventAcknowledge(object):
def __init__(self, _host, eventids=None, action=None, message=None, severity=None):
"""
:param _host: 域名
:param eventids: 应用ID
:param action : 主机组的ID
:param message: 应用名称
:param severity: 应用程序的来源
:return:
"""
self.log = UtilsLog()
self.log.info("调用确认事件")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/event/acknowledge"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.eventids = eventids
self.action = action
self.message = message
self.severity = severity
self.api = UtilsRequest()
def get_response(self):
self._json = {
"eventids": self.eventids,
"action": self.action,
"message": self.message,
"severity": self.severity,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询历史
# 作者: 陈磊
# 时间: 2019-11-01
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class HistoryGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询历史")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/history/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.history = None
self.hostids = None
self.itemids = None
self.time_from = None
self.time_till = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="history", value=self.history)
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="itemids", value=self.itemids)
base.dict_add_key(_key="time_from", value=self.time_from)
base.dict_add_key(_key="time_till", value=self.time_till)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建主机
# 作者: 陈磊
# 时间: 2019-10-22
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostCreate(object):
def __init__(self, _host, host=None, description=None, groupids=None, interfaces=None, tags=None, templateids=None, macros=None,
inventory=None, status=None, tls_accept=None, tls_psk_identity=None, tls_psk=None):
"""
:param _host: 域名
:param host:
:param description:
:param groupids:
:param interfaces:
:param tags:
:param templateids:
:param macros:
:param inventory:
:param status:
:param tls_accept:
:param tls_psk_identity:
:param tls_psk:
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建主机")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/host/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.host = host
self.description = description
self.groupids = groupids
self.interfaces = interfaces
self.tags = tags
self.templateids = templateids
self.macros = macros
self.inventory = inventory
self.status = status
self.tls_accept = tls_accept
self.tls_psk_identity = tls_psk_identity
self.tls_psk = tls_psk
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="host", value=self.host)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="interfaces", value=self.interfaces)
base.dict_add_key(_key="tags", value=self.tags)
base.dict_add_key(_key="templateids", value=self.templateids)
base.dict_add_key(_key="macros", value=self.macros)
base.dict_add_key(_key="inventory", value=self.inventory)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="tlsAccept", value=self.tls_accept)
base.dict_add_key(_key="tlsPskIdentity", value=self.tls_psk_identity)
base.dict_add_key(_key="tlsPsk", value=self.tls_psk)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建主机组
# 作者: 陈磊
# 时间: 2019-10-22
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostGroupCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.name = None
self.groupid = None
self.flags = None
self.internal = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="groupid", value=self.groupid)
base.dict_add_key(_key="flags", value=self.flags)
base.dict_add_key(_key="internal", value=self.internal)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除主机组
# 作者: 陈磊
# 时间: 2019-10-22
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostGroupDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.groupids = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="groupids", value=self.groupids)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询主机组
# 作者: 陈磊
# 时间: 2019-10-21
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class HostGroupGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.graphids = None
self.groupids = None
self.hostids = None
self.maintenanceids = None
self.monitored_hosts = None
self.real_hosts = None
self.templated_hosts = None
self.templateids = None
self.triggerids = None
self.with_applications = None
self.with_graphs = None
self.with_hosts_and_templates = None
self.with_httptests = None
self.with_items = None
self.with_monitored_httptests = None
self.with_monitored_items = None
self.with_monitored_triggers = None
self.with_simple_graph_items = None
self.with_triggers = None
self.selectDiscoveryRule = None
self.selectGroupDiscovery = None
self.selectHosts = None
self.selectTemplates = None
self.limitSelects = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="graphids", value=self.graphids)
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="maintenanceids", value=self.maintenanceids)
base.dict_add_key(_key="monitored_hosts", value=self.monitored_hosts)
base.dict_add_key(_key="real_hosts", value=self.real_hosts)
base.dict_add_key(_key="templated_hosts", value=self.templated_hosts)
base.dict_add_key(_key="templateids", value=self.templateids)
base.dict_add_key(_key="triggerids", value=self.triggerids)
base.dict_add_key(_key="with_applications", value=self.with_applications)
base.dict_add_key(_key="with_graphs", value=self.with_graphs)
base.dict_add_key(_key="with_hosts_and_templates", value=self.with_hosts_and_templates)
base.dict_add_key(_key="with_httptests", value=self.with_httptests)
base.dict_add_key(_key="with_items", value=self.with_items)
base.dict_add_key(_key="with_monitored_httptests", value=self.with_monitored_httptests)
base.dict_add_key(_key="with_monitored_items", value=self.with_monitored_items)
base.dict_add_key(_key="with_monitored_triggers", value=self.with_monitored_triggers)
base.dict_add_key(_key="with_simple_graph_items", value=self.with_simple_graph_items)
base.dict_add_key(_key="with_triggers", value=self.with_triggers)
base.dict_add_key(_key="selectDiscoveryRule", value=self.selectDiscoveryRule)
base.dict_add_key(_key="selectGroupDiscovery", value=self.selectGroupDiscovery)
base.dict_add_key(_key="selectHosts", value=self.selectHosts)
base.dict_add_key(_key="selectTemplates", value=self.selectTemplates)
base.dict_add_key(_key="limitSelects", value=self.limitSelects)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 批量添加主机组
# 作者: 陈磊
# 时间: 2019-10-21
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostGroupMassadd(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用批量添加主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/massadd"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.groups = None
self.hosts = None
self.templates = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="groups", value=self.groups)
base.dict_add_key(_key="hosts", value=self.hosts)
base.dict_add_key(_key="templates", value=self.templates)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 批量删除主机组
# 作者: 陈磊
# 时间: 2019-10-21
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostGroupMassremove(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用批量删除主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/massremove"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.groups = None
self.hosts = None
self.templates = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="groups", value=self.groups)
base.dict_add_key(_key="hosts", value=self.hosts)
base.dict_add_key(_key="templates", value=self.templates)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 批量更新主机组
# 作者: 陈磊
# 时间: 2019-10-21
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostGroupMassupdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用批量更新主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/massupdate"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.groups = None
self.hosts = None
self.templates = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="groups", value=self.groups)
base.dict_add_key(_key="hosts", value=self.hosts)
base.dict_add_key(_key="templates", value=self.templates)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新主机组
# 作者: 陈磊
# 时间: 2019-10-23
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostGroupUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新主机组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostgroup/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.name = None
self.groupid = None
self.flags = None
self.internal = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="groupid", value=self.groupid)
base.dict_add_key(_key="flags", value=self.flags)
base.dict_add_key(_key="internal", value=self.internal)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建主机接口
# 作者: 陈磊
# 时间: 2019-11-11
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class HostInterfaceCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.dns = None
self.hostid = None
self.ip = None
self.main = None
self.port = None
self.type = None
self.useip = None
self.bulk = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="dns", value=self.dns)
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="ip", value=self.ip)
base.dict_add_key(_key="main", value=self.main)
base.dict_add_key(_key="port", value=self.port)
base.dict_add_key(_key="type", value=self.type)
base.dict_add_key(_key="useip", value=self.useip)
base.dict_add_key(_key="bulk", value=self.bulk)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除主机接口
# 作者: 陈磊
# 时间: 2019-11-11
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostInterfaceDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.interfaceids = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="interfaceids", value=self.interfaceids)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询主机接口
# 作者: 陈磊
# 时间: 2019-11-11
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class HostInterfaceGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.hostids = None
self.interfaceids = None
self.itemids = None
self.triggerids = None
self.selectItems = None
self.selectHosts = None
self.limitSelects = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="interfaceids", value=self.interfaceids)
base.dict_add_key(_key="itemids", value=self.itemids)
base.dict_add_key(_key="triggerids", value=self.triggerids)
base.dict_add_key(_key="selectItems", value=self.selectItems)
base.dict_add_key(_key="selectHosts", value=self.selectHosts)
base.dict_add_key(_key="limitSelects", value=self.limitSelects)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 批量添加主机接口
# 作者: 陈磊
# 时间: 2019-11-11
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostInterfaceMassadd(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用批量添加主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/massadd"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.hosts = None
self.interfaces = None
self.interfaces_dns = None
self.interfaces_ip = None
self.interfaces_main = None
self.interfaces_port = None
self.interfaces_type = None
self.interfaces_useip = None
self.interfaces_bulk = None
self.api = UtilsRequest()
def get_response(self):
if self.interfaces_ip is not None:
self.interfaces = []
for x, y in enumerate(self.interfaces_ip):
base = ApiBase()
base.dict_add_key(_key="dns", value=self.interfaces_dns, num=x)
base.dict_add_key(_key="ip", value=self.interfaces_ip, num=x)
base.dict_add_key(_key="main", value=self.interfaces_main, num=x)
base.dict_add_key(_key="port", value=self.interfaces_port, num=x)
base.dict_add_key(_key="type", value=self.interfaces_type, num=x)
base.dict_add_key(_key="useip", value=self.interfaces_useip, num=x)
base.dict_add_key(_key="bulk", value=self.interfaces_bulk, num=x)
self.interfaces.append(base._json)
base = ApiBase()
base.dict_add_key(_key="hosts", value=self.hosts)
base.dict_add_key(_key="interfaces", value=self.interfaces)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 批量删除主机接口
# 作者: 陈磊
# 时间: 2019-11-14
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostInterfaceMassremove(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用批量删除主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/massremove"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.hostids = None
self.interfaces = None
self.interfaces_dns = None
self.interfaces_ip = None
self.interfaces_main = None
self.interfaces_port = None
self.interfaces_type = None
self.interfaces_useip = None
self.interfaces_bulk = None
self.api = UtilsRequest()
def get_response(self):
if self.interfaces_ip is not None:
self.interfaces = []
for x, y in enumerate(self.interfaces_ip):
base = ApiBase()
base.dict_add_key(_key="dns", value=self.interfaces_dns, num=x)
base.dict_add_key(_key="ip", value=self.interfaces_ip, num=x)
base.dict_add_key(_key="main", value=self.interfaces_main, num=x)
base.dict_add_key(_key="port", value=self.interfaces_port, num=x)
base.dict_add_key(_key="type", value=self.interfaces_type, num=x)
base.dict_add_key(_key="useip", value=self.interfaces_useip, num=x)
base.dict_add_key(_key="bulk", value=self.interfaces_bulk, num=x)
self.interfaces.append(base._json)
base = ApiBase()
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="interfaces", value=self.interfaces)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 批量更新主机接口
# 作者: 陈磊
# 时间: 2019-11-19
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostInterfaceReplace(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用批量更新主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/replace"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.hostids = None
self.interfaces = None
self.interfaces_dns = None
self.interfaces_ip = None
self.interfaces_main = None
self.interfaces_port = None
self.interfaces_type = None
self.interfaces_useip = None
self.interfaces_bulk = None
self.api = UtilsRequest()
def get_response(self):
if self.interfaces_ip is not None:
self.interfaces = []
for x, y in enumerate(self.interfaces_ip):
base = ApiBase()
base.dict_add_key(_key="dns", value=self.interfaces_dns, num=x)
base.dict_add_key(_key="ip", value=self.interfaces_ip, num=x)
base.dict_add_key(_key="main", value=self.interfaces_main, num=x)
base.dict_add_key(_key="port", value=self.interfaces_port, num=x)
base.dict_add_key(_key="type", value=self.interfaces_type, num=x)
base.dict_add_key(_key="useip", value=self.interfaces_useip, num=x)
base.dict_add_key(_key="bulk", value=self.interfaces_bulk, num=x)
self.interfaces.append(base._json)
base = ApiBase()
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="interfaces", value=self.interfaces)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新主机接口
# 作者: 陈磊
# 时间: 2019-11-12
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class HostInterfaceUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新主机接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostinterface/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.interfaceid = None
self.dns = None
self.hostid = None
self.ip = None
self.main = None
self.port = None
self.type = None
self.useip = None
self.bulk = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="interfaceid", value=self.interfaceid)
base.dict_add_key(_key="dns", value=self.dns)
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="ip", value=self.ip)
base.dict_add_key(_key="main", value=self.main)
base.dict_add_key(_key="port", value=self.port)
base.dict_add_key(_key="type", value=self.type)
base.dict_add_key(_key="useip", value=self.useip)
base.dict_add_key(_key="bulk", value=self.bulk)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建主机原型
# 作者: 陈磊
# 时间: 2019-11-11
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class HostPrototypeCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建主机原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostprototype/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.hostid = None
self.host = None
self.name = None
self.status = None
self.templateid = None
self.tls_connect = None
self.tls_accept = None
self.tls_issuer = None
self.tls_subject = None
self.tls_psk_identity = None
self.tls_psk = None
self.groupLinks = None
self.groupPrototypes = None
self.templateids = None
self.ruleid = None
self.groupLinks_groupid = None
self.groupPrototypes_name = None
self.api = UtilsRequest()
def get_response(self):
if self.groupLinks_groupid is not None:
self.groupLinks = []
for x, y in enumerate(self.groupLinks_groupid):
base = ApiBase()
base.dict_add_key(_key="groupid", value=self.groupLinks_groupid, num=x)
self.groupLinks.append(base._json)
else:
pass
if self.groupPrototypes_name is not None:
self.groupPrototypes = []
for x, y in enumerate(self.groupPrototypes_name):
base = ApiBase()
base.dict_add_key(_key="name", value=self.groupPrototypes_name, num=x)
self.groupPrototypes.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="host", value=self.host)
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="templateids", value=self.templateids)
base.dict_add_key(_key="tls_connect", value=self.tls_connect)
base.dict_add_key(_key="tls_accept", value=self.tls_accept)
base.dict_add_key(_key="tls_issuer", value=self.tls_issuer)
base.dict_add_key(_key="tls_subject", value=self.tls_subject)
base.dict_add_key(_key="tls_psk_identity", value=self.tls_psk_identity)
base.dict_add_key(_key="ruleid", value=self.ruleid)
base.dict_add_key(_key="groupLinks", value=self.groupLinks)
base.dict_add_key(_key="groupPrototypes", value=self.groupPrototypes)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除主机原型
# 作者: 陈磊
# 时间: 2019-11-13
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class HostPrototypeDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除主机原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostprototype/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.hostids = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="hostids", value=self.hostids)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询主机原型
# 作者: 陈磊
# 时间: 2019-11-13
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class HostPrototypeGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询主机原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostprototype/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.hostids = None
self.discoveryids = None
self.inherited = None
self.selectDiscoveryRule = None
self.selectGroupLinks = None
self.selectGroupPrototypes = None
self.selectInventory = None
self.selectParentHost = None
self.selectTemplates = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="discoveryids", value=self.discoveryids)
base.dict_add_key(_key="inherited", value=self.inherited)
base.dict_add_key(_key="selectDiscoveryRule", value=self.selectDiscoveryRule)
base.dict_add_key(_key="selectGroupLinks", value=self.selectGroupLinks)
base.dict_add_key(_key="selectGroupPrototypes", value=self.selectGroupPrototypes)
base.dict_add_key(_key="selectInventory", value=self.selectInventory)
base.dict_add_key(_key="selectParentHost", value=self.selectParentHost)
base.dict_add_key(_key="selectTemplates", value=self.selectTemplates)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新主机原型
# 作者: 陈磊
# 时间: 2019-11-11
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class HostPrototypeUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新主机原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostprototype/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.hostid = None
self.host = None
self.name = None
self.status = None
self.templateid = None
self.tls_connect = None
self.tls_accept = None
self.tls_issuer = None
self.tls_subject = None
self.tls_psk_identity = None
self.tls_psk = None
self.groupLinks = None
self.groupPrototypes = None
self.templateids = None
# self.ruleid = None
self.groupLinks_groupid = None
self.groupPrototypes_name = None
self.api = UtilsRequest()
def get_response(self):
if self.groupLinks_groupid is not None:
self.groupLinks = []
for x, y in enumerate(self.groupLinks_groupid):
base = ApiBase()
base.dict_add_key(_key="groupid", value=self.groupLinks_groupid, num=x)
self.groupLinks.append(base._json)
else:
pass
if self.groupPrototypes_name is not None:
self.groupPrototypes = []
for x, y in enumerate(self.groupPrototypes_name):
base = ApiBase()
base.dict_add_key(_key="name", value=self.groupPrototypes_name, num=x)
self.groupPrototypes.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="host", value=self.host)
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="templateids", value=self.templateids)
base.dict_add_key(_key="tls_connect", value=self.tls_connect)
base.dict_add_key(_key="tls_accept", value=self.tls_accept)
base.dict_add_key(_key="tls_issuer", value=self.tls_issuer)
base.dict_add_key(_key="tls_subject", value=self.tls_subject)
base.dict_add_key(_key="tls_psk_identity", value=self.tls_psk_identity)
# base.dict_add_key(_key="ruleid", value=self.ruleid)
base.dict_add_key(_key="groupLinks", value=self.groupLinks)
base.dict_add_key(_key="groupPrototypes", value=self.groupPrototypes)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建图片
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class Create(object):
def __init__(self, host, name=None, imagetype=None, image=None):
"""
:param host: 域名
:param name: 图片名称
:param imagetype: 图片类型
:param image: 图片Base64字符串
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建图片接口")
self.log.info(self.__class__)
self.host = host
self.headers = {}
self.path = "/image/create"
self.url = self.host + self.path
self.params = {}
self.json = {}
self.response = ""
self.name = name
self.imagetype = imagetype
self.image = image
self.api = UtilsRequest()
def get_response(self):
self.json = {
"name": self.name,
"imagetype": self.imagetype,
"image": self.image,
}
self.response = self.api.post(url=self.url, json=self.json)
# -*- coding: utf-8 -*-
# 更新图片
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class Update(object):
def __init__(self, host, imageid=None, name=None, imagetype=None, image=None):
"""
:param host: 域名
:param imageid: 图片ID
:param name: 图片名称
:param imagetype: 图片类型
:param image: 图片Base64字符串
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新图片接口")
self.log.info(self.__class__)
self.host = host
self.headers = {}
self.path = "/image/update"
self.url = self.host + self.path
self.params = {}
self.json = {}
self.response = ""
self.imageid = imageid
self.name = name
self.imagetype = imagetype
self.image = image
self.api = UtilsRequest()
def get_response(self):
self.json = {
"imageid": self.imageid,
"name": self.name,
"imagetype": self.imagetype,
"image": self.image,
}
self.response = self.api.post(url=self.url, json=self.json)
# -*- coding: utf-8 -*-
# 查询维护
# 作者: 陈磊
# 时间: 2019-11-01
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class MaintenanceGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询维护")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/maintenance/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.groupids = None
self.hostids = None
self.maintenanceids = None
self.selectGroups = None
self.selectHosts = None
self.selectTimeperiods = None
self.selectTags = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="maintenanceids", value=self.maintenanceids)
base.dict_add_key(_key="selectGroups", value=self.selectGroups)
base.dict_add_key(_key="selectHosts", value=self.selectHosts)
base.dict_add_key(_key="selectTimeperiods", value=self.selectTimeperiods)
base.dict_add_key(_key="selectTags", value=self.selectTags)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建媒体类型
# 作者: 陈磊
# 时间: 2019-11-08
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class MediaTypeCreate(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用创建媒体类型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/mediatype/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.description = None
self.type = None
self.exec_path = None
self.gsm_modem = None
self.passwd = None
self.smtp_email = None
self.smtp_helo = None
self.smtp_server = None
self.smtp_port = None
self.smtp_security = None
self.smtp_verify_host = None
self.smtp_verify_peer = None
self.smtp_authentication = None
self.status = None
self.username = None
self.exec_params = None
self.maxsessions = None
self.maxattempts = None
self.attempt_interval = None
self.content_type = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="type", value=self.type)
base.dict_add_key(_key="exec_path", value=self.exec_path)
base.dict_add_key(_key="gsm_modem", value=self.gsm_modem)
base.dict_add_key(_key="passwd", value=self.passwd)
base.dict_add_key(_key="smtp_email", value=self.smtp_email)
base.dict_add_key(_key="smtp_helo", value=self.smtp_helo)
base.dict_add_key(_key="smtp_server", value=self.smtp_server)
base.dict_add_key(_key="smtp_port", value=self.smtp_port)
base.dict_add_key(_key="smtp_security", value=self.smtp_security)
base.dict_add_key(_key="smtp_verify_host", value=self.smtp_verify_host)
base.dict_add_key(_key="smtp_verify_peer", value=self.smtp_verify_peer)
base.dict_add_key(_key="smtp_authentication", value=self.smtp_authentication)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="username", value=self.username)
base.dict_add_key(_key="exec_params", value=self.exec_params)
base.dict_add_key(_key="maxsessions", value=self.maxsessions)
base.dict_add_key(_key="maxattempts", value=self.maxattempts)
base.dict_add_key(_key="attempt_interval", value=self.attempt_interval)
base.dict_add_key(_key="content_type", value=self.content_type)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询媒体类型
# 作者: 陈磊
# 时间: 2019-11-01
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class MediaTypeGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询媒体类型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/mediatype/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.mediatypeids = None
self.mediaids = None
self.userids = None
self.selectUsers = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="mediatypeids", value=self.mediatypeids)
base.dict_add_key(_key="mediaids", value=self.mediaids)
base.dict_add_key(_key="userids", value=self.userids)
base.dict_add_key(_key="selectUsers", value=self.selectUsers)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询告警
# 作者: 陈磊
# 时间: 2019-11-14
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ProblemGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询告警")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/problem/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.eventids = None
self.groupids = None
self.hostids = None
self.objectids = None
self.applicationids = None
self.source = None
self.object = None
self.acknowledged = None
self.suppressed = None
self.severities = None
self.evaltype = None
self.tags = None
self.recent = None
self.eventid_from = None
self.eventid_till = None
self.time_from = None
self.time_till = None
self.selectAcknowledges = None
self.selectTags = None
self.selectSuppressionData = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="eventids", value=self.eventids)
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="objectids", value=self.objectids)
base.dict_add_key(_key="applicationids", value=self.applicationids)
base.dict_add_key(_key="source", value=self.source)
base.dict_add_key(_key="object", value=self.object)
base.dict_add_key(_key="acknowledged", value=self.acknowledged)
base.dict_add_key(_key="suppressed", value=self.suppressed)
base.dict_add_key(_key="severities", value=self.severities)
base.dict_add_key(_key="evaltype", value=self.evaltype)
base.dict_add_key(_key="tags", value=self.tags)
base.dict_add_key(_key="recent", value=self.recent)
base.dict_add_key(_key="eventid_from", value=self.eventid_from)
base.dict_add_key(_key="eventid_till", value=self.eventid_till)
base.dict_add_key(_key="time_from", value=self.time_from)
base.dict_add_key(_key="time_till", value=self.time_till)
base.dict_add_key(_key="selectAcknowledges", value=self.selectAcknowledges)
base.dict_add_key(_key="selectTags", value=self.selectTags)
base.dict_add_key(_key="selectSuppressionData", value=self.selectSuppressionData)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询代理
# 作者: 陈磊
# 时间: 2019-11-04
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ProxyGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询代理")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/proxy/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.proxyids = None
self.selectHosts = None
self.selectInterface = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="proxyids", value=self.proxyids)
base.dict_add_key(_key="selectHosts", value=self.selectHosts)
base.dict_add_key(_key="selectInterface", value=self.selectInterface)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建脚本
# 作者: 陈磊
# 时间: 2019-10-22
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ScriptCreate(object):
def __init__(self, _host, scriptid=None, command=None, name=None, confirmation=None, description=None, execute_on=None, groupid=None,
host_access=None, _type=None, usrgrpid=None):
"""
:param _host: 域名
:param scriptid: 脚本ID
:param command : 命令
:param name : 脚本名称
:param confirmation: 确认
:param description: 描述
:param execute_on: 在哪里运行脚本
:param groupid: 可以在其上运行脚本的主机组的ID
:param host_access: 运行脚本所需的主机权限
:param _type: 脚本类型
:param usrgrpid: 允许运行脚本的用户组的ID
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建脚本")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/script/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.scriptid = scriptid
self.command = command
self.name = name
self.confirmation = confirmation
self.description = description
self.execute_on = execute_on
self.groupid = groupid
self.host_access = host_access
self._type = _type
self.usrgrpid = usrgrpid
self.api = UtilsRequest()
def get_response(self):
self._json = {
"scriptid": self.scriptid,
"command": self.command,
"name": self.name,
"confirmation": self.confirmation,
"description": self.description,
"execute_on": self.execute_on,
"groupid": self.groupid,
"host_access": self.host_access,
"_type": self._type,
"usrgrpid": self.usrgrpid
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除脚本
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ScriptDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除脚本")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/script/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.scriptIds = []
self.api = UtilsRequest()
def scriptIds_add(self, scriptId):
"""
:param scriptId:
:return:
"""
self.scriptIds.append(scriptId)
def get_response(self):
self._json = self.scriptIds
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 执行脚本
# 作者: 陈磊
# 时间: 2019-10-22
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ScriptExecute(object):
def __init__(self, _host, hostid=None, scriptid=None):
"""
:param _host: 域名
:param hostid : 主机ID
:param scriptid: 脚本ID
:return:
"""
self.log = UtilsLog()
self.log.info("调用执行脚本")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/script/execute"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.hostid = hostid
self.scriptid = scriptid
self.api = UtilsRequest()
def get_response(self):
self._json = {
"hostid": self.hostid,
"scriptid": self.scriptid,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询脚本
# 作者: 陈磊
# 时间: 2019-11-01
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ScriptGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询脚本")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/scripts/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.groupids = None
self.hostids = None
self.scriptids = None
self.usrgrpids = None
self.selectGroups = None
self.selectHosts = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="hostids", value=self.hostids)
base.dict_add_key(_key="scriptids", value=self.scriptids)
base.dict_add_key(_key="usrgrpids", value=self.usrgrpids)
base.dict_add_key(_key="selectGroups", value=self.selectGroups)
base.dict_add_key(_key="selectHosts", value=self.selectHosts)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询主机中的脚本
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class GetScriptsByHosts(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询主机中的脚本")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/script/getscriptsbyhosts"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.hostIds = []
self.api = UtilsRequest()
def hostIds_add(self, hostId):
"""
:param hostId:
:return:
"""
self.hostIds.append(hostId)
def get_response(self):
self._json = self.hostIds
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新脚本
# 作者: 陈磊
# 时间: 2019-10-22
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class ScriptUpdate(object):
def __init__(self, _host, scriptid=None, command=None, name=None, confirmation=None, description=None, execute_on=None, groupid=None,
host_access=None, _type=None, usrgrpid=None):
"""
:param _host: 域名
:param scriptid: 脚本ID
:param command : 命令
:param name : 脚本名称
:param confirmation: 确认
:param description: 描述
:param execute_on: 在哪里运行脚本
:param groupid: 可以在其上运行脚本的主机组的ID
:param host_access: 运行脚本所需的主机权限
:param _type: 脚本类型
:param usrgrpid: 允许运行脚本的用户组的ID
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新脚本")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/script/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.scriptid = scriptid
self.command = command
self.name = name
self.confirmation = confirmation
self.description = description
self.execute_on = execute_on
self.groupid = groupid
self.host_access = host_access
self._type = _type
self.usrgrpid = usrgrpid
self.api = UtilsRequest()
def get_response(self):
self._json = {
"scriptid": self.scriptid,
"command": self.command,
"name": self.name,
"confirmation": self.confirmation,
"description": self.description,
"execute_on": self.execute_on,
"groupid": self.groupid,
"host_access": self.host_access,
"_type": self._type,
"usrgrpid": self.usrgrpid
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建任务
# 作者: 陈磊
# 时间: 2019-10-22
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class TaskCreate(object):
def __init__(self, _host, _type=None):
"""
:param _host: 域名
:param _type: 任务类型
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建任务")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/task/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.itemids = []
self._type = _type
self.api = UtilsRequest()
def itemids_add(self, itemid):
"""
:param itemid:
:return:
"""
self.itemids.append(itemid)
def get_response(self):
self._json = {
"_type": self._type,
"itemids": self.itemids,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建模板组
# 作者: 陈磊
# 时间: 2019-10-23
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class TemplateCreate(object):
def __init__(self, _host, host=None):
"""
:param _host: 域名
:param host: 主机名称
:return:
"""
self.log = UtilsLog()
self.log.info("调用创模板")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/template/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.host = host
self.groups = []
self.tags = []
self.templates = []
self.macros = []
self.hosts = []
self.api = UtilsRequest()
def groups_add(self, groupid):
"""
:param groupid:
:return:
"""
self.groups.append({
"groupid": groupid,
})
def tags_add(self, tag):
"""
:param tag:
:return:
"""
self.tags.append({
"tags": "Host name",
"value": tag
})
def templates_add(self, template):
"""
:param template:
:return:
"""
self.templates.append(template)
def macros_add(self, macro):
"""
:param macro:
:return:
"""
self.macros.append(macro)
def hosts_add(self, hostid):
"""
:param hostid:
:return:
"""
self.hosts.append({
"hostid": hostid
})
def get_response(self):
self._json = {
"host": self.host,
"tags": self.tags,
"groups": self.groups,
"templates": self.templates,
"macros": self.macros,
"hosts": self.hosts,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询趋势
# 作者: 陈磊
# 时间: 2019-10-24
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class TrendGet(object):
def __init__(self, _host, time_from=None, time_till=None, limit=None):
"""
:param _host: 域名
:param time_from: 应用ID
:param time_till : 主机组的ID
:param limit: 应用名称
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询趋势接口")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/trend/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.time_from = time_from
self.time_till = time_till
self.limit = limit
self.output = []
self.itemids = []
self.countOutput = []
self.api = UtilsRequest()
def output_add(self, output):
"""
:param output:
:return:
"""
self.output.append(output)
def itemids_add(self, itemid):
"""
:param itemid:
:return:
"""
self.itemids.append(itemid)
def get_response(self):
self._json = {
"time_from": self.time_from,
"time_till": self.time_till,
"limit": self.limit,
"output": self.output,
"itemids": self.itemids,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建触发器
# 作者: 陈磊
# 时间: 2019-11-14
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class TriggerCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建触发器")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/trigger/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.triggerid = None
self.description = None
self.expression = None
self.comments = None
self.error = None
self.flags = None
self.lastchange = None
self.priority = None
self.state = None
self.status = None
self.templateid = None
self.type = None
self.url = None
self.recovery_mode = None
self.recovery_expression = None
self.correlation_mode = None
self.correlation_tag = None
self.manual_close = None
self.tags = None
self.tags_tag = None
self.tags_value = None
self.api = UtilsRequest()
def get_response(self):
if self.tags_tag is not None:
self.tags = []
for x, y in enumerate(self.tags_tag):
base = ApiBase()
base.dict_add_key(_key="tag", value=self.tags_tag[x])
base.dict_add_key(_key="value", value=self.tags_value[x])
self.tags.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="triggerid", value=self.triggerid)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="expression", value=self.expression)
base.dict_add_key(_key="comments", value=self.comments)
base.dict_add_key(_key="priority", value=self.priority)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="templateid", value=self.templateid)
base.dict_add_key(_key="type", value=self.type)
base.dict_add_key(_key="url", value=self.url)
base.dict_add_key(_key="recovery_mode", value=self.recovery_mode)
base.dict_add_key(_key="recovery_expression", value=self.recovery_expression)
base.dict_add_key(_key="correlation_mode", value=self.correlation_mode)
base.dict_add_key(_key="correlation_tag", value=self.correlation_tag)
base.dict_add_key(_key="manual_close", value=self.manual_close)
base.dict_add_key(_key="tags", value=self.tags)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=[self._json])
# -*- coding: utf-8 -*-
# 创建触发器原型
# 作者: 陈磊
# 时间: 2019-11-13
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class TriggerPrototypeCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建触发器原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/triggerprototype/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.triggerid = None
self.description = None
self.expression = None
self.comments = None
self.priority = None
self.status = None
self.templateid = None
self.type = None
self.url = None
self.recovery_mode = None
self.recovery_expression = None
self.correlation_mode = None
self.correlation_tag = None
self.manual_close = None
self.tags = None
self.tags_tag = None
self.tags_value = None
self.api = UtilsRequest()
def get_response(self):
if self.tags_tag is not None:
self.tags = []
for x, y in enumerate(self.tags_tag):
base = ApiBase()
base.dict_add_key(_key="tag", value=self.tags_tag, num=x)
base.dict_add_key(_key="value", value=self.tags_value, num=x)
self.tags.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="triggerid", value=self.triggerid)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="expression", value=self.expression)
base.dict_add_key(_key="comments", value=self.comments)
base.dict_add_key(_key="priority", value=self.priority)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="templateid", value=self.templateid)
base.dict_add_key(_key="type", value=self.type)
base.dict_add_key(_key="url", value=self.url)
base.dict_add_key(_key="recovery_mode", value=self.recovery_mode)
base.dict_add_key(_key="recovery_expression", value=self.recovery_expression)
base.dict_add_key(_key="correlation_mode", value=self.correlation_mode)
base.dict_add_key(_key="correlation_tag", value=self.correlation_tag)
base.dict_add_key(_key="manual_close", value=self.manual_close)
base.dict_add_key(_key="tags", value=self.tags)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=[self._json])
# -*- coding: utf-8 -*-
# 删除触发器原型
# 作者: 陈磊
# 时间: 2019-11-13
from WorkApi.ApiBase import ApiBase
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class TriggerPrototypeDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除触发器原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/triggerprototype/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.triggerids = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="triggerids", value=self.triggerids)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询触发器原型
# 作者: 陈磊
# 时间: 2019-11-14
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class TriggerPrototypeGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询触发器原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/triggerprototype/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.active = None
self.applicationids = None
self.discoveryids = None
self.functions = None
self.group = None
self.groupids = None
self.host = None
self.hostids = None
self.inherited = None
self.maintenance = None
self.min_severity = None
self.monitored = None
self.templated = None
self.templateids = None
self.triggerids = None
self.expandExpression = None
self.selectDiscoveryRule = None
self.selectFunctions = None
self.selectGroups = None
self.selectHosts = None
self.selectItems = None
self.selectDependencies = None
self.selectTags = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="groupids", value=self.active)
base.dict_add_key(_key="hostids", value=self.applicationids)
base.dict_add_key(_key="maintenanceids", value=self.discoveryids)
base.dict_add_key(_key="maintenanceids", value=self.functions)
base.dict_add_key(_key="groupids", value=self.group)
base.dict_add_key(_key="hostids", value=self.groupids)
base.dict_add_key(_key="maintenanceids", value=self.host)
base.dict_add_key(_key="maintenanceids", value=self.hostids)
base.dict_add_key(_key="groupids", value=self.inherited)
base.dict_add_key(_key="hostids", value=self.maintenance)
base.dict_add_key(_key="maintenanceids", value=self.min_severity)
base.dict_add_key(_key="maintenanceids", value=self.monitored)
base.dict_add_key(_key="groupids", value=self.templated)
base.dict_add_key(_key="hostids", value=self.templateids)
base.dict_add_key(_key="triggerids", value=self.triggerids)
base.dict_add_key(_key="expandExpression", value=self.expandExpression)
base.dict_add_key(_key="selectDiscoveryRule", value=self.selectDiscoveryRule)
base.dict_add_key(_key="selectFunctions", value=self.selectFunctions)
base.dict_add_key(_key="selectGroups", value=self.selectGroups)
base.dict_add_key(_key="selectHosts", value=self.selectHosts)
base.dict_add_key(_key="selectItems", value=self.selectItems)
base.dict_add_key(_key="selectDependencies", value=self.selectDependencies)
base.dict_add_key(_key="selectTags", value=self.selectTags)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新触发器原型
# 作者: 陈磊
# 时间: 2019-11-14
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase
class TriggerPrototypeUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新触发器原型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/triggerprototype/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.local_json = {}
self.response = ""
self.triggerid = None
self.description = None
self.expression = None
self.comments = None
self.priority = None
self.status = None
self.templateid = None
self.type = None
self.url = None
self.recovery_mode = None
self.recovery_expression = None
self.correlation_mode = None
self.correlation_tag = None
self.manual_close = None
self.tags = None
self.tags_tag = None
self.tags_value = None
self.api = UtilsRequest()
def get_response(self):
if self.tags_tag is not None:
self.tags = []
for x, y in enumerate(self.tags_tag):
base = ApiBase()
base.dict_add_key(_key="tag", value=self.tags_tag[x])
base.dict_add_key(_key="value", value=self.tags_value[x])
self.tags.append(base._json)
else:
pass
base = ApiBase()
base.dict_add_key(_key="triggerid", value=self.triggerid)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="expression", value=self.expression)
base.dict_add_key(_key="comments", value=self.comments)
base.dict_add_key(_key="priority", value=self.priority)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="templateid", value=self.templateid)
base.dict_add_key(_key="type", value=self.type)
base.dict_add_key(_key="url", value=self.url)
base.dict_add_key(_key="recovery_mode", value=self.recovery_mode)
base.dict_add_key(_key="recovery_expression", value=self.recovery_expression)
base.dict_add_key(_key="correlation_mode", value=self.correlation_mode)
base.dict_add_key(_key="correlation_tag", value=self.correlation_tag)
base.dict_add_key(_key="manual_close", value=self.manual_close)
base.dict_add_key(_key="tags", value=self.tags)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询用户
# 作者: 陈磊
# 时间: 2019-11-04
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class UserGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询用户")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/users/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.mediaids = None
self.mediatypeids = None
self.userids = None
self.usrgrpids = None
self.getAccess = None
self.selectMedias = None
self.selectMediatypes = None
self.selectUsrgrps = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="mediaids", value=self.mediaids)
base.dict_add_key(_key="mediatypeids", value=self.mediatypeids)
base.dict_add_key(_key="userids", value=self.userids)
base.dict_add_key(_key="usrgrpids", value=self.usrgrpids)
base.dict_add_key(_key="getAccess", value=self.getAccess)
base.dict_add_key(_key="selectMedias", value=self.selectMedias)
base.dict_add_key(_key="selectMediatypes", value=self.selectMediatypes)
base.dict_add_key(_key="selectUsrgrps", value=self.selectUsrgrps)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建用户组
# 作者: 陈磊
# 时间: 2019-10-22
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
class UserGroupCreate(object):
def __init__(self, _host, name=None, rights_permission=None, rights_id=None, tag_filters=None, userids=None):
"""
:param _host: 域名
:param name: 用户组名称
:param rights_permission: 权限对象的等级
:param rights_id: 权限对象的ID
:param tag_filters: 标签
:param userids: 需要添加的用户ID
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建用户组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/usergroup/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self.response = ""
self.name = name
self.rights_permission = rights_permission
self.rights_id = rights_id
self.tag_filters = tag_filters
self.userids = userids
self.api = UtilsRequest()
def get_response(self):
self._json = {
"name": self.name,
"rights": {
"permissions": self.rights_permission,
"id": self.rights_id
},
"tag_filters": self.tag_filters,
"userids": self.userids,
}
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询用户组
# 作者: 陈磊
# 时间: 2019-11-04
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class UserGroupGet(GetBase):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
super().__init__()
self.log = UtilsLog()
self.log.info("调用查询用户组")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/usergroup/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.status = None
self.userids = None
self.usrgrpids = None
self.with_gui_access = None
self.selectTagFilters = None
self.selectUsers = None
self.selectRights = None
self.api = UtilsRequest()
def get_response(self):
self.get_base_json()
base = ApiBase()
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="userids", value=self.userids)
base.dict_add_key(_key="usrgrpids", value=self.usrgrpids)
base.dict_add_key(_key="with_gui_access", value=self.with_gui_access)
base.dict_add_key(_key="selectTagFilters", value=self.selectTagFilters)
base.dict_add_key(_key="selectUsers", value=self.selectUsers)
base.dict_add_key(_key="selectRights", value=self.selectRights)
self.local_json = base._json
# self.log.debug(self.local_json)
# self.log.debug(self.base_json)
# 当前接口支持的参数与公共参数合并
add = ApiBase()
add.dict_add_dict(_dict_1=self.local_json, _dict_2=self.base_json)
self._json = add._json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 创建经验手册
# 作者: 陈磊
# 时间: 2019-11-18
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ExperienceCreate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用创建经验手册")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/experience/create"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.title = None
self.content = None
self.businessid = None
self.businessName = None
self.hostType = None
self.hostid = None
self.versions = None
self.monitorid = None
self.monitorName = None
self.alarmRuleids = None
self.description = None
self.tagNameList = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="title", value=self.title)
base.dict_add_key(_key="content", value=self.content)
base.dict_add_key(_key="businessid", value=self.businessid)
base.dict_add_key(_key="businessName", value=self.businessName)
base.dict_add_key(_key="hostType", value=self.hostType)
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="versions", value=self.versions)
base.dict_add_key(_key="monitorid", value=self.monitorid)
base.dict_add_key(_key="monitorName", value=self.monitorName)
base.dict_add_key(_key="alarmRuleids", value=self.alarmRuleids)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="tagNameList", value=self.tagNameList)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 删除经验手册
# 作者: 陈磊
# 时间: 2019-11-18
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ExperienceDelete(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用删除经验手册")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/experience/delete"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.experienceids = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="experienceids", value=self.experienceids)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询经验手册
# 作者: 陈磊
# 时间: 2019-11-18
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ExperienceGet(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询经验手册")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/experience/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.hostType = None
self.hostName = None
self.manageLevel = None
self.manageIp = None
self.businessid = None
self.alarmRuleid = None
self.title = None
self.tagid = None
self.description = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="hostType", value=self.hostType)
base.dict_add_key(_key="hostName", value=self.hostName)
base.dict_add_key(_key="manageLevel", value=self.manageLevel)
base.dict_add_key(_key="manageIp", value=self.manageIp)
base.dict_add_key(_key="businessid", value=self.businessid)
base.dict_add_key(_key="alarmRuleid", value=self.alarmRuleid)
base.dict_add_key(_key="title", value=self.title)
base.dict_add_key(_key="tagid", value=self.tagid)
base.dict_add_key(_key="description", value=self.description)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 更新经验手册
# 作者: 陈磊
# 时间: 2019-11-18
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class ExperienceUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新经验手册")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/experience/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.experienceids = None
self.title = None
self.content = None
self.businessid = None
self.businessName = None
self.hostType = None
self.hostid = None
self.versions = None
self.monitorid = None
self.monitorName = None
self.alarmRuleids = None
self.description = None
self.tagNameList = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="experienceids", value=self.experienceids)
base.dict_add_key(_key="title", value=self.title)
base.dict_add_key(_key="content", value=self.content)
base.dict_add_key(_key="businessid", value=self.businessid)
base.dict_add_key(_key="businessName", value=self.businessName)
base.dict_add_key(_key="hostType", value=self.hostType)
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="versions", value=self.versions)
base.dict_add_key(_key="monitorid", value=self.monitorid)
base.dict_add_key(_key="monitorName", value=self.monitorName)
base.dict_add_key(_key="alarmRuleids", value=self.alarmRuleids)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="tagNameList", value=self.tagNameList)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询历史告警
# 作者: 陈磊
# 时间: 2019-11-19
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class HistoryWarningGet(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询历史告警")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/historyWarning/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.api = UtilsRequest()
def get_response(self):
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询主机监控
# 作者: 陈磊
# 时间: 2019-11-15
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class HostMonitorGet(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询主机监控")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/hostmonitor/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.hostName = None
self.assetCode = None
self.manageIp = None
self.parentHost = None
self.available = None
self.status = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="hostName", value=self.hostName)
base.dict_add_key(_key="assetCode", value=self.assetCode)
base.dict_add_key(_key="manageIp", value=self.manageIp)
base.dict_add_key(_key="parentHost", value=self.parentHost)
base.dict_add_key(_key="available", value=self.available)
base.dict_add_key(_key="status", value=self.status)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询监控概览
# 作者: 陈磊
# 时间: 2019-11-18
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class MonitorGeneralGet(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询监控概览")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/monitorgeneral/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.api = UtilsRequest()
def get_response(self):
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 查询网络监控
# 作者: 陈磊
# 时间: 2019-11-18
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class NetworkMonitorGet(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询网络监控")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/networkmonitor/get"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.hostName = None
self.assetCode = None
self.manageIp = None
self.parentHost = None
self.available = None
self.status = None
self.operationsPeopleId = None
self.page = None
self.size = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="hostName", value=self.hostName)
base.dict_add_key(_key="assetCode", value=self.assetCode)
base.dict_add_key(_key="manageIp", value=self.manageIp)
base.dict_add_key(_key="parentHost", value=self.parentHost)
base.dict_add_key(_key="available", value=self.available)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="operationsPeopleId", value=self.operationsPeopleId)
base.dict_add_key(_key="page", value=self.page)
base.dict_add_key(_key="size", value=self.size)
self.local_json = base._json
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
# -*- coding: utf-8 -*-
# 基础接口参数信息
# 作者: 陈磊
# 时间: 2019-10-31
from WorkUtils.UtilsLog import UtilsLog
class GetBase:
"""
公共查询请求参数
"""
def __init__(self):
self.log = UtilsLog()
self.log.info("调用公共查询请求参数方法")
self.log.info(self.__class__)
self.base_json = {}
self.countOutput = None
self.editable = None
self.excludeSearch = None
self.filter = None
self.limit = None
self.output = None
self.preservekeys = None
self.search = None
self.searchByAny = None
self.searchWildcardsEnabled = None
self.sortfield = None
self.sortorder = None
self.startSearch = None
def get_base_json(self):
"""
组装公共查询请求参数
:return:
"""
base = ApiBase()
base.dict_add_key(_key="countOutput", value=self.countOutput)
base.dict_add_key(_key="editable", value=self.editable)
base.dict_add_key(_key="excludeSearch", value=self.excludeSearch)
base.dict_add_key(_key="filter", value=self.filter)
base.dict_add_key(_key="limit", value=self.limit)
base.dict_add_key(_key="output", value=self.output)
base.dict_add_key(_key="preservekeys", value=self.preservekeys)
base.dict_add_key(_key="search", value=self.search)
base.dict_add_key(_key="searchByAny", value=self.searchByAny)
base.dict_add_key(_key="searchWildcardsEnabled", value=self.searchWildcardsEnabled)
base.dict_add_key(_key="sortfield", value=self.sortfield)
base.dict_add_key(_key="sortorder", value=self.sortorder)
base.dict_add_key(_key="startSearch", value=self.startSearch)
self.base_json = base._json
class ApiBase:
def __init__(self):
self.log = UtilsLog()
self.log.info("调用公共接口方法")
self.log.info(self.__class__)
self._json = {}
def dict_add_key(self, _key, value, num=None):
"""
根据value判断key是否为空
根据num是否为空 value按照list取对应序列
:param _key:
:param value:
:param num:
:return:
"""
if value is None:
pass
elif num is None:
self._json[_key] = value
elif num is not None and value is not None:
self._json[_key] = value[num]
else:
pass
def dict_add_dict(self, _dict_1, _dict_2):
"""
两个字典相加
:param _dict_1: 1号字典
:param _dict_2: 2号字典
:return:
"""
if _dict_1 == {}:
if _dict_2 == {}:
self._json = {}
else:
self._json = _dict_2
else:
self._json = {**_dict_1, **_dict_2}
# -*- coding: utf-8 -*-
# 接口测试
# 作者: 陈磊
# 时间: 2019-10-21
from WorkApi.HostGroup.hostGroup_get import Get
from WorkApi.HostGroup.hostGroup_create import Create
from WorkApi.HostGroup.hostGroup_delete import Delete
from WorkApi.HostGroup.hostGroup_update import Update
from WorkUtils.UtilsResponse import UtilsResponse
import time
# 域名
host = "http://10.0.0.12:8080"
# 查询主机
# api = Get(host=host)
# api.searchWildcardsEnabled = "true"
# api.searchByAny = "true"
# api.monitoredHosts = "true"
# api.withItems = "true"
#
# api.output_add("extend")
# api.filter_name_add("Zabbix servers")
# api.filter_name_add("Linux servers")
# api.search_name_add("*emp*")
# 创建主机
# api = Create(host=host)
# api.name = "三石测试主机2"
# api.groupid = 1
# api.flags = 0
# api.internal = 1
# 删除主机
# api = Delete(host=host)
# api.groupids_add(groupids=34)
# 更新主机
api = Update(host=host)
api.groupid = 33
api.name = "三石测试主机1-1"
api.get_response()
# -*- coding: utf-8 -*-
# 查询字典类型
# 作者: 陈磊
# 时间: 2019-11-19
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class AdminSysDictGetDictTypes(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用查询字典类型")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/admin/sys/dict/getDictTypes"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.local_json = {}
self.api = UtilsRequest()
def get_response(self):
self._json = self.local_json
self.response = self.api.post(url=self._url, json=self._json)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment