Revoking Tokens
Revoking tokens lets you block a specific token even if it has not expired yet.
Choose which token types should be checked with
authpaseto_denylist_token_checks, then register a callback with
token_in_denylist_loader(). That callback receives the decoded token payload
and should return True when the token has been revoked.
Only the configured token types are checked. For example, if you set
authpaseto_denylist_token_checks=["refresh"], access tokens skip denylist
lookups while refresh tokens still enforce them.
This can be utilized to invalidate token in multiple cases, e.g.: - A user logs out and their currently active tokens need to be invalidated - You detect a replay attack and the leaked tokens need to be blocked
Each generated token includes a UUID in the jti claim, which is typically the
identifier you store in your denylist backend.
Here is a basic example use tokens revoking:
from typing import Dict
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from fastapi_paseto import AuthPASETO
from fastapi_paseto.exceptions import AuthPASETOException
from pydantic import BaseModel
app = FastAPI()
class User(BaseModel):
username: str
password: str
@AuthPASETO.load_config
def get_config():
return {
"authpaseto_secret_key": "secret",
"authpaseto_denylist_enabled": True,
"authpaseto_denylist_token_checks": ["access", "refresh"],
}
@app.exception_handler(AuthPASETOException)
def authpaseto_exception_handler(request: Request, exc: AuthPASETOException):
return JSONResponse(status_code=exc.status_code, content={"detail": exc.message})
# A storage engine to save revoked tokens. in production,
# you can use Redis for storage system
denylist = set()
# For this example, we are just checking if the tokens jti
# (unique identifier) is in the denylist set. This could
# be made more complex, for example storing the blocked token in Redis
@AuthPASETO.token_in_denylist_loader
def check_if_token_in_denylist(token_payload: Dict):
jti = token_payload["jti"]
return jti in denylist
@app.post("/login")
def login(user: User, Authorize: AuthPASETO = Depends()):
if user.username != "test" or user.password != "test":
raise HTTPException(status_code=401, detail="Bad username or password")
access_token = Authorize.create_access_token(subject=user.username)
refresh_token = Authorize.create_refresh_token(subject=user.username)
return {"access_token": access_token, "refresh_token": refresh_token}
# Standard refresh endpoint.
# A token whose identifier/jti was saved in the denylist will not
# be able to access this endpoint
@app.post("/refresh")
def refresh(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required(refresh_token=True)
current_user = Authorize.get_subject()
new_access_token = Authorize.create_access_token(subject=current_user)
return {"access_token": new_access_token}
# Endpoint for revoking the current users access token
@app.delete("/access-revoke")
def access_revoke(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required()
jti = Authorize.get_token_payload()["jti"]
denylist.add(jti)
return {"detail": "Access token has been revoked"}
# Endpoint for revoking the current users refresh token
@app.delete("/refresh-revoke")
def refresh_revoke(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required(refresh_token=True)
jti = Authorize.get_token_payload()["jti"]
denylist.add(jti)
return {"detail": "Refresh token has been revoke"}
# A token in denylist will not be able to access this any more
@app.get("/protected")
def protected(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required()
current_user = Authorize.get_subject()
return {"user": current_user}
In production, you will usually want a database or in-memory store such as Redis for revoked-token state. A TTL-based store works well because the denylist entry can expire when the token would have expired anyway.
Make sure Redis is running locally before trying the Redis example. One option is:
docker run -d -p 6379:6379 redis
Here is an example that uses Redis for revoking tokens:
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import JSONResponse
from fastapi_paseto import AuthPASETO
from fastapi_paseto.exceptions import AuthPASETOException
from pydantic import BaseModel
from datetime import timedelta
from redis import Redis
app = FastAPI()
class User(BaseModel):
username: str
password: str
settings = {
"authpaseto_secret_key": "secret",
"authpaseto_denylist_enabled": True,
"authpaseto_denylist_token_checks": ["access", "refresh"],
"access_expires": timedelta(minutes=15),
"refresh_expires": timedelta(days=30),
}
@AuthPASETO.load_config
def get_config():
return settings
@app.exception_handler(AuthPASETOException)
def authpaseto_exception_handler(request: Request, exc: AuthPASETOException):
return JSONResponse(status_code=exc.status_code, content={"detail": exc.message})
# Setup our redis connection for storing the denylist tokens
redis_conn = Redis(host="localhost", port=6379, db=0, decode_responses=True)
# Create our function to check if a token has been revoked. In this simple
# case, we will just store the tokens jti (unique identifier) in redis.
# This function will return the revoked status of a token. If a token exists
# in redis, token has been revoked
@AuthPASETO.token_in_denylist_loader
def check_if_token_in_denylist(decrypted_token):
jti = decrypted_token["jti"]
entry = redis_conn.get(jti)
return entry
@app.post("/login")
def login(user: User, Authorize: AuthPASETO = Depends()):
if user.username != "test" or user.password != "test":
raise HTTPException(status_code=401, detail="Bad username or password")
access_token = Authorize.create_access_token(subject=user.username)
refresh_token = Authorize.create_refresh_token(subject=user.username)
return {"access_token": access_token, "refresh_token": refresh_token}
# Standard refresh endpoint. Token in denylist will not
# be able to access this endpoint
@app.post("/refresh")
def refresh(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required(refresh_token=True)
current_user = Authorize.get_subject()
new_access_token = Authorize.create_access_token(subject=current_user)
return {"access_token": new_access_token}
# Endpoint for revoking the current users access token
@app.delete("/access-revoke")
def access_revoke(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required()
# Store the tokens in redis with the value true for revoked.
# We can also set an expires time on these tokens in redis,
# so they will get automatically removed after they expired.
jti = Authorize.get_token_payload()["jti"]
redis_conn.setex(jti, settings.access_expires, "true")
return {"detail": "Access token has been revoke"}
# Endpoint for revoking the current users refresh token
@app.delete("/refresh-revoke")
def refresh_revoke(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required(refresh_token=True)
jti = Authorize.get_token_payload()["jti"]
redis_conn.setex(jti, settings.refresh_expires, "true")
return {"detail": "Refresh token has been revoke"}
# A token in denylist will not be able to access this any more
@app.get("/protected")
def protected(Authorize: AuthPASETO = Depends()):
Authorize.paseto_required()
current_user = Authorize.get_subject()
return {"user": current_user}