trinitycore魔兽服务器源码分析(⼆)⽹络
书接上⽂继续分析Socket.h SocketMgr.h
template<class T>
class Socket : public std::enable_shared_from_this<T>
根据智能指针的使⽤规则类中有使⽤本类⾃⼰的指针必须继承⾃enable_shared_from_this<> 防⽌⾃引⽤不能释放的BUG
class Socket封装了asio中的socket类获取远端ip 端⼝等功能,并且额外提供异步读写的功能
类中的两个原⼦变量 _clod _closing标记该socket的关闭开启状态
bool Update()函数根据socket是否是同步异步标记进⾏写⼊队列的处理。同步则进⾏处理异步则暂缓
void AsyncRead() void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t))
则采取异步读取socket 调⽤默认函数ReadHandlerInternal() 或者输⼊函数T::*callback()
由于AsyncReadWithCallback 函数中bind 需要 T类的指针所以才有开头的继承std::enable_shared_from_this<T>
但是使⽤⽐较怪异 std::enable_shared_from_this<>⽤法⼀般是继承⾃⼰本⾝
class lf :: public std::enable_shared_from_this<lf>{
public:
void test(){
// only for test
std::bind(&lf ::test, shared_from_this());
}
}
异步写write类似 ,由bool AsyncProcessQueue()函数发起
使⽤asio的async_write_some函数异步读取连接内容并调⽤回调函数WriteHandler()或者WriteHandlerWrapper()
不过需要结合MessageBuffer ⼀起跟进流程
类代码如下
大白鲨电影1/*
2 * Copyright (C) 2008-2017 TrinityCore &initycore/>
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public Licen as published by the
6 * Free Software Foundation; either version 2 of the Licen, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be uful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public Licen for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public Licen along
15 * with this program. If not, e &u/licens/>.
16*/
17
18 #ifndef __SOCKET_H__
19#define __SOCKET_H__
20
21 #include "MessageBuffer.h"
22 #include "Log.h"
23 #include <atomic>
24 #include <queue>
25 #include <memory>
26 #include <functional>
27 #include <type_traits>
28 #include <boost/asio/ip/tcp.hpp>
29
30using boost::asio::ip::tcp;
31
32#define READ_BLOCK_SIZE 4096
33 #ifdef BOOST_ASIO_HAS_IOCP
34#define TC_SOCKET_USE_IOCP
35#endif
36
37 template<class T>
38class Socket : public std::enable_shared_from_this<T>
39 {
40public:
41explicit Socket(tcp::socket&& socket) : _socket(std::move(socket)), _remoteAddress(__endpoint().address()),
42 _remotePort(__endpoint().port()), _readBuffer(), _clod(fal), _closing(fal), _isWritingAsync(fal)
怎么辨别朱砂的真假简单方法43 {
44 _readBuffer.Resize(READ_BLOCK_SIZE);
45 }
46
47virtual ~Socket()
48 {
49 _clod = true;
50 boost::system::error_code error;
51 _socket.clo(error);
52 }
53
54virtual void Start() = 0;
55
56virtual bool Update()
57 {
58if (_clod)
59return fal;
60
61 #ifndef TC_SOCKET_USE_IOCP
62if (_isWritingAsync || (_pty() && !_closing))
63return true;
64
65for (; HandleQueue();)
66 ;
67#endif
68
69return true;
70 }
71
72 boost::asio::ip::address GetRemoteIpAddress() const
73 {
74return _remoteAddress;
75 }
76
77 uint16 GetRemotePort() const
78 {
79return _remotePort;
80 }
81
82void AsyncRead()
83 {
84if (!IsOpen())
85return;
86
87 _readBuffer.Normalize();
88 _readBuffer.EnsureFreeSpace();
89 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
90 std::bind(&Socket<T>::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
91 }
92
93void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t))
94 {
95if (!IsOpen())
96return;
97
98 _readBuffer.Normalize();
99 _readBuffer.EnsureFreeSpace();
100 _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
101 std::bind(callback, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
102 }
103
104void QueuePacket(MessageBuffer&& buffer)
105 {
106 _writeQueue.push(std::move(buffer));
107
108 #ifdef TC_SOCKET_USE_IOCP
109 AsyncProcessQueue();
110#endif
111 }
112
113bool IsOpen() const { return !_clod && !_closing; }
114
115void CloSocket()
116 {
117if (_hange(true))
庄子逍遥游118return;
119
120 boost::system::error_code shutdownError;
121 _socket.shutdown(boost::asio::socket_ba::shutdown_nd, shutdownError);
122if (shutdownError)
123 TC_LOG_DEBUG("network", "Socket::CloSocket: %s errored when shutting down socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(), 124 shutdownError.value(), ssage().c_str());
125
126 OnClo();
127 }
128
129/// Marks the socket for closing after write buffer becomes empty
130void DelayedCloSocket() { _closing = true; }
131
132 MessageBuffer& GetReadBuffer() { return _readBuffer; }
133
134protected:
135virtual void OnClo() { }
136
137virtual void ReadHandler() = 0;
138
139bool AsyncProcessQueue()
140 {
141if (_isWritingAsync)
142return fal;
143
144 _isWritingAsync = true;
145
146 #ifdef TC_SOCKET_USE_IOCP
147 MessageBuffer& buffer = _writeQueue.front();
148 _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()), std::bind(&Socket<T>::WriteHandler,
149this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
150#el
151 _socket.async_write_some(boost::asio::null_buffers(), std::bind(&Socket<T>::WriteHandlerWrapper,
152this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
153#endif
154
155return fal;
156 }
157
158void SetNoDelay(bool enable)
159 {
160 boost::system::error_code err;
161 _socket.t_option(tcp::no_delay(enable), err);
162if (err)
163 TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to t_option(boost::asio::ip::tcp::no_delay) for %s - %d (%s)",
164 GetRemoteIpAddress().to_string().c_str(), err.value(), ssage().c_str());
165 }
166
167private:
168void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes)
169 {
170if (error)
171 {
172 CloSocket();
173return;
174 }
175
176 _readBuffer.WriteCompleted(transferredBytes);
177 ReadHandler();
178 }
179
180 #ifdef TC_SOCKET_USE_IOCP
181
小虎队爱的歌词182void WriteHandler(boost::system::error_code error, std::size_t transferedBytes)
183 {
184if (!error)
185 {
186 _isWritingAsync = fal;
187 _writeQueue.front().ReadCompleted(transferedBytes);
188if (!_writeQueue.front().GetActiveSize())
189 _writeQueue.pop();
190
191if (!_pty())
192 AsyncProcessQueue();
193el if (_closing)
194 CloSocket();
195 }
196el
197 CloSocket();
198 }卸妆水和卸妆油的区别
199
200#el
201
202void WriteHandlerWrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/)
203 {
204 _isWritingAsync = fal;
205 HandleQueue();
206 }
207
208bool HandleQueue()
209 {
210if (_pty())
211return fal;
212
213 MessageBuffer& queuedMessage = _writeQueue.front();
214
215 std::size_t bytesToSend = queuedMessage.GetActiveSize();
216
217 boost::system::error_code error;
218 std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
219
220if (error)
221 {
222if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
223return AsyncProcessQueue();
224
225 _writeQueue.pop();
226if (_closing && _pty())
227 CloSocket();
228return fal;
229 }
230el if (bytesSent == 0)
231 {
232 _writeQueue.pop();
233if (_closing && _pty())
234 CloSocket();
235return fal;
236 }
237el if (bytesSent < bytesToSend) // now n > 0
238 {
239 queuedMessage.ReadCompleted(bytesSent);
240return AsyncProcessQueue();
241 }
242
243 _writeQueue.pop();
244if (_closing && _pty())
245 CloSocket();
246return !_pty();
247 }
248
249#endif
250
251 tcp::socket _socket;
252
253 boost::asio::ip::address _remoteAddress;
254 uint16 _remotePort;
255
256 MessageBuffer _readBuffer;
257 std::queue<MessageBuffer> _writeQueue;
258
259 std::atomic<bool> _clod;
用人山人海造句
260 std::atomic<bool> _closing;
261
262bool _isWritingAsync;
263 };
264
265#endif// __SOCKET_H__
View Code
//======================================================
template<class SocketType>
class SocketMgr
将之前的Socket NetworkThread AsyncAcceptor
老电影洪湖赤卫队
整合了起来
virtual bool StartNetwork(boost::asio::io_rvice& rvice, std::string const& bindIp, uint16 port, int threadCount)函数
开启threadCount个NetworkThread
创建⼀个AsyncAcceptor 异步ACCEPT连接
uint32 SelectThreadWithMinConnections() 函数会返回连接数⽬最少的NetworkThread 的线程索引
std::pair<tcp::socket*, uint32> GetSocketForAccept()则返回连接数⽬最少的线程索引和该线程⽤于异步连接Socket指针
其余的start stop 就没什么了
值得关注的是virtual void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex)
当继承SocketMgr的服务器在accept的时候会调⽤该函数
函数功能是运⾏accept的Socket的run函数
并且讲Socket加⼊到NetworkThread 的Socket容器中(AddSocket函数)
整个类的代码如下
1/*
2 * Copyright (C) 2008-2017 TrinityCore &initycore/>打印机原理
3 *
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public Licen as published by the
6 * Free Software Foundation; either version 2 of the Licen, or (at your
7 * option) any later version.
8 *
9 * This program is distributed in the hope that it will be uful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public Licen for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public Licen along
15 * with this program. If not, e &u/licens/>.
16*/
17
18 #ifndef SocketMgr_h__
19#define SocketMgr_h__
20
21 #include "AsyncAcceptor.h"
22 #include "Errors.h"
23 #include "NetworkThread.h"
24 #include <boost/asio/ip/tcp.hpp>
25 #include <memory>
26
27using boost::asio::ip::tcp;
28
29 template<class SocketType>
30class SocketMgr
31 {
32public:
33virtual ~SocketMgr()
34 {
35 ASSERT(!_threads && !_acceptor && !_threadCount, "StopNetwork must be called prior to SocketMgr destruction");
36 }
37
38virtual bool StartNetwork(boost::asio::io_rvice& rvice, std::string const& bindIp, uint16 port, int threadCount)
39 {
40 ASSERT(threadCount > 0);
41
42 AsyncAcceptor* acceptor = nullptr;
43try
44 {
45 acceptor = new AsyncAcceptor(rvice, bindIp, port);
46 }
47catch (boost::system::system_error const& err)
48 {
49 TC_LOG_ERROR("network", "Exception caught in SocketMgr.StartNetwork (%s:%u): %s", bindIp.c_str(), port, err.what());
50return fal;
51 }
52
53if (!acceptor->Bind())
54 {
55 TC_LOG_ERROR("network", "StartNetwork failed to bind socket acceptor");