#229: OAuth2 and JWT to Protect a FastAPI Application

The HTTP basic authentication from last week works, but it feels a bit messy. A search for a more current style of authentication leads to OAuth2 and JWT. The FastAPI documentation has an example that I changed a bit: it now uses a different library for JWT, stores the secret key in a .env file and uses BCrypt to hash the passwords.

Install PyJWT

To create our JWT tokens, we need a library that puts the input data into the right fields of a JWT and signs the result. One option we have in Python is PyJWT, which we can install with this command:

pip install PyJWT

Why not python-jose?

The official FastAPI tutorial for OAuth2 uses python-jose for JWT. Unfortunately, as with passlib for BCrypt, the last release was 3 years ago, which is too long for a security-related project.

Another reason against python-jose is that I get this warning with Python 3.12:

======== warnings summary ========
test_jwt.py::test_login_works
test_jwt.py::test_login_mike
****\Python\Python312\Lib\site-packages\jose\jwt.py:311: 
DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled for removal 
in a future version. Use timezone-aware objects to represent datetimes in UTC:
datetime.datetime.now(datetime.UTC).
now = timegm(datetime.utcnow().utctimetuple())

 Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
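The change that the warning asks for is small. As a minimal sketch (not the actual patch for python-jose; the variable names are my own), the timezone-aware call can feed the same timegm() conversion that appears in the warning:

```python
from calendar import timegm
from datetime import datetime, timezone

# Deprecated since Python 3.12 (the call the warning points at):
#   now = timegm(datetime.utcnow().utctimetuple())

# Timezone-aware replacement: the datetime carries its UTC offset explicitly.
aware_now = datetime.now(timezone.utc)

# Same conversion to a Unix timestamp (whole seconds) as in the warning.
now = timegm(aware_now.utctimetuple())

print(aware_now.isoformat())
print(now)
```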

There is a small migration guide to move from python-jose to PyJWT. To switch the tutorial from python-jose to PyJWT I only needed to change the imports and replace JWTError with InvalidTokenError:

# from jose import JWTError, jwt
import jwt
from jwt import InvalidTokenError

Create a secret key

We can use this command to create a secret key to sign our JWT tokens:

openssl rand -hex 32

We can put this value into a .env file under the name SECRET_KEY_ENV:

SECRET_KEY_ENV=......

That way we can keep the secret out of version control and still access it with ease.
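If openssl is not at hand, Python's secrets module can create an equivalent value. The sketch below also shows one way to fail early when the variable is missing. Both helper names (generate_secret_key and load_secret_key) are hypothetical and not part of the tutorial code.

```python
import os
import secrets


def generate_secret_key() -> str:
    """Return 32 random bytes as 64 hex characters."""
    return secrets.token_hex(32)


def load_secret_key(env=os.environ) -> str:
    """Read SECRET_KEY_ENV and fail loudly if it is missing or empty."""
    key = env.get("SECRET_KEY_ENV")
    if not key:
        raise RuntimeError("SECRET_KEY_ENV is not set; check the .env file")
    return key


# Prints a line that can be pasted into the .env file.
print(f"SECRET_KEY_ENV={generate_secret_key()}")
```

Failing at startup is a design choice: os.getenv() returns None for a missing variable, and that would otherwise only show up later when a token is signed.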

From username and password to JWT tokens

The OAuth2 and JWT bearer token example needs a bit more work to access the endpoints. We first send our username and password to the /token endpoint to get a token, and then pass that token in the Authorization header of our requests to the protected endpoint. We can recreate tests similar to last week's, but they need more steps and have a bit too much duplication for my taste, something we address in a future post.

from fastapi.testclient import TestClient
from .main import app

client = TestClient(app)


def test_without_login_gets_401_unauthorized():
    response = client.get("/users/me")
    assert response.status_code == 401


def test_login_with_stanley():
    credentials = {"username": "stanley",
                   "password": "secret"}
    response = client.post("/token", data=credentials)
    assert response.status_code == 200
    jwt = response.json()["access_token"]

    print("*" * 50)
    print(jwt)
    print("*" * 50)

    response_me = client.get("/users/me", 
                             headers={"Authorization": "Bearer " + jwt})
    assert response_me.status_code == 200
    assert response_me.json() == {'username': 'stanley', 
                                  'email': 'Stanley.Jobson@localhost', 
                                  'full_name': 'Stanley Jobson', 
                                  'disabled': False}


def test_login_with_mike():
    credentials = {"username": "mike",
                   "password": "password"}
    response = client.post("/token", data=credentials)
    assert response.status_code == 200
    jwt = response.json()["access_token"]

    print("*" * 50)
    print(jwt)
    print("*" * 50)

    response_me = client.get("/users/me", 
                             headers={"Authorization": "Bearer " + jwt})
    assert response_me.status_code == 200
    assert response_me.json() == {'username': 'mike', 
                                  'email': 'Mike.Doe@localhost', 
                                  'full_name': 'Mike Doe', 
                                  'disabled': False}

The JWT based authentication

I use the last example in the tutorial as the base for this post. While most of the code is the same, there are a few important differences:

  • The secret key is in a .env file that we load at the beginning of our main.py.
  • Instead of python-jose we use PyJWT for the encoding and decoding of the JWT tokens.
  • Instead of passlib we use bcrypt to hash and verify passwords.
  • Our two users from last week are in the fake data store users instead of the demo user of the tutorial.

The extended code for the API now looks like this:

from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# from jose import JWTError, jwt
import jwt
import bcrypt
from pydantic import BaseModel

import os
from dotenv import load_dotenv

load_dotenv()

# Create a secret key with this command and add it to the .env file
# openssl rand -hex 32
SECRET_KEY = os.getenv('SECRET_KEY_ENV')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

users = {
    "stanley" : {
        "username": "stanley",
        "full_name": "Stanley Jobson",
        "email": "Stanley.Jobson@localhost",
        "hashed_password": b'$2b$12$HIbxs5kbjinDEUzbQJYqpeTp.GxRgy4m8hdQM4JnSunGQ6VaY5Ld6', # secret
        "disabled": False
    },
    "mike" : {
        "username": "mike",
        "full_name": "Mike Doe",
        "email": "Mike.Doe@localhost",
        "hashed_password": b'$2b$12$JAM3vz8gEZDeNSDAILiaReTmvoNM5EEP33Elhq5fCoTgno4SxfqKO', # password
        "disabled": False
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str



oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return bcrypt.checkpw(plain_password.encode("utf-8"), 
                          hashed_password.encode("utf-8"))


def get_password_hash(password):
    # hashpw() returns the salted hash as bytes
    return bcrypt.hashpw(password.encode("utf-8"),
                         bcrypt.gensalt(14))


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, 
                        expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except jwt.InvalidTokenError:  # PyJWT's counterpart to python-jose's JWTError
        raise credentials_exception
    user = get_user(users, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
) -> Token:
    user = authenticate_user(users, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return [{"item_id": "Foo", "owner": current_user.username}]

With that code and an empty __init__.py in the same folder, we can now run the tests and they should all pass. If you run pytest with the -s option, you should find the tokens in the output. You can copy them and head over to jwt.io to decode them and see their content:

The decoded token of our users
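The header and payload of a JWT are only base64url-encoded JSON, not encrypted, which is why a site like jwt.io can show them without knowing our secret key. The stdlib-only sketch below illustrates this. The helpers build_demo_token and peek are hypothetical, the key and payload values are made up, and peek does not check the signature, so it is for inspection only.

```python
import base64
import hashlib
import hmac
import json


def b64url_encode(raw: bytes) -> str:
    # JWT uses URL-safe base64 without the trailing "=" padding.
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    # Restore the padding that JWT strips before decoding.
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def build_demo_token(payload: dict, key: bytes) -> str:
    """Assemble header.payload.signature by hand (HS256)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(key, signing_input.encode("ascii"),
                         hashlib.sha256).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def peek(token: str) -> tuple[dict, dict]:
    """Decode header and payload WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    return (json.loads(b64url_decode(header_b64)),
            json.loads(b64url_decode(payload_b64)))


token = build_demo_token({"sub": "stanley", "exp": 1700000000},
                         b"demo-key-not-a-real-secret")
header, payload = peek(token)
print(header)
print(payload)
```

The same peek() call should also work on a token printed by pytest -s, because it only splits the string and decodes the first two parts.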

JWT works with the OpenAPI documentation

We can log in with username and password through the "Authorize" button at the top, so that the built-in JavaScript client in the OpenAPI documentation can grab our JWT token and send it to the authenticated endpoints. That way the flow for the user is comparable to the one for HTTP basic authentication from last week. The main difference for the user is that the login screen offers more fields:

The login screen is like the one for HTTP basic authentication, but it offers us additional fields for client_id and client_secret.

Helpful tutorials

If you want to know a bit more about FastAPI and JWT, I can suggest these three tutorials:

Next

The authentication with JWT gives me a much better feeling. On the other hand, implementing that many methods for authentication by oneself is risky. A dangerous error is only a typo away. Before we look at some pre-built solutions, we need to separate the authentication code from the business endpoints. Unfortunately, next week we first need to fix a warning that popped up while preparing the next example.