1 什么是进程
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu
1.1 并发与并行
无论是并行还是并发,在用户看来都是’同时’运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务
单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行

多道技术概念回顾:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并行,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)
1.2 同步\异步and阻塞\非阻塞
- 同步:同步就是在发出一个功能调用时,在没有得到结果之前,该调用不会返回。
- 异步:异步就是当一个异步功能调用发出后,调用者不能立即得到结果,当异步功能完成后,通过状态、通知、或者回调来通知调用者。
- 阻塞:阻塞式指调用结果返回之前,当前线程会被挂起,函数只有在得到结果之后才会将阻塞的线程激活
- 非阻塞:非阻塞是指不能立即得到结果之前也能立即返回,同时该函数也不会阻塞当前线程。
1.3 multiprocessing模块
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessin
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
2 Process类的使用
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。
在windows中Process()必须放到# if name == ‘main‘:下
2.1 创建并开启子进程的两种方式
方式1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import time import random from multiprocessing import Process def foo(name): print('%s running ' % name) time.sleep(random.randrange(1, 5)) print('%s ending ' % name) if __name__ == '__main__': p1 = Process(target=foo, args=('p1',)) p2 = Process(target=foo, args=('p2',)) p3 = Process(target=foo, args=('p3',)) p4 = Process(target=foo, args=('p4',)) p1.start() p2.start() p3.start() p4.start() print('主线程')
|
结果:
1 2 3 4 5 6 7 8 9
| 主进程 p2 running p4 running p3 running p1 running p2 ending p1 ending p3 ending p4 ending
|
方式2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import time import random from multiprocessing import Process class Foo(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s running ' % self.name) time.sleep(random.randrange(1, 5)) print('%s ending ' % self.name) if __name__ == '__main__': p1 = Foo('p1') p2 = Foo('p2') p3 = Foo('p3') p4 = Foo('p4') p1.start() p2.start() p3.start() p4.start() print('主线程')
|
结果:
1 2 3 4 5 6 7 8 9
| 主进程 p4 running p2 running p1 running p3 running p3 ending p1 ending p4 ending p2 ending
|
2.2 Process对象的join方法
如下p1调用了join方法,主进程要等待p1先结束后才结束
很明显p1.join()是让主线程等待p1的结束,卡住的是主线程而绝非进程p1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import time import random from multiprocessing import Process class Foo(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s running ' % self.name) time.sleep(2) print('%s ending ' % self.name) if __name__ == '__main__': p1 = Foo('p1') p2 = Foo('p2') p3 = Foo('p3') p4 = Foo('p4') p1.start() p1.join(0.1) p2.start() p3.start() p4.start() print('主线程')
|
结果:
p1 running
p1 ending
主线程
p3 running
p2 running
p4 running
p3 ending
p2 ending
p4 ending
1 2 3 4 5 6 7 8 9 10
| p1.join() p2.join() p3.join() p4.join() print('主线程')
|
简写的方式:
1 2 3 4 5 6
| p_list = [p1,p2,p3,p4] for p in p_list: p.start() for p in p_list: p.join()
|
2.3 Process的其他方法 属性 treminate is_alive
p.pid查看进程号
3 守护进程
主进程创建守护进程
- 守护进程会在主进程结束后终止
- 守护进程内无法开启子进程,否则抛出异常
注意:进程之间是相互独立的,主进程代码运行结束,守护进程立即结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import time import random from multiprocessing import Process class Foo(Process): def __init__(self, name): super().__init__() self.name = name def run(self): print('%s running ' % self.name) time.sleep(2) print('%s ending ' % self.name) if __name__ == '__main__': p1 = Foo('p1') p1.daemon = True p1.start() print('主')
|
结果:
主
从结果中可以看出,主进程结束后,守护进程立即结束,没有运行
3 进程同步(锁)
进程之间的数据是不共享的,但是共享同一套文件系统,或者同一套打印终端,竞争带来的结果就是错乱,如何控制,就是加锁处理。
加锁可以保证多个进程修改同一块数据时,同一时间内只有一个任务可以修改数据,即串行修改,牺牲了速速,但是保证了数据的安全
虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁的问题,在进程数目增多时,可以获得更多的可扩展性。
4 队列 (推荐使用)
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
1 2 3 4 5 6 7 8 9
| 1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. 3 4 q.get_nowait():同q.get(False) 5 q.put_nowait():同q.put(False) 6 7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。 8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。 9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from multiprocessing import Process,Queue import time q = Queue(3) q.put(3) q.put(3) q.put(3) print(q.full()) print(q.get()) print(q.get()) print(q.get()) print(q.empty())
|
4.1 生产者 消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| from multiprocessing import Process, Queue import time, random, os def consumer(q): """ 消费者 :param q: :return: """ while True: res = q.get() time.sleep(random.randint(1, 3)) print("%s吃%s" % (os.getpid(), res)) def prodcer(q): """ 生产者 :param q: :return: """ for i in range(10): time.sleep(random.randint(1, 3)) res = "包子" q.put(res) print("%s生产了%s" % (os.getpid(), res)) if __name__ == '__main__': q = Queue() p1 = Process(target=prodcer,args=(q,)) p2 = Process(target=consumer,args=(q,)) p1.start() p2.start() print('主')
|
加上结束信号:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| from multiprocessing import Process, Queue import time, random, os def consumer(q): """ 消费者 :param q: :return: """ while True: res = q.get() if res is None: break time.sleep(random.randint(1, 3)) print("%s吃%s" % (os.getpid(), res)) def prodcer(q): """ 生产者 :param q: :return: """ for i in range(10): time.sleep(random.randint(1, 3)) res = "包子" q.put(res) print("%s生产了%s" % (os.getpid(), res)) q.put(None) if __name__ == '__main__': q = Queue() p1 = Process(target=prodcer,args=(q,)) p2 = Process(target=consumer,args=(q,)) p1.start() p2.start() print('主')
|
此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。
解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环
跳出循环由主进程发送None指令,这里生产者设置join
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| def consumer(q): """ 消费者 :param q: :return: """ while True: res = q.get() if res is None: break time.sleep(random.randint(1, 3)) print("%s吃%s" % (os.getpid(), res)) def prodcer(q): """ 生产者 :param q: :return: """ for i in range(5): time.sleep(random.randint(1, 3)) res = "包子" q.put(res) print("%s生产了%s" % (os.getpid(), res)) if __name__ == '__main__': q = Queue() p1 = Process(target=prodcer,args=(q,)) p2 = Process(target=consumer,args=(q,)) p1.start() p2.start() p1.join() q.put(None) print('主')
|
有多个生产者和消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| if __name__ == '__main__': q = Queue() p1 = Process(target=prodcer,args=(q,)) p2 = Process(target=prodcer,args=(q,)) p3 = Process(target=prodcer,args=(q,)) p4 = Process(target=prodcer,args=(q,)) c1 = Process(target=consumer,args=(q,)) c2 = Process(target=consumer,args=(q,)) c3 = Process(target=consumer,args=(q,)) p1.start() p2.start() p3.start() p4.start() c1.start() c2.start() c3.start() p1.join() p2.join() p3.join() p4.join() q.put(None) q.put(None) q.put(None) print('主')
|
使用JoinableQueue实现发送结束信号
1 2 3 4 5 6 7 8
| #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。 #参数介绍: maxsize是队列中允许最大项数,省略则无大小限制。 #方法介绍: JoinableQueue的实例p除了与Queue对象相同的方法之外还具有: q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常 q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| from multiprocessing import Process,JoinableQueue import time, random, os def consumer(q): """ 消费者 :param q: :return: """ while True: res = q.get() time.sleep(random.randint(1, 3)) print("\033[34m%s 吃 %s\033[0m" % (os.getpid(), res)) q.task_done() def prodcer(name,q): """ 生产者 :param q: :return: """ for i in range(5): time.sleep(random.randint(1, 3)) res = "%s%s"%(name,i) q.put(res) print("\033[35m%s 生产了 %s\033[0m" % (os.getpid(), res)) q.join() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=prodcer,args=('包子',q,)) p2 = Process(target=prodcer,args=('馒头',q,)) p3 = Process(target=prodcer,args=('面条',q,)) c1 = Process(target=consumer,args=(q,)) c2 = Process(target=consumer,args=(q,)) c1.daemon = True c2.daemon = True p_l = [p1, p2, p3, c1, c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主')
|
5 共享数据
基于消息传递的并发编程是大势所趋,Go是并发编程,未来会火
6 进程池
进程池可以指定Pool中进程的数量,当有新的请求到来时,如果池没有满,就创建新的进程,如果满了就等待,知道进程池中有进程结束,然后重用该进程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| from multiprocessing import Pool import os import time def work(n): print("%s run" % os.getpid()) return n**2 if __name__ == '__main__': p = Pool(3) res_l = [] for i in range(10): res = p.apply(work, args=(i,)) res_l.append(res) print(res_l)
|
socket使用进程池
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| from socket import * from multiprocessing import Pool import os server = socket(AF_INET, SOCK_STREAM) server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) server.bind(('127.0.0.1', 8080)) server.listen(5) def talk(conn, client_addr): print('进程pid: %s' % os.getpid()) while True: try: msg = conn.recv(1024) if not msg: break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': p = Pool() while True: conn, client_addr = server.accept() p.apply_async(talk, args=(conn, client_addr))
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13
| from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
|
使用concurrent.futures实现进程池
https://docs.python.org/dev/library/concurrent.futures.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import time from concurrent.futures import ProcessPoolExecutor def task(i): print("%s run"%os.getpid()) time.sleep(1) if __name__ == '__main__': p = ProcessPoolExecutor(10) for row in range(15): p.submit(task, row)
|
http://www.cnblogs.com/linhaifeng/articles/6817679.html
http://www.jb51.net/article/87145.htm?pc
http://www.cnblogs.com/linhaifeng/articles/7428874.html