Commit 9f8f351b by sanshi

all

parent dbb32845
# -*- coding: utf-8 -*-
# 命令行基础类
# 作者: 陈磊
# 时间: 2019-10-22
import sys
import getopt
from WorkUtils.UtilsLog import UtilsLog
class UtilsCmd:
def __init__(self):
self.log = UtilsLog()
self.log.info("调用命令行基础类")
self.log.info(self.__class__)
def usage(self):
self.log.debug("帮助信息:")
self.log.debug(" * -h : 打印当前信息")
self.log.debug(" * -v [val] : 版本号")
self.log.debug(" * -p [val] : 测试路径")
def unittest_cmd(self):
self.log.debug("运行用例执行参数模式......")
version = ""
description = ""
try:
opts, args = getopt.getopt(sys.argv[1:], "hd:v:")
for op, value in opts:
if op == "-d":
description = value
elif op == "-v":
version = value
elif op == "-h":
# 帮助信息
self.usage()
sys.exit()
else:
sys.exit()
except getopt.GetoptError as e:
self.log.error("出现ERROR:")
self.log.error(e)
return {
"description": description,
"version": version
}
def pytest_cmd(self):
"""
pytest框架
:return:
"""
self.log.debug("运行用例执行参数模式......")
version = None
environment = None
path = None
try:
opts, args = getopt.getopt(sys.argv[1:], "hv:e:p;")
for op, value in opts:
if op == "-v":
version = value
elif op == "-e":
environment = value
elif op == "-p":
path = value.replace("/", "\\")
elif op == "-h":
# 帮助信息
self.usage()
sys.exit()
else:
sys.exit()
except getopt.GetoptError as e:
self.log.error("出现ERROR:")
self.log.error(e)
return {
"version": version,
"environment": environment,
"path": path
}
# -*- coding: utf-8 -*-
# 数据库连接类
# 作者: 陈磊
# 时间: 2019-10-21
from WorkUtils.UtilsLog import UtilsLog
from sqlalchemy.exc import ProgrammingError, OperationalError
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
class UtilsDataBase:
def __init__(self):
self.log = UtilsLog()
self.log.info("调用基础数据库连接类")
self.log.info(self.__class__)
self.errors = ProgrammingError, OperationalError, MultipleResultsFound, NoResultFound, AttributeError
self.param = {}
def conn_mysql(self, db_url, db_port, db_base, db_user, db_pw):
"""
mysql数据库连接方法
:param db_url: 数据库地址
:param db_port: 数据库端口
:param db_base: 数据库库名
:param db_user: 数据库登录用户名
:param db_pw: 数据库登录密码
:return:
"""
try:
db = "mysql://%s:%s@%s:%s/%s?charset=utf8" % (db_user, db_pw, db_url, db_port, db_base)
engine = create_engine(db, encoding="utf-8")
# echo=True,
db_session = sessionmaker(bind=engine)
session = db_session()
self.log.debug("数据库请求地址:")
self.log.debug(db)
self.log.debug(db_session)
self.log.debug(session)
return session
except self.errors as e:
self.log.error("异常:")
self.log.error(e)
def add_param(self, _key, value):
if value is None:
pass
else:
self.param[_key] = value
# -*- coding: utf-8 -*-
# 图片相关类
# 作者: 陈磊
# 时间: 2019-10-21
from WorkUtils.UtilsLog import UtilsLog
from WorkBase import WorkBase
import os
import base64
import datetime
from PIL import Image
from PIL import ImageEnhance
from pytesseract import *
from PIL import ImageOps
class UtilsImage:
def __init__(self):
self.log = UtilsLog()
self.log.info("调用图片相关类")
self.log.info(self.__class__)
self.create_time = datetime.datetime.today().strftime("%Y%m%d%H%M%S")
year = str(datetime.datetime.now().year) + u"年"
month = str(datetime.datetime.now().month) + u"月"
day = str(datetime.datetime.now().day) + u"日"
self.img_path = WorkBase().img_path + year + "\\" + month + "\\" + day + "\\"
self.img_name = ""
self.img_file = ""
self.code = ""
def change_base64(self, bs64):
self.log.info("获取base64图片内容")
self.log.info("解析图片...")
img_file = base64.b64decode(bs64)
# 将二进制数据装换为图片
# 生成测试报告文件
# 更新获取的版本号生成文件
if os.path.exists(self.img_path):
pass
else:
os.makedirs(self.img_path)
self.img_name = self.create_time + ".png"
self.img_file = self.img_path + self.img_name
with open(self.img_file, "wb") as img_png:
img_png.write(img_file)
self.log.info("保存解析的图片")
def init_table(self, threshold=190):
self.log.info("二值化阈值:%s" % threshold)
table = []
for i in range(256):
if i < threshold:
table.append(0)
else:
table.append(1)
return table
def get_code(self):
self.log.info("解析图片验证码...")
# 测试
self.img_file = "E:\\TestFile\\Img\\2019年\\6月\\19日\\20190619152453.png"
# 新建Image对象
image = Image.open(self.img_file)
# 进行置灰处理
image = image.convert('L')
# 这个是二值化阈值
threshold = 192
table = []
for i in range(256):
if i < threshold:
table.append(0)
else:
table.append(1)
# 通过表格转换成二进制图片,1的作用是白色,不然就全部黑色了
image = image.point(table, "1")
image.show()
self.code = pytesseract.image_to_string(image, lang="eng")
self.log.info("验证码:" + self.code)
if __name__ == '__main__':
i = UtilsImage()
i.get_code()
\ No newline at end of file
# -*- coding: utf-8 -*-
# 性能测试类
# 作者: 陈磊
# 时间: 2019-10-21
from WorkUtils.UtilsLog import UtilsLog
from locust import HttpLocust, TaskSet, task
import subprocess
log = UtilsLog()
class BaseTask(TaskSet):
def on_start(self):
log.info(self.__doc__)
@task(1)
def base_post(self, url, param, json):
"""
:param url:
:param param:
:param json:
:return:
"""
response = self.client.post(
url=url,
params=param,
json=json
)
if response.status_code != 200:
log.info("返回状态码异常")
log.info("返回状态码:%s" % response.status_code)
print("")
print()
elif response.status_code == 200:
log.info("返回状态码正常")
class MobileUserLocust(HttpLocust):
u"""
min_wait :用户执行任务之间等待时间的下界,单位:毫秒。
max_wait :用户执行任务之间等待时间的上界,单位:毫秒。
"""
# weight = 3
task_set = BaseTask.base_post(url="/", param={}, json={})
host = "http://www.baidu.com"
min_wait = 3000
max_wait = 6000
if __name__ == "__main__":
subprocess.Popen("locust -f UtilsLocust.py", shell=True)
\ No newline at end of file
# -*- coding: utf-8 -*-
# 新版日志打印类
# 作者: 陈磊
# 时间: 2019-10-30
import logging
import datetime
import os
from logging.handlers import RotatingFileHandler
from WorkBase import WorkBase
class UtilsLog:
def __init__(self):
self.logger = logging.getLogger(__name__)
# 日志格式
self.log_format = "%(asctime)s - %(levelname)s - %(message)s"
self.log_datefmt = "%Y-%m-%d %H:%M:%S"
logging.basicConfig(format=self.log_format, datefmt=self.log_datefmt, level=logging.DEBUG)
def info(self, message):
return self.logger.info(message)
def debug(self, message):
return self.logger.debug(message)
def warning(self, message):
return self.logger.warning(message)
def error(self, message):
return self.logger.error(message)
# -*- coding: utf-8 -*-
# 日志打印类
# 作者: 陈磊
# 时间: 2019-10-21
import logging
import datetime
import os
from logging.handlers import RotatingFileHandler
from WorkBase import WorkBase
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# 日志格式
log_format = "%(asctime)s - %(levelname)s - %(message)s"
log_datefmt = "%Y-%m-%d %H:%M:%S"
fmt = logging.Formatter(log_format, log_datefmt)
# 控制台日志
sh = logging.StreamHandler()
sh.setLevel(logging.DEBUG)
sh.setFormatter(fmt)
# 文件日志
year = str(datetime.datetime.now().year) + u"年"
month = str(datetime.datetime.now().month) + u"月"
day = str(datetime.datetime.now().day) + u"日"
# 导入日志文件路径
path = WorkBase().log_path + year + "\\" + month + "\\" + day + "\\"
if os.path.exists(path):
pass
else:
os.makedirs(path)
logger_name = str(datetime.datetime.today().strftime("%Y%m%d%H%M%S")) + ".txt"
log_path = path + logger_name
# fh = logging.FileHandler(log_path)
# 根据日志文件大小分割 最多9999份
fh = RotatingFileHandler(log_path, maxBytes=8*1024*1024, backupCount=9999)
fh.setLevel(logging.DEBUG)
fh.setFormatter(fmt)
# 添加日志处理器
logger.addHandler(sh)
logger.addHandler(fh)
class UtilsLogOld:
def __init__(self):
self.logger = logger
self.path = path
self.logger_name = logger_name
def info(self, message):
return self.logger.info(message)
def debug(self, message):
return self.logger.debug(message)
def warning(self, message):
return self.logger.warning(message)
def error(self, message):
return self.logger.error(message)
# -*- coding: utf-8 -*-
# PyTest测试框架类
# 作者: 陈磊
# 时间: 2019-10-29
import pytest
import sys
import os
from datetime import datetime
from WorkBase import WorkBase
from WorkUtils.UtilsLog import UtilsLog
class UtilsPyTest:
def __init__(self, case, case_info=None):
"""
:param case: 用例
:param case_info: 信息
"""
self.log = UtilsLog()
self.log.info("调用PyTest测试框架类")
self.log.info(self.__class__)
self.case_name = ""
self.create_time = ""
self.case = case
self.version = self.case["version"]
self.case_info = case_info
self.xml_path = ""
self.report_path = ""
def mkdir_case(self):
"""
创建用例文件的报告文件夹
:return:
"""
self.log.debug("创建测试报告XML文件路径......")
# 解析用例路径信息
self.create_time = datetime.today().strftime("%Y%m%d%H%M%S")
# 判断是多用例还是单用例模式
if self.case_info is not None:
# 得到测试文件名称
self.log.debug("解析文件路径:%s" % self.case_info[0])
self.log.debug("解析文件名称:%s" % self.case_info[1])
self.case_name = self.case_info[1]
self.xml_path = WorkBase().report_path + self.version + "\\" + str(self.case_name)[0:-3] + "\\" + self.create_time + "\\"
else:
self.log.debug("多个用例文件运行模式")
self.case_name = self.case["path"]
self.log.debug("测试路径:%s" % self.case_name)
self.xml_path = WorkBase().report_path + self.version + "\\" + "Zmpos" + "\\" + self.create_time + "\\"
# 判断是否创建报告路径
if os.path.exists(self.xml_path):
pass
else:
os.makedirs(self.xml_path)
self.log.debug("创建测试报告HTML文件路径......")
self.report_path = self.xml_path + "allure-report"
def run_case(self):
"""
运行测试用例
:return:
"""
self.log.debug("开始执行自动化测试用例......")
self.log.debug("打开测试用例路径......")
# case_path = self.base_path + self.path
# sys.path.append(case_path)
self.log.info("运行测试文件......")
pytest.main(["-q", "-s", "--alluredir=" + self.xml_path, self.case_name])
# 多进程
# pytest.main(["-q", "-s", "-n 5", "--alluredir=" + self.xml_path, self.case_name])
# 重试运行
# pytest.main(["-q", "-s", "--reruns 5", "--alluredir=" + self.xml_path, self.case_name])
self.log.info("运行结束......")
def run_xml(self):
"""
生成测试报告
:return:
"""
self.log.info("生成测试报告......")
cmd = "allure generate " + self.xml_path + " -o " + self.report_path + " --clean"
# self.log.debug(cmd)
os.system(cmd)
def run_main(self):
"""
加入校验的运行方法
:return:
"""
if self.version is None:
self.log.error("运行参数解析失败,请重新输入参数!")
self.log.error("可以输入 -h,获取帮助信息!")
else:
self.mkdir_case()
self.run_case()
self.run_xml()
# -*- coding: utf-8 -*-
# 随机数生成类
# 作者: 陈磊
# 时间: 2019-10-21
import random
import string
class UtilsRandom(object):
def __init__(self):
# [chr(i) for i in range(65,91)] 大写字母
# [chr(i) for i in range(97, 123)] 小写字母
# [str(i) for i in range(1, 10)] 1-10数字
self.library = [chr(i) for i in range(97, 123)] + [str(i) for i in range(1, 10)]
def get_random(self, size):
"""
:param size: 字符串长度
:return: 返回随机字符串
"""
num = random.sample(self.library, size)
num_str = ""
value = num_str.join(num)
return value
# -*- coding: utf-8 -*-
# Redis通用类
# 作者: 陈磊
# 时间: 2019-10-21
import redis
from WorkUtils.UtilsLog import UtilsLog
class UtilsRedis(object):
def __init__(self, host):
self.log = UtilsLog()
self.log.info("调用基础Redis通用类")
self.log.info(self.__class__)
self.host = host
def conn_redis(self, ):
self.log.debug("连接Redis")
pool = redis.ConnectionPool(host=self.host, port="6379")
r = redis.Redis(connection_pool=pool)
return r
if __name__ == '__main__':
rt = UtilsRedis(host="192.168.1.180",).conn_redis()
print(rt.get("checkCode_15151969022"))
\ No newline at end of file
# -*- coding: utf-8 -*-
# 报告生成报告类
# 作者: 陈磊
# 时间: 2019-10-21
import HTMLTestRunner
import unittest
import time
import sys
import os
from datetime import datetime
from WorkBase import WorkBase
from WorkUtils.UtilsLogOld import UtilsLogOld
class UtilsReport:
def __init__(self, case_info, description, version):
"""
:param case_info: 用例信息
:param description: 用例描述
:param version: 版本号
"""
self.log = UtilsLogOld()
self.log_path = self.log.path
self.log_name = self.log.logger_name
# 注释掉通过拼接方式生成路径
# self.case_path = WorkBase().case_path + case_path
self.case_info = case_info
self.case_path = case_info[0]
self.case_name = case_info[-1][0:-3]
self.report_path = ""
self.report_name = ""
self.description = description
self.create_time = datetime.today().strftime("%Y%m%d%H%M%S")
self.result = []
self.version = version
def run(self):
"""
运行方法
:return:
"""
self.log.debug("开始执行自动化测试用例......")
test_case = unittest.TestSuite()
self.log.debug("加载测试用例......")
sys.path.append(self.case_path)
case = __import__(self.case_name)
test_suite = [case.case_suite()]
for i in range(len(test_suite)):
test_case.addTests(test_suite[i])
self.log.debug("加载完成")
# 生成测试报告文件
# 更新获取的版本号生成文件
self.report_path = WorkBase().report_path + self.version + "\\" + self.case_name + "\\"
if os.path.exists(self.report_path):
pass
else:
os.makedirs(self.report_path)
self.report_name = self.description[0:6] + "_" + self.create_time + ".html"
report = self.report_path + self.report_name
# 打开文件
fp = open(report, "wb")
# 执行测试报告
runner = HTMLTestRunner.HTMLTestRunner(
stream=fp,
title="嫲嫲团接口自动化测试报告",
description="当前版本号:"+str(self.version)+",测试模块:"+self.description,
verbosity=2
)
self.log.debug("自动化任务执行开始")
result = runner.run(test_case)
self.log.debug("保存文件...")
self.log.debug("测试报告文件路径:")
self.log.debug(self.report_path)
self.log.debug("测试报告文件名称:")
self.log.debug(self.report_name)
self.log.debug("日志文件路径:")
self.log.debug(self.log_path)
self.log.debug("日志文件名称:")
self.log.debug(self.log_name)
# 保存文件
self.log.debug("自动化任务执行结束")
self.result = result.result
fp.close()
self.print_result()
def print_result(self):
"""
输出测试结果方法
:return:
"""
time.sleep(2)
success = 0
failures = 0
failures_list = []
errors = 0
errors_list = []
total = len(self.result)
for x, y in enumerate(self.result):
if y[0] == 0:
success += 1
elif y[0] == 1:
failures += 1
failures_list.append(y[1])
elif y[0] == 2:
errors += 1
errors_list.append(y[1])
print("----------------------------------------------------------------")
# print("当前版本号:", self.version)
print("自动化任务结果预览:")
print("执行用例个数:", total)
print("成功:", success)
print("失败:", failures)
print("失败用例:", failures_list)
print("错误:", errors)
print("错误用例:", errors_list)
print("----------------------------------------------------------------")
def cmd_run(self):
"""
CMD下运行方法
:return:
"""
if self.description == "" or self.version == "":
self.log.error("运行参数解析失败,请重新输入参数!")
self.log.error("可以输入 -h,获取帮助信息!")
else:
self.run()
# -*- coding: utf-8 -*-
# 基础接口请求类
# 作者: 陈磊
# 时间: 2019-10-21
import requests
import urllib3
from WorkUtils.UtilsLog import UtilsLog
from requests.exceptions import ConnectTimeout, ReadTimeout, ConnectionError, RequestException
class UtilsRequest:
def __init__(self):
# 禁用验证
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
self.log = UtilsLog()
self.log.info("调用基础请求封装类")
self.log.info(self.__class__)
self.session = requests.Session()
self.api = requests
def post(self, url, headers=None, params=None, json=None, data=None):
"""
Post请求方法
:param url: 请求地址
:param headers: 请求头
:param params: 请求头参数
:param json: 请求JSON参数
:param data: 请求DATA参数
:return:
"""
self.log.info("调用Post请求方法")
self.log.debug("接口请求url:")
self.log.debug(url)
self.log.debug("接口请求headers:")
if headers is None:
headers = {}
self.log.debug(str(headers).replace('\'', '\"'))
self.log.debug("接口请求params:")
if params is None:
params = {}
self.log.debug(str(params).replace('\'', '\"'))
self.log.debug("接口请求json:")
if json is None:
json = {}
self.log.debug(str(json).replace('\'', '\"'))
self.log.debug("接口请求data:")
if data is None:
data = {}
self.log.debug(str(data).replace('\'', '\"'))
try:
response = self.api.post(url=url, headers=headers, params=params, json=json, data=data, timeout=60, verify=False)
code = response.status_code
text = response.text
if code != 200:
self.log.error("接口请求返回状态码错误:" + str(code))
self.log.info(text)
return text
else:
self.log.info("接口请求返回正常:")
self.log.info(text)
return text
except ConnectTimeout:
self.log.error("接口连接超时!")
return False
except ReadTimeout:
self.log.error("接口请求超时!")
return False
except ConnectionError:
self.log.error("接口请求错误!")
return False
except RequestException:
self.log.error("接口请求返回错误!")
return False
def get(self, url, headers=None, params=None):
"""
Get请求方法
:param url: 请求地址
:param headers: 请求头
:param params: 请求头参数
:return:
"""
self.log.info("调用Get请求方法")
self.log.debug("接口请求url:")
self.log.debug(url)
self.log.debug("接口请求headers:")
if headers is None:
headers = {}
self.log.debug(str(headers).replace('\'', '\"'))
self.log.debug("接口请求params:")
if params is None:
params = {}
self.log.debug(str(params).replace('\'', '\"'))
self.log.debug("接口请求json:")
try:
response = self.api.get(url=url, headers=headers, params=params, timeout=60, verify=False)
code = response.status_code
text = response.text
if code != 200:
self.log.error("接口请求返回状态码错误:" + str(code))
self.log.info(text)
return text
else:
self.log.info("接口请求返回正常:")
self.log.info(text)
return text
except ConnectTimeout:
self.log.error("接口连接超时!")
return False
except ReadTimeout:
self.log.error("接口请求超时!")
return False
except ConnectionError:
self.log.error("接口请求错误!")
return False
except RequestException:
self.log.error("接口请求返回错误!")
return False
# -*- coding: utf-8 -*-
# 基础接口返回信息解析类
# 作者: 陈磊
# 时间: 2019-10-21
import json
from WorkUtils.UtilsLog import UtilsLog
class UtilsResponse:
def __init__(self):
self.log = UtilsLog()
self.log.info("调用基础接口返回信息解析类")
self.log.info(self.__class__)
def get_code(self, response):
"""
:param response: 返回信息
:return: 返回value
"""
self.log.debug(u"解析返回信息中code:")
try:
value = json.loads(response)["code"]
self.log.debug(value)
return value
except KeyError as error:
self.log.error(u"json解析出错缺少Key:")
self.log.error(error)
return error
def get_status(self, response):
"""
:param response: 返回信息
:return: 返回value
"""
self.log.debug(u"解析返回信息中status:")
try:
value = json.loads(response)["status"]
self.log.debug(value)
return value
except KeyError as error:
self.log.error(u"json解析出错缺少Key:")
self.log.error(error)
return error
def get_msg(self, response):
"""
:param response: 返回信息
:return: 返回value
"""
self.log.debug(u"解析返回信息中msg:")
try:
value = json.loads(response)["msg"]
self.log.debug(value)
return value
except KeyError as error:
self.log.error(u"json解析出错缺少Key:")
self.log.error(error)
return error
def get_result(self, response):
"""
:param response: 返回信息
:return: 返回value
"""
self.log.debug(u"解析返回信息中result:")
try:
value = json.loads(response)["result"]
self.log.debug(value)
return value
except KeyError as error:
self.log.error(u"json解析出错缺少Key:")
self.log.error(error)
return error
def get_values_from_key(self, _dict, key):
"""
根据key得到所有value
:return:
"""
values = []
self.log.debug("查询")
self.log.debug("key=" + str(key))
for x, y in enumerate(_dict):
values.append(y[key])
self.log.debug("values=" + str(values))
return values
def get_values_from_sql(self, _sql, key):
"""
根据key得到所有value
:return:
"""
values = []
self.log.debug("查询")
self.log.debug("key=" + str(key))
for x, y in enumerate(_sql):
# self.log.debug(y.__dict__)
values.append(y.__dict__.get(key))
self.log.debug("values=" + str(values))
return values
def check_sort(self, _list, key, possible):
self.log.debug("校验排序")
if possible == "ASC":
start = 0
for x, y in enumerate(_list):
assert y[key] > start
start = y[key]
elif possible == "DESC":
start = float("inf")
for x, y in enumerate(_list):
assert y[key] < start
start = y[key]
else:
start = 0
for x, y in enumerate(_list):
assert y[key] > start
start = y[key]
def get_data(self, response):
"""
:param response: 返回信息
:return: 返回value
"""
self.log.debug(u"解析返回信息中data:")
try:
value = json.loads(response)["data"]
self.log.debug(value)
return value
except KeyError as error:
self.log.error(u"json解析出错缺少Key:")
self.log.error(error)
return error
# -*- coding: utf-8 -*-
# 多线程类
# 作者: 陈磊
# 时间: 2019-10-21
import threading
import requests
import time
import json
from WorkUtils.UtilsLog import UtilsLog
class UtilsThread:
def __init__(self, url, headers, params, method, thread_num, every_num):
self.log = UtilsLog()
self.log.info("调用多线程类")
self.log.info(self.__class__)
self.url = url
self.headers = headers
self.params = params
self.method = method
self.thread_num = thread_num
self.every_num = every_num
self.response = ""
self.responses = []
def thread_api(self, headers):
try:
if self.method == "Post":
self.response = requests.post(url=self.url, headers=headers, params=self.params)
return self.response
if self.method == "Get":
self.response = requests.get(url=self.url, headers=headers, params=self.params)
return self.response
else:
return False
except requests.ConnectionError:
return False
def thread_error(self, headers):
try:
code = self.thread_api(headers).status_code
if code == 200:
text = self.thread_api(headers).text
self.log.info("状态码:" + str(code))
self.log.info(text)
self.responses.append(text)
return True
else:
self.log.info("状态码:" + str(code))
return False
except AttributeError:
self.log.info("请求异常")
return False
def thread_work(self, headers):
self.log.info("请求地址:" + self.url)
thread_name = threading.currentThread().getName()
self.log.info(time.asctime(time.localtime(time.time())))
self.log.info(thread_name + "线程开始")
self.thread_api(headers)
self.thread_error(headers)
self.log.info(thread_name + "线程结束...")
self.log.info(time.asctime(time.localtime(time.time())))
def thread_run(self):
threads = []
for i in range(self.thread_num):
self.log.info(self.headers[i])
t = threading.Thread(target=self.thread_work(self.headers[i]))
t.setDaemon(True)
threads.append(t)
for t in threads:
t.start()
# 启动所有线程
for t in threads:
t.join()
if __name__ == '__main__':
# h = [
# {'userToken': '74b6873e1240f1d2d16edd692b8cf7e4', 'platForm': 'Test'},
# {'userToken': 'cc18e1c142d05f5d2df543ec63c2fbaf', 'platForm': 'Test'}
# ]
# a = UtilsThread(url="http://shop.uat.cnmmt.com/rapi/coupon/add", headers=h,
# params={'coupon_id': '179'}, method="Post", thread_num=2, every_num=1)
# a.thread_run()
import datetime
a = time.strftime("%Y-%m-%d 00:00:00")
print()
# -*- coding: utf-8 -*-
# 配置文件yaml解析类
# 作者: 陈磊
# 时间: 2019-10-21
import yaml
import os
from WorkUtils.UtilsLog import UtilsLog
from WorkBase import WorkBase
class UtilsYaml:
def __init__(self, filename):
self.log = UtilsLog()
self.log.info("调用配置文件yaml解析类")
self.log.info(self.__class__)
self.filename = filename
def read(self):
self.log.debug("打开yaml文件")
yaml_file = os.path.join(WorkBase().yaml_path + self.filename)
self.log.debug(yaml_file)
with open(yaml_file, "rb") as f:
info = f.read()
r_info = yaml.load(info)
self.log.debug("成功解析yaml文件...")
self.log.debug(r_info)
return r_info
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment