客户端websocket(C#)长连接及简易RPC框架设计
- 背景
最近有个需求:与Web服务器保持长连接,接收服务端发来的消息,并通过某RPC协议invoke客户端某些回调函数。故有了本文的客户端websocket(C#)长连接及简易RPC框架设计内容,此内容分两次介绍,(一)介绍websocket如何与服务端保持长连接及异步消息处理;(二)介绍简易RPC的框架设计及流程调用。以下为(一)内容。
- websocket(c#)主要钩子介绍
WebSocket是一种计算机通信协议,通过单个TCP连接提供全双工通信信道。它与HTTP的唯一关系是它的握手被HTTP服务器解释为升级请求。
WebSocket协议使得浏览器和Web服务器之间可以进行更多的交互,促进从服务器到客户端的实时数据传输。它通过提供一种标准化的方式,使服务器向浏览器发送内容,而不需要客户端请求,并允许消息来回传递,同时保持连接打开。以这种方式,可以在客户端和服务器之间进行全双工对话。
WebSocket的C#版在github [https://github.com/sta/websocket-sharp] 发布,对于如何使用它,在README.md中有讲到,在这里简单阐述几个主要的钩子函数。
a). 构造
构造一个WebSocket需要以下几步。
第1步:
需要声明命名空间
using WebSocketSharp;
第2步:
使用构造函数创建一个WebSocket实例,在参数中填入需要连接Web服务器的host。
var ws = new WebSocket (“ws://example.com”)
当然,因为WebSocket实现了System.IDisposable接口,所以可以使用using语义:
using (var ws = new WebSocket (“ws://example.com”)) {
…
}
当程序执行完using代码块后,WebSocket会自动关闭,并返回状态码1001(离开状态)。
b). 事件注册
接下来就是注册事件了,WebSocket提供了四个事件注册,即:开启(OnOpen),消息达到(OnMessage),出错(OnError),关闭(OnClose)。
当WebSocket与web服务器连接完成之后,就会触发OnOpen事件。注册OnOpen事件如下:
ws.OnOpen += (sender, e) => {
…
};
消息到达的事件如下:
ws.OnMessage += (sender, e) => {
…
};
可以通过e.Data(返回string的text消息数据)或e.RawData得到(返回byte[]的二进制消息数据)。
当WebSocket出错了,将会触发OnError事件:
ws.OnError += (sender, e) => {
…
};
可以通过e.Message获得错误信息。
当WebSocket关闭时,会触发OnClose事件:
ws.OnClose += (sender, e) => {
…
};
如果想要获取WebSocket退出的原因,可以通过e.Code(unsigned short 状态码)或是e.Reason(string text类型原因)获得。
c). 连接,发送数据及关闭
在新建一个webSocket实例,并注册一系列事件后,可以使用
ws.Connect ();
同步连接Web服务器。也可以通过
ws.ConnectAsync ()
进行异步连接。
在建立连接之后,可以通过
ws.Send()
发送数据。
当想要断开WebSocket时候,可以使用
WebSocket.Close (ushort code, string reason);
并可以传入状态码或原因。
- 异步消息处理
有了WebSocket这个利器,就不用关心具体的网络编程技术,它能帮助我们把服务端的整一条消息发送过来。我们所关系的中重点可以在异步的消息处理和RPC调用。
在本节中叙述了消息的异步处理。
a). 消息类 NotificationMessage
首先我们自定义的消息类NotificationMessage,它有两个属性,Header和Body。其中Header为消息头,他可以是说明那种事件产生的消息,例如:OnError,OnOpen,OnMessage,OnClose,当然我们还以对OnMessage定义地更加细致一些。该消息可以通过
var newMsg = new NotificationMessage
{
Header = “WebSocket Message”,
Body = e.Data
}
进行实例化。
b). 线程池处理
异步消息处理,从最根本来说是将接收的消息放入一个消息队列中,然后起一个线程池去处理这些消息。如下图1所示:
当然考虑到高并发,高可用,然后衍生出许多消息队列工具,分布式生产消费框架等等。在这里就从最简单的入手。
在这里我们实例了一个Notifier,在构造的时候,创建一个线程池去工作:
public Notifier()
{
_queue = new Queue();
ThreadPool.QueueUserWorkItem( state => { while (_enabled || Count > 0) { var msg = dequeue(); if (msg != null) { Console.WriteLine(msg); } else { Thread.Sleep(500); } } } );
}
c). 消息入队列
线程池在队列的头部dequeue一个消息进行“消费”,而在WebSocket事件触发后,创建一个NotificationMessage实例,并将该消息实例enqueue入到队列中,如:
ws.OnMessage += (sender, e) =>
nf.Notify(
new NotificationMessage
{
Header = “WebSocket Message”,
Body = !e.IsPing ? e.Data : “Received a ping.”
}
);
因此,整个消息的流转过程如下图2所示:
如果想要查阅这个例子的源代码,请移步[csdz github]。
- 总结
本文主要介绍了客户端websocket(C#)如何与服务端保持长连接及异步消息处理,这里的WebSocket以及相关例子是由github中提供,本文只是做了一些修改。
细心的读者可能发现了,对于多线程处理消息,那么如何保证消息有序(消息处理完成顺序即消息从WebSocket到达顺序)?
其实对于多线程处理消息,确实并不能保证消息有序,所以我们只能做一些建议,使得这个问题显得不那么突出。
第一个建议是,这个异步消息处理只适用低并发,所以如果有高并发的需求,建议采用更加成熟的框架。
第二个建议是,可以在消息中增加一个消息id字段,并随着消息的产生逐条递增,在客户端处理消息后,需要修改某些状态的时候,如果发现该状态已经被更大的消息id处理过了,那么这些消息就可以作为丢弃处理(比如位移信息,技能CD时间等等) 。
也可以看看 GoEasy文库 的其他资料.