Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
Z
zmops-test
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sanshi
zmops-test
Commits
b089ab25
Commit
b089ab25
authored
Nov 27, 2019
by
sanshi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
历史告警查询
parent
d9c9687e
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
452 additions
and
4 deletions
+452
-4
experience_items_list.py
WorkApi/APP/Experience/Items/experience_items_list.py
+1
-1
experience_problem_list.py
WorkApi/APP/Experience/Problem/experience_problem_list.py
+6
-3
historyWarning_get.py
WorkApi/APP/HistoryWarning/historyWarning_get.py
+29
-0
members_list.py
WorkApi/APP/Members/members_list.py
+38
-0
test_problem_get.py
WorkCase/API/Problem/test_problem_get.py
+8
-0
test_experience_problem_list.py
WorkCase/APP/Experience/test_experience_problem_list.py
+209
-0
test_historyWarning_get.py
WorkCase/APP/HistoryWarning/test_historyWarning_get.py
+161
-0
No files found.
WorkApi/APP/Experience/Items/experience_items_list.py
View file @
b089ab25
...
...
@@ -7,7 +7,7 @@ from WorkUtils.UtilsLog import UtilsLog
from
WorkApi.ApiBase
import
ApiBase
,
GetBase
class
ExperienceItems
Ge
t
(
object
):
class
ExperienceItems
Lis
t
(
object
):
def
__init__
(
self
,
_host
):
"""
:param _host: 域名
...
...
WorkApi/APP/Experience/Problem/experience_problem_list.py
View file @
b089ab25
...
...
@@ -7,7 +7,7 @@ from WorkUtils.UtilsLog import UtilsLog
from
WorkApi.ApiBase
import
ApiBase
,
GetBase
class
ExperienceProblem
Ge
t
(
object
):
class
ExperienceProblem
Lis
t
(
object
):
def
__init__
(
self
,
_host
):
"""
:param _host: 域名
...
...
@@ -26,9 +26,8 @@ class ExperienceProblemGet(object):
self
.
_data
=
{}
self
.
response
=
""
self
.
local_json
=
{}
self
.
token
=
None
self
.
problemName
=
None
self
.
api
=
UtilsRequest
()
...
...
@@ -37,4 +36,8 @@ class ExperienceProblemGet(object):
base
.
dict_add_key
(
_key
=
"token"
,
value
=
self
.
token
)
self
.
_headers
=
base
.
_json
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"problemName"
,
value
=
self
.
problemName
)
self
.
_params
=
base
.
_json
self
.
response
=
self
.
api
.
get
(
url
=
self
.
_url
,
params
=
self
.
_params
)
WorkApi/APP/HistoryWarning/historyWarning_get.py
View file @
b089ab25
...
...
@@ -27,6 +27,19 @@ class HistoryWarningGet(object):
self
.
response
=
""
self
.
token
=
{}
self
.
severity
=
None
self
.
manageLevel
=
None
self
.
solveStatus
=
None
self
.
acknowledged
=
None
self
.
ignoreStatus
=
None
self
.
knowledgeBase
=
None
self
.
dateFrom
=
None
self
.
dateTo
=
None
self
.
object
=
None
self
.
source
=
None
self
.
alarmRuleid
=
None
self
.
experienceid
=
None
self
.
businessid
=
None
self
.
api
=
UtilsRequest
()
...
...
@@ -34,4 +47,20 @@ class HistoryWarningGet(object):
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"token"
,
value
=
self
.
token
)
self
.
_headers
=
base
.
_json
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"severity"
,
value
=
self
.
severity
)
base
.
dict_add_key
(
_key
=
"manageLevel"
,
value
=
self
.
manageLevel
)
base
.
dict_add_key
(
_key
=
"solveStatus"
,
value
=
self
.
solveStatus
)
base
.
dict_add_key
(
_key
=
"acknowledged"
,
value
=
self
.
acknowledged
)
base
.
dict_add_key
(
_key
=
"ignoreStatus"
,
value
=
self
.
ignoreStatus
)
base
.
dict_add_key
(
_key
=
"knowledgeBase"
,
value
=
self
.
knowledgeBase
)
base
.
dict_add_key
(
_key
=
"dateFrom"
,
value
=
self
.
dateFrom
)
base
.
dict_add_key
(
_key
=
"dateTo"
,
value
=
self
.
dateTo
)
base
.
dict_add_key
(
_key
=
"object"
,
value
=
self
.
object
)
base
.
dict_add_key
(
_key
=
"source"
,
value
=
self
.
source
)
base
.
dict_add_key
(
_key
=
"alarmRuleid"
,
value
=
self
.
alarmRuleid
)
base
.
dict_add_key
(
_key
=
"experienceid"
,
value
=
self
.
experienceid
)
base
.
dict_add_key
(
_key
=
"businessid"
,
value
=
self
.
businessid
)
self
.
_params
=
base
.
_json
self
.
response
=
self
.
api
.
get
(
url
=
self
.
_url
,
params
=
self
.
_params
)
WorkApi/APP/Members/members_list.py
0 → 100644
View file @
b089ab25
# -*- coding: utf-8 -*-
# 查询主机下拉框
# 作者: 陈磊
# 时间: 2019-11-27
from
WorkUtils.UtilsRequest
import
UtilsRequest
from
WorkUtils.UtilsLog
import
UtilsLog
from
WorkApi.ApiBase
import
ApiBase
,
GetBase
class
MembersList
(
object
):
def
__init__
(
self
,
_host
):
"""
:param _host: 域名
:return:
"""
self
.
log
=
UtilsLog
()
self
.
log
.
info
(
"调用主机下拉框"
)
self
.
log
.
info
(
self
.
__class__
)
self
.
_host
=
_host
self
.
_headers
=
{}
self
.
_path
=
"/members/list"
self
.
_url
=
self
.
_host
+
self
.
_path
self
.
_params
=
{}
self
.
_json
=
{}
self
.
_data
=
{}
self
.
response
=
""
self
.
token
=
None
self
.
api
=
UtilsRequest
()
def
get_response
(
self
):
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"token"
,
value
=
self
.
token
)
self
.
_headers
=
base
.
_json
self
.
response
=
self
.
api
.
get
(
url
=
self
.
_url
,
headers
=
self
.
_headers
,
params
=
self
.
_params
)
WorkCase/API/Problem/test_problem_get.py
View file @
b089ab25
...
...
@@ -180,6 +180,14 @@ class TestProblemGet(object):
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_sql
(
response
=
response
)
@allure.title
(
"problem.get:模糊搜索, output=[source, object, objectid, name, severity]"
)
@allure.story
(
"查询告警:模糊搜索, output=[source, object, objectid, name, severity]"
)
def
test_case_09
(
self
):
response
=
self
.
problem_get
(
search
=
{
"name"
:
[
"*"
+
"No"
+
"*"
]},
searchByAny
=
True
,
searchWildcardsEnabled
=
True
,
output
=
[
"source"
,
"object"
,
"objectid"
,
"name"
,
"severity"
])
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_sql
(
response
=
response
)
if
__name__
==
"__main__"
:
from
WorkUtils.UtilsPyTest
import
UtilsPyTest
...
...
WorkCase/APP/Experience/test_experience_problem_list.py
View file @
b089ab25
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-11-27
from
__future__
import
division
from
WorkApi.API.Problem.problem_get
import
ProblemGet
from
WorkApi.APP.Api.api_login
import
ApiLogin
from
WorkUtils.UtilsLog
import
UtilsLog
from
WorkUtils.UtilsDataBase
import
UtilsDataBase
from
WorkUtils.UtilsResponse
import
UtilsResponse
from
WorkApi.APP.Experience.Problem.experience_problem_list
import
ExperienceProblemList
from
WorkCase
import
CaseBase
import
allure
@allure.feature
(
"测试模块:experience.problem.list"
)
class
TestExperienceProblemList
(
object
):
log
=
UtilsLog
()
env
=
CaseBase
()
.
app_environment
host
=
env
[
"host"
]
db_url
=
env
[
"db_url"
]
db_port
=
env
[
"db_port"
]
db_user
=
env
[
"db_user"
]
db_pw
=
env
[
"db_pw"
]
db_base
=
env
[
"db_base"
]
base_env
=
CaseBase
()
.
environment
base_host
=
base_env
[
"host"
]
login_name
=
"SS测试用户名"
base_id
=
10
base_password
=
"123456"
@classmethod
def
setup_class
(
cls
):
cls
.
log
.
debug
(
"开始执行测试套件......."
)
@classmethod
def
teardown_class
(
cls
):
cls
.
log
.
debug
(
"结束执行测试套件......."
)
def
setup_method
(
self
):
self
.
log
.
debug
(
"测试用例执行开始..."
)
def
teardown_method
(
self
):
self
.
log
.
debug
(
"测试用例执行结束..."
)
@allure.step
(
"调用接口:experience.problem.list"
)
def
experience_problem_list
(
self
,
token
=
None
,
problemName
=
None
):
api
=
ExperienceProblemList
(
_host
=
self
.
host
)
api
.
token
=
token
api
.
problemName
=
problemName
api
.
get_response
()
return
api
.
response
@allure.step
(
"连接数据库:Argus"
)
def
db_session
(
self
):
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
return
session
@allure.step
(
"断言返回结果:校验排序"
)
def
check_sortfield
(
self
,
response
,
possible
):
result
=
UtilsResponse
()
.
get_result
(
response
=
response
)
UtilsResponse
()
.
check_sort
(
_list
=
result
,
key
=
"eventid"
,
possible
=
possible
)
@allure.step
(
"断言返回结果:校验返回数据的数量"
)
def
check_num
(
self
,
response
,
num
):
result
=
UtilsResponse
()
.
get_result
(
response
=
response
)
assert
len
(
result
)
==
num
@allure.step
(
"断言返回结果"
)
def
check_code
(
self
,
response
,
code
):
_code
=
UtilsResponse
()
.
get_code
(
response
=
response
)
assert
_code
==
code
@allure.step
(
"调用接口:api.login"
)
def
api_login
(
self
,
name
=
None
,
password
=
None
):
api
=
ApiLogin
(
_host
=
self
.
host
)
api
.
name
=
name
api
.
password
=
password
api
.
get_response
()
return
api
.
response
@allure.step
(
"获取token"
)
def
get_base_token
(
self
):
response
=
self
.
api_login
(
name
=
self
.
login_name
,
password
=
self
.
base_password
)
self
.
token
=
UtilsResponse
()
.
get_data
(
response
=
response
)
@allure.step
(
"调用接口:problem.get"
)
def
problem_get
(
self
,
eventids
=
None
,
groupids
=
None
,
hostids
=
None
,
objectids
=
None
,
applicationids
=
None
,
source
=
None
,
_object
=
None
,
acknowledged
=
None
,
suppressed
=
None
,
severities
=
None
,
evaltype
=
None
,
tags
=
None
,
recent
=
None
,
eventid_from
=
None
,
eventid_till
=
None
,
time_from
=
None
,
time_till
=
None
,
selectAcknowledges
=
None
,
selectTags
=
None
,
selectSuppressionData
=
None
,
countOutput
=
None
,
editable
=
None
,
excludeSearch
=
None
,
_filter
=
None
,
limit
=
None
,
output
=
None
,
preservekeys
=
None
,
search
=
None
,
searchByAny
=
None
,
searchWildcardsEnabled
=
None
,
sortfield
=
None
,
sortorder
=
None
,
startSearch
=
None
):
api
=
ProblemGet
(
_host
=
self
.
base_host
)
api
.
eventids
=
eventids
api
.
groupids
=
groupids
api
.
hostids
=
hostids
api
.
objectids
=
objectids
api
.
applicationids
=
applicationids
api
.
source
=
source
api
.
object
=
_object
api
.
acknowledged
=
acknowledged
api
.
suppressed
=
suppressed
api
.
severities
=
severities
api
.
evaltype
=
evaltype
api
.
tags
=
tags
api
.
recent
=
recent
api
.
eventid_from
=
eventid_from
api
.
eventid_till
=
eventid_till
api
.
time_from
=
time_from
api
.
time_till
=
time_till
api
.
selectAcknowledges
=
selectAcknowledges
api
.
selectTags
=
selectTags
api
.
selectSuppressionData
=
selectSuppressionData
api
.
countOutput
=
countOutput
api
.
editable
=
editable
api
.
excludeSearch
=
excludeSearch
api
.
filter
=
_filter
api
.
limit
=
limit
api
.
output
=
output
api
.
preservekeys
=
preservekeys
api
.
search
=
search
api
.
searchByAny
=
searchByAny
api
.
searchWildcardsEnabled
=
searchWildcardsEnabled
api
.
sortfield
=
sortfield
api
.
sortorder
=
sortorder
api
.
startSearch
=
startSearch
api
.
get_response
()
return
api
.
response
@allure.title
(
"experience.problem.list:token未传"
)
@allure.story
(
"查询告警下拉框:token未传"
)
@allure.severity
(
"blocker"
)
def
test_case_01
(
self
):
self
.
get_base_token
()
response
=
self
.
experience_problem_list
()
self
.
check_code
(
response
=
response
,
code
=
2001
)
self
.
check_msg
(
response
=
response
,
msg
=
"无token,请重新登录"
)
@allure.title
(
"experience.problem.list:token的用户已删除"
)
@allure.story
(
"查询告警下拉框:token的用户已删除"
)
@allure.severity
(
"blocker"
)
def
test_case_02
(
self
):
self
.
get_base_token
()
self
.
update_members_del_flag
(
members_id
=
self
.
base_id
,
value
=
1
)
response
=
self
.
experience_problem_list
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
2004
)
self
.
check_msg
(
response
=
response
,
msg
=
"用户不存在"
)
@allure.title
(
"experience.problem.list:token的用户已禁用"
)
@allure.story
(
"查询告警下拉框:token的用户已禁用"
)
@allure.severity
(
"blocker"
)
def
test_case_03
(
self
):
self
.
get_base_token
()
self
.
update_members_status
(
members_id
=
self
.
base_id
,
value
=
1
)
response
=
self
.
experience_problem_list
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
2003
)
self
.
check_msg
(
response
=
response
,
msg
=
"账户已被禁用"
)
@allure.title
(
"experience.problem.list:problemName成功查询"
)
@allure.story
(
"查询告警下拉框:problemName成功查询"
)
@allure.severity
(
"blocker"
)
def
test_case_04
(
self
):
self
.
get_base_token
()
epl_response
=
self
.
experience_problem_list
(
token
=
self
.
token
,
problemName
=
"No"
)
self
.
check_code
(
response
=
epl_response
,
code
=
0
)
epl_ids
=
[]
for
x
,
y
in
enumerate
(
UtilsResponse
()
.
get_data
(
response
=
epl_response
)):
epl_ids
.
append
(
y
[
"id"
])
pg_response
=
self
.
problem_get
(
search
=
{
"name"
:
[
"*"
+
"No"
+
"*"
]},
searchByAny
=
True
,
searchWildcardsEnabled
=
True
,
output
=
[
"source"
,
"object"
,
"objectid"
,
"name"
,
"severity"
])
self
.
check_code
(
response
=
pg_response
,
code
=
0
)
pg_ids
=
[]
for
x
,
y
in
enumerate
(
UtilsResponse
()
.
get_result
(
response
=
pg_response
)):
pg_ids
.
append
(
y
[
"eventid"
])
self
.
log
.
debug
(
"校验与底层接口返回的数据是否一致"
)
for
x
,
y
in
enumerate
(
epl_ids
):
assert
y
in
pg_ids
for
x
,
y
in
enumerate
(
pg_ids
):
assert
y
in
epl_ids
if
__name__
==
"__main__"
:
from
WorkUtils.UtilsPyTest
import
UtilsPyTest
from
WorkUtils.UtilsCmd
import
UtilsCmd
import
os
# 执行自动化测试用例
case_info
=
os
.
path
.
split
(
__file__
)
case
=
UtilsCmd
()
.
pytest_cmd
()
r
=
UtilsPyTest
(
case
=
case
,
case_info
=
case_info
)
r
.
run_main
()
# a = TestExperienceProblemList()
# a.setup_class()
# a.test_case_04()
WorkCase/APP/HistoryWarning/test_historyWarning_get.py
View file @
b089ab25
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-11-27
from
__future__
import
division
from
WorkApi.APP.Api.api_login
import
ApiLogin
from
WorkUtils.UtilsLog
import
UtilsLog
from
WorkUtils.UtilsDataBase
import
UtilsDataBase
from
WorkUtils.UtilsResponse
import
UtilsResponse
from
WorkApi.APP.HistoryWarning.historyWarning_get
import
HistoryWarningGet
from
WorkCase
import
CaseBase
import
allure
@allure.feature
(
"测试模块:historyWarning.get"
)
class
TestHistoryWarningGet
(
object
):
log
=
UtilsLog
()
env
=
CaseBase
()
.
app_environment
host
=
env
[
"host"
]
db_url
=
env
[
"db_url"
]
db_port
=
env
[
"db_port"
]
db_user
=
env
[
"db_user"
]
db_pw
=
env
[
"db_pw"
]
db_base
=
env
[
"db_base"
]
base_env
=
CaseBase
()
.
environment
base_host
=
base_env
[
"host"
]
login_name
=
"SS测试用户名"
base_id
=
10
base_password
=
"123456"
@classmethod
def
setup_class
(
cls
):
cls
.
log
.
debug
(
"开始执行测试套件......."
)
@classmethod
def
teardown_class
(
cls
):
cls
.
log
.
debug
(
"结束执行测试套件......."
)
def
setup_method
(
self
):
self
.
log
.
debug
(
"测试用例执行开始..."
)
def
teardown_method
(
self
):
self
.
log
.
debug
(
"测试用例执行结束..."
)
@allure.step
(
"调用接口:historyWarning.get"
)
def
historyWarning_get
(
self
,
token
=
None
,
severity
=
None
,
manageLevel
=
None
,
solveStatus
=
None
,
acknowledged
=
None
,
ignoreStatus
=
None
,
knowledgeBase
=
None
,
dateFrom
=
None
,
dateTo
=
None
,
_object
=
None
,
source
=
None
,
alarmRuleid
=
None
,
experienceid
=
None
,
businessid
=
None
):
api
=
HistoryWarningGet
(
_host
=
self
.
host
)
api
.
token
=
token
api
.
severity
=
severity
api
.
manageLevel
=
manageLevel
api
.
solveStatus
=
solveStatus
api
.
acknowledged
=
acknowledged
api
.
ignoreStatus
=
ignoreStatus
api
.
knowledgeBase
=
knowledgeBase
api
.
dateFrom
=
dateFrom
api
.
dateTo
=
dateTo
api
.
object
=
_object
api
.
source
=
source
api
.
alarmRuleid
=
alarmRuleid
api
.
experienceid
=
experienceid
api
.
businessid
=
businessid
api
.
get_response
()
return
api
.
response
@allure.step
(
"连接数据库:Argus"
)
def
db_session
(
self
):
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
return
session
@allure.step
(
"断言返回结果:校验排序"
)
def
check_sortfield
(
self
,
response
,
possible
):
result
=
UtilsResponse
()
.
get_result
(
response
=
response
)
UtilsResponse
()
.
check_sort
(
_list
=
result
,
key
=
"eventid"
,
possible
=
possible
)
@allure.step
(
"断言返回结果:校验返回数据的数量"
)
def
check_num
(
self
,
response
,
num
):
result
=
UtilsResponse
()
.
get_result
(
response
=
response
)
assert
len
(
result
)
==
num
@allure.step
(
"断言返回结果"
)
def
check_code
(
self
,
response
,
code
):
_code
=
UtilsResponse
()
.
get_code
(
response
=
response
)
assert
_code
==
code
@allure.step
(
"调用接口:api.login"
)
def
api_login
(
self
,
name
=
None
,
password
=
None
):
api
=
ApiLogin
(
_host
=
self
.
host
)
api
.
name
=
name
api
.
password
=
password
api
.
get_response
()
return
api
.
response
@allure.step
(
"获取token"
)
def
get_base_token
(
self
):
response
=
self
.
api_login
(
name
=
self
.
login_name
,
password
=
self
.
base_password
)
self
.
token
=
UtilsResponse
()
.
get_data
(
response
=
response
)
@allure.title
(
"historyWarning.get:token未传"
)
@allure.story
(
"查询历史告警:token未传"
)
@allure.severity
(
"blocker"
)
def
test_case_01
(
self
):
self
.
get_base_token
()
response
=
self
.
historyWarning_get
()
self
.
check_code
(
response
=
response
,
code
=
2001
)
self
.
check_msg
(
response
=
response
,
msg
=
"无token,请重新登录"
)
@allure.title
(
"historyWarning.get:token的用户已删除"
)
@allure.story
(
"查询历史告警:token的用户已删除"
)
@allure.severity
(
"blocker"
)
def
test_case_02
(
self
):
self
.
get_base_token
()
self
.
update_members_del_flag
(
members_id
=
self
.
base_id
,
value
=
1
)
response
=
self
.
historyWarning_get
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
2004
)
self
.
check_msg
(
response
=
response
,
msg
=
"用户不存在"
)
@allure.title
(
"historyWarning.get:token的用户已禁用"
)
@allure.story
(
"查询历史告警:token的用户已禁用"
)
@allure.severity
(
"blocker"
)
def
test_case_03
(
self
):
self
.
get_base_token
()
self
.
update_members_status
(
members_id
=
self
.
base_id
,
value
=
1
)
response
=
self
.
historyWarning_get
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
2003
)
self
.
check_msg
(
response
=
response
,
msg
=
"账户已被禁用"
)
@allure.title
(
"historyWarning.get:problemName成功查询"
)
@allure.story
(
"查询历史告警:problemName成功查询"
)
@allure.severity
(
"blocker"
)
def
test_case_04
(
self
):
self
.
get_base_token
()
response
=
self
.
historyWarning_get
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
0
)
if
__name__
==
"__main__"
:
from
WorkUtils.UtilsPyTest
import
UtilsPyTest
from
WorkUtils.UtilsCmd
import
UtilsCmd
import
os
# 执行自动化测试用例
# case_info = os.path.split(__file__)
# case = UtilsCmd().pytest_cmd()
# r = UtilsPyTest(case=case, case_info=case_info)
# r.run_main()
a
=
TestHistoryWarningGet
()
a
.
setup_class
()
a
.
test_case_04
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment