深入理解Python多线程 多进程 协程----(多进程)

1 什么是进程

进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu

1.1 并发与并行

无论是并行还是并发,在用户看来都是’同时’运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

  • 并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)

  • 并行:同时运行,只有具备多个cpu才能实现并行

单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行

多道技术概念回顾:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并行,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)

1.2 同步\异步and阻塞\非阻塞

  • 同步:同步就是在发出一个功能调用时,在没有得到结果之前,该调用不会返回。
  • 异步:异步就是当一个异步功能调用发出后,调用者不能立即得到结果,当异步功能完成后,通过状态、通知、或者回调来通知调用者。
  • 阻塞:阻塞式指调用结果返回之前,当前线程会被挂起,函数只有在得到结果之后才会将阻塞的线程激活
  • 非阻塞:非阻塞是指不能立即得到结果之前也能立即返回,同时该函数也不会阻塞当前线程。

1.3 multiprocessing模块

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessin
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

2 Process类的使用

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。

在windows中Process()必须放到# if name == ‘main‘:下

2.1 创建并开启子进程的两种方式

方式1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import time
import random
from multiprocessing import Process
def foo(name):
print('%s running ' % name)
time.sleep(random.randrange(1, 5))
print('%s ending ' % name)
if __name__ == '__main__':
p1 = Process(target=foo, args=('p1',)) # 必须加逗号
p2 = Process(target=foo, args=('p2',))
p3 = Process(target=foo, args=('p3',))
p4 = Process(target=foo, args=('p4',))
p1.start()
p2.start()
p3.start()
p4.start()
print('主线程')

结果:

1
2
3
4
5
6
7
8
9
主进程
p2 running
p4 running
p3 running
p1 running
p2 ending
p1 ending
p3 ending
p4 ending

方式2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import random
from multiprocessing import Process
class Foo(Process):
def __init__(self, name):
# super(Foo, self).__init__()
super().__init__()
self.name = name
def run(self):
print('%s running ' % self.name)
time.sleep(random.randrange(1, 5))
print('%s ending ' % self.name)
if __name__ == '__main__':
p1 = Foo('p1')
p2 = Foo('p2')
p3 = Foo('p3')
p4 = Foo('p4')
p1.start() # 自动执行run方法
p2.start()
p3.start()
p4.start()
print('主线程')

结果:

1
2
3
4
5
6
7
8
9
主进程
p4 running
p2 running
p1 running
p3 running
p3 ending
p1 ending
p4 ending
p2 ending

2.2 Process对象的join方法

如下p1调用了join方法,主进程要等待p1先结束后才结束
很明显p1.join()是让主线程等待p1的结束,卡住的是主线程而绝非进程p1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
import random
from multiprocessing import Process
class Foo(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print('%s running ' % self.name)
time.sleep(2)
print('%s ending ' % self.name)
if __name__ == '__main__':
p1 = Foo('p1')
p2 = Foo('p2')
p3 = Foo('p3')
p4 = Foo('p4')
p1.start() # 自动执行run方法
p1.join(0.1)
p2.start()
p3.start()
p4.start()
# p1.join()
# p2.join()
# p3.join()
# p4.join()
print('主线程')

结果:
p1 running
p1 ending
主线程
p3 running
p2 running
p4 running
p3 ending
p2 ending
p4 ending

1
2
3
4
5
6
7
8
9
10
#进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
#而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
#join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
# 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
p1.join()
p2.join()
p3.join()
p4.join()
print('主线程')

简写的方式:

1
2
3
4
5
6
p_list = [p1,p2,p3,p4]
for p in p_list:
p.start()
for p in p_list:
p.join()

2.3 Process的其他方法 属性 treminate is_alive

  • treminate is_alive

    1
    2
    3
    4
    5
    6
    7
    8
    p1.start() # 自动执行run方法
    p1.terminate() # 不会立即关闭
    print(p1.is_alive()) # 通过打印is_alive 结果仍然是True
    print('主线程')
    time.sleep(3)
    print(p1.is_alive())
  • name 与pid
    name在super前面和后面的结果是不一样的,放在super前面会被重新赋值,得到的结果是Foo-1,Foo-2…
    name 在super后面的结果是p1,p2

p.pid查看进程号

3 守护进程

主进程创建守护进程

  • 守护进程会在主进程结束后终止
  • 守护进程内无法开启子进程,否则抛出异常

注意:进程之间是相互独立的,主进程代码运行结束,守护进程立即结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time
import random
from multiprocessing import Process
class Foo(Process):
def __init__(self, name):
# super(Foo, self).__init__()
super().__init__()
self.name = name
def run(self):
print('%s running ' % self.name)
time.sleep(2)
print('%s ending ' % self.name)
if __name__ == '__main__':
p1 = Foo('p1')
# p2 = Foo('p2')
# p3 = Foo('p3')
# p4 = Foo('p4')
p1.daemon = True
p1.start()
print('主')

结果:

从结果中可以看出,主进程结束后,守护进程立即结束,没有运行

3 进程同步(锁)

进程之间的数据是不共享的,但是共享同一套文件系统,或者同一套打印终端,竞争带来的结果就是错乱,如何控制,就是加锁处理。

加锁可以保证多个进程修改同一块数据时,同一时间内只有一个任务可以修改数据,即串行修改,牺牲了速速,但是保证了数据的安全
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁的问题,在进程数目增多时,可以获得更多的可扩展性。

4 队列 (推荐使用)

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

1
2
3
4
5
6
7
8
9
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process,Queue
import time
q = Queue(3)
q.put(3)
q.put(3)
q.put(3)
print(q.full())
print(q.get())
print(q.get())
print(q.get())
print(q.empty())

4.1 生产者 消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from multiprocessing import Process, Queue
import time, random, os
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print("%s吃%s" % (os.getpid(), res))
def prodcer(q):
"""
生产者
:param q:
:return:
"""
for i in range(10):
time.sleep(random.randint(1, 3))
res = "包子"
q.put(res)
print("%s生产了%s" % (os.getpid(), res))
if __name__ == '__main__':
q = Queue()
p1 = Process(target=prodcer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
print('主')

加上结束信号:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from multiprocessing import Process, Queue
import time, random, os
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
res = q.get()
if res is None: break # 如果是None 跳出循环
time.sleep(random.randint(1, 3))
print("%s吃%s" % (os.getpid(), res))
def prodcer(q):
"""
生产者
:param q:
:return:
"""
for i in range(10):
time.sleep(random.randint(1, 3))
res = "包子"
q.put(res)
print("%s生产了%s" % (os.getpid(), res))
q.put(None) # 生产者生产完成后发送None
if __name__ == '__main__':
q = Queue()
p1 = Process(target=prodcer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
print('主')

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

跳出循环由主进程发送None指令,这里生产者设置join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
res = q.get()
if res is None: break # 如果是None 跳出循环
time.sleep(random.randint(1, 3))
print("%s吃%s" % (os.getpid(), res))
def prodcer(q):
"""
生产者
:param q:
:return:
"""
for i in range(5):
time.sleep(random.randint(1, 3))
res = "包子"
q.put(res)
print("%s生产了%s" % (os.getpid(), res))
# q.put(None) # 生产者生产完成后发送None
if __name__ == '__main__':
q = Queue()
p1 = Process(target=prodcer,args=(q,))
p2 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
p1.join() # p1设置join
q.put(None) # 由主进程发送None
print('主')

有多个生产者和消费者

  • 设置join
  • 有多少个消费者就要通知多少个消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if __name__ == '__main__':
q = Queue()
p1 = Process(target=prodcer,args=(q,))
p2 = Process(target=prodcer,args=(q,))
p3 = Process(target=prodcer,args=(q,))
p4 = Process(target=prodcer,args=(q,))
c1 = Process(target=consumer,args=(q,))
c2 = Process(target=consumer,args=(q,))
c3 = Process(target=consumer,args=(q,))
p1.start()
p2.start()
p3.start()
p4.start()
c1.start()
c2.start()
c3.start()
p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
p2.join()
p3.join()
p4.join()
q.put(None) # 有几个消费者就应该发送几次结束信号None
q.put(None)
q.put(None)
print('主')

使用JoinableQueue实现发送结束信号

1
2
3
4
5
6
7
8
#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
#参数介绍:
maxsize是队列中允许最大项数,省略则无大小限制。
  #方法介绍:
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from multiprocessing import Process,JoinableQueue
import time, random, os
def consumer(q):
"""
消费者
:param q:
:return:
"""
while True:
res = q.get()
time.sleep(random.randint(1, 3))
print("\033[34m%s 吃 %s\033[0m" % (os.getpid(), res))
q.task_done() # 向q.join()发送一次信号 证明一个数据已经被取走
def prodcer(name,q):
"""
生产者
:param q:
:return:
"""
for i in range(5):
time.sleep(random.randint(1, 3))
res = "%s%s"%(name,i)
q.put(res)
print("\033[35m%s 生产了 %s\033[0m" % (os.getpid(), res))
q.join() # 生产者需要用join 这里是等消费者
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=prodcer,args=('包子',q,))
p2 = Process(target=prodcer,args=('馒头',q,))
p3 = Process(target=prodcer,args=('面条',q,))
c1 = Process(target=consumer,args=(q,))
c2 = Process(target=consumer,args=(q,))
c1.daemon = True # c1 c2 设置守护进程
c2.daemon = True
p_l = [p1, p2, p3, c1, c2]
for p in p_l:
p.start()
p1.join()
p2.join()
p3.join()
print('主')

5 共享数据

基于消息传递的并发编程是大势所趋,Go是并发编程,未来会火

6 进程池

进程池可以指定Pool中进程的数量,当有新的请求到来时,如果池没有满,就创建新的进程,如果满了就等待,知道进程池中有进程结束,然后重用该进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Pool
import os
import time
def work(n):
print("%s run" % os.getpid())
return n**2
if __name__ == '__main__':
p = Pool(3) # 进程池中创建3个进程
res_l = []
for i in range(10):
res = p.apply(work, args=(i,)) # 同步调用
res_l.append(res)
print(res_l)

socket使用进程池

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
# 开启6个客户端,会发现2个客户端处于等待状态
# 在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn, client_addr):
print('进程pid: %s' % os.getpid())
while True:
try:
msg = conn.recv(1024)
if not msg: break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p = Pool() # 默认是os.cpu_count() 4个
while True:
conn, client_addr = server.accept()
p.apply_async(talk, args=(conn, client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))

使用concurrent.futures实现进程池

https://docs.python.org/dev/library/concurrent.futures.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import time
from concurrent.futures import ProcessPoolExecutor
def task(i):
print("%s run"%os.getpid())
time.sleep(1)
# print(i)
if __name__ == '__main__': # 必须要有这个 否则报错
p = ProcessPoolExecutor(10) # 创建10个进程
for row in range(15):
p.submit(task, row)

http://www.cnblogs.com/linhaifeng/articles/6817679.html
http://www.jb51.net/article/87145.htm?pc
http://www.cnblogs.com/linhaifeng/articles/7428874.html

© 2018 Peter's Blog Center All Rights Reserved.
Theme by hiero