在日常开发中,我们经常会遇到需要实时获取数据的情况,之前实现这种相似的功能通常都是用ajax长轮询,在HTML5规范中定义了新的通信方式,WebSocket和SSE。websocket相对SSE更常用一些,本文着重来介绍SSE的应用。
SSE API允许网页获得来自服务器的更新(HTML5),用于创建到服务器的单向连接,服务器通过这个连接可以发送任意数量的数据。服务器响应的MIME类型必须是text/event-stream,而且是浏览器中的JavaScript API 能解析格式输出,它可以看作是单向发布-订阅模式。
原理:
SSE本质是发送的不是一次性的数据包,而是一个数据流。与正常的 HTTP 请求一样。服务端连续不断的发送,客户端不会关闭连接,如果连接断开,浏览器会尝试重新连接,如果连接被关闭,客户端可以被告知停止重新连接。
对于客户端只需接收从服务器传入的更新的应用程序,SSE 的半双工通信模型最适合。需要注意的是,SSE只适用于高级浏览器,ie不支持,因为ie上的XMLHttpRequest对象不支持获取部分的响应内容,只有在响应完成之后才能获取其内容,需要使用 polyfilled 方案。
SSE中应用的是EventSource接口,EventSource 是服务器推送的一个网络事件接口。一个 EventSource 实例会对 HTTP 服务开启一个持久化的连接,以text/event-stream 格式发送事件,会一直保持开启直到被要求关闭。一旦连接开启,来自服务端传入的消息会以事件的形式分发至你代码中。如果接收消息中有一个事件字段,触发的事件与事件字段的值相同。如果没有事件字段存在,则将触发通用事件。
事件 描述
- onopen 当通往服务器的连接被打开
- onmessage 当接收到消息
- onerror 当发生错误
方法 描述
- close
关闭连接
EventSource接受事件流格式消息:
事件流仅仅是一个简单的文本数据流,文本应该使用 UTF-8 格式的编码。每条消息后面都由一个空行作为分隔符,每条消息是由多个字段组成的,每个字段由字段名,一个冒号,以及字段值组成。
以冒号开头的行为注释行,会被忽略(stream.write(‘: nn’);),注释行可以用来防止连接超时,服务器可以定期发送一条消息注释行,以保持连接不断。
字段名 描述
- event
事件类型。如果指定了该字段,则在客户端接收到该条消息时,会在当前的EventSource对象上触发一个事件,事件类型就是该字段的字段值,你可以使用addEventListener()
方法在当前 EventSource对象上监听任意类型的命名事件,如果该条消息没有event字段,则会触发onmessage
属性上的事件处理函数。 - id 事件 ID,会成为当前EventSource对象的内部属性"最后一个事件 ID"的属性值。
- data 消息的数据字段。如果该条消息包含多个data字段,则客户端会用换行符把它们连接成一个字符串来作为字段值。
- retry 一个整数值,指定了重新连接的时间 (单位为毫秒),如果该字段值不是整数,则会被忽略。
除了上面规定的字段名,其他所有的字段名都会被忽略。
备注: 如果一行文本中不包含冒号,则整行文本会被解析成为字段名,其字段值为空。
DEMO:
客户端:
mounted () {this.initSSE()},methods: {initSSE () {const source = new EventSource("http://localhost:5000/sse");source.onopen = (data) => {console.log("Connected", data);}source.onmessage = (res) => {console.log("onmessage", res);this.getData(JSON.parse(res.data))}source.onerror = (e) => {console.log("onerror", e, EventSource.CLOSED, EventSource.CONNECTING);if (e.target.readyState === EventSource.CLOSED) {console.log(\'Disconnected\');} else if (e.target.readyState === EventSource.CONNECTING) {console.log(\'Connecting...\');}},// 可自定义 服务端通知source.addEventListener(\'close\', function(e) {console.log(e, \'当前\');source.close()});},getData(data) {this.list = data.listthis.info = data.infothis.index = data.index}}
服务端
const app = new Koa();
app.use(async (ctx, next) => {if (ctx.path !== "/sse") {return await next();}ctx.request.socket.setTimeout(0);ctx.req.socket.setNoDelay(true);ctx.req.socket.setKeepAlive(true);ctx.set({"Content-Type": "text/event-stream","Cache-Control": "no-cache","Connection": "keep-alive",});const stream = new PassThrough();ctx.status = 200;ctx.body = stream;const str = \'赵钱孙李周吴郑王冯陈褚卫蒋沈韩杨朱秦尤许何吕施张孔曹严华金魏陶姜戚谢邹喻柏水窦章云苏潘葛奚范彭郎鲁韦昌马苗凤花方俞任袁柳酆鲍史唐费廉岑薛雷贺倪汤滕殷罗毕郝邬安常乐于时傅皮卞齐康伍余元卜顾孟平黄和穆萧尹姚邵湛汪祁毛禹狄米贝明臧计伏成戴谈宋茅庞熊纪舒屈项祝董梁杜阮蓝闵席季麻强贾路娄危江童颜郭梅盛林刁钟徐邱骆高夏蔡田樊胡凌霍虞万支柯昝管卢莫经房裘缪干解应宗丁宣贲邓郁单杭洪包诸左石崔吉钮龚程嵇邢滑裴陆荣翁荀羊於惠甄曲家封芮羿储靳汲邴糜松井段富巫乌焦巴弓牧隗山谷车侯宓蓬全郗班仰秋仲伊宫宁仇栾暴甘钭厉戎祖武符刘景詹束龙叶幸司韶郜黎蓟薄印宿白怀蒲邰从鄂索咸籍赖卓蔺屠蒙池乔阴鬱胥能苍双闻莘党翟谭贡劳逄姬申扶堵冉宰郦雍郤璩桑桂濮牛寿通边扈燕冀郏浦尚农温别庄晏柴瞿阎充慕连茹习宦艾鱼容向古易慎戈廖庾终暨居衡步都耿满弘匡国文寇广禄阙东欧殳沃利蔚越夔隆师巩厍聂晁勾敖融冷訾辛阚那简饶空曾毋沙乜养鞠须丰巢关蒯相查后荆红游竺权逯盖益桓公万俟司马上官欧阳夏侯诸葛闻人东方赫连皇甫尉迟公羊澹台公冶宗政濮阳淳于单于太叔申屠公孙仲孙轩辕令狐钟离宇文长孙慕容鲜于闾丘司徒司空丌官司寇仉督子车颛孙端木巫马公西漆雕乐正壤驷公良拓跋夹谷宰父谷梁晋楚闫法汝鄢涂钦段干百里东郭南门呼延归海羊舌微生岳帅缑亢况郈有琴梁丘左丘东门西门商牟佘佴伯赏南宫墨哈谯笪年爱阳佟第五言福百家姓终\'let index = 0setInterval(() => {const list = []while(list.length < 10) {const idx = Math.floor(Math.random() * 568)list.push({name: str[idx] + (str[idx + 15] || \'\') + (str[idx - 5] || \'\'),value: idx})}const obj = {list,info: `${new Date()}`,index: ++index}stream.write(\'id: \' + index + \'n\');stream.write(`data: ${JSON.stringify(obj)}nn`);stream.write(\'retry: 10000n\'); // 重连时间// ... 处理判断啥时候终止}, 3000);}).use(ctx => {ctx.status = 200;ctx.body = "ok";}).listen(5000, () => console.log("Listening 5000"));
我们在浏览器network中可以看到,第一次请求状态码200,与普通http请求一致,类型是eventsource
如果服务器返回的数据中包含了事件标识符,也就是我们代码中的id,浏览器会记录最后一次接收的事件的标识符。如果与服务器的连接中断,当浏览器再次进行连接时,会通过http头Last-Event-ID来声明最后一次接收的事件的标识符。服务器端可以通过浏览器发送的事件标识符来确定从哪个事件来继续连接。