V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
gjquoiai
V2EX  ›  Python

asyncio & aiohttp 初入坑,向大佬们请教一些问题

  •  
  •   gjquoiai · 2018-05-22 20:04:33 +08:00 · 1871 次点击
    这是一个创建于 2439 天前的主题,其中的信息可能已经有所发展或是发生改变。

    需求是服务端单向向客户端(浏览器)推送消息。由于会出现同一用户同时登录着多个客户端的情况,希望能把消息同时推送给所有的终端。所以我是这么做的:

    async def handler(request):
        admin = Admin(request)
        ws = web.WebSocketResponse()
        await ws.prepare(request)
        request.app['websockets'][admin.id].add(ws)
        try:
            while True:
                msg = await admin.get_unread_msg(admin.id)
                for ws in request.app['websockets'][admin.id]:
                    await ws.send_json({'data': msg})
        finally:
            request.app['websockets'][admin.id].discard(ws)
            await ws.close()
        return ws
    

    question:这么做有没有问题 = =

    然后消息队列用的是 redis + aioredis,上面的 get_unread_msg 我是这么做的:

    async def get_unread_msg(self, admin_id):
        while True:
            data = await self.redis.lpop(f'msgs:{admin_id}')
            if data:
                return data
            await asyncio.sleep(0.1)
    

    初始化 redis:

    async def setup_redis(app):
        redis_url = app['config']['REDIS_URL']
        app['redis'] = await aioredis.create_redis_pool(redis_url)
        yield
        app['redis'].close()
        await app['redis'].wait_closed()
    

    question:我在这里使用 blpop 会阻塞其它请求,这是为什么 = =

    至于哪里来的其它请求,是我写了另一个请求用来添加消息:

    async def add_msg(request):
        redis = request.app['redis']
        msg_id = await redis.incr('msg_id')
        import random
        msg = {
            'id': msg_id,
            'text': f'hello~{msg_id}'
        }
        await redis.rpush('msgs', json.dumps(msg))
    

    这里会把消息都写进一个大列表里,所以还有一个后台任务做消息分发:

    @register_background_tasks
    async def distribute_msgs(app):
        redis = app['redis']
        while True:
            data = await redis.lpop('msgs')
            if data:
                json_data = json.loads(data)
                to_admin = redis.rpush(f'msgs:{json_data["admin_id"]}', data)
                await asyncio.gather(to_sa, to_admin)
            await asyncio.sleep(0.1)
    

    这里换成 blpop 也会阻塞请求= =

    感谢大佬们读到这里,最后一个问题。。

    如果我用 gunicorn 开启了多个 worker,会出现同一用户的多个请求发送到了不同的进程上,就没法保证所有连接都收到消息了。这种情况该怎么办?

    1 条回复    2018-05-23 19:37:54 +08:00
    gjquoiai
        1
    gjquoiai  
    OP
       2018-05-23 19:37:54 +08:00
    o(╥﹏╥)o 路过的大佬请伸出你们的援手~
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2393 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 13:26 · PVG 21:26 · LAX 05:26 · JFK 08:26
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.