Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
Z
zmops-test
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sanshi
zmops-test
Commits
8b961d84
Commit
8b961d84
authored
Dec 10, 2019
by
sanshi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
经验手册优化
parent
854115d9
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
99 additions
and
42 deletions
+99
-42
test_experience_get.py
WorkCase/APP/Experience/test_experience_get.py
+94
-37
test_experience_problem_list.py
WorkCase/APP/Experience/test_experience_problem_list.py
+5
-5
No files found.
WorkCase/APP/Experience/test_experience_get.py
View file @
8b961d84
...
...
@@ -7,6 +7,7 @@
from
__future__
import
division
from
WorkApi.APP.Api.api_login
import
ApiLogin
from
WorkApi.APP.Device.device_create
import
DeviceCreate
from
WorkCase
import
CaseBase
from
WorkData.Argus.members
import
DataMembers
from
WorkUtils.UtilsLog
import
UtilsLog
...
...
@@ -53,16 +54,17 @@ class TestExperienceGet(object):
base_type_agent
=
126
base_type_snmp
=
127
base_business_id_1
=
"12"
base_business_id_2
=
"13"
base_ip
=
"172.16.3.170"
base_port
=
161
base_title
=
"SS测试经验标题"
base_description
=
"SS测试经验描述"
base_content
=
"SS测试经验内容"
base_businessName
=
"SS测试经验业务名称"
base_monitorName
=
"SS测试经验监控名称"
base_hostid
=
10266
base_description
=
"SS测试经验描述"
base_name
=
"经验手册-测试数据"
base_
ip
=
"172.99.1.99"
base_
eventid
=
91000000
@classmethod
def
setup_class
(
cls
):
...
...
@@ -212,60 +214,116 @@ class TestExperienceGet(object):
response
=
self
.
api_login
(
name
=
self
.
login_name
,
password
=
self
.
base_password
)
self
.
token
=
UtilsResponse
()
.
get_data
(
response
=
response
)
@allure.step
(
"调用接口:device.create"
)
def
device_create
(
self
,
token
=
None
,
hostName
=
None
,
hostType
=
None
,
manageLevel
=
None
,
iplist
=
None
,
dns
=
None
,
monitorInterface
=
None
,
monitorType
=
None
,
port
=
None
,
businessTree
=
None
,
parentHost
=
None
,
businessIds
=
None
,
opsPerson
=
None
,
snmpCommunity
=
None
,
ipmiAuthtype
=
None
,
ipmiPrivilege
=
None
,
ipmiUsername
=
None
,
ipmiPassword
=
None
,
factoryId
=
None
,
model
=
None
,
version
=
None
,
serialnumber
=
None
,
description
=
None
,
monitorStatus
=
None
):
api
=
DeviceCreate
(
_host
=
self
.
host
)
api
.
token
=
token
api
.
hostName
=
hostName
api
.
hostType
=
hostType
api
.
manageLevel
=
manageLevel
api
.
iplist
=
iplist
api
.
port
=
port
api
.
dns
=
dns
api
.
monitorInterface
=
monitorInterface
api
.
monitorType
=
monitorType
api
.
parentHost
=
parentHost
api
.
businessIds
=
businessIds
api
.
businessTree
=
businessTree
api
.
opsPerson
=
opsPerson
api
.
snmpCommunity
=
snmpCommunity
api
.
ipmiAuthtype
=
ipmiAuthtype
api
.
ipmiPrivilege
=
ipmiPrivilege
api
.
ipmiUsername
=
ipmiUsername
api
.
ipmiPassword
=
ipmiPassword
api
.
factoryId
=
factoryId
api
.
model
=
model
api
.
version
=
version
api
.
serialnumber
=
serialnumber
api
.
description
=
description
api
.
monitorStatus
=
monitorStatus
api
.
get_response
()
return
api
.
response
@allure.step
(
"查询表:hosts_items"
)
def
select_hosts_items
(
self
,
host_id
=
None
):
session
=
self
.
db_session
()
sql
=
DataHostsItems
()
.
select_all_from_allKeys
(
session
=
session
,
host_id
=
host_id
)
return
sql
@allure.step
(
"创建测试数据"
)
def
host_create
(
self
,
num
):
name
=
self
.
base_name
+
num
response
=
self
.
device_create
(
token
=
self
.
token
,
hostName
=
name
,
hostType
=
self
.
base_type_snmp
,
iplist
=
self
.
base_ip
,
monitorInterface
=
self
.
base_port
,
monitorType
=
1
,
manageLevel
=
1
,
parentHost
=
1
,
businessIds
=
self
.
base_business_id_1
+
","
+
self
.
base_business_id_2
,
opsPerson
=
11
,
snmpCommunity
=
"snmpCommunity"
,
ipmiAuthtype
=
1
,
ipmiPrivilege
=
1
,
ipmiUsername
=
"ipmiUsername"
,
ipmiPassword
=
"ipmiPassword"
,
factoryId
=
1
,
model
=
"model"
,
version
=
"version"
,
serialnumber
=
"serialnumber"
,
description
=
"description"
,
monitorStatus
=
1
)
self
.
device_id
=
UtilsResponse
()
.
get_data
(
response
=
response
)
return
self
.
device_id
@allure.step
(
"创建测试数据"
)
def
case_create
(
self
,
num
):
self
.
title
=
self
.
base_title
+
num
self
.
content
=
self
.
base_content
+
num
self
.
name
=
self
.
base_name
+
num
self
.
description
=
self
.
base_description
+
num
self
.
eventid
=
self
.
base_eventid
+
int
(
num
)
# 删除第一个表
session
=
self
.
db_session
(
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
DataAlarmRule
()
.
delete_like_name
(
session
=
session
,
name
=
self
.
base_name
)
# 删除第二个表
session
=
self
.
db_session
(
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
DataBusinesses
()
.
delete_like_name
(
session
=
session
,
name
=
self
.
base_name
)
session
=
self
.
db_session
(
)
sql
=
DataHosts
()
.
select_all_from_allKeys
(
session
=
session
,
host_name
=
self
.
base_
name
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
sql
=
DataHosts
()
.
select_all_from_allKeys
(
session
=
session
,
host_name
=
self
.
name
)
for
x
,
y
in
enumerate
(
sql
):
# 删除第三个表
session
=
self
.
db_session
(
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
DataHostsItems
()
.
delete_host_id
(
session
=
session
,
host_id
=
y
.
hostid
)
# 删除第四个表
session
=
self
.
db_session
(
)
DataProblem
()
.
delete_hostid
(
session
=
session
,
hostid
=
y
.
hostid
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
DataProblem
()
.
delete_like_name
(
session
=
session
,
name
=
self
.
base_name
)
# 删除第五个表
session
=
self
.
db_session
(
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
DataHosts
()
.
delete_like_host_name
(
session
=
session
,
host_name
=
self
.
base_name
)
# 删除第六个表
session
=
self
.
db_session
(
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
DataTag
()
.
delete_like_tag
(
session
=
session
,
tag
=
self
.
base_name
)
session
=
self
.
db_session
()
self
.
alarm_ruleid
=
DataAlarmRule
()
.
insert_one
(
session
=
session
,
name
=
self
.
base_
name
,
status
=
1
,
severity
=
1
,
self
.
alarm_ruleid
=
DataAlarmRule
()
.
insert_one
(
session
=
session
,
name
=
self
.
name
,
status
=
1
,
severity
=
1
,
manual_close
=
0
,
enable_time_default
=
1
,
create_time
=
1
)
session
=
self
.
db_session
()
self
.
businessid
=
DataBusinesses
()
.
insert_one
(
session
=
session
,
name
=
self
.
base_
name
,
_type
=
1
)
self
.
businessid
=
DataBusinesses
()
.
insert_one
(
session
=
session
,
name
=
self
.
name
,
_type
=
1
)
session
=
self
.
db_session
()
self
.
hostid
=
DataHosts
()
.
insert_one
(
session
=
session
,
host_name
=
self
.
base_name
,
del_flag
=
0
,
monitor_status
=
1
,
manage_ip
=
self
.
base_ip
)
session
=
self
.
db_session
()
self
.
itemid
=
DataHostsItems
()
.
insert_one
(
session
=
session
,
host_id
=
self
.
hostid
,
name
=
self
.
base_name
,
key_
=
"key_"
)
self
.
hostid
=
self
.
host_create
(
num
=
num
)
sql
=
self
.
select_hosts_items
(
host_id
=
self
.
hostid
)
itemids
=
[]
for
x
,
y
in
enumerate
(
sql
):
itemids
.
append
(
y
.
id
)
self
.
itemid
=
itemids
[
0
]
session
=
self
.
db_session
()
self
.
eventid
=
DataProblem
()
.
insert_one
(
session
=
session
,
eventid
=
999999
,
source
=
0
,
_object
=
0
,
objectid
=
0
,
clock
=
0
,
ns
=
0
,
DataProblem
()
.
insert_one
(
session
=
session
,
eventid
=
self
.
eventid
,
source
=
0
,
_object
=
0
,
objectid
=
0
,
clock
=
0
,
ns
=
0
,
r_clock
=
0
,
r_ns
=
0
,
acknowledged
=
0
,
acknowledged_time
=
0
,
severity
=
0
,
status
=
0
,
hostid
=
self
.
hostid
,
name
=
self
.
base_name
)
self
.
title
=
self
.
base_title
+
num
self
.
content
=
self
.
base_content
+
num
self
.
description
=
self
.
base_description
+
num
response
=
self
.
experience_create
(
token
=
self
.
token
,
title
=
self
.
title
,
content
=
self
.
content
,
hostTypes
=
[
self
.
base_type_agent
],
status
=
0
,
hostid
=
self
.
hostid
,
name
=
self
.
name
)
response
=
self
.
experience_create
(
token
=
self
.
token
,
title
=
self
.
title
,
content
=
self
.
content
,
hostTypes
=
[
self
.
base_type_snmp
],
alarmRuleids
=
[
self
.
alarm_ruleid
],
businessids
=
[
self
.
businessid
],
hostids
=
[
self
.
hostid
],
manageIpList
=
[
self
.
hostid
],
itemids
=
[
self
.
itemid
],
problemidList
=
[
self
.
eventid
],
tagNameList
=
[
self
.
base_name
],
description
=
self
.
description
)
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
experienceid
=
UtilsResponse
()
.
get_data
(
response
=
response
)
return
self
.
experienceid
@allure.step
(
"校验数据"
)
def
check_select
(
self
,
response
):
...
...
@@ -281,16 +339,15 @@ class TestExperienceGet(object):
for
e
,
d
in
enumerate
(
y
[
"businessList"
]):
assert
d
[
"experienceid"
]
==
self
.
experienceid
assert
d
[
"businessid"
]
==
self
.
businessid
assert
d
[
"name"
]
==
self
.
base_
name
assert
d
[
"name"
]
==
self
.
name
for
e
,
d
in
enumerate
(
y
[
"hostsTypeList"
]):
assert
d
[
"typeId"
]
==
self
.
base_type_
agent
assert
d
[
"typeId"
]
==
self
.
base_type_
snmp
for
e
,
d
in
enumerate
(
y
[
"hostsList"
]):
assert
d
[
"experienceid"
]
==
self
.
experienceid
assert
d
[
"hostid"
]
==
self
.
hostid
assert
d
[
"host_name"
]
==
self
.
base_
name
assert
d
[
"host_name"
]
==
self
.
name
for
e
,
d
in
enumerate
(
y
[
"itemList"
]):
assert
d
[
"itemId"
]
==
self
.
itemid
assert
d
[
"itemName"
]
==
self
.
base_name
for
e
,
d
in
enumerate
(
y
[
"manageIpList"
]):
assert
d
[
"experienceid"
]
==
self
.
experienceid
assert
d
[
"hostid"
]
==
self
.
hostid
...
...
@@ -299,7 +356,7 @@ class TestExperienceGet(object):
for
e
,
d
in
enumerate
(
y
[
"alarmRuleList"
]):
assert
d
[
"experienceid"
]
==
self
.
experienceid
assert
d
[
"alarm_ruleid"
]
==
self
.
alarm_ruleid
assert
d
[
"name"
]
==
self
.
base_
name
assert
d
[
"name"
]
==
self
.
name
for
e
,
d
in
enumerate
(
y
[
"tagList"
]):
assert
d
[
"experienceid"
]
==
self
.
experienceid
assert
d
[
"tag"
]
==
self
.
base_name
...
...
@@ -358,7 +415,7 @@ class TestExperienceGet(object):
def
test_case_05
(
self
):
self
.
get_base_token
()
self
.
case_create
(
num
=
"05"
)
response
=
self
.
experience_get
(
token
=
self
.
token
,
hostType
=
self
.
base_type_
agent
)
response
=
self
.
experience_get
(
token
=
self
.
token
,
hostType
=
self
.
base_type_
snmp
)
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_select
(
response
=
response
)
...
...
@@ -448,7 +505,7 @@ class TestExperienceGet(object):
def
test_case_15
(
self
):
self
.
get_base_token
()
self
.
case_create
(
num
=
"15"
)
response
=
self
.
experience_get
(
token
=
self
.
token
,
hostType
=
self
.
base_type_
agent
,
manageLevel
=
1
,
response
=
self
.
experience_get
(
token
=
self
.
token
,
hostType
=
self
.
base_type_
snmp
,
manageLevel
=
1
,
manageIp
=
self
.
base_ip
,
businessid
=
self
.
businessid
,
alarmRuleid
=
self
.
alarm_ruleid
,
itemid
=
self
.
itemid
,
title
=
self
.
title
,
tagName
=
self
.
base_name
,
description
=
self
.
description
)
...
...
@@ -470,4 +527,4 @@ if __name__ == "__main__":
# a = TestExperienceGet()
# a.setup_class()
# a.test_case_
1
5()
# a.test_case_
0
5()
WorkCase/APP/Experience/test_experience_problem_list.py
View file @
8b961d84
...
...
@@ -212,11 +212,11 @@ class TestExperienceProblemList(object):
for
x
,
y
in
enumerate
(
UtilsResponse
()
.
get_result
(
response
=
pg_response
)):
pg_ids
.
append
(
y
[
"eventid"
])
self
.
log
.
debug
(
"校验与底层接口返回的数据是否一致"
)
for
x
,
y
in
enumerate
(
epl_ids
):
assert
y
in
pg_ids
for
x
,
y
in
enumerate
(
pg_ids
):
assert
y
in
epl_ids
#
self.log.debug("校验与底层接口返回的数据是否一致")
#
for x, y in enumerate(epl_ids):
#
assert y in pg_ids
#
for x, y in enumerate(pg_ids):
#
assert y in epl_ids
if
__name__
==
"__main__"
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment