Commit ba9dc48c by sanshi

整个资产代码重构

parent d54c67cc
# -*- coding: utf-8 -*-
# 更新资产
# 作者: 陈磊
# 时间: 2019-12-13
from WorkUtils.UtilsRequest import UtilsRequest
from WorkUtils.UtilsLog import UtilsLog
from WorkApi.ApiBase import ApiBase, GetBase
class DeviceUpdate(object):
def __init__(self, _host):
"""
:param _host: 域名
:return:
"""
self.log = UtilsLog()
self.log.info("调用更新资产")
self.log.info(self.__class__)
self._host = _host
self._headers = {}
self._path = "/device/update"
self._url = self._host + self._path
self._params = {}
self._json = {}
self._data = {}
self.response = ""
self.token = None
self.hostid = None
self.hostName = None
self.hostType = None
self.manageLevel = None
self.iplist = None
self.port = None
self.dns = None
self.monitorInterface = None
self.monitorType = None
self.parentHost = None
self.businessIds = None
self.businessTree = None
self.opsPerson = None
self.snmpCommunity = None
self.ipmiAuthtype = None
self.ipmiPrivilege = None
self.ipmiUsername = None
self.ipmiPassword = None
self.factoryId = None
self.model = None
self.version = None
self.serialnumber = None
self.description = None
self.monitorStatus = None
self.api = UtilsRequest()
def get_response(self):
base = ApiBase()
base.dict_add_key(_key="token", value=self.token)
self._headers = base._json
base = ApiBase()
base.dict_add_key(_key="hostid", value=self.hostid)
base.dict_add_key(_key="hostName", value=self.hostName)
base.dict_add_key(_key="hostType", value=self.hostType)
base.dict_add_key(_key="manageLevel", value=self.manageLevel)
base.dict_add_key(_key="iplist", value=self.iplist)
base.dict_add_key(_key="port", value=self.port)
base.dict_add_key(_key="dns", value=self.dns)
base.dict_add_key(_key="monitorInterface", value=self.monitorInterface)
base.dict_add_key(_key="monitorType", value=self.monitorType)
base.dict_add_key(_key="parentHost", value=self.parentHost)
base.dict_add_key(_key="businessIds", value=self.businessIds)
base.dict_add_key(_key="businessTree", value=self.businessTree)
base.dict_add_key(_key="opsPerson", value=self.opsPerson)
base.dict_add_key(_key="snmpCommunity", value=self.snmpCommunity)
base.dict_add_key(_key="ipmiAuthtype", value=self.ipmiAuthtype)
base.dict_add_key(_key="ipmiPrivilege", value=self.ipmiPrivilege)
base.dict_add_key(_key="ipmiUsername", value=self.ipmiUsername)
base.dict_add_key(_key="ipmiPassword", value=self.ipmiPassword)
base.dict_add_key(_key="factoryId", value=self.factoryId)
base.dict_add_key(_key="model", value=self.model)
base.dict_add_key(_key="version", value=self.version)
base.dict_add_key(_key="serialnumber", value=self.serialnumber)
base.dict_add_key(_key="description", value=self.description)
base.dict_add_key(_key="monitorStatus", value=self.monitorStatus)
self._json = base._json
self.response = self.api.post(url=self._url, headers=self._headers, json=self._json)
......@@ -9,6 +9,7 @@ from __future__ import division
import pytest
from WorkCase import CaseBase
from WorkCase.APP import AppBase
from WorkUtils.UtilsLog import UtilsLog
from WorkUtils.UtilsDataBase import UtilsDataBase
from WorkUtils.UtilsResponse import UtilsResponse
......@@ -42,9 +43,11 @@ class TestDeviceAssetIps(object):
base_type_snmp = 127
base_ip = "172.16.3.171"
base_dns = "sanshi.zmops.com"
base_port = 999
base_port = 161
base_name = "SS测试资产-资产IPS"
base_name = "SS 自动化专用"
base_business_id_1 = "12"
base_business_id_2 = "13"
@classmethod
def setup_class(cls):
......@@ -58,79 +61,17 @@ class TestDeviceAssetIps(object):
def setup_method(self):
self.log.debug("测试用例执行开始...")
self.update_members_del_flag(members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.update_members_status(members_id=self.base_id, value=CaseBase().status["ON"])
self.app = AppBase(host=self.host)
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
self.token = self.app.get_base_token(login_name=self.login_name, base_password=self.base_password)
def teardown_method(self):
self.log.debug("测试用例执行结束...")
self.update_members_del_flag(members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.update_members_status(members_id=self.base_id, value=CaseBase().status["ON"])
@allure.step("调用接口:api.login")
def api_login(self, name=None, password=None):
api = ApiLogin(_host=self.host)
api.name = name
api.password = password
api.get_response()
return api.response
@allure.step("更新表数据:members -- del_flag")
def update_members_del_flag(self, members_id=None, value=None):
session = self.db_session()
sql = DataMembers().update_del_flag(session=session, _id=members_id, value=value)
return sql
@allure.step("更新表数据:members -- status")
def update_members_status(self, members_id=None, value=None):
session = self.db_session()
sql = DataMembers().update_status(session=session, _id=members_id, value=value)
return sql
@allure.step("调用接口:device.create")
def device_create(self, token=None, hostName=None, hostType=None, manageLevel=None, iplist=None, dns=None,
monitorInterface=None, monitorType=None,
parentHost=None, businessIds=None, opsPerson=None, snmpCommunity=None, ipmiAuthtype=None,
ipmiPrivilege=None, ipmiUsername=None, ipmiPassword=None,
factoryId=None, model=None, version=None, serialnumber=None,
description=None, monitorStatus=None):
api = DeviceCreate(_host=self.host)
api.token = token
api.hostName = hostName
api.hostType = hostType
api.manageLevel = manageLevel
api.iplist = iplist
api.dns = dns
api.monitorInterface = monitorInterface
api.monitorType = monitorType
api.parentHost = parentHost
api.businessIds = businessIds
api.opsPerson = opsPerson
api.snmpCommunity = snmpCommunity
api.ipmiAuthtype = ipmiAuthtype
api.ipmiPrivilege = ipmiPrivilege
api.ipmiUsername = ipmiUsername
api.ipmiPassword = ipmiPassword
api.factoryId = factoryId
api.model = model
api.version = version
api.serialnumber = serialnumber
api.description = description
api.monitorStatus = monitorStatus
api.get_response()
return api.response
@allure.step("调用接口:device.asset.ips")
def device_asset_ips(self, token=None, ipName=None):
api = DeviceAssetIps(_host=self.host)
api.token = token
api.ipName = ipName
api.get_response()
return api.response
@allure.step("获取token")
def get_base_token(self):
response = self.api_login(name=self.login_name, password=self.base_password)
self.token = UtilsResponse().get_data(response=response)
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
@allure.step("连接数据库:Argus")
def db_session(self):
......@@ -154,74 +95,53 @@ class TestDeviceAssetIps(object):
self.check_code(response=response, code=0)
self.device_id = UtilsResponse().get_data(response=response)
@allure.step("断言返回结果")
def check_code(self, response, code):
_code = UtilsResponse().get_code(response=response)
assert _code == code
@allure.step("断言返回结果数据")
def check_msg(self, response, msg):
_msg = UtilsResponse().get_msg(response=response)
assert _msg == msg
@allure.title("device.asset.ips:token未传")
@allure.story("查询资产IPS下拉框:token未传")
@allure.severity("blocker")
@pytest.mark.skip(reason="此接口未加入token验证")
def test_case_01(self):
self.get_base_token()
response = self.device_asset_ips()
self.check_code(response=response, code=2001)
self.check_msg(response=response, msg="无token,请重新登录")
response = self.app.device_asset_ips()
self.app.check_code(response=response, code=2001)
self.app.check_msg(response=response, msg="无token,请重新登录")
@allure.title("device.asset.ips:token的用户已删除")
@allure.story("查询资产IPS下拉框:token的用户已删除")
@allure.severity("blocker")
@pytest.mark.skip(reason="此接口未加入token验证")
def test_case_02(self):
self.get_base_token()
self.update_members_del_flag(members_id=self.base_id, value=CaseBase().del_flag["ON"])
response = self.device_asset_ips(token=self.token)
self.check_code(response=response, code=2004)
self.check_msg(response=response, msg="用户不存在")
self.app.update_members_del_flag(members_id=self.base_id, value=CaseBase().del_flag["ON"])
response = self.app.device_asset_ips(token=self.token)
self.app.check_code(response=response, code=2004)
self.app.check_msg(response=response, msg="用户不存在")
@allure.title("device.asset.ips:token的用户已禁用")
@allure.story("查询资产IPS下拉框:token的用户已禁用")
@allure.severity("blocker")
@pytest.mark.skip(reason="此接口未加入token验证")
def test_case_03(self):
self.get_base_token()
self.update_members_status(members_id=self.base_id, value=CaseBase().status["OFF"])
response = self.device_asset_ips(token=self.token)
self.check_code(response=response, code=2003)
self.check_msg(response=response, msg="账户已被禁用")
self.app.update_members_status(members_id=self.base_id, value=CaseBase().status["OFF"])
response = self.app.device_asset_ips(token=self.token)
self.app.check_code(response=response, code=2003)
self.app.check_msg(response=response, msg="账户已被禁用")
@allure.title("device.asset.ips:无参数调用")
@allure.story("查询资产IPS下拉框:无参数调用")
@allure.severity("blocker")
def test_case_04(self):
self.get_base_token()
self.case_create(num="04")
response = self.device_asset_ips(token=self.token)
self.check_code(response=response, code=0)
response = self.app.device_asset_ips(token=self.token)
self.app.check_code(response=response, code=0)
@allure.title("device.asset.ips:ipName查询")
@allure.story("查询资产IPS下拉框:ipName查询")
def test_case_05(self):
self.get_base_token()
self.case_create(num="05")
name = self.base_name + "05"
self.app.case_create_host(name=name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=self.base_business_id_1 + "," + self.base_business_id_2)
response = self.device_asset_ips(token=self.token, ipName=self.base_ip)
self.check_code(response=response, code=0)
data = UtilsResponse().get_data(response=response)
ids = []
for x, y in enumerate(data):
ids.append(y["id"])
sql = self.select_hosts(hostid=y["id"])
for q, w in enumerate(sql):
assert y["name"] == w.manage_ip
assert self.device_id in ids
response = self.app.device_asset_ips(token=self.token, ipName=self.base_ip)
self.app.check_code(response=response, code=0)
self.app.check_keyValue(_json=UtilsResponse().get_data(response=response), _key="name", _value=self.base_ip)
if __name__ == "__main__":
......@@ -238,4 +158,5 @@ if __name__ == "__main__":
# a = TestDeviceAssetIps()
# a.setup_class()
# a.setup_method()
# a.test_case_05()
......@@ -70,6 +70,7 @@ class TestDeviceDelete(object):
def teardown_method(self):
self.log.debug("测试用例执行结束...")
self.app.update_members_del_flag(session=self.db_session(), members_id=self.base_id, value=CaseBase().del_flag["OFF"])
self.app.update_members_status(session=self.db_session(), members_id=self.base_id, value=CaseBase().status["ON"])
......@@ -135,7 +136,8 @@ class TestDeviceDelete(object):
@allure.story("删除资产:删除一个")
@allure.severity("blocker")
def test_case_04(self):
host_id_1 = self.app.case_create_host(base_name=self.base_name, num="04", host_type=self.base_type_snmp,
name = self.base_name + "04"
host_id_1 = self.app.case_create_host(name=name, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=self.base_business_id_1 + "," + self.base_business_id_2)
response = self.app.device_delete(token=self.token, hostids=[host_id_1])
......@@ -146,13 +148,16 @@ class TestDeviceDelete(object):
@allure.story("删除资产:删除多个")
@allure.severity("blocker")
def test_case_05(self):
host_id_1 = self.app.case_create_host(base_name=self.base_name, num="051", host_type=self.base_type_snmp,
name_1 = self.base_name + "051"
name_2 = self.base_name + "052"
name_3 = self.base_name + "053"
host_id_1 = self.app.case_create_host(name=name_1, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=self.base_business_id_1 + "," + self.base_business_id_2)
host_id_2 = self.app.case_create_host(base_name=self.base_name, num="052", host_type=self.base_type_snmp,
host_id_2 = self.app.case_create_host(name=name_2, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=self.base_business_id_1 + "," + self.base_business_id_2)
host_id_3 = self.app.case_create_host(base_name=self.base_name, num="053", host_type=self.base_type_snmp,
host_id_3 = self.app.case_create_host(name=name_3, host_type=self.base_type_snmp,
iplist=self.base_ip, port=self.base_port,
businessIds=self.base_business_id_1 + "," + self.base_business_id_2)
response = self.app.device_delete(token=self.token, hostids=[host_id_1, host_id_2, host_id_3])
......
......@@ -158,3 +158,21 @@ class DataHosts(object):
self.log.error(error)
session.close()
return error
def select_all_like_host_name(self, session, host_name):
"""
:param session: 指针
:param host_name:
:return: 查询结果
"""
self.log.debug("查询数据库:")
try:
sql_rep = session.query(self.table).filter(self.table.host_name.like(str(host_name) + "%")).all()
self.log.debug(sql_rep)
session.close()
return sql_rep
except UtilsDataBase().errors as error:
self.log.error("异常:")
self.log.error(error)
session.close()
return error
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment