Commit 08452c1b by sanshi

创建主机

parent ec0bf746
......@@ -2,7 +2,11 @@
<dictionary name="Administrator">
<words>
<w>applicationids</w>
<w>authpassphrase</w>
<w>authprotocol</w>
<w>authtype</w>
<w>businessid</w>
<w>contextname</w>
<w>departmentid</w>
<w>dingding</w>
<w>discoveryids</w>
......@@ -10,27 +14,43 @@
<w>eventid</w>
<w>eventids</w>
<w>experienceid</w>
<w>functionid</w>
<w>groupid</w>
<w>groupids</w>
<w>hostid</w>
<w>hostids</w>
<w>hostmonitor</w>
<w>hostprototype</w>
<w>hstgrp</w>
<w>interfaceid</w>
<w>ipmi</w>
<w>itemid</w>
<w>itemids</w>
<w>lastchange</w>
<w>lastcheck</w>
<w>lastlogsize</w>
<w>logtimefmt</w>
<w>maintenanceid</w>
<w>mediatype</w>
<w>mediatypeids</w>
<w>monitorid</w>
<w>mtime</w>
<w>networkmonitor</w>
<w>nodata</w>
<w>objectid</w>
<w>objectids</w>
<w>preservekeys</w>
<w>privatekey</w>
<w>privpassphrase</w>
<w>privprotocol</w>
<w>publickey</w>
<w>pytest</w>
<w>ruleid</w>
<w>ruleids</w>
<w>sanshi</w>
<w>securitylevel</w>
<w>securityname</w>
<w>snmpv</w>
<w>sortfield</w>
<w>sortorder</w>
<w>tablename</w>
......@@ -40,6 +60,8 @@
<w>triggerid</w>
<w>triggerids</w>
<w>triggerprototype</w>
<w>useip</w>
<w>valuemapid</w>
<w>wechat</w>
<w>zabbix</w>
<w>zmops</w>
......
......@@ -6,6 +6,7 @@
<list>
<option value="N803" />
<option value="N802" />
<option value="N806" />
</list>
</option>
</inspection_tool>
......
......@@ -8,22 +8,9 @@ from WorkUtils.UtilsLog import UtilsLog
class HostCreate(object):
def __init__(self, _host, host=None, description=None, groupids=None, interfaces=None, tags=None, templateids=None, macros=None,
inventory=None, status=None, tls_accept=None, tls_psk_identity=None, tls_psk=None):
def __init__(self, _host):
"""
:param _host: 域名
:param host:
:param description:
:param groupids:
:param interfaces:
:param tags:
:param templateids:
:param macros:
:param inventory:
:param status:
:param tls_accept:
:param tls_psk_identity:
:param tls_psk:
:return:
"""
self.log = UtilsLog()
......@@ -39,33 +26,91 @@ class HostCreate(object):
self.local_json = {}
self.response = ""
self.host = host
self.description = description
self.groupids = groupids
self.interfaces = interfaces
self.tags = tags
self.templateids = templateids
self.macros = macros
self.inventory = inventory
self.status = status
self.tls_accept = tls_accept
self.tls_psk_identity = tls_psk_identity
self.tls_psk = tls_psk
self.hostid = None
self.host = None
self.available = None
self.description = None
self.flags = None
self.inventory_mode = None
self.ipmi_authtype = None
self.ipmi_available = None
self.ipmi_disable_until = None
self.ipmi_error = None
self.ipmi_errors_from = None
self.ipmi_password = None
self.ipmi_privilege = None
self.ipmi_username = None
self.jmx_available = None
self.jmx_disable_until = None
self.jmx_error = None
self.jmx_errors_from = None
self.maintenance_from = None
self.maintenance_status = None
self.maintenance_type = None
self.maintenanceid = None
self.name = None
self.proxy_hostid = None
self.snmp_available = None
self.snmp_disable_until = None
self.snmp_error = None
self.snmp_errors_from = None
self.interfaces = None
self.groupids = None
self.tags = None
self.templateids = None
self.macros = None
self.inventory = None
self.status = None
self.tls_connect = None
self.tls_accept = None
self.tls_issuer = None
self.tls_subject = None
self.tls_psk_identity = None
self.tls_psk = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="host", value=self.host)
base.dict_add_key(_key="available", value=self.available)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="flags", value=self.flags)
base.dict_add_key(_key="inventory_mode", value=self.inventory_mode)
base.dict_add_key(_key="ipmiAuthtype", value=self.ipmi_authtype)
base.dict_add_key(_key="ipmiAvailable", value=self.ipmi_available)
base.dict_add_key(_key="ipmiDisable_until", value=self.ipmi_disable_until)
base.dict_add_key(_key="ipmiError", value=self.ipmi_error)
base.dict_add_key(_key="ipmiErrorsFrom", value=self.ipmi_errors_from)
base.dict_add_key(_key="ipmiPassword", value=self.ipmi_password)
base.dict_add_key(_key="ipmiPrivilege", value=self.ipmi_privilege)
base.dict_add_key(_key="ipmiUsername", value=self.ipmi_username)
base.dict_add_key(_key="jmxAvailable", value=self.jmx_available)
base.dict_add_key(_key="jmxDisableUntil", value=self.jmx_disable_until)
base.dict_add_key(_key="jmxError", value=self.jmx_error)
base.dict_add_key(_key="jmxErrorsFrom", value=self.jmx_errors_from)
base.dict_add_key(_key="maintenanceFrom", value=self.maintenance_from)
base.dict_add_key(_key="maintenanceStatus", value=self.maintenance_status)
base.dict_add_key(_key="maintenanceType", value=self.maintenance_type)
base.dict_add_key(_key="maintenanceid", value=self.maintenanceid)
base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="proxyHostid", value=self.proxy_hostid)
base.dict_add_key(_key="snmpAvailable", value=self.snmp_available)
base.dict_add_key(_key="snmpDisableUntil", value=self.snmp_disable_until)
base.dict_add_key(_key="snmpError", value=self.snmp_error)
base.dict_add_key(_key="snmpErrorsFrom", value=self.snmp_errors_from)
base.dict_add_key(_key="interfaces", value=self.interfaces)
base.dict_add_key(_key="groupids", value=self.groupids)
base.dict_add_key(_key="tags", value=self.tags)
base.dict_add_key(_key="templateids", value=self.templateids)
base.dict_add_key(_key="macros", value=self.macros)
base.dict_add_key(_key="inventory", value=self.inventory)
base.dict_add_key(_key="status", value=self.status)
base.dict_add_key(_key="tlsConnect", value=self.tls_connect)
base.dict_add_key(_key="tlsAccept", value=self.tls_accept)
base.dict_add_key(_key="tlsIssuer", value=self.tls_issuer)
base.dict_add_key(_key="tlsSubject", value=self.tls_subject)
base.dict_add_key(_key="tlsPskIdentity", value=self.tls_psk_identity)
base.dict_add_key(_key="tlsPsk", value=self.tls_psk)
self.local_json = base._json
......
......@@ -42,9 +42,9 @@ class MembersCreate(object):
self.api = UtilsRequest()
def get_response(self):
self._headers = {
"token": self.token
}
base = ApiBase()
base.dict_add_key(_key="token", value=self.token)
self._headers = base._json
base = ApiBase()
base.dict_add_key(_key="name", value=self.name)
......
......@@ -34,9 +34,9 @@ class SettingsGetMember(object):
self.api = UtilsRequest()
def get_response(self):
self._headers = {
"token": self.token
}
base = ApiBase()
base.dict_add_key(_key="token", value=self.token)
self._headers = base._json
base = ApiBase()
base.dict_add_key(_key="memberId", value=self.memberId)
......
......@@ -36,9 +36,9 @@ class SettingsChangePassword(object):
self.api = UtilsRequest()
def get_response(self):
self._headers = {
"token": self.token
}
base = ApiBase()
base.dict_add_key(_key="token", value=self.token)
self._headers = base._json
base = ApiBase()
base.dict_add_key(_key="memberId", value=self.memberId)
......
......@@ -12,6 +12,11 @@ from WorkUtils.UtilsDataBase import UtilsDataBase
from WorkUtils.UtilsResponse import UtilsResponse
from WorkApi.API.Host.host_create import HostCreate
from WorkData.Zabbix.hosts import DataHosts
from WorkData.Zabbix.items import DataItems
from WorkData.Zabbix.triggers import DataTriggers
from WorkData.Zabbix.functions import DataFunctions
from WorkData.Zabbix.hosts_groups import DataHostsGroups
from WorkData.Zabbix.interface import DataInterface
import allure
......@@ -29,16 +34,25 @@ class TestHostCreate(object):
db_user = env["db_user"]
db_pw = env["db_pw"]
db_base = env["db_base"]
base_name = "SS测试主机接口"
base_ip = "SS测试主机接口IP"
base_group_id = 101
base_name = "SS TEST HOST"
base_ip = "10.0.0.10"
base_host = "SS测试主机"
base_description = "SS测试主机描述"
@classmethod
def setup_class(cls):
cls.log.debug("开始执行测试套件.......")
# session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
# DataHstGrp().delete_like_name(session=session, name=cls.base_name)
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
sql = DataHosts().select_like_name(session=session, name=cls.base_name)
for x, y in enumerate(sql):
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataItems().delete_hostid(session=session, hostid=y.hostid)
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataHosts().delete_like_name(session=session, name=cls.base_name)
@classmethod
def teardown_class(cls):
......@@ -51,10 +65,11 @@ class TestHostCreate(object):
self.log.debug("测试用例执行结束...")
@allure.step("调用接口:host.create")
def host_create(self, host=None, description=None, groupids=None, interfaces=None, tags=None, templateids=None, macros=None, inventory=None, status=None,
tls_accept=None, tls_psk_identity=None, tls_psk=None):
def host_create(self, host=None, name=None, description=None, groupids=None, interfaces=None, tags=None, templateids=None, macros=None,
inventory=None, status=None, inventory_mode=None):
api = HostCreate(_host=self.host)
api.host = host
api.name = name
api.description = description
api.groupids = groupids
api.interfaces = interfaces
......@@ -63,9 +78,11 @@ class TestHostCreate(object):
api.macros = macros
api.inventory = inventory
api.status = status
api.tls_accept = tls_accept
api.tls_psk_identity = tls_psk_identity
api.tls_psk = tls_psk
api.inventory_mode = inventory_mode
api.tls_connect = 1
api.tls_accept = 1
api.tls_psk_identity = "Test"
api.tls_psk = "Test"
api.get_response()
return api.response
......@@ -74,37 +91,57 @@ class TestHostCreate(object):
session = UtilsDataBase().conn_mysql(db_url=self.db_url, db_port=self.db_port, db_base=self.db_base, db_user=self.db_user, db_pw=self.db_pw)
return session
@allure.step("查询表:hosts_group")
def select_hosts_group(self, groupid=None, hostid=None):
session = self.db_session()
sql = DataHostsGroups().select_all_from_allKeys(session=session, groupid=groupid, hostid=hostid)
return sql
@allure.step("查询表:interface")
def select_interface(self, interfaceid=None, hostid=None):
session = self.db_session()
sql = DataInterface().select_all_from_allKeys(session=session, interfaceid=interfaceid, hostid=hostid)
return sql
@allure.step("查询表:items")
def select_items(self, hostid=None, templateid=None):
session = self.db_session()
sql = DataItems().select_all_from_allKeys(session=session, hostid=hostid, templateid=templateid)
return sql
@allure.step("查询表:functions")
def select_functions(self, triggerid=None, itemid=None):
session = self.db_session()
sql = DataFunctions().select_all_from_allKeys(session=session, triggerid=triggerid, itemid=itemid)
return sql
@allure.step("查询表:templateid")
def select_triggers(self, templateid=None):
session = self.db_session()
sql = DataTriggers().select_all_from_allKeys(session=session, templateid=templateid)
return sql
@allure.step("校验查询结果")
def check_select(self, groupid, name, internal, flags):
sql = self.select_hstgrp(groupid=groupid)
def check_select(self, hostid, _type):
sql = self.select_hosts_group(groupid=self.base_group_id, hostid=hostid)
for x, y in enumerate(sql):
assert y.groupid == groupid
assert y.name == name
assert y.flags == flags
assert y.internal == internal
assert y.groupid == self.base_group_id
assert y.hostid == hostid
@allure.step("断言返回结果:校验排序")
def check_sortfield(self, response, possible):
result = UtilsResponse().get_result(response=response)
UtilsResponse().check_sort(_list=result, key="groupid", possible=possible)
@allure.step("断言返回结果:校验返回数据的数量")
def check_num(self, response, num):
result = UtilsResponse().get_result(response=response)
assert len(result) == num
sql = self.select_interface(hostid=hostid)
for x, y in enumerate(sql):
assert y.ip == self.base_ip
assert y.port == "100"
assert y.type == _type
assert y.main == 1
@allure.step("断言返回结果")
def check_code(self, response, code):
_code = UtilsResponse().get_code(response=response)
assert _code == code
@allure.step("断言返回错误结果")
def check_status(self, response, status):
_status = UtilsResponse().get_status(response=response)
assert _status == status
@allure.title("host.create:IP地址相应信息")
@allure.story("创建主机接口:IP地址相应信息")
@allure.title("host.create:创建agent主机")
@allure.story("创建主机接口:创建agent主机")
@allure.severity("blocker")
def test_case_01(self):
host = self.base_name + "01"
......@@ -113,42 +150,113 @@ class TestHostCreate(object):
"type": 1,
"main": 1,
"useip": 1,
"ip": "192.168.3.1",
"ip": self.base_ip,
"dns": "",
"port": "10050",
"port": 100,
"bulk": 1
}
response = self.host_create(host=host, description=description, groupids=[87], interfaces=[interfaces], status=0,
tls_accept=1, tls_psk_identity="22", tls_psk="22")
response = self.host_create(host=host, description=description, groupids=[self.base_group_id], interfaces=[interfaces], status=0)
self.check_code(response=response, code=0)
# groupid = UtilsResponse().get_result(response=response)["groupids"][0]
# self.check_select(groupid=groupid, name=name, flags=flags, internal=internal)
hostid = UtilsResponse().get_result(response=response)["hostids"][0]
self.check_select(hostid=hostid, _type=1)
@allure.title("host.create:重复创建")
@allure.story("创建主机接口:重复创建")
@allure.title("host.create:创建snmp主机")
@allure.story("创建主机接口:创建snmp主机")
@allure.severity("blocker")
def test_case_02(self):
name = self.base_name + "02"
flags = 0
internal = 0
response = self.host_create(name=name)
host = self.base_name + "02"
description = self.base_description + "02"
interfaces = {
"type": 2,
"main": 1,
"useip": 1,
"ip": self.base_ip,
"dns": "",
"port": 100,
"bulk": 1
}
response = self.host_create(host=host, description=description, groupids=[self.base_group_id], interfaces=[interfaces], status=0)
self.check_code(response=response, code=0)
groupid = UtilsResponse().get_result(response=response)["groupids"][0]
self.check_select(groupid=groupid, name=name, flags=flags, internal=internal)
hostid = UtilsResponse().get_result(response=response)["hostids"][0]
self.check_select(hostid=hostid, _type=2)
response = self.host_create(name=name, flags=flags, internal=internal)
self.check_code(response=response, code=100)
@allure.title("host.create:无参数调用")
@allure.story("创建主机接口:无参数调用")
@allure.title("host.create:重复创建")
@allure.story("创建主机接口:重复创建")
def test_case_03(self):
response = self.host_create()
self.check_status(response=response, status=400)
host = self.base_name + "03"
description = self.base_description + "03"
interfaces = {
"type": 1,
"main": 1,
"useip": 1,
"ip": self.base_ip,
"dns": "",
"port": 100,
"bulk": 1
}
response = self.host_create(host=host, description=description, groupids=[101], interfaces=[interfaces], status=0)
self.check_code(response=response, code=0)
response = self.host_create(host=host, description=description, groupids=[101], interfaces=[interfaces], status=0)
self.check_code(response=response, code=100)
@allure.title("host.create:name=空字符串")
@allure.story("创建主机接口:name=空字符串")
@allure.title("host.create:模板创建")
@allure.story("创建主机接口:模板创建")
def test_case_04(self):
response = self.host_create(name="")
self.check_status(response=response, status=400)
templateid = 10001
host = self.base_name + "04"
description = self.base_description + "04"
interfaces = {
"type": 2,
"main": 1,
"useip": 1,
"ip": self.base_ip,
"dns": "",
"port": 100,
"bulk": 1
}
response = self.host_create(host=host, description=description, groupids=[101], interfaces=[interfaces], status=0, templateids=[templateid],
macros=[{
"macro": "{$SNMP_COMMUNITY}",
"value": "public"
}])
self.check_code(response=response, code=0)
hostid = UtilsResponse().get_result(response=response)["hostids"][0]
self.check_host(hostid=hostid, templateid=templateid)
@allure.step("校验创建host的相关数据")
def check_host(self, hostid, templateid):
self.log.debug("根据templateid= %s, 获取模板监控项IDs" % templateid)
itemids = []
sql = self.select_items(hostid=templateid)
for x, y in enumerate(sql):
itemids.append(y.itemid)
self.log.debug("根据模板监控项IDs, 获取模板触发器IDs")
triggers = []
for x, y in enumerate(itemids):
sql = self.select_functions(itemid=y)
if sql:
triggers.append(sql[0].triggerid)
new_itemids = []
self.log.info("校验监控项是否根据模板监控器已创建")
for x, y in enumerate(itemids):
sql = self.select_items(hostid=hostid, templateid=y)
assert sql != []
new_itemids.append(sql[0].itemid)
new_triggers = []
self.log.info("校验触发器是否根据模板触发器已创建")
for x, y in enumerate(triggers):
sql = self.select_triggers(templateid=y)
assert sql != []
new_triggers.append(sql[0].triggerid)
self.log.info("校验监控器与触发器关系是否已创建")
for x, y in enumerate(new_triggers):
sql = self.select_functions(triggerid=y)
assert sql != []
if __name__ == "__main__":
......@@ -160,12 +268,9 @@ if __name__ == "__main__":
# 执行自动化测试用例
# case_info = os.path.split(__file__)
# case = UtilsCmd().pytest_cmd()
# r = UtilsPyTest(version=case["version"])
# r.run_main(case_info=case_info)
# r = UtilsPyTest(case=case, case_info=case_info)
# r.run_main()
a = TestHostCreate()
# a.setup_class()
a.test_case_01()
# a.test_case_02()
# a.test_case_03()
# a.test_case_04()
a.setup_class()
a.test_case_04()
......@@ -31,13 +31,13 @@ class TestTriggerCreate(object):
db_base = env["db_base"]
base_name = "SS测试主机"
base_description = "SS测试触发器"
base_expression = "{Template OS Linux:agent.ping.last()}=0"
base_expression = "{Template OS Linux:agent.ping.nodata(5m)}=1"
@classmethod
def setup_class(cls):
cls.log.debug("开始执行测试套件.......")
# session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
# DataTriggers().delete_description(session=session, description=cls.base_description)
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataTriggers().delete_description(session=session, description=cls.base_description)
@classmethod
def teardown_class(cls):
......
......@@ -10,6 +10,8 @@ from WorkCase import CaseBase
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
from WorkUtils.UtilsResponse import UtilsResponse
from WorkApi.APP.Api.api_login import ApiLogin
from WorkApi.APP.Members.members_create import MembersCreate
from WorkData.Argus.members import DataMembers
......@@ -28,7 +30,10 @@ class TestMembersCreate(object):
db_user = env["db_user"]
db_pw = env["db_pw"]
db_base = env["db_base"]
base_id = 10
base_name = "SS测试用户名"
base_password = "123456"
@classmethod
def setup_class(cls):
......@@ -44,6 +49,14 @@ class TestMembersCreate(object):
def teardown_method(self):
self.log.debug("测试用例执行结束...")
@allure.step("调用接口:api.login")
def api_login(self, name=None, password=None):
api = ApiLogin(_host=self.host)
api.name = name
api.password = password
api.get_response()
return api.response
@allure.step("调用接口:members.create")
def members_create(self, token=None, name=None, position=None, role=None, password=None,
phone=None, email=None, wechatUrl=None, dingdingUrl=None, remark=None):
......@@ -61,11 +74,22 @@ class TestMembersCreate(object):
api.get_response()
return api.response
@allure.step("创建测试数据")
def get_base_token(self):
response = self.api_login(name=self.base_name, password=self.base_password)
self.token = UtilsResponse().get_data(response=response)
@allure.step("连接数据库:Argus")
def db_session(self):
session = UtilsDataBase().conn_mysql(db_url=self.db_url, db_port=self.db_port, db_base=self.db_base, db_user=self.db_user, db_pw=self.db_pw)
return session
@allure.step("新增表数据:members")
def insert_one(self, name=None, role=None, password=None):
session = self.db_session()
sql = DataMembers().insert_one(session=session, name=name, role=role, password=password)
return sql
@allure.step("查询表:members")
def select_members(self, members_id=None, name=None, role=None, status=None, del_flag=None):
session = self.db_session()
......@@ -73,6 +97,24 @@ class TestMembersCreate(object):
status=status, del_flag=del_flag)
return sql
@allure.step("更新表数据:members -- del_flag")
def update_members_del_flag(self, members_id=None, value=None):
session = self.db_session()
sql = DataMembers().update_del_flag(session=session, _id=members_id, value=value)
return sql
@allure.step("更新表数据:members -- status")
def update_members_status(self, members_id=None, value=None):
session = self.db_session()
sql = DataMembers().update_status(session=session, _id=members_id, value=value)
return sql
@allure.step("更新表数据:members -- password")
def update_members_password(self, members_id=None, value=None):
session = self.db_session()
sql = DataMembers().update_password(session=session, _id=members_id, value=value)
return sql
@allure.step("校验查询结果")
def check_select(self, members_id, name, position, role, password, phone, email, wechatUrl, dingdingUrl, remark):
sql = self.select_memberss(members_id=members_id)
......@@ -112,15 +154,43 @@ class TestMembersCreate(object):
_status = UtilsResponse().get_status(response=response)
assert _status == status
@allure.title("members.create:创建一个新用户")
@allure.story("创建用户:创建一个新用户")
@allure.title("members.create:token未传")
@allure.story("创建用户:token未传")
@allure.severity("blocker")
def test_case_01(self):
response = self.members_create(token="eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiIxIn0.qfd0G-elhE1aGr15LrnYlIZ_3UToaOM5HeMcXrmDGBM",name=self.base_name, role=0, password=123456)
self.check_code(response=response, code=0)
self.get_base_token()
response = self.members_create()
self.check_code(response=response, code=2001)
self.check_msg(response=response, msg="无token,请重新登录")
members_id = UtilsResponse().get_data(response=response)
# self.check_select(membersid=membersid, description=self.base_description, tags_tag=tags_tag, tags_value=tags_value)
@allure.title("members.create:token的用户已删除")
@allure.story("创建用户:token的用户已删除")
@allure.severity("blocker")
def test_case_02(self):
self.get_base_token()
self.update_members_del_flag(members_id=self.base_id, value=1)
response = self.members_create(token=self.token)
self.check_code(response=response, code=2004)
self.check_msg(response=response, msg="用户不存在")
@allure.title("settings.changePassword:token的用户已禁用")
@allure.story("变更密码:token的用户已禁用")
@allure.severity("blocker")
def test_case_03(self):
self.get_base_token()
self.update_members_status(members_id=self.base_id, value=1)
response = self.members_create(token=self.token)
self.check_code(response=response, code=2003)
self.check_msg(response=response, msg="账户已被禁用")
@allure.title("settings.changePassword:token错误")
@allure.story("变更密码:token错误")
def test_case_04(self):
self.get_base_token()
response = self.settings_changePassword(token="Test", memberId=self.base_id)
self.check_code(response=response, code=2001)
self.check_msg(response=response, msg="token错误,请重新登录")
if __name__ == "__main__":
......@@ -137,4 +207,4 @@ if __name__ == "__main__":
a = TestMembersCreate()
a.setup_class()
a.test_case_01()
a.test_case_02()
......@@ -46,6 +46,7 @@ class TestSettingsChangePassword(object):
self.log.debug("恢复测试数据...")
self.update_members_del_flag(members_id=self.base_id, value=0)
self.update_members_status(members_id=self.base_id, value=0)
self.update_members_password(members_id=self.base_id, value=self.base_password)
def teardown_method(self):
self.log.debug("测试用例执行结束...")
......@@ -98,6 +99,12 @@ class TestSettingsChangePassword(object):
sql = DataMembers().update_status(session=session, _id=members_id, value=value)
return sql
@allure.step("更新表数据:members -- password")
def update_members_password(self, members_id=None, value=None):
session = self.db_session()
sql = DataMembers().update_password(session=session, _id=members_id, value=value)
return sql
@allure.title("settings.changePassword:token未传")
@allure.story("变更密码:token未传")
@allure.severity("blocker")
......@@ -147,9 +154,13 @@ class TestSettingsChangePassword(object):
def test_case_05(self):
response = self.api_login(name=self.base_name, password=self.base_password)
self.token = UtilsResponse().get_data(response=response)
response = self.settings_changePassword(token=self.token, memberId=self.base_id)
new_password = "Test"
response = self.settings_changePassword(token=self.token, memberId=self.base_id, oldPassword=self.base_password,
newPassword=new_password)
self.check_code(response=response, code=0)
self.api_login
if __name__ == "__main__":
from WorkUtils.UtilsPyTest import UtilsPyTest
......
......@@ -11,17 +11,17 @@ class CaseBase:
self.environment = {
# "host": "http://10.0.0.12:7070",
# "db_url": "10.0.0.153",
# "db_port": 3306,
# "db_user": "zmops",
# "db_pw": "0VMoH%W7|h",
# "db_base": "zabbix",
"host": "http://localhost:7070",
"db_url": "10.0.0.155",
"db_url": "10.0.0.153",
"db_port": 3306,
"db_user": "root",
"db_pw": "123456",
"db_base": "zabbix"
"db_user": "zmops",
"db_pw": "0VMoH%W7|h",
"db_base": "zabbix",
"host": "http://localhost:7070",
# "db_url": "10.0.0.155",
# "db_port": 3306,
# "db_user": "root",
# "db_pw": "123456",
# "db_base": "zabbix"
}
self.app_environment = {
......
......@@ -47,6 +47,34 @@ class DataMembers(object):
self.table = Members
def insert_one(self, session, name=None, password=None, role=None):
"""
:param session: 指针
:param name:
:param password:
:param role:
:return:
"""
self.log.debug("新增表数据:")
try:
base = UtilsDataBase()
base.add_param(_key="name", value=name)
base.add_param(_key="password", value=password)
base.add_param(_key="role", value=role)
sql_rep = self.table(**base.param)
session.add(sql_rep)
session.commit()
_id = sql_rep.id
self.log.debug("新增的id:%s" % _id)
session.close()
return _id
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
def select_all_from_allKeys(self, session, _id=None, name=None, role=None, status=None, del_flag=None):
"""
:param session: 指针
......@@ -139,3 +167,25 @@ class DataMembers(object):
self.log.error(error)
session.close()
return error
def update_password(self, session, _id, value):
"""
:param session: 指针
:param _id:
:param value:
:return:
"""
self.log.debug("查询数据库:")
try:
self.log.debug("更新相关测试数据")
sql_rep = session.query(self.table).filter_by(id=_id).one()
sql_rep.password = value
session.commit()
session.close()
self.log.debug("更新成功")
return True
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
# -*- coding: utf_8 -*-
# 表名: functions
# 作者: 陈磊
# 时间: 2019-11-21
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, VARCHAR, BIGINT, SMALLINT, DATETIME, DECIMAL, BigInteger, FLOAT, NUMERIC, TEXT
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy import or_
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
import time
Base = declarative_base()
class Functions(Base):
"""
监控与触发器关系表
"""
__tablename__ = "functions"
functionid = Column(BigInteger, primary_key=True)
itemid = Column(BigInteger)
triggerid = Column(BigInteger)
name = Column(VARCHAR(12))
parameter = Column(VARCHAR(255))
class DataFunctions(object):
def __init__(self):
self.log = UtilsLog()
self.log.info(self.__class__)
self.table = Functions
def select_all_from_allKeys(self, session, itemid=None, triggerid=None):
"""
:param session: 指针
:param itemid:
:param triggerid:
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("triggerid: %s" % triggerid)
self.log.debug("itemid: %s" % itemid)
try:
base = UtilsDataBase()
base.add_param(_key="itemid", value=itemid)
base.add_param(_key="triggerid", value=triggerid)
sql_rep = session.query(self.table).filter_by(**base.param).all()
self.log.debug(sql_rep)
session.close()
return sql_rep
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
......@@ -82,6 +82,9 @@ class DataHosts(object):
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("hostid: %s" % hostid)
self.log.debug("host: %s" % host)
self.log.debug("name: %s" % name)
try:
base = UtilsDataBase()
base.add_param(_key="hostid", value=hostid)
......@@ -97,7 +100,26 @@ class DataHosts(object):
session.close()
return error
def delete_name(self, session, name):
def select_like_name(self, session, name):
"""
:param session: 指针
:param name:
:return:
"""
self.log.debug("查询数据库:")
try:
self.log.debug("查询相关测试数据")
sql = session.query(self.table).filter(self.table.name.like(name + "%")).all()
session.close()
self.log.debug("删除成功")
return sql
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
def delete_like_name(self, session, name):
"""
:param session: 指针
:param name:
......@@ -106,7 +128,7 @@ class DataHosts(object):
self.log.debug("查询数据库:")
try:
self.log.debug("删除相关测试数据")
session.query(self.table).filter_by(name=name).delete(synchronize_session=False)
session.query(self.table).filter(self.table.name.like(name + "%")).delete(synchronize_session=False)
# session.delete(sql)
session.commit()
session.close()
......
......@@ -51,6 +51,10 @@ class DataInterface(object):
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("interfaceid: %s" % interfaceid)
self.log.debug("hostid: %s" % hostid)
self.log.debug("ip: %s" % ip)
self.log.debug("port: %s" % port)
try:
base = UtilsDataBase()
base.add_param(_key="interfaceid", value=interfaceid)
......
# -*- coding: utf_8 -*-
# 表名: items
# 作者: 陈磊
# 时间: 2019-11-21
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, VARCHAR, SMALLINT, DATETIME, DECIMAL, BigInteger, FLOAT, NUMERIC, TEXT
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy import or_
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
import time
Base = declarative_base()
class Items(Base):
"""
监控信息表
"""
__tablename__ = "items"
itemid = Column(BigInteger, primary_key=True)
type = Column(Integer)
snmp_community = Column(VARCHAR(64))
snmp_oid = Column(VARCHAR(512))
hostid = Column(BigInteger)
name = Column(VARCHAR(255))
key_ = Column(VARCHAR(255))
delay = Column(VARCHAR(1024))
history = Column(VARCHAR(255))
trends = Column(VARCHAR(255))
status = Column(Integer)
value_type = Column(Integer)
trapper_hosts = Column(VARCHAR(255))
units = Column(VARCHAR(255))
snmpv3_securityname = Column(VARCHAR(64))
snmpv3_securitylevel = Column(Integer)
snmpv3_authpassphrase = Column(Integer)
snmpv3_privpassphrase = Column(VARCHAR(64))
formula = Column(VARCHAR(255))
error = Column(VARCHAR(2048))
lastlogsize = Column(BigInteger)
logtimefmt = Column(VARCHAR(64))
templateid = Column(BigInteger)
valuemapid = Column(BigInteger)
params = Column(TEXT)
ipmi_sensor = Column(VARCHAR(128))
authtype = Column(Integer)
username = Column(VARCHAR(64))
password = Column(VARCHAR(64))
publickey = Column(VARCHAR(64))
privatekey = Column(VARCHAR(64))
mtime = Column(Integer)
flags = Column(Integer)
interfaceid = Column(BigInteger)
port = Column(VARCHAR(64))
description = Column(TEXT)
inventory_link = Column(Integer)
lifetime = Column(VARCHAR(255))
snmpv3_authprotocol = Column(Integer)
snmpv3_privprotocol = Column(Integer)
state = Column(Integer)
snmpv3_contextname = Column(VARCHAR(255))
evaltype = Column(Integer)
jmx_endpoint = Column(VARCHAR(255))
master_itemid = Column(BigInteger)
timeout = Column(VARCHAR(255))
url = Column(VARCHAR(2048))
query_fields = Column(VARCHAR(2048))
posts = Column(TEXT)
status_codes = Column(VARCHAR(255))
follow_redirects = Column(Integer)
post_type = Column(Integer)
http_proxy = Column(VARCHAR(255))
headers = Column(TEXT)
retrieve_mode = Column(Integer)
request_method = Column(Integer)
output_format = Column(Integer)
ssl_cert_file = Column(VARCHAR(255))
ssl_key_file = Column(VARCHAR(255))
ssl_key_password = Column(VARCHAR(64))
verify_peer = Column(Integer)
verify_host = Column(Integer)
allow_traps = Column(Integer)
class DataItems(object):
def __init__(self):
self.log = UtilsLog()
self.log.info(self.__class__)
self.table = Items
def select_all_from_allKeys(self, session, hostid=None, templateid=None):
"""
:param session: 指针
:param hostid:
:param templateid:
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("hostid: %s" % hostid)
self.log.debug("templateid: %s" % templateid)
try:
base = UtilsDataBase()
base.add_param(_key="hostid", value=hostid)
base.add_param(_key="templateid", value=templateid)
sql_rep = session.query(self.table).filter_by(**base.param).all()
self.log.debug(sql_rep)
session.close()
return sql_rep
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
def delete_hostid(self, session, hostid):
"""
:param session: 指针
:param hostid:
:return:
"""
self.log.debug("查询数据库:")
try:
self.log.debug("删除相关测试数据")
session.query(self.table).filter_by(hostid=hostid).delete(synchronize_session=False)
# session.delete(sql)
session.commit()
session.close()
self.log.debug("删除成功")
return True
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
......@@ -51,18 +51,23 @@ class DataTriggers(object):
self.table = Triggers
def select_all_from_allKeys(self, session, triggerid=None, description=None):
def select_all_from_allKeys(self, session, triggerid=None, description=None, templateid=None):
"""
:param session: 指针
:param triggerid:
:param templateid:
:param description:
:return: 查询结果
"""
self.log.debug("查询数据库:")
self.log.debug("triggerid: %s" % triggerid)
self.log.debug("description: %s" % description)
self.log.debug("templateid: %s" % templateid)
try:
base = UtilsDataBase()
base.add_param(_key="triggerid", value=triggerid)
base.add_param(_key="description", value=description)
base.add_param(_key="templateid", value=templateid)
sql_rep = session.query(self.table).filter_by(**base.param).all()
self.log.debug(sql_rep)
session.close()
......@@ -93,3 +98,24 @@ class DataTriggers(object):
self.log.error(error)
session.close()
return error
def delete_description(self, session, description):
"""
:param session: 指针
:param description:
:return:
"""
self.log.debug("查询数据库:")
try:
self.log.debug("删除相关测试数据")
session.query(self.table).filter_by(description=description).delete(synchronize_session=False)
# session.delete(sql)
session.commit()
session.close()
self.log.debug("删除成功")
return True
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment