SuperSocket源码解析之消息处理
⼀简述
Tcp消息的处理本⾝是与Tcp消息传输过程独⽴的,是消息的两个不同阶段,从前⾯的会话⽣命周期我们已经知道消息的传输主要有SocketSession实现,⽽真正处理则交由AppSession实现,SuperSocket的层次划分也是⾮常清晰明了。
SuperSocket消息处理主要流程:接收=》原始过滤=》协议解析=》命令路由并执⾏=》找不到命令则直接⼀分不动发给客户端
⼆消息接收
1 开始接收
代码位置:AsyncSocketSession=》StartReceive
1private void StartReceive(SocketAsyncEventArgs e)
2 {
3 StartReceive(e, 0);
4 }
5
6private void StartReceive(SocketAsyncEventArgs e, int offtDelta)
7 {
8bool willRaiEvent = fal;
9
10try
11 {
12if (offtDelta < 0 || offtDelta >= Config.ReceiveBufferSize)
13throw new ArgumentException(string.Format("Illigal offtDelta: {0}", offtDelta), "offtDelta");
14
15var predictOfft = SocketAsyncProxy.OrigOfft + offtDelta;
16
17if (e.Offt != predictOfft)
18 {
19 e.SetBuffer(predictOfft, Config.ReceiveBufferSize - offtDelta);
20 }
21
22if (IsInClosingOrClod)
23return;
24
25 OnReceiveStarted();
26 willRaiEvent = Client.ReceiveAsync(e);
27 }
28catch (Exception exc)
29 {
30 LogError(exc);
31
32 OnReceiveError(CloReason.SocketError);
33return;
34 }
35
36if (!willRaiEvent)
37 {
38 ProcessReceive(e);
39 }
40 }
View Code
在接收数据前后触发ReceiveStarted,ReceiveEnd事件
2 接收有消息处理并转⼊处理
1public void ProcessReceive(SocketAsyncEventArgs e)
2 {
3if (!ProcessCompleted(e))
4 {
5 OnReceiveError(CloReason.ClientClosing);
6return;
7 }
8
9 OnReceiveEnded();
10
11int offtDelta;
12
13try
14 {
15//交给app会话处理接收到的数据,⽽appssion⼜交给接收请求处理器处理
16 offtDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offt, e.BytesTransferred, true);
17 }
18catch (Exception exc)
19 {
20 LogError("Protocol error", exc);
21this.Clo(CloReason.ProtocolError);
22return;
23 }
24
25//read the next block of data nt from the client
26 StartReceive(e, offtDelta);
27 }
View Code
三消息处理
1 ⼊⼝:按照协议解析,每次只处理⼀个数据包,因此便有了如下的⼊⼝代码
1int IAppSession.ProcessRequest(byte[] readBuffer, int offt, int length, bool toBeCopied)
2 {
3int rest, offtDelta;
4
5while (true)
6 {
7var requestInfo = FilterRequest(readBuffer, offt, length, toBeCopied, out rest, out offtDelta);
8
9if (requestInfo != null)
10 {
11try
12 {
13 AppServer.ExecuteCommand(this, requestInfo);
14 }
15catch (Exception e)
16 {
17 HandleException(e);
18 }
19 }
20
21if (rest <= 0)
22 {
23return offtDelta;
24 }
25
26//Still have data has not been procesd
27 offt = offt + length - rest;
28 length = rest;
29 }
30 }
View Code
2 原始过滤
此处以原始数据接收事件⽅式,预留给AppServer⼦类处理该原始数据包,意味着可以对原始数据包执⾏⼀次拦截处理,如果经过⼀些逻辑处理后不能满⾜,则将终⽌该数据包继续传播
3 协议解析
协议解析由AppSession的m_ReceiveFilter成员完成,该成员的实例化有2种实现⽅式
默认实例化:默认命令⾏协议,该协议举例
下⾯是2⾏命令,⾏使⽤\r\n 作为结束标识,也就是回车换⾏,命令⾏内部使⽤空格分隔,第⼀个控制之前如echo 为消息头也就是key,其后的字符串也使⽤空格分隔,作为参数,其整体=body
echo cc xxd
cdc ds mmm
解析后为2个StringRequestInfo对象
默认的协议解析⼯⼚
实例化默认协议解析对象
通过AppServer构造函数传递解析⼯⼚
4 命令路由
上⾯我们已经解析到客户端发来的2条命令分别为echo 参数为cc xxd;cdc ds mmm;
其中key分别为echo和cdc,对于命令模式来说命令本⾝使⽤name字段进⾏标识,如果我们的key与name匹配那么我们即可路由到⼀个已有的命令,进⽽执⾏该命令,来看代码
对于能够路由到的命令我们执⾏命令
对于路由失败来说SuperSocket⼜是怎么做的呢?
找不到命令来处理该消息,将该命令名字发送会客户端,意思说明服务器没有实现该命令,那么命令从何⽽来?
5 命令
这还的从CommandLoader说起,CommandLoader⼜追溯到AppServer的构建过程
在没有显⽰配置CommandLoader的情况下默认为ReflectCommandLoader
ReflectCommandLoader创建命令
ReflectCommandLoader将扫描应⽤程序根⽬录下所有程序,并将实现了命令接⼝的实例通过反射创建出来
1public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
2 {
3 commands = null;
4
5var commandAsmblies = new List<Asmbly>();
6
7if (m_AppServer.GetType().Asmbly != this.GetType().Asmbly)
8 commandAsmblies.Add(m_AppServer.GetType().Asmbly);
9
10string commandAsmbly = m_AppServer.Config.Options.GetValue("commandAsmbly");
11
12if (!string.IsNullOrEmpty(commandAsmbly))
13 {
14 OnError("The configuration attribute 'commandAsmbly' is not in ud, plea try to u the child node 'commandAsmblies' instead!");
15return fal;
16 }
17
18
19if (m_AppServer.Config.CommandAsmblies != null && m_AppServer.Config.CommandAsmblies.Any())
20 {
21try
22 {
23var definedAsmblies = AsmblyUtil.GetAsmbliesFromStrings(m_AppServer.Config.CommandAsmblies.Select(a => a.Asmbly).ToArray()); 24
25if (definedAsmblies.Any())
26 commandAsmblies.AddRange(definedAsmblies);
27 }
28catch (Exception e)
29 {
30 OnError(new Exception("Failed to load defined command asmblies!", e));
31return fal;
32 }
33 }
34
35if (!commandAsmblies.Any())
36 {
37 commandAsmblies.Add(Asmbly.GetEntryAsmbly());
38 }
39
40var outputCommands = new List<TCommand>();
41
42foreach (var asmbly in commandAsmblies)
43 {
44try
45 {
46 outputCommands.AddRange(asmbly.GetImplementedObjectsByInterface<TCommand>());
47 }
48catch (Exception exc)
49 {
50 OnError(new Exception(string.Format("Failed to get commands from the asmbly {0}!", asmbly.FullName), exc));
51return fal;
52 }
53 }
54
55 commands = outputCommands;
56
57return true;
58 }
View Code
因此默认的我们只需要定义⼀些实现命令接⼝ICommand<TAppSession, TRequestInfo>的命令出来,
6 ⾃定义命令
直接继承CommandBa抽象类即可
到此SuperSocket对消息的处理流程差不多就是这样了,SuperSocket的框架的使⽤我们只需要⾃定义⾃⼰的AppServer,以及AppServer 配套的AppSession,ReciverFilter,以及命令等即可,这些在官⽅提供的例⼦中已经很清晰