Posts tagged: sqlite

SQLite DB Migrations with PRAGMA user_version

2017-12-06 06:26:13 ][ Tags: python databases sqlite

This blog is using a simple homegrown blogging engine that I wrote backed by a SQLite database. I have a function in the flask app that performs database migrations. My current approach has been to keep a folder full of migrations and run them sequentially whenever the app starts.

This works well for the case of adding and removing tables since SQLite has the handy IF NOT EXISTS option. However, when you are altering an existing table, this entire model falls apart since IF NOT EXISTS no longer works.

Practically, this means that outside of a fresh install my database migrations are useless.

I am still being stubborn and not using a well written solution like Alembic (which I would highly recommend for a "serious" project) for this blog. Instead, I discovered that SQLite comes with a built in mechanism to keep track of the user schema. This is the pragma statement, and specifically user_version.

Using PRAGMA user_data for DB Migrations

My migrations folder structure looks like this:

├── blog.db
├── blog.py
├── __init__.py
├── migrations
│   ├── 0001_initial_schema.sql
│   ├── 0002_add_unique_index_to_posts_tags.sql
│   ├── 0003_add_fts.sql
│   ├── 0004_add_column_to_post.sql
│   ├── 0005_add_comments_table.sql
│   └── 0006_add_admin_flag_to_comments.sql

As you can see the naming convention is 000N_migration_description.sql. Each migration file has the following statement in it:

PRAGMA user_version=N; (where N is the 000"N" part of the file name)

This steps the current user_version to be equal to the current version as defined by the file name.

The code to do stuff with the database is shown below:

def connect_db():
    """Connects to Database."""
    rv = sqlite3.connect(
        detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES)
    rv.row_factory = sqlite3.Row
    return rv

def get_db():
    """Opens new db connection if there is not an
    existing one for the current app ctx.
    if not hasattr(g, 'sqlite_db'):
        g.sqlite_db = connect_db()
    return g.sqlite_db

def migrate_db():
    """Run database migrations."""

    def get_script_version(path):
        return int(path.split('_')[0].split('/')[1])

    db = get_db()
    current_version = db.cursor().execute('pragma user_version').fetchone()[0]

    directory = os.path.dirname(__file__)
    migrations_path = os.path.join(directory, 'migrations/')
    migration_files = list(os.listdir(migrations_path))
    for migration in sorted(migration_files):
        path = "migrations/{0}".format(migration)
        migration_version = get_script_version(path)

        if migration_version > current_version:
            print("applying migration {0}".format(migration_version))
            with app.open_resource(path, mode='r') as f:
                 print("database now at version {0}".format(migration_version))
            print("migration {0} already applied".format(migration_version))

The relevant part to this blog post is the migrate_db() function. Two things are happening.

  1. The get_script_version() helper function extracts the integer from the migration name.
  2. current_version gets the current value of user_version of your database.
  3. We iterate over each migration file in the migrations folder and perform a simple check. If the migration version is larger than the current_version we run the migration, otherwise it gets skipped.

This solves for most cases and allows for a smooth upgrade path if anyone ever decides to start using this blogging engine for themselves. I am still pretty happy with this approach because this is essentially a fully functional migration system in just a handful of lines of python.