客户端websocket(C#)长连接及简易RPC框架设计

客户端websocket(C#)长连接及简易RPC框架设计

  1. 背景

最近有个需求:与Web服务器保持长连接,接收服务端发来的消息,并通过某RPC协议invoke客户端某些回调函数。故有了本文的客户端websocket(C#)长连接及简易RPC框架设计内容,此内容分两次介绍,(一)介绍websocket如何与服务端保持长连接及异步消息处理;(二)介绍简易RPC的框架设计及流程调用。以下为(一)内容。

  1. websocket(c#)主要钩子介绍

WebSocket是一种计算机通信协议,通过单个TCP连接提供全双工通信信道。它与HTTP的唯一关系是它的握手被HTTP服务器解释为升级请求。

WebSocket协议使得浏览器和Web服务器之间可以进行更多的交互,促进从服务器到客户端的实时数据传输。它通过提供一种标准化的方式,使服务器向浏览器发送内容,而不需要客户端请求,并允许消息来回传递,同时保持连接打开。以这种方式,可以在客户端和服务器之间进行全双工对话。

WebSocket的C#版在github [https://github.com/sta/websocket-sharp] 发布,对于如何使用它,在README.md中有讲到,在这里简单阐述几个主要的钩子函数。
a). 构造

构造一个WebSocket需要以下几步。

第1步:

需要声明命名空间

using WebSocketSharp;

第2步:
使用构造函数创建一个WebSocket实例,在参数中填入需要连接Web服务器的host。

var ws = new WebSocket (“ws://example.com”)

当然,因为WebSocket实现了System.IDisposable接口,所以可以使用using语义:

using (var ws = new WebSocket (“ws://example.com”)) {

}

当程序执行完using代码块后,WebSocket会自动关闭,并返回状态码1001(离开状态)。
b). 事件注册

接下来就是注册事件了,WebSocket提供了四个事件注册,即:开启(OnOpen),消息达到(OnMessage),出错(OnError),关闭(OnClose)。

当WebSocket与web服务器连接完成之后,就会触发OnOpen事件。注册OnOpen事件如下:

ws.OnOpen += (sender, e) => {

};

消息到达的事件如下:

ws.OnMessage += (sender, e) => {

};

可以通过e.Data(返回string的text消息数据)或e.RawData得到(返回byte[]的二进制消息数据)。

当WebSocket出错了,将会触发OnError事件:

ws.OnError += (sender, e) => {

};

可以通过e.Message获得错误信息。

当WebSocket关闭时,会触发OnClose事件:

ws.OnClose += (sender, e) => {

};

如果想要获取WebSocket退出的原因,可以通过e.Code(unsigned short 状态码)或是e.Reason(string text类型原因)获得。
c). 连接,发送数据及关闭

在新建一个webSocket实例,并注册一系列事件后,可以使用

ws.Connect ();

同步连接Web服务器。也可以通过

ws.ConnectAsync ()

进行异步连接。

在建立连接之后,可以通过

ws.Send()

发送数据。

当想要断开WebSocket时候,可以使用

WebSocket.Close (ushort code, string reason);

并可以传入状态码或原因。

  1. 异步消息处理

有了WebSocket这个利器,就不用关心具体的网络编程技术,它能帮助我们把服务端的整一条消息发送过来。我们所关系的中重点可以在异步的消息处理和RPC调用。

在本节中叙述了消息的异步处理。
a). 消息类 NotificationMessage

首先我们自定义的消息类NotificationMessage,它有两个属性,Header和Body。其中Header为消息头,他可以是说明那种事件产生的消息,例如:OnError,OnOpen,OnMessage,OnClose,当然我们还以对OnMessage定义地更加细致一些。该消息可以通过

var newMsg = new NotificationMessage
{
Header = “WebSocket Message”,
Body = e.Data
}

进行实例化。
b). 线程池处理

异步消息处理,从最根本来说是将接收的消息放入一个消息队列中,然后起一个线程池去处理这些消息。如下图1所示:

当然考虑到高并发,高可用,然后衍生出许多消息队列工具,分布式生产消费框架等等。在这里就从最简单的入手。

在这里我们实例了一个Notifier,在构造的时候,创建一个线程池去工作:

public Notifier()
{
_queue = new Queue();

ThreadPool.QueueUserWorkItem(
  state =>
  {
      while (_enabled || Count > 0)
      {
          var msg = dequeue();
          if (msg != null)
          {
              Console.WriteLine(msg);
          }
          else
          {
              Thread.Sleep(500);
          }
      }
  }
);

}

c). 消息入队列

线程池在队列的头部dequeue一个消息进行“消费”,而在WebSocket事件触发后,创建一个NotificationMessage实例,并将该消息实例enqueue入到队列中,如:

ws.OnMessage += (sender, e) =>
nf.Notify(
new NotificationMessage
{
Header = “WebSocket Message”,
Body = !e.IsPing ? e.Data : “Received a ping.”
}
);

因此,整个消息的流转过程如下图2所示:

如果想要查阅这个例子的源代码,请移步[csdz github]。

  1. 总结

本文主要介绍了客户端websocket(C#)如何与服务端保持长连接及异步消息处理,这里的WebSocket以及相关例子是由github中提供,本文只是做了一些修改。

细心的读者可能发现了,对于多线程处理消息,那么如何保证消息有序(消息处理完成顺序即消息从WebSocket到达顺序)?

其实对于多线程处理消息,确实并不能保证消息有序,所以我们只能做一些建议,使得这个问题显得不那么突出。

第一个建议是,这个异步消息处理只适用低并发,所以如果有高并发的需求,建议采用更加成熟的框架。

第二个建议是,可以在消息中增加一个消息id字段,并随着消息的产生逐条递增,在客户端处理消息后,需要修改某些状态的时候,如果发现该状态已经被更大的消息id处理过了,那么这些消息就可以作为丢弃处理(比如位移信息,技能CD时间等等) 。

也可以看看 GoEasy文库 的其他资料.

发表评论

邮箱地址不会被公开。

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据