1.Thread.join([timeout])
Wait until the thread terminates. This blocks the calling thread until the thread whose method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.
等待进程结束。也就是说,其屏蔽调用线程,直到此线程方法终止(要么正常执行完毕,或者未处理的异常,或者时间超时)
下面通过例子来说明:
没有设定timeout情况,Main 线程启动,work1 和work2 线程执行,完毕退出,Main线程执行终止
1 import os 2 import threading 3 import time 4 import logging 5 import random 6 7 def work1(): 8 count=0 9 while count<=5:10 threadname= threading.currentThread()11 wait_time=random.randrange(1,4)12 print("%s,count =%s wait for =%s s,time %s "%(threadname,count,wait_time,time.ctime()[-13:]))13 time.sleep(wait_time)14 count +=115 16 def work2():17 i=018 while i<=5:19 threadname= threading.currentThread()20 wait_time=random.randrange(1,4)21 print("%s,i =%s wait for =%s s,time %s "%(threadname,i,wait_time,time.ctime()[-13:]))22 time.sleep(wait_time)23 i +=124 25 if __name__ =="__main__":26 mainthread= threading.currentThread()27 print '%s main thread is waiting for exit'% mainthread28 test1=threading.Thread(name='work1',target=work1)29 test2=threading.Thread(name='work2',target=work2)30 test1.start()31 test2.start()32 test1.join()33 test2.join() 34 print 'main thread finish'
2个线程设定超时时间work1 5s,work2 4s,9s之后调用线程结束而不等待超时的线程:
1 import os 2 import threading 3 import time 4 import logging 5 import random 6 7 def work1(): 8 count=0 9 while count<=5:10 threadname= threading.currentThread()11 wait_time=random.randrange(1,4)12 print("%s,count =%s wait for =%s s,time %s "%(threadname,count,wait_time,time.ctime()[-13:]))13 time.sleep(wait_time)14 count +=115 16 def work2():17 i=018 while i<=5:19 threadname= threading.currentThread()20 wait_time=random.randrange(1,4)21 print("%s,i =%s wait for =%s s,time %s "%(threadname,i,wait_time,time.ctime()[-13:]))22 time.sleep(wait_time)23 i +=124 25 if __name__ =="__main__":26 mainthread= threading.currentThread()27 print '%s main thread is waiting for exit'% mainthread28 test1=threading.Thread(name='work1',target=work1)29 test2=threading.Thread(name='work2',target=work2)30 test1.start()31 test2.start()32 test1.join(4)33 test2.join(5) 34 print 'main thread finish
2.Producer and comsumer
1 import Queue 2 import threading 3 import random 4 5 #writelock =threading.Lock() 6 class Producer(threading.Thread): 7 def __init__(self,q,con,name): 8 super(Producer,self).__init__() 9 self.q = q10 self.con = con11 self.name = name12 print "produce" +self.name+"started"13 def run(self):14 while 1:15 #global writelock 16 self.con.acquire()#acquire the lock 17 if self.q.full():#if queue is full18 #with writelock:#output info19 print "queue is full, producer wait"20 self.con.wait()#wait for resource21 else:22 value = random.ranint(0,10)23 #with writelock:24 print self.name+"put value"+self.name+":"+str(value)+"into queue"25 self.q.put((self.name+":"+str(value)))#put to queue26 self.con.notify()#inform consumer27 self.con.release()#release the lock28 29 class Consumer(threading.Thread):30 def __init__(self,q,con,name):31 super(Consumer,self).__init__()32 self.q = q33 self.con = con34 self.name = name35 print "consume" +self.name+"started\n"36 def run(self):37 while 1:38 #global writelock39 self.con.acquire()40 if self.q.empty():#if empty41 #with writelock:42 print "queue is empty,consumer wait"43 self.con.wait()#wait the resource ready44 else:45 value = self.q.get()#get one element from queue46 #with writelock:47 print self.name +"get value"+ value+"from queue"48 self.q.notify()#inform producer49 self.con.release()#release the lock50 51 52 if __name__ == "__main__":53 print "start to run\n"54 q = Queue.Queue(10)55 con = threading.Condition()56 p = Producer(q,con,"p1")57 p.start()58 p1 = Producer(q,con,"p2")59 p1.start()60 c1 = Consumer(q,con,"c1")61 c1.start()62 63 64 65 66 67
3.Queue
programming python 4th 205页
Queue 提供标准的队列数据结构,实现python对象的先进先出,其可包含基本类型(string,list,dictionary......),类实例,任何可调用函数或绑定的方法等。但是Queue不像正常的list,因为其自动被线程获取和释放锁操作。
#coding:utf-8"import logging,threading,timeimport Queuedef fibo_task(cond): with cond: while shared_queue.empty(): logger.info("[%s]- waiting for element in queue......" % threading.current_thread().name) cond.wait() else: value =shared_queue.get() a,b=0,1 for item in range(value): a,b=b,a+b fibo_dict[value] = a shared_queue.task_done() time.sleep(2) logger.debug("[%s] fibo of key[%d] with result[%d]" % (threading.current_thread().name,value,fibo_dict[value])) def queue_task(cond): logging.debug('starting list data to queue......') with cond: for data in impit_list: shared_queue.put(data) #[shared_queue.put(data) for data in impit_list] cond.notifyAll()if __name__ == "__main__": logger =logging.getLogger() logger.setLevel(logging.DEBUG) formatter =logging.Formatter('%(asctime)s =%(message)s') ch=logging.StreamHandler() ch.setLevel(logging.DEBUG) ch.setFormatter(formatter) logger.addHandler(ch) fibo_dict={} shared_queue =Queue.Queue() impit_list =[3,10,5,7] queue_cond=threading.Condition() print "main thread starting......" threads =[threading.Thread(target=fibo_task,args=(queue_cond,)) for i in range(4)] #for thread in threads: #thread.setDaemon(True) #print 'daemon is %d' % thread.isDaemon() [thread.start() for thread in threads] prod = threading.Thread(name='queue_task_thread',target=queue_task,args=(queue_cond,)) prod.setDaemon(True) prod.start() [thread.join() for thread in threads] print "main thread done"