import json import re import random import time from datetime import datetime from http.client import RemoteDisconnected import callback def camel_to_snake(name: str) -> str: # 匹配大写字母,并在前面加上下划线,然后转换为小写 return re.sub(r'(? str: # 拆分字符串为字段列表,去除空格 fields = [field.strip() for field in columns.split(",")] # 转换每个字段为下划线格式 snake_case_fields = [camel_to_snake(field) for field in fields] # 重新组合回逗号分隔的字符串 return ", ".join(snake_case_fields) def random_float(): # 生成一个随机的浮点数并四舍五入到14位小数 return round(random.random(), 14) def data_time(): # 输出当前时间,精确到毫秒 return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] # `[:-3]` 用于截取前3位保留到毫秒 def get_conn_result(conn, file_name=""): max_retries = 3 for attempt in range(max_retries): try: result = conn.getresponse() return result except RemoteDisconnected: if attempt < max_retries - 1: print(f"RemoteDisconnected,{file_name} - {data_time()} 重试 {attempt + 1}/{max_retries}...") time.sleep(10) # 等待 10 秒后重试 else: print(f"{file_name} - {data_time()}请求失败,达到最大重试次数") raise def get_page_result(http_url, this_page, this_size, payload, headers, conn, savetodb: callback, file_name=""): this_http_url = f"{http_url}?rand={random_float()}&page={this_page}&size={this_size}" print(f"请求url={this_http_url}") conn.request("POST", this_http_url, payload, headers) data = get_conn_result(conn, file_name).read() json_str = data.decode("utf-8") try: json_obj = json.loads(json_str) print("获取到总条数为:", json_obj.get("totalPages")) data_to_insert = json_obj.get("content") # if data_to_insert is not None: # # 先睡一秒,确保峰值不会太高 # sleep(1) print(f"\n开始插入数据==>{this_page}*{this_size}\n") savetodb(data_to_insert) if json_obj.get("totalPages") > 1: try: for i in range(1, json_obj.get("totalPages")): this_page = i print(f"请求url={http_url}?rand={random_float()}&page={this_page}&size={this_size}") conn.request("POST", f"{http_url}?rand={random_float()}&page={this_page}&size={this_size}", payload, headers) data = get_conn_result(conn, file_name).read() json_str = data.decode("utf-8") print("JSON String", json_str) json_obj_arr = json.loads(json_str) print(f"获取到总页数为:{json_obj_arr.get("totalPages")}页") data_to_insert = json_obj_arr.get("content") # if data_to_insert is not None: # # 先睡一秒,确保峰值不会太高 # time.sleep(1) print(f"\n开始插入数据==>{i}页 * {this_size}\n") savetodb(data_to_insert) except json.JSONDecodeError as e: print(f"循环 Error decoding JSON: {e}") except json.JSONDecodeError as e: print(f"请求 Error decoding JSON: {e}") # 创建一个方法写入日志到根目录下runtime/log/,错误的是 error.log,正确的是 success.log def write_log(log_type, log_content): with open(f"runtime/log/{log_type}.log", "a", encoding="utf-8") as f: f.write(log_content + "\n") f.close()