13.4. python操作mysql数据库封装类和方法¶
13.4.1. 封装类方法 1¶
- 创建mysql_env.py文件
#!/usr/bin/env python
# -*- coding:utf8 -*-
mysql_info = {
"host": "127.0.0.1",
"user": "hujianli",
"passwd": "123.com",
"dbName": "school"
}
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'xiaojian'
import pymysql
from mysql_env import mysql_info
class Mysql_SQL():
def __init__(self, host, user, passwd, dbName):
self.host = host
self.user = user
self.passwd = passwd
self.dbName = dbName
def connet(self):
self.db = pymysql.connect(self.host, self.user,
self.passwd, self.dbName)
self.cursor = self.db.cursor()
def close(self):
self.cursor.close()
self.db.close()
def get_one(self, sql):
rest = None
try:
self.connet()
self.cursor.execute(sql)
res1 = self.cursor.fetchone()
title = self.cursor.description
# print(res)
# print(title)
rest = dict(zip([k[0] for k in title], res1))
self.close()
except:
print("查询数据失败")
return rest
def get_all(self, sql):
res1 = ()
try:
self.connet()
self.cursor.execute(sql)
res = self.cursor.fetchall()
title = self.cursor.description
rest1 = [dict(zip([k[0] for k in title], row)) for row in res]
self.close()
except:
print("查询数据失败")
return rest1
def insert(self, sql):
return self.__edit_one(sql)
def update(self, sql):
return self.__edit_one(sql)
def delete(self, sql):
return self.__edit_one(sql)
def __edit_one(self, sql):
"""
#准备SQL
#获取链接和cursor
#提交数据到数据库
#提交事务
#关闭cursor和链接
:param sql:
:return:
"""
count = 0
try:
# 连接数据库
self.connet()
count = self.cursor.execute(sql)
# 提交事务
self.db.commit()
# 关闭数据库
self.close()
print("数据库语句执行完毕!")
except:
print("事务提交失败!")
self.db.rollback()
return count
if __name__ == '__main__':
hu_mysql = Mysql_SQL(**mysql_info)
hu_mysql.connet()
# get_one = hu_mysql.get_one("select * from students1;")
# print(get_one)
# find_all = hu_mysql.get_all("select * from students1;")
# for i in find_all:
# print(i)
# sql = 'INSERT INTO students1 (`name`,`nickname`,`sex`) VALUES ("hu2","xiaojian2","男");'
# hu_mysql.insert(sql)
sql = 'INSERT INTO students1 (`name`,`nickname`,`sex`) VALUES ({},{},{});'
sql = sql.format('"hu3"','"xiaojian3"','"男"')
hu_mysql.insert(sql)
使用mysql封装类中的方法¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'xiaojian'
from Mysql_base_class import Mysql_SQL
hu = Mysql_SQL("192.168.2.122","root","123456","ttmgrportal")
res = hu.get_all("select * from student4 where money>100")
for row in res:
print("%d -- %d" % (row[0], row[1]))
13.4.2. 封装类方法 2¶
#!/usr/bin/env python
#-*- coding:utf8 -*-
#封装类
# 导入mysql模块
from pymysql import *
class MysqlPython:
def __init__(self, database, # 库
host="127.0.0.1", # ip地址
user="root", # 用户名
password="123456", # 密码
port=3306, # 端口
charset="utf8"): # 字符集
self.host = host
self.database = database
self.user = user
self.password = password
self.port = port
self.charset = charset
def open(self): # 创建数据库链接函数
self.db = connect(host=self.host,
database=self.database,
user=self.user,
password=self.password,
port=self.port,
charset=self.charset)
self.cur = self.db.cursor() # 创建游标对象
def close(self): # 创建断开数据库链接 关闭游标函数
self.cur.close()
self.db.close()
def zhixing(self, sql, L=[]): # 创建pymysql.execute() 方法函数
try:
self.open() # 链接数据库
self.cur.execute(sql, L) # 参数化执行SQL命令
self.db.commit() # 提交数据
print("ok")
except Exception as e:
self.db.rollback() # 出错取消提交
print("Failed", e)
self.close() # 断开数据库链接 关闭游标
def all(self, sql, L=[]):
try:
self.open()
self.cur.execute(sql, L)
result = self.cur.fetchall()
return result
except Exception as e:
print("Failed", e)
self.close()
13.4.3. 封装类方法3¶
#!/usr/bin/python3
# -*- coding:utf-8 -*-
import pymysql
import os
import configparser
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB
class Config(object):
"""
# Config().get_content("user_information")
配置文件里面的参数
[dbMysql]
host = 192.168.1.180
port = 3306
user = root
password = 123456
"""
def __init__(self, config_filename="dbMysqlConfig.cnf"):
file_path = os.path.join(os.path.dirname(__file__), config_filename)
self.cf = configparser.ConfigParser()
self.cf.read(file_path)
def get_sections(self):
return self.cf.sections()
def get_options(self, section):
return self.cf.options(section)
def get_content(self, section):
result = {}
for option in self.get_options(section):
value = self.cf.get(section, option)
result[option] = int(value) if value.isdigit() else value
return result
class BasePymysqlPool(object):
def __init__(self, host, port, user, password, db_name):
self.db_host = host
self.db_port = int(port)
self.user = user
self.password = str(password)
self.db = db_name
self.conn = None
self.cursor = None
class MyPymysqlPool(BasePymysqlPool):
"""
MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现
获取连接对象:conn = Mysql.getConn()
释放连接对象;conn.close()或del conn
"""
# 连接池对象
__pool = None
def __init__(self, conf_name=None):
self.conf = Config().get_content(conf_name)
super(MyPymysqlPool, self).__init__(**self.conf)
# 数据库构造函数,从连接池中取出连接,并生成操作游标
self._conn = self.__getConn()
self._cursor = self._conn.cursor()
def __getConn(self):
"""
@summary: 静态方法,从连接池中取出连接
@return MySQLdb.connection
"""
if MyPymysqlPool.__pool is None:
__pool = PooledDB(creator=pymysql,
mincached=1,
maxcached=20,
host=self.db_host,
port=self.db_port,
user=self.user,
passwd=self.password,
db=self.db,
use_unicode=True,
charset="utf8",
cursorclass=DictCursor)
print("12211212")
return __pool.connection()
def getAll(self, sql, param=None):
"""
@summary: 执行查询,并取出所有结果集
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list(字典对象)/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchall()
else:
result = False
return result
def getOne(self, sql, param=None):
"""
@summary: 执行查询,并取出第一条
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchone()
else:
result = False
return result
def getMany(self, sql, num, param=None):
"""
@summary: 执行查询,并取出num条结果
@param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
@param num:取得的结果条数
@param param: 可选参数,条件列表值(元组/列表)
@return: result list/boolean 查询到的结果集
"""
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
if count > 0:
result = self._cursor.fetchmany(num)
else:
result = False
return result
def insertMany(self, sql, values):
"""
@summary: 向数据表插入多条记录
@param sql:要插入的SQL格式
@param values:要插入的记录数据tuple(tuple)/list[list]
@return: count 受影响的行数
"""
count = self._cursor.executemany(sql, values)
return count
def __query(self, sql, param=None):
if param is None:
count = self._cursor.execute(sql)
else:
count = self._cursor.execute(sql, param)
return count
def update(self, sql, param=None):
"""
@summary: 更新数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def insert(self, sql, param=None):
"""
@summary: 更新数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要更新的 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def delete(self, sql, param=None):
"""
@summary: 删除数据表记录
@param sql: SQL格式及条件,使用(%s,%s)
@param param: 要删除的条件 值 tuple/list
@return: count 受影响的行数
"""
return self.__query(sql, param)
def begin(self):
"""
@summary: 开启事务
"""
self._conn.autocommit(0)
def end(self, option='commit'):
"""
@summary: 结束事务
"""
if option == 'commit':
self._conn.commit()
else:
self._conn.rollback()
def dispose(self, isEnd=1):
"""
@summary: 释放连接池资源
"""
if isEnd == 1:
self.end('commit')
else:
self.end('rollback')
self._cursor.close()
self._conn.close()
if __name__ == '__main__':
mysql = MyPymysqlPool("dbMysql")
sqlAll = "select * from seckill;"
result = mysql.getAll(sqlAll)
print(result)
# 释放资源
mysql.dispose()
13.4.4. 封装数据库4¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/18 20:09
# filename: 03.连接mysql的封装.py
import pymysql
class dbHelper:
def __init__(self, host, user, password, port, database):
self.host = host
self.user = user
self.passsword = password
self.port = port
self.database = database
# 连接
def connect(self):
self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passsword, port=self.port,
database=self.database)
self.cursor = self.conn.cursor()
# 关闭
def close(self):
self.cursor.close()
self.conn.close()
# 封装增删改
def __oper(self, sql, params):
row = 0
try:
self.connect()
row = self.cursor.execute(sql, params)
self.conn.commit()
self.close()
except Exception as e:
# 出现错误进行回滚
self.conn.rollback()
return row
def insert(self, sql, params=[]):
"""
:param sql:
:param params:
:return: 增
"""
return self.__oper(sql, params)
def delete(self, sql, params=[]):
"""
:param sql:
:param params:
:return: 删
"""
return self.__oper(sql, params)
def update(self, sql, params=[]):
"""
:param sql:
:param params:
:return: 改
"""
return self.__oper(sql, params)
def find_one(self, sql, params):
"""
:param sql:
:param params:
:return: 查询单条记录
"""
data = None
try:
self.connect()
self.cursor.execute(sql, params)
data = self.cursor.fetchone()
self.close()
except Exception as e:
print(e)
return data
def find_all(self, sql, params=[]):
"""
:param sql:
:param params:
:return: 查询多条记录
"""
datas = None
try:
self.connect()
self.cursor.execute(sql, params)
datas = self.cursor.fetchall()
self.close()
except Exception as e:
print(e)
return datas
if __name__ == '__main__':
dbinfo = dbHelper("127.0.0.1", "root", "admin#123", "3306", "students")
13.4.5. 创建class的DAO类方式¶
config.ini
;数据库设置
[db]
host = 127.0.0.1
port = 3306
user = root
password = admin#123
database = petstore
charset = utf8
base_dao.py
# coding=utf-8
# 代码文件:chapter22/PetStore/com/zhijieketang/petstore/dao/base_dao.py
"""定义DAO基类"""
import pymysql
import configparser
class BaseDao(object):
def __init__(self):
self.config = configparser.ConfigParser()
self.config.read('config.ini', encoding='utf-8')
host = self.config['db']['host']
user = self.config['db']['user']
# 读取整数port数据
port = self.config.getint('db', 'port')
password = self.config['db']['password']
database = self.config['db']['database']
charset = self.config['db']['charset']
self.conn = pymysql.connect(host=host,
user=user,
port=port,
password=password,
database=database,
charset=charset)
def close(self):
"""关闭数据库连接"""
self.conn.close()
使用dao类
# coding=utf-8
# 代码文件:chapter22/PetStore/com/zhijieketang/petstore/dao/account_dao.py
"""商品管理DAO"""
from com.zhijieketang.petstore.dao.base_dao import BaseDao
class ProductDao(BaseDao):
def __init__(self):
super().__init__()
def findall(self):
"""查询所有商品信息"""
products = []
try:
# 2. 创建游标对象
with self.conn.cursor() as cursor:
# 3. 执行SQL操作
sql = 'select productid,category,cname,ename,image,listprice,unitcost,descn ' \
'from products'
cursor.execute(sql)
# 4. 提取结果集
result_set = cursor.fetchall()
for row in result_set:
product = {}
product['productid'] = row[0]
product['category'] = row[1]
product['cname'] = row[2]
product['ename'] = row[3]
product['image'] = row[4]
product['listprice'] = row[5]
product['unitcost'] = row[6]
product['descn'] = row[7]
products.append(product)
# with代码块结束 5. 关闭游标
finally:
# 6. 关闭数据连接
self.close()
return products
def findbycat(self, catname):
"""按照商品类别查询商品"""
products = []
try:
# 2. 创建游标对象
with self.conn.cursor() as cursor:
# 3. 执行SQL操作
sql = 'select productid,category,cname,ename,image,listprice,unitcost,descn ' \
'from products where category=%s'
cursor.execute(sql, catname)
# 4. 提取结果集
result_set = cursor.fetchall()
for row in result_set:
product = {}
product['productid'] = row[0]
product['category'] = row[1]
product['cname'] = row[2]
product['ename'] = row[3]
product['image'] = row[4]
product['listprice'] = row[5]
product['unitcost'] = row[6]
product['descn'] = row[7]
products.append(product)
# with代码块结束 5. 关闭游标
finally:
# 6. 关闭数据连接
self.close()
return products
def findbyid(self, productid):
"""按照商品id查询商品"""
product = None
try:
# 2. 创建游标对象
with self.conn.cursor() as cursor:
# 3. 执行SQL操作
sql = 'select productid,category,cname,ename,image,listprice,unitcost,descn' \
' from products where productid=%s'
cursor.execute(sql, productid)
# 4. 提取结果集
row = cursor.fetchone()
if row is not None:
product = {}
product['productid'] = row[0]
product['category'] = row[1]
product['cname'] = row[2]
product['ename'] = row[3]
product['image'] = row[4]
product['listprice'] = row[5]
product['unitcost'] = row[6]
product['descn'] = row[7]
# with代码块结束 5. 关闭游标
finally:
# 6. 关闭数据连接
self.close()
return product
代码片段
import pymysql
class MengSql():
def __init__(self, host, user, passwd, dbName):
self.host = host
self.user = user
self.passwd = passwd
self.dbName = dbName
def connet(self):
self.db = pymysql.connect(self.host, self.user, self.passwd, self.dbName)
self.cursor = self.db.cursor()
def close(self):
self.cursor.close()
self.db.close()
def get_one(self, sql):
res = None
try:
self.connet()
self.cursor.execute(sql)
res = self.cursor.fetchone()
self.close()
except:
print("查询失败")
return res
def get_all(self, sql):
res = ()
try:
self.connet()
self.cursor.execute(sql)
res = self.cursor.fetchall()
self.close()
except:
print("查询失败")
return res
def insert(self, sql):
return self.__edit(sql)
def update(self, sql):
return self.__edit(sql)
def delete(self, sql):
return self.__edit(sql)
def __edit(self, sql):
count = 0
try:
self.connet()
count = self.cursor.execute(sql)
self.db.commit()
self.close()
except:
print("事物提交失败")
self.db.rollback()
return count
eg
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/8/19 9:39
# filename: 02.连接mysql数据库的封装.py
"""
安装pymysql数据库驱动程序:
pip install pymysql
运行完毕 查看是否成功 pip -m list
"""
import time
import pymysql
# import decimal
class MSSQL:
def __init__(self, host, user, pwd, db):
self.host = host
self.user = user
self.pwd = pwd
self.db = db
def GetConnect(self):
if not self.db:
raise (NameError, '没有目标数据库')
self.connect = pymysql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db,
charset='utf8')
cur = self.connect.cursor()
if not cur:
raise (NameError, '数据库访问失败')
else:
return cur
def ExecSql(self, sql):
cur = self.GetConnect()
cur.execute(sql)
self.connect.commit()
self.connect.close()
def ExecQuery(self, sql):
cur = self.GetConnect()
cur.execute(sql)
resList = cur.fetchall()
self.connect.close()
return resList
def main():
ms = MSSQL(host="192.168.0.108", user="sa", pwd="sa", db="ComPrject")
resList = ms.ExecQuery("select *from TestModel")
print(resList)
if __name__ == '__main__':
main()
input("执行完成:")
参考文献
https://www.jb51.net/article/76231.htm https://www.jb51.net/article/45077.htm