13.4. python操作mysql数据库封装类和方法

13.4.1. 封装类方法 1

- 创建mysql_env.py文件

#!/usr/bin/env python
# -*- coding:utf8 -*-

mysql_info = {
     "host": "127.0.0.1",
     "user": "hujianli",
     "passwd": "123.com",
     "dbName": "school"
}
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'xiaojian'
import pymysql
from mysql_env import mysql_info


class Mysql_SQL():
    def __init__(self, host, user, passwd, dbName):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.dbName = dbName

    def connet(self):
        self.db = pymysql.connect(self.host, self.user,
                                  self.passwd, self.dbName)
        self.cursor = self.db.cursor()

    def close(self):
        self.cursor.close()
        self.db.close()

    def get_one(self, sql):
        rest = None
        try:
            self.connet()
            self.cursor.execute(sql)
            res1 = self.cursor.fetchone()
            title = self.cursor.description
            # print(res)
            # print(title)
            rest = dict(zip([k[0] for k in title], res1))
            self.close()
        except:
            print("查询数据失败")
        return rest

    def get_all(self, sql):
        res1 = ()
        try:
            self.connet()
            self.cursor.execute(sql)
            res = self.cursor.fetchall()
            title = self.cursor.description
            rest1 = [dict(zip([k[0] for k in title], row)) for row in res]
            self.close()
        except:
            print("查询数据失败")
        return rest1

    def insert(self, sql):
        return self.__edit_one(sql)

    def update(self, sql):
        return self.__edit_one(sql)

    def delete(self, sql):
        return self.__edit_one(sql)

    def __edit_one(self, sql):
        """
        #准备SQL
        #获取链接和cursor
        #提交数据到数据库
        #提交事务
        #关闭cursor和链接
        :param sql:
        :return:
        """
        count = 0
        try:
            # 连接数据库
            self.connet()
            count = self.cursor.execute(sql)
            # 提交事务
            self.db.commit()
            # 关闭数据库
            self.close()
            print("数据库语句执行完毕!")
        except:
            print("事务提交失败!")
            self.db.rollback()
        return count


if __name__ == '__main__':
    hu_mysql = Mysql_SQL(**mysql_info)
    hu_mysql.connet()
    # get_one = hu_mysql.get_one("select * from students1;")
    # print(get_one)

    # find_all = hu_mysql.get_all("select * from students1;")
    # for i in find_all:
    #     print(i)
    # sql = 'INSERT INTO students1 (`name`,`nickname`,`sex`) VALUES ("hu2","xiaojian2","男");'
    # hu_mysql.insert(sql)

    sql = 'INSERT INTO students1 (`name`,`nickname`,`sex`) VALUES ({},{},{});'
    sql = sql.format('"hu3"','"xiaojian3"','"男"')
    hu_mysql.insert(sql)

使用mysql封装类中的方法

#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'xiaojian'
from Mysql_base_class import Mysql_SQL

hu = Mysql_SQL("192.168.2.122","root","123456","ttmgrportal")

res = hu.get_all("select * from student4 where money>100")
for row in res:
    print("%d -- %d" % (row[0], row[1]))

13.4.2. 封装类方法 2

#!/usr/bin/env python
#-*- coding:utf8 -*-
#封装类

# 导入mysql模块
from pymysql import *


class MysqlPython:
    def __init__(self, database,  # 库
                 host="127.0.0.1",  # ip地址
                 user="root",  # 用户名
                 password="123456",  # 密码
                 port=3306,  # 端口
                 charset="utf8"):  # 字符集
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.charset = charset

    def open(self):  # 创建数据库链接函数
        self.db = connect(host=self.host,
                          database=self.database,
                          user=self.user,
                          password=self.password,
                          port=self.port,
                          charset=self.charset)
        self.cur = self.db.cursor()  # 创建游标对象

    def close(self):  # 创建断开数据库链接 关闭游标函数
        self.cur.close()
        self.db.close()

    def zhixing(self, sql, L=[]):  # 创建pymysql.execute() 方法函数
        try:
            self.open()  # 链接数据库
            self.cur.execute(sql, L)  # 参数化执行SQL命令
            self.db.commit()  # 提交数据
            print("ok")
        except Exception as e:
            self.db.rollback()  # 出错取消提交
            print("Failed", e)
        self.close()  # 断开数据库链接 关闭游标

    def all(self, sql, L=[]):
        try:
            self.open()
            self.cur.execute(sql, L)
            result = self.cur.fetchall()
            return result
        except Exception as e:
            print("Failed", e)
        self.close()

13.4.3. 封装类方法3

#!/usr/bin/python3
# -*- coding:utf-8 -*-
import pymysql
import os
import configparser
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB


class Config(object):
    """
    # Config().get_content("user_information")
    配置文件里面的参数
    [dbMysql]
    host = 192.168.1.180
    port = 3306
    user = root
    password = 123456
    """

    def __init__(self, config_filename="dbMysqlConfig.cnf"):
        file_path = os.path.join(os.path.dirname(__file__), config_filename)
        self.cf = configparser.ConfigParser()
        self.cf.read(file_path)

    def get_sections(self):
        return self.cf.sections()

    def get_options(self, section):
        return self.cf.options(section)

    def get_content(self, section):
        result = {}
        for option in self.get_options(section):
            value = self.cf.get(section, option)
            result[option] = int(value) if value.isdigit() else value
        return result


class BasePymysqlPool(object):
    def __init__(self, host, port, user, password, db_name):
        self.db_host = host
        self.db_port = int(port)
        self.user = user
        self.password = str(password)
        self.db = db_name
        self.conn = None
        self.cursor = None


class MyPymysqlPool(BasePymysqlPool):
    """
    MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现
        获取连接对象:conn = Mysql.getConn()
        释放连接对象;conn.close()或del conn
    """
    # 连接池对象
    __pool = None

    def __init__(self, conf_name=None):
        self.conf = Config().get_content(conf_name)
        super(MyPymysqlPool, self).__init__(**self.conf)
        # 数据库构造函数,从连接池中取出连接,并生成操作游标
        self._conn = self.__getConn()
        self._cursor = self._conn.cursor()

    def __getConn(self):
        """
        @summary: 静态方法,从连接池中取出连接
        @return MySQLdb.connection
        """
        if MyPymysqlPool.__pool is None:
            __pool = PooledDB(creator=pymysql,
                              mincached=1,
                              maxcached=20,
                              host=self.db_host,
                              port=self.db_port,
                              user=self.user,
                              passwd=self.password,
                              db=self.db,
                              use_unicode=True,
                              charset="utf8",
                              cursorclass=DictCursor)
            print("12211212")
        return __pool.connection()

    def getAll(self, sql, param=None):
        """
        @summary: 执行查询,并取出所有结果集
        @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list(字典对象)/boolean 查询到的结果集
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if count > 0:
            result = self._cursor.fetchall()
        else:
            result = False
        return result

    def getOne(self, sql, param=None):
        """
        @summary: 执行查询,并取出第一条
        @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if count > 0:
            result = self._cursor.fetchone()
        else:
            result = False
        return result

    def getMany(self, sql, num, param=None):
        """
        @summary: 执行查询,并取出num条结果
        @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param num:取得的结果条数
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if count > 0:
            result = self._cursor.fetchmany(num)
        else:
            result = False
        return result

    def insertMany(self, sql, values):
        """
        @summary: 向数据表插入多条记录
        @param sql:要插入的SQL格式
        @param values:要插入的记录数据tuple(tuple)/list[list]
        @return: count 受影响的行数
        """
        count = self._cursor.executemany(sql, values)
        return count

    def __query(self, sql, param=None):
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        return count

    def update(self, sql, param=None):
        """
        @summary: 更新数据表记录
        @param sql: SQL格式及条件,使用(%s,%s)
        @param param: 要更新的  值 tuple/list
        @return: count 受影响的行数
        """
        return self.__query(sql, param)

    def insert(self, sql, param=None):
        """
        @summary: 更新数据表记录
        @param sql: SQL格式及条件,使用(%s,%s)
        @param param: 要更新的  值 tuple/list
        @return: count 受影响的行数
        """
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """
        @summary: 删除数据表记录
        @param sql: SQL格式及条件,使用(%s,%s)
        @param param: 要删除的条件 值 tuple/list
        @return: count 受影响的行数
        """
        return self.__query(sql, param)

    def begin(self):
        """
        @summary: 开启事务
        """
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """
        @summary: 结束事务
        """
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, isEnd=1):
        """
        @summary: 释放连接池资源
        """
        if isEnd == 1:
            self.end('commit')
        else:
            self.end('rollback')
        self._cursor.close()
        self._conn.close()


if __name__ == '__main__':
    mysql = MyPymysqlPool("dbMysql")
    sqlAll = "select * from seckill;"
    result = mysql.getAll(sqlAll)
    print(result)
    # 释放资源
    mysql.dispose()

13.4.4. 封装数据库4

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/12/18 20:09
# filename: 03.连接mysql的封装.py
import pymysql


class dbHelper:
    def __init__(self, host, user, password, port, database):
        self.host = host
        self.user = user
        self.passsword = password
        self.port = port
        self.database = database

    # 连接
    def connect(self):
        self.conn = pymysql.connect(host=self.host, user=self.user, password=self.passsword, port=self.port,
                                    database=self.database)
        self.cursor = self.conn.cursor()

    # 关闭
    def close(self):
        self.cursor.close()
        self.conn.close()

    # 封装增删改
    def __oper(self, sql, params):
        row = 0
        try:
            self.connect()
            row = self.cursor.execute(sql, params)
            self.conn.commit()
            self.close()
        except Exception as e:
            # 出现错误进行回滚
            self.conn.rollback()
        return row

    def insert(self, sql, params=[]):
        """
        :param sql:
        :param params:
        :return: 增
        """
        return self.__oper(sql, params)

    def delete(self, sql, params=[]):
        """
        :param sql:
        :param params:
        :return: 删
        """
        return self.__oper(sql, params)

    def update(self, sql, params=[]):
        """
        :param sql:
        :param params:
        :return: 改
        """
        return self.__oper(sql, params)

    def find_one(self, sql, params):
        """
        :param sql:
        :param params:
        :return: 查询单条记录
        """
        data = None
        try:
            self.connect()
            self.cursor.execute(sql, params)
            data = self.cursor.fetchone()
            self.close()
        except Exception as e:
            print(e)

        return data

    def find_all(self, sql, params=[]):
        """
        :param sql:
        :param params:
        :return: 查询多条记录
        """
        datas = None
        try:
            self.connect()
            self.cursor.execute(sql, params)
            datas = self.cursor.fetchall()
            self.close()
        except Exception as e:
            print(e)

        return datas


if __name__ == '__main__':
    dbinfo = dbHelper("127.0.0.1", "root", "admin#123", "3306", "students")

13.4.5. 创建class的DAO类方式

config.ini

;数据库设置
[db]
host = 127.0.0.1
port = 3306
user = root
password = admin#123
database = petstore
charset = utf8

base_dao.py

# coding=utf-8
# 代码文件:chapter22/PetStore/com/zhijieketang/petstore/dao/base_dao.py

"""定义DAO基类"""
import pymysql
import configparser


class BaseDao(object):
    def __init__(self):
        self.config = configparser.ConfigParser()
        self.config.read('config.ini', encoding='utf-8')

        host = self.config['db']['host']
        user = self.config['db']['user']
        # 读取整数port数据
        port = self.config.getint('db', 'port')
        password = self.config['db']['password']
        database = self.config['db']['database']
        charset = self.config['db']['charset']

        self.conn = pymysql.connect(host=host,
                                    user=user,
                                    port=port,
                                    password=password,
                                    database=database,
                                    charset=charset)

    def close(self):
        """关闭数据库连接"""

        self.conn.close()

使用dao类

# coding=utf-8
# 代码文件:chapter22/PetStore/com/zhijieketang/petstore/dao/account_dao.py

"""商品管理DAO"""
from com.zhijieketang.petstore.dao.base_dao import BaseDao


class ProductDao(BaseDao):
    def __init__(self):
        super().__init__()

    def findall(self):
        """查询所有商品信息"""

        products = []

        try:
            # 2. 创建游标对象
            with self.conn.cursor() as cursor:
                # 3. 执行SQL操作
                sql = 'select productid,category,cname,ename,image,listprice,unitcost,descn ' \
                      'from products'
                cursor.execute(sql)
                # 4. 提取结果集
                result_set = cursor.fetchall()

                for row in result_set:
                    product = {}
                    product['productid'] = row[0]
                    product['category'] = row[1]
                    product['cname'] = row[2]
                    product['ename'] = row[3]
                    product['image'] = row[4]
                    product['listprice'] = row[5]
                    product['unitcost'] = row[6]
                    product['descn'] = row[7]
                    products.append(product)
                # with代码块结束 5. 关闭游标
        finally:
            # 6. 关闭数据连接
            self.close()

        return products

    def findbycat(self, catname):
        """按照商品类别查询商品"""

        products = []
        try:
            # 2. 创建游标对象
            with self.conn.cursor() as cursor:
                # 3. 执行SQL操作
                sql = 'select productid,category,cname,ename,image,listprice,unitcost,descn ' \
                      'from products where category=%s'
                cursor.execute(sql, catname)
                # 4. 提取结果集
                result_set = cursor.fetchall()

                for row in result_set:
                    product = {}
                    product['productid'] = row[0]
                    product['category'] = row[1]
                    product['cname'] = row[2]
                    product['ename'] = row[3]
                    product['image'] = row[4]
                    product['listprice'] = row[5]
                    product['unitcost'] = row[6]
                    product['descn'] = row[7]
                    products.append(product)
                # with代码块结束 5. 关闭游标
        finally:
            # 6. 关闭数据连接
            self.close()

        return products


    def findbyid(self, productid):
        """按照商品id查询商品"""

        product = None
        try:
            # 2. 创建游标对象
            with self.conn.cursor() as cursor:
                # 3. 执行SQL操作
                sql = 'select productid,category,cname,ename,image,listprice,unitcost,descn' \
                      ' from products where productid=%s'
                cursor.execute(sql, productid)
                # 4. 提取结果集
                row = cursor.fetchone()

                if row is not None:
                    product = {}
                    product['productid'] = row[0]
                    product['category'] = row[1]
                    product['cname'] = row[2]
                    product['ename'] = row[3]
                    product['image'] = row[4]
                    product['listprice'] = row[5]
                    product['unitcost'] = row[6]
                    product['descn'] = row[7]

                # with代码块结束 5. 关闭游标

        finally:
            # 6. 关闭数据连接
            self.close()

        return product

代码片段

import pymysql


class MengSql():
    def __init__(self, host, user, passwd, dbName):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.dbName = dbName

    def connet(self):
        self.db = pymysql.connect(self.host, self.user, self.passwd, self.dbName)
        self.cursor = self.db.cursor()

    def close(self):
        self.cursor.close()
        self.db.close()

    def get_one(self, sql):
        res = None
        try:
            self.connet()
            self.cursor.execute(sql)
            res = self.cursor.fetchone()
            self.close()
        except:
            print("查询失败")
        return res

    def get_all(self, sql):
        res = ()
        try:
            self.connet()
            self.cursor.execute(sql)
            res = self.cursor.fetchall()
            self.close()
        except:
            print("查询失败")
        return res

    def insert(self, sql):
        return self.__edit(sql)

    def update(self, sql):
        return self.__edit(sql)

    def delete(self, sql):
        return self.__edit(sql)

    def __edit(self, sql):
        count = 0
        try:
            self.connet()
            count = self.cursor.execute(sql)
            self.db.commit()
            self.close()
        except:
            print("事物提交失败")
            self.db.rollback()
        return count

eg

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/8/19 9:39
# filename: 02.连接mysql数据库的封装.py

"""
安装pymysql数据库驱动程序:
pip install pymysql
运行完毕 查看是否成功 pip -m  list

"""

import time
import pymysql


# import decimal
class MSSQL:
    def __init__(self, host, user, pwd, db):
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db = db

    def GetConnect(self):
        if not self.db:
            raise (NameError, '没有目标数据库')
        self.connect = pymysql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db,
                                       charset='utf8')
        cur = self.connect.cursor()
        if not cur:
            raise (NameError, '数据库访问失败')
        else:
            return cur

    def ExecSql(self, sql):
        cur = self.GetConnect()
        cur.execute(sql)
        self.connect.commit()
        self.connect.close()

    def ExecQuery(self, sql):
        cur = self.GetConnect()
        cur.execute(sql)
        resList = cur.fetchall()
        self.connect.close()
        return resList


def main():
    ms = MSSQL(host="192.168.0.108", user="sa", pwd="sa", db="ComPrject")
    resList = ms.ExecQuery("select *from TestModel")
    print(resList)


if __name__ == '__main__':
    main()
    input("执行完成:")

参考文献

https://www.jb51.net/article/76231.htm https://www.jb51.net/article/45077.htm