博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
001---进程
阅读量:4350 次
发布时间:2019-06-07

本文共 8789 字,大约阅读时间需要 29 分钟。

进程

什么是进程

进程:正在进行的一个过程或者一个任务,执行任务的是cpu。

进程与程序的区别

程序仅仅是一堆代码而已,而进程指的是程序的运行过程。

比喻:食谱就是程序,而进程就是做这道菜的过程:阅读食谱、放料、等。

并发与并行

无论是并发还是并行,在用户看起来都是同时运行的。不管是进程还是线程,都只是一个任务而已,而真正执行任务的是cpu。cpu同一时刻只能执行一个任务。

  • 并发(具备处理多个任务的能力):伪并行,看起来是同时。单个cpu + 多道技术就可以实现并发。

    你是一个cpu,你同时谈了三个女朋友,每一个都可以是一个恋爱任务,你被这三个任务共享。要玩出并发恋爱的效果,应该是你先跟女友1去看电影,看了一会说:不好,我要拉肚子,然后跑去跟第二个女友吃饭,吃了一会说:那啥,我去趟洗手间,然后跑去跟女友3开了个房
  • 并行(具备同时处理多个任务的能力):同时运行,只有具备多个cpu才能实现并行。

区别:一个是同一时间段、一个是同一时刻。

1392812-20190217164344080-48918670.jpg

进程的状态

  • 就绪:开始一个新的任务

  • 运行:正在运行这个任务

  • 挂起:这个任务遇到IO就阻塞。然后让出cpu,给其他任务去执行。

开启进程的两种方式

注意:windows中Process()一定要在if name == 'main'下

  • 普通方式

    def func1(title):    print(title)if __name__ == '__main__':    p1 = Process(target=func1, args=('子进程1',))    p2 = Process(target=func1, args=('子进程2',))    p3 = Process(target=func1, args=('子进程3',))    p1.start()    p2.start()    p3.start()    print('主进程')
  • 类创建

    from multiprocessing import Processclass MyProcess(Process):    def __init__(self, title):        super(MyProcess, self).__init__()        self.title = title    def run(self):        print(self.title)if __name__ == '__main__':    p1 = MyProcess('子进程1')    p2 = MyProcess('子进程2')    p3 = MyProcess('子进程3')    p1.start()    p2.start()    p3.start()    print('主进程')

进程的相关方法和属性

  • p.start():启动进程,调用子进程的run()

  • p.terminate():强制终止进程p,谨慎使用

  • p.is_alive():判断进程是否存活,布尔值

  • p.join():感知一个子进程的结束,将异步改为同步

    import timefrom multiprocessing import Processdef func(arg1, arg2):    print('*' * arg1)    time.sleep(5)    print('*' * arg2)if __name__ == '__main__':    p = Process(target=func, args=(10, 20))    p.start()    print('开启子进程')    p.join()  # 感知一个子进程的结束,将异步的程序改为同步    print('子进程都运行完了')

    疑问:必须明确,join是让谁等,是让主进程等子进程p.start()后。子进程就已经并发执行了。当有多个join时,四个join花费的时间也是耗费时间最长的那个子进程

  • p.daemon = True:默认false,设为true

    import timefrom multiprocessing import Processdef task(name):    print('%s is piao ing' % name)    time.sleep(3)    print('%s is piao end' % name)if __name__ == '__main__':    p = Process(target=task, args=('egon',))    p.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行    p.start()    time.sleep(1)   # 子进程只执行一会    # time.sleep(5) # 子进程会执行完毕    print('主')     # 只要终端打印出这一行内容,那么守护进程p也就跟着结束掉了

    代表p为守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

    比喻:皇帝身边的老太监:皇帝驾崩,老太监跟着死。
  • p.name:进程的名称

  • p.pid:进程的pid

进程间的数据隔离问题

import os, timefrom multiprocessing import Processdef func():    global n    n = 0    print('子--start:%s' % os.getpid(), n)if __name__ == '__main__':    n = 100    print('主--start:%s' % os.getpid(), n)    p = Process(target=func, )    p.start()    time.sleep(3)    print('尽管使用了全局变量n, 但是不会修改n的值,依然是 % d' % n)    print('主--end:%s' % os.getpid(), n)    """主--start:15704 100子--start:7076 0尽管使用了全局变量n, 但是不会修改n的值,依然是  100主--end:15704 100"""

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

同一终端打印

  • 未加锁

    # 并发运行,效率高,但竞争同一打印终端,带来了打印错乱from multiprocessing import Processimport os, timedef work():print('%s is running' % os.getpid())time.sleep(2)print('%s is done' % os.getpid())if __name__ == '__main__':for i in range(3):p = Process(target=work)p.start()"""13924 is running13576 is running10080 is running13924 is done13576 is done10080 is done"""
  • 加锁
# 由并发变成了串行,牺牲了运行效率,但避免了竞争  from multiprocessing import Process, Lock  import os, time  def work(lock):  lock.acquire()  print('%s is running' % os.getpid())  time.sleep(2)  print('%s is done' % os.getpid())  lock.release()  if __name__ == '__main__':  lock = Lock()  for i in range(3):  p = Process(target=work, args=(lock,))  p.start()  """  3964 is running  3964 is done  14000 is running  14000 is done  11592 is running  11592 is done  """

模拟抢票

#! /usr/bin/env python# -*- coding: utf-8 -*-# __author__ = "ziya"# Date: 2018-09-07import json, timefrom multiprocessing import Processfrom multiprocessing import Lockdef show(i):    with open('ticket') as f:        ticket = json.load(f).get('ticket')    print('余票为:%s' % ticket)def buy(i, lock):    # 自动加锁和解锁    with lock:        with open('ticket') as f:            ticket = json.load(f).get('ticket')        time.sleep(0.1)        if ticket:            ticket -= 1            print('%s买到了票' % i)        else:            print('%s凉凉' % i)        time.sleep(0.1)        with open('ticket', 'w') as f:            json.dump({"ticket": ticket, }, f)if __name__ == '__main__':    for i in range(10):        p = Process(target=show, args=(i,))        p.start()    lock = Lock()    for i in range(10):        p = Process(target=buy, args=(i, lock))        p.start()

总结:加锁能保证多个进程修改同一个数据时,同一时间只能有一个任务可以进行修改,虽然牺牲了效率,但保证了数据安全。

缺点

  • 效率低
  • 需要手动进行加锁处理

所以我们需要更好的方式来解决此问题:队列和管道。都是将数据存放于内存中。

队列queue(推荐使用)

def produce(q):    q.put('hello')def consume(q):    print(q.get())if __name__ == '__main__':    q = Queue()    p = Process(target=produce,args=(q,))    p.start()    c = Process(target=consume,args=(q,))    c.start()"""  q = Queue() # 省略则无大小限制,存放的不是数据大小。而且最大项数,但是也受限于内存大小。  q.put():放,满了会阻塞  q.get():拿:空了会阻塞  q.get_nowait():当队列为空时,不会等待。直接跑出异常,可以捕捉。  q.full():判断是否满了  q.empty():判断是否空了  """

生产者消费者模型

为什么要有这个模型

  • 平衡生产者和消费者之间的速度差
  • 程序之间解耦

JoinableQueue实现生产者消费者模型

from multiprocessing import Process, JoinableQueueimport time,random,osdef consumer(q):    while True:        res=q.get()        time.sleep(random.randint(1,3))        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))        q.task_done() # 向q.join()发送一次信号,证明一个数据已经被取走了def producer(name,q):    for i in range(10):        time.sleep(random.randint(1,3))        res='%s%s' %(name,i)        q.put(res)        print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))    q.join()  # 阻塞。感知一个队列中的数据,全部被执行完毕if __name__ == '__main__':    q=JoinableQueue()        #生产者们:即厨师们    p1=Process(target=producer,args=('包子',q))    p2=Process(target=producer,args=('骨头',q))    p3=Process(target=producer,args=('泔水',q))    #消费者们:即吃货们    c1=Process(target=consumer,args=(q,))    c2=Process(target=consumer,args=(q,))    c1.daemon=True    c2.daemon=True    #开始    p_l=[p1,p2,p3,c1,c2]    for p in p_l:        p.start()    p1.join()    p2.join()    p3.join()    print('主')         #主进程等--->p1,p2,p3等---->c1,c2    #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据    #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程

进程池

为什么有进程池的概念

不可能开启无限进程,一般有几个核就开几个。否则开启过多进程会造成系统调度变慢、效率反而会下降所以创建一个属于子进程的池子,对进程数加以控制

from multiprocessing import Poolp = Pool(5)  # 默认使用os.cpu_count(),一旦创建,至始至终就这5个进程交替,不会开启其他进程p.close()    # 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成p.join()     # 等待所有工作进程退出。此方法只能在close()或teminate()之后调用

方法

  • p.map(func, range(100)):自带join
  • p.apply_async(func, args, kwargs) :异步调用,p.close()、p.join()
  • p.apply(func, ):同步调用,直到本次任务拿到返回值,或者结束

返回值

  • p.apply():同步的,直接取得结果,就是func的返回值
  • p.apply_async(func,args,kwargs):异步的,需要p.get()

回调函数

回调函数:可以为每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发

并接收任务的返回值当作参数。

  • p.apply_async(get, args=(url,), callback=func)func是主进程执行的

多进程实现socket服务端的并发

服务端

import socketfrom multiprocessing import Processip_port = ('127.0.0.1', 8001)buffsize = 1024def pro(conn):    while True:        try:            data = '>>>: 你好'            print('准备给客户端发送消息', data)            conn.send(data.encode('utf-8'))            msg = conn.recv(buffsize).decode('utf-8')            print('收到客户端的消息', msg)        except Exception as e:            break    conn.close()if __name__ == '__main__':    server = socket.socket()    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)    server.bind(ip_port)    server.listen(5)    while True:        conn, addr = server.accept()        print(conn, addr)        p = Process(target=pro, args=(conn,))        p.start()    server.close()

客户端

import socketip_port = ('127.0.0.1', 8001)buffsize = 1024client = socket.socket()client.connect(ip_port)while 1:    data = client.recv(buffsize).decode('utf-8')    print('收到服务器发来的消息', data)    msg = input('>>>:')    if not msg: continue    print('准备给服务器发送消息', msg)    client.send(msg.encode('utf-8'))client.close()

问题

每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。

解决方法:进程池

进程池实现socket服务端并发

服务端

from socket import *from multiprocessing import Poolimport osdef talk(conn,client_addr):    print('进程pid: %s' %os.getpid())    while True:        try:            msg=conn.recv(1024)            if not msg:break            conn.send(msg.upper())        except Exception:            breakif __name__ == '__main__':    server=socket(AF_INET,SOCK_STREAM)    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)    server.bind(('127.0.0.1',8080))    server.listen(5)    p=Pool()    while True:        conn,client_addr=server.accept()        p.apply_async(talk,args=(conn,client_addr))        # p.apply(talk,args=(conn,client_addr)) # 同步的话,则同一时间只有一个客户端能访问

客户端

from socket import *client=socket(AF_INET,SOCK_STREAM)client.connect(('127.0.0.1',8080))while True:    msg=input('>>: ').strip()    if not msg:continue    client.send(msg.encode('utf-8'))    msg=client.recv(1024)    print(msg.decode('utf-8'))

转载于:https://www.cnblogs.com/xjmlove/p/10391713.html

你可能感兴趣的文章
动态规划典型例题--背包问题九讲
查看>>
Qt之QHeaderView自定义排序(终极版)
查看>>
python----logging
查看>>
LBP特征 学习笔记
查看>>
201671010430司昕劼 实验三作业互评与改进
查看>>
OO - 原则
查看>>
主机断开网络时,virtualbox住宿互访
查看>>
win10系统80端口被占用怎么办?
查看>>
Append 后如何使用 fadein淡入效果
查看>>
SourceTree软件
查看>>
day1 java基础语法
查看>>
与TIME_WAIT相关的几个内核参数修改测试讨论结论
查看>>
Javascript闭包(Closure)详解
查看>>
【HTML XHTML CSS基础教程(第6版)】笔记之CSS笔记(7~25章)
查看>>
IO流OutputStream
查看>>
tomcat8.5 manager 配置
查看>>
前端实现某一列不能重复不能且不能为空
查看>>
测试流程的注意事项
查看>>
tyvj 1860 后缀数组
查看>>
最小二乘法曲线拟合
查看>>