Flask实现mysql读写分离

自定义Session类
重写get_bind方法
根据self._flushing判断读写操作, 选择对应的数据库
自定义SQLAlchemy类
重写create_session, 在其中使用自定义的Session类

from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
 
 
class RoutingSession(SignallingSession):
  def get_bind(self, mapper=None, clause=None):
    state = get_state(self.app)
 
    # 判断读写操作
    if self._flushing: # 写操作 ,使用主数据库
      print("写入数据")
      return state.db.get_engine(self.app, bind='master')
    else: # 读操作, 使用从数据库
      print('读取数据')
      return state.db.get_engine(self.app, bind='slave')
 
 
class RoutingSQLAlchemy(SQLAlchemy):
  def create_session(self, options):
    return orm.sessionmaker(class_=RoutingSession, db=self, **options)
 
 
app = Flask(__name__)
# 设置数据库的连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.134:3306/demo'
# 设置数据库的绑定地址
app.config['SQLALCHEMY_BINDS'] = {
  'master': "mysql://root:mysql@192.168.105.134:3306/demo",
  'slave': "mysql://root:mysql@192.168.105.134:8306/demo"
}
# 设置是否追踪数据库变化  一般不会开启, 影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 设置是否打印底层执行的SQL语句
app.config['SQLALCHEMY_ECHO'] = False
 
# 创建数据库连接对象
db = RoutingSQLAlchemy(app)
 
# 用户表 一
class User(db.Model):
  __tablename__ = 't_user'
  id = db.Column(db.Integer, primary_key=True)
  name = db.Column(db.String(20), unique=True)
 
 
@app.route('/')
def index():
  # 增加数据
  user1 = User(name='zs')
  db.session.add(user1)
  db.session.commit()
 
  # 查询数据
  users = User.query.all()
  print(users)
  return "index"
 
if __name__ == '__main__':
  # 删除所有继承自db.Model的表
  db.drop_all()
  # 创建所有继承自db.Model的表
  db.create_all()
  app.run(debug=True)

发表评论