实现可中断的线程
在《从nginx日志读取URL来做性能测试》(http://san-yun.iteye.com/blog/1679215)这篇文章中,我用 Python 多线程实现了一个性能测试程序,但它存在一个问题:工作线程无法被中断。需要支持的中断方式包括以下两种:
1. 用户通过kill命令来中断
2. 程序满足某种条件中断(比如测试量大于1000则退出)
下面是我的实现:
# -*- coding: utf-8 -*-
import re
try:
    import urllib2
except ImportError:  # Python 3: urllib2 was folded into urllib.request
    import urllib.request as urllib2
import json
import threading
try:
    import Queue
except ImportError:  # Python 3: the module was renamed to `queue`
    import queue as Queue
import os
import time
from time import sleep
from threading import Lock
from signal import signal, SIGTERM, SIGINT, SIGQUIT


class Executor:
    """Fixed-size pool of daemon worker threads that can be cancelled.

    Work items are objects exposing a ``run()`` method; they are handed to
    the workers through a shared queue. The pool stops either when the
    process receives SIGTERM/SIGINT/SIGQUIT or when the user-supplied
    cancel trigger returns true while ``join()`` is polling.
    """

    def __init__(self, size):
        """Start ``size`` daemon workers and install the signal handlers."""
        self.queue = Queue.Queue()
        self.tasks = []
        self.running = True
        # Harmless defaults so join()/cancel() do not raise AttributeError
        # when the caller never registers a trigger or a callback.
        self.cancelTrigger = lambda: False
        self.onCancel = lambda: None
        for _ in range(size):
            t = Task(self.queue)
            t.daemon = True  # do not keep the process alive on exit
            t.start()
            self.tasks.append(t)
        self._signal()

    def _signal(self):
        """Route SIGTERM, SIGINT and SIGQUIT to ``_exit``."""
        signal(SIGTERM, self._exit)
        signal(SIGINT, self._exit)
        signal(SIGQUIT, self._exit)

    def _exit(self, a=None, b=None):
        """Signal handler (``a``/``b`` are the signal number and frame)."""
        print('clean')
        self.cancel()

    def cancel(self):
        """Stop every worker, mark the pool stopped, then call ``onCancel``.

        Each worker finishes the call it is currently executing (if any)
        before it stops; this method waits for that.
        """
        # Flag all workers first so they wind down concurrently.
        for task in self.tasks:
            task.cancel()
        for task in self.tasks:
            # Sleep between checks instead of spinning, and give up on a
            # worker whose thread has already died.
            while task.is_alive() and not task.isCanceled():
                sleep(0.01)
        self.running = False
        self.onCancel()

    def submit(self, call):
        """Queue ``call`` (an object with a ``run()`` method) for execution."""
        self.queue.put(call)

    def join(self):
        """Poll until the queue is drained or the pool has been cancelled.

        ``self.queue.join()`` is deliberately not used because it blocks.
        NOTE: this returns as soon as the queue is empty, even if workers
        are still executing the calls they already dequeued.
        """
        while self.running and not self.queue.empty():
            sleep(0.1)
            if self.cancelTrigger():
                self.cancel()

    def setCancelTrigger(self, cancelTrigger):
        """Register a zero-argument callable; a true result cancels the pool."""
        self.cancelTrigger = cancelTrigger

    def setOnCancel(self, onCancel):
        """Register a zero-argument callable invoked at the end of ``cancel``."""
        self.onCancel = onCancel


class Task(threading.Thread):
    """Worker thread that executes queued calls until it is cancelled."""

    # Seconds an idle worker waits on the queue before re-checking whether
    # it has been cancelled.
    POLL_INTERVAL = 0.1

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.running = True    # becomes False once run() has finished
        self.canceled = False  # set by cancel() to request a stop

    def cancel(self):
        """Request a stop; return True once the worker has actually stopped."""
        self.canceled = True
        return self.isCanceled()

    def isCanceled(self):
        """Return True when the worker loop is no longer running."""
        return not self.running

    def run(self):
        """Execute queued calls until cancelled."""
        try:
            while self.running and not self.canceled:
                try:
                    # A timed get lets an idle worker notice `canceled`
                    # instead of blocking forever on an empty queue.
                    call = self.queue.get(timeout=self.POLL_INTERVAL)
                except Queue.Empty:
                    continue
                try:
                    call.run()
                finally:
                    # Keep the queue's unfinished-task count correct even
                    # when the call raises.
                    self.queue.task_done()
        finally:
            # Always publish the stop so Executor.cancel() cannot wait on a
            # worker that died from an exception.
            self.running = False
host = "http://7.s.duitang.com"
thread_count = 10  # number of concurrent worker threads
max_count = 100  # number of requests after which the run is cancelled
# Counters read by cancelTrigger()/onCancel(); nothing in this block updates
# them (presumably the request code does, guarded by `lock` -- confirm).
total = 0
fail = 0
avg = 0
lock = Lock()


def cancelTrigger():
    """Return True once ``total`` has reached ``max_count``."""
    return total >= max_count


def onCancel():
    """Print the run summary: total, failures and ``avg / total``."""
    print('total %s' % total)
    print('fail %s' % fail)
    # Guard the division: a cancel (e.g. via a signal) can arrive before
    # any request has been counted.
    print('avg %s' % (avg / total if total else 0))


if __name__ == "__main__":
    # Read the input up front and close the file before starting workers.
    with open("napi", "r") as f:
        lines = f.readlines()
    executor = Executor(thread_count)
    executor.setCancelTrigger(cancelTrigger)
    executor.setOnCancel(onCancel)
    analysis(lines, executor)
    executor.join()
def join(self):
    """Poll until the queue is drained or the executor has been cancelled.

    ``self.queue.join()`` is deliberately not used because it blocks.
    Every 0.1 s the cancel trigger, if one has been registered, is
    evaluated; a true result cancels the executor.
    """
    while self.running and not self.queue.empty():
        sleep(0.1)
        # Look the trigger up on every pass and tolerate it being unset,
        # instead of raising AttributeError.
        trigger = getattr(self, 'cancelTrigger', None)
        if trigger is not None and trigger():
            self.cancel()
# --- Executor.cancel ---
def cancel(self):
    """Stop every worker, mark the executor stopped, then call ``onCancel``."""
    # Flag all workers first so they wind down concurrently.
    for task in self.tasks:
        task.cancel()
    for task in self.tasks:
        # Sleep between checks instead of spinning, and give up on a worker
        # whose thread has already died.
        while task.is_alive() and not task.cancel():
            sleep(0.01)
    self.running = False
    self.onCancel()


# --- Task.cancel ---
def cancel(self):
    """Request a stop; return True once the worker has actually stopped."""
    self.canceled = True
    return self.isCanceled()


# --- Task.run ---
def run(self):
    """Execute queued calls until cancelled."""
    try:
        while self.running and not self.canceled:
            try:
                # A timed get lets an idle worker notice `canceled` instead
                # of blocking forever on an empty queue.
                call = self.queue.get(timeout=0.1)
            except Queue.Empty:
                continue
            try:
                call.run()
            finally:
                # Keep the queue's unfinished-task count correct even when
                # the call raises.
                self.queue.task_done()
    finally:
        # Always publish the stop so a waiting cancel() can return.
        self.running = False