16.1. 进程

进程进程(Process,有时被称为重量级进程)是程序的一次执行。

每个进程都有自己的地址空间、内存、数据栈以及记录运行轨迹的辅助数据,操作系统管理运行的所有进程,并为这些进程公平分配时间。进程可以通过fork和spawn操作完成其他任务。因为各个进程有自己的内存空间、数据栈等,所以只能使用进程间通信(InterProcess Communication, IPC),而不能直接共享信息。

多线程是多个相互关联的线程的组合,多进程是多个互相独立的进程的组合。

线程是最小的执行单元,进程至少由一个线程组成。

16.1.1. multiprocessing模块

multiprocessing模块是跨平台版本的多进程模块。Linux上支持os.fork(),windows上使用multiprocessing
multiprocessing模块提供了一个Process类来代表一个进程对象
#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/21 16:39
# @filename: sample01.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import os
import time
from multiprocessing import Process


def f(x):
    print("子进程id:", os.getpid(), "父进程id:", os.getppid())
    return x * x


if __name__ == '__main__':
    print("主进程id:", os.getpid())
    for i in range(5):      # 这里创建了5个子进程
        p = Process(target=f, args=(i,))
        p.start()
#!/usr/bin/env python
# -*- coding:utf8 -*-
'''
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

'''


from multiprocessing import Process
import os


# 子进程需要执行的代码
def run_proce(name):
    print("Run child process %s (%s)" % (name, os.getpid()))


if __name__ == '__main__':
    print("Parent process %s." % os.getpid())
    p = Process(target=run_proce, args=("test",))
    print("Child process will start.")
    p.start()
    p.join()
    print("Child process end.")
import os
import multiprocessing


def foo(i):
    # 同样的参数传递方法
    print("这里是 ", multiprocessing.current_process().name)
    print('模块名称:', __name__)
    print('父进程 id:', os.getppid())  # 获取父进程id
    print('当前子进程 id:', os.getpid())  # 获取自己的进程id
    print('------------------------')


if __name__ == '__main__':

    for i in range(5):
        p = multiprocessing.Process(target=foo, args=(i,))
        p.start()

说明

  • 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

  • join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

join()方法的使用

#!/usr/bin/env python
#-*- coding:utf8 -*-
from multiprocessing import Process
import time
import os

def child_1(n):
    print("子进程({})开始执行,它的父进程是({})".format(os.getpid(),os.getppid()))
    t_start = time.time()
    time.sleep(n)
    t_end = time.time()
    print("子进程({})执行时间为%0.2f秒".format(os.getpid(), t_end-t_start))


def child_2(n):
    print("子进程({})开始执行,它的父进程是({})".format(os.getpid(), os.getppid()))
    t_start = time.time()
    time.sleep(n)
    t_end = time.time()
    print("子进程({})执行时间为%0.2f秒".format(os.getpid(), t_end - t_start))




def main1():
    print("主进程开始")
    print("主进程的PID;{}".format(os.getpid()))
    p1 = Process(target=child_1, args=(1,))
    p2 = Process(target=child_2, args=(2,))
    p1.start()
    p2.start()
    print("p1.is_alive={}".format(p1.is_alive()))
    print("p2.is_alive={}".format(p2.is_alive()))
    print("p1.name={}".format(p1.name))
    print("p1.id={}".format(p1.pid))
    print("p2.name={}".format(p2.name))
    print("p2.id={}".format(p2.pid))
    #使用join函数之后,主进程会等待子进程结束之后才继续往下执行
    p1.join()
    p2.join()
    print("主进程结束..")


if __name__ == '__main__':
    main1()

16.1.2. Process子类创建进程

示例1

#!/usr/bin/env python
#-*- coding:utf8 -*-

from multiprocessing import Process
import time
import os

class SubProcess(Process):
    def __init__(self,interval,name=''):
        super(SubProcess, self).__init__()
        self.interval = interval
        if name:
            self.name = name
    def run(self):
        print("子进程({})开始执行,它的父进程是({})".format(os.getpid(), os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_end = time.time()
        print("子进程({})执行时间为{:.2f}秒".format(os.getpid(), t_end - t_start))


def main1():
    print("主进程开始")
    print("主进程的PID;{}".format(os.getpid()))
    p1 = SubProcess(interval=1, name="mrsoft")
    p2 = SubProcess(interval=2)
    # 调用start()方法时会自动执行run()方法
    p1.start()
    p2.start()
    print("p1.is_alive={}".format(p1.is_alive()))
    print("p2.is_alive={}".format(p2.is_alive()))
    print("p1.name={}".format(p1.name))
    print("p1.id={}".format(p1.pid))
    print("p2.name={}".format(p2.name))
    print("p2.id={}".format(p2.pid))
    #使用join函数之后,主进程会等待子进程结束之后才继续往下执行
    p1.join()
    p2.join()
    print("主进程结束..")


if __name__ == '__main__':
    main1()

示例2

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/21 16:39
# @filename: sample01.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import os
import time
from multiprocessing import Process


class Download(Process):
    def __init__(self, interval):
        Process.__init__(self)
        self.interval = interval

    # 重写Process类中的run()方法
    def run(self):
        # 开启这个进程所需执行的代码
        t_start = time.time()
        # time.sleep(3)     # 模拟阻塞的一个实现方式
        print("开启进程:%s进行下载操作" % os.getpid())
        print("子进程(%s)开始执行,父进程为(%s)" % (os.getpid(), os.getppid()))
        time.sleep(self.interval)
        t_stop = time.time()
        print("子进程(%s)执行完毕,耗时(%f)秒" % (os.getpid(), (t_stop - t_start)))


if __name__ == '__main__':
    t_start = time.time()
    print("当前进程(%s)" % os.getpid())
    p = Download(2)
    p.start()
    # p.join(10)        # join 父进程等待子进程执行完毕后立刻执行
    time.sleep(10)      # 模拟阻塞,保证子进程完毕后父进程在执行
    t_stop = time.time()
    print("主进程(%s)执行完毕,耗时(%f)秒" % (os.getpid(), (t_stop - t_start)))

16.1.3. 进程同步

multiprocessing模块提供了三种机制实现进程同步:

multiprocess.Lock、multiprocess.Semaphore、multiprocess.Event。

1.multiprocess.Lock:锁

实现了顺序的执行,程序又重新变成串行了,这样确实会浪费时间,却保证了数据的安全。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/21 16:59
# @filename: sample01.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import os
import time
import random

from multiprocessing import Process, Lock


def work(lock, n):
    lock.acquire()  # 上锁
    print("%s: %s is running " % (n, os.getpid()))
    time.sleep(random.random())
    print("%s: %s is done " % (n, os.getpid()))
    lock.release()  # 解锁


if __name__ == '__main__':
    lock = Lock()  # 设置锁
    for i in range(3):
        p = Process(target=work, args=(lock, i))
        p.start()

哪个进程先抢到锁,其他进程只能等待前面进程解锁之后,再次进行抢锁。

2. multiprocess.Semaphore:信号量

互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据。信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1;当计数器为0时,acquire()调用被阻塞。这是Dijkstra信号量概念P()和V()的Python实现。

信号量与进程池的概念很像,但是要区分开,信号量涉及加锁的概念

信号量同步机制适用于访问像服务器这样的有限资源。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/21 16:59
# @filename: sample01.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
import os
import time
import random

from multiprocessing import Process, Semaphore


def go_ktv(sem, user):
    sem.acquire()  # 上锁
    print("%s 占坑" % user)
    time.sleep(random.randint(0, 3))  # 模拟每个人在ktv中待的时间不同
    # time.sleep(2)
    sem.release()  # 释放锁


if __name__ == '__main__':
    sem = Semaphore(2)  # 设置4个信号量
    p1 = []
    for i in range(13):
        p = Process(target=go_ktv, args=(sem, "user%s" % i,))
        p.start()
        p1.append(p)

    for i in p1:
        i.join()
    print("================================>")

3. multiprocess.Event

事件Python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法

set、wait、clear。事件处理的机制:

全局定义了一个“Flag”,如果“Flag”值为False,那么当程序执行event.wait方法时就会阻塞,如果“Flag”值为True,那么执行event.wait方法时便不再阻塞。

clear:将“Flag”设置为False。
set:将“Flag”设置为True。
#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/21 17:15
# @filename: sample03.py
# @Email:    1879324764@qq.com
# @Software: PyCharm
from multiprocessing import Process, Event
import time, random


def car(e, n):
    while True:
        if not e.is_set():  # 进程刚开启,is_set()的值是False,模拟信号灯为红色
            print("\033[31m 红灯亮\033[0m. car%s 等着" % n)
            e.wait()  # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
            print("\033[32m 车%s 绿灯亮了\033[0m" % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():  # 如果is_set()的值是False,也就是红灯,仍然回到while语句开始
                continue
            print("飘过 --~~~~,car", n)
            break


def police_car(e, n):
    while True:
        if not e.is_set():  # 进程刚开启,is_set()的值是False,模拟信号灯为红色
            print("\033[31m 红灯亮\033[0m. car%s 等着" % n)
            e.wait(0.1)  # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿地就闯红灯走了
            if not e.is_set():
                print("\033[33m红灯,警车飞过\033[0m,car %s" % n)
            else:
                print("\033[33m;46m绿灯,警车正常通过\033[0m,car %s" % n)

        break


def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)
        if e.is_set():
            print("######", e.is_set())
            e.clear()  # 将is_set()的值设置为Fasle
        else:
            e.set()  # 将is_set()的值设置为True
            print("######", e.is_set())


if __name__ == '__main__':
    e = Event()
    for i in range(10):
        p = Process(target=car, args=(e, i,))
        p.start()

    for i in range(5):
        p = Process(target=police_car, args=(e, i,))
        p.start()

    t = Process(target=traffic_lights, args=(e, 10))
    t.start()
    print("========================================>")

16.1.4. 使用进程池Pool创建进程

from multiprocessing import Pool
import os
import time


def task(name):
    print("子进程 ({})执行的任务是 ({})".format(os.getpid(),name))
    time.sleep(1)

if __name__ == '__main__':
    print("父进程 ({})开始执行".format(os.getpid()))
    p = Pool(4)
    for i in range(10):
        p.apply_async(task, args=(i,))

    p.close()
    p.join()
    print("所有子进程结束.....")

进程池扫描主机端口实例

#!/usr/bin/env python
# -*- coding:utf8 -*-
# @auther:   18793
# @Date:    2020/6/22 11:21
# @filename: Process_Pool.py
# @Email:    1879324764@qq.com
# @Software: PyCharm

"""
进程池扫描主机端口实例
代码4-4利用单进程扫描主机端口,如果要扫描的端口范围比较大,则需要耗费比较长的时间。
利用多个进程同时扫描不同的端口范围,可以缩短程序运行时间。
进程池技术可以一次创建多个子进程,适合于子进程数量事先预知的情况。
代码5-3利用进程池一次创建16个进程,然后利用这些进程扫描主机所有端口(0~65535),
每个进程扫描4096个端口。

"""
from multiprocessing import Pool
import os
import socket


def scan_port(ports):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(1)
    for port in range(ports, ports + 4096):
        result = s.connect_ex((ip, port))
        if result == 0:
            print("I am process %d,port %d is openned!" % (os.getpid(), port))
    s.close()


ip = "127.0.0.1"
p = Pool(16)

for k in range(16):
    p.apply_async(scan_port, args=(k * 4096,))
p.close()
p.join()
print("All subprocesses had finished!")

16.1.5. 进程间通信

· Python提供了多种进程间通信的方式,例如Queue、Pipe、Value+Array等

Queue和Pipe的区别在于

·Pipe常用来在两个进程间通信

·Queue用来在多个进程间实现通信。

Queue多进程队列的使用

  • Queue模块可以用来进行线程间的通信,让各个线程之间共享数据。

  • Python的Queue模块提供了同步、线程安全的队列类, 包括FIFO(先入先出)队列Queue、LIFO(后入先出)队列LifoQueue和优先级队列PriorityQueue。 这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列实现线程间的同步。

../../_images/queue.PNG
#!/usr/bin/env python
#-*- coding:utf8 -*-
'''

Queue模块可以用来进行线程间的通信,让各个线程之间共享数据。
Python的Queue模块提供了同步、线程安全的队列类,

包括FIFO(先入先出)队列Queue、LIFO(后入先出)队列LifoQueue和优先级队列PriorityQueue。
这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列实现线程间的同步。

'''
from multiprocessing import Queue

if __name__ == '__main__':
    q = Queue(3)    # 设置队列的大小
    q.put("消息1")
    q.put("消息2")
    print("队列是否已满:{}".format(q.full()))
    q.put("消息3")
    print("队列是否已满:{}".format(q.full()))

    # try:
    #     q.put("消息4",block=True,timeout=2)     # 添加消息队列时候等待2s
    # except:
    #     print("消息队列已满,现有消息数量为{}".format(q.qsize()))
    try:
        q.put_nowait("消息4")     #添加消息队列不需要等待
    except:
        print("消息队列已满,现有消息数量为{}".format(q.qsize()))

    if not q.empty():
        print("从队列中取消息".center(100, "*"))
        for i in range(q.qsize()):
            print(q.get_nowait())

    if not q.full():
        q.put("消息4")
        print(q.qsize())

代码示例1

#!/usr/bin/env python
# -*- coding:utf8 -*-

from multiprocessing import Process, Queue


def f(test):
    test.put("22")


if __name__ == '__main__':
    q = Queue()  # 父进程
    q.put("11")

    p = Process(target=f, args=(q,))  # 子进程
    p.start()
    p.join()

    print("取到: ", q.get_nowait())
    print("取到: ", q.get_nowait())

代码示例2

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/2/26 10:55
# filename: 进程间通信01.py
from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码
def proc_write(q, urls):
    print('Process(%s) is writing...' % os.getpid())
    for url in urls:
        q.put(url)
        print('Put %s to queue...' % url)
        time.sleep(random.random())


# 读进程执行的代码
def proc_read(q):
    print('Process(%s) is reading...' % os.getpid())
    while True:
        url = q.get(True)
        print('Get %s from queue.' % url)


if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    proc_writer1 = Process(target=proc_write, args=(q, ['url1', 'url2', 'url3']))
    proc_writer2 = Process(target=proc_write, args=(q, ['url4', 'url5', 'url6']))
    proc_reader = Process(target=proc_read, args=(q,))
    # 启动子进程proc_writeer 写入
    proc_writer1.start()
    proc_writer2.start()
    # 启动子进程proc_reader,读取
    proc_reader.start()
    # 等待子进程proc_writer结束
    proc_writer1.join()
    proc_writer2.join()
    # proc_reader进程里是死循环,无法等待其结束,要强行终止
    proc_reader.terminate()

"""
Process(10608) is writing...
Put url1 to queue...
Process(7808) is writing...
Put url4 to queue...
Process(13840) is reading...
Get url1 from queue.
Get url4 from queue.
Put url5 to queue...
Get url5 from queue.
Put url2 to queue...
Get url2 from queue.
Put url6 to queue...
Get url6 from queue.
Put url3 to queue...
Get url3 from queue.
"""

代码示例3

#!/usr/bin/env python
#-*- coding:utf8 -*-
from multiprocessing import Process,Queue
import time

'''
2个子进程在队列中进行写入和读取数据,实现进程之间的通信
'''

def write(q):
    if not q.full():
        for i in range(5):
            message = "消息" + str(i)
            q.put(message)
            print("写入:{}".format(message))

def read(q):
    time.sleep(1)
    while not q.empty():
        print("读取:{}".format(q.get(True,2)))



if __name__ == '__main__':
    print("主进程开始".center(100,"*"))
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()
    print("主进程结束".center(100,"*"))

Pipe常用来在两个进程间进行通信,

两个进程分别位于管道的两端。 Pipe方法返回(conn1,conn2)代表一个管道的两个端。

Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。

若duplex为False,conn1只负责接收消息,conn2只负责发送消息。 sendrecv方法分别是发送和接收消息的方法。 例如,在全双工模式下,可以调用conn1.send发送消息conn1.recv接收消息

如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

#!/usr/bin/env python
# -*- coding:utf8 -*-
# auther; 18793
# Date:2020/2/26 11:08
# filename: sample01.py
import multiprocessing
import random
import time, os


def proc_send(pipe, urls):
    for url in urls:
        print("Process(%s) send :%s" % (os.getpid(), url))
        pipe.send(url)
        time.sleep(random.random())


def proc_recv(pipe):
    while True:
        print("Process(%s) rev:%s" % (os.getpid(), pipe.recv()))
        time.sleep(random.random())


if __name__ == '__main__':
    pipe = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=proc_send, args=(pipe[0], ["url_" + str(i) for i in range(10)]))
    p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p1.join()
    p2.terminate()

"""
Process(17008) send :url_0
Process(13264) rev:url_0
Process(17008) send :url_1
Process(17008) send :url_2
Process(13264) rev:url_1
Process(17008) send :url_3
Process(17008) send :url_4
Process(13264) rev:url_2
Process(17008) send :url_5
Process(13264) rev:url_3
Process(17008) send :url_6
Process(17008) send :url_7
Process(13264) rev:url_4
Process(13264) rev:url_5
Process(17008) send :url_8
Process(17008) send :url_9
Process(13264) rev:url_6
"""