|
|
@@ -0,0 +1,131 @@
|
|
|
+import json
|
|
|
+import random
|
|
|
+import time
|
|
|
+
|
|
|
+from Config import mysql_pool, conn, headers, page, size
|
|
|
+
|
|
|
# Log when the crawl starts: formatted local time plus the epoch time in
# milliseconds. strftime() without a time tuple formats the current local time.
print(
    "开始时间(精确到毫秒):",
    time.strftime("%Y-%m-%d %H:%M:%S"),
    time.time() * 1000,
)
|
|
|
+
|
|
|
+
|
|
|
# One person object returned by the API embeds that person's certificate
# history, so each item is split into one person row plus zero or more
# certificate-history rows before inserting.

# (column name, JSON key) pairs for the amac_member_user table.
# Order is kept so the generated row dicts have the same key order as before.
_PERSON_COLUMNS = (
    ('account_id', 'accountId'),
    ('user_id', 'userId'),
    ('user_name', 'userName'),
    ('sex', 'sex'),
    ('org_name', 'orgName'),
    ('own_org_name', 'ownOrgName'),
    ('cert_code', 'certCode'),
    ('cert_name', 'certName'),
    ('education', 'education'),
    ('education_name', 'educationName'),
    ('cert_obtain_date', 'certObtainDate'),
    ('cert_end_date', 'certEndDate'),
    ('cert_status_change_times', 'certStatusChangeTimes'),
    ('credit_record_num', 'creditRecordNum'),
    ('status', 'status'),
    ('status_name', 'statusName'),
    ('office_state', 'officeState'),
    ('removed', 'removed'),
    ('biz_id', 'bizId'),
    ('apply_id', 'applyId'),
    ('apply_status', 'applyStatus'),
    ('person_photo_base64', 'personPhotoBase64'),
)

# (column name, JSON key) pairs for the amac_member_user_person_cert table.
_CERT_HISTORY_COLUMNS = (
    ('id', 'id'),
    ('user_id', 'userId'),
    ('org_name', 'orgName'),
    ('cert_code', 'certCode'),
    ('cert_name', 'certName'),
    ('cert_obtain_date', 'certObtainDate'),
    ('cert_end_date', 'certEndDate'),
    ('status_name', 'statusName'),
    ('status', 'status'),
    ('creation_date', 'creationDate'),
    ('qlf_id', 'qlfId'),
    ('size', 'size'),
)


def _person_row(item):
    """Build the amac_member_user row dict for one API person object."""
    return {column: item.get(key) for column, key in _PERSON_COLUMNS}


def _cert_history_rows(item):
    """Build the certificate-history row dicts embedded in one person object.

    A missing or null ``personCertHistoryList`` yields no rows. When a history
    entry has no ``userId`` (key absent or null), the owning person's
    ``userId`` is used instead so the row can still be linked to the person.
    """
    owner_user_id = item.get("userId")
    rows = []
    for cert in item.get("personCertHistoryList") or []:
        row = {column: cert.get(key) for column, key in _CERT_HISTORY_COLUMNS}
        if row['user_id'] is None:
            row['user_id'] = owner_user_id
        rows.append(row)
    return rows


def savetodb(data):
    """Insert one page of person objects and their certificate history.

    Args:
        data: Iterable of dict-like person objects (the ``content`` list of an
            API response), or ``None``.

    Returns:
        None. ``None`` or empty ``data`` is a no-op.

    Person rows go to ``amac_member_user`` and certificate-history rows go to
    ``amac_member_user_person_cert``, each through one ``mysql_pool.insert``
    call with the full list of row dicts.
    """
    if not data:
        return

    person_record = []
    cert_history_record = []
    for item in data:
        person_record.append(_person_row(item))
        cert_history_record.extend(_cert_history_rows(item))

    # Batch insert. An empty batch is skipped instead of being handed to the
    # pool helper, which has nothing to insert in that case.
    if person_record:
        mysql_pool.insert('amac_member_user', person_record)
    if cert_history_record:
        mysql_pool.insert('amac_member_user_person_cert', cert_history_record)
|
|
|
+
|
|
|
+
|
|
|
# Read every row of amac_person_org_registration and crawl the person query
# API for each user_id found there.
amac_org_type = mysql_pool.fetchall("select * from amac_person_org_registration")
# Debug override to crawl a single user:
# amac_org_type = [{"user_id": "2309140927320704"}]

http_url = "/amac-infodisc/api/pof/person"


def _fetch_person_page(user_id, page_no, page_size):
    """POST one person query and return the decoded JSON response.

    Args:
        user_id: Value sent as the ``userId`` filter (converted with ``str``).
        page_no: Value of the ``page`` query-string parameter.
        page_size: Value of the ``size`` query-string parameter.

    Returns:
        Whatever ``json.loads`` produces for the response body.

    Raises:
        json.JSONDecodeError: If the response body is not valid JSON.
    """
    payload = json.dumps(
        {"userName": "", "certCode": "", "certName": "", "userId": str(user_id), "page": "1"})
    # A fresh random value per request; presumably a cache-busting parameter.
    random_float = round(random.random(), 14)
    request_target = f"{http_url}?rand={random_float}&page={page_no}&size={page_size}"
    print(f"请求url={request_target}")

    conn.request("POST", request_target, payload, headers)
    res = conn.getresponse()
    # The body must be read in full before the shared connection is reused.
    json_str = res.read().decode("utf-8")
    return json.loads(json_str)


for amac_org_type_item in amac_org_type:
    user_id = amac_org_type_item.get("user_id")
    this_page = page
    this_size = size
    print(f"开始处理人员查询:{user_id}")

    try:
        json_obj = _fetch_person_page(user_id, this_page, this_size)
    except json.JSONDecodeError as e:
        print(f"请求 Error decoding JSON: {e}")
        json_obj = None

    # Only a JSON object can carry "totalPages"/"content"; anything else
    # (decode failure, unexpected payload shape) means nothing to store.
    if isinstance(json_obj, dict):
        total_pages = json_obj.get("totalPages")
        print("获取到总条数为:", total_pages)

        data_to_insert = json_obj.get("content")
        if data_to_insert is not None:
            # Sleep one second first so the request peak does not get too high.
            time.sleep(1)

            print(f"\n开始插入数据==>{this_page}*{this_size}\n")
            savetodb(data_to_insert)

        # A response without a usable "totalPages" (e.g. an error payload)
        # must not abort the whole crawl, so treat it as "no further pages".
        if not isinstance(total_pages, int):
            total_pages = 0

        # NOTE(review): pages 1..totalPages-1 are fetched here, which assumes
        # the first request above was page 0 — confirm Config.page is 0.
        # NOTE(review): follow-up pages are not throttled like the first one.
        for i in range(1, total_pages):
            this_page = i
            try:
                json_obj_arr = _fetch_person_page(user_id, this_page, this_size)
            except json.JSONDecodeError as e:
                print(f"循环 Error decoding JSON: {e}")
                continue
            if not isinstance(json_obj_arr, dict):
                continue
            print("获取到总条数为:", json_obj_arr.get("totalPages"))
            data_to_insert = json_obj_arr.get("content")
            print(f"\n开始插入数据==>{i}*{this_size}\n")
            savetodb(data_to_insert)

    # NOTE(review): this "end time" line is emitted once per user, as in the
    # original loop body — confirm whether a single line after the loop was
    # intended instead.
    print("结束时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)
|