首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

多线程测试对照infobright取数

2013-08-04 
多线程测试对比infobright取数#! /usr/bin/env python#codingutf-8import threading,sysimport randomimp

多线程测试对比infobright取数

#! /usr/bin/env python#coding=utf-8import threading,sysimport randomimport timefrom Queue import Queueimport MySQLdb,time,datetimeisFinish=Falsecount_num_sql='''select count(1) t from fact_user_msg '''#con_sql='''where  ( d_o_lastordertime>='2012-06-19' and s_usr_level>60 and i_u_verified=1 ) '''con_sql='''where  ( i_u_sex=1 and s_o_usermob_type=2 and i_o_ordesnum_3m=4  and i_u_verified=1 ) '''#con_sq3='''where  ( i_u_sex=1 and s_o_usermob_type=2 and i_o_ordesnum_3m=4  and i_u_verified=1 and f_o_total_spend_3m<200 and f_o_kdj_3m>300 and i_o_sendNumber_3m<10) '''fetch_data_sql='''select s_o_usermob from fact_user_msg '''pageSize=50000condition = threading.Condition()class Producer(threading.Thread):    def __init__(self, threadname, queue):        threading.Thread.__init__(self, name = threadname)        self.sharedata = queue    def run(self):        #建立和数据库系统的连接        conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3307)        #conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3306)        #获取操作游标        cursor_src = conn_src.cursor()        #cursor_src.execute("set autocommit = 1")         startTime=datetime.datetime.now()        print "----start time:"+startTime.strftime("%a, %d %b %Y %H:%M:%S +0000")        cursor_src.execute(count_num_sql+con_sql)        result = cursor_src.fetchone()        endTime=datetime.datetime.now()        print "----end time:"+endTime.strftime("%a, %d %b %Y %H:%M:%S +0000")        print "count cost time(s):"+str((endTime-startTime).seconds)        total=result[0]        print "data total lines:"+str(total)                 maxNum=500000        if(total>maxNum):            total=maxNum         for i in range(0,total,pageSize):            #print self.getName()+'add to queue'+str(i)            self.sharedata.put(i)            #print self.getName()+'add end to queue'+str(i)            #time.sleep(random.randrange(10)/10.0)        #time.sleep(8)        for j in range(1,5):            #print 'add None to queue'+str(j)            self.sharedata.put(None)        #print '======== NEW ==========='        isFinish=True        conn_src.close();        if condition.acquire():             condition.notify()        condition.release()                print self.getName()+'Finished'        # Consumer threadclass Consumer(threading.Thread):    def __init__(self, threadname, queue):        threading.Thread.__init__(self, name = threadname)        self.sharedata = queue    def run(self):        conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3307,charset='utf8')        #conn_src = MySQLdb.connect(host='10.28.178.11', user='root',passwd='123456',db='jdsms',port=3306,charset='utf8')                #获取操作游标        cursor_src = conn_src.cursor()                while True:            try:                item=self.sharedata.get()                if item==None:                      #print self.getName()+"is empty2"                      self.sharedata.task_done()                      break                #print self.getName()+'got a value:'+str(item)                print "exec fetcch start"+str(item)+datetime.datetime.now().strftime("%a, %d %b %Y %H:%M:%S +0000")                cursor_src.execute(fetch_data_sql+con_sql+" limit "+str(item)+","+str(pageSize))                cursor_src.fetchone()                endTime1=datetime.datetime.now()                print "fetcch----end time:"+endTime1.strftime("%a, %d %b %Y %H:%M:%S +0000")                                self.sharedata.task_done()            except Queue.empty:                print sys.exc_info()[:2]                break            except :                print "over"                break                                  #time.sleep(random.randrange(10)/10.0)        print self.getName()+'Finished'        #self.sharedata.task_done()        conn_src.close();        return# Main threaddef main():    queue = Queue()    producer = Producer('Producer', queue)    consumer1 = Consumer('Consumer1', queue)    consumer2 = Consumer('Consumer2', queue)    consumer3 = Consumer('Consumer3', queue)    consumer4 = Consumer('Consumer4', queue)    print 'Starting threads ...'    producer.start()    startTime1=datetime.datetime.now()    print "----start1 time:"+startTime1.strftime("%a, %d %b %Y %H:%M:%S +0000")        consumer1.start()    consumer2.start()    consumer3.start()    consumer4.start()    if condition.acquire():       condition.wait()        queue.join()    endTime1=datetime.datetime.now()    print "----end1 time:"+endTime1.strftime("%a, %d %b %Y %H:%M:%S +0000")    print "fetcch data cost time(s):"+str((endTime1-startTime1).seconds)        print 'All threads have terminated.'if __name__ == '__main__':    main()

热点排行