Commit d96d5969 by sanshi

完成 巡检任务 模块

parent dcc17dc3
...@@ -27,8 +27,8 @@ class InspectionRuleGet(object): ...@@ -27,8 +27,8 @@ class InspectionRuleGet(object):
self.response = "" self.response = ""
self.token = None self.token = None
self.id = None self.name = None
self.status = None self.period = None
self.api = UtilsRequest() self.api = UtilsRequest()
...@@ -38,7 +38,7 @@ class InspectionRuleGet(object): ...@@ -38,7 +38,7 @@ class InspectionRuleGet(object):
self._headers = base._json self._headers = base._json
base = ApiBase() base = ApiBase()
base.dict_add_key(_key="id", value=self.id) base.dict_add_key(_key="name", value=self.name)
base.dict_add_key(_key="status", value=self.status) base.dict_add_key(_key="period", value=self.period)
self._params = base._json self._params = base._json
self.response = self.api.get(url=self._url, headers=self._headers, params=self._params) self.response = self.api.get(url=self._url, headers=self._headers, params=self._params)
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-12-20
from __future__ import division
import pytest
from WorkCase import CaseBase
from WorkCase.APP import AppBase
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
from WorkUtils.UtilsResponse import UtilsResponse
from WorkApi.APP.Api.api_login import ApiLogin
from WorkApi.APP.HostType.hostType_create import HostTypeCreate
from WorkData.Argus.hosts import DataHosts
from WorkData.Argus.inspection_rule import DataInspectionRule
import allure
@allure.feature("测试模块:inspection.rule.create")
class TestInspectionRuleCreate(object):
log = UtilsLog()
env = CaseBase().environment_test
host = env["host"]
db_url = env["db_url"]
db_port = env["db_port"]
db_user = env["db_user"]
db_pw = env["db_pw"]
db_base = env["db_base"]
login_name = "SS测试用户名"
base_id = 10
base_password = "123456"
base_type_agent = 126
base_type_snmp = 127
base_ip = "172.16.3.171"
base_dns = "sanshi.zmops.com"
base_port = 161
base_business_id_1 = 12
base_business_id_2 = 13
base_host_name = "SS 自动化巡检任务专用"
base_name = "SS 自动化专用巡检任务"
@classmethod
def setup_class(cls):
cls.log.debug("开始执行测试套件.......")
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataInspectionRule().delete_like_name(session=session, name=cls.base_name)
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataHosts().delete_like_host_name(session=session, host_name=cls.base_host_name)
@classmethod
def teardown_class(cls):
cls.log.debug("结束执行测试套件.......")
def setup_method(self):
self.log.debug("测试用例执行开始...")
self.app = AppBase(host=self.host)
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
self.token = self.app.get_base_token(login_name=self.login_name, base_password=self.base_password)
def teardown_method(self):
self.log.debug("测试用例执行结束...")
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
@allure.step("连接数据库:Argus")
def db_session(self):
session = UtilsDataBase().conn_mysql(db_url=self.db_url, db_port=self.db_port, db_base=self.db_base, db_user=self.db_user, db_pw=self.db_pw)
return session
@allure.step("查询表:inspection_rule")
def select_inspection_rule(self, _id=None, name=None):
session = self.db_session()
sql = DataInspectionRule().select_all_from_allKeys(session=session, _id=_id, name=name)
return sql
@allure.step("校验查询结果")
def check_select(self, response, name, _type, time, period, hosts, crt_user, status):
data = UtilsResponse().get_data(response=response)
sql = self.select_inspection_rule(_id=data[0])
assert sql != []
for x, y in enumerate(sql):
assert y.name == name
assert y.type == _type
assert y.time == time
assert y.period == period
assert y.hosts == hosts
# assert y.crt_user == crt_user
assert y.status == status
assert x == 0
@allure.title("inspections.rule.create:token未传")
@allure.story("创建巡检任务:token未传")
@allure.severity("blocker")
def test_case_01(self):
response = self.app.inspection_rule_create()
self.app.check_code(response=response, code=2001)
self.app.check_msg(response=response, msg="无token,请重新登录")
@allure.title("inspections.rule.create:token的用户已删除")
@allure.story("创建巡检任务:token的用户已删除")
@allure.severity("blocker")
def test_case_02(self):
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["ON"])
response = self.app.inspection_rule_create(token=self.token)
self.app.check_code(response=response, code=2004)
self.app.check_msg(response=response, msg="用户不存在")
@allure.title("inspections.rule.create:token的用户已禁用")
@allure.story("创建巡检任务:token的用户已禁用")
@allure.severity("blocker")
def test_case_03(self):
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["OFF"])
response = self.app.inspection_rule_create(token=self.token)
self.app.check_code(response=response, code=2003)
self.app.check_msg(response=response, msg="账户已被禁用")
@allure.title("inspections.rule.create:成功创建-自动日巡检")
@allure.story("创建巡检任务:成功创建-自动日巡检")
@allure.severity("blocker")
def test_case_04(self):
name = self.base_name + "04"
host_name = self.base_host_name + "04"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
response = self.app.inspection_rule_create(token=self.token, name=name, time="[][14]", period=1, _type=2, hosts=host_id)
self.app.check_code(response=response, code=0)
self.check_select(response=response, name=name, time="[][14]", period=1,
_type=2, hosts=str(host_id), crt_user=self.base_id, status=CaseBase().status["ON"])
@allure.title("inspections.rule.create:成功创建-自动周巡检")
@allure.story("创建巡检任务:成功创建-自动周巡检")
@allure.severity("blocker")
def test_case_05(self):
name = self.base_name + "05"
host_name = self.base_host_name + "05"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
response = self.app.inspection_rule_create(token=self.token, name=name, time="[1,2,3,7][3,21]", period=2, _type=2, hosts=host_id)
self.app.check_code(response=response, code=0)
self.check_select(response=response, name=name, time="[1,2,3,7][3,21]", period=2,
_type=2, hosts=str(host_id), crt_user=self.base_id, status=CaseBase().status["ON"])
@allure.title("inspections.rule.create:成功创建-自动月巡检")
@allure.story("创建巡检任务:成功创建-自动月巡检")
@allure.severity("blocker")
def test_case_06(self):
name = self.base_name + "06"
host_name = self.base_host_name + "06"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
response = self.app.inspection_rule_create(token=self.token, name=name, time="[1,5,16,25,31][4,20]", period=3, _type=2, hosts=host_id)
self.app.check_code(response=response, code=0)
self.check_select(response=response, name=name, time="[1,5,16,25,31][4,20]", period=3,
_type=2, hosts=str(host_id), crt_user=self.base_id, status=CaseBase().status["ON"])
@allure.title("inspections.rule.create:成功创建-手动巡检")
@allure.story("创建巡检任务:成功创建-手动巡检")
@allure.severity("blocker")
def test_case_07(self):
name = self.base_name + "07"
host_name = self.base_host_name + "07"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
response = self.app.inspection_rule_create(token=self.token, name=name, time=None, period=None, _type=1, hosts=host_id)
self.app.check_code(response=response, code=0)
self.check_select(response=response, name=name, time="[1][0]", period=3,
_type=1, hosts=str(host_id), crt_user=self.base_id, status=CaseBase().status["ON"])
if __name__ == "__main__":
from WorkUtils.UtilsPyTest import UtilsPyTest
from WorkUtils.UtilsCmd import UtilsCmd
import os
# 执行自动化测试用例
case_info = os.path.split(__file__)
case = UtilsCmd().pytest_cmd()
r = UtilsPyTest(case=case, case_info=case_info)
r.run_main()
# a = TestInspectionRuleCreate()
# a.setup_class()
# a.setup_method()
# a.test_case_04()
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-12-20
from __future__ import division
from WorkCase import CaseBase
from WorkCase.APP import AppBase
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
from WorkUtils.UtilsResponse import UtilsResponse
from WorkApi.APP.Api.api_login import ApiLogin
from WorkApi.APP.HostType.hostType_create import HostTypeCreate
from WorkData.Argus.hosts import DataHosts
from WorkData.Argus.inspection_rule import DataInspectionRule
import allure
@allure.feature("测试模块:inspection.rule.delete")
class TestInspectionRuleDelete(object):
log = UtilsLog()
env = CaseBase().environment_test
host = env["host"]
db_url = env["db_url"]
db_port = env["db_port"]
db_user = env["db_user"]
db_pw = env["db_pw"]
db_base = env["db_base"]
login_name = "SS测试用户名"
base_id = 10
base_password = "123456"
base_type_agent = 126
base_type_snmp = 127
base_ip = "172.16.3.171"
base_dns = "sanshi.zmops.com"
base_port = 161
base_business_id_1 = 12
base_business_id_2 = 13
base_host_name = "SS 自动化巡检任务专用"
base_name = "SS 自动化专用巡检任务"
@classmethod
def setup_class(cls):
cls.log.debug("开始执行测试套件.......")
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataInspectionRule().delete_like_name(session=session, name=cls.base_name)
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataHosts().delete_like_host_name(session=session, host_name=cls.base_host_name)
@classmethod
def teardown_class(cls):
cls.log.debug("结束执行测试套件.......")
def setup_method(self):
self.log.debug("测试用例执行开始...")
self.app = AppBase(host=self.host)
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
self.token = self.app.get_base_token(login_name=self.login_name, base_password=self.base_password)
def teardown_method(self):
self.log.debug("测试用例执行结束...")
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
@allure.step("连接数据库:Argus")
def db_session(self):
session = UtilsDataBase().conn_mysql(db_url=self.db_url, db_port=self.db_port, db_base=self.db_base, db_user=self.db_user, db_pw=self.db_pw)
return session
@allure.step("查询表:inspection_rule")
def select_inspection_rule(self, _id=None, name=None):
session = self.db_session()
sql = DataInspectionRule().select_all_from_allKeys(session=session, _id=_id, name=name)
return sql
@allure.step("校验查询结果")
def check_select(self, rule_id, status, del_flag):
sql = self.select_inspection_rule(_id=rule_id)
assert sql != []
for x, y in enumerate(sql):
assert y.del_flag == del_flag
assert y.status == status
assert x == 0
@allure.title("inspections.rule.delete:token未传")
@allure.story("删除巡检任务:token未传")
@allure.severity("blocker")
def test_case_01(self):
response = self.app.inspection_rule_create()
self.app.check_code(response=response, code=2001)
self.app.check_msg(response=response, msg="无token,请重新登录")
@allure.title("inspections.rule.delete:token的用户已删除")
@allure.story("删除巡检任务:token的用户已删除")
@allure.severity("blocker")
def test_case_02(self):
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["ON"])
response = self.app.inspection_rule_create(token=self.token)
self.app.check_code(response=response, code=2004)
self.app.check_msg(response=response, msg="用户不存在")
@allure.title("inspections.rule.delete:token的用户已禁用")
@allure.story("删除巡检任务:token的用户已禁用")
@allure.severity("blocker")
def test_case_03(self):
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["OFF"])
response = self.app.inspection_rule_create(token=self.token)
self.app.check_code(response=response, code=2003)
self.app.check_msg(response=response, msg="账户已被禁用")
@allure.title("inspections.rule.delete:删除一个")
@allure.story("删除巡检任务:删除一个")
@allure.severity("blocker")
def test_case_04(self):
name_1 = self.base_name + "041"
name_2 = self.base_name + "042"
name_3 = self.base_name + "043"
name_4 = self.base_name + "044"
host_name = self.base_host_name + "04"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
rule_id_1 = self.app.case_create_inspection_rule(name=name_1, time=None, period=None, _type=1, hosts=host_id)
rule_id_2 = self.app.case_create_inspection_rule(name=name_2, time="[][16]", period=1, _type=2, hosts=host_id)
rule_id_3 = self.app.case_create_inspection_rule(name=name_3, time="[1,4,6,7][3,21]", period=2, _type=2, hosts=host_id)
rule_id_4 = self.app.case_create_inspection_rule(name=name_4, time="[2,5,20,25,31][5,19]", period=3, _type=2, hosts=host_id)
response = self.app.inspection_rule_delete(token=self.token, ruleids=[rule_id_1])
self.app.inspection_rule_get(token=self.token)
self.app.check_code(response=response, code=0)
self.check_select(rule_id=rule_id_2, status=CaseBase().status["ON"], del_flag=CaseBase().del_flag["ON"])
@allure.title("inspections.rule.delete:删除多个")
@allure.story("删除巡检任务:删除多个")
@allure.severity("blocker")
def test_case_05(self):
name_1 = self.base_name + "051"
name_2 = self.base_name + "052"
name_3 = self.base_name + "053"
name_4 = self.base_name + "054"
host_name = self.base_host_name + "05"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
rule_id_1 = self.app.case_create_inspection_rule(name=name_1, time=None, period=None, _type=1, hosts=host_id)
rule_id_2 = self.app.case_create_inspection_rule(name=name_2, time="[][16]", period=1, _type=2, hosts=host_id)
rule_id_3 = self.app.case_create_inspection_rule(name=name_3, time="[1,4,6,7][3,21]", period=2, _type=2, hosts=host_id)
rule_id_4 = self.app.case_create_inspection_rule(name=name_4, time="[2,5,20,25,31][5,19]", period=3, _type=2, hosts=host_id)
response = self.app.inspection_rule_delete(token=self.token, ruleids=[rule_id_1, rule_id_2, rule_id_3, rule_id_4])
self.app.inspection_rule_get(token=self.token)
self.app.check_code(response=response, code=0)
self.check_select(rule_id=rule_id_1, status=CaseBase().status["ON"], del_flag=CaseBase().del_flag["ON"])
self.check_select(rule_id=rule_id_2, status=CaseBase().status["ON"], del_flag=CaseBase().del_flag["ON"])
self.check_select(rule_id=rule_id_3, status=CaseBase().status["ON"], del_flag=CaseBase().del_flag["ON"])
self.check_select(rule_id=rule_id_4, status=CaseBase().status["ON"], del_flag=CaseBase().del_flag["ON"])
if __name__ == "__main__":
from WorkUtils.UtilsPyTest import UtilsPyTest
from WorkUtils.UtilsCmd import UtilsCmd
import os
# 执行自动化测试用例
case_info = os.path.split(__file__)
case = UtilsCmd().pytest_cmd()
r = UtilsPyTest(case=case, case_info=case_info)
r.run_main()
# a = TestInspectionRuleDelete()
# a.setup_class()
# a.setup_method()
# a.test_case_05()
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-12-20
from __future__ import division
from WorkCase import CaseBase
from WorkCase.APP import AppBase
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
from WorkUtils.UtilsResponse import UtilsResponse
from WorkApi.APP.Api.api_login import ApiLogin
from WorkApi.APP.HostType.hostType_create import HostTypeCreate
from WorkData.Argus.hosts import DataHosts
from WorkData.Argus.inspection_rule import DataInspectionRule
import allure
@allure.feature("测试模块:inspection.rule.status")
class TestInspectionRuleStatus(object):
log = UtilsLog()
env = CaseBase().environment_test
host = env["host"]
db_url = env["db_url"]
db_port = env["db_port"]
db_user = env["db_user"]
db_pw = env["db_pw"]
db_base = env["db_base"]
login_name = "SS测试用户名"
base_id = 10
base_password = "123456"
base_type_agent = 126
base_type_snmp = 127
base_ip = "172.16.3.171"
base_dns = "sanshi.zmops.com"
base_port = 161
base_business_id_1 = 12
base_business_id_2 = 13
base_host_name = "SS 自动化巡检任务专用"
base_name = "SS 自动化专用巡检任务"
@classmethod
def setup_class(cls):
cls.log.debug("开始执行测试套件.......")
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataInspectionRule().delete_like_name(session=session, name=cls.base_name)
session = UtilsDataBase().conn_mysql(db_url=cls.db_url, db_port=cls.db_port, db_base=cls.db_base, db_user=cls.db_user, db_pw=cls.db_pw)
DataHosts().delete_like_host_name(session=session, host_name=cls.base_host_name)
@classmethod
def teardown_class(cls):
cls.log.debug("结束执行测试套件.......")
def setup_method(self):
self.log.debug("测试用例执行开始...")
self.app = AppBase(host=self.host)
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
self.token = self.app.get_base_token(login_name=self.login_name, base_password=self.base_password)
def teardown_method(self):
self.log.debug("测试用例执行结束...")
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
@allure.step("连接数据库:Argus")
def db_session(self):
session = UtilsDataBase().conn_mysql(db_url=self.db_url, db_port=self.db_port, db_base=self.db_base, db_user=self.db_user, db_pw=self.db_pw)
return session
@allure.step("查询表:inspection_rule")
def select_inspection_rule(self, _id=None, name=None):
session = self.db_session()
sql = DataInspectionRule().select_all_from_allKeys(session=session, _id=_id, name=name)
return sql
@allure.step("校验查询结果")
def check_select(self, rule_id, status, del_flag):
sql = self.select_inspection_rule(_id=rule_id)
assert sql != []
for x, y in enumerate(sql):
assert y.del_flag == del_flag
assert y.status == status
assert x == 0
@allure.title("inspections.rule.status:token未传")
@allure.story("更新巡检任务状态:token未传")
@allure.severity("blocker")
def test_case_01(self):
response = self.app.inspection_rule_create()
self.app.check_code(response=response, code=2001)
self.app.check_msg(response=response, msg="无token,请重新登录")
@allure.title("inspections.rule.status:token的用户已删除")
@allure.story("更新巡检任务状态:token的用户已删除")
@allure.severity("blocker")
def test_case_02(self):
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["ON"])
response = self.app.inspection_rule_create(token=self.token)
self.app.check_code(response=response, code=2004)
self.app.check_msg(response=response, msg="用户不存在")
@allure.title("inspections.rule.status:token的用户已禁用")
@allure.story("更新巡检任务状态:token的用户已禁用")
@allure.severity("blocker")
def test_case_03(self):
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["OFF"])
response = self.app.inspection_rule_create(token=self.token)
self.app.check_code(response=response, code=2003)
self.app.check_msg(response=response, msg="账户已被禁用")
@allure.title("inspections.rule.status:关闭任务")
@allure.story("更新巡检任务状态:关闭任务")
@allure.severity("blocker")
def test_case_04(self):
name = self.base_name + "04"
host_name = self.base_host_name + "04"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
rule_id = self.app.case_create_inspection_rule(name=name, time=None, period=None, _type=1, hosts=host_id)
response = self.app.inspection_rule_status(token=self.token, _id=rule_id, status=CaseBase().status["OFF"])
self.app.check_code(response=response, code=0)
self.check_select(rule_id=rule_id, status=CaseBase().status["OFF"], del_flag=CaseBase().del_flag["OFF"])
@allure.title("inspections.rule.status:启用任务")
@allure.story("更新巡检任务状态:启用任务")
@allure.severity("blocker")
def test_case_05(self):
name = self.base_name + "05"
host_name = self.base_host_name + "05"
host_id = self.app.case_create_host(name=host_name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=[self.base_business_id_1, self.base_business_id_2])
rule_id = self.app.case_create_inspection_rule(name=name, time=None, period=None, _type=1, hosts=host_id)
response = self.app.inspection_rule_status(token=self.token, _id=rule_id, status=CaseBase().status["OFF"])
self.app.check_code(response=response, code=0)
self.check_select(rule_id=rule_id, status=CaseBase().status["OFF"], del_flag=CaseBase().del_flag["OFF"])
response = self.app.inspection_rule_status(token=self.token, _id=rule_id, status=CaseBase().status["ON"])
self.app.check_code(response=response, code=0)
self.check_select(rule_id=rule_id, status=CaseBase().status["ON"], del_flag=CaseBase().del_flag["OFF"])
if __name__ == "__main__":
from WorkUtils.UtilsPyTest import UtilsPyTest
from WorkUtils.UtilsCmd import UtilsCmd
import os
# 执行自动化测试用例
case_info = os.path.split(__file__)
case = UtilsCmd().pytest_cmd()
r = UtilsPyTest(case=case, case_info=case_info)
r.run_main()
# a = TestInspectionRuleStatus()
# a.setup_class()
# a.setup_method()
# a.test_case_05()
...@@ -64,6 +64,12 @@ from WorkApi.APP.Settings.settings_changePassword import SettingsChangePassword ...@@ -64,6 +64,12 @@ from WorkApi.APP.Settings.settings_changePassword import SettingsChangePassword
from WorkData.Argus.members import DataMembers from WorkData.Argus.members import DataMembers
from WorkUtils.UtilsResponse import UtilsResponse from WorkUtils.UtilsResponse import UtilsResponse
from WorkApi.APP.Inspection.Rule.inspection_rule_create import InspectionRuleCreate
from WorkApi.APP.Inspection.Rule.inspection_rule_delete import InspectionRuleDelete
from WorkApi.APP.Inspection.Rule.inspection_rule_get import InspectionRuleGet
from WorkApi.APP.Inspection.Rule.inspection_rule_update import InspectionRuleUpdate
from WorkApi.APP.Inspection.Rule.inspection_rule_status import InspectionRuleStatus
class AppBase(object): class AppBase(object):
def __init__(self, host): def __init__(self, host):
...@@ -949,3 +955,65 @@ class AppBase(object): ...@@ -949,3 +955,65 @@ class AppBase(object):
api.eventid = eventid api.eventid = eventid
api.get_response() api.get_response()
return api.response return api.response
@allure.step("调用接口:inspection.rule.create")
def inspection_rule_create(self, token=None, name=None, _type=None, time=None, period=None, hosts=None, status=None):
api = InspectionRuleCreate(_host=self.host)
api.token = token
api.name = name
api.type = _type
api.time = time
api.period = period
api.hosts = hosts
api.status = status
api.get_response()
return api.response
@allure.step("调用接口:inspection.rule.get")
def inspection_rule_get(self, token=None, name=None, period=None):
api = InspectionRuleGet(_host=self.host)
api.token = token
api.name = name
api.period = period
api.get_response()
return api.response
@allure.step("调用接口:inspection.rule.update")
def inspection_rule_update(self, token=None, _id=None, name=None, _type=None, time=None, period=None,
hosts=None, status=None):
api = InspectionRuleUpdate(_host=self.host)
api.token = token
api.id = _id
api.name = name
api.type = _type
api.time = time
api.period = period
api.hosts = hosts
api.status = status
api.get_response()
return api.response
@allure.step("调用接口:inspection.rule.delete")
def inspection_rule_delete(self, token=None, ruleids=None):
api = InspectionRuleDelete(_host=self.host)
api.token = token
api.ruleids = ruleids
api.get_response()
return api.response
@allure.step("调用接口:inspection.rule.status")
def inspection_rule_status(self, token=None, _id=None, status=None):
api = InspectionRuleStatus(_host=self.host)
api.token = token
api.id = _id
api.status = status
api.get_response()
return api.response
@allure.step("创建测试数据-巡检任务")
def case_create_inspection_rule(self, name=None, time=None, period=None, _type=None, hosts=None, status=None):
response = self.inspection_rule_create(token=self.token, name=name, time=time, period=period, _type=_type, hosts=hosts, status=status)
self.check_code(response=response, code=0)
data = UtilsResponse().get_data(response=response)
rule_id = data[0]
return rule_id
...@@ -49,8 +49,8 @@ class CaseBase: ...@@ -49,8 +49,8 @@ class CaseBase:
} }
self.status = { self.status = {
"ON": 1, "ON": 0,
"OFF": 0 "OFF": 1
} }
self.del_flag = { self.del_flag = {
......
# -*- coding: utf_8 -*-
# 表名: inspection_rule
# 作者: 陈磊
# 时间: 2019-12-19
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, VARCHAR, SMALLINT, DATETIME, DECIMAL, BigInteger, FLOAT, NUMERIC, TEXT
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy import or_
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
import time
Base = declarative_base()
class InspectionRule(Base):
"""
巡检任务
"""
__tablename__ = "inspection_rule"
id = Column(Integer, primary_key=True)
name = Column(VARCHAR(255), comment="名称")
type = Column(Integer, comment="1-手动巡检 2-自动巡检")
time = Column(VARCHAR(255), comment="巡检时间 [1-31][1,17]")
period = Column(Integer, comment="1-日巡检,2-周巡检,3-月巡检")
hosts = Column(VARCHAR(255), comment="主机 1,2,3")
crt_user = Column(Integer, comment="规则id")
status = Column(Integer, comment="0-启用 1-禁用")
del_flag = Column(Integer, comment="'1-删除 0-存在")
class DataInspectionRule(object):
def __init__(self):
self.log = UtilsLog()
self.log.info(self.__class__)
self.table = InspectionRule
def select_all_from_allKeys(self, session, _id=None, name=None):
"""
:param session: 指针
:param _id:
:param name:
:return: 查询结果
"""
self.log.debug("查询数据库:")
try:
base = UtilsDataBase()
base.add_param(_key="id", value=_id)
base.add_param(_key="name", value=name)
sql_rep = session.query(self.table).filter_by(**base.param).all()
self.log.debug(sql_rep)
session.close()
return sql_rep
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
def delete_like_name(self, session, name):
"""
:param session: 指针
:param name:
:return:
"""
self.log.debug("查询数据库:")
try:
self.log.debug("删除相关测试数据")
session.query(self.table).filter(self.table.name.like("%" + str(name) + "%")).delete(synchronize_session=False)
# session.delete(sql)
session.commit()
session.close()
self.log.debug("删除成功")
return True
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
...@@ -152,6 +152,21 @@ class UtilsResponse: ...@@ -152,6 +152,21 @@ class UtilsResponse:
self.log.error(error) self.log.error(error)
return error return error
def get_data_data(self, response):
"""
:param response: 返回信息
:return: 返回value
"""
self.log.debug(u"解析返回信息中data的data:")
try:
value = json.loads(response)["data"]["data"]
self.log.debug(value)
return value
except KeyError as error:
self.log.error(u"json解析出错缺少Key:")
self.log.error(error)
return error
def get_data_list_records(self, response): def get_data_list_records(self, response):
""" """
:param response: 返回信息 :param response: 返回信息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment