| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- import json
- import re
- import random
- import time
- from datetime import datetime
- from http.client import RemoteDisconnected
- import callback
def camel_to_snake(name: str) -> str:
    """Convert a camelCase / PascalCase identifier to snake_case.

    An underscore is inserted before every ASCII capital letter (A-Z) that is
    not the first character, and the whole result is then lower-cased:
    ``"userId"`` -> ``"user_id"``, ``"UserName"`` -> ``"user_name"``.
    Runs of capitals are split letter by letter (``"ABC"`` -> ``"a_b_c"``).
    """
    pieces = []
    for index, char in enumerate(name):
        # Only ASCII capitals trigger a separator, and never at position 0.
        if index > 0 and "A" <= char <= "Z":
            pieces.append("_")
        pieces.append(char)
    return "".join(pieces).lower()
def convert_columns_to_snake_case(columns: str) -> str:
    """Convert a comma-separated list of camelCase column names to snake_case.

    Each name is stripped of surrounding whitespace and converted with
    ``camel_to_snake``. The names are then re-joined with ``", "``, e.g.
    ``"userId,userName"`` -> ``"user_id, user_name"``. Empty entries, such as
    those produced by ``""`` or a trailing comma, are kept as empty strings.
    """
    return ", ".join(camel_to_snake(column.strip()) for column in columns.split(","))
def random_float():
    """Return a pseudo-random float in [0, 1], rounded to 14 decimal places.

    The upper bound is inclusive only because rounding can turn a value just
    below 1 into 1.0.
    """
    value = random.random()
    return round(value, ndigits=14)
def data_time():
    """Return the current local time as ``"YYYY-MM-DD HH:MM:SS.mmm"``.

    The fractional part is truncated (not rounded) to milliseconds.
    """
    return datetime.now().isoformat(sep=" ", timespec="milliseconds")
def get_conn_result(conn, file_name="", *, resend=None, max_retries=3, retry_delay=10):
    """Return the response to the request already sent on *conn*.

    Retries are possible only when the request can be re-sent. If *conn* is an
    ``http.client.HTTPConnection``, a ``RemoteDisconnected`` raised by
    ``getresponse()`` makes the connection close itself and forget the pending
    request. Calling ``getresponse()`` again would then just raise
    ``ResponseNotReady``. Pass *resend* (a zero-argument callable that repeats
    the same ``conn.request(...)``) to enable retrying.

    Args:
        conn: connection on which a request has already been sent. It must
            provide ``getresponse()``.
        file_name: label included in the log lines.
        resend: optional callable that re-issues the request on *conn*.
            Without it, a ``RemoteDisconnected`` is re-raised immediately.
        max_retries: total number of attempts when *resend* is given
            (at least one attempt is always made).
        retry_delay: seconds to sleep before re-sending.

    Returns:
        Whatever ``conn.getresponse()`` returns.

    Raises:
        RemoteDisconnected: when the server drops the connection and no retry
            is possible, or every attempt has been used up.
        Any exception raised by *resend* itself propagates unchanged.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            return conn.getresponse()
        except RemoteDisconnected:
            if resend is None:
                # The pending request is gone, so another getresponse() cannot
                # succeed; fail fast with the real error instead of sleeping.
                print(f"{file_name} - {data_time()}请求失败,远端断开连接且无法重发请求")
                raise
            if attempt >= attempts - 1:
                print(f"{file_name} - {data_time()}请求失败,达到最大重试次数")
                raise
            print(f"RemoteDisconnected,{file_name} - {data_time()} 重试 {attempt + 1}/{attempts}...")
            time.sleep(retry_delay)
            # Re-issue the request so the next getresponse() has something to read.
            resend()
def _post_page_request(http_url, page, size, payload, headers, conn, file_name=""):
    """POST a request for one page on *conn* and return the body as UTF-8 text.

    The URL is built once, so the logged URL is exactly the one requested,
    including its random ``rand`` value.
    """
    page_url = f"{http_url}?rand={random_float()}&page={page}&size={size}"
    print(f"请求url={page_url}")
    conn.request("POST", page_url, payload, headers)
    return get_conn_result(conn, file_name).read().decode("utf-8")


def get_page_result(http_url, this_page, this_size, payload, headers, conn, savetodb: callback, file_name=""):
    """Fetch every page of a paginated POST endpoint and pass each page's data to *savetodb*.

    First requests ``<http_url>?rand=...&page=<this_page>&size=<this_size>``.
    If that response's ``totalPages`` is an integer greater than 1, it then
    requests pages ``1 .. totalPages - 1`` in turn on the same connection.

    Args:
        http_url: URL/path passed to ``conn.request``; a query string is appended.
        this_page: page index for the first request.
        this_size: page size sent with every request.
        payload: request body, passed through to ``conn.request``.
        headers: request headers, passed through to ``conn.request``.
        conn: connection object providing ``request()`` / ``getresponse()``.
        savetodb: called once per page with that page's ``content`` value
            (``None`` if the key is absent).
        file_name: label used in the connection retry/failure log lines.

    Returns:
        None. A response that is not valid JSON is logged and stops further
        processing. Network errors and exceptions from *savetodb* propagate.
    """
    json_str = _post_page_request(http_url, this_page, this_size, payload, headers, conn, file_name)
    try:
        json_obj = json.loads(json_str)
    except json.JSONDecodeError as e:
        print(f"请求 Error decoding JSON: {e}")
        return

    # NOTE(review): assumes the body is a JSON object; a list/scalar body
    # would fail on .get() below.
    total_pages = json_obj.get("totalPages")
    print("获取到总页数为:", total_pages)
    print(f"\n开始插入数据==>{this_page}*{this_size}\n")
    savetodb(json_obj.get("content"))

    # A missing or non-integer totalPages is treated as "no further pages"
    # (comparing None > 1 would raise TypeError).
    if not isinstance(total_pages, int) or total_pages <= 1:
        return

    # NOTE(review): follow-up pages are always 1 .. totalPages-1 regardless of
    # this_page. That lines up with 0-based paging only when callers pass
    # this_page=0 — confirm against callers.
    for page in range(1, total_pages):
        json_str = _post_page_request(http_url, page, this_size, payload, headers, conn, file_name)
        print("JSON String", json_str)
        try:
            page_obj = json.loads(json_str)
        except json.JSONDecodeError as e:
            # A page that is not valid JSON stops the remaining pages (logged, not raised).
            print(f"循环 Error decoding JSON: {e}")
            return
        print(f"获取到总页数为:{page_obj.get('totalPages')}页")
        print(f"\n开始插入数据==>{page}页 * {this_size}\n")
        savetodb(page_obj.get("content"))
def write_log(log_type, log_content, *, log_dir="runtime/log"):
    """Append one line of text to ``<log_dir>/<log_type>.log``.

    The intended log types are ``"error"`` (error.log) for failures and
    ``"success"`` (success.log) for successful operations. The default
    *log_dir* of ``runtime/log`` is resolved against the current working
    directory, which is not necessarily the project root. The directory is
    created if it does not exist yet.

    Args:
        log_type: base name of the log file; ``.log`` is appended.
        log_content: text to append; a trailing newline is added.
        log_dir: directory that holds the log files.
    """
    import os

    # open(..., "a") does not create missing parent directories.
    os.makedirs(log_dir, exist_ok=True)
    with open(os.path.join(log_dir, f"{log_type}.log"), "a", encoding="utf-8") as f:
        f.write(log_content + "\n")
|