81 Python并发编程之多进程

共计 23267 个字符,预计需要花费 59 分钟才能阅读完成。

引入

在进入多进程的学习之前, 一定需要先了解一个应用程序是如何开启一个进程的, 以及操作系统对进程是如何进行分配资源的, 进程、线程、进程池、进程三态、同步、异步、并发、并行、串行 的概念也要非常的明确, 下面将介绍 Python 并发编程之多进程

一.multiprocessing 模块介紹

1. 什么是 multiprocessing 模块

  • multiprocess 模块是 Python 中的多进程管理模块

2.multiprocessing 模块简介

  • python 中的多线程无法利用多核优势,如果想要充分地使用多核 CPU 的资源(os.cpu_count()查看),在 python 中大部分情况需要使用多进程, Python 提供了 multiprocessing

3.multiprocessing 模块的作用

  • multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块 threading 的编程接口类似
  • multiprocessing 模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等组件

ps : 值得注意的是 : 与线程不同,进程没有任何共享状态,多个进程的内存空间相互物理隔离, 进程修改的数据,改动仅限于该进程内

二.Process 类介绍

multiprocessing 模块提供了 Process 类,该类可用来在 Windows 平台上创建新进程

使用 Process 类创建实例化对象,其本质是调用该类的构造方法创建新进程

Process([group [, target [, name [, args [, kwargs]]]]])  # 实际上是调用了下面的构造方法
def __init__(self,group=None,target=None,name=None,args=(),kwargs={})

值得注意的是 :

​ 参数的指定需要使用关键字的方式

args 指定的值是为 target 指定的函数的位置参数, 并且是一个元组形式, 一个值必须带逗号

  • 参数含义 :

参数名 说明
group 该参数未进行实现,不需要传参
target 为新建进程指定执行任务,也就是指定一个函数
name 为新建进程设置名称
args 为 target 参数指定的参数传递非关键字参数
kwargs 为 target 参数指定的参数传递关键字参数
  • 常用方法

方法 作用
run() 第 2 种创建进程的方式需要用到,继承类中需要对方法进行重写,该方法中包含的是新进程要执行的代码
start() 和启动子线程一样,新创建的进程也需要手动启动,该方法的功能就是启动新创建的线程
join([timeout]) 主线程等待子进程终止(强调:是主线程处于等的状态,而 p 是处于运行的状态),timeout 是可选的超时时间,需要强调的是,p.join 只能 join 住 start 开启的进程,而不能 join 住 run 开启的进程
is_alive() 判断当前进程是否还活着
terminate() 中断该进程
  • 常用属性

属性 作用
name 可以为该进程重命名,也可以获得该进程的名称。
daemon 和守护线程类似,通过设置该属性为 True,可将新建进程设置为“守护进程”
pid 返回进程的 ID 号。大多数操作系统都会为每个进程配备唯一的 ID 号
exitcode 进程在运行时为 None、如果为–N,表示被信号 N 结束(了解即可)
authkey 进程的身份验证键, 默认是由 os.urandom()随机生成的 32 字符的字符串。这个键的用途是为涉及网

三.Process 类创建子进程的两种方式

0.Process 类使用的注意点

WindowsProcess() 必须放在 if __name__ == '__main__': 之下

  • 这是 Windows 上多进程的实现问题, 在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错, 所以必须把创建子进程的部分用那个 if 判断保护起来,import 的时候如果不是当前执行文件就不会执行 Process, 也就不会无限递归了 (Linux 上没有这个问题)

ps : fork 是 OS 提供的方法 os.fork(), 该方法可以在当前程序中再创建出一个进程, 但是在 Windows 平台上无效, 只在 Linux, UNIX, Mac OSX 上有效

1. 开启子进程方式一

  • 直接创建 Process 类的实例对象,由此就可以创建一个新的进程
from multiprocessing import Process
import time,os
def test(n):
    print(f" 父进程{os.getppid()}, 紫禁城{os.getpid()}")
    time.sleep(n)
    print(f" 父进程{os.getppid()}, 紫禁城{os.getpid()}")
if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()  # 做发起系统调用的活
    print(f" 当前执行文件{os.getpid()}")
'''
当前执行文件 16860
父进程 16860, 紫禁城 6404
父进程 16860, 紫禁城 6404
'''

2. 开启子进程方式二

  • 通过继承 Process 类的子类,创建实例对象,也可以创建新的进程
  • 继承 Process 类的子类需重写父类的 run() 方法
from multiprocessing import Process
import time,os
class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n
    def run(self) -> None:
        print(f" 父进程{os.getppid()}, 紫禁城{self.pid}")
        time.sleep(self.n)
        print(f" 父进程{os.getppid()}, 紫禁城{os.getpid()}")
if __name__ == '__main__':
    p = MyProcess(2)
    p.start()
    print(f" 当前执行文件{os.getpid()}")
'''
当前执行文件 8136
父进程 8136, 紫禁城 1280
父进程 8136, 紫禁城 1280
'''

四. 验证进程的内存空间是相互隔离的

from multiprocessing import Process
import time
x = 222
def test():
    global x
    x = 111
if __name__ == '__main__':
    p = Process(target=test)
    p.start()     # 发送系统调用
    time.sleep(1) # 等待子进程运行完
    print(x)      # 222 (还是原来的)

子进程 test 函数中声明全局变量 x, 并修改 x 的值, 等待子进程运行完毕, 最后打印 x , 发现 x 的值并没有改变

五.Process 对象的 join 方法

  • 让父进程等待子进程的终止, 父进程在等, 子进程在运行
from multiprocessing import Process
x = 222
def test():
    global x
    x = 111
if __name__ == '__main__':
    p = Process(target=test)
    p.start()   # 发送系统调用
    p.join()    # 等待子进程运行完(之前我们使用 sleep 并不能精确的知道子进程结束运行的时间)
    print(x)    # 222 (还是原来的)
  • 参数 timeout 是可选的超时间, 等多久就不等了
from multiprocessing import Process
x = 222
def test():
    global x
    x = 111
if __name__ == '__main__':
    p = Process(target=test)
    p.start()        # 发送系统调用
    p.join(0.001)    # 等待 0.001 秒就不等了
  • 注意点 : start() 只是发起系统调用, 并不是运行子进程, 当 start() 执行完后紧接着就执行后面的代码
  • start() 发起调用之后, 是通知操作系统创建一个子进程, 操作系统需要申请一个内存空间, 将父进程的数据复制一份到子进程的内存空间中作为初始化用 (Linux 是将父进程的数据原原本本的复制一份, 而 Windows 稍有些不同), 然后子进程才运行起来
import time,os
def test(n):
    time.sleep(n)
    print(f" 父进程{os.getppid()} 子进程{os.getpid()}")
if __name__ == '__main__':
    p1 = Process(target=test,args=(3,))
    p2 = Process(target=test,args=(2,))
    p3 = Process(target=test,args=(1,))
    p1.start()  # 用时 3 秒
    p2.start()  # 用时 2 秒
    p3.start()  # 用时 1 秒
    start_time = time.time()
    p1.join()
    p2.join()
    p3.join()   # 三个进程都在并发的运行, 主进程一共运行 3 秒多
    stop_time = time.time()
    print(f'主进程{os.getpid()} 用时{stop_time-start_time}')
'''
父进程 10888 子进程 6792
父进程 10888 子进程 13368
父进程 10888 子进程 14800
主进程 10888 用时 3.131737470626831
'''

六. Process 对象其他常用方法介绍

1.terminate() : 关闭进程

2.is_alive() : 查看进程是否存活

from multiprocessing import Process
import time
def test():
    time.sleep(1)
if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()
    p.terminate()  # 只是发起系统调用, 通知操作系统关闭子进程
    print(p.is_alive())  # True

由上面可知 terminate() 只是发起系统调用, 并不是立即关闭子进程, 操作系统关闭子进程回收资源也要一小会, 我们可以使用 sleep 简单延时

from multiprocessing import Process
import time
def test():
    time.sleep(1)
if __name__ == '__main__':
    p = Process(target=test,args=(2,))
    p.start()
    p.terminate()   # 只是发起系统调用, 通知操作系统关闭子进程
    time.sleep(0.1) # 稍微延时一点
    print(p.is_alive())  # False

3.name : 为新建进程设置名字

4.pid : 进程号

from multiprocessing import Process
import time,os
class MyProcess(Process):
    def __init__(self,n,name):
        super().__init__()
        self.n = n
        self.name = name
    def run(self) -> None:
        time.sleep(self.n)
        print(f" 子进程 pid:{self.pid}")    # 子进程 pid:14156
        print(f" 子进程模块名:{__name__}")  # 子进程模块名:__mp_main__
        print(f" 子进程名:{self.name}")    # 子进程名:aaaa
if __name__ == '__main__':
    p = MyProcess(1,"aaaa")
    p.start()
    p.join()
    print(f" 打印子进程 pid:{p.pid}")       # 打印子进程 pid:14156
    print(f" 打印主进程 pid:{os.getpid()}") # 打印主进程 pid:16340
    print(f" 子进程名:{p.name}")           # 子进程名:aaaa
    print(f" 主进程模块名:{__name__}")      # 主进程模块名:__main__

__name__ : Python 中每个模块都有自己的名字, __name__是一个系统变量, 是模块的标识符, 值是模块的名称, 并且在自身模块中:__name__的值等于__mian__

七. 孤儿进程

1. 什么是孤儿进程

  • 当一个父进程创建了多个子进程, 子进程再创建子子进程等等
  • 父进程因正常 运行完毕 或其他情况 被干掉 的时候, 它的子进程就变成了 孤儿进程
  • 为了避免孤儿进程完成任务后没有父亲通知操作系统回收资源
  • 于是 PID 为 “1” 的顶级进程 systemd 就接手了这个孤儿进程
  • systemd 相当于一个孤儿院, 但凡是孤儿进程都会成为它的子进程

2. 孤儿进程演示

  • 先在一个虚拟终端里开启一个 Bash 进程, 把他当做父进程
  • 紧接着开启一个 “sleep 1000 &” 进程, 把它当做子进程
  • 然后在另一个虚拟终端查看这两个进程信息

81 Python 并发编程之多进程

  • 再杀掉 sleep 的父进程 Bash 看看结果如何

81 Python 并发编程之多进程

  • 图示

81 Python 并发编程之多进程

八. 僵尸进程

1. 什么是僵尸进程

  • 这是 Linux 出于 好心 的设计
  • 一个父进程开启了一堆子进程, 当 子进程 比父进程 先运行完(死掉)
  • 操作系统会 释放 子进程占用的 重型资源(内存空间, CPU 资源, 打开的文件)
  • 但会 保留 子进程的 关键信息(PID, 退出状态, 运行时间等)
  • 目的是为了让父进程能随时 查看 自己的 子进程信息(不管该子进程有没有死掉)
  • 这种已经死掉的子进程都会进入僵尸状态, ”僵尸进程’‘ 是 Linux 系统的一种数据结构

ps : 任何正常结束的子进程都会进入到僵尸状态, 而被强制终止的进程的所有信息将会被清除

2. 僵尸进程回收 —- 概念

  • 操作系统保留子进程信息供父进程查看
  • 当父进程觉得 不再需要 查看的时候, 会向操作系统发送一个 wait / waitpid 系统调用
  • 于是操作系统 再次清理 僵尸进程的残余信息

3. 僵尸进程回收 —- 实际

  • 优秀的开源软件
这些软件在开启子进程时, 父进程内部会及时调用 "wait" / "waitpid" 通知操作系统来回收僵尸进程
  • 水平良好的开发者
功底深厚, 知道父进程要对子进程负责
会在父进程内部考虑到调用 "wait" / "waitpid" 通知操作系统回收僵尸进程
但是发起系统调用时间可能慢了一点
于是我们就可以使用 "ps aux | grep [z]+" 命令查看到僵尸进程
  • 水平非常低的开发者
技术半吊子, 只知道开子进程, 父进程也不结束, 并在那一直开子进程, 不知道什么是僵尸进程
系统调用 "wait" / "waitpid" 也没有听说过
于是计算机会堆积许多的僵尸进程, 占用着大量的 "pid",(每启动一个进程就会分配一个 "pid 号 ")
计算机进入一个奇怪的现象: 内存够用, 硬盘充足,CPU 空闲, 但新的程序无法启动
这就是因为 "PID" 不够用了

4. 如何清理僵尸进程

  • 针对良好的开发者
我们可以手动发信号给父进程: "# kill -CHLD [父进程的 PID]"
通知父进程快点向操作系统发起系统调用 "wait" / "waitpid" 来清理变成僵尸的儿子们
  • 针对半吊子水平的开发者
这种情况子下, 我们只能将父进程终结, 因为你发给它的信号不会得到回应
父进程被杀死," 僵尸进程 " 将会变成 " 僵尸孤儿进程 "
但凡是 " 孤儿进程 " 都会被 Linux 系统中 "PID" 为 "1" 的顶级进程 "systemd" 回收
"systemd" 会发起系统调用 "wait" / "waitpid" 来通知操作系统清理僵尸进程
# Centos7 的顶级进程为 systemd
# Centos6 的顶级进程为 init

5. 使用 Process 类制造僵尸进程

原本 multiprocessing 模块在你发起系统调用 start() 开启子进程的时候会自动检测当前状态下是否存在僵尸进程, 并将其回收, join() 调用也是一样, 我们可以查看这两个调用的源码进行查看 :

81 Python 并发编程之多进程

81 Python 并发编程之多进程

  • 我们可以让父进程创建子进程后暂停在原地什么事情都不做, 于是 multiprocessing 模块的底层机制都没有运行, 也就没法清除运行完毕并变成僵尸态的子进程, 下面再 Linux 上进行演示 :
# coding:utf-8
from multiprocessing import Process
import os,time
def task():
    print(" 子进程:%s"%os.getpid())
    time.sleep(4)  # 子进程 4 秒后结束变成僵尸进程
if __name__ == "__main__":
    for i in range(400):
        print(" 父进程:%s"%os.getpid())
        p = Process(target=task)
        p.start()
    time.sleep(100000)  # 让父进程停在原地什么也不做

使用 top 命令查看系统状态信息, 可以发现已经出现了 400 个僵尸进程

81 Python 并发编程之多进程

我们可以通过 kill 刚运行的 py 文件将这些僵尸进程变成孤儿进程, 从而被 systemd 接管, systemd 再发起系统调用将其清除

九. 守护进程

1. 什么是守护进程

由主进程创建, 并会随着主进程的结束而结束

2. 守护进程的生命周期

  • 进程之间是相互独立的, 守护进程会在主进程 代码执行结束 后就终止

  • 守护进程内无法再次开启子进程, 否则会抛出异常 : AssertionError: daemonic processes are not allowed to have children

from multiprocessing import Process
import os,time
class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n
    def run(self) -> None:
        print(f'子进程:{os.getpid()}开始')
        time.sleep(2)
        print(f" 子进程:{os.getpid()}结束 ")
if __name__ == '__main__':
    p = MyProcess(2)
    p.daemon = True  # 需要在 strat() 之前设置
    p.start()
    print(f" 主进程:{os.getpid()}结束 ")  
    # 在当前主进程的代码已经运行完毕, 守护进程就会终止, 甚至守护进程还没来的急启动
''' 输出
主进程:16924 结束
'''

我们使用 sleep 让主进程简单延时一下好让子进程启动起来

from multiprocessing import Process
import os,time
class MyProcess(Process):
    def __init__(self,n):
        super().__init__()
        self.n = n
    def run(self) -> None:
        print(f'子进程:{os.getpid()}开始')
        time.sleep(2)
        print(f" 子进程:{os.getpid()}结束 ")
if __name__ == '__main__':
    p = MyProcess(2)
    p.daemon = True
    p.start()
    time.sleep(1)  # 延时一秒, 足够操作系统将子进程开起来
    print(f" 主进程:{os.getpid()}结束 ")
''' 输出
子进程:8620 开始
主进程:10480 结束
'''

再次强调, 守护进程是在主进程的代码执行完毕终止

from multiprocessing import Process
import os,time
def Foo():
    print(f"Foo:{os.getpid()}-->111")
    time.sleep(1)
    print(f"Foo--->222")
def Bar():
    print(f"Bar:{os.getpid()}-->333")
    time.sleep(2)
    print(f"Bar--->444")
if __name__ == '__main__':
    p1 = Process(target=Foo)
    p2 = Process(target=Bar)
    p1.daemon = True  # 将 p1 设置守护进程
    p1.start()
    p2.start()
    print("------>end")
# 当运行到这一行的时候主进程代码已经运行完了, 那么守护进程也已经终止了, 与主进程在等着 p2 运行无关, 这时操作系统还没来的急启动 p1 这个子进程
''' 输出
------>end
Bar:18124-->333
Bar--->444
'''

十. 进程同步锁(互斥锁 / 排它锁)

上面我们实现了进程的并发, 进程之间的数据是不共享的, 但是他们可以共享同一个文件(硬盘空间), 或者是同一个打印空间, 然而在共享的同时也带来了问题 : 进程的运行不是同时进行的, 它们没有先后顺序, 一旦开启也不受我们的限制, 当多个进程使用同一份数据资源时, 就会引发数据安全或者数据混乱问题

1. 什么是互斥锁

我们打个简单的比方, 公司里的一台打印机, 每个人都可以使用, 但同事只能有一个人在使用, 不然就会造成打印错乱; 又比如合租房的卫生间, 合住的同伴都可以使用卫生间, 但每次只能一个人进去, 进去之后门就锁上了(相当于加锁 Lock().acquire()), 出来之后开门, 其他人又可以使用卫生间了(相当于解锁Lock().release())

  • 模拟多个用户共同使用同一份文件(抢票)

🍓余票文件 "aaa.json"
{"count": 1}  # 剩一张票
🍓模拟多个人抢票
# coding:utf-8
from multiprocessing import Process
import os,time,json
def check():           # 先查票
    time.sleep(1)      # 模拟网络延迟
    with open("aaa.json")as f:
        dic = json.load(f)
        print(f" 剩余票数 : {dic['count']}")
def get():             # 查完之后开始抢
    time.sleep(1)      # 模拟网络延迟
    with open("aaa.json")as f:
        dic = json.load(f)
    if dic["count"] >0:
        dic["count"] -= 1
        time.sleep(1)  # 模拟网络延迟
        with open("aaa.json","w")as f2:  # 抢完之后修改数据并提交到服务端
            json.dump(dic,f2)
        print(f" 用户 : {os.getpid()} 抢票成功 ")
    else:
        print(f" 用户 : {os.getpid()} 抢票失败 ")
def run():
    check()
    time.sleep(1)      # 模拟网络延迟
    get()
if __name__ == "__main__":
    for i in range(4):
        p = Process(target=run)
        p.start()
''' 输出
剩余票数 : 1
剩余票数 : 1
剩余票数 : 1
剩余票数 : 1
用户 : 13116 抢票成功
用户 : 2364 抢票成功
用户 : 1796 抢票成功
用户 : 6228 抢票成功
'''

打印的结果发现只有一张票, 但是四个人都抢成功了, 这就非常不合理, 造成了数据混乱

  • 加入锁进行抢票

# coding:utf-8
from multiprocessing import Process,Lock
import os,time,json
def check():           # 先查票
    time.sleep(1)      # 模拟网络延迟
    with open("aaa.json")as f:
        dic = json.load(f)
        print(f" 剩余票数 : {dic['count']}")
def get():             # 查完之后开始抢
    time.sleep(1)      # 模拟网络延迟
    with open("aaa.json")as f:
        dic = json.load(f)
    if dic["count"] >0:
        dic["count"] -= 1
        time.sleep(1)  # 模拟网络延迟
        with open("aaa.json","w")as f2:  # 抢完之后修改数据并提交到服务端
            json.dump(dic,f2)
        print(f" 用户 : {os.getpid()} 抢票成功 ")
    else:
        print(f" 用户 : {os.getpid()} 抢票失败 ")
def run(lock):
    check()
    time.sleep(1)      # 模拟网络延迟
    lock.acquire()     # 在抢票环节加锁
    get()
    lock.release()     # 抢完后解锁
if __name__ == "__main__":
    lock = Lock()
    for i in range(4):
        p = Process(target=run,args=(lock,))
        p.start()
''' 输出
剩余票数 : 1
剩余票数 : 1
剩余票数 : 1
剩余票数 : 1
用户 : 432 抢票成功
用户 : 2636 抢票失败
用户 : 7772 抢票失败
用户 : 1272 抢票失败
'''

加锁之后, 一张票只有一个人能抢成功, 其实就是让抢票这个局部环节变成了 串行 , 谁抢到了就谁用, 牺牲了效率, 提升了数据安全性

2. 总结

  • 以上加锁的操作方法可以保证多个进程修改同一份数据时保证数据的安全性, 即串行的修改, 但是也带来了一些问题 :
1、共享的数据基于文件, 文件又属于硬盘, 效率就比较低
2、需要自己加锁和解锁操作, 这是一件非常危险的操作, 如果忘记解锁程序就停在原地
  • 因此我们就需要一种兼顾效率以及能自动帮我们处理锁的问题的这种介质
🍑需求
1、多个进程共享同一块内存数据, 实现高效率
2、找到一个能帮我们处理好锁的问题的机制 : multiprocessing 模块为我们提供了 IPC 通信机制: 管道和队列
🍑介质
1、管道和队列, 基于内存中的空间存放数据
2、队列是基于管道和锁实现的, 可以让我们从复杂的锁问题中解脱出来

ps : 我们应该尽量避免使用共享数据, (比如一个文件的传递应该将文件保存到硬盘, 在管道中放的应该是一个路径, 而不应该是一个完整的文件), 尽可能使用消息传递和队列, 避免处理复杂 的同步和锁问题, 而且在进程数目增多时, 往往可以获得更好的可扩展性

十一. 进程间通信 (IPC)

进程间通信机制简称 IPC (Inter Process Communication)

进程间彼此隔离, 要实现 IPC, multiprocessing 模块为我们提供了 队列和管道 这两种形式

1. 什么是管道, 什么是队列

  • 管道(了解即可) : 一个进程将一个数据放入管道内(共享内存), 另一个进程从管道内取出数据进行处理
  • 队列 : 管道加锁, 先进先出, 帮我们实现了复杂的加锁解锁操作, 以下我们主要介绍队列的使用

2. 队列的常用方法

  • 创建一个队列实例
🍑导入模块
from multiprocessing import Queue
🍑创建一个队列对象
q = Queue([maxsize])  # 多进程可以使用 Queue 进行数据传递
🍑参数介绍
maxsize               # 是队列中允许最大项, 省略则无大小限制
  • 常用方法介绍
方法 功能
q.put() 向队列中传入数据, 可选参数 : blocked(锁定状态)和 timeout(超时时间)。如果 blocked 为 True(默认值), 并且 timeout 为正值, 该方法会阻塞 timeout 指定的时间, 直到该队列有剩余的空间。如果超时,会抛出 Queue.Full 异常。如果 blocked 为 False,但该 Queue 已满,会立即抛出 Queue.Full 异常
q.get() 从队列读取走一个元素, 有两个可选参数:blocked 和 timeout。如果 blocked 为 True(默认值),并且 timeout 为正值,那么在等待时间内没有取到任何元素,会抛出 Queue.Empty 异常。如果 blocked 为 False,有两种情况存在,如果 Queue 有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出 Queue.Empty 异常
q.get_nowait() 同 q.get(blocked=False)
q.put_nowait() 同 q.put(blocked=False)
q.empty() 调用此方法时 q 为空则返回 True,该结果不可靠,比如在返回 True 的过程中,如果队列中又加入了项目
q.full() 调用此方法时 q 已满则返回 True,该结果不可靠,比如在返回 True 的过程中,如果队列中的项目被取走
q.qsize() 返回队列中目前项目的正确数量,结果也不可靠,理由同 q.empty()和 q.full()一样
  • 其他方法介绍
方法 作用
q.cancel_join_thread() 不会在进程退出时自动连接后台线程, 可以防止 join_thread()方法阻塞
q.close() 关闭队列, 防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果 q 被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在 get()操作上,关闭生产者中的队列不会导致 get()方法返回错误
q.join_thread() 连接队列的后台线程。此方法用于在调用 q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是 q 的原始创建者的所有进程调用。调用 q.cancel_join_thread 方法可以禁止这种行为

3. 队列的使用

from multiprocessing import Queue
q = Queue(3)           # 创建一个队列, 设置最大项为 3
q.put({"name":"ii"})   # 放入一个字典
q.put([1,2,3,4,5])     # 放入一个列表
q.put("shawn")         # 放入一个字符串
try:
    # q.put(1777,block=True,timeout=3)
    q.put(1777,block=False)  # 放入一个整形, 并设置队列已满立马抛异常
except Exception:
    print(" 队列已满 ")
print(q.get())         # 取一个值
print(q.get())         # 2
print(q.get())         # 3
try:
    # print(q.get(block=True,timeout=3))
    print(q.get(block=False))  # 取一个值, 队列为空立马抛出异常
except Exception:
    print(" 队列已空 ")
''' 输出
队列已满
{'namwe': 'ahsns'}
[1, 2, 3, 4, 5]
shawn
队列已空
'''

十二. 生产者消费者模型

1. 什么是生产者消费者模型

  • 生产者 : 程序中负责产生数据的一方
  • 消费者 : 程序中负责处理数据的一方

2. 为什么引入生产者消费者模型

在并发编程中, 生产者消费者模式通过一个容器来解决生产者和消费者之间的强耦合性, 两者之间不再是直接通信, 而是通过堵塞队列来进行通信, 生产者 (生产速度快) 不必再等待消费者是否处理完数据, 消费者直接从队列中取, 该队列就相当于一个缓冲区, 平衡了生产者和消费者的工作能力, 从而提高了程序整体的数据处理速度

3. 如何实现

通过队列 : 生产者 ——> 队列 ——-> 消费者

4. 生产者消费者示例

from multiprocessing import Process, Queue
import time, random
def producer(q, name, food):
    for i in range(3):
        res = f"{food}{i}"
        time.sleep(random.randint(1, 3))  # 模拟生产者数据产出时间
        q.put(res)             # 将产生的数据放入到队列中
        print(f"\033[1;35m{name}: 生产了:{res}\033[0m")
def consumer(q, name):
    while True:
        res = q.get()           # 取出数据
        if res == None: break   # 判断是否 None, None 代表队列取完了, 结束
        time.sleep(random.randint(1, 3))   # 模拟消费者处理数据时间
        print(f"\033[1;36m{name}吃了{res}\033[0m")
if __name__ == "__main__":
    q = Queue()  # 创建队列
    # 开启三个生产者进程
    p1 = Process(target=producer, args=(q, "shawn", " 香肠 ")) 
    p2 = Process(target=producer, args=(q, " 派大星 ", " 热狗 "))
    p3 = Process(target=producer, args=(q, " 海绵宝宝 ", " 鸡 "))
    # 开启两个消费者进程
    c1 = Process(target=consumer, args=(q, " 章鱼哥 "))
    c2 = Process(target=consumer, args=(q, " 蟹老板 "))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # 等待生产者全部生产完毕结束进程
    p1.join()
    p2.join()
    p3.join()
    # 主进程再想队列里面放入两个 None, 当消费者拿到后代表取完了
    q.put(None)
    q.put(None)
    print(" 痞老板: 主 ")
''' 输出
shawn: 生产了: 香肠 0
派大星: 生产了: 热狗 0
章鱼哥吃了香肠 0
蟹老板吃了热狗 0
派大星: 生产了: 热狗 1
shawn: 生产了: 香肠 1
海绵宝宝: 生产了: 鸡 0
章鱼哥吃了热狗 1
海绵宝宝: 生产了: 鸡 1
派大星: 生产了: 热狗 2
章鱼哥吃了鸡 0
蟹老板吃了香肠 1
shawn: 生产了: 香肠 2
海绵宝宝: 生产了: 鸡 2
痞老板: 主
蟹老板吃了热狗 2
章鱼哥吃了鸡 1
蟹老板吃了香肠 2
章鱼哥吃了鸡 2
Process finished with exit code 0
'''

5. 第二种生产者消费者模型使用 JoinableQueue 类 (了解)

  • JoinableQueue类的实例

q = JoinableQueue([maxsize]) : 与 Queue 的对象一样, 但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的

  • 方法
方法 作用
q.task_done() 使用者使用此方法发出信号,表示 q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发 ValueError 异常
q.join() 生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用 q.task_done()方法为止
from multiprocessing import Process, JoinableQueue
import time, random
def producer(q, name, food):
    for i in range(3):
        res = f"{food}{i}"
        q.put(res)
        time.sleep(random.randint(1, 3))
        print(f"\033[1;35m{name}: 生产了:{res}\033[0m")
    q.join()  # 等待每个生产者自己放入的数据被消费者取完才结束该进程
def consumer(q, name):
    while True:
        res = q.get()
        if res == None: break
        time.sleep(random.randint(1, 3))
        print(f"\033[1;36m{name}吃了{res}\033[0m")
        q.task_done()  
        # 消费者每次取走一个数据都发送一个 task_done 信号, 生产者那边的计数相应减 1
if __name__ == "__main__":
    q = JoinableQueue()  # 创建一个对象
    # 创建三个生产者
    p1 = Process(target=producer, args=(q, "shawn", " 香肠 "))
    p2 = Process(target=producer, args=(q, " 派大星 ", " 热狗 "))
    p3 = Process(target=producer, args=(q, " 海绵宝宝 ", " 鸡 "))
    # 创建两个消费者
    c1 = Process(target=consumer, args=(q, " 章鱼哥 "))
    c2 = Process(target=consumer, args=(q, " 蟹老板 "))
    # 将两个消费者设置成守护进程, 主进程代码结束, 这两个消费者进程相应结束
    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    # 等待三个生产者进程结束
    p1.join()
    p2.join()
    p3.join()
🔰# 原理分析 : 生产者生产数据, 假设一个生产者生产 3 个数据带队列, 每个相应的计数为 3
🔰# 消费者从队列中取走数据的时候发送 task_done 信号给生产者, 生产者的计数 3 -1, 剩下两个
🔰# 消费者继续取数据并发送信号, 当生产者的计数为 0, 代表队列已经取完了, 这时 q.join()就不再进行堵塞, 生产者进程结束
🔰# 而此时的消费者也已经没有作用了, 将消费者进程设置成守护进程, 主进程等待生产者进程结束就结束, 消费者进程自然被带走
''' 输出
shawn: 生产了: 香肠 0
海绵宝宝: 生产了: 鸡 0
章鱼哥吃了香肠 0
派大星: 生产了: 热狗 0
蟹老板吃了热狗 0
shawn: 生产了: 香肠 1
海绵宝宝: 生产了: 鸡 1
章鱼哥吃了鸡 0
蟹老板吃了香肠 1
shawn: 生产了: 香肠 2
派大星: 生产了: 热狗 1
章鱼哥吃了鸡 1
蟹老板吃了热狗 1
海绵宝宝: 生产了: 鸡 2
派大星: 生产了: 热狗 2
章鱼哥吃了香肠 2
蟹老板吃了鸡 2
章鱼哥吃了热狗 2
Process finished with exit code 0
'''

十三. 信号量 Semaphore (了解)

互斥锁同时只允许一个线程修改数据, 而 Semaphore 允许同时有一定数量的进程更改数据, 就像理发店, 比如只有 3 个托尼老师, 那最多只允许 3 个人同时理发, 后面的人只能等到有人理完了才能开始, 如果指定信号量为 3, 那么来一个人获得一把锁, 计数加 1, 当计数等于 3 时, 后面的人均需要等待 , 一旦释放, 就有人可以获得一把锁

from multiprocessing import Semaphore,Process
import time,random
def haircut(sem,name):
    start_time = time.time()
    sem.acquire()  # 加锁
    print(f"{name}开始理发 ")
    time.sleep(random.randint(2,3)) # 模拟理发时间
    print(f"{name}理发加等待用时 %.2f"%(time.time()-start_time))
    sem.release()  # 解锁
if __name__ == '__main__':
    sem = Semaphore(3)  # 最大进程数为 3
    user_list = []
    for i in range(8):
        p = Process(target=haircut,args=(sem,f" 明星{i}"))
        p.start()
        user_list.append(p)
    for obj in user_list:
        obj.join()
    print(" 关门 ")
''' 输出
明星 0 开始理发
明星 1 开始理发
明星 2 开始理发
明星 0 理发加等待用时 3.00
明星 3 开始理发
明星 1 理发加等待用时 3.00
明星 4 开始理发
明星 2 理发加等待用时 3.00
明星 5 开始理发
明星 3 理发加等待用时 4.93
明星 6 开始理发
明星 4 理发加等待用时 4.87
明星 7 开始理发
明星 5 理发加等待用时 5.82
明星 7 理发加等待用时 6.69
明星 6 理发加等待用时 7.74
关门
Process finished with exit code 0
'''

十四. 死锁与递归锁

进程死锁、递归锁与线程死锁、递归锁一样, 将统一放在线程一起讲, 请参见多线程

十五. 事件 (Event)

参见多线程

十五. 进程池概念

1. 什么是进程池?

  • 👉进程池是 资源进程 , 管理进程 组成的技术的应用.

2. 为什么要有进程池?

😮忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。😒那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?😓首先,创建进程需要消耗时间,销毁进程也需要消耗时间。😟第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。😥因此我们不能无限制的根据任务去开启或者结束进程。那么我们要怎么做呢?

3. 进程池的概念

  • 😺定义一个池子,在里面 放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务

  • 😸等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务

  • 😹如果有很多任务需要执行,池中的进程数量不够,任务就要等待 之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。

  • 😻也就是说,进池中进程的数量是固定的,那么同一时间最多有 固定数量 的进程在运行

  • 😼这样不会增加操作系统的调度难度,还节省了开关进程的时间,也一定程度上能够 实现并发效果

4. 资源进程

  • 👉预先创建好的空闲进程,管理进程 (好比池子🏊) 会把工作分发到空闲进程来处理。

5. 管理进程🏊

  • 👉管理进程负责创建资源进程,把工作交给空闲资源进程处理,回收已经处理完工作的资源进程。

  • 资源进程与管理进程的交互

😱管理进程如何有效的管理资源进程,分配任务给资源进程?
👉通过 IPC,信号,信号量,消息队列,管道等进行交互。

十六. 进程池的使用

我们可以通过维护一个进程池来控制进程的数目, 比如使用 httpd 的进程模式, 可以规定最大进程数和最小进程数, multiprocessing模块 Pool 类可以提供指定数量的进程供用户调用

1. 创建一个进程池

  • 方法 : Pool([numprocess],[initializer],[initargs])
  • ps : 在多线程那篇文章中介绍另一种创建进程池方式 from concurrent.future import ProcessPoolExecutor

  • 参数

参数 作用
numprocess 要创建的进程数,如果省略,将默认使用 cpu_count()的值
initializer 每个工作进程启动时要执行的可调用对象,默认为 None
initargs 传给 initializer 的参数组

2. 常用方法介绍

方法 作用
p.apply(func,args,kwargs) (同步调用)在进程池工作的进程中执行 func 函数, 后面是参数, 然后返回结果, 如果想要传入不同的参数并发的执行 func, 就需要以不同的线程去调用 p.apply()函数或者使用 p.apply_async()
p.apply_async(func,args,kwargs) (异步调用)在进程池工作的进程中执行 func 函数, 后面是参数, 然后返回结果, 结果是 AsyncResult 类的实例, 可以使用回调函数 callback, 将前面 funct 返回的结果单做参数传给回调函数
p.close() 关闭进程池,防止进一步操作, 如果所有操作持续挂起,它们将在工作进程终止前完成
P.jion() 等待所有工作进程退出。此方法只能在 close()或 teminate()之后调用, 否则报错

3. 其他方法

以下方法运用于 pply_async()map_async()的返回值, 返回值是 AsyncResul 实例的对象, 也就是该对象的方法

方法 作用
obj.get() 返回结果,如果有必要则等待结果到达。timeout 是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发
obj.ready() 如果调用完成,返回 True
obj.successful() 如果调用完成且没有引发异常,返回 True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]) 等待结果变为可用, 参数是超时时间
obj.terminate() 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果对象被垃圾回收, 将自动调用这个方法

4. 同步调用示例 (apply)

from multiprocessing import Pool
import time,os,random
def test(n):
    print(f" 子进程:{os.getpid()}")
    time.sleep(2)
    return n*random.randint(2,9)
if __name__ == '__main__':
    n = os.cpu_count()     # 本机 CPU 个数, 我的是 4, 进程池容量个数自定义, 默认 CPU 核数
    p = Pool(processes=n)  # 设置进程池进程个数, 从无到有, 并且以后一直只有这四个进程在执行任务
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply(test,args=(2,))  # 创建十个个任务, 使用同步调用的方式
        li.append(res)
    p.close()  # 先关闭进程池, 不会再有新的进程加入到 pool 中, 防止进一步的操作(同步调用可以不加此方法)
    p.join()   # 必须在 close 调用之后执行, 否则报错, 执行后等待所有子进程结束(同步调用可以不加此方法)
    print(li)  # 同步调用, 得到的就是最终结果,(异步调用得到的是对象, 需要使用 get 方法取值)
    print(f'使用时间:{time.time()-start_time}')
''' 输出
子进程:7768
子进程:16276
子进程:17544
子进程:15680
子进程:7768
子进程:16276
子进程:17544
子进程:15680
子进程:7768
子进程:16276
[4, 18, 14, 14, 12, 14, 16, 14, 6, 10]
使用时间:20.226498126983643
'''

从上面的输出结果可以看到, 进程一直是那四个 : 7769、16276、17544、15680, 并且异步提交需要等待上一个任务结束拿到结果才能进行下一个任务, 所以用时 20 秒多一点

5. 异步调用示例 (apply_async)

from multiprocessing import Pool
import time,os,random
def test(n):
    print(f" 子进程:{os.getpid()}")
    time.sleep(2)
    return n*n*random.randint(2,9)
if __name__ == '__main__':
    n = os.cpu_count()     # 本机 CPU 个数, 我的是 4, 进程池容量个数自定义, 默认 CPU 核数
    p = Pool(processes=n)  # 设置进程池大小, 从无到有, 并之后只有这四个进程执行任务
    li = []
    start_time = time.time()
    for i in range(10):
        res = p.apply_async(test,args=(2,))  # 开启十个任务, 使用异步调用的方式
        li.append(res)
    p.close()  # 关闭进程池, 不会再有新的进程加入到 pool 中, 防止进一步的操作
    p.join()   # join 必须在 close 函数之后进行, 否则报错, 执行后等待所有子进程结束
    print(li)  # 返回的是 AsyncResul 的对象[<multiprocessing.pool.ApplyResult object at 0x000002318511B408>,....]
    print([i.get() for i in li])  # 使用 get 方法来获取异步调用的值(同步调用没有该方法), 并放入列表中打印
    print(f" 使用时间:{time.time()-start_time}")
''' 输出
子进程:8636
子进程:10828
子进程:7432
子进程:13976
子进程:8636
子进程:10828
子进程:7432
子进程:13976
子进程:8636
子进程:10828
[<multiprocessing.pool.ApplyResult object at 0x000001623059B308>,... 省略]
[16, 24, 24, 24, 16, 28, 36, 28, 8, 32]
使用时间:6.301024436950684
'''

从上面结果也能看出自始至终都只有四个进程在工作 : 8636、10828、7432、13976,异步调用方式如果任务进行时遇到阻塞操作将立马接收其它异步操作中的结果, 如果进程池满了, 则只能等待任务进行完毕拿到结果, 拿到的结果是 AsyncResul 的对象, 需要使用 get 方法取值, 用时 6 秒多一点

6. 服务端使用进程池来控制接入客户端进程的个数示例

  • 服务端
from socket import *
from multiprocessing import Pool
import os
s = socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)  # 重用 IP 和端口
s.bind(("127.0.0.1",8055))
s.listen(5)
def connection(conn):
    print(f" 当前进程:{os.getpid()}")
    while 1:
        try:
            date = conn.recv(1024)
            if len(date) == 0:break
            conn.send(" 阿巴阿巴 ".encode("utf-8"))
        except Exception:
            break
if __name__ == '__main__':
    p = Pool(2)  # 不指定, 默认本机 CPU 核数
    print("connection....")
    while 1:
        conn,addr = s.accept()
        print(f" 已连上{addr}")
        p.apply_async(connection,args=(conn,))
  • 客户端(我们开启四个客户端来做实验)
from socket import *
c = socket(AF_INET,SOCK_STREAM)
c.connect(("127.0.0.1",8055))
while 1:
    msg = input(" 内容 >>").strip()
    if len(msg) == 0:continue
    c.send(msg.encode("utf-8"))
    date = c.recv(1024)
    print(f" 服务端的回复:{date.decode('utf-8')}")
  • 测试效果

四个客户端一个服务端 :

81 Python 并发编程之多进程

启动五台机器, 让四台客户端发送信息

前两台能发收消息, 后两台阻塞原地

81 Python 并发编程之多进程

81 Python 并发编程之多进程

81 Python 并发编程之多进程

81 Python 并发编程之多进程

服务端显示两个进程启动成功 : 8928、17584, 剩余两个阻塞

81 Python 并发编程之多进程

我们将前面两个客户端进程关闭, 看看进程号是否变化

关闭前两个客户端进程之后, 后两个客户端进程立马启动起来了, 并且发现 PID 还是原来的两个

81 Python 并发编程之多进程

十七. 回调函数 (callback)

1. 什么是回调函数

将第一个函数的指针 (也就是内存地址,Python 中淡化了指针的概念) 作为参数传给另一个函数处理, 这第一个函数就称为回调函数

2. 简单示例

def foo(n):
    print(f"foo 输出{n}")
def Bar(i,func):
    func(i)
for i in range(3):
    Bar(i,foo)
''' 输出
foo 输出 0
foo 输出 1
foo 输出 2
'''

3. 回调函数应用场景

当进程池中一个任务处理完之后, 它去通知主进程自己结束了, 让主进程处理自己的结果, 于是主进程去调用另一个函数去处理该结果, 我们可以将耗时间或者阻塞的任务放入进程池, 在主进程中指定回调函数, 并由主进程负责执行, 这样主进程在执行回调函数的时候就省去了 I/O 的过程, 直接拿到的就是任务的结果

  • 举一个简单易于理解的示例
from multiprocessing import Pool
import os
def get(n):
    print(f"get--->{os.getpid()}")
    return n   # 返回任务执行的结果
def set(num):  # 拿到回调函数的处理结果 --->num
    print(f"set--->{os.getpid()} : {num**2}")
if __name__ == '__main__':
    p = Pool(3)
    nums = [2,3,4,1]
    li = []
    for i in nums:
        # 异步调用, 并使用 callback 设置回调
        res = p.apply_async(get,args=(i,),callback=set)  
        li.append(res)
    p.close()  # 关闭进程池
    p.join()   # 等待子进程结束
    print([ii.get() for ii in li])  # 使用 get 方法拿到结果
''' 输出
get--->8388
get--->8388
set--->8768 : 4
get--->8388
set--->8768 : 9
get--->8388
set--->8768 : 16
set--->8768 : 1
[2, 3, 4, 1]
'''
  • 获取网页源码大小示例
from multiprocessing import Pool
import requests,os
def get_htm(url):
    print(f" 进程:{os.getpid()}开始获取:{url}网页 ")
    response = requests.get(url)
    if response.status_code == 200:   # 如果是 200, 则获取成功
        return {'url':url,'text':response.text}
    else:
        return {'url':url,'text':''}  # 有些网页获取不到, 设置空
def parse_htm(htm_dic):
    print(f'进程:{os.getpid()}正在处理:{htm_dic["url"]}的 text')
    parse_data = f"url:{htm_dic['url']} size:{len(htm_dic['text'])}"
    with open("./db.txt","a")as f:    # 将 URL 和对应网页源码大小保存到文件
        f.write(f"{parse_data}\n")
if __name__ == '__main__':
    urls=[
        'https://zhuanlan.zhihu.com',
        'https://www.cnblogs.com',
        'https://www.python.org',
        'https://blog.csdn.net',
        'http://www.china.com.cn',
    ]
    p = Pool(3)  # 设置进程池最大进程数为 3
    li = []
    for url in urls:
        # 异步调用并设置回调
        res = p.apply_async(get_htm,args=(url,),callback=parse_htm)  
        li.append(res)
    p.close()  # 关闭进程池
    p.join()   # 等待子进程结束
    print([i.get() for i in li])  # 使用 get 方法获取结果
''' 输出
进程:11484 开始获取:https://zhuanlan.zhihu.com 网页
进程:17344 开始获取:https://www.cnblogs.com 网页
进程:2688 开始获取:https://www.python.org 网页
进程:11484 开始获取:https://blog.csdn.net 网页
进程:3928 正在处理:https://zhuanlan.zhihu.com 的 text
进程:17344 开始获取:http://www.china.com.cn 网页
进程:3928 正在处理:https://www.cnblogs.com 的 text
进程:3928 正在处理:https://blog.csdn.net 的 text
进程:3928 正在处理:http://www.china.com.cn 的 text
进程:3928 正在处理:https://www.python.org 的 text
[{'url': 'https://zhuanlan.zhihu.com', 'text': ''},... 一堆网页源码的 bytes(省略)]'''
  • 查看一下保存的 “db.txt” 文件

81 Python 并发编程之多进程

4. 爬取福布斯全球排行榜

81 Python 并发编程之多进程

from multiprocessing import Pool
import re
import requests
def get_htm(url,format1):
    response = requests.get(url)
    if response.status_code == 200:
        return (response.text,format1)
    else:
        return ('',format1)
def parse_htm(res):
    text,format1 = res
    data_list = re.findall(format1,text)
    for data in data_list:
        with open(" 福布斯排行.txt","a",encoding="utf-8")as f:
            f.write(f" 排名:{data[0]}, 名字:{data[1]}, 身价:{data[2]}, 公司:{data[3]}, 国家:{data[4]}\n")
if __name__ == '__main__':
    url1 = "https://www.phb123.com/renwu/fuhao/shishi.html"
    # 使用正则匹配关键字
    format1 = re.compile(r'<td.*?"xh".*?>(\d+)<.*?title="(.*?)".*?alt.*?<td>(.*?)</td>.*?<td>(.*?)<.*?title="(.*?)"', re.S)
    url_list = [url1]
    for i in range(2, 16):  # 总共 15 页排行, 将链接都加进列表里
        url_list.append(f"https://www.phb123.com/renwu/fuhao/shishi_{i}.html")
    p = Pool()
    li = []
    for url in url_list:
        res = p.apply_async(get_htm,args=(url,format1),callback=parse_htm)
        li.append(res)
    p.close()
    p.join()
    print(" 保存完成 ")
  • 查看一下文件, 看看自己有没有上榜

81 Python 并发编程之多进程

81 Python 并发编程之多进程

ps : 如果你就是想在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

© 版权声明
THE END
喜欢就支持一下吧
点赞6 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容