openfire(8)openfire中的mina框架使用
? ? ? ? ? ? ? ? ? ? ? ? e.getMessage());
? ? ? ? ? ? ? ? Log.error(LocaleUtils.getLocalizedString("admin.error.socket-setup"), e);
? ? ? ? ? ? }
? ? ? ? }
? ? }
进入SocketAcceptThread会发现其真面目,
public class SocketAcceptThread extends Thread {
?
? ? /**
? ? ?* Holds information about the port on which the server will listen for connections.
? ? ?*/
? ? private ServerPort serverPort;
?
? ? private SocketAcceptingMode acceptingMode;
?
? ? public SocketAcceptThread(ConnectionManager connManager, ServerPort serverPort)
? ? ? ? ? ? throws IOException {
? ? ? ? super("Socket Listener at port " + serverPort.getPort());
? ? ? ? // Listen on a specific network interface if it has been set.
? ? ? ? String interfaceName = JiveGlobals.getXMLProperty("network.interface");
? ? ? ? InetAddress bindInterface = null;
? ? ? ? if (interfaceName != null) {
? ? ? ? ? ? if (interfaceName.trim().length() > 0) {
? ? ? ? ? ? ? ? bindInterface = InetAddress.getByName(interfaceName);
? ? ? ? ? ? ? ? // Create the new server port based on the new bind address
? ? ? ? ? ? ? ? serverPort = new ServerPort(serverPort.getPort(),
? ? ? ? ? ? ? ? ? ? ? ? serverPort.getDomainNames().get(0), interfaceName, serverPort.isSecure(),
? ? ? ? ? ? ? ? ? ? ? ? serverPort.getSecurityType(), serverPort.getType());
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? this.serverPort = serverPort;
? ? ? ? // Set the blocking reading mode to use
? ? ? ? acceptingMode = new BlockingAcceptingMode(connManager, serverPort, bindInterface);
? ? }
?
? ? /**
? ? ?* Retrieve the port this server socket is bound to.
? ? ?*
? ? ?* @return the port the socket is bound to.
? ? ?*/
? ? public int getPort() {
? ? ? ? return serverPort.getPort();
? ? }
?
? ? /**
? ? ?* Returns information about the port on which the server is listening for connections.
? ? ?*
? ? ?* @return information about the port on which the server is listening for connections.
? ? ?*/
? ? public ServerPort getServerPort() {
? ? ? ? return serverPort;
? ? }
?
? ? /**
? ? ?* Unblock the thread and force it to terminate.
? ? ?*/
? ? public void shutdown() {
? ? ? ? acceptingMode.shutdown();
? ? }
?
? ? /**
? ? ?* About as simple as it gets. ?The thread spins around an accept
? ? ?* call getting sockets and handing them to the SocketManager.
? ? ?*/
? ? @Override
public void run() {
? ? ? ? acceptingMode.run();
? ? ? ? // We stopped accepting new connections so close the listener
? ? ? ? shutdown();
? ? }
}
?
class BlockingAcceptingMode extends SocketAcceptingMode {
?
private static final Logger Log = LoggerFactory.getLogger(BlockingAcceptingMode.class);
?
? ? protected BlockingAcceptingMode(ConnectionManager connManager, ServerPort serverPort,
? ? ? ? ? ? InetAddress bindInterface) throws IOException {
? ? ? ? super(connManager, serverPort);
? ? ? ? serverSocket = new ServerSocket(serverPort.getPort(), -1, bindInterface);
? ? }
?
? ? /**
? ? ?* About as simple as it gets. ?The thread spins around an accept
? ? ?* call getting sockets and creating new reading threads for each new connection.
? ? ?*/
? ? @Override
public void run() {
? ? ? ? while (notTerminated) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Socket sock = serverSocket.accept();
? ? ? ? ? ? ? ? if (sock != null) {
? ? ? ? ? ? ? ? ? ? Log.debug("Connect " + sock.toString());
? ? ? ? ? ? ? ? ? ? SocketReader reader =
? ? ? ? ? ? ? ? ? ? ? ? ? ? connManager.createSocketReader(sock, false, serverPort, true);
? ? ? ? ? ? ? ? ? ? Thread thread = new Thread(reader, reader.getName());
? ? ? ? ? ? ? ? ? ? thread.setDaemon(true);
? ? ? ? ? ? ? ? ? ? thread.setPriority(Thread.NORM_PRIORITY);
? ? ? ? ? ? ? ? ? ? thread.start();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? catch (IOException ie) {
? ? ? ? ? ? ? ? if (notTerminated) {
? ? ? ? ? ? ? ? ? ? Log.error(LocaleUtils.getLocalizedString("admin.error.accept"),
? ? ? ? ? ? ? ? ? ? ? ? ? ? ie);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? catch (Throwable e) {
? ? ? ? ? ? ? ? Log.error(LocaleUtils.getLocalizedString("admin.error.accept"), e);
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
//此处只要这个服务器,接受到一个请求,就会创建一个新的线程去解析相关数据,并做逻辑处理。
?
?@Override
public void run() {
? ? ? ? try {
? ? ? ? ? ? socketReader.reader.getXPPParser().setInput(new InputStreamReader(
? ? ? ? ? ? ? ? ? ? ServerTrafficCounter.wrapInputStream(socket.getInputStream()), CHARSET));
?
? ? ? ? ? ? // Read in the opening tag and prepare for packet stream
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? socketReader.createSession();
? ? ? ? ? ? }
? ? ? ? ? ? catch (IOException e) {
? ? ? ? ? ? ? ? Log.debug("Error creating session", e);
? ? ? ? ? ? ? ? throw e;
? ? ? ? ? ? }
?
? ? ? ? ? ? // Read the packet stream until it ends
? ? ? ? ? ? if (socketReader.session != null) {
? ? ? ? ? ? ? ? readStream();
? ? ? ? ? ? }
?
? ? ? ? }
? ? ? ? catch (EOFException eof) {
? ? ? ? ? ? // Normal disconnect
? ? ? ? }
? ? ? ? catch (SocketException se) {
? ? ? ? ? ? // The socket was closed. The server may close the connection for several
? ? ? ? ? ? // reasons (e.g. user requested to remove his account). Do nothing here.
? ? ? ? }
? ? ? ? catch (AsynchronousCloseException ace) {
? ? ? ? ? ? // The socket was closed.
? ? ? ? }
? ? ? ? catch (XmlPullParserException ie) {
? ? ? ? ? ? // It is normal for clients to abruptly cut a connection
? ? ? ? ? ? // rather than closing the stream document. Since this is
? ? ? ? ? ? // normal behavior, we won't log it as an error.
? ? ? ? ? ? // Log.error(LocaleUtils.getLocalizedString("admin.disconnect"),ie);
? ? ? ? }
? ? ? ? catch (Exception e) {
? ? ? ? ? ? if (socketReader.session != null) {
? ? ? ? ? ? ? ? Log.warn(LocaleUtils.getLocalizedString("admin.error.stream") + ". Session: " +
? ? ? ? ? ? ? ? ? ? ? ? socketReader.session, e);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? finally {
? ? ? ? ? ? if (socketReader.session != null) {
? ? ? ? ? ? ? ? if (Log.isDebugEnabled()) {
? ? ? ? ? ? ? ? ? ? Log.debug("Logging off " + socketReader.session.getAddress() + " on " + socketReader.connection);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? socketReader.session.close();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? catch (Exception e) {
? ? ? ? ? ? ? ? ? ? Log.warn(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? else {
? ? ? ? ? ? ? ? // Close and release the created connection
? ? ? ? ? ? ? ? socketReader.connection.close();
? ? ? ? ? ? ? ? Log.debug(LocaleUtils.getLocalizedString("admin.error.connection") + socket.toString());
? ? ? ? ? ? }
? ? ? ? ? ? socketReader.shutdown();
? ? ? ? }
? ? }
?
//createSession中进行报文的解析,
protected void createSession()
? ? ? ? ? ? throws UnauthorizedException, XmlPullParserException, IOException {
? ? ? ? XmlPullParser xpp = reader.getXPPParser();
? ? ? ? for (int eventType = xpp.getEventType(); eventType != XmlPullParser.START_TAG;) {
? ? ? ? ? ? eventType = xpp.next();
? ? ? ? }
?
? ? ? ? // Check that the TO attribute of the stream header matches the server name or a valid
? ? ? ? // subdomain. If the value of the 'to' attribute is not valid then return a host-unknown
? ? ? ? // error and close the underlying connection.
? ? ? ? System.out.println(reader.getXPPParser().toString());
? ? ? ? String host = reader.getXPPParser().getAttributeValue("", "to");
? ? ? ? if (validateHost() && isHostUnknown(host)) {
? ? ? ? ? ? StringBuilder sb = new StringBuilder(250);
? ? ? ? ? ? sb.append("<?xml version='1.0' encoding='");
? ? ? ? ? ? sb.append(CHARSET);
? ? ? ? ? ? sb.append("'?>");
? ? ? ? ? ? // Append stream header
? ? ? ? ? ? sb.append("<stream:stream ");
? ? ? ? ? ? sb.append("from="").append(serverName).append("" ");
? ? ? ? ? ? sb.append("id="").append(StringUtils.randomString(5)).append("" ");
? ? ? ? ? ? sb.append("xmlns="").append(xpp.getNamespace(null)).append("" ");
? ? ? ? ? ? sb.append("xmlns:stream="").append(xpp.getNamespace("stream")).append("" ");
? ? ? ? ? ? sb.append("version="1.0">");
? ? ? ? ? ? // Set the host_unknown error
? ? ? ? ? ? StreamError error = new StreamError(StreamError.Condition.host_unknown);
? ? ? ? ? ? sb.append(error.toXML());
? ? ? ? ? ? // Deliver stanza
? ? ? ? ? ? connection.deliverRawText(sb.toString());
? ? ? ? ? ? // Close the underlying connection
? ? ? ? ? ? connection.close();
? ? ? ? ? ? // Log a warning so that admins can track this cases from the server side
? ? ? ? ? ? Log.warn("Closing session due to incorrect hostname in stream header. Host: " + host +
? ? ? ? ? ? ? ? ? ? ". Connection: " + connection);
? ? ? ? }
?
? ? ? ? // Create the correct session based on the sent namespace. At this point the server
? ? ? ? // may offer the client to secure the connection. If the client decides to secure
? ? ? ? // the connection then a <starttls> stanza should be received
? ? ? ? else if (!createSession(xpp.getNamespace(null))) {
? ? ? ? ? ? // No session was created because of an invalid namespace prefix so answer a stream
? ? ? ? ? ? // error and close the underlying connection
? ? ? ? ? ? StringBuilder sb = new StringBuilder(250);
? ? ? ? ? ? sb.append("<?xml version='1.0' encoding='");
? ? ? ? ? ? sb.append(CHARSET);
? ? ? ? ? ? sb.append("'?>");
? ? ? ? ? ? // Append stream header
? ? ? ? ? ? sb.append("<stream:stream ");
? ? ? ? ? ? sb.append("from="").append(serverName).append("" ");
? ? ? ? ? ? sb.append("id="").append(StringUtils.randomString(5)).append("" ");
? ? ? ? ? ? sb.append("xmlns="").append(xpp.getNamespace(null)).append("" ");
? ? ? ? ? ? sb.append("xmlns:stream="").append(xpp.getNamespace("stream")).append("" ");
? ? ? ? ? ? sb.append("version="1.0">");
? ? ? ? ? ? // Include the bad-namespace-prefix in the response
? ? ? ? ? ? StreamError error = new StreamError(StreamError.Condition.bad_namespace_prefix);
? ? ? ? ? ? sb.append(error.toXML());
? ? ? ? ? ? connection.deliverRawText(sb.toString());
? ? ? ? ? ? // Close the underlying connection
? ? ? ? ? ? connection.close();
? ? ? ? ? ? // Log a warning so that admins can track this cases from the server side
? ? ? ? ? ? Log.warn("Closing session due to bad_namespace_prefix in stream header. Prefix: " +
? ? ? ? ? ? ? ? ? ? xpp.getNamespace(null) + ". Connection: " + connection);
? ? ? ? }
? ? }
将相关信息转换为XMPP报文格式。
?
2.? createConnectionManagerListener();
?private void createConnectionManagerListener() {
? ? ? ? // Start multiplexers socket unless it's been disabled.
? ? ? ? if (isConnectionManagerListenerEnabled()) {
? ? ? ? ? ? // Create SocketAcceptor with correct number of processors
? ? ? ? ? ? multiplexerSocketAcceptor = buildSocketAcceptor();
? ? ? ? ? ? // Customize Executor that will be used by processors to process incoming stanzas
? ? ? ? ? ? ExecutorThreadModel threadModel = ExecutorThreadModel.getInstance("connectionManager");
? ? ? ? ? ? int eventThreads = JiveGlobals.getIntProperty("xmpp.multiplex.processing.threads", 16);
? ? ? ? ? ? ThreadPoolExecutor eventExecutor = (ThreadPoolExecutor) threadModel.getExecutor();
? ? ? ? ? ? eventExecutor.setCorePoolSize(eventThreads + 1);
? ? ? ? ? ? eventExecutor.setMaximumPoolSize(eventThreads + 1);
? ? ? ? ? ? eventExecutor.setKeepAliveTime(60, TimeUnit.SECONDS);
? ? ? ? ? ? multiplexerSocketAcceptor.getDefaultConfig().setThreadModel(threadModel);
? ? ? ? ? ? // Add the XMPP codec filter
? ? ? ? ? ? multiplexerSocketAcceptor.getFilterChain().addFirst("xmpp", new ProtocolCodecFilter(new XMPPCodecFactory()));
?
? ? ? ? }
? ? }
?
从?multiplexerSocketAcceptor.bind(new InetSocketAddress(bindInterface, port), new MultiplexerConnectionHandler(serverName));中可以知道其报文处理是MultiplexerConnectionHandler。
MultiplexerConnectionHandler,ClientConnectionHandler,这个3个类都是继承与ConnectionHandler,下面主要讲解ConnectionHandler。
?
ConnectionHandler继承与IoHandlerAdapter,重写了IoHandlerAdapter的6个方法。
当客户端和服务器建立连接的时候会调用:
@Override
public void sessionOpened(IoSession session) throws Exception {
? ? ? ? // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
? ? ? ? final XMLLightweightParser parser = new XMLLightweightParser(CHARSET);
? ? ? ? session.setAttribute(XML_PARSER, parser);
? ? ? ? // Create a new NIOConnection for the new session
? ? ? ? final NIOConnection connection = createNIOConnection(session);
? ? ? ? session.setAttribute(CONNECTION, connection);
? ? ? ? session.setAttribute(HANDLER, createStanzaHandler(connection));
? ? ? ? // Set the max time a connection can be idle before closing it. This amount of seconds
? ? ? ? // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
? ? ? ? // before disconnecting them (at 100% of the max idle time). This prevents Openfire from
? ? ? ? // removing connections without warning.
? ? ? ? final int idleTime = getMaxIdleTime() / 2;
? ? ? ? if (idleTime > 0) {
? ? ? ? ? ? session.setIdleTime(IdleStatus.READER_IDLE, idleTime);
? ? ? ? }
? ? }
//当有一个连接被打开的时候,就会初始化XMPP 的解析器,业务数据解析,以及会话空闲时间。
?
当客户端向服务端发起消息时会调用:
?@Override
public void messageReceived(IoSession session, Object message) throws Exception {
? ? ? ? // Get the stanza handler for this session
? ? ? ? StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
? ? ? ? // Get the parser to use to process stanza. For optimization there is going
? ? ? ? // to be a parser for each running thread. Each Filter will be executed
? ? ? ? // by the Executor placed as the first Filter. So we can have a parser associated
? ? ? ? // to each Thread
? ? ? ? int hashCode = Thread.currentThread().hashCode();
? ? ? ? XMPPPacketReader parser = parsers.get(hashCode);
? ? ? ? if (parser == null) {
? ? ? ? ? ? parser = new XMPPPacketReader();
? ? ? ? ? ? parser.setXPPFactory(factory);
? ? ? ? ? ? parsers.put(hashCode, parser);
? ? ? ? }
? ? ? ? // Update counter of read btyes
? ? ? ? updateReadBytesCounter(session);
? ? ? ? //System.out.println("RCVD: " + message);
? ? ? ? // Let the stanza handler process the received stanza
? ? ? ? try {
? ? ? ? ? ? handler.process((String) message, parser);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? Log.error("Closing connection due to error while processing message: " + message, e);
? ? ? ? ? ? Connection connection = (Connection) session.getAttribute(CONNECTION);
? ? ? ? ? ? connection.close();
? ? ? ? }
? ? }
?
?public void process(String stanza, XMPPPacketReader reader) throws Exception {
?
? ? ? ? boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
? ? ? ? if (!sessionCreated || initialStream) {
? ? ? ? ? ? if (!initialStream) {
? ? ? ? ? ? ? ? // Allow requests for flash socket policy files directly on the client listener port
? ? ? ? ? ? ? ? if (stanza.startsWith("<policy-file-request/>")) {
? ? ? ? ? ? ? ? ? ? String crossDomainText = FlashCrossDomainServlet.CROSS_DOMAIN_TEXT +
? ? ? ? ? ? ? ? ? ? ? ? ? ? XMPPServer.getInstance().getConnectionManager().getClientListenerPort() +
? ? ? ? ? ? ? ? ? ? ? ? ? ? FlashCrossDomainServlet.CROSS_DOMAIN_END_TEXT + '\0';
? ? ? ? ? ? ? ? ? ? connection.deliverRawText(crossDomainText);
? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? else {
? ? ? ? ? ? ? ? ? ? // Ignore <?xml version="1.0"?>
? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // Found an stream:stream tag...
? ? ? ? ? ? if (!sessionCreated) {
? ? ? ? ? ? ? ? sessionCreated = true;
? ? ? ? ? ? ? ? MXParser parser = reader.getXPPParser();
? ? ? ? ? ? ? ? parser.setInput(new StringReader(stanza));
? ? ? ? ? ? ? ? createSession(parser);
? ? ? ? ? ? }
? ? ? ? ? ? else if (startedTLS) {
? ? ? ? ? ? ? ? startedTLS = false;
? ? ? ? ? ? ? ? tlsNegotiated();
? ? ? ? ? ? }
? ? ? ? ? ? else if (startedSASL && saslStatus == SASLAuthentication.Status.authenticated) {
? ? ? ? ? ? ? ? startedSASL = false;
? ? ? ? ? ? ? ? saslSuccessful();
? ? ? ? ? ? }
? ? ? ? ? ? else if (waitingCompressionACK) {
? ? ? ? ? ? ? ? waitingCompressionACK = false;
? ? ? ? ? ? ? ? compressionSuccessful();
? ? ? ? ? ? }
? ? ? ? ? ? return;
? ? ? ? }
?
? ? ? ? // Verify if end of stream was requested
? ? ? ? if (stanza.equals("</stream:stream>")) {
? ? ? ? ? ? session.close();
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? // Ignore <?xml version="1.0"?> stanzas sent by clients
? ? ? ? if (stanza.startsWith("<?xml")) {
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? // Create DOM object from received stanza
? ? ? ? Element doc = reader.read(new StringReader(stanza)).getRootElement();
? ? ? ? if (doc == null) {
? ? ? ? ? ? // No document found.
? ? ? ? ? ? return;
? ? ? ? }
? ? ? ? String tag = doc.getName();
? ? ? ? if ("starttls".equals(tag)) {
? ? ? ? ? ? // Negotiate TLS
? ? ? ? ? ? if (negotiateTLS()) {
? ? ? ? ? ? ? ? startedTLS = true;
? ? ? ? ? ? }
? ? ? ? ? ? else {
? ? ? ? ? ? ? ? connection.close();
? ? ? ? ? ? ? ? session = null;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? else if ("auth".equals(tag)) {
? ? ? ? ? ? // User is trying to authenticate using SASL
? ? ? ? ? ? startedSASL = true;
? ? ? ? ? ? // Process authentication stanza
? ? ? ? ? ? saslStatus = SASLAuthentication.handle(session, doc);
? ? ? ? }
? ? ? ? else if (startedSASL && "response".equals(tag)) {
? ? ? ? ? ? // User is responding to SASL challenge. Process response
? ? ? ? ? ? saslStatus = SASLAuthentication.handle(session, doc);
? ? ? ? }
? ? ? ? else if ("compress".equals(tag)) {
? ? ? ? ? ? // Client is trying to initiate compression
? ? ? ? ? ? if (compressClient(doc)) {
? ? ? ? ? ? ? ? // Compression was successful so open a new stream and offer
? ? ? ? ? ? ? ? // resource binding and session establishment (to client sessions only)
? ? ? ? ? ? ? ? waitingCompressionACK = true;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? else {
? ? ? ? ? ? process(doc);
? ? ? ? }
? ? }
//此过程主要的工作是:
1.在调用此方法之前,会调用其编码解码器(XMPPDecoder),对来的报文进行解码。
2.获取XML解析对象XMPPPacketReader,统计接受的数据量,对数据进行解码。
3.根据不同的消息头做不同的处理
?
?
?
?
此篇文章就到此,稍后会有更多关于openfire的个人解读。
联系方式(qq):851392159
出处:http://buerkai.iteye.com