首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 其他相关 >

兑现可中断的线程

2012-10-31 
实现可中断的线程在《从nginx日志读取URL来做性能测试》(http://san-yun.iteye.com/blog/1679215)这篇文章中

实现可中断的线程
在《从nginx日志读取URL来做性能测试》(http://san-yun.iteye.com/blog/1679215)这篇文章中我实现了一个python多线程来做性能测试,但存在一个问题,线程不可中断,包括两方面:
1. 用户通过kill命令来中断
2. 程序满足某种条件中断(比如测试量大于1000则退出)

下面是我的实现:

# -*- coding: utf-8 -*-import reimport urllib2import jsonimport threadingimport Queueimport osimport timefrom time import sleepfrom threading import Lockfrom signal import signal,SIGTERM,SIGINT,SIGQUITclass Executor:def __init__(self,size):self.queue = Queue.Queue()self.tasks = []self.running = Truefor i  in range(size):t = Task(self.queue)t.setDaemon(True)t.start()self.tasks.append(t)self._signal()def _signal(self):signal(SIGTERM,self._exit)signal(SIGINT,self._exit)signal(SIGQUIT,self._exit)def _exit(self,a=None,b=None):print 'clean'self.cancel()def cancel(self):for task in self.tasks:    while not task.cancel():        passself.running = Falseself.onCancel()def submit(self,call):self.queue.put(call)def join(self):#self.queue.join() queue.join()会阻塞,所以不用while self.running and not self.queue.empty():sleep(0.1)if  self.cancelTrigger():self.cancel()def setCancelTrigger(self,cancelTrigger):self.cancelTrigger = cancelTriggerdef setOnCancel(self,onCancel):self.onCancel = onCancelclass Task(threading.Thread):def __init__(self,queue):threading.Thread.__init__(self)self.queue = queueself.running = Trueself.canceled = Falsedef cancel(self):self.canceled=Truereturn self.isCanceled()def isCanceled(self):return self.running==Falsedef run(self):while self.running:call = self.queue.get()call.run()self.queue.task_done()if self.canceled:    self.running = False


客户端使用:
host = "http://7.s.duitang.com"thread_count = 10 #并发数max_count=100     #运行次数total = 0fail = 0avg = 0lock = Lock()def cancelTrigger():    return total>=max_countdef onCancel():    print 'total %s'%total    print 'fail %s'%fail    print 'avg %s'%(avg/total)if __name__ == "__main__":    f = open("napi","r")    executor = Executor(thread_count)    executor.setCancelTrigger(cancelTrigger)    executor.setOnCancel(onCancel)    analysis(f.readlines(),executor)    executor.join()


在实现的时候比较纠结的点:
1. Task如果不是daemon会导致任务永远不会停止,但是如果Task是daemon线程,main线程结束之后daemon就结束了。所以这时需要实现一个join()来阻塞main线程:
  def join(self):          #self.queue.join() queue.join()会阻塞,所以不用          while self.running and not self.queue.empty():              sleep(0.1)              if  self.cancelTrigger():                  self.cancel()  


2. 线程应该可cancel的,之前是直接修改while isrunning的变量,但这样会导致task其实还没有完成的停止下来。所以对于task我引入两个变量来实现安全的停止。


    def cancel(self):          for task in self.tasks:              while not task.cancel():                  pass            self.running = False          self.onCancel()      def cancel(self):          self.canceled=True          return self.isCanceled()      def run(self):          while self.running:              call = self.queue.get()              call.run()              self.queue.task_done()              if self.canceled:                  self.running = False  


3.对于signal专门写了一片文章记录()

热点排行