| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import pymysql
- import Utils
- import re
- from dbutils.pooled_db import PooledDB
- class MySQLPool:
- def __init__(self, host, user, password, database, mincached=1, maxcached=10):
- self.pool = PooledDB(
- creator=pymysql, # 使用 pymysql 作为连接数据库的驱动
- host=host,
- user=user,
- password=password,
- database=database,
- mincached=mincached, # 初始化时创建的空连接数量
- maxcached=maxcached, # 连接池中最多闲置的连接数量
- cursorclass=pymysql.cursors.DictCursor # 返回字典格式的查询结果
- )
- def execute(self, sql, params=None):
- """ 执行SQL语句 """
- conn = self.pool.connection()
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, params)
- conn.commit()
- except Exception as e:
- print(f"Error executing SQL: {e}")
- conn.rollback()
- finally:
- conn.close()
- def fetchall(self, sql, params=None):
- """ 查询多条记录 """
- conn = self.pool.connection()
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, params)
- return cursor.fetchall()
- except Exception as e:
- print(f"Error fetching data: {e}")
- return []
- finally:
- conn.close()
- def insert(self, table, data):
- """
- 批量插入数据
- :param table: 表名
- :param data: 列表,每个元素是一个字典,表示一行数据
- """
- if not data:
- return
- columns = ', '.join(data[0].keys())
- columns = Utils.convert_columns_to_snake_case(columns)
- values_placeholder = ', '.join(['%s'] * len(data[0]))
- values = [tuple(item.values()) for item in data]
- sql = f"INSERT INTO {table} ({columns}) VALUES ({values_placeholder})"
- print("行数据sql=>", sql)
- conn = self.pool.connection()
- try:
- with conn.cursor() as cursor:
- cursor.executemany(sql, values)
- conn.commit()
- except Exception as e:
- print(f"Error inserting data: {e}")
- conn.rollback()
- finally:
- conn.close()
- def update(self, table, data, condition):
- """
- 批量更新数据
- :param table: 表名
- :param data: 字典,更新的字段和值
- :param condition: 字典,更新条件
- """
- set_clause = ', '.join([f"{key} = %s" for key in data.keys()])
- condition_clause = ' AND '.join([f"{key} = %s" for key in condition.keys()])
- values = list(data.values()) + list(condition.values())
- sql = f"UPDATE {table} SET {set_clause} WHERE {condition_clause}"
- conn = self.pool.connection()
- try:
- with conn.cursor() as cursor:
- cursor.execute(sql, values)
- conn.commit()
- except Exception as e:
- print(f"Error updating data: {e}")
- conn.rollback()
- finally:
- conn.close()
|