Contents
21.22. 案例:爬取纳斯达克股票数据¶
代码示例:
# coding=utf-8
# 代码文件:chapter21/ch21.4.5-end.py
"""项目实战:抓取纳斯达克股票数据"""
import datetime
import hashlib
import logging
import os
import re
import threading
import time
import urllib.request
from bs4 import BeautifulSoup
from db.db_access import insert_hisq_data
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(threadName)s - '
'%(name)s - %(funcName)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
url = 'https://www.nasdaq.com/symbol/aapl/historical#.UWdnJBDMhHk'
def validateUpdate(html):
"""验证数据是否更新,更新返回True,未更新返回False"""
# 创建md5对象
md5obj = hashlib.md5()
md5obj.update(html.encode(encoding='utf-8'))
md5code = md5obj.hexdigest()
old_md5code = ''
f_name = 'md5.txt'
if os.path.exists(f_name): # 如果文件存在读取文件内容
with open(f_name, 'r', encoding='utf-8') as f:
old_md5code = f.read()
if md5code == old_md5code:
logger.info('数据没有更新')
return False
else:
# 把新的md5码写入到文件中
with open(f_name, 'w', encoding='utf-8') as f:
f.write(md5code)
logger.info('数据更新')
return True
# 线程运行标志
isrunning = True
# 爬虫工作间隔
interval = 5
def controlthread_body():
"""控制线程体函数"""
global interval, isrunning
while isrunning:
# 控制爬虫工作计划
i = input('输入Bye终止爬虫,输入数字改变爬虫工作间隔,单位秒:')
logger.info('控制输入{0}'.format(i))
try:
interval = int(i)
except ValueError:
if i.lower() == 'bye':
isrunning = False
def istradtime():
"""判断交易时间"""
now = datetime.datetime.now()
df = '%H%M%S'
strnow = now.strftime(df)
starttime = datetime.time(9, 30).strftime(df)
endtime = datetime.time(15, 30).strftime(df)
if now.weekday() == 5 \
or now.weekday() == 6 \
or (strnow < starttime or strnow > endtime):
# 非工作时间
return False
# 工作时间
return True
def workthread_body():
"""工作线程体函数"""
global interval, isrunning
while isrunning:
if istradtime():
# 交易时间内不工作
logger.info('交易时间,爬虫休眠1小时...')
time.sleep(60 * 60)
continue
logger.info('爬虫开始工作...')
req = urllib.request.Request(url)
with urllib.request.urlopen(req) as response:
data = response.read()
html = data.decode()
sp = BeautifulSoup(html, 'html.parser')
# 返回指定CSS选择器的div标签列表
div = sp.select('div#quotes_content_left_pnlAJAX')
# 从列表中返回第一个元素
divstring = div[0]
if validateUpdate(divstring): # 数据更新
# 分析数据
trlist = sp.select('div#quotes_content_left_pnlAJAX table tbody tr')
data = []
for tr in trlist:
trtext = tr.text.strip('\n\r ')
if trtext == '':
continue
rows = re.split(r'\s+', trtext)
fields = {}
try:
df = '%m/%d/%Y'
fields['Date'] = datetime.datetime.strptime(rows[0], df)
except ValueError:
# 实时数据不分析(只有时间,如10:12)
continue
fields['Open'] = float(rows[1])
fields['High'] = float(rows[2])
fields['Low'] = float(rows[3])
fields['Close'] = float(rows[4])
fields['Volume'] = int(rows[5].replace(',', ''))
data.append(fields)
# 保存数据到数据库
for row in data:
row['Symbol'] = 'AAPL'
# print(row)
insert_hisq_data(row)
# 爬虫休眠
logger.info('爬虫休眠{0}秒...'.format(interval))
time.sleep(interval)
def main():
"""主函数"""
global interval, isrunning
# 创建工作线程对象workthread
workthread = threading.Thread(target=workthread_body, name='WorkThread')
# 启动线程workthread
workthread.start()
# 创建控制线程对象controlthread
controlthread = threading.Thread(target=controlthread_body, name='ControlThread')
# 启动线程controlthread
controlthread.start()
if __name__ == '__main__':
main()