Utils.py

import json
import re
import random
import time
from datetime import datetime
from http.client import RemoteDisconnected
from typing import Callable

def camel_to_snake(name: str) -> str:
    # Insert an underscore before every uppercase letter (except at the start), then lowercase the whole string
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def convert_columns_to_snake_case(columns: str) -> str:
    # Split the comma-separated column string into fields, stripping whitespace
    fields = [field.strip() for field in columns.split(",")]
    # Convert each field to snake_case
    snake_case_fields = [camel_to_snake(field) for field in fields]
    # Re-join into a comma-separated string
    return ", ".join(snake_case_fields)
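
# Example (illustrative): convert_columns_to_snake_case("userId, createdAt") returns "user_id, created_at"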

def random_float():
    # Generate a random float rounded to 14 decimal places
    return round(random.random(), 14)

def data_time():
    # Return the current time as a string with millisecond precision
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]  # `[:-3]` trims the microseconds down to milliseconds
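
# Example output (illustrative): data_time() -> "2025-01-01 12:34:56.789"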

def get_conn_result(conn, file_name=""):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            result = conn.getresponse()
            return result
        except RemoteDisconnected:
            if attempt < max_retries - 1:
                print(f"RemoteDisconnected, {file_name} - {data_time()} retrying {attempt + 1}/{max_retries}...")
                time.sleep(10)  # wait 10 seconds before retrying
            else:
                print(f"{file_name} - {data_time()} request failed: maximum number of retries reached")
                raise

def get_page_result(http_url, this_page, this_size, payload, headers, conn, savetodb: Callable, file_name=""):
    print(f"Requesting url={http_url}?rand={random_float()}&page={this_page}&size={this_size}")
    conn.request("POST",
                 f"{http_url}?rand={random_float()}&page={this_page}&size={this_size}",
                 payload, headers)
    data = get_conn_result(conn, file_name).read()
    json_str = data.decode("utf-8")
    try:
        json_obj = json.loads(json_str)
        print("Total pages received:", json_obj.get("totalPages"))
        data_to_insert = json_obj.get("content")
        # if data_to_insert is not None:
        #     # Sleep for one second first so the load peak does not get too high
        #     time.sleep(1)
        print(f"\nStarting data insert ==> page {this_page} * {this_size}\n")
        savetodb(data_to_insert)
        if json_obj.get("totalPages") > 1:
            try:
                for i in range(1, json_obj.get("totalPages")):
                    this_page = i
                    print(f"Requesting url={http_url}?rand={random_float()}&page={this_page}&size={this_size}")
                    conn.request("POST",
                                 f"{http_url}?rand={random_float()}&page={this_page}&size={this_size}",
                                 payload, headers)
                    data = get_conn_result(conn, file_name).read()
                    json_str = data.decode("utf-8")
                    print("JSON String", json_str)
                    json_obj_arr = json.loads(json_str)
                    print(f"Total pages received: {json_obj_arr.get('totalPages')}")
                    data_to_insert = json_obj_arr.get("content")
                    # if data_to_insert is not None:
                    #     # Sleep for one second first so the load peak does not get too high
                    #     time.sleep(1)
                    print(f"\nStarting data insert ==> page {i} * {this_size}\n")
                    savetodb(data_to_insert)
            except json.JSONDecodeError as e:
                print(f"Pagination loop: error decoding JSON: {e}")
    except json.JSONDecodeError as e:
        print(f"Initial request: error decoding JSON: {e}")
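
# --- Usage sketch (illustrative only) ---
# Shows how get_page_result could be wired up end to end. The host, endpoint,
# headers, payload, and the save_to_db callback below are assumptions, not part
# of the original module; adapt them to the real API and storage layer.
if __name__ == "__main__":
    import http.client

    def save_to_db(rows):
        # Placeholder callback: a real implementation would insert `rows` into a database
        print(f"would insert {len(rows) if rows else 0} rows")

    conn = http.client.HTTPSConnection("api.example.com")  # hypothetical host
    headers = {"Content-Type": "application/json"}          # hypothetical headers
    payload = json.dumps({})                                # hypothetical request body
    # Fetch page 0 with a page size of 100 and hand each page's content to save_to_db
    get_page_result("/data/list", 0, 100, payload, headers, conn, save_to_db, file_name="utils_demo")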