首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

openfire(八)openfire中的mina框架使用

2013-03-22 
openfire(8)openfire中的mina框架使用? ? ? ? ? ? ? ? ? ? ? ? e.getMessage())? ? ? ? ? ? ? ? Log.erro

openfire(8)openfire中的mina框架使用

? ? ? ? ? ? ? ? ? ? ? ? e.getMessage());

? ? ? ? ? ? ? ? Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);

? ? ? ? ? ? }

? ? ? ? }

? ? }

进入SocketAcceptThread会发现其真面目,

public class SocketAcceptThread extends Thread {

?

? ? /**

? ? ?* Holds information about the port on which the server will listen for connections.

? ? ?*/

? ? private ServerPort serverPort;

?

? ? private SocketAcceptingMode acceptingMode;

?

? ? public SocketAcceptThread(ConnectionManager connManager, ServerPort serverPort)

? ? ? ? ? ? throws IOException {

? ? ? ? super("Socket Listener at port " + serverPort.getPort());

? ? ? ? // Listen on a specific network interface if it has been set.

? ? ? ? String interfaceName = JiveGlobals.getXMLProperty("network.interface");

? ? ? ? InetAddress bindInterface = null;

? ? ? ? if (interfaceName != null) {

? ? ? ? ? ? if (interfaceName.trim().length() > 0) {

? ? ? ? ? ? ? ? bindInterface = InetAddress.getByName(interfaceName);

? ? ? ? ? ? ? ? // Create the new server port based on the new bind address

? ? ? ? ? ? ? ? serverPort = new ServerPort(serverPort.getPort(),

? ? ? ? ? ? ? ? ? ? ? ? serverPort.getDomainNames().get(0), interfaceName, serverPort.isSecure(),

? ? ? ? ? ? ? ? ? ? ? ? serverPort.getSecurityType(), serverPort.getType());

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? this.serverPort = serverPort;

? ? ? ? // Set the blocking reading mode to use

? ? ? ? acceptingMode = new BlockingAcceptingMode(connManager, serverPort, bindInterface);

? ? }

?

? ? /**

? ? ?* Retrieve the port this server socket is bound to.

? ? ?*

? ? ?* @return the port the socket is bound to.

? ? ?*/

? ? public int getPort() {

? ? ? ? return serverPort.getPort();

? ? }

?

? ? /**

? ? ?* Returns information about the port on which the server is listening for connections.

? ? ?*

? ? ?* @return information about the port on which the server is listening for connections.

? ? ?*/

? ? public ServerPort getServerPort() {

? ? ? ? return serverPort;

? ? }

?

? ? /**

? ? ?* Unblock the thread and force it to terminate.

? ? ?*/

? ? public void shutdown() {

? ? ? ? acceptingMode.shutdown();

? ? }

?

? ? /**

? ? ?* About as simple as it gets. ?The thread spins around an accept

? ? ?* call getting sockets and handing them to the SocketManager.

? ? ?*/

? ? @Override

public void run() {

? ? ? ? acceptingMode.run();

? ? ? ? // We stopped accepting new connections so close the listener

? ? ? ? shutdown();

? ? }

}

?

class BlockingAcceptingMode extends SocketAcceptingMode {

?

private static final Logger Log = LoggerFactory.getLogger(BlockingAcceptingMode.class);

?

? ? protected BlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,

? ? ? ? ? ? InetAddress bindInterface) throws IOException {

? ? ? ? super(connManager, serverPort);

? ? ? ? serverSocket = new ServerSocket(serverPort.getPort(), -1, bindInterface);

? ? }

?

? ? /**

? ? ?* About as simple as it gets. ?The thread spins around an accept

? ? ?* call getting sockets and creating new reading threads for each new connection.

? ? ?*/

? ? @Override

public void run() {

? ? ? ? while (notTerminated) {

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? Socket sock = serverSocket.accept();

? ? ? ? ? ? ? ? if (sock != null) {

? ? ? ? ? ? ? ? ? ? Log.debug("Connect " + sock.toString());

? ? ? ? ? ? ? ? ? ? SocketReader reader =

? ? ? ? ? ? ? ? ? ? ? ? ? ? connManager.createSocketReader(sock, false, serverPort, true);

? ? ? ? ? ? ? ? ? ? Thread thread = new Thread(reader, reader.getName());

? ? ? ? ? ? ? ? ? ? thread.setDaemon(true);

? ? ? ? ? ? ? ? ? ? thread.setPriority(Thread.NORM_PRIORITY);

? ? ? ? ? ? ? ? ? ? thread.start();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? catch (IOException ie) {

? ? ? ? ? ? ? ? if (notTerminated) {

? ? ? ? ? ? ? ? ? ? Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),

? ? ? ? ? ? ? ? ? ? ? ? ? ? ie);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? catch (Throwable e) {

? ? ? ? ? ? ? ? Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);

? ? ? ? ? ? }

? ? ? ? }

? ? }

}

//此处只要这个服务器,接受到一个请求,就会创建一个新的线程去解析相关数据,并做逻辑处理。

?

?@Override

public void run() {

? ? ? ? try {

? ? ? ? ? ? socketReader.reader.getXPPParser().setInput(new InputStreamReader(

? ? ? ? ? ? ? ? ? ? ServerTrafficCounter.wrapInputStream(socket.getInputStream()), CHARSET));

?

? ? ? ? ? ? // Read in the opening tag and prepare for packet stream

? ? ? ? ? ? try {

? ? ? ? ? ? ? ? socketReader.createSession();

? ? ? ? ? ? }

? ? ? ? ? ? catch (IOException e) {

? ? ? ? ? ? ? ? Log.debug("Error creating session", e);

? ? ? ? ? ? ? ? throw e;

? ? ? ? ? ? }

?

? ? ? ? ? ? // Read the packet stream until it ends

? ? ? ? ? ? if (socketReader.session != null) {

? ? ? ? ? ? ? ? readStream();

? ? ? ? ? ? }

?

? ? ? ? }

? ? ? ? catch (EOFException eof) {

? ? ? ? ? ? // Normal disconnect

? ? ? ? }

? ? ? ? catch (SocketException se) {

? ? ? ? ? ? // The socket was closed. The server may close the connection for several

? ? ? ? ? ? // reasons (e.g. user requested to remove his account). Do nothing here.

? ? ? ? }

? ? ? ? catch (AsynchronousCloseException ace) {

? ? ? ? ? ? // The socket was closed.

? ? ? ? }

? ? ? ? catch (XmlPullParserException ie) {

? ? ? ? ? ? // It is normal for clients to abruptly cut a connection

? ? ? ? ? ? // rather than closing the stream document. Since this is

? ? ? ? ? ? // normal behavior, we won't log it as an error.

? ? ? ? ? ? // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);

? ? ? ? }

? ? ? ? catch (Exception e) {

? ? ? ? ? ? if (socketReader.session != null) {

? ? ? ? ? ? ? ? Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +

? ? ? ? ? ? ? ? ? ? ? ? socketReader.session, e);

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? finally {

? ? ? ? ? ? if (socketReader.session != null) {

? ? ? ? ? ? ? ? if (Log.isDebugEnabled()) {

? ? ? ? ? ? ? ? ? ? Log.debug("Logging off " + socketReader.session.getAddress() + " on " + socketReader.connection);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? socketReader.session.close();

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? catch (Exception e) {

? ? ? ? ? ? ? ? ? ? Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? else {

? ? ? ? ? ? ? ? // Close and release the created connection

? ? ? ? ? ? ? ? socketReader.connection.close();

? ? ? ? ? ? ? ? Log.debug(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());

? ? ? ? ? ? }

? ? ? ? ? ? socketReader.shutdown();

? ? ? ? }

? ? }

?

//createSession中进行报文的解析,

protected void createSession()

? ? ? ? ? ? throws UnauthorizedException, XmlPullParserException, IOException {

? ? ? ? XmlPullParser xpp = reader.getXPPParser();

? ? ? ? for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {

? ? ? ? ? ? eventType = xpp.next();

? ? ? ? }

?

? ? ? ? // Check that the TO attribute of the stream header matches the server name or a valid

? ? ? ? // subdomain. If the value of the 'to' attribute is not valid then return a host-unknown

? ? ? ? // error and close the underlying connection.

? ? ? ? System.out.println(reader.getXPPParser().toString());

? ? ? ? String host = reader.getXPPParser().getAttributeValue("", "to");

? ? ? ? if (validateHost() && isHostUnknown(host)) {

? ? ? ? ? ? StringBuilder sb = new StringBuilder(250);

? ? ? ? ? ? sb.append("<?xml version='1.0' encoding='");

? ? ? ? ? ? sb.append(CHARSET);

? ? ? ? ? ? sb.append("'?>");

? ? ? ? ? ? // Append stream header

? ? ? ? ? ? sb.append("<stream:stream ");

? ? ? ? ? ? sb.append("from="").append(serverName).append("" ");

? ? ? ? ? ? sb.append("id="").append(StringUtils.randomString(5)).append("" ");

? ? ? ? ? ? sb.append("xmlns="").append(xpp.getNamespace(null)).append("" ");

? ? ? ? ? ? sb.append("xmlns:stream="").append(xpp.getNamespace("stream")).append("" ");

? ? ? ? ? ? sb.append("version="1.0">");

? ? ? ? ? ? // Set the host_unknown error

? ? ? ? ? ? StreamError error = new StreamError(StreamError.Condition.host_unknown);

? ? ? ? ? ? sb.append(error.toXML());

? ? ? ? ? ? // Deliver stanza

? ? ? ? ? ? connection.deliverRawText(sb.toString());

? ? ? ? ? ? // Close the underlying connection

? ? ? ? ? ? connection.close();

? ? ? ? ? ? // Log a warning so that admins can track this cases from the server side

? ? ? ? ? ? Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host +

? ? ? ? ? ? ? ? ? ? ". Connection: " + connection);

? ? ? ? }

?

? ? ? ? // Create the correct session based on the sent namespace. At this point the server

? ? ? ? // may offer the client to secure the connection. If the client decides to secure

? ? ? ? // the connection then a <starttls> stanza should be received

? ? ? ? else if (!createSession(xpp.getNamespace(null))) {

? ? ? ? ? ? // No session was created because of an invalid namespace prefix so answer a stream

? ? ? ? ? ? // error and close the underlying connection

? ? ? ? ? ? StringBuilder sb = new StringBuilder(250);

? ? ? ? ? ? sb.append("<?xml version='1.0' encoding='");

? ? ? ? ? ? sb.append(CHARSET);

? ? ? ? ? ? sb.append("'?>");

? ? ? ? ? ? // Append stream header

? ? ? ? ? ? sb.append("<stream:stream ");

? ? ? ? ? ? sb.append("from="").append(serverName).append("" ");

? ? ? ? ? ? sb.append("id="").append(StringUtils.randomString(5)).append("" ");

? ? ? ? ? ? sb.append("xmlns="").append(xpp.getNamespace(null)).append("" ");

? ? ? ? ? ? sb.append("xmlns:stream="").append(xpp.getNamespace("stream")).append("" ");

? ? ? ? ? ? sb.append("version="1.0">");

? ? ? ? ? ? // Include the bad-namespace-prefix in the response

? ? ? ? ? ? StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);

? ? ? ? ? ? sb.append(error.toXML());

? ? ? ? ? ? connection.deliverRawText(sb.toString());

? ? ? ? ? ? // Close the underlying connection

? ? ? ? ? ? connection.close();

? ? ? ? ? ? // Log a warning so that admins can track this cases from the server side

? ? ? ? ? ? Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " +

? ? ? ? ? ? ? ? ? ? xpp.getNamespace(null) + ". Connection: " + connection);

? ? ? ? }

? ? }

将相关信息转换为XMPP报文格式。

?

2.? createConnectionManagerListener();

?private void createConnectionManagerListener() {

? ? ? ? // Start multiplexers socket unless it's been disabled.

? ? ? ? if (isConnectionManagerListenerEnabled()) {

? ? ? ? ? ? // Create SocketAcceptor with correct number of processors

? ? ? ? ? ? multiplexerSocketAcceptor = buildSocketAcceptor();

? ? ? ? ? ? // Customize Executor that will be used by processors to process incoming stanzas

? ? ? ? ? ? ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");

? ? ? ? ? ? int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);

? ? ? ? ? ? ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();

? ? ? ? ? ? eventExecutor.setCorePoolSize(eventThreads + 1);

? ? ? ? ? ? eventExecutor.setMaximumPoolSize(eventThreads + 1);

? ? ? ? ? ? eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);

? ? ? ? ? ? multiplexerSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);

? ? ? ? ? ? // Add the XMPP codec filter

? ? ? ? ? ? multiplexerSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));

?

? ? ? ? }

? ? }

?

从?multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port), new MultiplexerConnectionHandler(serverName));中可以知道其报文处理是MultiplexerConnectionHandler。

MultiplexerConnectionHandler,ClientConnectionHandler,这个3个类都是继承与ConnectionHandler,下面主要讲解ConnectionHandler。

?

ConnectionHandler继承与IoHandlerAdapter,重写了IoHandlerAdapter的6个方法。

当客户端和服务器建立连接的时候会调用:

@Override

public void sessionOpened(IoSession session) throws Exception {

? ? ? ? // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.

? ? ? ? final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);

? ? ? ? session.setAttribute(XML_PARSER, parser);

? ? ? ? // Create a new NIOConnection for the new session

? ? ? ? final NIOConnection connection = createNIOConnection(session);

? ? ? ? session.setAttribute(CONNECTION, connection);

? ? ? ? session.setAttribute(HANDLER, createStanzaHandler(connection));

? ? ? ? // Set the max time a connection can be idle before closing it. This amount of seconds

? ? ? ? // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)

? ? ? ? // before disconnecting them (at 100% of the max idle time). This prevents Openfire from

? ? ? ? // removing connections without warning.

? ? ? ? final int idleTime = getMaxIdleTime() / 2;

? ? ? ? if (idleTime > 0) {

? ? ? ? ? ? session.setIdleTime(IdleStatus.READER_IDLE, idleTime);

? ? ? ? }

? ? }

//当有一个连接被打开的时候,就会初始化XMPP 的解析器,业务数据解析,以及会话空闲时间。

?

当客户端向服务端发起消息时会调用:

?@Override

public void messageReceived(IoSession session, Object message) throws Exception {

? ? ? ? // Get the stanza handler for this session

? ? ? ? StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);

? ? ? ? // Get the parser to use to process stanza. For optimization there is going

? ? ? ? // to be a parser for each running thread. Each Filter will be executed

? ? ? ? // by the Executor placed as the first Filter. So we can have a parser associated

? ? ? ? // to each Thread

? ? ? ? int hashCode = Thread.currentThread().hashCode();

? ? ? ? XMPPPacketReader parser = parsers.get(hashCode);

? ? ? ? if (parser == null) {

? ? ? ? ? ? parser = new XMPPPacketReader();

? ? ? ? ? ? parser.setXPPFactory(factory);

? ? ? ? ? ? parsers.put(hashCode, parser);

? ? ? ? }

? ? ? ? // Update counter of read btyes

? ? ? ? updateReadBytesCounter(session);

? ? ? ? //System.out.println("RCVD: " + message);

? ? ? ? // Let the stanza handler process the received stanza

? ? ? ? try {

? ? ? ? ? ? handler.process((String) message, parser);

? ? ? ? } catch (Exception e) {

? ? ? ? ? ? Log.error("Closing connection due to error while processing message: " + message, e);

? ? ? ? ? ? Connection connection = (Connection) session.getAttribute(CONNECTION);

? ? ? ? ? ? connection.close();

? ? ? ? }

? ? }

?

?public void process(String stanza, XMPPPacketReader reader) throws Exception {

?

? ? ? ? boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");

? ? ? ? if (!sessionCreated || initialStream) {

? ? ? ? ? ? if (!initialStream) {

? ? ? ? ? ? ? ? // Allow requests for flash socket policy files directly on the client listener port

? ? ? ? ? ? ? ? if (stanza.startsWith("<policy-file-request/>")) {

? ? ? ? ? ? ? ? ? ? String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +

? ? ? ? ? ? ? ? ? ? ? ? ? ? XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +

? ? ? ? ? ? ? ? ? ? ? ? ? ? FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';

? ? ? ? ? ? ? ? ? ? connection.deliverRawText(crossDomainText);

? ? ? ? ? ? ? ? ? ? return;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? else {

? ? ? ? ? ? ? ? ? ? // Ignore <?xml version="1.0"?>

? ? ? ? ? ? ? ? ? ? return;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? // Found an stream:stream tag...

? ? ? ? ? ? if (!sessionCreated) {

? ? ? ? ? ? ? ? sessionCreated = true;

? ? ? ? ? ? ? ? MXParser parser = reader.getXPPParser();

? ? ? ? ? ? ? ? parser.setInput(new StringReader(stanza));

? ? ? ? ? ? ? ? createSession(parser);

? ? ? ? ? ? }

? ? ? ? ? ? else if (startedTLS) {

? ? ? ? ? ? ? ? startedTLS = false;

? ? ? ? ? ? ? ? tlsNegotiated();

? ? ? ? ? ? }

? ? ? ? ? ? else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {

? ? ? ? ? ? ? ? startedSASL = false;

? ? ? ? ? ? ? ? saslSuccessful();

? ? ? ? ? ? }

? ? ? ? ? ? else if (waitingCompressionACK) {

? ? ? ? ? ? ? ? waitingCompressionACK = false;

? ? ? ? ? ? ? ? compressionSuccessful();

? ? ? ? ? ? }

? ? ? ? ? ? return;

? ? ? ? }

?

? ? ? ? // Verify if end of stream was requested

? ? ? ? if (stanza.equals("</stream:stream>")) {

? ? ? ? ? ? session.close();

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? // Ignore <?xml version="1.0"?> stanzas sent by clients

? ? ? ? if (stanza.startsWith("<?xml")) {

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? // Create DOM object from received stanza

? ? ? ? Element doc = reader.read(new StringReader(stanza)).getRootElement();

? ? ? ? if (doc == null) {

? ? ? ? ? ? // No document found.

? ? ? ? ? ? return;

? ? ? ? }

? ? ? ? String tag = doc.getName();

? ? ? ? if ("starttls".equals(tag)) {

? ? ? ? ? ? // Negotiate TLS

? ? ? ? ? ? if (negotiateTLS()) {

? ? ? ? ? ? ? ? startedTLS = true;

? ? ? ? ? ? }

? ? ? ? ? ? else {

? ? ? ? ? ? ? ? connection.close();

? ? ? ? ? ? ? ? session = null;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? else if ("auth".equals(tag)) {

? ? ? ? ? ? // User is trying to authenticate using SASL

? ? ? ? ? ? startedSASL = true;

? ? ? ? ? ? // Process authentication stanza

? ? ? ? ? ? saslStatus = SASLAuthentication.handle(session, doc);

? ? ? ? }

? ? ? ? else if (startedSASL && "response".equals(tag)) {

? ? ? ? ? ? // User is responding to SASL challenge. Process response

? ? ? ? ? ? saslStatus = SASLAuthentication.handle(session, doc);

? ? ? ? }

? ? ? ? else if ("compress".equals(tag)) {

? ? ? ? ? ? // Client is trying to initiate compression

? ? ? ? ? ? if (compressClient(doc)) {

? ? ? ? ? ? ? ? // Compression was successful so open a new stream and offer

? ? ? ? ? ? ? ? // resource binding and session establishment (to client sessions only)

? ? ? ? ? ? ? ? waitingCompressionACK = true;

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? else {

? ? ? ? ? ? process(doc);

? ? ? ? }

? ? }

//此过程主要的工作是:

1.在调用此方法之前,会调用其编码解码器(XMPPDecoder),对来的报文进行解码。

2.获取XML解析对象XMPPPacketReader,统计接受的数据量,对数据进行解码。

3.根据不同的消息头做不同的处理

?

?

?

?

此篇文章就到此,稍后会有更多关于openfire的个人解读。

联系方式(qq):851392159

出处:http://buerkai.iteye.com

热点排行