12.23. pymysql+log+configparser封装

12.23.1. 配置文件:

[local_db]
host=127.0.0.1
port=3306
user=root
password=123456
database=userinfo

12.23.2. 配置文件解析:

import copy
import configparser

conf_file = "conf"


class ConfParser():
    def __init__(self, filename, section=None, option=None):
        self.conf_file = filename
        self.conf_parser = configparser.ConfigParser()
        self._parser()

    def _parser(self):
        self.conf_parser.read(self.conf_file)

    def sections(self):
        return self.conf_parser.sections()

    def options(self, section):
        return self.conf_parser.options(section)

    def get_section_value(self, section, keys=None):
        options = self.conf_parser.options(section)
        key_value = {option: self.conf_parser.get(section, option) for option in options}
        result = copy.deepcopy(key_value)
        if keys:
            result = {i: key_value.get(i, None) for i in keys}
        return result


if __name__ == '__main__':
    parser = ConfParser(filename=conf_file)
    print(parser.get_section_value('local_db'))

12.23.3. 日志文件:

日志分段配置类在logging.handler中

import logging
from logging.handlers import RotatingFileHandler


class Logger(object):
    def __init__(self, name=None, level=logging.DEBUG):
        # (1)创建logger对象
        self._logger = logging.getLogger(name=name)
        self._logger.setLevel(level=level)

        # (2)定义默认日志格式
        self._fmt = logging.Formatter(
            '[%(asctime)s %(name)s-%(filename)s[%(levelname)s]-%(process)d:%(threadName)s-%(thread)d]:%(message)s')

    def create_log(self, filename, fh_fmt=None, fh_level=logging.DEBUG, sh_level=None, sh_fmt=None):
        self.file_handler(filename, fh_fmt, fh_level)
        if sh_level:
            self.stream_handler(sh_fmt, sh_level)
        return self._logger

    def file_handler(self, filename, fmt=None, level=logging.DEBUG):
        level = getattr(logging, level.upper()) if type(level) is str else level

        # (3)1日志操作符(文件)
        # fh = logging.FileHandler(filename=filename, encoding='utf-8')
        fh = RotatingFileHandler(filename, maxBytes=1024*1024, backupCount=2)
        fh.setFormatter(fmt=fmt if fmt else self._fmt)
        fh.setLevel(level=level)
        self._logger.addHandler(fh)
        return self._logger

    def stream_handler(self, fmt=None, level=logging.INFO):
        level = getattr(logging, level.upper()) if type(level) is str else level
        # (3)2日志操作符(屏幕)
        sh = logging.StreamHandler()
        sh.setFormatter(fmt=fmt if fmt else self._fmt)
        sh.setLevel(level=level)
        self._logger.addHandler(sh)
        return self._logger


if __name__ == '__main__':
    filename = 'test.log'
    log_obj = Logger(name='test_log')
    logger = log_obj.create_log(filename, sh_level='debug')

    for i in range(100):
        logger.debug(msg=f'{i}debug')
        logger.warning(msg=f'{i}warning')
        logger.info(msg=f'{i}info')
        logger.error(msg=f'{i}error')
        logger.critical(msg=f'{i}critical')

12.23.4. 数据库类:

import pymysql
from log import Logger
from conf_parse import ConfParser

log_obj = Logger(name='test_log')
log = log_obj.create_log(filename='test.log', sh_level='info')


class DbConnection():
    def __init__(self, host, user, password, database, port=3306, cursorclass=pymysql.cursors.Cursor, autocommit=False,
                 **kwargs):
        """

        :param host: 主机ip
        :param user: 用户名
        :param password: 密码
        :param database: 数据库
        :param port: 端口号
        :param cursorclass: 游标类(输出结果样式)
        :param autocommit: 是否自动提交
        :param kwargs: 其它参数
        """
        __kwargs = {"host": host, "port": int(port), "user": user, "password": password,
                    "database": database, "cursorclass": cursorclass, "autocommit": autocommit}
        self._reconnect(**__kwargs, **kwargs)

    def _reconnect(self, **kwargs):
        self.close()
        self._db_conn = pymysql.connect(**kwargs)
        #  是否自动提交,可以再实例化建立连接时指定参数autocommit,后续在写操作时无需commit()
        # self._db_conn.autocommit(value=True)

    def close(self):
        if getattr(self, '_db_conn', None) is not None:
            self._db_conn.close()
            self._db_conn = None

    def write(self, sql):
        """

        :param sql: DDL>>insert, update, delete,DML:create;DCL>>grant, remove
        :return: None
        """
        _cursor = self._db_conn.cursor()
        thread_id = self._db_conn.thread_id()
        try:
            _cursor.execute(sql)
            # 事务提交,autocommit=True自动开启
            self._db_conn.commit()
        except Exception as e:
            self._db_conn.rollback()
            log.error('write fail[error:%s]:[Thread-%s]%s' % (e, thread_id, sql))
        else:
            log.debug('write success:[Thread-%s]%s' % (thread_id, sql))
        finally:
            _cursor.close()

    def read(self, sql, allinfo=True, size=None):
        """

        :param sql:DQL>>select;
        :param all: False--fetchone, True--fetchall()
        :param size: size--int
        :return:
        """
        result = None
        _cursor = self._db_conn.cursor()
        thread_id = self._db_conn.thread_id()

        try:
            _cursor.execute(sql)
        except Exception as e:
            log.error('read fail[error:%s]:[Thread-%s]%s' % (e, thread_id, sql))
        else:
            log.debug('read success:[Thread-%s]%s' % (thread_id, sql))
            if size:
                result = _cursor.fetchmany(size)
            elif not allinfo:
                result = _cursor.fetchone()
            elif allinfo:
                result = _cursor.fetchall()
        finally:
            _cursor.close()

        return result


if __name__ == '__main__':
    conf_file = "conf"
    parser = ConfParser(filename=conf_file)
    db_conf = parser.get_section_value('local_db')


    db_conn = DbConnection(**db_conf, cursorclass=pymysql.cursors.DictCursor)

    sql = "select * from username;"
    ret = db_conn.read(sql, allinfo=True)

    # 插入数据
    # sql = "insert into username values('c',14),('d',15);"
    # db_conn.write(sql)

    sql = "select * from username;"
    ret = db_conn.read(sql)
    print(ret)

    sql = "select * from username;"
    ret = db_conn.read(sql,size=3)
    print(ret)

    sql = "select * from username;"
    ret = db_conn.read(sql, allinfo=False)
    print(ret)

    db_conn.close()

12.23.5. logging封装

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2021/3/15 16:07
# @filename: logging模块封装.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import logging

"""
基于logging封装操作类
"""


class My_logger:
    _logger = None

    def __init__(self, path, console_level=logging.DEBUG, file_level=logging.DEBUG):
        self._logger = logging.getLogger(path)
        self._logger.setLevel(logging.DEBUG)
        fmt = logging.Formatter('%(asctime)-12s %(levelname)-8s %(name)-10s %(message)-12s')

        # 设置命令行日志
        sh = logging.StreamHandler()
        sh.setLevel(console_level)
        sh.setFormatter(fmt)

        # 设置文件日志
        fh = logging.FileHandler(path, encoding="utf-8")
        fh.setFormatter(fmt)
        fh.setLevel(file_level)

        self._logger.addHandler(sh)
        self._logger.addHandler(fh)

    # debug
    def debug(self, message):
        self._logger.debug(message)

    # info
    def info(self, message):
        self._logger.info(message)

    # warning
    def warning(self, message):
        self._logger.warning(message)

    # error
    def error(self, message):
        self._logger.error(message)

    # critical
    def critical(self, message):
        self._logger.critical(message)


if __name__ == '__main__':
    logger = My_logger("./2012log.log")
    logger.debug("debug info ")
    # logger.info("202cdssssssssss")