Ruby 的 Websocket Server 发送压缩后的 Binary Frame 格式的数据

Ruby 的 Websocket Server 发送压缩后的 Binary Frame 格式的数据

正常使用websocket,通常发送的数据格式都是json。 但是在做强实时应用时,如果每秒发几百k的数据的话,带宽还是比较捉急的,需要压缩一下。

Node js

用nodejs做server的话比较简单。 nodejs可直接推送gzip后的数据。 引入一个叫pako的处理gzip的库 https://github.com/nodeca/pako pako.deflate

var WebSocket = require('faye-websocket'),
    http      = require('http');
var pako = require('pako');

var server = http.createServer();


server.on('upgrade', function(request, socket, body) {
  if (WebSocket.isWebSocket(request)) {
    var ws = new WebSocket(request, socket, body);
    ws.on('open' , function(event){
      setInterval(function(){
        var data = {'aaa' : 'aaa' , 'bbb' : 'bbb'}
        var input = JSON.stringify(data)
        var output = pako.deflate(input , {to: 'string'});
        ws.send(output)
      } , 1000)
    });
    ws.on('message', function(event) {

    });

    ws.on('close', function(event) {
      console.log('close', event.code, event.reason);
      // ws = null;
    });
  }
});

server.listen(8080);

前端直接解压数据后即可使用 pako.inflate

var ws = new WebSocket('ws://localhost:8080/websocket')
ws.onopen = function(){
  ws.send('hello')
}

ws.onmessage = function(e) {
  var blob = e.data;
  console.log(pako.inflate(e.data, { to: 'string' }))
}

我们看到的效果是这样的

看起来是text格式的。

ruby gzip压缩只需要

require 'active_support/gzip'

string = 'balabala'
gzip_string = ActiveSupport::Gzip.compress(string)


但是在Ruby的websocket server中,压缩后的数据,无法直接send,因为Ruby的websocket发送的text数据格式只能是utf8。

websocket-eventmachine-server

websocket-eventmachine-server 中提供了二进制模式,只要指定数据类型是binary即可。

ws.send gzip_string , :type => :binary

require 'json'
require 'active_support/gzip'
require 'websocket-eventmachine-server'



h ={aaa: 'bbb' , ccc: 'ddd'}
string = JSON(h)
gzip_string = ActiveSupport::Gzip.compress(string)


EventMachine.run do
  WebSocket::EventMachine::Server.start(:host => "0.0.0.0", :port => 8080) do |ws|

    ws.onopen do
      puts "Client connected"
    end

    ws.onmessage do |msg, type|
      ws.send gzip_string , :type => :binary
    end

    ws.onclose do
      puts "Client disconnected"
    end

  end
end

这时前端看到的效果有点酷了

事实上这也是目前主流的websocket的做法。自带加密效果。

(如果想达到更好的加密,需要把客户端send的message也gzip一下。)

前端js接收数据的方式要改变一下了,用FileReader读取二进制流。

var ws = new WebSocket('ws://localhost:8080/websocket')
ws.onopen = function(){
  ws.send('hello')
}

ws.onmessage = function(e) {
  var blob = e.data;
  var reader = new FileReader();
  reader.readAsBinaryString(blob);
  reader.onload = function (evt) {
    var data = pako.inflate(evt.target.result, { to: 'string' })
    console.log(JSON.parse(data))
  };
}

faye-websocket-ruby

使用websocket-eventmachine-server 需要自己管理进程,比较烦,用faye的话就可以随passenger启动。

faye也是支持binary frame的,但要求我们手工转成二进制array

转换的方法

zip_string =  ActiveSupport::Gzip.compress(JSON(data))
unpack = zip_string.unpack('C*')

全部代码

#config.ru
require 'faye/websocket'
require 'active_support/gzip'
require 'json'

app = lambda do |env|
  if env['PATH_INFO'] == '/websocket'
    ws = Faye::WebSocket.new(env)

    timer = EM.add_periodic_timer(3) do
      data = {aaa: :bbb , ccc: :ddd}
      zip_string =  ActiveSupport::Gzip.compress(JSON(data))
      unpack = zip_string.unpack('C*')
      ws.send(unpack)
    end

    ws.on :message do |event|
      ws.send("You sent: #{event.data}")
    end

    ws.on :close do |event|
      EM.cancel_timer(timer)
      p 'close'
      ws = nil
    end

    # Return async Rack response
    ws.rack_response

  else
    [404, { "Content-Type" => "text/plain" }, ["Not found"]]
  end
end

# See https://www.phusionpassenger.com/library/config/tuning_sse_and_websockets/
if defined?(PhusionPassenger)
  PhusionPassenger.advertised_concurrency_level = 0
end

run app

随passenger的启动方式见 https://github.com/phusion/passenger-ruby-faye-websocket-demo

Action Cable

并没有发现ActionCable有提供Binary传输参数。

分析了一下。ActionCable利用了redis的pub/sub。而直接往redis里推gzip后的数据是推不进去的,但是可以把gzip后的数据base64一下。

class CommentRelayJob < ApplicationJob
  def perform(data)
    ActionCable.server.broadcast "messages:1:comments", Base64.encode64(ActiveSupport::Gzip.compress(data.to_json))
  end
end


这时我们看到的数据是酱紫的

前端只需要base64解码后 gzip解压数据即可食用。

App.comments = App.cable.subscriptions.create "CommentsChannel",
  collection: -> $("[data-channel='comments']")

  connected: ->
    #code

  received: (data) ->
    # console.log data
    console.log JSON.parse pako.inflate(atob(data), { to: 'string' })

压缩效果对比

ActiveSupport::Gzip.compress(data , 9 , 1) #最大压缩比

压缩效果对比

压缩比Base64体积压缩比base64体积增大百分比平均耗时
不压缩1217130%
default2710622.27%0.005862s
default3674730.19%35.56%0.005878s
max2476120.34%0.012196s
max3356727.58%35.56%0.012197s

对比结果,默认压缩性价比比较高。最大压缩比耗时多用了1倍。base64看起来不耗时。

不压缩测试字符串121k,默认压缩后27k,base64后36k,base64后体积增大9k、增大35.56%。

对于没有太多保密需求的应用,用actioncable + gzip+base64 体积压缩了70%,应该还是可以接受的。

代码改动量很小,业务比较容易写,websocket进程管理又可以丢给nginx+passenger。

Action Cable Patch

仔细研究了一下action cable源码,发现其实在ActionCable::Connection::ClientSocket里二进制传输是有预留的,只要给的message是个array就行了。

module ActionCable
  module Connection
    class ClientSocket
      ...
      def transmit(message)
        return false if @ready_state > OPEN
        case message
        when Numeric then @driver.text(message.to_s)
        when String  then @driver.text(message)
        when Array   then @driver.binary(message)
        else false
        end
      end
      ...
    end
  end
end

不过从redis pub过来的数据肯定是个string。

所以需要通过配置或参数,通知actioncable将数据转成binary array。

翻看源码,只需要在ActionCable::Connection::Base初始化时指定encode decode 的coder即可,默认是ActiveSupport::JSON

module ActionCable
  module Connection
    class Base
      include Identification
      include InternalChannel
      include Authorization

      attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
      delegate :event_loop, :pubsub, to: :server

      def initialize(server, env, coder: ActiveSupport::JSON)
        @server, @env, @coder = server, env, coder

        @worker_pool = server.worker_pool
        @logger = new_tagged_logger

        @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
        @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
        @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

        @_internal_subscriptions = nil
        @started_at = Time.now
      end
  end
end

coder

module ActionCable
  module Connection
    class coder
      def self.encode(data)    
        ActiveSupport::Gzip.compress(data.to_s)
      end
      def self.decode(data)
        ActiveSupport::Gzip.decompress(data)
      end
    end
  end
end

不过actioncable启动时并没有给配置coder的地方,所以看来需要打patch或提PR了

也可以看看GoEasy文库的其他资料。

发表评论

邮箱地址不会被公开。

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据