Programming with JMeter-- ThreadGroup
@Overridepublic void runTest() throws JMeterEngineException{try {tcClassloader = Thread.currentThread().getContextClassLoader();Future<?> f = DefaultExecutorService.getInstance().submit(this);f.get();}catch (Exception err) {stopTest();throw new JMeterEngineException(err);}finally {}} @Overridepublic void run(){ClassLoader oldCl = Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(tcClassloader);try {super.run();}finally {Thread.currentThread().setContextClassLoader(oldCl);}}
?
? ? ? ?JMeterEngine从JMeter的数据结构HashTree中取得ThreadGroup之后,通过ThreadGroup的API: start / stopThread / waitThreadsStopped / tellThreadsToStop / ?numberOfActiveThreads / verifyThreadsStopped 进行测试线程的开始 / 结束,并在相应的阶段通知一些监听器,而真正创建和启动Test Threads的地方是ThreadGroup。看ThreadGroup#start(int groupCount, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine)方法,有两种启动线程的方式:
1) 如果是dealyedStartup,即HashTree中配置了ThreadGroup.delayedStart为true,ThreadGroup会交给内部类ThreadStart来创建/启动测试线程。ThreadStart作为一个单独的线程,会计算每一个测试线程的delay time,duration time,end time,根据这些time值创建 / 开始 / 结束 测试线程。
? ? ? ?我们知道JMeter中对线程Stop Condition 提供一些默认的选择:LoopController, IfController, RunTime等。当我们需要 ”执行测试 一段时间停止“ 这样的场景时,往往需要选择org.apache.jmeter.control.RunTime对每一个线程的循环进行控制。而如果在线程启动阶段endTime已经过时的话,就没有必要再启动测试线程了。
?
2) 若不是dealyedStartup,就创建并启动所有测试线程。
? ? ??
? ? ? ?ThreadGroup是一个很重要的扩展点,一放面实际项目往往有很多资源需要整合,例如项目中有自己实现统一的线程池,有特定的服务器等等,另一方面对于两种线程启动模式略显复杂,尤其是当出现class load出问题,我们需要控制线程上下文classloader的时候,所以实际项目中重写ThreadGroup往往合并一种模式就够了。以本人个例,需要整合weblogic统一的WorkManager进行线程调度,区分 Remote和Local 的测试线程,大概可以这样参考(略去部分业务相关代码)
?
?ThreadGroup#start:
@Overridepublic void start(int groupCount, ListenerNotifier notifier, ListedHashTree threadGroupTree, StandardJMeterEngine engine){running = true;int numThreads = getNumThreads();int rampUp = getRampUp();double perThreadDelay = rampUp * 1000.0 / getNumThreads();log.info("Starting thread group number " + groupCount + " threads " + numThreads);final JMeterContext context = JMeterContextService.getContext();long now = System.currentTimeMillis();for (int i = 0; running && i < numThreads; i++) {final CustomJMeterThread jmThread = makeThread(groupCount, notifier, threadGroupTree, engine, i, context);jmThread.setInitialDelay((int) (i * perThreadDelay));scheduleThread(jmThread, now); //ITestResourceHolder:自定义接口用来存放一些共用的测试变量,重写JMeterEngine时顺便实现了该接口,故可在此拿到很多需要的东西final ITestResourceHolder resourceHolder = (ITestResourceHolder) engine;Runnable runnable = null;switch ((TestType) resourceHolder.getResourceMap().get(TestType.class.getName())) {case LOCAL:runnable = new DaemonizableNamedRunnable() {@Overridepublic String getName(){return jmThread.getThreadName();}@Overridepublic boolean isDaemon(){return true;}@Overridepublic void release(){}@Overridepublic void run(){jmThread.run();}};break;case REMOTE:final Subject subj = Security.getCurrentSubject();runnable = new DaemonizableNamedRunnable() {@Overridepublic String getName(){return jmThread.getThreadName();}@Overridepublic boolean isDaemon(){return true;}@Overridepublic void release(){}@Overridepublic void run(){Security.runAs(subj, new PrivilegedAction<Void>() {@Overridepublic Void run(){jmThread.run();return null;}});}};break;}Future<?> f = DefaultExecutorService.getInstance().submit(runnable);allThreads.put(jmThread, f);}log.info("Started thread group number " + groupCount);}?
? ? ? start已经重写,stop之类相关的也要相应重写:
? ? ?
/* (non-Javadoc)* @see org.apache.jmeter.threads.ThreadGroup#stop()*/@Overridepublic void stop(){running = false;for (JMeterThread item : allThreads.keySet()) {item.stop();}}/* (non-Javadoc) * @see org.apache.jmeter.threads.ThreadGroup#tellThreadsToStop() */@Overridepublic void tellThreadsToStop(){running = false;for (Entry<JMeterThread, Future<?>> entry : allThreads.entrySet()) {JMeterThread item = entry.getKey();item.stop(); // set stop flagitem.interrupt(); // interrupt sampler if possibleFuture<?> f = entry.getValue();if (f != null) {f.cancel(true);allThreads.remove(item);}}}/* (non-Javadoc) * @see org.apache.jmeter.threads.ThreadGroup#waitThreadsStopped() */@Overridepublic void waitThreadsStopped(){for (Future<?> f : allThreads.values()) {try {if (getDuration() > 0) {f.get(getDuration(), TimeUnit.SECONDS);}else {f.get();}}catch (InterruptedException e) {return;}catch (ExecutionException e) {log.error("Exception occurred when try to retrive future value from JMeterThread ", e);}catch (TimeoutException e) {tellThreadsToStop();return;}}allThreads.clear();}?
?