Ruby 的 Websocket Server 发送压缩后的 Binary Frame 格式的数据
正常使用websocket,通常发送的数据格式都是json。 但是在做强实时应用时,如果每秒发几百k的数据的话,带宽还是比较捉急的,需要压缩一下。
Node js
用nodejs做server的话比较简单。 nodejs可直接推送gzip后的数据。 引入一个叫pako的处理gzip的库 https://github.com/nodeca/pako pako.deflate
var WebSocket = require('faye-websocket'), http = require('http'); var pako = require('pako'); var server = http.createServer(); server.on('upgrade', function(request, socket, body) { if (WebSocket.isWebSocket(request)) { var ws = new WebSocket(request, socket, body); ws.on('open' , function(event){ setInterval(function(){ var data = {'aaa' : 'aaa' , 'bbb' : 'bbb'} var input = JSON.stringify(data) var output = pako.deflate(input , {to: 'string'}); ws.send(output) } , 1000) }); ws.on('message', function(event) { }); ws.on('close', function(event) { console.log('close', event.code, event.reason); // ws = null; }); } }); server.listen(8080);
前端直接解压数据后即可使用 pako.inflate
var ws = new WebSocket('ws://localhost:8080/websocket') ws.onopen = function(){ ws.send('hello') } ws.onmessage = function(e) { var blob = e.data; console.log(pako.inflate(e.data, { to: 'string' })) }
我们看到的效果是这样的

看起来是text格式的。
ruby gzip压缩只需要
require 'active_support/gzip' string = 'balabala' gzip_string = ActiveSupport::Gzip.compress(string)
但是在Ruby的websocket server中,压缩后的数据,无法直接send,因为Ruby的websocket发送的text数据格式只能是utf8。
websocket-eventmachine-server
websocket-eventmachine-server 中提供了二进制模式,只要指定数据类型是binary即可。
ws.send gzip_string , :type => :binary
require 'json' require 'active_support/gzip' require 'websocket-eventmachine-server' h ={aaa: 'bbb' , ccc: 'ddd'} string = JSON(h) gzip_string = ActiveSupport::Gzip.compress(string) EventMachine.run do WebSocket::EventMachine::Server.start(:host => "0.0.0.0", :port => 8080) do |ws| ws.onopen do puts "Client connected" end ws.onmessage do |msg, type| ws.send gzip_string , :type => :binary end ws.onclose do puts "Client disconnected" end end end
这时前端看到的效果有点酷了

事实上这也是目前主流的websocket的做法。自带加密效果。
(如果想达到更好的加密,需要把客户端send的message也gzip一下。)
前端js接收数据的方式要改变一下了,用FileReader读取二进制流。
var ws = new WebSocket('ws://localhost:8080/websocket') ws.onopen = function(){ ws.send('hello') } ws.onmessage = function(e) { var blob = e.data; var reader = new FileReader(); reader.readAsBinaryString(blob); reader.onload = function (evt) { var data = pako.inflate(evt.target.result, { to: 'string' }) console.log(JSON.parse(data)) }; }
faye-websocket-ruby
使用websocket-eventmachine-server 需要自己管理进程,比较烦,用faye的话就可以随passenger启动。
faye也是支持binary frame的,但要求我们手工转成二进制array

转换的方法
zip_string = ActiveSupport::Gzip.compress(JSON(data)) unpack = zip_string.unpack('C*')
全部代码
#config.ru require 'faye/websocket' require 'active_support/gzip' require 'json' app = lambda do |env| if env['PATH_INFO'] == '/websocket' ws = Faye::WebSocket.new(env) timer = EM.add_periodic_timer(3) do data = {aaa: :bbb , ccc: :ddd} zip_string = ActiveSupport::Gzip.compress(JSON(data)) unpack = zip_string.unpack('C*') ws.send(unpack) end ws.on :message do |event| ws.send("You sent: #{event.data}") end ws.on :close do |event| EM.cancel_timer(timer) p 'close' ws = nil end # Return async Rack response ws.rack_response else [404, { "Content-Type" => "text/plain" }, ["Not found"]] end end # See https://www.phusionpassenger.com/library/config/tuning_sse_and_websockets/ if defined?(PhusionPassenger) PhusionPassenger.advertised_concurrency_level = 0 end run app
随passenger的启动方式见 https://github.com/phusion/passenger-ruby-faye-websocket-demo
Action Cable
并没有发现ActionCable有提供Binary传输参数。
分析了一下。ActionCable利用了redis的pub/sub。而直接往redis里推gzip后的数据是推不进去的,但是可以把gzip后的数据base64一下。
class CommentRelayJob < ApplicationJob def perform(data) ActionCable.server.broadcast "messages:1:comments", Base64.encode64(ActiveSupport::Gzip.compress(data.to_json)) end end
这时我们看到的数据是酱紫的

前端只需要base64解码后 gzip解压数据即可食用。
App.comments = App.cable.subscriptions.create "CommentsChannel", collection: -> $("[data-channel='comments']") connected: -> #code received: (data) -> # console.log data console.log JSON.parse pako.inflate(atob(data), { to: 'string' })
压缩效果对比
ActiveSupport::Gzip.compress(data , 9 , 1) #最大压缩比
压缩效果对比
压缩比 | Base64 | 体积 | 压缩比 | base64体积增大百分比 | 平均耗时 |
---|---|---|---|---|---|
不压缩 | 121713 | 0% | |||
default | 27106 | 22.27% | 0.005862s | ||
default | 是 | 36747 | 30.19% | 35.56% | 0.005878s |
max | 24761 | 20.34% | 0.012196s | ||
max | 是 | 33567 | 27.58% | 35.56% | 0.012197s |
对比结果,默认压缩性价比比较高。最大压缩比耗时多用了1倍。base64看起来不耗时。
不压缩测试字符串121k,默认压缩后27k,base64后36k,base64后体积增大9k、增大35.56%。
对于没有太多保密需求的应用,用actioncable + gzip+base64 体积压缩了70%,应该还是可以接受的。
代码改动量很小,业务比较容易写,websocket进程管理又可以丢给nginx+passenger。
Action Cable Patch
仔细研究了一下action cable源码,发现其实在ActionCable::Connection::ClientSocket里二进制传输是有预留的,只要给的message是个array就行了。
module ActionCable module Connection class ClientSocket ... def transmit(message) return false if @ready_state > OPEN case message when Numeric then @driver.text(message.to_s) when String then @driver.text(message) when Array then @driver.binary(message) else false end end ... end end end
不过从redis pub过来的数据肯定是个string。
所以需要通过配置或参数,通知actioncable将数据转成binary array。
翻看源码,只需要在ActionCable::Connection::Base初始化时指定encode decode 的coder即可,默认是ActiveSupport::JSON
module ActionCable module Connection class Base include Identification include InternalChannel include Authorization attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol delegate :event_loop, :pubsub, to: :server def initialize(server, env, coder: ActiveSupport::JSON) @server, @env, @coder = server, env, coder @worker_pool = server.worker_pool @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @_internal_subscriptions = nil @started_at = Time.now end end end
coder
module ActionCable module Connection class coder def self.encode(data) ActiveSupport::Gzip.compress(data.to_s) end def self.decode(data) ActiveSupport::Gzip.decompress(data) end end end end
不过actioncable启动时并没有给配置coder的地方,所以看来需要打patch或提PR了
也可以看看GoEasy文库的其他资料。