13.6. 数据库模块化

13.6.1. 模块化1

config/db.ini

[MySQL]
host=localhost
user=root
password=admin#123
port=3306
schema=my_blog
charset=UTF8

common/named_tuples.py

#!/usr/bin/env python
#-*- coding:utf8 -*-

#定义必须的名称元祖
import collections

#用于MySQL的服务信息
MySQL = collections.namedtuple('MySQL', ['host', 'user', 'password', 'port', 'charset', 'schema'])

common/conf_utils.py

#!/usr/bin/env python
#-*- coding:utf8 -*-
from common.named_tuples import MySQL
import configparser

class ConfigReader:
    #path为配置文件的完整路径,调用者传入
    def __init__(self, path):
        if path is None or len(path) < 1:
            raise ValueError("The config ini file path required..")
        else:
            self.conf = configparser.ConfigParser()
            self.conf.read(path, encoding="utf-8")

    def get_mysql_info(self):
        '''
        获取mysql服务器信息,返回MySQL(host,user,password,port,charset,schema)
        :return:
        '''
        host = self.conf.get('MySQL', 'host')
        user = self.conf.get('MySQL', 'user')
        pswd = self.conf.get('MySQL', 'password')
        port = self.conf.get('MySQL', 'port')
        charset = self.conf.get('MySQL', 'charset')
        schema = self.conf.get('MySQL', 'schema')

        return MySQL(host, user, pswd, int(port), charset, schema)


# a = ConfigReader("../config/db.ini")
# print(a.get_mysql_info())

common/mysql_client.py

#!/usr/bin/env python
# -*- coding:utf8 -*-

# Mysql操作函数
from common.conf_utils import ConfigReader
import pymysql
import datetime


# MySQL获取数据库连接
def connect_pool():
    cr = ConfigReader("../config/db.ini")
    conf = cr.get_mysql_info()
    return pymysql.connections.Connection(host=conf.host,
                                          port=conf.port,
                                          password=conf.password,
                                          database=conf.schema,
                                          charset=conf.charset)

def query_table(sql):
    '''
    MySQL统一数据查询方法
    :param sql:
    :return:
    '''
    print("MySQL clinet query start....")
    start = datetime.datetime.now()
    print(sql)
    result = []
    try:
        conn = connect_pool()
        cur = conn.cursor()
        cur.execute(sql)
        for row in cur.fetchall():
            result.append([cell for cell in row])
    except Exception as e:
        print("Query from MySQL table failed . Case :{} \n".format(e))

    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()

    records = len(result)
    end = datetime.datetime.now()
    print('Mysql client query completed in %s seconds. Records found: %s\n'% ((end - start).seconds, records))
    return result



def update_record(sql):
    '''
    MySQL 统一数据更新方法
    :param sql:
    :return:
    '''
    global cur
    result = []
    try:
        conn = connect_pool()
        cur = conn.cursor()
        cur.execute(sql)
    except Exception as e:
        print("update MySQL table failed . Case: {} \n".format(e))
    finally:
        conn.commit()
        if cur:
            cur.close()
        if conn:
            conn.close()
    return


if __name__ == '__main__':
    query_table("show tables;")

13.6.2. 模块化2

dbMysqlConfig.cnf

[dbMysql]
host = localhost
port = 3306
user = root
password = 123456
db_name = house

mysql_DBUtils.py

#!/usr/bin/python3
# -*- coding:utf-8 -*-
import pymysql, os, configparser
from pymysql.cursors import DictCursor
from DBUtils.PooledDB import PooledDB


class Config(object):
    """
    # Config().get_content("user_information")
    配置文件里面的参数
    [dbMysql]
    host = 192.168.1.80
    port = 3306
    user = root
    password = 123456
    """

    def __init__(self, config_filename="dbMysqlConfig.cnf"):
        file_path = os.path.join(os.path.dirname(__file__), config_filename)
        self.cf = configparser.ConfigParser()
        self.cf.read(file_path)

    def get_sections(self):
        return self.cf.sections()

    def get_options(self, section):
        return self.cf.options(section)

    def get_content(self, section):
        result = {}
        for option in self.get_options(section):
            value = self.cf.get(section, option)
            result[option] = int(value) if value.isdigit() else value
        return result


class BasePymysqlPool(object):
    def __init__(self, host, port, user, password, db_name):
        self.db_host = host
        self.db_port = int(port)
        self.user = user
        self.password = str(password)
        self.db = db_name
        self.conn = None
        self.cursor = None


class MyPymysqlPool(BasePymysqlPool):
    """
    MYSQL数据库对象,负责产生数据库连接 , 此类中的连接采用连接池实现
     获取连接对象:conn = Mysql.getConn()
     释放连接对象;conn.close()或del conn
    """
    # 连接池对象
    __pool = None

    def __init__(self, conf_name=None):
        self.conf = Config().get_content(conf_name)
        super(MyPymysqlPool, self).__init__(**self.conf)
        # 数据库构造函数,从连接池中取出连接,并生成操作游标
        self._conn = self.__getConn()
        self._cursor = self._conn.cursor()

    def __getConn(self):
        """
        @summary: 静态方法,从连接池中取出连接
        @return MySQLdb.connection
        """
        if MyPymysqlPool.__pool is None:
            __pool = PooledDB(creator=pymysql,
                              mincached=1,
                              maxcached=20,
                              host=self.db_host,
                              port=self.db_port,
                              user=self.user,
                              passwd=self.password,
                              db=self.db,
                              use_unicode=True,
                              charset="utf8",
                              cursorclass=DictCursor)
        return __pool.connection()

    def getAll(self, sql, param=None):
        """
        @summary: 执行查询,并取出所有结果集
        @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list(字典对象)/boolean 查询到的结果集
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if count > 0:
            result = self._cursor.fetchall()
        else:
            result = False
        return result

    def getOne(self, sql, param=None):
        """
        @summary: 执行查询,并取出第一条
        @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if count > 0:
            result = self._cursor.fetchone()
        else:
            result = False
        return result

    def getMany(self, sql, num, param=None):
        """
        @summary: 执行查询,并取出num条结果
        @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
        @param num:取得的结果条数
        @param param: 可选参数,条件列表值(元组/列表)
        @return: result list/boolean 查询到的结果集
        """
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        if count > 0:
            result = self._cursor.fetchmany(num)
        else:
            result = False
        return result

    def insertMany(self, sql, values):
        """
        @summary: 向数据表插入多条记录
        @param sql:要插入的SQL格式
        @param values:要插入的记录数据tuple(tuple)/list[list]
        @return: count 受影响的行数
        """
        count = self._cursor.executemany(sql, values)
        return count

    def __query(self, sql, param=None):
        if param is None:
            count = self._cursor.execute(sql)
        else:
            count = self._cursor.execute(sql, param)
        return count

    def update(self, sql, param=None):
        """
        @summary: 更新数据表记录
        @param sql: SQL格式及条件,使用(%s,%s)
        @param param: 要更新的  值 tuple/list
        @return: count 受影响的行数
        """
        return self.__query(sql, param)

    def insert(self, sql, param=None):
        """
        @summary: 更新数据表记录
        @param sql: SQL格式及条件,使用(%s,%s)
        @param param: 要更新的  值 tuple/list
        @return: count 受影响的行数
        """
        return self.__query(sql, param)

    def delete(self, sql, param=None):
        """
        @summary: 删除数据表记录
        @param sql: SQL格式及条件,使用(%s,%s)
        @param param: 要删除的条件 值 tuple/list
        @return: count 受影响的行数
        """
        return self.__query(sql, param)

    def begin(self):
        """
        @summary: 开启事务
        """
        self._conn.autocommit(0)

    def end(self, option='commit'):
        """
        @summary: 结束事务
        """
        if option == 'commit':
            self._conn.commit()
        else:
            self._conn.rollback()

    def dispose(self, isEnd=1):
        """
        @summary: 释放连接池资源
        """
        if isEnd == 1:
            self.end('commit')
        else:
            self.end('rollback')
        self._cursor.close()
        self._conn.close()


mysql = MyPymysqlPool("dbMysql")

if __name__ == '__main__':
    sqlAll = "select id, title from novel limit 2;"
    result = mysql.getAll(sqlAll)
    print(result)
    # 释放资源
    mysql.dispose()