sqlalchemy 和 flask-sqlalchemy 分表

关于 SQLAlchemy 和 Flask-SQLAlchemy 的分表方案,网上的资料很少。经过 Google 搜索以及在自己项目中的尝试,找到了一个比较合适的解决方案。

下面示例中的 user 表按照用户 ID % 100 的结果,分布在 user_0 到 user_99 共 100 张表中。


另外,如果还需要分库,只需要在 model 中加入 __bind_key__,并按照分库规则为它赋值(即由用户 ID 计算出该表所在的库)。


sqlalchemy 分表示例:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from sqlalchemy import Column, Integer, String, BigInteger, SmallInteger
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class that every dynamically generated shard model inherits from.
Base = declarative_base()

class UserData(object):
    """Factory for the ORM classes mapped onto the ``user_N`` shard tables.

    ``model()`` derives a table index as ``user_id % 100`` and hands back a
    declarative class bound to the table ``user_<index>``. Generated classes
    are remembered in ``_mapper`` so each one is built only once.
    """

    # Generated classes keyed by their class name ('user_0', 'user_1', ...).
    _mapper = {}

    @staticmethod
    def model(user_id):
        """Return the mapped class for the shard table that holds ``user_id``.

        :param user_id: numeric user ID; ``user_id % 100`` selects the table.
        :return: a subclass of ``Base`` whose ``__tablename__`` is
            ``'user_<index>'``.
        """
        shard_name = 'user_%d' % (user_id % 100)

        cached = UserData._mapper.get(shard_name)
        if cached is not None:
            return cached

        # Class name and table name are the same string for every shard.
        attributes = {
            '__module__': __name__,
            '__name__': shard_name,
            '__tablename__': shard_name,
            'id': Column(BigInteger, primary_key=True),
            'name': Column(String(255)),
            'picture': Column(String(255)),
            'valid': Column(SmallInteger),
            'utime': Column(Integer),
            'ctime': Column(Integer),
        }
        shard_model = type(shard_name, (Base,), attributes)
        UserData._mapper[shard_name] = shard_model
        return shard_model


if __name__ == '__main__':
    # Demo: resolve the shard model for one user and run a few queries on it.
    # NOTE(review): the 'encoding' argument may not be accepted by newer
    # SQLAlchemy releases — confirm against the installed version.
    engine = create_engine('mysql+pymysql://%s:%s@%s:%s/%s' % ('root',
                                                               'root',
                                                               '127.0.0.1',
                                                               '3306',
                                                               'test'), encoding='utf8')

    # 1000001 % 100 == 1, so this user is stored in table user_1.
    user_id = 1000001

    # Build the shard model before calling create_all(): a table is only
    # registered on Base.metadata once its class has been created, so calling
    # create_all() earlier would find nothing to create.
    user = UserData.model(user_id)
    Base.metadata.create_all(engine)

    session = Session(engine)
    try:
        result_1 = session.query(user).count()
        result_2 = session.query(user).filter(user.name == "test", user.valid == 0).all()
        result_3 = session.query(user).filter_by(name="test", valid=0).all()
        print(result_1, result_2, result_3)
    finally:
        # Always release the session, even if a query fails.
        session.close()

flask-sqlalchemy 分表示例:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy

# Create the Flask application.
app = Flask(__name__)
# Load the settings from the .cfg file before the SQLAlchemy extension is
# attached to the app on the next line.
app.config.from_pyfile('flask-sqlalchmey-sharding.cfg')
db = SQLAlchemy(app)


class User(object):
    """Sharded user table: factory for the ``user_N`` model classes.

    ``model()`` picks a table index with ``user_id % 100`` and returns a
    ``db.Model`` subclass bound to the table ``user_<index>``. Generated
    classes are kept in ``_mapper`` so each one is created only once.
    """

    # Generated classes keyed by their class name ('user_0', 'user_1', ...).
    _mapper = {}

    @staticmethod
    def model(user_id):
        """Return the model class for the shard table that holds ``user_id``.

        :param user_id: numeric user ID; ``user_id % 100`` selects the table.
        :return: a ``db.Model`` subclass whose ``__tablename__`` is
            ``'user_<index>'``.
        """
        # A database index for sharding across databases could be derived
        # here as well, e.g. int((user_id % 100) / 25).
        shard_name = 'user_%d' % (user_id % 100)

        existing = User._mapper.get(shard_name)
        if existing is not None:
            return existing

        # Class name and table name are the same string for every shard.
        namespace = dict(
            __module__=__name__,
            __name__=shard_name,
            __tablename__=shard_name,
            id=db.Column(db.BigInteger, primary_key=True),
            name=db.Column(db.String(100)),
            phone=db.Column(db.String(20)),
            status=db.Column(db.SmallInteger, default=0),
            ctime=db.Column(db.Integer, default=0),
            utime=db.Column(db.Integer, default=0),
        )
        shard_model = type(shard_name, (db.Model,), namespace)
        User._mapper[shard_name] = shard_model
        return shard_model


# The URL must carry the user ID: the view takes ``user_id`` as an argument,
# so the rule needs an ``<int:user_id>`` variable for Flask to supply it.
@app.route('/<int:user_id>/test', methods=['GET', 'POST'])
def test(user_id):
    """Look up one user in its shard table and return a small JSON summary.

    :param user_id: integer user ID taken from the URL path.
    :return: JSON response with ``count`` (rows whose id equals ``user_id``)
        and ``name`` (name of the first row with that id and status 0, or
        ``"default"`` when there is none).
    """
    user = User.model(user_id)
    count = user.query.filter_by(id=user_id).count()
    item = user.query.filter(user.id == user_id, user.status == 0).first()
    # Fall back to a placeholder when no row with status 0 matches.
    name = item.name if item else "default"
    return jsonify(count=count, name=name)


# Start the Flask application only when this file is executed as a script.
if __name__ == '__main__':
    app.run()


flask-sqlalchmey-sharding.cfg 文件: 

# Database connection URL read by Flask-SQLAlchemy.
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:root@localhost:3306/test'
SQLALCHEMY_ECHO = True
# NOTE(review): sample values only — a hard-coded SECRET_KEY and DEBUG = True
# should not be carried over to a real deployment.
SECRET_KEY = '\xfb\x12\xdf\xa1@i\xd6>V\xc0\xbb\x8fp\x16#Z\x0b\x81\xeb\x16'
DEBUG = True


github地址:https://github.com/sixu05202004/sharding