V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
songcser
V2EX  ›  Python

基于 Sanic 的微服务基础架构

  •  
  •   songcser · 2017-12-26 09:21:54 +08:00 · 3829 次点击
    这是一个创建于 2551 天前的主题,其中的信息可能已经有所发展或是发生改变。

    介绍

    使用 python 做 web 开发面临的一个最大的问题就是性能,在解决 C10K 问题上显的有点吃力。有些异步框架 Tornado、Twisted、Gevent 等就是为了解决性能问题。这些框架在性能上有些提升,但是也出现了各种古怪的问题难以解决。

    在 python3.6 中,官方的异步协程库 asyncio 正式成为标准。在保留便捷性的同时对性能有了很大的提升,已经出现许多的异步框架使用 asyncio。

    使用较早的异步框架是 aiohttp,它提供了 server 端和 client 端,对 asyncio 做了很好的封装。但是开发方式和最流行的微框架 flask 不同,flask 开发简单,轻量,高效。

    微服务是最近最火开发模式,它解决了复杂性问题,提高开发效率,便于部署等优点。

    正是结合这些优点, 以 Sanic 为基础,集成多个流行的库来搭建微服务。Sanic 框架是和 Flask 相似的异步协程框架,简单轻量,并且性能很高。

    本项目就是以 Sanic 为基础搭建的微服务框架。

    特点

    • 使用 sanic 异步框架,简单,轻量,高效。
    • 使用 uvloop 为核心引擎,使 sanic 在很多情况下单机并发甚至不亚于 Golang。
    • 使用 asyncpg 为数据库驱动,进行数据库连接,执行 sql 语句执行。
    • 使用 aiohttp 为 Client,对其他微服务进行访问。
    • 使用 peewee 为 ORM,但是只是用来做模型设计和 migration。
    • 使用 opentracing 为分布式追踪系统。
    • 使用 unittest 做单元测试,并且使用 mock 来避免访问其他微服务。
    • 使用 swagger 做 API 标准,能自动生成 API 文档。

    使用

    项目地址: sanic-ms

    Example

    服务端

    使用 sanic 异步框架,有较高的性能,但是使用不当会造成 blocking, 对于有 IO 请求的都要选用异步库。添加库要慎重。 sanic 使用 uvloop 异步驱动,uvloop 基于 libuv 使用 Cython 编写,性能比 nodejs 还要高。

    功能说明:

    启动前

    @app.listener('before_server_start')
    async def before_srver_start(app, loop):
        queue = asyncio.Queue()
        app.queue = queue
        loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
        reporter = AioReporter(queue=queue)
        tracer = BasicTracer(recorder=reporter)
        tracer.register_required_propagators()
        opentracing.tracer = tracer
        app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
    
    • 创建 DB 连接池
    • 创建 Client 连接
    • 创建 queue, 消耗 span,用于日志追踪
    • 创建 opentracing.tracer 进行日志追踪

    中间件

    @app.middleware('request')
    async def cros(request):
        if request.method == 'POST' or request.method == 'PUT':
            request['data'] = request.json
        span = before_request(request)
        request['span'] = span
    
    
    @app.middleware('response')
    async def cors_res(request, response):
        span = request['span'] if 'span' in request else None
        if response is None:
            return response
        result = {'code': 0}
        if not isinstance(response, HTTPResponse):
            if isinstance(response, tuple) and len(response) == 2:
                result.update({
                    'data': response[0],
                    'pagination': response[1]
                })
            else:
                result.update({'data': response})
            response = json(result)
            if span:
                span.set_tag('http.status_code', "200")
        if span:
            span.set_tag('component', request.app.name)
            span.finish()
        return response
    
    • 创建 span, 用于日志追踪
    • 对 response 进行封装,统一格式

    异常处理

    对抛出的异常进行处理,返回统一格式

    任务

    创建 task 消费 queue 中对 span,用于日志追踪

    异步处理

    由于使用的是异步框架,可以将一些 IO 请求并行处理

    Example:

    async def async_request(datas):
        # async handler request
        results = await asyncio.gather(*[data[2] for data in datas])
        for index, obj in enumerate(results):
            data = datas[index]
            data[0][data[1]] = results[index]
    
    @user_bp.get('/<id:int>')
    @doc.summary("get user info")
    @doc.description("get user info by id")
    @doc.produces(Users)
    async def get_users_list(request, id):
        async with request.app.db.acquire(request) as cur:
            record = await cur.fetch(
                """ SELECT * FROM users WHERE id = $1 """, id)
            datas = [
                [record, 'city_id', get_city_by_id(request, record['city_id'])]
                [record, 'role_id', get_role_by_id(request, record['role_id'])]
            ]
            await async_request(datas)
            return record
    

    get_city_by_id, get_role_by_id 是并行处理。

    相关连接

    sanic

    模型设计 & ORM

    Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use。

    ORM 使用 peewee, 只是用来做模型设计和 migration, 数据库操作使用 asyncpg。

    Example:

    # models.py
    
    class Users(Model):
        id = PrimaryKeyField()
        create_time = DateTimeField(verbose_name='create time',
                                    default=datetime.datetime.utcnow)
        name = CharField(max_length=128, verbose_name="user's name")
        age = IntegerField(null=False, verbose_name="user's age")
        sex = CharField(max_length=32, verbose_name="user's sex")
        city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
        role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)
    
        class Meta:
            db_table = 'users'
    
    
    # migrations.py
    
    from sanic_ms.migrations import MigrationModel, info, db
    
    class UserMigration(MigrationModel):
        _model = Users
    
        # @info(version="v1")
        # def migrate_v1(self):
        #     migrate(self.add_column('sex'))
    
    def migrations():
        try:
            um = UserMigration()
            with db.transaction():
                um.auto_migrate()
                print("Success Migration")
        except Exception as e:
            raise e
    
    if __name__ == '__main__':
        migrations()
    
    • 运行命令 python migrations.py
    • migrate_v1 函数添加字段 sex, 在 BaseModel 中要先添加 name 字段
    • info 装饰器会创建表 migrate_record 来记录 migrate,version 每个 model 中必须唯一,使用 version 来记录是否执行过,还可以记录 author,datetime
    • migrate 函数必须以**migrate_**开头

    相关连接

    peewee

    数据库操作

    asyncpg is the fastest driver among common Python, NodeJS and Go implementations

    使用 asyncpg 为数据库驱动, 对数据库连接进行封装, 执行数据库操作。

    不使用 ORM 做数据库操作,一个原因是性能,ORM 会有性能的损耗,并且无法使用 asyncpg 高性能库。另一个是单个微服务是很简单的,表结构不会很复杂,简单的 SQL 语句就可以处理来,没必要引入 ORM。使用 peewee 只是做模型设计

    Example:

    sql = "SELECT * FROM users WHERE name=$1"
    name = "test"
    async with request.app.db.acquire(request) as cur:
        data = await cur.fetchrow(sql, name)
    
    async with request.app.db.transaction(request) as cur:
        data = await cur.fetchrow(sql, name)
    
    • acquire() 函数为非事务, 对于只涉及到查询的使用非事务,可以提高查询效率
    • tansaction() 函数为事务操作,对于增删改必须使用事务操作
    • 传入 request 参数是为了获取到 span,用于日志追踪
    • TODO 数据库读写分离

    相关连接

    asyncpg benchmarks

    客户端

    使用 aiohttp 中的 client,对客户端进行了简单的封装,用于微服务之间访问。

    Don ’ t create a session per request. Most likely you need a session per application which performs all requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.

    Example:

    @app.listener('before_server_start')
    async def before_srver_start(app, loop):
        app.client =  Client(loop, url='http://host:port')
    
    async def get_role_by_id(request, id):
        cli = request.app.client.cli(request)
        async with cli.get('/cities/{}'.format(id)) as res:
            return await res.json()
    
    @app.listener('before_server_stop')
    async def before_server_stop(app, loop):
        app.client.close()
    
    

    对于访问不同的微服务可以创建多个不同的 client,这样每个 client 都会 keep-alives

    相关连接

    aiohttp

    日志 & 分布式追踪系统

    使用官方 logging, 配置文件为 logging.yml, sanic 版本要 0.6.0 及以上。JsonFormatter 将日志转成 json 格式,用于输入到 ES

    Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.

    装饰器 logger

    @logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
    async def get_city_by_id(request, id):
        cli = request.app.client.cli(request)
    
    • type: 日志类型,如 method, route
    • category: 日志类别,默认为 app 的 name
    • detail: 日志详细信息
    • description: 日志描述,默认为函数的注释
    • tracing: 日志追踪,默认为 True
    • level: 日志级别,默认为 INFO

    分布式追踪系统

    • OpenTracing 是以 Dapper,Zipkin 等分布式追踪系统为依据, 建立了统一的标准。
    • Opentracing 跟踪每一个请求,记录请求所经过的每一个微服务,以链条的方式串联起来,对分析微服务的性能瓶颈至关重要。
    • 使用 opentracing 框架,但是在输出时转换成 zipkin 格式。 因为大多数分布式追踪系统考虑到性能问题,都是使用的 thrift 进行通信的,本着简单,Restful 风格的精神,没有使用 RPC 通信。以日志的方式输出, 可以使用 fluentd, logstash 等日志收集再输入到 Zipkin。Zipkin 是支持 HTTP 输入的。
    • 生成的 span 先无阻塞的放入 queue 中,在 task 中消费队列的 span。后期可以添加上采样频率。
    • 对于 DB,Client 都加上了 tracing

    相关连接

    opentracing zipkin jaeger

    API 接口

    api 文档使用 swagger 标准。

    Example:

    from sanic_ms import doc
    
    @user_bp.post('/')
    @doc.summary('create user')
    @doc.description('create user info')
    @doc.consumes(Users)
    @doc.produces({'id': int})
    async def create_user(request):
        data = request['data']
        async with request.app.db.transaction(request) as cur:
            record = await cur.fetchrow(
                """ INSERT INTO users(name, age, city_id, role_id)
                    VALUES($1, $2, $3, $4, $5)
                    RETURNING id
                """, data['name'], data['age'], data['city_id'], data['role_id']
            )
            return {'id': record['id']}
    
    • summary: api 概要
    • description: 详细描述
    • consumes: request 的 body 数据
    • produces: response 的返回数据
    • tag: API 标签
    • 在 consumes 和 produces 中传入的参数可以是 peewee 的 model,会解析 model 生成 API 数据, 在 field 字段的 help_text 参数来表示引用对象
    • http://host:ip/openapi/spec.json 获取生成的 json 数据

    相关连接

    swagger

    Response 数据

    在返回时,不要返回 sanic 的 response,直接返回原始数据,会在 Middleware 中对返回的数据进行处理,返回统一的格式,具体的格式可以[查看]

    单元测试

    单元测试使用 unittest。mock 是自己创建了 MockClient,因为 unittest 还没有 asyncio 的 mock,并且 sanic 的测试接口也是发送 request 请求,所以比较麻烦. 后期可以使用 pytest。

    Example:

    from sanic_ms.tests import APITestCase
    from server import app
    
    class TestCase(APITestCase):
        _app = app
        _blueprint = 'visit'
    
        def setUp(self):
            super(TestCase, self).setUp()
            self._mock.get('/cities/1',
                           payload={'id': 1, 'name': 'shanghai'})
            self._mock.get('/roles/1',
                           payload={'id': 1, 'name': 'shanghai'})
    
        def test_create_user(self):
            data = {
                'name': 'test',
                'age': 2,
                'city_id': 1,
                'role_id': 1,
            }
            res = self.client.create_user(data=data)
            body = ujson.loads(res.text)
            self.assertEqual(res.status, 200)
    
    • 其中_blueprint 为 blueprint 名称
    • 在 setUp 函数中,使用_mock 来注册 mock 信息, 这样就不会访问真实的服务器, payload 为返回的 body 信息
    • 使用 client 变量调用各个函数, data 为 body 信息,params 为路径的参数信息,其他参数是 route 的参数

    代码覆盖

    coverage erase
    coverage run --source . -m sanic_ms tests
    coverage xml -o reports/coverage.xml
    coverage2clover -i reports/coverage.xml -o reports/clover.xml
    coverage html -d reports
    
    • coverage2colver 是将 coverage.xml 转换成 clover.xml ,bamboo 需要的格式是 clover 的。

    相关连接

    unittest coverage

    异常处理

    使用 app.error_handler = CustomHander() 对抛出的异常进行处理

    Example:

    from sanic_ms.exception import ServerError
    
    @visit_bp.delete('/users/<id:int>')
    async def del_user(request, id):
        raise ServerError(error='内部错误',code=10500, message="msg")
    
    • code: 错误码,无异常时为 0,其余值都为异常
    • message: 状态码信息
    • error: 自定义错误信息
    • status_code: http 状态码,使用标准的 http 状态码
    2 条回复    2019-07-19 10:01:21 +08:00
    timqi
        1
    timqi  
       2018-01-06 20:36:41 +08:00
    sanic 已经有生产环境使用的了么?
    yeyu123
        2
    yeyu123  
       2019-07-19 10:01:21 +08:00
    赞一个
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   999 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 20:03 · PVG 04:03 · LAX 12:03 · JFK 15:03
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.