Python实现半双工的实时通信SSE(Server-Sent Events)

作者 : admin 本文共1426个字,预计阅读时间需要4分钟 发布时间: 2024-06-10 共2人阅读

Python实现半双工的实时通信SSE(Server-Sent Events)

1 简介

实现实时通信一般有WebSocket、Socket.IO和SSE(Server-Sent Events)三种方法。WebSocket和Socket.IO是全双工的实时双向通信技术,适合用于聊天和会话等,但相对于SSE比较笨重,SSE适合用于服务器主动向客户端实时推送数据,例如:用于大模型实时对话。

WebSocket是一种HTML5提供的全双工通信协议,它基于TCP在客户端和服务器之间建立持久性的连接,实现两者之间实时双向数据通信。

Socket.IO是一个封装了 Websocket 的实时双向数据通信库,它封装了自动重连、自动检测网络状况和自动跨浏览器兼容性等。

SSE(Server-Sent Events)是一种利用 HTTP 协议长连接特性,在服务器与客户端之间建立持久化连接,实现服务器主动向客户端推送数据的半双工实时数据通信技术,也被称为“事件流”(Event Stream)。

本文使用Python和Vue3实现SSE的实时通信,现在浏览器支持EventSource,不需要额外安装依赖包。

2 前端Vue3代码




  

{{ msg }}

.read-the-docs { color: #888; }

3 后端Python代码

# 导入所需的模块
import json
import time
import datetime
from flask_cors import CORS
from flask import Flask, request, Response

app = Flask(__name__)
# 解决跨域问题
CORS(app, supports_credentials=True)


def get_data():
    # 获取当前时间,并转换为 JSON 格式
    dt_ms = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    return json.dumps({'time': dt_ms}, ensure_ascii=False)


@app.route('/sse')
def stream():
    data_id = request.args.get('data_id')
    print(data_id)
    return Response(eventStream(), mimetype="text/event-stream")


def build_message(message: str, event="message"):
    """
    构建消息
    :param message: 数据消息
    :param event: 事件,默认事件是“message”,可以根据自己的需求定制事件,对应前端的eventSource.addEventListener('message',()=>{}, false)中的message。
    :return:
    """
    head = "event:" + event + "
" + "data:"
    tail = "

"
    return head + message + tail

def eventStream():
    id = 0
    while True:
        id += 1
        # 睡眠
        time.sleep(1)

        str_out = build_message(get_data())

        print(str_out)
        # 构建迭代器
        yield str_out


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

4 执行结果

Python实现半双工的实时通信SSE(Server-Sent Events)插图

本站无任何商业行为
个人在线分享 » Python实现半双工的实时通信SSE(Server-Sent Events)
E-->