Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
Z
zmops-test
Project
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
sanshi
zmops-test
Commits
72fd8898
Commit
72fd8898
authored
Dec 04, 2019
by
sanshi
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
资产ips下拉框
parent
d32a6f8d
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
364 additions
and
0 deletions
+364
-0
host_update.py
WorkApi/API/Host/host_update.py
+152
-0
test_host_update.py
WorkCase/API/Host/test_host_update.py
+212
-0
No files found.
WorkApi/API/Host/host_update.py
0 → 100644
View file @
72fd8898
# -*- coding: utf-8 -*-
# 更新主机
# 作者: 陈磊
# 时间: 2019-12-04
from
WorkApi.ApiBase
import
ApiBase
from
WorkUtils.UtilsRequest
import
UtilsRequest
from
WorkUtils.UtilsLog
import
UtilsLog
class
HostUpdate
(
object
):
def
__init__
(
self
,
_host
):
"""
:param _host: 域名
:return:
"""
self
.
log
=
UtilsLog
()
self
.
log
.
info
(
"调用更新主机"
)
self
.
log
.
info
(
self
.
__class__
)
self
.
_host
=
_host
self
.
_headers
=
{}
self
.
_path
=
"/host/update"
self
.
_url
=
self
.
_host
+
self
.
_path
self
.
_params
=
{}
self
.
_json
=
{}
self
.
local_json
=
{}
self
.
response
=
""
self
.
hostid
=
None
self
.
host
=
None
self
.
available
=
None
self
.
description
=
None
self
.
flags
=
None
self
.
inventory_mode
=
None
self
.
ipmi_authtype
=
None
self
.
ipmi_available
=
None
self
.
ipmi_disable_until
=
None
self
.
ipmi_error
=
None
self
.
ipmi_errors_from
=
None
self
.
ipmi_password
=
None
self
.
ipmi_privilege
=
None
self
.
ipmi_username
=
None
self
.
jmx_available
=
None
self
.
jmx_disable_until
=
None
self
.
jmx_error
=
None
self
.
jmx_errors_from
=
None
self
.
maintenance_from
=
None
self
.
maintenance_status
=
None
self
.
maintenance_type
=
None
self
.
maintenanceid
=
None
self
.
name
=
None
self
.
proxy_hostid
=
None
self
.
snmp_available
=
None
self
.
snmp_disable_until
=
None
self
.
snmp_error
=
None
self
.
snmp_errors_from
=
None
self
.
interfaces
=
None
self
.
groupids
=
None
self
.
tags
=
None
self
.
templateids
=
None
self
.
macros
=
None
self
.
inventory
=
None
self
.
status
=
None
self
.
tls_connect
=
None
self
.
tls_accept
=
None
self
.
tls_issuer
=
None
self
.
tls_subject
=
None
self
.
tls_psk_identity
=
None
self
.
tls_psk
=
None
self
.
macros_macro
=
None
self
.
macros_value
=
None
self
.
interfaces_type
=
None
self
.
interfaces_main
=
None
self
.
interfaces_useip
=
None
self
.
interfaces_ip
=
None
self
.
interfaces_dns
=
None
self
.
interfaces_port
=
None
self
.
api
=
UtilsRequest
()
def
get_response
(
self
):
if
self
.
interfaces_ip
is
not
None
:
self
.
interfaces
=
[]
for
x
,
y
in
enumerate
(
self
.
interfaces_ip
):
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"type"
,
value
=
self
.
interfaces_type
[
x
])
base
.
dict_add_key
(
_key
=
"main"
,
value
=
self
.
interfaces_main
[
x
])
base
.
dict_add_key
(
_key
=
"useip"
,
value
=
self
.
interfaces_useip
[
x
])
base
.
dict_add_key
(
_key
=
"ip"
,
value
=
self
.
interfaces_ip
[
x
])
base
.
dict_add_key
(
_key
=
"dns"
,
value
=
self
.
interfaces_dns
[
x
])
base
.
dict_add_key
(
_key
=
"port"
,
value
=
self
.
interfaces_port
[
x
])
self
.
interfaces
.
append
(
base
.
_json
)
else
:
pass
if
self
.
macros_macro
is
not
None
:
self
.
macros
=
[]
for
x
,
y
in
enumerate
(
self
.
macros_macro
):
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"macro"
,
value
=
self
.
macros_macro
[
x
])
base
.
dict_add_key
(
_key
=
"value"
,
value
=
self
.
macros_value
[
x
])
self
.
macros
.
append
(
base
.
_json
)
else
:
pass
base
=
ApiBase
()
base
.
dict_add_key
(
_key
=
"hostid"
,
value
=
self
.
hostid
)
base
.
dict_add_key
(
_key
=
"host"
,
value
=
self
.
host
)
base
.
dict_add_key
(
_key
=
"available"
,
value
=
self
.
available
)
base
.
dict_add_key
(
_key
=
"description"
,
value
=
self
.
description
)
base
.
dict_add_key
(
_key
=
"flags"
,
value
=
self
.
flags
)
base
.
dict_add_key
(
_key
=
"inventory_mode"
,
value
=
self
.
inventory_mode
)
base
.
dict_add_key
(
_key
=
"ipmiAuthtype"
,
value
=
self
.
ipmi_authtype
)
base
.
dict_add_key
(
_key
=
"ipmiAvailable"
,
value
=
self
.
ipmi_available
)
base
.
dict_add_key
(
_key
=
"ipmiDisable_until"
,
value
=
self
.
ipmi_disable_until
)
base
.
dict_add_key
(
_key
=
"ipmiError"
,
value
=
self
.
ipmi_error
)
base
.
dict_add_key
(
_key
=
"ipmiErrorsFrom"
,
value
=
self
.
ipmi_errors_from
)
base
.
dict_add_key
(
_key
=
"ipmiPassword"
,
value
=
self
.
ipmi_password
)
base
.
dict_add_key
(
_key
=
"ipmiPrivilege"
,
value
=
self
.
ipmi_privilege
)
base
.
dict_add_key
(
_key
=
"ipmiUsername"
,
value
=
self
.
ipmi_username
)
base
.
dict_add_key
(
_key
=
"jmxAvailable"
,
value
=
self
.
jmx_available
)
base
.
dict_add_key
(
_key
=
"jmxDisableUntil"
,
value
=
self
.
jmx_disable_until
)
base
.
dict_add_key
(
_key
=
"jmxError"
,
value
=
self
.
jmx_error
)
base
.
dict_add_key
(
_key
=
"jmxErrorsFrom"
,
value
=
self
.
jmx_errors_from
)
base
.
dict_add_key
(
_key
=
"maintenanceFrom"
,
value
=
self
.
maintenance_from
)
base
.
dict_add_key
(
_key
=
"maintenanceStatus"
,
value
=
self
.
maintenance_status
)
base
.
dict_add_key
(
_key
=
"maintenanceType"
,
value
=
self
.
maintenance_type
)
base
.
dict_add_key
(
_key
=
"maintenanceid"
,
value
=
self
.
maintenanceid
)
base
.
dict_add_key
(
_key
=
"name"
,
value
=
self
.
name
)
base
.
dict_add_key
(
_key
=
"proxyHostid"
,
value
=
self
.
proxy_hostid
)
base
.
dict_add_key
(
_key
=
"snmpAvailable"
,
value
=
self
.
snmp_available
)
base
.
dict_add_key
(
_key
=
"snmpDisableUntil"
,
value
=
self
.
snmp_disable_until
)
base
.
dict_add_key
(
_key
=
"snmpError"
,
value
=
self
.
snmp_error
)
base
.
dict_add_key
(
_key
=
"snmpErrorsFrom"
,
value
=
self
.
snmp_errors_from
)
base
.
dict_add_key
(
_key
=
"interfaces"
,
value
=
self
.
interfaces
)
base
.
dict_add_key
(
_key
=
"groupids"
,
value
=
self
.
groupids
)
base
.
dict_add_key
(
_key
=
"tags"
,
value
=
self
.
tags
)
base
.
dict_add_key
(
_key
=
"templateids"
,
value
=
self
.
templateids
)
base
.
dict_add_key
(
_key
=
"macros"
,
value
=
self
.
macros
)
base
.
dict_add_key
(
_key
=
"inventory"
,
value
=
self
.
inventory
)
base
.
dict_add_key
(
_key
=
"status"
,
value
=
self
.
status
)
base
.
dict_add_key
(
_key
=
"tlsConnect"
,
value
=
self
.
tls_connect
)
base
.
dict_add_key
(
_key
=
"tlsAccept"
,
value
=
self
.
tls_accept
)
base
.
dict_add_key
(
_key
=
"tlsIssuer"
,
value
=
self
.
tls_issuer
)
base
.
dict_add_key
(
_key
=
"tlsSubject"
,
value
=
self
.
tls_subject
)
base
.
dict_add_key
(
_key
=
"tlsPskIdentity"
,
value
=
self
.
tls_psk_identity
)
base
.
dict_add_key
(
_key
=
"tlsPsk"
,
value
=
self
.
tls_psk
)
self
.
local_json
=
base
.
_json
self
.
_json
=
self
.
local_json
self
.
response
=
self
.
api
.
post
(
url
=
self
.
_url
,
json
=
self
.
_json
)
WorkCase/API/Host/test_host_update.py
0 → 100644
View file @
72fd8898
# -*- coding: utf-8 -*-
# 测试用例
# 作者: 陈磊
# 时间: 2019-12-04
from
__future__
import
division
from
WorkUtils.UtilsLog
import
UtilsLog
from
WorkUtils.UtilsDataBase
import
UtilsDataBase
from
WorkUtils.UtilsResponse
import
UtilsResponse
from
WorkApi.API.Host.host_create
import
HostCreate
from
WorkApi.API.Host.host_update
import
HostUpdate
from
WorkData.Zabbix.hosts
import
DataHosts
from
WorkData.Zabbix.items
import
DataItems
from
WorkData.Zabbix.hostmacro
import
DataHostsMacro
from
WorkCase
import
CaseBase
import
allure
@allure.feature
(
"测试模块:host.update"
)
class
TestHostUpdate
(
object
):
log
=
UtilsLog
()
env
=
CaseBase
()
.
environment
host
=
env
[
"host"
]
db_url
=
env
[
"db_url"
]
db_port
=
env
[
"db_port"
]
db_user
=
env
[
"db_user"
]
db_pw
=
env
[
"db_pw"
]
db_base
=
env
[
"db_base"
]
base_groupid
=
101
base_name
=
"SS TEST HOST UPDATE"
base_ip
=
"10.0.0.10"
base_description
=
"SS自动化测试主机描述, 请勿删除"
@classmethod
def
setup_class
(
cls
):
cls
.
log
.
debug
(
"开始执行测试套件......."
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
cls
.
db_url
,
db_port
=
cls
.
db_port
,
db_base
=
cls
.
db_base
,
db_user
=
cls
.
db_user
,
db_pw
=
cls
.
db_pw
)
sql
=
DataHosts
()
.
select_like_name
(
session
=
session
,
name
=
cls
.
base_name
)
for
x
,
y
in
enumerate
(
sql
):
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
cls
.
db_url
,
db_port
=
cls
.
db_port
,
db_base
=
cls
.
db_base
,
db_user
=
cls
.
db_user
,
db_pw
=
cls
.
db_pw
)
DataItems
()
.
delete_hostid
(
session
=
session
,
hostid
=
y
.
hostid
)
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
cls
.
db_url
,
db_port
=
cls
.
db_port
,
db_base
=
cls
.
db_base
,
db_user
=
cls
.
db_user
,
db_pw
=
cls
.
db_pw
)
DataHosts
()
.
delete_like_name
(
session
=
session
,
name
=
cls
.
base_name
)
@classmethod
def
teardown_class
(
cls
):
cls
.
log
.
debug
(
"结束执行测试套件......."
)
def
setup_method
(
self
):
self
.
log
.
debug
(
"测试用例执行开始..."
)
def
teardown_method
(
self
):
self
.
log
.
debug
(
"测试用例执行结束..."
)
@allure.step
(
"调用接口:host.create"
)
def
host_create
(
self
,
host
=
None
,
name
=
None
,
description
=
None
,
groupids
=
None
,
interfaces
=
None
,
tags
=
None
,
templateids
=
None
,
macros
=
None
,
inventory
=
None
,
status
=
None
,
inventory_mode
=
None
):
api
=
HostCreate
(
_host
=
self
.
host
)
api
.
host
=
host
api
.
name
=
name
api
.
description
=
description
api
.
groupids
=
groupids
api
.
interfaces
=
interfaces
api
.
tags
=
tags
api
.
templateids
=
templateids
api
.
macros
=
macros
api
.
inventory
=
inventory
api
.
status
=
status
api
.
inventory_mode
=
inventory_mode
api
.
tls_connect
=
1
api
.
tls_accept
=
1
api
.
tls_psk_identity
=
""
api
.
tls_psk
=
""
api
.
get_response
()
return
api
.
response
@allure.step
(
"调用接口:host.update"
)
def
host_update
(
self
,
groupids
=
None
,
hostid
=
None
,
countOutput
=
None
,
editable
=
None
,
excludeSearch
=
None
,
_filter
=
None
,
limit
=
None
,
output
=
None
,
preservekeys
=
None
,
interfaces
=
None
,
macros_macro
=
None
,
macros_value
=
None
,
ipmi_authtype
=
None
,
ipmi_privilege
=
None
,
ipmi_username
=
None
,
ipmi_password
=
None
,
interfaces_type
=
None
,
interfaces_main
=
None
,
interfaces_useip
=
None
):
api
=
HostUpdate
(
_host
=
self
.
host
)
api
.
groupids
=
groupids
api
.
hostid
=
hostid
api
.
countOutput
=
countOutput
api
.
editable
=
editable
api
.
excludeSearch
=
excludeSearch
api
.
filter
=
_filter
api
.
limit
=
limit
api
.
output
=
output
api
.
preservekeys
=
preservekeys
api
.
interfaces
=
interfaces
api
.
ipmi_authtype
=
ipmi_authtype
api
.
ipmi_privilege
=
ipmi_privilege
api
.
ipmi_username
=
ipmi_username
api
.
ipmi_password
=
ipmi_password
api
.
macros_macro
=
macros_macro
api
.
macros_value
=
macros_value
api
.
interfaces_type
=
interfaces_type
api
.
interfaces_main
=
interfaces_main
api
.
interfaces_useip
=
interfaces_useip
# api.int
api
.
get_response
()
return
api
.
response
@allure.step
(
"连接数据库:zabbix"
)
def
db_session
(
self
):
session
=
UtilsDataBase
()
.
conn_mysql
(
db_url
=
self
.
db_url
,
db_port
=
self
.
db_port
,
db_base
=
self
.
db_base
,
db_user
=
self
.
db_user
,
db_pw
=
self
.
db_pw
)
return
session
@allure.step
(
"查询表:hosts"
)
def
select_hosts
(
self
,
hostid
):
session
=
self
.
db_session
()
sql
=
DataHosts
()
.
select_all_from_allKeys
(
session
=
session
,
hostid
=
hostid
)
return
sql
@allure.step
(
"查询表:hosts_macro"
)
def
select_macro
(
self
,
hostid
):
session
=
self
.
db_session
()
sql
=
DataHostsMacro
()
.
select_all_from_allKeys
(
session
=
session
,
hostid
=
hostid
)
return
sql
@allure.step
(
"断言返回结果:校验排序"
)
def
check_sortfield
(
self
,
response
,
possible
):
result
=
UtilsResponse
()
.
get_result
(
response
=
response
)
UtilsResponse
()
.
check_sort
(
_list
=
result
,
key
=
"hostid"
,
possible
=
possible
)
@allure.step
(
"断言返回结果:校验返回数据的数量"
)
def
check_num
(
self
,
response
,
num
):
result
=
UtilsResponse
()
.
get_result
(
response
=
response
)
assert
len
(
result
)
==
num
@allure.step
(
"断言返回结果"
)
def
check_code
(
self
,
response
,
code
):
_code
=
UtilsResponse
()
.
get_code
(
response
=
response
)
assert
_code
==
code
@allure.step
(
"创建测试数据"
)
def
case_create
(
self
,
num
):
templateid
=
10001
host
=
self
.
base_name
+
num
description
=
self
.
base_description
+
num
interfaces
=
{
"type"
:
2
,
"main"
:
1
,
"useip"
:
1
,
"ip"
:
self
.
base_ip
,
"dns"
:
""
,
"port"
:
100
,
"bulk"
:
1
}
response
=
self
.
host_create
(
host
=
host
,
description
=
description
,
groupids
=
[
self
.
base_groupid
],
interfaces
=
[
interfaces
],
status
=
0
,
templateids
=
[
templateid
],
macros
=
[{
"macro"
:
"{$SNMP_COMMUNITY}"
,
"value"
:
"public"
}])
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
hostid
=
UtilsResponse
()
.
get_result
(
response
=
response
)[
"hostids"
][
0
]
return
self
.
hostid
@allure.step
(
"断言返回结果:与数据库结果是否一致"
)
def
check_sql
(
self
):
sql
=
self
.
select_hosts
(
hostid
=
self
.
hostid
)
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
ipmi_authtype
==
2
assert
y
.
ipmi_privilege
==
3
assert
y
.
ipmi_username
==
"ipmi_username"
assert
y
.
ipmi_password
==
"ipmi_password"
sql
=
self
.
select_macro
(
hostid
=
self
.
hostid
)
for
x
,
y
in
enumerate
(
sql
):
assert
y
.
macro
==
"{$SNMP_COMMUNITY}"
assert
y
.
value
==
"public"
@allure.title
(
"host.update:无参数调用"
)
@allure.story
(
"更新主机:无参数调用"
)
@allure.severity
(
"blocker"
)
def
test_case_01
(
self
):
self
.
case_create
(
num
=
"01"
)
response
=
self
.
host_update
(
hostid
=
self
.
hostid
,
macros_macro
=
[
"{$SNMP_COMMUNITY}"
],
macros_value
=
[
"public"
],
ipmi_authtype
=
2
,
ipmi_password
=
"ipmi_password"
,
ipmi_privilege
=
3
,
ipmi_username
=
"ipmi_username"
,
)
self
.
check_code
(
response
=
response
,
code
=
0
)
self
.
check_sql
()
if
__name__
==
"__main__"
:
from
WorkUtils.UtilsPyTest
import
UtilsPyTest
from
WorkUtils.UtilsCmd
import
UtilsCmd
import
os
# 执行自动化测试用例
case_info
=
os
.
path
.
split
(
__file__
)
case
=
UtilsCmd
()
.
pytest_cmd
()
r
=
UtilsPyTest
(
case
=
case
,
case_info
=
case_info
)
r
.
run_main
()
# a = TestHostUpdate()
# a.setup_class()
# a.test_case_01()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment