19.3. 业务服务监控详解

19.3.1. 文件内容差异对比方法

示例1:两个字符串的差异对比

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/10/29 15:08
# @filename: 2.1.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import difflib

text1 = """text1:
This module provides classes and functions for comparing sequences.
including HTML and context and unified diffs.
difflib document v7.4
add string"""

text1 = text1.splitlines()  # 以行进行分割

text2 = """text2:
This module provides classes and functions for Comparing sequences.
including HTML and context and unified diffs.
difflib document v7.5"""
text2 = text2.splitlines()
d = difflib.Differ()
diff = d.compare(text1, text2)
print("\n".join(list(diff)))

本示例采用Differ()类对两个字符串进行比较, 另外difflib的SequenceMatcher()类支持任意类型序列的比较, HtmlDiff()类支持将比较结果输出为HTML格式,

生成美观的对比HTML格式文档

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/10/29 15:08
# @filename: 2.1.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import difflib

text1 = """text1:
This module provides classes and functions for comparing sequences.
including HTML and context and unified diffs.
difflib document v7.4
add string"""

text1 = text1.splitlines()  # 以行进行分割

text2 = """text2:
This module provides classes and functions for Comparing sequences.
including HTML and context and unified diffs.
difflib document v7.5"""
text2 = text2.splitlines()
# d = difflib.Differ()
# diff = d.compare(text1, text2)
# print("\n".join(list(diff)))
d = difflib.HtmlDiff()

# 创建index.html文件并写入内容
html = d.make_file(text1, text2)
with open("index.html", "w", encoding="utf-8") as f:
    f.write(html)

示例2:对比Nginx配置文件差异

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/10/29 15:23
# @filename: 2.2.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import difflib
import sys

try:
    textfile1 = sys.argv[1]
    textfile2 = sys.argv[2]

except Exception as e:
    print("Error:" + str(e))
    print("Usage: simple.py filename1 filename2")
    sys.exit()


def readfile(filename):
    try:
        fileHandle = open(filename, "r", encoding="utf-8")
        text = fileHandle.read().splitlines()  # 读取后进行分割
        fileHandle.close()
        return text
    except Exception as e:
        print("Read file error:", str(e))
        sys.exit()


if textfile1 == "" or textfile2 == "":
    print("Usage: simple.py filename1 filename2")
    sys.exit()

# 获取分割后的字符串
text1_lines = readfile(textfile1)
text2_lines = readfile(textfile2)

d = difflib.HtmlDiff()
html = d.make_file(text1_lines, text2_lines)

with open("index2.html", "w", encoding="utf-8") as f:
    f.write(html)

19.3.2. 文件与目录差异对比方法

Python的标准库filecmp模块可以实现文件、目录、遍历子目录的差异对比功能。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/8/17 10:18
# filename: 校验源与备份目录差异.py
import os, sys
import filecmp
import re
import shutil

PWd_path = os.path.abspath(os.path.dirname(__file__))

# 创建一个空列表
holderlist = []


def compareme(dir1, dir2):
    """
    递归获取更新项函数
    :param dir1: source dir
    :param dir2: target dir
    :return:
    """
    dircomp = filecmp.dircmp(dir1, dir2)
    only_in_one = dircomp.left_only  # 源目录新文件或目录
    diff_in_one = dircomp.diff_files  # 不匹配文件,源目录已经发生变化
    dirpath = os.path.abspath(dir1)  # 获取源目录绝对路径
    [holderlist.append(os.path.abspath(os.path.join(dir1, x))) for x in only_in_one]
    [holderlist.append(os.path.abspath(os.path.join(dir1, x))) for x in diff_in_one]
    if len(dircomp.common_dirs) > 0:
        for item in dircomp.common_dirs:
            compareme(os.path.abspath(os.path.join(dir1, item)), \
                      os.path.abspath(os.path.join(dir2, item)))
            return holderlist
    else:
        return holderlist


def main():
    # if len(sys.argv) > 2:
    #     # dir1 = sys.argv[1]
    #     dir1 = "D:/1.学习知识待整理\DevOps自动化运维/https-github.com-yorkoliu-pyauto-master/第二章/filecmp/dir1"
    #     # dir2 = sys.argv[2]
    #     dir2 = "D:/1.学习知识待整理\DevOps自动化运维/https-github.com-yorkoliu-pyauto-master/第二章/filecmp/dir2"
    # else:
    #     print("Usage: ", sys.argv[0], "datadir backupdir")
    #     sys.exit()

    dir1 = PWd_path + "/dir1/"
    dir2 = PWd_path + "/dir2/"
    source_files = compareme(dir1, dir2)

    if not dir2.endswith('/'): dir2 = dir2 + '/'
    dir1 = dir1.replace("\\", "/")
    dir2 = dir2.replace("\\", "/")
    destination_files = []
    createdir_bool = False
    #
    for item in source_files:
        item = item.replace("\\", "/")
        # print(dir1, dir2, item)
        destination_dir = re.sub(dir1, dir2, item)
        destination_files.append(destination_dir)
        # print(destination_files)
        if os.path.isdir(item):
            if not os.path.exists(destination_dir):
                os.makedirs(destination_dir)
                createdir_bool = True

    if createdir_bool:
        destination_files = []
        source_files = []
        source_files = compareme(dir1, dir2)
        for item in source_files:
            destination_dir = re.sub(dir1, dir2, item)
            destination_files.append(destination_dir)

    print("update item:")
    print("---------------------------------------")
    print(source_files)     # 输出更新列表清单
    print("---------------------------------------")

    #
    copy_pair = zip(source_files, destination_files)
    for item in copy_pair:
        if os.path.isfile(item[0]):
            shutil.copyfile(item[0], item[1])


if __name__ == '__main__':
    main()

19.3.3. 发送电子邮件模块smtplib

一个发邮件的简单示例

代码示例1:

import smtplib
import string

HOST = "smtp.gmail.com"
SUBJECT = "Test email from Python"
TO = "test@qq.com"
FROM = "test@gmail.com"
text = "Python rules them all!"
BODY = string.join((
        "From: %s" % FROM,
        "To: %s" % TO,
        "Subject: %s" % SUBJECT ,
        "",
        text
        ), "\r\n")
server = smtplib.SMTP()
server.connect(HOST,"25")
server.starttls()
server.login("test@gmail.com","123456")
server.sendmail(FROM, [TO], BODY)
server.quit()

实现html格式的数据报表邮件

代码示例2

#coding: utf-8
import smtplib
from email.mime.text import MIMEText

HOST = "smtp.gmail.com"
SUBJECT = u"官网流量数据报表"
TO = "test@qq.com"
FROM = "test@gmail.com"

msg = MIMEText("""
    <table width="800" border="0" cellspacing="0" cellpadding="4">
      <tr>
        <td bgcolor="#CECFAD" height="20" style="font-size:14px">*官网数据  <a href="monitor.domain.com">更多>></a></td>
      </tr>
      <tr>
        <td bgcolor="#EFEBDE" height="100" style="font-size:13px">
        1)日访问量:<font color=red>152433</font>  访问次数:23651 页面浏览量:45123 点击数:545122  数据流量:504Mb<br>
        2)状态码信息<br>
        &nbsp;&nbsp;500:105  404:3264  503:214<br>
        3)访客浏览器信息<br>
        &nbsp;&nbsp;IE:50%  firefox:10% chrome:30% other:10%<br>
        4)页面信息<br>
        &nbsp;&nbsp;/index.php 42153<br>
        &nbsp;&nbsp;/view.php 21451<br>
        &nbsp;&nbsp;/login.php 5112<br>
    </td>
      </tr>
    </table>""","html","utf-8")
msg['Subject'] = SUBJECT
msg['From']=FROM
msg['To']=TO
try:
    server = smtplib.SMTP()
    server.connect(HOST,"25")
    server.starttls()
    server.login("test@gmail.com","123456")
    server.sendmail(FROM, TO, msg.as_string())
    server.quit()
    print "邮件发送成功!"
except Exception, e:
    print "失败:"+str(e)

实现图文格式的服务器性能报表邮件

代码示例3

#coding: utf-8
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage

HOST = "smtp.gmail.com"
SUBJECT = u"业务性能数据报表"
TO = "test@qq.com"
FROM = "test@gmail.com"

def addimg(src,imgid):
    fp = open(src, 'rb')
    msgImage = MIMEImage(fp.read())
    fp.close()
    msgImage.add_header('Content-ID', imgid)
    return msgImage

msg = MIMEMultipart('related')
msgtext = MIMEText("""
<table width="600" border="0" cellspacing="0" cellpadding="4">
      <tr bgcolor="#CECFAD" height="20" style="font-size:14px">
        <td colspan=2>*官网性能数据  <a href="monitor.domain.com">更多>></a></td>
      </tr>
      <tr bgcolor="#EFEBDE" height="100" style="font-size:13px">
        <td>
         <img src="cid:io"></td><td>
         <img src="cid:key_hit"></td>
      </tr>
      <tr bgcolor="#EFEBDE" height="100" style="font-size:13px">
         <td>
         <img src="cid:men"></td><td>
         <img src="cid:swap"></td>
      </tr>
    </table>""","html","utf-8")
msg.attach(msgtext)
msg.attach(addimg("img/bytes_io.png","io"))
msg.attach(addimg("img/myisam_key_hit.png","key_hit"))
msg.attach(addimg("img/os_mem.png","men"))
msg.attach(addimg("img/os_swap.png","swap"))

msg['Subject'] = SUBJECT
msg['From']=FROM
msg['To']=TO
try:
    server = smtplib.SMTP()
    server.connect(HOST,"25")
    server.starttls()
    server.login("test@gmail.com","123456")
    server.sendmail(FROM, TO, msg.as_string())
    server.quit()
    print "邮件发送成功!"
except Exception, e:
    print "失败:"+str(e)

实现带附件格式的业务服务质量周报邮件

#coding: utf-8
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage

HOST = "smtp.gmail.com"
SUBJECT = u"官网业务服务质量周报"
TO = "test@qq.com"
FROM = "test@gmail.com"

def addimg(src,imgid):
    fp = open(src, 'rb')
    msgImage = MIMEImage(fp.read())
    fp.close()
    msgImage.add_header('Content-ID', imgid)
    return msgImage

msg = MIMEMultipart('related')
msgtext = MIMEText("<font color=red>官网业务周平均延时图表:<br><img src=\"cid:weekly\" border=\"1\"><br>详细内容见附件。</font>","html","utf-8")
msg.attach(msgtext)
msg.attach(addimg("img/weekly.png","weekly"))

attach = MIMEText(open("doc/week_report.xlsx", "rb").read(), "base64", "utf-8")
attach["Content-Type"] = "application/octet-stream"
#attach["Content-Disposition"] = "attachment; filename=\"业务服务质量周报(12周).xlsx\"".decode("utf-8").encode("gb18030")
msg.attach(attach)

msg['Subject'] = SUBJECT
msg['From']=FROM
msg['To']=TO
try:
    server = smtplib.SMTP()
    server.connect(HOST,"25")
    server.starttls()
    server.login("test@gmail.com","123456")
    server.sendmail(FROM, TO, msg.as_string())
    server.quit()
    print "邮件发送成功!"
except Exception, e:
    print "失败:"+str(e)

19.3.4. 探测web服务质量方法

pyurl是一个用C语言写的libcurl Python实现,功能非常强大,支持的协议有FTP、HTTP、HTTPS、TELNET等, 可以理解成Linux下的curl命令功能的Python封装,简单易用。

实现探测Web服务质量

# -*- coding: utf-8 -*-
import os,sys
import time
import sys
import pycurl

URL="http://www.google.com.hk"
c = pycurl.Curl()
c.setopt(pycurl.URL, URL)

#连接超时时间,5秒
c.setopt(pycurl.CONNECTTIMEOUT, 5)

#下载超时时间,5秒
c.setopt(pycurl.TIMEOUT, 5)
c.setopt(pycurl.FORBID_REUSE, 1)
c.setopt(pycurl.MAXREDIRS, 1)
c.setopt(pycurl.NOPROGRESS, 1)
c.setopt(pycurl.DNS_CACHE_TIMEOUT,30)

# 创建一个文件对象 以“wb”方式打开,用来存储返回的http头部及页面内容
indexfile = open(os.path.dirname(os.path.realpath(__file__))+"/content.txt", "wb")
c.setopt(pycurl.WRITEHEADER, indexfile)
c.setopt(pycurl.WRITEDATA, indexfile)
try:
    c.perform()
except Exception,e:
    print "connecion error:"+str(e)
    indexfile.close()
    c.close()
    sys.exit()

NAMELOOKUP_TIME =  c.getinfo(c.NAMELOOKUP_TIME)
CONNECT_TIME =  c.getinfo(c.CONNECT_TIME)
PRETRANSFER_TIME =   c.getinfo(c.PRETRANSFER_TIME)
STARTTRANSFER_TIME = c.getinfo(c.STARTTRANSFER_TIME)
TOTAL_TIME = c.getinfo(c.TOTAL_TIME)
HTTP_CODE =  c.getinfo(c.HTTP_CODE)
SIZE_DOWNLOAD =  c.getinfo(c.SIZE_DOWNLOAD)
HEADER_SIZE = c.getinfo(c.HEADER_SIZE)
SPEED_DOWNLOAD=c.getinfo(c.SPEED_DOWNLOAD)

print "HTTP状态码:%s" %(HTTP_CODE)
print "DNS解析时间:%.2f ms"%(NAMELOOKUP_TIME*1000)
print "建立连接时间:%.2f ms" %(CONNECT_TIME*1000)
print "准备传输时间:%.2f ms" %(PRETRANSFER_TIME*1000)
print "传输开始时间:%.2f ms" %(STARTTRANSFER_TIME*1000)
print "传输结束总时间:%.2f ms" %(TOTAL_TIME*1000)

print "下载数据包大小:%d bytes/s" %(SIZE_DOWNLOAD)
print "HTTP头部大小:%d byte" %(HEADER_SIZE)
print "平均下载速度:%d bytes/s" %(SPEED_DOWNLOAD)

indexfile.close()
c.close()

19.3.5. Python操作Excel表格

简单示例

插入文字、数字、图片单元格简单示例 代码示例1

#coding: utf-8
import xlsxwriter


# Create an new Excel file and add a worksheet.
workbook = xlsxwriter.Workbook('demo1.xlsx')
worksheet = workbook.add_worksheet()

# Widen the first column to make the text clearer.
worksheet.set_column('A:A', 20)

# Add a bold format to use to highlight cells.
#bold = workbook.add_format({'bold': True})
bold = workbook.add_format()
bold.set_bold()

# Write some simple text.
worksheet.write('A1', 'Hello')

# Text with formatting.
worksheet.write('A2', 'World', bold)

worksheet.write('B2', u'中文测试', bold)

# Write some numbers, with row/column notation.
worksheet.write(2, 0, 32)
worksheet.write(3, 0, 35.5)
worksheet.write(4, 0, '=SUM(A3:A4)')

# Insert an image.
worksheet.insert_image('B5', 'img/python-logo.png')

workbook.close()

定制自动化业务流量报表周报

代码示例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/8/17 10:34
# filename: python操作Excel方法.py

import xlwt

# 定义数据表头列表
title = ["业务名称", "星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日", "平均流量"]

# 定义5频道一周7天的流量数据
data = [
    ["业务官网", 150, 152, 158, 149, 155, 145, 148],
    ["新闻中心", 89, 88, 95, 56, 48, 100, 99],
    ["购物频道", 200, 201, 222, 234, 180, 179, 190],
    ["体育频道", 77, 88, 99, 55, 66, 48, 90],
    ["亲子频道", 81, 82, 83, 84, 85, 86, 87],
]

# for da in data:
#     da.append(sum(da[1:])/len(da[1:]))

# 计算平均值
[da.append(sum(da[1:]) / len(da[1:])) for da in data]

book = xlwt.Workbook(encoding="utf-8")
sheet = book.add_sheet("Sheet1")

for h in range(len(title)):
    sheet.write(0, h, title[h])

i = 1
for list in data:
    j = 0
    for data in list:
        sheet.write(i, j, data)
        j += 1
    i += 1
book.save("excel测试.xls")

19.3.6. 实现高效的端口扫描

单线程端口扫描示例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 11:01
# filename: smaple02.py

from __future__ import print_function
import telnetlib


def conn_scan(host, port):
    t = telnetlib.Telnet()
    try:
        t.open(host, port, timeout=1)
        print(host, port, 'is avaliable')
    except Exception as e:
        pass
        # print(host, port, 'is not avaliable',e)
    finally:
        t.close()


def main():
    host = '127.0.0.1'
    for port in range(80, 1000):
        conn_scan(host, port)


if __name__ == '__main__':
    main()

代码示例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/11/3 15:34
# @filename: sample03.py
# @Email:    1879324764@qq.com
# @Software: PyCharm

import socket


def check_server(address, port=None):
    s = socket.socket()
    print('Attempting to connect to %s on port %s' % (address, port))
    try:
        s.connect((address, port))
        print('Connected to %s on port %s' % (address, port))
        return True
    except socket.error as e:
        print('Connection to %s on port %s failed: %s' % (address, port, e))
        return False
    finally:
        s.close()

check_server("127.0.0.1",port=443)

多线程端口扫描示例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 10:58
# filename: tasks.py
from __future__ import print_function
from socket import *


def conn_scan(port):
    conn = socket(AF_INET, SOCK_STREAM)
    host = "127.0.0.1"
    try:
        conn.connect((host, port))
        print(host, port, 'is available')
    except Exception as e:
        # print(host, port, 'is not available', e)
        pass
    finally:
        conn.close()


def mulit_run(func, max_workers, args):
    """
    多线程执行命令
    :param func:  执行函数
    :param max_workers: 最多线程数
    :param args: 可迭代对象
    :return:
    """
    from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
    executor = ThreadPoolExecutor(max_workers=max_workers)
    all_task = [executor.submit(func, i) for i in args]
    wait(all_task, return_when=ALL_COMPLETED)


def main():
    ports = [p for p in range(60, 1000)]
    mulit_run(conn_scan, len(ports), ports)


if __name__ == '__main__':
    main()

使用python-nmap实现端口扫描

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther:   18793
# @Date:    2020/11/3 15:48
# @filename: nmap_sample01.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import sys
import nmap

scan_row = []
input_data = input('Please input hosts and port: ')
scan_row = input_data.split(" ")
if len(scan_row) != 2:
    print("Input errors,example \"192.168.1.0/24 80,443,22\"")
    sys.exit(0)

hosts = scan_row[0]  # 接收用户输入的主机
port = scan_row[1]  # 接收用户输入的端口

try:
    nm = nmap.PortScanner()  # 创建端口扫描对象
except nmap.PortScannerError:
    print('Nmap not found', sys.exc_info()[0])
    sys.exit(0)
except:
    print("Unexpected error:", sys.exc_info()[0])
    sys.exit(0)

try:
    nm.scan(hosts=hosts, arguments=' -v -sS -p ' + port)  # 调用扫描方法,参数指定扫描主机hosts,nmap扫描命令行参数arguments
except Exception as e:
    print("Scan erro:" + str(e))

for host in nm.all_hosts():  # 遍历扫描主机
    print('----------------------------------------------------')
    print('Host : %s (%s)' % (host, nm[host].hostname()))  # 输出主机及主机名
    print('State : %s' % nm[host].state())  # 输出主机状态,如up、down

    for proto in nm[host].all_protocols():  # 遍历扫描协议,如tcp、udp
        print('----------')
        print('Protocol : %s' % proto)  # 输入协议名

        lport = nm[host][proto].keys()  # 获取协议的所有扫描端口
        lport.sort()  # 端口列表排序
        for port in lport:  # 遍历端口及输出端口与状态
            print('port : %s\tstate : %s' % (port, nm[host][proto][port]['state']))

19.3.7. 主机信息监测

多线程版主机监测

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 10:50
# filename: ping001.py

from __future__ import print_function
import subprocess
import threading


def is_reacheable(ip):
    if subprocess.call(["ping", "-c", "1", ip]):
        print("{0} is alive".format(ip))
    else:
        print("{0} is unreacheable".format(ip))


def main():
    with open('ips.txt') as f:
        lines = f.readlines()
        threads = []
        for line in lines:
            thr = threading.Thread(target=is_reacheable, args=(line,))
            thr.start()
            threads.append(thr)

        for thr in threads:
            thr.join()


if __name__ == '__main__':
    main()

消息队列版主机监测

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 10:54
# filename: ping002.py
from __future__ import print_function
import subprocess
import threading
from queue import Queue
from queue import Empty


def call_ping(ip):
    if subprocess.call(["ping", "-c", "1", ip]):
        print("{0} is alive".format(ip))
    else:
        print("{0} is unreacheable".format(ip))


def is_reacheable(q):
    try:
        while True:
            ip = q.get_nowait()
            call_ping(ip)
    except Empty:
        pass


def main():
    q = Queue()
    with open('ips.txt') as f:
        for line in f:
            q.put(line)

    threads = []
    for i in range(10):
        thr = threading.Thread(target=is_reacheable, args=(q,))
        thr.start()
        threads.append(thr)

    for thr in threads:
        thr.join()


if __name__ == '__main__':
    main()