Python学习笔记 —— 快速实现WebSocket服务端

记录


#配置WebSocket地址
websocketsSetting = WebSocketSetting("127.0.0.1","9395")
print("WebSocket服务启动,地址:{ip}:{port}".format(ip=websocketsSetting.getHost(),port=websocketsSetting.getPort()))


# Server消息处理
async def onWebSocketService(websocket,path):
    while True:
        try:
            receiverMessage = await websocket.recv()
            # 校验发送的消息不是JSON格式
            if str(receiverMessage).startswith("{") == False and str(receiverMessage).endswith("}") == False:
                continue
            receiverValue = json.loads(receiverMessage)
            code = receiverValue["code"]
            data = receiverValue['data']
            # 如果是心跳包
            if code == 999:
                # 不处理
                continue
           element = {
                "code": code,
                "data": data
            }
            val = json.dumps(element)

            print(f"val: {val}")
            await websocket.send(val)
        #如果是非JSON数据或转化有问题则打印失败
        except JSONDecodeError:
            val = Result.toErrorResult(True)
            await websocket.send(val)
            pass
        except websockets.ConnectionClosedOK:
            pass
        except websockets.ConnectionClosedError:
            pass



# 定义推送消息模式
server = websockets.serve(onWebSocketService,websocketsSetting.getHost(),websocketsSetting.getPort())
asyncio.get_event_loop().run_until_complete(server)
asyncio.get_event_loop().run_forever()

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇