MySQLPool.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import pymysql
  2. import Utils
  3. import re
  4. from dbutils.pooled_db import PooledDB
  5. class MySQLPool:
  6. def __init__(self, host, user, password, database, mincached=1, maxcached=10):
  7. self.pool = PooledDB(
  8. creator=pymysql, # 使用 pymysql 作为连接数据库的驱动
  9. host=host,
  10. user=user,
  11. password=password,
  12. database=database,
  13. mincached=mincached, # 初始化时创建的空连接数量
  14. maxcached=maxcached, # 连接池中最多闲置的连接数量
  15. cursorclass=pymysql.cursors.DictCursor # 返回字典格式的查询结果
  16. )
  17. def execute(self, sql, params=None):
  18. """ 执行SQL语句 """
  19. conn = self.pool.connection()
  20. try:
  21. with conn.cursor() as cursor:
  22. cursor.execute(sql, params)
  23. conn.commit()
  24. except Exception as e:
  25. message = f"Error executing SQL: {e}"
  26. print(message)
  27. Utils.write_log('error.log', message)
  28. conn.rollback()
  29. finally:
  30. conn.close()
  31. def fetchall(self, sql, params=None):
  32. """ 查询多条记录 """
  33. conn = self.pool.connection()
  34. try:
  35. with conn.cursor() as cursor:
  36. cursor.execute(sql, params)
  37. return cursor.fetchall()
  38. except Exception as e:
  39. message = f"Error fetching data: {e}"
  40. print(message)
  41. Utils.write_log('error.log', message)
  42. return []
  43. finally:
  44. conn.close()
  45. def insert(self, table, data):
  46. """
  47. 批量插入数据
  48. :param table: 表名
  49. :param data: 列表,每个元素是一个字典,表示一行数据
  50. """
  51. if not data:
  52. return
  53. columns = ', '.join(data[0].keys())
  54. columns = Utils.convert_columns_to_snake_case(columns)
  55. values_placeholder = ', '.join(['%s'] * len(data[0]))
  56. values = [tuple(item.values()) for item in data]
  57. sql = f"INSERT INTO {table} ({columns}) VALUES ({values_placeholder})"
  58. print("行数据sql=>", sql)
  59. conn = self.pool.connection()
  60. try:
  61. with conn.cursor() as cursor:
  62. cursor.executemany(sql, values)
  63. conn.commit()
  64. except Exception as e:
  65. message = f"Error inserting data: {e}"
  66. print(message)
  67. Utils.write_log('error.log', message)
  68. conn.rollback()
  69. finally:
  70. conn.close()
  71. def update(self, table, data, condition):
  72. """
  73. 批量更新数据
  74. :param table: 表名
  75. :param data: 字典,更新的字段和值
  76. :param condition: 字典,更新条件
  77. """
  78. set_clause = ', '.join([f"{key} = %s" for key in data.keys()])
  79. condition_clause = ' AND '.join([f"{key} = %s" for key in condition.keys()])
  80. values = list(data.values()) + list(condition.values())
  81. sql = f"UPDATE {table} SET {set_clause} WHERE {condition_clause}"
  82. conn = self.pool.connection()
  83. try:
  84. with conn.cursor() as cursor:
  85. cursor.execute(sql, values)
  86. conn.commit()
  87. except Exception as e:
  88. message = f"Error updating data: {e}"
  89. print(message)
  90. Utils.write_log('error.log', message)
  91. conn.rollback()
  92. finally:
  93. conn.close()