Parcourir la source

完成个初版

sptkw il y a 1 an
Parent
commit
2428da23c3

+ 27 - 0
Config.py

@@ -0,0 +1,27 @@
+import http.client
+
+from MySQLPool import MySQLPool
+
+# 示例:初始化连接池
+mysql_pool = MySQLPool(
+    host='10.10.10.230',
+    user='py_amac',
+    password='py_amac',
+    database='py_amac'
+)
+
+
+conn = http.client.HTTPSConnection("gs.amac.org.cn")
+
+payload = "{}"
+
+headers = {
+    'Accept': "*/*",
+    'Accept-Encoding': "gzip, deflate, br",
+    'User-Agent': "PostmanRuntime-ApipostRuntime/1.1.0",
+    'Connection': "keep-alive",
+    'Content-Type': "application/json"
+}
+
+page = 0
+size = 20

+ 93 - 0
MySQLPool.py

@@ -0,0 +1,93 @@
+import pymysql
+import Utils
+import re
+from dbutils.pooled_db import PooledDB
+
+
+class MySQLPool:
+    def __init__(self, host, user, password, database, mincached=1, maxcached=10):
+        self.pool = PooledDB(
+            creator=pymysql,  # 使用 pymysql 作为连接数据库的驱动
+            host=host,
+            user=user,
+            password=password,
+            database=database,
+            mincached=mincached,  # 初始化时创建的空连接数量
+            maxcached=maxcached,  # 连接池中最多闲置的连接数量
+            cursorclass=pymysql.cursors.DictCursor  # 返回字典格式的查询结果
+        )
+
+    def execute(self, sql, params=None):
+        """ 执行SQL语句 """
+        conn = self.pool.connection()
+        try:
+            with conn.cursor() as cursor:
+                cursor.execute(sql, params)
+                conn.commit()
+        except Exception as e:
+            print(f"Error executing SQL: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+
+    def fetchall(self, sql, params=None):
+        """ 查询多条记录 """
+        conn = self.pool.connection()
+        try:
+            with conn.cursor() as cursor:
+                cursor.execute(sql, params)
+                return cursor.fetchall()
+        except Exception as e:
+            print(f"Error fetching data: {e}")
+            return []
+        finally:
+            conn.close()
+
+    def insert(self, table, data):
+        """
+        批量插入数据
+        :param table: 表名
+        :param data: 列表,每个元素是一个字典,表示一行数据
+        """
+        if not data:
+            return
+        columns = ', '.join(data[0].keys())
+        columns = Utils.convert_columns_to_snake_case(columns)
+        values_placeholder = ', '.join(['%s'] * len(data[0]))
+        values = [tuple(item.values()) for item in data]
+
+        sql = f"INSERT INTO {table} ({columns}) VALUES ({values_placeholder})"
+        print("行数据sql=>", sql)
+        conn = self.pool.connection()
+        try:
+            with conn.cursor() as cursor:
+                cursor.executemany(sql, values)
+                conn.commit()
+        except Exception as e:
+            print(f"Error inserting data: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+
+    def update(self, table, data, condition):
+        """
+        批量更新数据
+        :param table: 表名
+        :param data: 字典,更新的字段和值
+        :param condition: 字典,更新条件
+        """
+        set_clause = ', '.join([f"{key} = %s" for key in data.keys()])
+        condition_clause = ' AND '.join([f"{key} = %s" for key in condition.keys()])
+        values = list(data.values()) + list(condition.values())
+
+        sql = f"UPDATE {table} SET {set_clause} WHERE {condition_clause}"
+        conn = self.pool.connection()
+        try:
+            with conn.cursor() as cursor:
+                cursor.execute(sql, values)
+                conn.commit()
+        except Exception as e:
+            print(f"Error updating data: {e}")
+            conn.rollback()
+        finally:
+            conn.close()

+ 21 - 0
Utils.py

@@ -0,0 +1,21 @@
+import re
+import random
+
+
+def camel_to_snake(name: str) -> str:
+    # 匹配大写字母,并在前面加上下划线,然后转换为小写
+    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()
+
+
+def convert_columns_to_snake_case(columns: str) -> str:
+    # 拆分字符串为字段列表,去除空格
+    fields = [field.strip() for field in columns.split(",")]
+    # 转换每个字段为下划线格式
+    snake_case_fields = [camel_to_snake(field) for field in fields]
+    # 重新组合回逗号分隔的字符串
+    return ", ".join(snake_case_fields)
+
+
+def random_float():
+    # 生成一个随机的浮点数并四舍五入到14位小数
+    return round(random.random(), 14)

+ 101 - 0
amac/amac_private_fund_manager_api.py

@@ -0,0 +1,101 @@
+import json
+import time
+import Utils
+from time import sleep
+from Config import mysql_pool, conn, headers, page, size
+
+print("开始时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)
+
+
+# 因为这里数据一条人员信息amac_member_user中包含了对应的多个证书,所以需要先循环把证书拿出来
+def savetodb(data):
+    # 判断是否为空,为空则跳过直接返回
+    if data is None: return
+
+    person_record = []
+    for item in data:
+        # 构建人员信息记录
+        this_data = {
+            'id': item.get("id"),
+            'manager_id': "",
+            'fund_no': item.get("fundNo", ""),
+            'fund_name': item.get("fundName"),
+            'manager_name': item.get("managerName"),
+            'manager_type': item.get("managerType"),
+            'working_state': item.get("workingState"),
+            'put_on_record_date': item.get("putOnRecordDate"),
+            'last_quarter_update': item.get("lastQuarterUpdate"),
+            'is_depute_manage': item.get("isDeputeManage"),
+            'url': item.get("url"),
+            'establish_date': item.get("establishDate"),
+            'manager_url': item.get("managerUrl"),
+            'mandator_name': item.get("mandatorName")
+        }
+
+        managers_info = item.get("managersInfo", [])
+
+        # 找到到对应的managerId
+        if managers_info:
+            for info in managers_info:
+                this_data['manager_id'] = info.get("managerId", "")
+                break
+
+        person_record.append(this_data)
+    # 批量插入
+    mysql_pool.insert('amac_private_fund_manager', person_record)
+
+
+this_page = page
+this_size = size
+payload = json.dumps({})
+
+http_url = "/amac-infodisc/api/pof/fund"
+print(f"请求url={http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}")
+
+conn.request("POST",
+             f"{http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}",
+             payload, headers)
+
+res = conn.getresponse()
+data = res.read()
+json_str = data.decode("utf-8")
+
+try:
+    json_obj = json.loads(json_str)
+    print("获取到总条数为:", json_obj.get("totalPages"))
+
+    data_to_insert = json_obj.get("content")
+
+    if data_to_insert is not None:
+        # 先睡一秒,确保峰值不会太高
+        sleep(1)
+
+    print(f"\n开始插入数据==>{this_page}*{this_size}\n")
+    savetodb(data_to_insert)
+
+    if json_obj.get("totalPages") > 1:
+        for i in range(1, json_obj.get("totalPages")):
+            this_page = i
+            print(f"请求url={http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}")
+            conn.request("POST",
+                         f"{http_url}?rand={Utils.random_float()}&page={this_page}&size={this_size}",
+                         payload, headers)
+            res = conn.getresponse()
+            data = res.read()
+            json_str = data.decode("utf-8")
+            print("JSON String", json_str)
+            try:
+                json_obj_arr = json.loads(json_str)
+                print("获取到总条数为:", json_obj_arr.get("totalPages"))
+                data_to_insert = json_obj_arr.get("content")
+                if data_to_insert is not None:
+                    # 先睡一秒,确保峰值不会太高
+                    time.sleep(1)
+                print(f"\n开始插入数据==>{i}*{this_size}\n")
+                savetodb(data_to_insert)
+            except json.JSONDecodeError as e:
+                print(f"循环 Error decoding JSON: {e}")
+except json.JSONDecodeError as e:
+    print(f"请求 Error decoding JSON: {e}")
+
+print("结束时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)

+ 50 - 0
amac/main.py

@@ -0,0 +1,50 @@
+import subprocess
+
+print("执行 开始")
+
+# 定义要运行的 Python 文件列表
+python_files = [
+    # 'member_api.py',  # 会员机构综合查询-会员表
+    # 'person_org_registration_api.py',  # 基金从业人员资格注册信息-人员机构注册表
+    # 'person_org_registration_user_api.py',  # 基金从业人员资格注册信息-人员机构用户信息表、人员机构用户证书表
+    'amac_private_fund_manager_api.py'  # 私募基金管理人基金产品
+]
+
+# 启动所有子进程,并存储它们的引用
+processes = []
+
+for python_file in python_files:
+    # 启动子进程并实时打印输出,指定使用 utf-8 编码
+    process = subprocess.Popen(
+        ['py', python_file],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE,
+        text=True,  # 确保输出以文本模式读取
+        encoding='utf-8',  # 指定使用 utf-8 编码
+        bufsize=1  # 以行缓冲的模式进行读取
+    )
+    processes.append((python_file, process))
+
+# 逐行读取所有子进程的标准输出和标准错误
+for python_file, process in processes:
+    print(f"开始处理子进程:{python_file}")
+
+    for line in process.stdout:
+        print(f"[{python_file} 输出]: {line}", end='')
+
+    # 读取子进程的错误信息(如果有)
+    for line in process.stderr:
+        print(f"[{python_file} 错误]: {line}", end='')
+
+    # 等待子进程完成
+    process.stdout.close()
+    process.stderr.close()
+    process.wait()
+
+    # 检查返回值
+    if process.returncode == 0:
+        print(f"{python_file} 执行 成功 结束")
+    else:
+        print(f"{python_file} 执行 失败 结束,返回码: {process.returncode}")
+
+print("所有子进程执行 结束")

+ 30 - 0
amac/member.py

@@ -0,0 +1,30 @@
+import requests
+from bs4 import BeautifulSoup
+
+# 发送HTTP GET请求
+url = 'https://gs.amac.org.cn/amac-infodisc/res/pof/member/index.html'  # 注意:可能需要指定到具体的页面或API端点
+response = requests.get(url)
+
+# 检查请求是否成功
+if response.status_code == 200:
+    print("请求成功")
+    # 使用BeautifulSoup解析HTML
+    soup = BeautifulSoup(response.content, 'html.parser')
+
+    # 提取表格数据(这里需要根据实际的HTML结构进行调整)
+    # 假设表格有一个唯一的ID或类名,例如:<table id="table-id">
+    table = soup.find('table', {'id': 'managerList'})  # 替换为实际的ID或类名选择器
+
+    # 提取表格的行和列
+    rows = table.find_all('tr')
+    data = []
+    for row in rows:
+        cols = row.find_all('td')  # 或者'th',根据需要
+        cols = [ele.text.strip() for ele in cols]
+        print(f"-------------rows-------------{cols}")
+        data.append(cols)
+
+        # 打印提取的数据(或者进行其他处理)
+    print(data)
+else:
+    print(f"请求失败,状态码:{response.status_code}")

+ 72 - 0
amac/member_api.py

@@ -0,0 +1,72 @@
+import http.client
+import json
+import random
+
+from Config import mysql_pool
+
+conn = http.client.HTTPSConnection("gs.amac.org.cn")
+
+payload = "{}"
+
+headers = {
+    'Accept': "*/*",
+    'Accept-Encoding': "gzip, deflate, br",
+    'User-Agent': "PostmanRuntime-ApipostRuntime/1.1.0",
+    'Connection': "keep-alive",
+    'Content-Type': "application/json"
+}
+
+http_url = "/amac-infodisc/api/pof/pofMember"
+page = 0
+size = 20
+conn.request("POST", http_url + "?rand=0.680639590677673&page=0&size=20", payload, headers)
+
+res = conn.getresponse()
+data = res.read()
+
+json_str = data.decode("utf-8")
+print("JSON String", json_str)
+
+try:
+    json_obj = json.loads(json_str)
+    print("获取到总条数为:", json_obj.get("totalPages"))
+
+    # 插入数据示例
+    data_to_insert = json_obj.get("content")
+    print("\n开始插入数据==>\n")
+    mysql_pool.insert('amac_member', data_to_insert)
+
+    # # 更新数据示例
+    # update_data = {'column1': 'new_value'}
+    # update_condition = {'id': 1}
+    # mysql_pool.update('your_table', update_data, update_condition)
+
+    # 开始循环
+
+    if (json_obj.get("totalPages") > 1):
+        for i in range(1, json_obj.get("totalPages")):
+            # 生成一个0到1之间的随机浮点数,并格式化为指定的小数位数
+            random_float = round(random.random(), 14)  # 保留14位小数
+            page = i
+            size = 20
+            conn.request("POST",
+                         http_url + "?rand=(" + str(random_float) + ")&page=" + str(page) + "&size=" + str(size),
+                         payload, headers)
+            res = conn.getresponse()
+            data = res.read()
+            json_str = data.decode("utf-8")
+            print("JSON String", json_str)
+            try:
+                json_obj_arr = json.loads(json_str)
+                print("获取到总条数为:", json_obj_arr.get("totalPages"))
+                # 插入数据示例
+                data_to_insert = json_obj_arr.get("content")
+                print("\n开始插入数据==>" + str(i) + "*20\n")
+                mysql_pool.insert('amac_member', data_to_insert)
+
+            except json.JSONDecodeError as e:
+                print(f"Error decoding JSON: {e}")
+
+
+except json.JSONDecodeError as e:
+    print(f"Error decoding JSON: {e}")

+ 78 - 0
amac/person_org_registration_api.py

@@ -0,0 +1,78 @@
+import http.client
+import json
+import random
+
+from Config import mysql_pool, conn, headers, page, size
+
+# 首先查出所有类型 amac_org_type ,然后根据类型循环遍历入库
+amac_org_type = mysql_pool.fetchall("select * from amac_org_type");
+for amac_org_type_item in amac_org_type:
+    org_type_code = amac_org_type_item.get("org_type_code")
+    org_type_name = amac_org_type_item.get("org_type_name")
+    this_page = page
+    this_size = size
+    # org_type_code = "gmjjglgs"
+    # org_type_name = "公募基金管理公司"
+    print("开始处理类型:", org_type_code, org_type_name)
+
+    payload = json.dumps({"page": 0, "orgType": str(org_type_code), "orgName": ""})
+
+    http_url = "/amac-infodisc/api/pof/personOrg"
+
+    # 生成一个0到1之间的随机浮点数,并格式化为指定的小数位数
+    random_float = round(random.random(), 14)  # 保留14位小数
+    print("请求url=", http_url + "?rand=" + str(random_float) + "&page=" + str(this_page) + "&size=" + str(this_size))
+    conn.request("POST",
+                 http_url + "?rand=(" + str(random_float) + ")&page=" + str(this_page) + "&size=" + str(this_size),
+                 payload,
+                 headers)
+
+    res = conn.getresponse()
+
+    data = res.read()
+
+    json_str = data.decode("utf-8")
+    print("JSON String", json_str)
+
+    try:
+        json_obj = json.loads(json_str)
+        print("获取到总条数为:", json_obj.get("totalPages"))
+
+        # 插入数据示例
+        data_to_insert = json_obj.get("content")
+        print("\n开始插入数据==>" + str(this_page) + "*20\n")
+        mysql_pool.insert('amac_person_org_registration', data_to_insert)
+
+        # 开始循环
+        if (json_obj.get("totalPages") > 1):
+            for i in range(1, json_obj.get("totalPages")):
+                this_page = i
+                # # 将 JSON 字符串转换为 Python 字典
+                # data = json.loads(payload)
+                # # 重新给 'page' 字段赋值
+                # data["page"] = page
+                # # 将修改后的字典转换回 JSON 字符串
+                # payload = json.dumps(data)
+                print("请求url=",
+                      http_url + "?rand=" + str(random_float) + "&page=" + str(this_page) + "&size=" + str(this_size))
+                conn.request("POST",
+                             http_url + "?rand=" + str(random_float) + "&page=" + str(this_page) + "&size=" + str(
+                                 this_size),
+                             payload, headers)
+                res = conn.getresponse()
+                data = res.read()
+                json_str = data.decode("utf-8")
+                print("JSON String", json_str)
+                try:
+                    json_obj_arr = json.loads(json_str)
+                    print("获取到总条数为:", json_obj_arr.get("totalPages"))
+                    # 插入数据示例
+                    data_to_insert = json_obj_arr.get("content")
+                    print("\n开始插入数据==>" + str(i) + "*20\n")
+                    mysql_pool.insert('amac_person_org_registration', data_to_insert)
+
+                except json.JSONDecodeError as e:
+                    print(f"循环 Error decoding JSON: {e}")
+
+    except json.JSONDecodeError as e:
+        print(f"请求Error decoding JSON: {e}")

+ 131 - 0
amac/person_org_registration_user_api.py

@@ -0,0 +1,131 @@
+import json
+import random
+import time
+
+from Config import mysql_pool, conn, headers, page, size
+
+print("开始时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)
+
+
+# 因为这里数据一条人员信息amac_member_user中包含了对应的多个证书,所以需要先循环把证书拿出来
+def savetodb(data):
+    # 判断是否为空,为空则跳过直接返回
+    if data is None: return
+
+    person_record = []
+    cert_history_record = []
+    for item in data:
+        # 构建人员信息记录
+        person_record.append({
+            'account_id': item.get("accountId"),
+            'user_id': item.get("userId"),
+            'user_name': item.get("userName"),
+            'sex': item.get("sex"),
+            'org_name': item.get("orgName"),
+            'own_org_name': item.get("ownOrgName"),
+            'cert_code': item.get("certCode"),
+            'cert_name': item.get("certName"),
+            'education': item.get("education"),
+            'education_name': item.get("educationName"),
+            'cert_obtain_date': item.get("certObtainDate"),
+            'cert_end_date': item.get("certEndDate"),
+            'cert_status_change_times': item.get("certStatusChangeTimes"),
+            'credit_record_num': item.get("creditRecordNum"),
+            'status': item.get("status"),
+            'status_name': item.get("statusName"),
+            'office_state': item.get("officeState"),
+            'removed': item.get("removed"),
+            'biz_id': item.get("bizId"),
+            'apply_id': item.get("applyId"),
+            'apply_status': item.get("applyStatus"),
+            'person_photo_base64': item.get("personPhotoBase64")
+        })
+
+        # 获取当前人员的 user_id
+        user_id = item.get("userId")
+        person_cert_history_list = item.get("personCertHistoryList", [])
+        # 判断是否为空,为空则跳过直接返回
+        if person_cert_history_list is None: continue
+        if person_cert_history_list:
+            for person_cert_history_item in person_cert_history_list:
+                cert_history_record.append({
+                    'id': person_cert_history_item.get('id'),
+                    'user_id': person_cert_history_item.get('userId', user_id),
+                    'org_name': person_cert_history_item.get('orgName'),
+                    'cert_code': person_cert_history_item.get('certCode'),
+                    'cert_name': person_cert_history_item.get('certName'),
+                    'cert_obtain_date': person_cert_history_item.get('certObtainDate'),
+                    'cert_end_date': person_cert_history_item.get('certEndDate'),
+                    'status_name': person_cert_history_item.get('statusName'),
+                    'status': person_cert_history_item.get('status'),
+                    'creation_date': person_cert_history_item.get('creationDate'),
+                    'qlf_id': person_cert_history_item.get('qlfId'),
+                    'size': person_cert_history_item.get('size')
+                })
+
+    # 批量插入
+    mysql_pool.insert('amac_member_user', person_record)
+    mysql_pool.insert('amac_member_user_person_cert', cert_history_record)
+
+
+# 开始获取amac_person_org_registration表中的数据并进行数据抓取
+amac_org_type = mysql_pool.fetchall("select * from amac_person_org_registration")
+# amac_org_type = [{"user_id": "2309140927320704"}]
+
+for amac_org_type_item in amac_org_type:
+    user_id = amac_org_type_item.get("user_id")
+    this_page = page
+    this_size = size
+    print(f"开始处理人员查询:{user_id}")
+    payload = json.dumps(
+        {"userName": "", "certCode": "", "certName": "", "userId": str(user_id), "page": "1"})
+
+    http_url = "/amac-infodisc/api/pof/person"
+    random_float = round(random.random(), 14)
+    print(f"请求url={http_url}?rand={random_float}&page={this_page}&size={this_size}")
+
+    conn.request("POST",
+                 f"{http_url}?rand={random_float}&page={this_page}&size={this_size}",
+                 payload, headers)
+
+    res = conn.getresponse()
+    data = res.read()
+    json_str = data.decode("utf-8")
+    # print("JSON String", json_str)
+
+    try:
+        json_obj = json.loads(json_str)
+        print("获取到总条数为:", json_obj.get("totalPages"))
+
+        data_to_insert = json_obj.get("content")
+
+        if data_to_insert is not None:
+            # 先睡一秒,确保峰值不会太高
+            time.sleep(1)
+
+        print(f"\n开始插入数据==>{this_page}*{this_size}\n")
+        savetodb(data_to_insert)
+
+        if json_obj.get("totalPages") > 1:
+            for i in range(1, json_obj.get("totalPages")):
+                this_page = i
+                print(f"请求url={http_url}?rand={random_float}&page={this_page}&size={this_size}")
+                conn.request("POST",
+                             f"{http_url}?rand={random_float}&page={this_page}&size={this_size}",
+                             payload, headers)
+                res = conn.getresponse()
+                data = res.read()
+                json_str = data.decode("utf-8")
+                print("JSON String", json_str)
+                try:
+                    json_obj_arr = json.loads(json_str)
+                    print("获取到总条数为:", json_obj_arr.get("totalPages"))
+                    data_to_insert = json_obj_arr.get("content")
+                    print(f"\n开始插入数据==>{i}*{this_size}\n")
+                    savetodb(data_to_insert)
+                except json.JSONDecodeError as e:
+                    print(f"循环 Error decoding JSON: {e}")
+    except json.JSONDecodeError as e:
+        print(f"请求 Error decoding JSON: {e}")
+
+    print("结束时间(精确到毫秒):", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() * 1000)