简单的SEDA框架实现
/** * 段容器接口定义 * * @author wuyuhou * */public interface IStagedContainer {//取得标识String getId();//发送事件void sendEvent(IEvent e);// 启动void start();// 停止void stop();}/** * 事件接口 * * @author yourname (mailto:yourname@primeton.com) */public interface IEvent {String getId();<T> T getData();void setData(Object data);Throwable getException(); void setException(Throwable exception);}/** * 事件处理接口 * * @author yourname (mailto:yourname@primeton.com) */public interface IEventHandler {/** * 事件处理,原则上不允许抛出异常 * * @param event */void handleEvent(IEvent event);}/** * 事件处理回调接口 * * @author wuyuhou * */public interface IEventCallback {/** * 回掉处理 * * @param event */void callback(IEvent event);}/** * 事件路由器 * * @author wuyuhou * */public interface IEventRouter {/** * 路由处理 * * @param event */void route(IEvent event);}/** * 段容器实现 * * @author yourname (mailto:yourname@primeton.com) */public class StagedContainer implements IStagedContainer {private static final ILogger log = DebugLoggerFactory.getLogger(StagedContainer.class);//唯一标识private String id = null;//执行器(线程池管理)private Executor executor = null;//事件队列(可以持久化实现)private IQueue<IEvent> queue = null;//事件处理者private IEventHandler eventHandler = null;//事件路由处理private IEventRouter eventRouter = null;//事件处理主线程private EventHandelMainThread eventHandelMainThread = null;// 空闲间隔时间,默认一秒private int idleTime = 1000;private boolean isStarted = false;/** * * 构造方法 * */public StagedContainer(String id) {if (id == null || id.trim().length() == 0) {throw new IllegalArgumentException("StagedContainerId is null!");}this.id = id;}public String getId() {return id;} protected Executor getExecutor() {return executor;}protected IQueue<IEvent> getQueue() {return queue;}protected void setExecutor(Executor executor) {if (executor == null) {throw new IllegalArgumentException("executor is null!");}this.executor = executor;}protected void setQueue(IQueue<IEvent> queue) {if (queue == null) {throw new IllegalArgumentException("queue is null!");}this.queue = queue;}public IEventHandler getEventHandler() {return eventHandler;}public void setEventHandler(IEventHandler eventHandler) {if (eventHandler == null) {throw new IllegalArgumentException("eventHandler is null!");}this.eventHandler = eventHandler;}public IEventRouter getEventRouter() {return eventRouter;}public void setEventRouter(IEventRouter eventRouter) {if (eventRouter == null) {throw new IllegalArgumentException("eventRouter is null!");}this.eventRouter = eventRouter;}public int getIdleTime() {return idleTime;}public void setIdleTime(int idleTime) {if (idleTime <= 0) {throw new IllegalArgumentException("IdleTime is not less than zero!");}this.idleTime = idleTime;}// 发送事件public void sendEvent(IEvent e) {if (!isStarted) {throw new IllegalStateException("StagedContianer has not yet started!");}if (e == null) {throw new IllegalArgumentException("event is null!");}getQueue().offer(e);}public void start() {if (queue == null) {//默认没有持久化queue = new PersistenceQueue<IEvent>(3000, new DirPersistence<IEvent>("d:/test/queue"));}queue.start();if (executor == null) {//默认是10个线程的定长线程池executor = Executors.newFixedThreadPool(10, new ThreadFactoryWithName("StagedContainer:" + getId()));}eventHandelMainThread = new EventHandelMainThread(getId(), getQueue(), getExecutor(), new IEventHandler() {public void handleEvent(IEvent event) {String eventId = event.getId();//取消事件处理if (CancelEventCache.containCancelEvent(eventId)) {CancelEventCache.removeCancelEvent(eventId);log.warn("Event[{0}] is cancel!", new Object[]{eventId});return;}try {//事件处理IEventHandler eventHandler = getEventHandler();if (eventHandler != null) {eventHandler.handleEvent(event);}} finally {//路由处理IEventRouter eventRouter = getEventRouter();if (eventRouter != null) {eventRouter.route(event);}}}}, idleTime);eventHandelMainThread.start();isStarted = true; }public void stop() {if (executor != null) {//停止线程执行if (executor instanceof ExecutorService) {ExecutorService es = (ExecutorService) executor;try {es.shutdownNow();} catch (Exception e) {try {es.shutdown();}catch (Exception ignore) {}}}}eventHandelMainThread.shutdownThread();if (queue != null) {queue.stop();}executor = null;eventHandelMainThread = null;queue = null;isStarted = false; }// 事件处理主线程static class EventHandelMainThread extends Thread {private boolean isShutdown = false;private IQueue<IEvent> eventQueue = null;private Executor executor = null;private IEventHandler eventHandler = null;private int idleTime;public EventHandelMainThread(String name, IQueue<IEvent> eventQueue, Executor executor, IEventHandler eventHandler, int idleTime) {super(name);this.eventQueue = eventQueue;this.executor = executor;this.eventHandler = eventHandler;this.idleTime = idleTime;}@Overridepublic void run() {while (!isShutdown) {final IEvent event = eventQueue.poll();//如果队列里没有事件if (event == null) {try {Thread.sleep(idleTime);} catch (InterruptedException e) {}continue;}executor.execute(new Runnable(){public void run() {eventHandler.handleEvent(event);}});}}//关闭主线程public void shutdownThread() {isShutdown = true;this.interrupt();}}}/** * 取消的事件缓存 * * @author yourname (mailto:yourname@primeton.com) */public class CancelEventCache {private static Object OBJECT = new Object();private static ConcurrentHashMap<String, Object> eventMap = new ConcurrentHashMap<String, Object>();public static void addCancelEvent(String eventId) {if (eventId == null || eventId.trim().length() == 0) {return;}eventMap.put(eventId, OBJECT);}public static void removeCancelEvent(String eventId) {if (eventId == null || eventId.trim().length() == 0) {return;}eventMap.remove(eventId);}public static boolean containCancelEvent(String eventId) {if (eventId == null || eventId.trim().length() == 0) {return false;}return eventMap.containsKey(eventId);}}/** * Callback管理 * * @author yourname (mailto:yourname@primeton.com) */public class EventCallbackManager {private static ConcurrentHashMap<String, IEventCallback> conMap = new ConcurrentHashMap<String, IEventCallback>();public static IEventCallback getEventCallback(String eventId) {if (eventId == null || eventId.trim().length() == 0) {return null;}IEventCallback callback = conMap.get(eventId);conMap.remove(eventId);return callback;}public static void register(String eventId, IEventCallback callback) {if (eventId == null || eventId.trim().length() == 0 || callback == null) {return;}conMap.put(eventId, callback);}public static void clear() {conMap.clear();}}/** * 可以指定名称的线程工厂 * * @author yourname (mailto:yourname@primeton.com) */public class ThreadFactoryWithName implements ThreadFactory {static final AtomicInteger poolNumber = new AtomicInteger(1);final ThreadGroup group;final AtomicInteger threadNumber = new AtomicInteger(1);final String namePrefix;final boolean isDaemon;public ThreadFactoryWithName(String name) {this(name, false);}public ThreadFactoryWithName(String name, boolean isDaemon) {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();namePrefix = name == null ? "Seda-Default" : name + "-pool-" + poolNumber.getAndIncrement() + "-thread-";this.isDaemon = isDaemon;}public Thread newThread(Runnable r) {Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());t.setDaemon(isDaemon);if (t.getPriority() != Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}1 楼 yuanyu5237 2012-01-17 楼主您好,请教一下,代码中的IQueue在哪儿? 2 楼 yuanyu5237 2012-01-17 找到了,呵呵,多谢楼主,这恐怕是我在网上找到的唯一一个比较简单可学习的seda框架代码 3 楼 wuyuhou 2012-04-12 yuanyu5237 写道楼主您好,请教一下,代码中的IQueue在哪儿?