本文共 5814 字,大约阅读时间需要 19 分钟。
线程同步,线程间协同,通过某种技术,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作。
不同操作系统实现技术有所不同,有临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件Event等。Event事件,是线程间通信机制中最简单的实现,使用一个内部的标记flag,通过flag的True或False的变化来进行操作。
需求: 老板雇佣一个工人,让他生产杯子,老板一直等着这个工人,直到生产了10个杯子。from threading import Threadimport timeimport threadingimport loggingFORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s' # 注意%后不能有空格logging.basicConfig(format=FORMAT, level=logging.INFO)# flag = Falseevent = threading.Event()def worker(e, count=10): # global flag logging.info("I'm working for U") cups = [] while True: time.sleep(1) if len(cups) >= count: e.set() break cups.append(1) logging.info("I finish my job. {}".format(len(cups))) # flag = Truedef boss(e): logging.info("I'm boss, waiting U") # while not flag: # time.sleep(1) e.wait() logging.info("Good job")b = Thread(target=boss, name='boss', args=(event, ))w = Thread(target=worker, name='worker', args=(event, 10))w.start()b.start()print("++end++") # 注意此语句的输出顺序不是固定的"""\++end++2019-06-09 20:03:24,950 boss 8780 I'm boss, waiting U2019-06-09 20:03:24,950 worker 8288 I'm working for U2019-06-09 20:03:36,006 worker 8288 I finish my job. 102019-06-09 20:03:36,006 boss 8780 Good job"""
总结:
使用同一个Event对象的标记flag。 谁wait就是等到flag变为True,或等到超时返回False。不限制等待的个数。from threading import Event, Threadimport logginglogging.basicConfig(level=logging.INFO)def do(event: Event, interval: int): while not event.wait(interval): # 条件中使用,返回True或者False logging.info('do sth.')e = Event()Thread(target=do, args=(e, 3)).start()e.wait(10) # 也可以使用time.sleep(10)e.set()print('main exit')"""\INFO:root:do sth.INFO:root:do sth.INFO:root:do sth.main exit"""
Event的wait优于time.sleep,它会更快的切换到其它线程,提高并发效率。
实现Timer,延时执行的线程,延时计算add(x, y)
思路: Timer的构造函数中参数得有哪些? 如何实现start启动一个线程执行 如何cancel取消执行任务from threading import Event, Threadimport datetimeimport logging# FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"# logging.basicConfig(format=FORMAT, level=logging.INFO)logging.basicConfig(level=logging.INFO)def add(x: int, y: int): logging.info(x + y)class Timer: def __init__(self, interval, fn, *args, **kwargs): self.interval = interval self.fn = fn self.args = args self.kwargs = kwargs self.event = Event() def start(self): Thread(target=self.__run).start() def cancel(self): return self.event.set() def __run(self): start = datetime.datetime.now() logging.info('waiting') self.event.wait(self.interval if not self.event.is_set(): self.fn(*self.args, **self.kwargs) delta = (datetime.datetime.now() - start).total_seconds() logging.info('finished {}'.format(delta)) self.event.set()t = Timer(10, add, 4, 50)t.start()# t.cancel()e = Event()e.wait(4)print("+++++++++end+++++++++")"""\INFO:root:waiting+++++++++end+++++++++INFO:root:54INFO:root:finished 10.00982"""
锁,凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。锁,一旦线程获得锁,其它试图获取锁的线程将被阻塞。
需求: 订单要求生产1000 个杯子,组织10个工人生产import threadingfrom threading import Thread, Lockimport loggingimport timeFORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'logging.basicConfig(format=FORMAT, level=logging.INFO)cups = []lock = Lock()def worker(l: threading.Lock, count=10): logging.info("I'm working for U.") flag = False while True: l.acquire() # 加了一把锁,只有当前线程能使用该资源,其他线程全部阻塞 if len(cups) >= count: flag = True time.sleep(0.0001) # 为了看出线程切换效果 # l.release() # 此位置不能释放锁,假设此时已经生产了999个杯子,此时释放锁,很多线程都可以看到999个杯子,没到1000,会继续生产,结果肯定会超过1000 if not flag: cups.append(1) l.release() if flag: # 注意flag是局部变量,线程之间互不干扰,线程函数压栈是独立的 break # l.release() # 此位置不行,前面假设已经生产了1000,直接break了,不会再释放锁,就形成了死锁 logging.info('I finished. cups = {}'.format(len(cups)))for _ in range(10): Thread(target=worker, args=(lock, 1000)).start()
计数器类,可以加、可以减
from threading import Thread, Lockimport threadingimport timeclass Counter: def __init__(self): self._val = 0 self._lock = Lock() @property def value(self): with self._lock: # 此位置可以不加锁,但是加锁是一个好习惯 return self._val def inc(self): with self._lock: # 上下文管理 self._val += 1 def dec(self): with self._lock: self._val -= 1def run(r: Counter, count=100): for _ in range(count): for j in range(-50, 50): if j < 0: r.dec() else: r.inc()thread_list = []c = Counter()c1 = 10 # 线程数c2 = 10for i in range(c1): t = Thread(target=run, args=(c, c2)) t.start() # 多线程,线程之间相互干扰,所以考虑添加锁 thread_list.append(t)for x in thread_list: x.join() # 注意分析,为什么可以?考虑速度最慢的线程和速度最快的线程print(c.value, "~~~~~~~~~~~") # 注意这是主线程的语句,必须等其他工作线程都结束了,再执行该条语句
print(c.value) 这一句在主线程中,很早就执行了。退出条件是,只剩下主线程的时候。这样的改造后,代码可以保证最后得到的value值一定是0。
加锁、解锁:
一般来说,加锁就需要解锁,但是加锁后解锁前,还要有一些代码执行,就有可能抛异常,一旦出现异常,锁是无法释放,但是当前线程可能因为这个异常被终止了,这就产生了死锁。
加锁、解锁常用语句:锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。
如果全部都是读取同一个共享资源需要锁吗?不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁 使用锁的注意事项:不使用锁,有了效率,但是结果是错的。
使用了锁,效率低下,但是结果是对的。 所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧lock = threading.Lock()def worker(l: threading.Lock): print("enter ~~~~~~~") while True: time.sleep(1) f = l.acquire(False) # 非阻塞的锁 if f: print("True") # 此句能输出,说明捕获到了锁 break print("exit")for y in range(3): t = threading.Thread(target=worker, name="worker-{}".format(y), args=(lock,)) t.start()"""\enter ~~~~~~~enter ~~~~~~~enter ~~~~~~~Trueexit"""
转载地址:http://apfvi.baihongyu.com/