Contents
19.3. 业务服务监控详解¶
19.3.1. 文件内容差异对比方法¶
示例1:两个字符串的差异对比¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/10/29 15:08
# @filename: 2.1.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import difflib
text1 = """text1:
This module provides classes and functions for comparing sequences.
including HTML and context and unified diffs.
difflib document v7.4
add string"""
text1 = text1.splitlines() # 以行进行分割
text2 = """text2:
This module provides classes and functions for Comparing sequences.
including HTML and context and unified diffs.
difflib document v7.5"""
text2 = text2.splitlines()
d = difflib.Differ()
diff = d.compare(text1, text2)
print("\n".join(list(diff)))
本示例采用Differ()类对两个字符串进行比较, 另外difflib的SequenceMatcher()类支持任意类型序列的比较, HtmlDiff()类支持将比较结果输出为HTML格式,
生成美观的对比HTML格式文档
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/10/29 15:08
# @filename: 2.1.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import difflib
text1 = """text1:
This module provides classes and functions for comparing sequences.
including HTML and context and unified diffs.
difflib document v7.4
add string"""
text1 = text1.splitlines() # 以行进行分割
text2 = """text2:
This module provides classes and functions for Comparing sequences.
including HTML and context and unified diffs.
difflib document v7.5"""
text2 = text2.splitlines()
# d = difflib.Differ()
# diff = d.compare(text1, text2)
# print("\n".join(list(diff)))
d = difflib.HtmlDiff()
# 创建index.html文件并写入内容
html = d.make_file(text1, text2)
with open("index.html", "w", encoding="utf-8") as f:
f.write(html)
示例2:对比Nginx配置文件差异¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/10/29 15:23
# @filename: 2.2.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import difflib
import sys
try:
textfile1 = sys.argv[1]
textfile2 = sys.argv[2]
except Exception as e:
print("Error:" + str(e))
print("Usage: simple.py filename1 filename2")
sys.exit()
def readfile(filename):
try:
fileHandle = open(filename, "r", encoding="utf-8")
text = fileHandle.read().splitlines() # 读取后进行分割
fileHandle.close()
return text
except Exception as e:
print("Read file error:", str(e))
sys.exit()
if textfile1 == "" or textfile2 == "":
print("Usage: simple.py filename1 filename2")
sys.exit()
# 获取分割后的字符串
text1_lines = readfile(textfile1)
text2_lines = readfile(textfile2)
d = difflib.HtmlDiff()
html = d.make_file(text1_lines, text2_lines)
with open("index2.html", "w", encoding="utf-8") as f:
f.write(html)
19.3.2. 文件与目录差异对比方法¶
Python的标准库filecmp模块可以实现文件、目录、遍历子目录的差异对比功能。
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/8/17 10:18
# filename: 校验源与备份目录差异.py
import os, sys
import filecmp
import re
import shutil
PWd_path = os.path.abspath(os.path.dirname(__file__))
# 创建一个空列表
holderlist = []
def compareme(dir1, dir2):
"""
递归获取更新项函数
:param dir1: source dir
:param dir2: target dir
:return:
"""
dircomp = filecmp.dircmp(dir1, dir2)
only_in_one = dircomp.left_only # 源目录新文件或目录
diff_in_one = dircomp.diff_files # 不匹配文件,源目录已经发生变化
dirpath = os.path.abspath(dir1) # 获取源目录绝对路径
[holderlist.append(os.path.abspath(os.path.join(dir1, x))) for x in only_in_one]
[holderlist.append(os.path.abspath(os.path.join(dir1, x))) for x in diff_in_one]
if len(dircomp.common_dirs) > 0:
for item in dircomp.common_dirs:
compareme(os.path.abspath(os.path.join(dir1, item)), \
os.path.abspath(os.path.join(dir2, item)))
return holderlist
else:
return holderlist
def main():
# if len(sys.argv) > 2:
# # dir1 = sys.argv[1]
# dir1 = "D:/1.学习知识待整理\DevOps自动化运维/https-github.com-yorkoliu-pyauto-master/第二章/filecmp/dir1"
# # dir2 = sys.argv[2]
# dir2 = "D:/1.学习知识待整理\DevOps自动化运维/https-github.com-yorkoliu-pyauto-master/第二章/filecmp/dir2"
# else:
# print("Usage: ", sys.argv[0], "datadir backupdir")
# sys.exit()
dir1 = PWd_path + "/dir1/"
dir2 = PWd_path + "/dir2/"
source_files = compareme(dir1, dir2)
if not dir2.endswith('/'): dir2 = dir2 + '/'
dir1 = dir1.replace("\\", "/")
dir2 = dir2.replace("\\", "/")
destination_files = []
createdir_bool = False
#
for item in source_files:
item = item.replace("\\", "/")
# print(dir1, dir2, item)
destination_dir = re.sub(dir1, dir2, item)
destination_files.append(destination_dir)
# print(destination_files)
if os.path.isdir(item):
if not os.path.exists(destination_dir):
os.makedirs(destination_dir)
createdir_bool = True
if createdir_bool:
destination_files = []
source_files = []
source_files = compareme(dir1, dir2)
for item in source_files:
destination_dir = re.sub(dir1, dir2, item)
destination_files.append(destination_dir)
print("update item:")
print("---------------------------------------")
print(source_files) # 输出更新列表清单
print("---------------------------------------")
#
copy_pair = zip(source_files, destination_files)
for item in copy_pair:
if os.path.isfile(item[0]):
shutil.copyfile(item[0], item[1])
if __name__ == '__main__':
main()
19.3.3. 发送电子邮件模块smtplib¶
一个发邮件的简单示例¶
代码示例1:
import smtplib
import string
HOST = "smtp.gmail.com"
SUBJECT = "Test email from Python"
TO = "test@qq.com"
FROM = "test@gmail.com"
text = "Python rules them all!"
BODY = string.join((
"From: %s" % FROM,
"To: %s" % TO,
"Subject: %s" % SUBJECT ,
"",
text
), "\r\n")
server = smtplib.SMTP()
server.connect(HOST,"25")
server.starttls()
server.login("test@gmail.com","123456")
server.sendmail(FROM, [TO], BODY)
server.quit()
实现html格式的数据报表邮件¶
代码示例2
#coding: utf-8
import smtplib
from email.mime.text import MIMEText
HOST = "smtp.gmail.com"
SUBJECT = u"官网流量数据报表"
TO = "test@qq.com"
FROM = "test@gmail.com"
msg = MIMEText("""
<table width="800" border="0" cellspacing="0" cellpadding="4">
<tr>
<td bgcolor="#CECFAD" height="20" style="font-size:14px">*官网数据 <a href="monitor.domain.com">更多>></a></td>
</tr>
<tr>
<td bgcolor="#EFEBDE" height="100" style="font-size:13px">
1)日访问量:<font color=red>152433</font> 访问次数:23651 页面浏览量:45123 点击数:545122 数据流量:504Mb<br>
2)状态码信息<br>
500:105 404:3264 503:214<br>
3)访客浏览器信息<br>
IE:50% firefox:10% chrome:30% other:10%<br>
4)页面信息<br>
/index.php 42153<br>
/view.php 21451<br>
/login.php 5112<br>
</td>
</tr>
</table>""","html","utf-8")
msg['Subject'] = SUBJECT
msg['From']=FROM
msg['To']=TO
try:
server = smtplib.SMTP()
server.connect(HOST,"25")
server.starttls()
server.login("test@gmail.com","123456")
server.sendmail(FROM, TO, msg.as_string())
server.quit()
print "邮件发送成功!"
except Exception, e:
print "失败:"+str(e)
实现图文格式的服务器性能报表邮件¶
代码示例3
#coding: utf-8
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
HOST = "smtp.gmail.com"
SUBJECT = u"业务性能数据报表"
TO = "test@qq.com"
FROM = "test@gmail.com"
def addimg(src,imgid):
fp = open(src, 'rb')
msgImage = MIMEImage(fp.read())
fp.close()
msgImage.add_header('Content-ID', imgid)
return msgImage
msg = MIMEMultipart('related')
msgtext = MIMEText("""
<table width="600" border="0" cellspacing="0" cellpadding="4">
<tr bgcolor="#CECFAD" height="20" style="font-size:14px">
<td colspan=2>*官网性能数据 <a href="monitor.domain.com">更多>></a></td>
</tr>
<tr bgcolor="#EFEBDE" height="100" style="font-size:13px">
<td>
<img src="cid:io"></td><td>
<img src="cid:key_hit"></td>
</tr>
<tr bgcolor="#EFEBDE" height="100" style="font-size:13px">
<td>
<img src="cid:men"></td><td>
<img src="cid:swap"></td>
</tr>
</table>""","html","utf-8")
msg.attach(msgtext)
msg.attach(addimg("img/bytes_io.png","io"))
msg.attach(addimg("img/myisam_key_hit.png","key_hit"))
msg.attach(addimg("img/os_mem.png","men"))
msg.attach(addimg("img/os_swap.png","swap"))
msg['Subject'] = SUBJECT
msg['From']=FROM
msg['To']=TO
try:
server = smtplib.SMTP()
server.connect(HOST,"25")
server.starttls()
server.login("test@gmail.com","123456")
server.sendmail(FROM, TO, msg.as_string())
server.quit()
print "邮件发送成功!"
except Exception, e:
print "失败:"+str(e)
实现带附件格式的业务服务质量周报邮件¶
#coding: utf-8
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
HOST = "smtp.gmail.com"
SUBJECT = u"官网业务服务质量周报"
TO = "test@qq.com"
FROM = "test@gmail.com"
def addimg(src,imgid):
fp = open(src, 'rb')
msgImage = MIMEImage(fp.read())
fp.close()
msgImage.add_header('Content-ID', imgid)
return msgImage
msg = MIMEMultipart('related')
msgtext = MIMEText("<font color=red>官网业务周平均延时图表:<br><img src=\"cid:weekly\" border=\"1\"><br>详细内容见附件。</font>","html","utf-8")
msg.attach(msgtext)
msg.attach(addimg("img/weekly.png","weekly"))
attach = MIMEText(open("doc/week_report.xlsx", "rb").read(), "base64", "utf-8")
attach["Content-Type"] = "application/octet-stream"
#attach["Content-Disposition"] = "attachment; filename=\"业务服务质量周报(12周).xlsx\"".decode("utf-8").encode("gb18030")
msg.attach(attach)
msg['Subject'] = SUBJECT
msg['From']=FROM
msg['To']=TO
try:
server = smtplib.SMTP()
server.connect(HOST,"25")
server.starttls()
server.login("test@gmail.com","123456")
server.sendmail(FROM, TO, msg.as_string())
server.quit()
print "邮件发送成功!"
except Exception, e:
print "失败:"+str(e)
19.3.4. 探测web服务质量方法¶
pyurl是一个用C语言写的libcurl Python实现,功能非常强大,支持的协议有FTP、HTTP、HTTPS、TELNET等, 可以理解成Linux下的curl命令功能的Python封装,简单易用。
实现探测Web服务质量¶
# -*- coding: utf-8 -*-
import os,sys
import time
import sys
import pycurl
URL="http://www.google.com.hk"
c = pycurl.Curl()
c.setopt(pycurl.URL, URL)
#连接超时时间,5秒
c.setopt(pycurl.CONNECTTIMEOUT, 5)
#下载超时时间,5秒
c.setopt(pycurl.TIMEOUT, 5)
c.setopt(pycurl.FORBID_REUSE, 1)
c.setopt(pycurl.MAXREDIRS, 1)
c.setopt(pycurl.NOPROGRESS, 1)
c.setopt(pycurl.DNS_CACHE_TIMEOUT,30)
# 创建一个文件对象 以“wb”方式打开,用来存储返回的http头部及页面内容
indexfile = open(os.path.dirname(os.path.realpath(__file__))+"/content.txt", "wb")
c.setopt(pycurl.WRITEHEADER, indexfile)
c.setopt(pycurl.WRITEDATA, indexfile)
try:
c.perform()
except Exception,e:
print "connecion error:"+str(e)
indexfile.close()
c.close()
sys.exit()
NAMELOOKUP_TIME = c.getinfo(c.NAMELOOKUP_TIME)
CONNECT_TIME = c.getinfo(c.CONNECT_TIME)
PRETRANSFER_TIME = c.getinfo(c.PRETRANSFER_TIME)
STARTTRANSFER_TIME = c.getinfo(c.STARTTRANSFER_TIME)
TOTAL_TIME = c.getinfo(c.TOTAL_TIME)
HTTP_CODE = c.getinfo(c.HTTP_CODE)
SIZE_DOWNLOAD = c.getinfo(c.SIZE_DOWNLOAD)
HEADER_SIZE = c.getinfo(c.HEADER_SIZE)
SPEED_DOWNLOAD=c.getinfo(c.SPEED_DOWNLOAD)
print "HTTP状态码:%s" %(HTTP_CODE)
print "DNS解析时间:%.2f ms"%(NAMELOOKUP_TIME*1000)
print "建立连接时间:%.2f ms" %(CONNECT_TIME*1000)
print "准备传输时间:%.2f ms" %(PRETRANSFER_TIME*1000)
print "传输开始时间:%.2f ms" %(STARTTRANSFER_TIME*1000)
print "传输结束总时间:%.2f ms" %(TOTAL_TIME*1000)
print "下载数据包大小:%d bytes/s" %(SIZE_DOWNLOAD)
print "HTTP头部大小:%d byte" %(HEADER_SIZE)
print "平均下载速度:%d bytes/s" %(SPEED_DOWNLOAD)
indexfile.close()
c.close()
19.3.5. Python操作Excel表格¶
简单示例¶
插入文字、数字、图片单元格简单示例 代码示例1
#coding: utf-8
import xlsxwriter
# Create an new Excel file and add a worksheet.
workbook = xlsxwriter.Workbook('demo1.xlsx')
worksheet = workbook.add_worksheet()
# Widen the first column to make the text clearer.
worksheet.set_column('A:A', 20)
# Add a bold format to use to highlight cells.
#bold = workbook.add_format({'bold': True})
bold = workbook.add_format()
bold.set_bold()
# Write some simple text.
worksheet.write('A1', 'Hello')
# Text with formatting.
worksheet.write('A2', 'World', bold)
worksheet.write('B2', u'中文测试', bold)
# Write some numbers, with row/column notation.
worksheet.write(2, 0, 32)
worksheet.write(3, 0, 35.5)
worksheet.write(4, 0, '=SUM(A3:A4)')
# Insert an image.
worksheet.insert_image('B5', 'img/python-logo.png')
workbook.close()
定制自动化业务流量报表周报¶
代码示例
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2019/8/17 10:34
# filename: python操作Excel方法.py
import xlwt
# 定义数据表头列表
title = ["业务名称", "星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日", "平均流量"]
# 定义5频道一周7天的流量数据
data = [
["业务官网", 150, 152, 158, 149, 155, 145, 148],
["新闻中心", 89, 88, 95, 56, 48, 100, 99],
["购物频道", 200, 201, 222, 234, 180, 179, 190],
["体育频道", 77, 88, 99, 55, 66, 48, 90],
["亲子频道", 81, 82, 83, 84, 85, 86, 87],
]
# for da in data:
# da.append(sum(da[1:])/len(da[1:]))
# 计算平均值
[da.append(sum(da[1:]) / len(da[1:])) for da in data]
book = xlwt.Workbook(encoding="utf-8")
sheet = book.add_sheet("Sheet1")
for h in range(len(title)):
sheet.write(0, h, title[h])
i = 1
for list in data:
j = 0
for data in list:
sheet.write(i, j, data)
j += 1
i += 1
book.save("excel测试.xls")
19.3.6. 实现高效的端口扫描¶
单线程端口扫描示例¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 11:01
# filename: smaple02.py
from __future__ import print_function
import telnetlib
def conn_scan(host, port):
t = telnetlib.Telnet()
try:
t.open(host, port, timeout=1)
print(host, port, 'is avaliable')
except Exception as e:
pass
# print(host, port, 'is not avaliable',e)
finally:
t.close()
def main():
host = '127.0.0.1'
for port in range(80, 1000):
conn_scan(host, port)
if __name__ == '__main__':
main()
代码示例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/11/3 15:34
# @filename: sample03.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import socket
def check_server(address, port=None):
s = socket.socket()
print('Attempting to connect to %s on port %s' % (address, port))
try:
s.connect((address, port))
print('Connected to %s on port %s' % (address, port))
return True
except socket.error as e:
print('Connection to %s on port %s failed: %s' % (address, port, e))
return False
finally:
s.close()
check_server("127.0.0.1",port=443)
多线程端口扫描示例¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 10:58
# filename: tasks.py
from __future__ import print_function
from socket import *
def conn_scan(port):
conn = socket(AF_INET, SOCK_STREAM)
host = "127.0.0.1"
try:
conn.connect((host, port))
print(host, port, 'is available')
except Exception as e:
# print(host, port, 'is not available', e)
pass
finally:
conn.close()
def mulit_run(func, max_workers, args):
"""
多线程执行命令
:param func: 执行函数
:param max_workers: 最多线程数
:param args: 可迭代对象
:return:
"""
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
executor = ThreadPoolExecutor(max_workers=max_workers)
all_task = [executor.submit(func, i) for i in args]
wait(all_task, return_when=ALL_COMPLETED)
def main():
ports = [p for p in range(60, 1000)]
mulit_run(conn_scan, len(ports), ports)
if __name__ == '__main__':
main()
使用python-nmap实现端口扫描¶
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @auther: 18793
# @Date: 2020/11/3 15:48
# @filename: nmap_sample01.py
# @Email: 1879324764@qq.com
# @Software: PyCharm
import sys
import nmap
scan_row = []
input_data = input('Please input hosts and port: ')
scan_row = input_data.split(" ")
if len(scan_row) != 2:
print("Input errors,example \"192.168.1.0/24 80,443,22\"")
sys.exit(0)
hosts = scan_row[0] # 接收用户输入的主机
port = scan_row[1] # 接收用户输入的端口
try:
nm = nmap.PortScanner() # 创建端口扫描对象
except nmap.PortScannerError:
print('Nmap not found', sys.exc_info()[0])
sys.exit(0)
except:
print("Unexpected error:", sys.exc_info()[0])
sys.exit(0)
try:
nm.scan(hosts=hosts, arguments=' -v -sS -p ' + port) # 调用扫描方法,参数指定扫描主机hosts,nmap扫描命令行参数arguments
except Exception as e:
print("Scan erro:" + str(e))
for host in nm.all_hosts(): # 遍历扫描主机
print('----------------------------------------------------')
print('Host : %s (%s)' % (host, nm[host].hostname())) # 输出主机及主机名
print('State : %s' % nm[host].state()) # 输出主机状态,如up、down
for proto in nm[host].all_protocols(): # 遍历扫描协议,如tcp、udp
print('----------')
print('Protocol : %s' % proto) # 输入协议名
lport = nm[host][proto].keys() # 获取协议的所有扫描端口
lport.sort() # 端口列表排序
for port in lport: # 遍历端口及输出端口与状态
print('port : %s\tstate : %s' % (port, nm[host][proto][port]['state']))
19.3.7. 主机信息监测¶
多线程版主机监测¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 10:50
# filename: ping001.py
from __future__ import print_function
import subprocess
import threading
def is_reacheable(ip):
if subprocess.call(["ping", "-c", "1", ip]):
print("{0} is alive".format(ip))
else:
print("{0} is unreacheable".format(ip))
def main():
with open('ips.txt') as f:
lines = f.readlines()
threads = []
for line in lines:
thr = threading.Thread(target=is_reacheable, args=(line,))
thr.start()
threads.append(thr)
for thr in threads:
thr.join()
if __name__ == '__main__':
main()
消息队列版主机监测¶
#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/3/21 10:54
# filename: ping002.py
from __future__ import print_function
import subprocess
import threading
from queue import Queue
from queue import Empty
def call_ping(ip):
if subprocess.call(["ping", "-c", "1", ip]):
print("{0} is alive".format(ip))
else:
print("{0} is unreacheable".format(ip))
def is_reacheable(q):
try:
while True:
ip = q.get_nowait()
call_ping(ip)
except Empty:
pass
def main():
q = Queue()
with open('ips.txt') as f:
for line in f:
q.put(line)
threads = []
for i in range(10):
thr = threading.Thread(target=is_reacheable, args=(q,))
thr.start()
threads.append(thr)
for thr in threads:
thr.join()
if __name__ == '__main__':
main()