Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
Z
zmops-test
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sanshi
zmops-test
Commits
3b0cfe1e
Commit
3b0cfe1e
authored
Dec 04, 2019
by
sanshi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
经验手册
parent
17f0b800
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
408 additions
and
144 deletions
+408
-144
Administrator.xml
.idea/dictionaries/Administrator.xml
+1
-0
test_experience_create.py
WorkCase/APP/Experience/test_experience_create.py
+291
-110
hosts_items.py
WorkData/Argus/hosts_items.py
+1
-1
problem.py
WorkData/Argus/problem.py
+37
-33
tag.py
WorkData/Argus/tag.py
+78
-0
No files found.
.idea/dictionaries/Administrator.xml
View file @
3b0cfe1e
...
...
@@ -83,6 +83,7 @@
<w>
sortorder
</w>
<w>
tablename
</w>
<w>
tagid
</w>
<w>
tagids
</w>
<w>
templated
</w>
<w>
templateid
</w>
<w>
templateids
</w>
...
...
WorkCase/APP/Experience/test_experience_create.py
View file @
3b0cfe1e
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-1
1-18
# 时间: 2019-1
2-04
from
__future__
import
division
from
WorkApi.APP.Api.api_login
import
ApiLogin
from
WorkCase
import
CaseBase
from
WorkData.Argus.members
import
DataMembers
from
WorkUtils.UtilsLog
import
UtilsLog
from
WorkUtils.UtilsResponse
import
UtilsResponse
from
WorkUtils.UtilsDataBase
import
UtilsDataBase
from
WorkApi.APP.Experience.experience_create
import
ExperienceCreate
from
WorkData.Argus.experience
import
DataExperience
from
WorkData.Argus.experience_alarm_rule
import
DataExperienceAlarmRule
from
WorkData.Argus.experience_businesses
import
DataExperienceBusinesses
from
WorkData.Argus.experience_hosts
import
DataExperienceHosts
from
WorkData.Argus.experience_hosts_type
import
DataExperienceHostsType
from
WorkData.Argus.experience_tag
import
DataExperienceTag
from
WorkData.Argus.experience_items
import
DataExperienceItems
from
WorkData.Argus.experience_ips
import
DataExperienceIps
from
WorkData.Argus.experience_problem
import
DataExperienceProblem
from
WorkData.Argus.alarm_rule
import
DataAlarmRule
from
WorkData.Argus.businesses
import
DataBusinesses
from
WorkData.Argus.hosts
import
DataHosts
from
WorkData.Argus.hosts_items
import
DataHostsItems
from
WorkData.Argus.problem
import
DataProblem
from
WorkData.Argus.tag
import
DataTag
import
allure
...
...
@@ -29,6 +46,13 @@ class TestExperienceCreate(object):
db_pw
=
env
[
"db_pw"
]
db_base
=
env
[
"db_base"
]
login_name
=
"SS测试用户名"
base_id
=
10
base_password
=
"123456"
base_type_agent
=
126
base_type_snmp
=
127
base_title
=
"SS测试经验标题"
base_description
=
"SS测试经验描述"
base_content
=
"SS测试经验内容"
...
...
@@ -36,6 +60,8 @@ class TestExperienceCreate(object):
base_monitorName
=
"SS测试经验监控名称"
base_hostid
=
10266
base_name
=
"经验手册-测试数据"
@classmethod
def
setup_class
(
cls
):
cls
.
log
.
debug
(
"开始执行测试套件......."
)
...
...
@@ -48,26 +74,57 @@ class TestExperienceCreate(object):
def
setup_method
(
self
):
self
.
log
.
debug
(
"测试用例执行开始..."
)
self
.
update_members_del_flag
(
members_id
=
self
.
base_id
,
value
=
0
)
self
.
update_members_status
(
members_id
=
self
.
base_id
,
value
=
0
)
def
teardown_method
(
self
):
self
.
log
.
debug
(
"测试用例执行结束..."
)
self
.
update_members_del_flag
(
members_id
=
self
.
base_id
,
value
=
0
)
self
.
update_members_status
(
members_id
=
self
.
base_id
,
value
=
0
)
@allure.step
(
"更新表数据:members -- del_flag"
)
def
update_members_del_flag
(
self
,
members_id
=
None
,
value
=
None
):
session
=
self
.
db_session
()
sql
=
DataMembers
()
.
update_del_flag
(
session
=
session
,
_id
=
members_id
,
value
=
value
)
return
sql
@allure.step
(
"更新表数据:members -- status"
)
def
update_members_status
(
self
,
members_id
=
None
,
value
=
None
):
session
=
self
.
db_session
()
sql
=
DataMembers
()
.
update_status
(
session
=
session
,
_id
=
members_id
,
value
=
value
)
return
sql
@allure.step
(
"调用接口:api.login"
)
def
api_login
(
self
,
name
=
None
,
password
=
None
):
api
=
ApiLogin
(
_host
=
self
.
host
)
api
.
name
=
name
api
.
password
=
password
api
.
get_response
()
return
api
.
response
@allure.step
(
"获取token"
)
def
get_base_token
(
self
):
response
=
self
.
api_login
(
name
=
self
.
login_name
,
password
=
self
.
base_password
)
self
.
token
=
UtilsResponse
()
.
get_data
(
response
=
response
)
@allure.step
(
"调用接口:experience.create"
)
def
experience_create
(
self
,
title
=
None
,
content
=
None
,
businessid
=
None
,
businessName
=
None
,
hostType
=
None
,
hostid
=
None
,
versions
=
None
,
monitorid
=
None
,
monitorName
=
None
,
alarmRuleids
=
None
,
description
=
None
,
tagNameList
=
None
):
def
experience_create
(
self
,
token
=
None
,
title
=
None
,
content
=
None
,
businessids
=
None
,
hostTypes
=
None
,
hostids
=
None
,
itemids
=
None
,
manageIpList
=
None
,
alarmRuleids
=
None
,
description
=
None
,
tagNameList
=
None
,
problemidList
=
None
,
hostTypeTree
=
None
):
api
=
ExperienceCreate
(
_host
=
self
.
host
)
api
.
token
=
token
api
.
title
=
title
api
.
content
=
content
api
.
businessid
=
businessid
api
.
businessName
=
businessName
api
.
hostType
=
hostType
api
.
hostid
=
hostid
api
.
versions
=
versions
api
.
monitorid
=
monitorid
api
.
monitorName
=
monitorName
api
.
businessids
=
businessids
api
.
hostTypes
=
hostTypes
api
.
hostids
=
hostids
api
.
itemids
=
itemids
api
.
manageIpList
=
manageIpList
api
.
alarmRuleids
=
alarmRuleids
api
.
description
=
description
api
.
tagNameList
=
tagNameList
api
.
problemidList
=
problemidList
api
.
hostTypeTree
=
hostTypeTree
api
.
get_response
()
return
api
.
response
...
...
@@ -76,122 +133,246 @@ class TestExperienceCreate(object):
_code
=
UtilsResponse
()
.
get_code
(
response
=
response
)
assert
_code
==
code
@allure.step
(
"断言返回结果数据"
)
def
check_msg
(
self
,
response
,
msg
):
_msg
=
UtilsResponse
()
.
get_msg
(
response
=
response
)
assert
_msg
==
msg
@allure.step
(
"连接数据库:Argus"
)
def
db_session
(
self
):
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
return
session
@allure.step
(
"查询表:experience"
)
def
select_experience
(
self
,
experienceid
=
None
,
title
=
None
,
description
=
None
):
def
select_experience
(
self
,
experienceid
=
None
,
title
=
None
,
content
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperience
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
,
title
=
title
,
content
=
content
)
return
sql
@allure.step
(
"查询表:experience_alarm_rule"
)
def
select_experience_alarm_rule
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceAlarmRule
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"查询表:experience_businesses"
)
def
select_experience_businesses
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceBusinesses
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"查询表:experience_hosts"
)
def
select_experience_hosts
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperience
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
,
title
=
title
,
description
=
description
)
sql
=
DataExperience
Hosts
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"校验查询结果"
)
def
check_select
(
self
,
experienceid
,
title
,
content
,
businessid
,
business_name
,
host_type
,
hostid
,
versions
,
monitorid
,
monitor_name
,
description
,
eventid
):
sql
=
self
.
select_experience
(
experienceid
=
experienceid
)
@allure.step
(
"查询表:experience_hosts_type"
)
def
select_experience_hosts_type
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceHostsType
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"查询表:experience_tag"
)
def
select_experience_tag
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceTag
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"查询表:tag"
)
def
select_tag
(
self
,
tag
=
None
):
session
=
self
.
db_session
()
sql
=
DataTag
()
.
select_all_from_allKeys
(
session
=
session
,
tag
=
tag
)
return
sql
@allure.step
(
"查询表:experience_items"
)
def
select_experience_items
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceItems
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"查询表:experience_ips"
)
def
select_experience_ips
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceIps
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"查询表:experience_problem"
)
def
select_experience_problem
(
self
,
experienceid
=
None
):
session
=
self
.
db_session
()
sql
=
DataExperienceProblem
()
.
select_all_from_allKeys
(
session
=
session
,
experienceid
=
experienceid
)
return
sql
@allure.step
(
"创建测试数据"
)
def
case_create
(
self
,):
# 删除第一个表
session
=
self
.
db_session
()
DataAlarmRule
()
.
delete_like_name
(
session
=
session
,
name
=
self
.
base_name
)
# 删除第二个表
session
=
self
.
db_session
()
DataBusinesses
()
.
delete_like_name
(
session
=
session
,
name
=
self
.
base_name
)
session
=
self
.
db_session
()
sql
=
DataHosts
()
.
select_all_from_allKeys
(
session
=
session
,
host_name
=
self
.
base_name
)
for
x
,
y
in
enumerate
(
sql
):
# 删除第三个表
session
=
self
.
db_session
()
DataHostsItems
()
.
delete_host_id
(
session
=
session
,
host_id
=
y
.
hostid
)
# 删除第四个表
session
=
self
.
db_session
()
DataProblem
()
.
delete_hostid
(
session
=
session
,
hostid
=
y
.
hostid
)
# 删除第五个表
session
=
self
.
db_session
()
DataHosts
()
.
delete_like_host_name
(
session
=
session
,
host_name
=
self
.
base_name
)
# 删除第六个表
session
=
self
.
db_session
()
DataTag
()
.
delete_like_tag
(
session
=
session
,
tag
=
self
.
base_name
)
session
=
self
.
db_session
()
self
.
alarm_ruleid
=
DataAlarmRule
()
.
insert_one
(
session
=
session
,
name
=
self
.
base_name
,
status
=
1
,
severity
=
1
,
manual_close
=
0
,
enable_time_default
=
1
,
create_time
=
1
)
session
=
self
.
db_session
()
self
.
businessid
=
DataBusinesses
()
.
insert_one
(
session
=
session
,
name
=
self
.
base_name
,
_type
=
1
)
session
=
self
.
db_session
()
self
.
hostid
=
DataHosts
()
.
insert_one
(
session
=
session
,
host_name
=
self
.
base_name
,
del_flag
=
0
)
session
=
self
.
db_session
()
self
.
itemid
=
DataHostsItems
()
.
insert_one
(
session
=
session
,
host_id
=
self
.
hostid
,
name
=
self
.
base_name
,
key_
=
"key_"
)
session
=
self
.
db_session
()
self
.
eventid
=
DataProblem
()
.
insert_one
(
session
=
session
,
eventid
=
999999
,
source
=
0
,
_object
=
0
,
objectid
=
0
,
clock
=
0
,
ns
=
0
,
r_clock
=
0
,
r_ns
=
0
,
acknowledged
=
0
,
acknowledged_time
=
0
,
severity
=
0
,
status
=
0
,
hostid
=
self
.
hostid
,
name
=
self
.
base_name
)
@allure.step
(
"校验数据"
)
def
check_select
(
self
,
response
):
self
.
experienceid
=
UtilsResponse
()
.
get_data
(
response
=
response
)
sql
=
self
.
select_experience
(
experienceid
=
self
.
experienceid
)
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
title
==
title
assert
y
.
content
==
content
assert
y
.
businessid
==
businessid
assert
y
.
business_name
==
business_name
assert
y
.
host_type
==
host_type
assert
y
.
hostid
==
hostid
assert
y
.
versions
==
versions
assert
y
.
monitorid
==
monitorid
assert
y
.
monitor_name
==
monitor_name
assert
y
.
description
==
description
assert
y
.
eventid
==
eventid
assert
y
.
title
==
self
.
title
assert
y
.
content
==
self
.
content
@allure.title
(
"experience.create:必填项"
)
@allure.story
(
"创建经验手册:必填项"
)
sql
=
self
.
select_experience_alarm_rule
(
experienceid
=
self
.
experienceid
)
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
title
==
self
.
title
assert
y
.
content
==
self
.
content
@allure.title
(
"experience.create:token未传"
)
@allure.story
(
"创建经验手册:token未传"
)
@allure.severity
(
"blocker"
)
def
test_case_01
(
self
):
title
=
self
.
base_title
+
"01"
description
=
self
.
base_description
+
"01"
response
=
self
.
experience_create
(
title
=
title
,
description
=
description
,
hostType
=
1
)
self
.
check_code
(
response
=
response
,
code
=
0
)
experienceid
=
UtilsResponse
()
.
get_data
(
response
=
response
)
self
.
check_select
(
experienceid
=
experienceid
,
title
=
title
,
description
=
description
,
host_type
=
1
,
content
=
None
,
businessid
=
None
,
business_name
=
None
,
hostid
=
None
,
versions
=
1
,
monitorid
=
None
,
monitor_name
=
None
,
eventid
=
None
)
self
.
get_base_token
()
response
=
self
.
experience_create
()
self
.
check_code
(
response
=
response
,
code
=
2001
)
self
.
check_msg
(
response
=
response
,
msg
=
"无token,请重新登录"
)
@allure.title
(
"experience.create:模糊查询hostName"
)
@allure.story
(
"创建经验手册:模糊查询hostName"
)
@allure.title
(
"experience.create:token的用户已删除"
)
@allure.story
(
"创建经验手册:token的用户已删除"
)
@allure.severity
(
"blocker"
)
def
test_case_02
(
self
):
title
=
self
.
base_title
+
"02"
description
=
self
.
base_description
+
"02"
content
=
self
.
base_content
+
"02"
business_name
=
self
.
base_businessName
+
"02"
monitor_name
=
self
.
base_monitorName
+
"02"
response
=
self
.
experience_create
(
title
=
title
,
description
=
description
,
hostType
=
1
,
content
=
content
,
businessid
=
1
,
businessName
=
business_name
,
monitorid
=
1
,
monitorName
=
monitor_name
,
hostid
=
self
.
base_hostid
)
self
.
check_code
(
response
=
response
,
code
=
0
)
experienceid
=
UtilsResponse
()
.
get_data
(
response
=
response
)
self
.
check_select
(
experienceid
=
experienceid
,
title
=
title
,
description
=
description
,
host_type
=
1
,
content
=
content
,
businessid
=
1
,
business_name
=
business_name
,
hostid
=
self
.
base_hostid
,
versions
=
1
,
monitorid
=
1
,
monitor_name
=
monitor_name
,
eventid
=
None
)
self
.
get_base_token
()
self
.
update_members_del_flag
(
members_id
=
self
.
base_id
,
value
=
1
)
response
=
self
.
experience_create
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
2004
)
self
.
check_msg
(
response
=
response
,
msg
=
"用户不存在"
)
@allure.title
(
"experience.create:查询assetCode"
)
@allure.story
(
"创建经验手册:查询assetCode"
)
@allure.title
(
"experience.create:token的用户已禁用"
)
@allure.story
(
"创建经验手册:token的用户已禁用"
)
@allure.severity
(
"blocker"
)
def
test_case_03
(
self
):
host_name
=
self
.
base_host_name
+
"03"
asset_code
=
self
.
base_asset_code
+
"03"
manage_ip
=
self
.
base_manage_ip
+
"3"
parent_host
=
self
.
base_parent_host
+
"03"
self
.
insert_hosts
(
host_name
=
host_name
,
asset_code
=
asset_code
,
manage_ip
=
manage_ip
,
parent_host
=
parent_host
,
available
=
1
,
monitor_status
=
1
)
response
=
self
.
networkmonitor_get
(
assetCode
=
self
.
base_asset_code
)
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_data
(
response
=
response
,
key
=
"assetCode"
,
value
=
self
.
base_asset_code
,
is_default
=
0
)
self
.
get_base_token
()
self
.
update_members_status
(
members_id
=
self
.
base_id
,
value
=
1
)
response
=
self
.
experience_create
(
token
=
self
.
token
)
self
.
check_code
(
response
=
response
,
code
=
2003
)
self
.
check_msg
(
response
=
response
,
msg
=
"账户已被禁用"
)
@allure.title
(
"experience.create:模糊查询manageIp"
)
@allure.story
(
"创建经验手册:模糊查询manageIp"
)
@allure.title
(
"experience.create:必填项"
)
@allure.story
(
"创建经验手册:必填项"
)
@allure.severity
(
"blocker"
)
def
test_case_04
(
self
):
host_name
=
self
.
base_host_name
+
"04"
asset_code
=
self
.
base_asset_code
+
"04"
manage_ip
=
self
.
base_manage_ip
+
"4"
parent_host
=
self
.
base_parent_host
+
"04"
self
.
insert_hosts
(
host_name
=
host_name
,
asset_code
=
asset_code
,
manage_ip
=
manage_ip
,
parent_host
=
parent_host
,
available
=
1
,
monitor_status
=
1
)
response
=
self
.
networkmonitor_get
(
manageIp
=
self
.
base_manage_ip
)
self
.
get_base_token
()
self
.
title
=
self
.
base_title
+
"04"
self
.
content
=
self
.
base_content
+
"04"
response
=
self
.
experience_create
(
token
=
self
.
token
,
title
=
self
.
title
,
content
=
self
.
content
,
hostTypes
=
[
self
.
base_type_agent
])
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_data
(
response
=
response
,
key
=
"manageIp"
,
value
=
self
.
base_manage_ip
,
is_default
=
0
)
self
.
experienceid
=
UtilsResponse
()
.
get_data
(
response
=
response
)
sql
=
self
.
select_experience
(
experienceid
=
self
.
experienceid
)
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
title
==
self
.
title
assert
y
.
content
==
self
.
content
@allure.title
(
"experience.create:模糊查询parentHost"
)
@allure.story
(
"创建经验手册:模糊查询parentHost"
)
@allure.title
(
"experience.create:所有字段"
)
@allure.story
(
"创建经验手册:所有字段"
)
@allure.severity
(
"blocker"
)
def
test_case_05
(
self
):
host_name
=
self
.
base_host_name
+
"05"
asset_code
=
self
.
base_asset_code
+
"05"
manage_ip
=
self
.
base_manage_ip
+
"5"
parent_host
=
self
.
base_parent_host
+
"05"
self
.
insert_hosts
(
host_name
=
host_name
,
asset_code
=
asset_code
,
manage_ip
=
manage_ip
,
parent_host
=
parent_host
,
available
=
1
,
monitor_status
=
1
)
response
=
self
.
networkmonitor_get
(
parentHost
=
self
.
base_parent_host
)
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_data
(
response
=
response
,
key
=
"parentHost"
,
value
=
self
.
base_parent_host
,
is_default
=
0
)
self
.
case_create
()
self
.
get_base_token
()
@allure.title
(
"experience.create:必填项"
)
@allure.story
(
"创建经验手册:必填项"
)
def
test_case_06
(
self
):
host_name
=
self
.
base_host_name
+
"06"
asset_code
=
self
.
base_asset_code
+
"06"
manage_ip
=
self
.
base_manage_ip
+
"6"
parent_host
=
self
.
base_parent_host
+
"06"
self
.
insert_hosts
(
host_name
=
host_name
,
asset_code
=
asset_code
,
manage_ip
=
manage_ip
,
parent_host
=
parent_host
,
available
=
0
,
monitor_status
=
1
)
response
=
self
.
networkmonitor_get
(
available
=
0
)
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_data
(
response
=
response
,
key
=
"available"
,
value
=
0
,
is_default
=
1
)
@allure.title
(
"experience.create:模糊查询monitor_status"
)
@allure.story
(
"创建经验手册:模糊查询monitor_status"
)
def
test_case_07
(
self
):
host_name
=
self
.
base_host_name
+
"07"
asset_code
=
self
.
base_asset_code
+
"07"
manage_ip
=
self
.
base_manage_ip
+
"7"
parent_host
=
self
.
base_parent_host
+
"07"
self
.
insert_hosts
(
host_name
=
host_name
,
asset_code
=
asset_code
,
manage_ip
=
manage_ip
,
parent_host
=
parent_host
,
available
=
1
,
monitor_status
=
0
)
response
=
self
.
networkmonitor_get
(
status
=
0
)
self
.
title
=
self
.
base_title
+
"05"
self
.
content
=
self
.
base_content
+
"05"
response
=
self
.
experience_create
(
token
=
self
.
token
,
title
=
self
.
title
,
content
=
self
.
content
,
hostTypes
=
[
self
.
base_type_agent
],
alarmRuleids
=
[
self
.
alarm_ruleid
],
businessids
=
[
self
.
businessid
],
hostids
=
[
self
.
hostid
],
manageIpList
=
[
self
.
hostid
],
itemids
=
[
self
.
itemid
],
problemidList
=
[
self
.
eventid
],
tagNameList
=
[
self
.
base_name
])
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_data
(
response
=
response
,
key
=
"monitorStatus"
,
value
=
0
,
is_default
=
1
)
self
.
experienceid
=
UtilsResponse
()
.
get_data
(
response
=
response
)
sql
=
self
.
select_experience
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
title
==
self
.
title
assert
y
.
content
==
self
.
content
# 校验第一个表
sql
=
self
.
select_experience_alarm_rule
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
alarm_ruleid
==
self
.
alarm_ruleid
# 校验第二个表
sql
=
self
.
select_experience_businesses
(
experienceid
=
self
.
experienceid
)
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
businessid
==
self
.
businessid
# 校验第三个表
sql
=
self
.
select_experience_hosts
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
hostid
==
self
.
hostid
# 校验第四个表
sql
=
self
.
select_experience_hosts_type
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
hosts_type
==
self
.
base_type_agent
# 校验第五个表
sql
=
self
.
select_experience_ips
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
hostid
==
self
.
hostid
# 校验第六个表
sql
=
self
.
select_experience_items
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
itemid
==
self
.
itemid
# 校验第七个表
sql
=
self
.
select_experience_problem
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
problemid
==
self
.
eventid
# 校验第八个表
sql
=
self
.
select_tag
(
tag
=
self
.
base_name
)
assert
sql
!=
[]
tagids
=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
tag
==
self
.
base_name
assert
x
==
0
tagids
.
append
(
y
.
tagid
)
sql
=
self
.
select_experience_tag
(
experienceid
=
self
.
experienceid
)
assert
sql
!=
[]
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
tagid
==
tagids
[
0
]
if
__name__
==
"__main__"
:
...
...
@@ -201,11 +382,11 @@ if __name__ == "__main__":
import
os
# 执行自动化测试用例
#
case_info = os.path.split(__file__)
#
case = UtilsCmd().pytest_cmd()
#
r = UtilsPyTest(case=case, case_info=case_info)
#
r.run_main()
a
=
TestExperienceCreate
()
a
.
setup_class
()
a
.
test_case_02
()
case_info
=
os
.
path
.
split
(
__file__
)
case
=
UtilsCmd
()
.
pytest_cmd
()
r
=
UtilsPyTest
(
case
=
case
,
case_info
=
case_info
)
r
.
run_main
()
#
a = TestExperienceCreate()
#
a.setup_class()
# a.test_case_05
()
WorkData/Argus/hosts_items.py
View file @
3b0cfe1e
...
...
@@ -105,7 +105,7 @@ class DataHostsItems(object):
session
.
add
(
sql_rep
)
session
.
commit
()
_id
=
sql_rep
.
id
self
.
log
.
debug
(
"新增的
host
id:
%
s"
%
_id
)
self
.
log
.
debug
(
"新增的
item
id:
%
s"
%
_id
)
session
.
close
()
return
_id
except
UtilsDataBase
()
.
errors
as
error
:
...
...
WorkData/Argus/problem.py
View file @
3b0cfe1e
...
...
@@ -77,16 +77,16 @@ class DataProblem(object):
session
.
close
()
return
error
def
delete_
like_host_name
(
self
,
session
,
host_name
):
def
delete_
hostid
(
self
,
session
,
hostid
):
"""
:param session: 指针
:param host
_name
:
:param host
id
:
:return:
"""
self
.
log
.
debug
(
"查询数据库:"
)
try
:
self
.
log
.
debug
(
"删除相关测试数据"
)
session
.
query
(
self
.
table
)
.
filter
(
self
.
table
.
host_name
.
like
(
str
(
host_name
)
+
"
%
"
)
)
.
delete
(
synchronize_session
=
False
)
session
.
query
(
self
.
table
)
.
filter
_by
(
hostid
=
hostid
)
.
delete
(
synchronize_session
=
False
)
# session.delete(sql)
session
.
commit
()
session
.
close
()
...
...
@@ -98,48 +98,52 @@ class DataProblem(object):
session
.
close
()
return
error
def
insert_one
(
self
,
session
,
host_name
=
None
,
asset_code
=
None
,
manage_ip
=
None
,
departmentid
=
None
,
manage_level
=
None
,
host_type
=
None
,
parent_host
=
None
,
available
=
None
,
monitor_status
=
None
,
iplist
=
None
,
monitor_interfac
e
=
None
,
del_flag
=
None
):
def
insert_one
(
self
,
session
,
eventid
=
None
,
source
=
None
,
_object
=
None
,
objectid
=
None
,
clock
=
None
,
ns
=
None
,
r_clock
=
None
,
r_ns
=
None
,
hostid
=
None
,
name
=
None
,
acknowledged
=
None
,
acknowledged_tim
e
=
None
,
severity
=
None
,
status
=
None
,
):
"""
:param session: 指针
:param host_name:
:param asset_code:
:param manage_ip:
:param departmentid:
:param manage_level:
:param host_type:
:param parent_host:
:param available:
:param monitor_status:
:param iplist:
:param monitor_interface:
:param del_flag:
:param eventid:
:param source:
:param _object:
:param objectid:
:param clock:
:param ns:
:param r_clock:
:param r_ns:
:param hostid:
:param name:
:param acknowledged:
:param acknowledged_time:
:param severity:
:param status:
:return:
"""
self
.
log
.
debug
(
"新增表数据:"
)
try
:
base
=
UtilsDataBase
()
base
.
add_param
(
_key
=
"host_name"
,
value
=
host_name
)
base
.
add_param
(
_key
=
"asset_code"
,
value
=
asset_code
)
base
.
add_param
(
_key
=
"manage_ip"
,
value
=
manage_ip
)
base
.
add_param
(
_key
=
"departmentid"
,
value
=
departmentid
)
base
.
add_param
(
_key
=
"manage_level"
,
value
=
manage_level
)
base
.
add_param
(
_key
=
"host_type"
,
value
=
host_type
)
base
.
add_param
(
_key
=
"parent_host"
,
value
=
parent_host
)
base
.
add_param
(
_key
=
"available"
,
value
=
available
)
base
.
add_param
(
_key
=
"monitor_status"
,
value
=
monitor_status
)
base
.
add_param
(
_key
=
"iplist"
,
value
=
iplist
)
base
.
add_param
(
_key
=
"monitor_interface"
,
value
=
monitor_interface
)
base
.
add_param
(
_key
=
"del_flag"
,
value
=
del_flag
)
base
.
add_param
(
_key
=
"eventid"
,
value
=
eventid
)
base
.
add_param
(
_key
=
"hostid"
,
value
=
hostid
)
base
.
add_param
(
_key
=
"name"
,
value
=
name
)
base
.
add_param
(
_key
=
"source"
,
value
=
source
)
base
.
add_param
(
_key
=
"object"
,
value
=
_object
)
base
.
add_param
(
_key
=
"objectid"
,
value
=
objectid
)
base
.
add_param
(
_key
=
"clock"
,
value
=
clock
)
base
.
add_param
(
_key
=
"ns"
,
value
=
ns
)
base
.
add_param
(
_key
=
"r_clock"
,
value
=
r_clock
)
base
.
add_param
(
_key
=
"r_ns"
,
value
=
r_ns
)
base
.
add_param
(
_key
=
"acknowledged"
,
value
=
acknowledged
)
base
.
add_param
(
_key
=
"acknowledged_time"
,
value
=
acknowledged_time
)
base
.
add_param
(
_key
=
"severity"
,
value
=
severity
)
base
.
add_param
(
_key
=
"status"
,
value
=
status
)
sql_rep
=
self
.
table
(
**
base
.
param
)
session
.
add
(
sql_rep
)
session
.
commit
()
hostid
=
sql_rep
.
hos
tid
self
.
log
.
debug
(
"新增的
hostid:
%
s"
%
hos
tid
)
eventid
=
sql_rep
.
even
tid
self
.
log
.
debug
(
"新增的
eventid:
%
s"
%
even
tid
)
session
.
close
()
return
hos
tid
return
even
tid
except
UtilsDataBase
()
.
errors
as
error
:
self
.
log
.
error
(
"异常:"
)
self
.
log
.
error
(
error
)
...
...
WorkData/Argus/tag.py
0 → 100644
View file @
3b0cfe1e
# -*- coding: utf_8 -*-
# 表名: tag
# 作者: 陈磊
# 时间: 2019-12-04
from
sqlalchemy.ext.declarative
import
declarative_base
from
sqlalchemy
import
Column
,
Integer
,
VARCHAR
,
SMALLINT
,
DATETIME
,
DECIMAL
,
BigInteger
,
FLOAT
,
NUMERIC
,
TEXT
from
sqlalchemy.orm.exc
import
MultipleResultsFound
,
NoResultFound
from
sqlalchemy
import
or_
from
WorkUtils.UtilsLog
import
UtilsLog
from
WorkUtils.UtilsDataBase
import
UtilsDataBase
import
time
Base
=
declarative_base
()
class
Tag
(
Base
):
"""
标签表
"""
__tablename__
=
"tag"
tagid
=
Column
(
BigInteger
,
primary_key
=
True
)
tag
=
Column
(
VARCHAR
(
255
),
comment
=
"标签值"
)
class
DataTag
(
object
):
def
__init__
(
self
):
self
.
log
=
UtilsLog
()
self
.
log
.
info
(
self
.
__class__
)
self
.
table
=
Tag
def
select_all_from_allKeys
(
self
,
session
,
tagid
=
None
,
tag
=
None
):
"""
:param session: 指针
:param tagid:
:param tag:
:return: 查询结果
"""
self
.
log
.
debug
(
"查询数据库:"
)
try
:
base
=
UtilsDataBase
()
base
.
add_param
(
_key
=
"tagid"
,
value
=
tagid
)
base
.
add_param
(
_key
=
"tag"
,
value
=
tag
)
sql_rep
=
session
.
query
(
self
.
table
)
.
filter_by
(
**
base
.
param
)
.
all
()
self
.
log
.
debug
(
sql_rep
)
session
.
close
()
return
sql_rep
except
UtilsDataBase
()
.
errors
as
error
:
self
.
log
.
error
(
"异常:"
)
self
.
log
.
error
(
error
)
session
.
close
()
return
error
def
delete_like_tag
(
self
,
session
,
tag
):
"""
:param session: 指针
:param tag:
:return:
"""
self
.
log
.
debug
(
"查询数据库:"
)
try
:
self
.
log
.
debug
(
"删除相关测试数据"
)
session
.
query
(
self
.
table
)
.
filter
(
self
.
table
.
tag
.
like
(
str
(
tag
)
+
"
%
"
))
.
delete
(
synchronize_session
=
False
)
# session.delete(sql)
session
.
commit
()
session
.
close
()
self
.
log
.
debug
(
"删除成功"
)
return
True
except
UtilsDataBase
()
.
errors
as
error
:
self
.
log
.
error
(
"异常:"
)
self
.
log
.
error
(
error
)
session
.
close
()
return
error
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment