Utils.py

import json
import os
import re
import random
import time
from datetime import datetime
from http.client import RemoteDisconnected
from typing import Callable


def camel_to_snake(name: str) -> str:
    # Insert an underscore before each uppercase letter (except at the
    # start of the string), then lowercase the whole string.
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()
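    # e.g. camel_to_snake("userName") -> "user_name"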


def convert_columns_to_snake_case(columns: str) -> str:
    # Split the comma-separated string into a list of fields, stripping whitespace.
    fields = [field.strip() for field in columns.split(",")]
    # Convert each field to snake_case.
    snake_case_fields = [camel_to_snake(field) for field in fields]
    # Rejoin into a comma-separated string.
    return ", ".join(snake_case_fields)
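    # e.g. convert_columns_to_snake_case("userId, userName") -> "user_id, user_name"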


def random_float():
    # Generate a random float rounded to 14 decimal places.
    return round(random.random(), 14)


def data_time():
    # Return the current time as a string, precise to milliseconds.
    # `[:-3]` drops the last three of the six microsecond digits.
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
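    # e.g. data_time() -> "2025-01-01 12:00:00.123" (illustrative value)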


def get_conn_result(conn, file_name=""):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return conn.getresponse()
        except RemoteDisconnected:
            if attempt < max_retries - 1:
                print(f"RemoteDisconnected, {file_name} - {data_time()} retry {attempt + 1}/{max_retries}...")
                time.sleep(10)  # Wait 10 seconds before retrying.
            else:
                print(f"{file_name} - {data_time()} request failed, maximum retries reached")
                raise
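    # Note: this retries with a fixed 10-second delay; an exponential backoff
    # (e.g. time.sleep(2 ** attempt)) is a common alternative, not what this
    # module does.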


def get_page_result(http_url, this_page, this_size, payload, headers, conn, savetodb: Callable, file_name=""):
    this_http_url = f"{http_url}?rand={random_float()}&page={this_page}&size={this_size}"
    print(f"Requesting url={this_http_url}")
    conn.request("POST",
                 this_http_url,
                 payload, headers)
    data = get_conn_result(conn, file_name).read()
    json_str = data.decode("utf-8")
    try:
        json_obj = json.loads(json_str)
        print("Total pages:", json_obj.get("totalPages"))
        data_to_insert = json_obj.get("content")
        # if data_to_insert is not None:
        #     # Sleep one second first to keep the load peak down.
        #     time.sleep(1)
        print(f"\nInserting data ==> {this_page} * {this_size}\n")
        savetodb(data_to_insert)
        if json_obj.get("totalPages") > 1:
            try:
                for i in range(1, json_obj.get("totalPages")):
                    this_page = i
                    print(f"Requesting url={http_url}?rand={random_float()}&page={this_page}&size={this_size}")
                    conn.request("POST",
                                 f"{http_url}?rand={random_float()}&page={this_page}&size={this_size}",
                                 payload, headers)
                    data = get_conn_result(conn, file_name).read()
                    json_str = data.decode("utf-8")
                    print("JSON String", json_str)
                    json_obj_arr = json.loads(json_str)
                    print(f"Total pages: {json_obj_arr.get('totalPages')}")
                    data_to_insert = json_obj_arr.get("content")
                    # if data_to_insert is not None:
                    #     # Sleep one second first to keep the load peak down.
                    #     time.sleep(1)
                    print(f"\nInserting data ==> page {i} * {this_size}\n")
                    savetodb(data_to_insert)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON in page loop: {e}")
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON in request: {e}")


# Write a log file under runtime/log/ in the project root: errors go to
# error.log, successes to success.log.
def write_log(log_type, log_content):
    # Ensure the log directory exists before appending.
    os.makedirs("runtime/log", exist_ok=True)
    with open(f"runtime/log/{log_type}.log", "a", encoding="utf-8") as f:
        f.write(log_content + "\n")
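

# Minimal usage sketch (all names below, including the host, path, payload,
# and the save_to_db callback, are hypothetical and not part of this module):
if __name__ == "__main__":
    from http.client import HTTPSConnection

    def save_to_db(rows):
        # Placeholder sink; a real callback would insert the rows into a database.
        count = 0 if rows is None else len(rows)
        print(f"would insert {count} rows")
        write_log("success", f"{data_time()} inserted {count} rows")

    conn = HTTPSConnection("api.example.com")  # hypothetical host
    headers = {"Content-Type": "application/json"}
    payload = json.dumps({})  # hypothetical request body
    get_page_result("/data/list", 0, 100, payload, headers, conn, save_to_db, "demo")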