import pymysql import Utils import re from dbutils.pooled_db import PooledDB class MySQLPool: def __init__(self, host, user, password, database, mincached=1, maxcached=10): self.pool = PooledDB( creator=pymysql, # 使用 pymysql 作为连接数据库的驱动 host=host, user=user, password=password, database=database, mincached=mincached, # 初始化时创建的空连接数量 maxcached=maxcached, # 连接池中最多闲置的连接数量 cursorclass=pymysql.cursors.DictCursor # 返回字典格式的查询结果 ) def execute(self, sql, params=None): """ 执行SQL语句 """ conn = self.pool.connection() try: with conn.cursor() as cursor: cursor.execute(sql, params) conn.commit() except Exception as e: message = f"Error executing SQL: {e}" print(message) Utils.write_log('error.log', message) conn.rollback() finally: conn.close() def fetchall(self, sql, params=None): """ 查询多条记录 """ conn = self.pool.connection() try: with conn.cursor() as cursor: cursor.execute(sql, params) return cursor.fetchall() except Exception as e: message = f"Error fetching data: {e}" print(message) Utils.write_log('error.log', message) return [] finally: conn.close() def insert(self, table, data): """ 批量插入数据 :param table: 表名 :param data: 列表,每个元素是一个字典,表示一行数据 """ if not data: return columns = ', '.join(data[0].keys()) columns = Utils.convert_columns_to_snake_case(columns) values_placeholder = ', '.join(['%s'] * len(data[0])) values = [tuple(item.values()) for item in data] sql = f"INSERT INTO {table} ({columns}) VALUES ({values_placeholder})" print("行数据sql=>", sql) conn = self.pool.connection() try: with conn.cursor() as cursor: cursor.executemany(sql, values) conn.commit() except Exception as e: message = f"Error inserting data: {e}" print(message) Utils.write_log('error.log', message) conn.rollback() finally: conn.close() def update(self, table, data, condition): """ 批量更新数据 :param table: 表名 :param data: 字典,更新的字段和值 :param condition: 字典,更新条件 """ set_clause = ', '.join([f"{key} = %s" for key in data.keys()]) condition_clause = ' AND '.join([f"{key} = %s" for key in condition.keys()]) values = list(data.values()) + list(condition.values()) sql = f"UPDATE {table} SET {set_clause} WHERE {condition_clause}" conn = self.pool.connection() try: with conn.cursor() as cursor: cursor.execute(sql, values) conn.commit() except Exception as e: message = f"Error updating data: {e}" print(message) Utils.write_log('error.log', message) conn.rollback() finally: conn.close()