异步读取数据, 如何添加线程
往哪里添加线程啊???对于异步处理.代码是服务端,接受数据并显示。handle_read是读取数据函数,如果数据发送过多, 假设有多个客户端发送数据过来是否需要用线程来处理???线程 貌似加不上, 谁个演示,加上个线程,来处理数据。 class clientSession :public boost::enable_shared_from_this<clientSession>{public: clientSession(boost::asio::io_service& ioservice) :m_socket(ioservice) { memset(data_,'\0',sizeof(data_)); } ~clientSession() {} tcp::socket& socket() { return m_socket; } void start() { m_socket.async_read_some(boost::asio::buffer(data_,max_len), boost::bind(&clientSession::handle_read,shared_from_this(), boost::asio::placeholders::error)); }private: void handle_read(const boost::system::error_code& error) { if(!error) { m_socket.async_read_some(boost::asio::buffer(data_,max_len), boost::bind(&clientSession::handle_read,shared_from_this(), boost::asio::placeholders::error)); std::cout << data_ << std::endl; } else { m_socket.close(); } }private: tcp::socket m_socket; char data_[max_len];};class serverApp{ typedef boost::shared_ptr<clientSession> session_ptr;public: serverApp(boost::asio::io_service& ioservice,tcp::endpoint& endpoint) :m_ioservice(ioservice), acceptor_(ioservice,endpoint) { session_ptr new_session(new clientSession(ioservice)); acceptor_.async_accept(new_session->socket(), boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error, new_session)); } private: void handle_accept(const boost::system::error_code& error,session_ptr& session) { if(!error) { std::cout << "get a new client!" << std::endl; //实现对每个客户端的数据处理 session->start(); //在这就应该看出为什么要封session类了吧,每一个session就是一个客户端 session_ptr new_session(new clientSession(m_ioservice)); acceptor_.async_accept(new_session->socket(), boost::bind(&serverApp::handle_accept,this,boost::asio::placeholders::error, new_session)); } }private: boost::asio::io_service& m_ioservice; tcp::acceptor acceptor_;};
std::vector<boost::shared_ptr<boost::thread> > threads; for (std::size_t i = 0; i < io_thread_pool_size_; ++i) { boost::shared_ptr<boost::thread> thread(new boost::thread( boost::bind(&boost::asio::io_service::run, &io_service_))); threads.push_back(thread); }
------解决方案--------------------
#include <boost/asio.hpp>
#include <boost/smart_ptr/enable_shared_from_this.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <cstddef>
#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

using boost::enable_shared_from_this;
using boost::shared_ptr;
using namespace boost::asio;
using boost::asio::ip::tcp;
using std::string;

typedef shared_ptr<tcp::socket> sock_pt;

#define max_len 200
#define client_count 10
#define THREAD_NUM 1

// One connected client. Shares ownership of an accepted socket, remembers
// the slot index it was started with, and keeps exactly one asynchronous
// read outstanding at a time. Every pending read handler holds a shared_ptr
// to the session (via shared_from_this()).
class ClientSession : public enable_shared_from_this<ClientSession>
{
public:
    // `ioservice` is not used by the session; the parameter is kept so the
    // constructor signature stays the same for existing callers.
    ClientSession(io_service& ioservice, sock_pt sock)
        : m_socket(sock), m_pos(0)
    {
        (void)ioservice;
        std::memset(m_data, '\0', sizeof(m_data));
    }

    sock_pt socket()
    {
        return m_socket;
    }

    // Records the client's slot index (used in log output) and starts reading.
    void start(int nPos)
    {
        m_pos = nPos;
        post_read();
    }

private:
    // Posts a single async_read_some into m_data.
    void post_read()
    {
        m_socket->async_read_some(
            buffer(m_data, max_len),
            boost::bind(&ClientSession::handle_read, shared_from_this(),
                        placeholders::error,
                        placeholders::bytes_transferred));
    }

    // Completion handler for post_read().
    //   error             - result of the read; any error closes the socket.
    //   bytes_transferred - number of valid bytes now at the start of m_data.
    void handle_read(const boost::system::error_code& error,
                     std::size_t bytes_transferred)
    {
        if (error)
        {
            // Use the non-throwing overload so a failing close() cannot
            // propagate an exception out of the handler.
            boost::system::error_code ignored;
            m_socket->close(ignored);
            return;
        }

        // The original note here said "save to disk"; the code only prints.
        // Print just the bytes received by this read: m_data is not
        // guaranteed to be NUL-terminated and may hold stale bytes from an
        // earlier, longer message.
        std::printf("Client(%d) recv msg:%.*s\n",
                    m_pos, static_cast<int>(bytes_transferred), m_data);

        // Re-arm the read only after the buffer has been consumed, so the
        // next read cannot overwrite m_data while it is being printed.
        post_read();
    }

private:
    sock_pt m_socket;
    char m_data[max_len];
    int m_pos;
};

// Listens on `endpoint` and posts client_count asynchronous accepts up
// front, one per slot index. handle_accept() does not post further accepts,
// so at most client_count connections are accepted in total.
class serverApp
{
    typedef boost::shared_ptr<ClientSession> session_ptr;

public:
    // `endpoint` is taken by const reference so that a temporary (as passed
    // from _tmain) binds in standard C++.
    serverApp(io_service& ioservice, const tcp::endpoint& endpoint)
        : m_ioservice(ioservice),
          m_acceptor(ioservice, endpoint)
    {
        for (int i = 0; i < client_count; ++i)
        {
            sock_pt sock(new tcp::socket(ioservice));
            m_acceptor.async_accept(
                *sock,
                boost::bind(&serverApp::handle_accept, this,
                            placeholders::error, sock, i));
        }
    }

private:
    // Completion handler for one accept: on success wraps the socket in a
    // session and starts it with the slot index `nPos`; on error does nothing.
    void handle_accept(const boost::system::error_code& error, sock_pt sock, int nPos)
    {
        if (!error)
        {
            session_ptr new_session(new ClientSession(m_ioservice, sock));
            new_session->start(nPos);
        }
    }

private:
    io_service& m_ioservice;
    tcp::acceptor m_acceptor;
};

// Entry point: listens on TCP port 6688 (IPv4) and runs the io_service on
// THREAD_NUM threads, returning once every run() call has returned.
// NOTE: _tmain/_TCHAR are MSVC names from <tchar.h>; that include is not
// part of this snippet.
int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service ios;
    serverApp serv(ios, tcp::endpoint(tcp::v4(), 6688));

    boost::thread_group tg;
    for (int i = 0; i < THREAD_NUM; ++i)
    {
        // create_thread() constructs the thread and hands ownership to the group.
        tg.create_thread(boost::bind(&io_service::run, &ios));
    }
    tg.join_all();

    return 0;
}