amac_private_fund_manager_api.py (3.7 KB)

  1. import json
  2. import time
  3. import Utils
  4. from time import sleep
  5. from http.client import RemoteDisconnected
  6. from Config import mysql_pool, conn, headers, page, size
  7. print("开始时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)
  8. # 因为这里数据一条人员信息amac_member_user中包含了对应的多个证书,所以需要先循环把证书拿出来
  9. def savetodb(data):
  10. # 判断是否为空,为空则跳过直接返回
  11. if data is None: return
  12. person_record = []
  13. for item in data:
  14. # 构建人员信息记录
  15. this_data = {
  16. 'id': item.get("id"),
  17. 'manager_id': "",
  18. 'fund_no': item.get("fundNo", ""),
  19. 'fund_name': item.get("fundName"),
  20. 'manager_name': item.get("managerName"),
  21. 'manager_type': item.get("managerType"),
  22. 'working_state': item.get("workingState"),
  23. 'put_on_record_date': item.get("putOnRecordDate"),
  24. 'last_quarter_update': item.get("lastQuarterUpdate"),
  25. 'is_depute_manage': item.get("isDeputeManage"),
  26. 'url': item.get("url"),
  27. 'establish_date': item.get("establishDate"),
  28. 'manager_url': item.get("managerUrl"),
  29. 'mandator_name': item.get("mandatorName")
  30. }
  31. managers_info = item.get("managersInfo", [])
  32. # 找到到对应的managerId
  33. if managers_info:
  34. for info in managers_info:
  35. this_data['manager_id'] = info.get("managerId", "")
  36. break
  37. person_record.append(this_data)
  38. # 批量插入
  39. mysql_pool.insert('amac_private_fund_manager', person_record)
  40. this_page = page
  41. this_size = size
  42. payload = json.dumps({})
  43. http_url = "/amac-infodisc/api/pof/fund"
  44. print(f"请求url={http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}")
  45. conn.request("POST",
  46. f"{http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}",
  47. payload, headers)
  48. data = Utils.get_conn_result(conn, __file__).read()
  49. json_str = data.decode("utf-8")
  50. try:
  51. json_obj = json.loads(json_str)
  52. print("获取到总条数为:", json_obj.get("totalPages"))
  53. data_to_insert = json_obj.get("content")
  54. if data_to_insert is not None:
  55. # 先睡一秒,确保峰值不会太高
  56. sleep(1)
  57. print(f"\n开始插入数据==>{this_page}*{this_size}\n")
  58. savetodb(data_to_insert)
  59. if json_obj.get("totalPages") > 1:
  60. for i in range(1, json_obj.get("totalPages")):
  61. this_page = i
  62. print(f"请求url={http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}")
  63. conn.request("POST",
  64. f"{http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}",
  65. payload, headers)
  66. res = conn.getresponse()
  67. data = res.read()
  68. json_str = data.decode("utf-8")
  69. print("JSON String", json_str)
  70. try:
  71. json_obj_arr = json.loads(json_str)
  72. print("获取到总条数为:", json_obj_arr.get("totalPages"))
  73. data_to_insert = json_obj_arr.get("content")
  74. if data_to_insert is not None:
  75. # 先睡一秒,确保峰值不会太高
  76. time.sleep(1)
  77. print(f"\n开始插入数据==>{i}*{this_size}\n")
  78. savetodb(data_to_insert)
  79. except json.JSONDecodeError as e:
  80. print(f"循环 Error decoding JSON: {e}")
  81. except json.JSONDecodeError as e:
  82. print(f"请求 Error decoding JSON: {e}")
  83. print("结束时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)