sqlalchemy 和 flask-sqlalchemy 分表

对于 sqlalchemy 和 flask-sqlalchemy 分表的方案,网上的资料很少,经过google和自己项目的尝试,找到比较合适的解决的方案。 下面的 user 表根据用户的ID % 100 分布在 0 - 99 张表中。 另外,如果要处理分库的话,只需要在model中加入 __bind_key__,等于分库的规则。 sqlalchemy 分表示例: #!/usr/bin/env python # -*- coding: utf-8 -*- from sqlalchemy import Column, Integer, String, BigInteger, SmallInteger from sqlalchemy import create_engine from sqlalchemy.orm import Session from sqlalchemy.ext.declarative import declarative_base Base = declarative_base() class UserData(object): _mapper = {} @staticmethod def model(user_id): table_index = user_id % 100 class_name = 'user_%d' % table_index ModelClass = UserData._mapper.get(class_name, None) if ModelClass is None: ModelClass = type(class_name, (Base,), dict( __module__=__name__, __name__=class_name, __tablename__='user_%d' % table_index, id=Column(BigInteger, primary_key=True), name=Column(String(255)), picture=Column(String(255)), valid=Column(SmallInteger), utime=Column(Integer), ctime=Column(Integer) )) UserData._mapper[class_name] = ModelClass cls = ModelClass return cls if __name__ == '__main__': engine = create_engine('mysql+pymysql://%s:%s@%s:%s/%s' % ('root', 'root', '127.0.0.1', '3306', 'test'), encoding='utf8') Base.metadata.create_all(engine) session = Session(engine) # user_id = 100001 user_id = 1000001 user = UserData.model(user_id) result_1 = session.query(user).count() result_2 = session.query(user).filter(user.name == "test", user.valid == 0).all() result_3 = session.query(user).filter_by(name="test", valid=0).all() print(result_1, result_2, result_3) flask-sqlalchemy 分表示例: #!/usr/bin/env python # -*- coding: utf-8 -*- from flask import Flask, jsonify from flask_sqlalchemy import SQLAlchemy app = Flask(__name__) app.config.from_pyfile('flask-sqlalchmey-sharding.cfg') db = SQLAlchemy(app) class User(object): '''用户分表''' _mapper = {} @staticmethod def model(user_id): table_index = user_id % 100 # db_index = int((user_id % 100) / 25) class_name = 'user_%d' % table_index ModelClass = User._mapper.get(class_name, None) if ModelClass is None: ModelClass = type(class_name, (db.Model,), { '__module__': __name__, '__name__': class_name, '__tablename__': 'user_%d' % table_index, 'id': db.Column(db.BigInteger, primary_key=True), 'name': db.Column(db.String(100)), 'phone': db.Column(db.String(20)), 'status': db.Column(db.SmallInteger, default=0), 'ctime': db.Column(db.Integer, default=0), 'utime': db.Column(db.Integer, default=0) }) User._mapper[class_name] = ModelClass cls = ModelClass return cls @app.route('/ /test', methods=['GET', 'POST']) def test(user_id): user = User.model(user_id) count = user.query.filter_by(id=user_id).count() item = user.query.filter(user.id == user_id, user.status == 0).first() # default name = item.name if item else "default" return jsonify(count=count, name=name) if __name__ == '__main__': app.run() flask-sqlalchmey-sharding.cfg 文件: SQLALCHEMY_DATABASE_URI = = 'mysql+pymysql://root:root@localhost:3306/test' SQLALCHEMY_ECHO = True SECRET_KEY = '\xfb\x12\xd ...

阅读全文 »

菜谱 14:从.zip文件中读取数据

Python 能够直接读取 zip 文件中的数据。我们现在需要实现这样一个小任务:直接读取一个 zip 文件,获取里面包含的文件列表以及每个文件的大小。

Python 的 zipfile 模块可以轻松地帮助我们解决这个任务:


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zipfile

z = zipfile.ZipFile("11.zip", "r")
for filename in z.namelist():
        print filename
        bytes = z.read(filename)
        print len(bytes)

这里需要注意地是 zipfile 模块有些 zip 文件是无法处理,具体是里面插入了注释的 zip 文件或者多分卷的 zip 文件。

阅读全文 »

菜谱 13:在文件中搜索以及替换文本

使用命令行简单地替换一个文件中的文本内容,并且生成一个新的自定义文件名的文件。这是我们平时工作中常见的一个小任务,下面的这一段小代码能够轻松地完成这个任务:


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
usage = "usage: %s search_text replace_text [infilename [outfilename]]" % os.path.basename(
    sys.argv[0])

if len(sys.argv) < 3:
    print usage

else:
    stext = sys.argv[1]
    rtext = sys.argv[2]
    print "There are %s args " % len(sys.argv)

    if len(sys.argv) > 4:
        input = open(sys.argv[3])
        output = open(sys.argv[4], 'w')

        for s in input:
            output.write(s.replace(stext, rtext))

        input.close()
        output.close()

当我们使用 “python cookbook_13.py 1 a test.txt new.txt” 命令行的时候,test.txt 中 1 会被替换成 a,并且替换后的内容写入到 new.txt 中。

注意:infilename,outfilename 这两个参数没有的话,程序并不会报错,但是会输出类似 “There are ...” 的语句。如果命令行参数小于 3 的话,会输出 “usage:。。。”。


阅读全文 »

菜谱 12:使用 UDP 数据包发送消息

数据包发送短的文本消息实现是很简单的并且提供可一个非常轻量级的消息传递通道。但是这种模式有很大的缺陷,就是不保证的数据的可靠性,有可能会存在丢包的情况,甚至严重的情况就是服务器不可用的时候,会完全丢失你的消息。不过这个任务会在有些情况下十分有作用:
  • 你不关心消息是否丢失;
  • 你不想要终止程序只是因为消息无法传递;



#!/usr/bin/env python
# -*- coding: utf-8 -*-

# server.py
import socket
port = 8081
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", port))
print "waiting on port:", port
while 1:
    data, addr = s.recvfrom(1024)
    print data


# client.py
import socket
port = 8081
host = "localhost"
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(("", 0))
s.sendto("Holy Guido! It's working.", (host, port))

还有一个提醒事项,不要用上面的程序发送大量的数据包,尤其是在 Windows 上。要是想要发送大的消息的话,你可以这样做:

	BUFSIZE = 1024
	while msg:
	    s.sendto(msg[:BUFSIZE], (host, port))
	    msg = msg[BUFSIZE:]

阅读全文 »

菜谱 11:使用列表实现循环数据结构

在一些实际应用中,设计一个循环的数据结构是十分有利的。这里的循环数据结构指的是最后一个元素指向第一元素的结构。Python 内置的 list 能够很容易实现这个任务。


#!/usr/bin/env python
# -*- coding: utf-8 -*-


class Ring(object):

    def __init__(self, l):
        if not len(l):
            raise "ring must have at least one element"
        self._data = l

    def __repr__(self):
        return repr(self._data)

    def __len__(self):
        return len(self._data)

    def __getitem__(self, i):
        return self._data[i]

    def turn(self):
        last = self._data.pop(-1)
        self._data.insert(0, last)

    def first(self):
        return self._data[0]

    def last(self):
        return self._data[-1]

使用这个结构的方式:

	>>> l = [{1:1}, {2:2}, {3:3}]
	>>> r = Ring(l)
	>>> r
	[{1: 1}, {2: 2}, {3: 3}]
	>>> r.first()
	{1: 1}
	>>> r.last()
	{3: 3}
	>>> r.turn()
	>>> r
	[{3: 3}, {1: 1}, {2: 2}]
	>>> r.turn()
	>>> r
	[{2: 2}, {3: 3}, {1: 1}]

阅读全文 »

菜谱 10:统计单词出现的频率

平时我们在工作的时候需要统计一篇文章或者网页出现频率最高的单词,或者需要统计单词出现频率排序。那么如何完成这个任务了?


例如,我们输入的语句是 “Hello there this is a test.  Hello there this was a test, but now it is not.",希望得到的升序的结果:

[[1, 'but'], [1, 'it'], [1, 'not.'], [1, 'now'], [1, 'test,'], [1, 'test.'], [1, 'was'], [2, 'Hello'], [2, 'a'], [2, 'is'], [2, 'there'], [2, 'this']]

得到降序的结果是:

[[2, 'this'], [2, 'there'], [2, 'is'], [2, 'a'], [2, 'Hello'], [1, 'was'], [1, 'test.'], [1, 'test,'], [1, 'now'], [1, 'not.'], [1, 'it'], [1, 'but']]


完成这个结果的代码如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-


class Counter(object):

    def __init__(self):
        self.dict = {}

    def add(self, item):
        count = self.dict.setdefault(item, 0)
        self.dict[item] = count + 1

    def counts(self, desc=None):
        result = [[val, key] for (key, val) in self.dict.items()]
        result.sort()
        if desc:
            result.reverse()
        return result

if __name__ == '__main__':

    '''Produces:

        >>> Ascending count:
        [[1, 'but'], [1, 'it'], [1, 'not.'], [1, 'now'], [1, 'test,'], [1, 'test.'], [1, 'was'], [2, 'Hello'], [2, 'a'], [2, 'is'], [2, 'there'], [2, 'this']]
        Descending count:
        [[2, 'this'], [2, 'there'], [2, 'is'], [2, 'a'], [2, 'Hello'], [1, 'was'], [1, 'test.'], [1, 'test,'], [1, 'now'], [1, 'not.'], [1, 'it'], [1, 'but']]
    '''

    sentence = "Hello there this is a test.  Hello there this was a test, but now it is not."
    words = sentence.split()
    c = Counter()
    for word in words:
        c.add(word)
    print "Ascending count:"
    print c.counts()
    print "Descending count:"
    print c.counts(1)

阅读全文 »

菜谱 9:soundex 算法

SOUNDEX 返回由四个字符组成的代码 (SOUNDEX) 以评估两个字符串的相似性。返回的第一个字符是输入字符串的第一个字符,返回的第二个字符到第四个字符是数字。

soundex 代码如下:


#!/usr/bin/env python
# -*- coding: utf-8 -*-


def soundex(name, len=4):
    """ soundex module conforming to Knuth's algorithm
        implementation 2000-12-24 by Gregory Jorgensen
        public domain
    """

    # digits holds the soundex values for the alphabet
    digits = '01230120022455012623010202'
    sndx = ''
    fc = ''

    # translate alpha chars in name to soundex digits
    for c in name.upper():
        if c.isalpha():
            if not fc:
                fc = c   # remember first letter
            d = digits[ord(c) - ord('A')]
            # duplicate consecutive soundex digits are skipped
            if not sndx or (d != sndx[-1]):
                sndx += d
    print sndx

    # replace first digit with first alpha character
    sndx = fc + sndx[1:]

    # remove all 0s from the soundex code
    sndx = sndx.replace('0', '')

    # return soundex code padded to len characters
    return (sndx + (len * '0'))[:len]

需要注意的是代码设计为处理英文名称。

阅读全文 »

菜谱 8:支持简单命令行

本任务最初的目的只是为了在测试过程中使用简单的命令行运行不同的函数,类似运行 “python test_test.py” 运行整个测试,运行 “python test_test.py debug” 来运行测试但是不收集运行结果,请看如下的代码:


#!/usr/bin/env python
# -*- coding: utf-8 -*-

import unittest
import sys


class Tests(unittest.TestCase):

    def testAddOnePlusOne(self):
        assert 1 == 2


def main():
    unittest.TextTestRunner().run(test_suite())


def test_suite():
    return unittest.makeSuite(Tests, 'test')


def debug():
    test_suite().debug()

if __name__ == '__main__':

    if len(sys.argv) > 1:
        globals()[sys.argv[1]]()
    else:
        main()

这里如果在命令行中直接运行 “python cookbook_8.py” 就会执行 “main()”;如果在命令行中运行 “python cookbook_8.py debug” 会执行 “debug()”。

“globals()” 返回的是当前全局变量的引用。如果有其它的需求,可以充分利用本任务来延伸!


阅读全文 »

菜谱 7:发送混合邮件

我们平时需要使用 Python 发送各类邮件,这个需求怎么来实现?答案其实很简单,smtplib 和email 库可以帮忙实现这个需求。smtplib 和 email 的组合可以用来发送各类邮件:普通文本,HTML 形式,带附件,群发邮件,带图片的邮件等等。我们这里将会分几节把发送邮件功能解释完成。

smtplib 是 Python 用来发送邮件的模块,email 是用来处理邮件消息。

发送邮件系列最后一篇将会介绍发送混合邮件:里面包含附件,HTML 形式,不同文本:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

sender = '***'
receiver = '***'
subject = 'python email test'
smtpserver = 'smtp.163.com'
username = '***'
password = '***'

# Create message container - the correct MIME type is multipart/alternative.
msg = MIMEMultipart('mixed')
msg['Subject'] = "Link"

# Create the body of the message (a plain-text and an HTML version).
text = "Hi!\nHow are you?\nHere is the link you wanted:\nhttp://www.python.org"
html = """\

 
Hi!

       How are you?

       Here is the link you wanted.

 

"""

# Record the MIME types of both parts - text/plain and text/html.
part1 = MIMEText(text, 'plain')
part2 = MIMEText(html, 'html')

# Attach parts into message container.
# According to RFC 2046, the last part of a multipart message, in this case
# the HTML message, is best and preferred.
msg.attach(part1)
msg.attach(part2)
# 构造附件
att = MIMEText(open('/Users/1.jpg', 'rb').read(), 'base64', 'utf-8')
att["Content-Type"] = 'application/octet-stream'
att["Content-Disposition"] = 'attachment; filename="1.jpg"'
msg.attach(att)

smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()

注意:这里的代码并没有把异常处理加入,需要读者自己处理异常。

阅读全文 »

菜谱 6:群发邮件

我们平时需要使用 Python 发送各类邮件,这个需求怎么来实现?答案其实很简单,smtplib 和email 库可以帮忙实现这个需求。smtplib 和 email 的组合可以用来发送各类邮件:普通文本,HTML 形式,带附件,群发邮件,带图片的邮件等等。我们这里将会分几节把发送邮件功能解释完成。

smtplib 是 Python 用来发送邮件的模块,email 是用来处理邮件消息。

群发邮件的时候需要注意收件人(receiver)的值,它为列表形式:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import smtplib
from email.mime.text import MIMEText

sender = '***'
receiver = ['***', '***', '...', '***']
subject = 'python email test'
smtpserver = 'smtp.163.com'
username = '***'
password = '***'

msg = MIMEText('你好', 'plain', 'utf-8')

msg['Subject'] = subject

smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()

注意:这里的代码并没有把异常处理加入,需要读者自己处理异常。

阅读全文 »