app离线消息推送服务器,在线消息推送和离线消息推送(3)

更新时间:2023-07-01 11:39:51 阅读: 评论:0

app离线消息推送服务器,在线消息推送和离线消息推送(3)⾸先介绍⼀下具体的功能设计。
整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌⾯端)和后台(⽤于添加要推送的消息)。
因为浏览器端不容易注意到实时提醒,所以我将实时提醒功能放到pyqt桌⾯端中实现(如托盘区闪烁等醒⽬的⽅式)。
浏览器端中只对消息进⾏拉取操作(刷新⽹页时),⽽不采取实时推的⽅式。
pyqt桌⾯端主要接收新消息提⽰即可,通知⽤户到⽹页端查看最新消息。(登录时主动拉取⼀次消息判断有⽆新消息,或者服务器主动推送所有新消息)
实时推送消息和主动拉取消息两个功能实际上是完全分离的,可以独⽴⾏使各⾃的职能。
不过下⾯我就不写浏览器端的代码了, 直接把相关的逻辑也放到pyqt⾥实现。
然后再来讲⼀下拉取服务端消息的逻辑。
电视剧观后感每个⽤户拥有⾃⼰的消息库。当消息库为空时,⼀次性拉取服务端所有消息⼊⽤户库;当消息库内有消
息时,每次拉取只拉取⽤户库内最新的消息发布时间之后的消息(将未⼊⽤户库的消息⼊库后再从⽤户库拉取)。
代码共5部分
1.鉴权服务器:⽤于服务和服务之间对⽤户提供的token做权限检查
2.消息数据服务器:⽤于读取和处理⽤户消息在数据库中的状态。
3.消息在线推送服务器:将管理员发布的消息在线实时推送给所有在线⽤户(若使⽤发布器的在线推送功能)
4.管理员消息发布器:分为在线推送和离线⼊库两个功能,可独⽴或组合使⽤。在线推送消息不会⼊消息库。
5.⽤户消息器:登陆后可接收管理员推送的在线消息,也可主动拉取⽤户对应消息库中的所有已读、未读消息,并标记消息的已读、未读状态。
下⾯是代码部分(为了简化代码逻辑,后⾯代码中的token即⽤户名。)
1.鉴权服务器
from flask import Flask, request, jsonify
app = Flask(__name__)
class Lib:
@staticmethod
def auth_admin_token(token):
return True
@staticmethod
def auth_ur_token(token):
return True
@ute('/admin', methods = ['POST'])
def admin_auth_token():
data = request.json
if Lib.auth_admin_token(data['token']):
return jsonify({'code': 200, 'msg': '鉴权成功'})销售怎么做
return jsonify({'code': 400, 'msg': '鉴权失败'})
@ute('/ur', methods = ['POST'])
def ur_auth_token():
data = request.json
if Lib.auth_ur_token(data['token']):
return jsonify({'code': 200, 'msg': '鉴权成功'})
return jsonify({'code':400, 'msg': '鉴权失败'})
if __name__ == '__main__':
app.run(host = '0.0.0.0', port = 5009)
2.消息数据服务器
# coding : utf-8
# author : ['Wang Suyin', ]
# data : 2020/2/21 14:14
尘与土# software : PyCharm
# python_version : '3.5.3 64bit'
# file : 2.消息数据服务器.py
"""
说明⽂档:
"""
import datetime
import requests
import json
from flask import Flask,request,jsonify
app=Flask(__name__)
# 虚拟数据
db = {undefined
'message_table': [遥远的什么填空
{'id': 1, 'message': '消息1', 'publish_time': (2020, 1, 20, 3, 4, 5)}, {'id': 2, 'message': '消息2', 'publish_time': (2020, 1, 20, 3, 4, 6)}, {'id': 3, 'message': '消息3', 'publish_time': (2020, 1, 20, 3, 4, 7)}, ],
'ur_to_messages_table': [
{'urname': 'tester', 'message_id': 1, 'read': True},
{'urname': 'tester', 'message_id': 2, 'read': Fal},
],
电销贷款最佳话术}
class Utils:
@staticmethod
def auth_admin_token(token):
data = {'token': token}
r = requests.post(url = '127.0.0.1:5009/admin', json = data)
r = json.)
if r['code'] == 200:
动的成语return True
return Fal
@staticmethod
def add_message(message):
t = w()
publish_time = (t.year, t.month, t.day, t.hour, t.minute, t.cond)
db['message_table'].append({'id':len(db['message_table'])+1,'message': message, 'publish_time': publish_time})
@staticmethod
def get_ur_messages(urname):
print(db['message_table'])
# 拉取新消息⼊⽤户库
# 原先考虑的是拉取最新时间后的消息,⽬前看来求消息ID表的差集更简单些,也更保险些(防⽌遗漏)。当然,要根据具体场景优化,代码也要更换为数据库代码,这⾥仅为⽰意。
for message_id in list(
t([e['id'] for e in db['message_table']])
- t([e['message_id'] for e in [e_ for e_ in db['ur_to_messages_table'] if e_['urname']==urname]])
):
db['ur_to_messages_table'].append({'urname': urname, 'message_id': message_id, 'read': Fal})
# 在通知量少的情况下,可⼀次性返回所有内容。
# 在通知量多的情况下,可分页返回,也可只返回未读消息,已读消息由客户端本地保存。
# 简单起见,以下⼀次性返回所有内容
res = []
for e in db['ur_to_messages_table']:
if e['urname'] == urname:
d = [e_ for e_ in db['message_table'] if e_['id'] == e['message_id']][0]
d.update({'read': e['read']})
res.append(d)
return res
@staticmethod
def mark_as_read(urname, message_id,read_status):
#将消息标记为已读、未读状态,⽐较简单,就不具体谢写了
pass
@ute('/add_message', methods=['POST'])
def add_message():
data = request.json
token = data['token']
r = requests.post(url='127.0.0.1:5009/admin',json={ 'token':token } ) r = json.)
if not r['code'] == 200:
return jsonify({'code':400,'msg':'鉴权失败'})
Utils.add_message(data['message'])
print('新的消息⼊库成功')
return jsonify({'code': 200, 'msg': '消息发布成功'})
@ute('/get_ur_messages', methods=['POST'])
一年级读书笔记def get_ur_messages():
data = request.json
token = data['token']
if not Utils.auth_admin_token(token):
return jsonify({'code':400,'msg':'鉴权失败'})
urname = token
data = _ur_messages(urname)
return jsonify({'code': 200, 'msg': '拉取⽤户消息成功','data':data})
@ute('/mark_as_read', methods=['POST'])
def mark_as_read():
data = request.json
token = data['token']
if not Utils.auth_admin_token(token):
return jsonify({'code':400,'msg':'鉴权失败'})
urname = token
message_id = data['message_id']
read_status = data['read_status']
data = Utils.mark_as_read(urname, message_id,read_status) return jsonify({'code': 200, 'msg': '拉取⽤户消息成功','data':data}) if __name__ == '__main__':
app.run(host = '0.0.0.0', port = 5008)
3.消息在线推送服务器
# coding : utf-8
# author : ['Wang Suyin', ]
# data : 2020/2/20 16:48
# software : PyCharm
# python_version : '3.5.3 64bit'
# file : 3.消息在线推送服务器.py
"""
说明⽂档:
"""
import requests
import json
from flask import Flask
from flask_sockets import Sockets
app = Flask(__name__)
sockets = Sockets(app)
ws_pool = [] # 推送⽬标池
def auth_admin_token(token):
会计实习报告3000字data = { 'token':token }
r = requests.post(url='127.0.0.1:5009/admin',json=data ) r = json.)
if r['code'] == 200:
return True
return Fal
def auth_ur_token(token):
data = { 'token':token }
r = requests.post(url='127.0.0.1:5009/ur',json=data ) r = json.)

本文发布于:2023-07-01 11:39:51,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1072059.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   推送   拉取   服务器
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图