. venv/Scripts/activate
uvicorn main:app --reload
requirements.txt
filefastapi uvicorn
# FastAPI fastapi # Local server uvicorn # Validation of values pydantic # For uploading files python-multipart # For static files (used for uploaded files) aiofiles # For database sqlalchemy # For hashing password passlib bcrypt #for loading environment variables python-dotenv # Used for authentication python-jose
.gitignore
filevenv/ __pycache__/
main.py
filefrom fastapi import FastAPI app = FastAPI() @app.get("/") def index(): return {'message': "Hello World!!!"}
python -m venv venv
pip install -r requirements.txt
. venv/Scripts/activate
(venv)
Note: if there are some problems, make sure Git and GitBash are installed in the computer.
uvicorn main:app --reload
fastapi-practice
.gitignore
file with the following content
venv/ __pycache__/
fastapi-practice
folderPerform in the bash terminal:
python -m venv venv
python3 -m venv venv
Perform in the bash terminal:
. venv/Scripts/activate
Expected result on the terminal: (venv) <-- a folder name
Check Virtual Environment was Activated
which python
pip install fastapi
to make endpoints available on a local machine
pip install uvicorn
uvicorn --versionfrom fastapi import FastAPI app = FastAPI() @app.get("/") def index(): return {'message': "Hello World!!!"}
uvicorn main:app --reload
. venv/Scripts/activate
requirements.txt
fastapi uvicorn sqlalchemy
pip install -r requirements.txt
pip freeze > current_requirements.txt
Use JSON Formatter extension for Chrome to see error messages in a nice way
@app.get("/blog/{id}") def get_blog(id): return {'message': f"Blog with id {id}"}
@app.get("/blog/all") def get_all_blog(): return {'message': "All blogs provided"} @app.get("/blog/{id}") def get_blog(id: int): return {'message': f"Blog with id {id}"}
@app.get("/blog/{id}") def get_blog(id: int): return {'message': f"Blog with id {id}"} @app.get("/blog/all") def get_all_blog(): return {'message': "All blogs provided"}
Example:
from enum import Enum class BlogType(str, Enum): short = 'short' story = 'story' howto = 'howto' @app.get("/blog/type/{type}") def get_blog_type(type: BlogType): return {'message': f"Blog type {type}"}
@app.get("/blog/all") def get_blogs(page: int, page_size: int): return {'message': f"All {page_size} blogs on page {page}"}
@app.get("/blog/all") def get_blogs(page = 1, page_size = 10): return {'message': f"All {page_size} blogs on page {page}"}
from typing import Optional @app.get("/blog/all") def get_blogs(page = 1, page_size: Optional[int] = None): return {'message': f"All {page_size} blogs on page {page}"}
@app.get("/blog/{id}/comments/{comment_id}") def get_comment(id: int, comment_id: int, valid: bool = True, username: Optional[str] = None): return {'message': f"Blog_id {id}, comment_id {comment_id}, valid {valid}, username {username}"}
http://127.0.0.1:8000/blog/5/comments/10/?valid=False&username=guest
routers/blog/blog_post.py
from fastapi import APIRouter from models.blog.blog_model import BlogModel router = APIRouter( prefix='/blog', tags=['blog'] ) @router.post("/") def create_blog(blog: BlogModel): title = blog.title return {"data": blog}
models/blog/blog_model.py
from typing import List, Optional from pydantic import BaseModel class BlogModel(BaseModel): title: str content: str number_of_comments: int is_published: Optional[bool] tags: List[str] = [] # default value is [], "tags": ['tag1', 'tag2']
# Example: POST: http://127.0.0.1:8000/blog/25?version=3 @router.post("/{id}") def create_blog(blog: BlogModel, id: int, version: int = 1): return { "id": id, "blog": blog, "version": version }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int = Query(None, title='Id of the comment', <-- not implemented yet description='Some description of comment_id' )): return { "id": id, "blog": blog, "comment_id": comment_id }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int = Query(None, title='Id of the comment', description='Some description of comment_id', alias='commentId' <----- )): return { "id": id, "blog": blog, "comment_id": comment_id }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int = Query(None, title='Id of the comment', description='Some description of comment_id', alias='commentId', deprecated=True )): return { "id": id, "blog": blog, "comment_id": comment_id }
from fastapi import APIRouter, Query, Body, Path from typing import List, Optional from models.blog.blog_model import BlogModel @router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int = Query(None, title='Id of the comment', description='Some description of comment_id', alias='commentId', deprecated=True ), content: str = Body('hi, how are you?') # optional parameter ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int = Query(None, title='Id of the comment', description='Some description of comment_id', alias='commentId', deprecated=True ), content: str = Body(...) # same as content: str = Body(Ellipsis) ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int, content: str = Body(..., min_length=10 ) ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int, content: str = Body(..., min_length=10, max_length=12 ) ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content }
@router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int, content: str = Body(..., min_length=10, max_length=50, regex='^[a-z\s]*$' ) ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content }
#Example: POST http://127.0.0.1:8000/blog/1/comment?comment_id=2&ver=1.0&ver=1.1 @router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int, content: str = Body(..., min_length=10, max_length=50, regex='^[a-z\s]*$' ), ver: Optional[List[str]] = Query(None) <---- ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content, "ver": ver }
{ "id": 1, "blog": { "title": "string", "content": "string", "is_published": true, "number_of_comments": 0 }, "comment_id": 2, "content": "rwhlxapmoajqlfttukakpntpti", "ver": [ "1.0", "1.1" ] }
#Example: POST http://127.0.0.1:8000/blog/1/comment?comment_id=2&ver=2.0&ver=2.1 @router.post('{id}/comment') def create_comment(blog: BlogModel, id: int, comment_id: int, content: str = Body(..., min_length=10, max_length=50, regex='^[a-z\s]*$' ), # ver: Optional[List[str]] = Query(None) ver: Optional[List[str]] = Query(['1.0', '1.1', '1.2']) ): return { "id": id, "blog": blog, "comment_id": comment_id, "content": content, "ver": ver }
{ "id": 1, "blog": { "title": "string", "content": "string", "is_published": true, "number_of_comments": 0 }, "comment_id": 2, "content": "ssssssssss", "ver": [ "1.0", "1.1", "1.2", "2.0" ] }
comment_id: int = Path(None, gt=5)
comment_id: int = Path(None, ge=5)
comment_id: int = Path(None, lt=5)
comment_id: int = Path(None, le=5)
Example: POST http://127.0.0.1:8000/blog1/comment/6?ver=1.0&ver=1.1&ver=1.2 from fastapi import APIRouter, Query, Body, Path from typing import List, Optional from models.blog.blog_model import BlogModel @router.post('{id}/comment/{comment_id}') def create_comment(blog: BlogModel, id: int, comment_id: int = Path(None, gt=5, le=10), # <---- comment_title: int = Query(None, title='Id of the comment', description='Some description of comment_id' ), content: str = Body(..., min_length=10, max_length=50, regex='^[a-z\s]*$' ), ver: Optional[List[str]] = Query(['1.0', '1.1', '1.2']) ): return { "id": id, "blog": blog, "comment_id": comment_id, "comment_title": comment_title, "content": content, "ver": ver }
from typing import List, Dict, Optional from pydantic import BaseModel class Image(BaseModel): url: str alias: str
class BlogModel(BaseModel): title: str content: str number_of_comments: int is_published: Optional[bool] tags: List[str] = [] # default value is [] some_data: Dict[str, str] = {} # {'key': 'val1'} image: Optional[Image] = None
from fastapi import FastAPI, Response, status @app.get("/blog/{id}", status_code=status.HTTP_200_OK) def get_blog(id: int, response: Response): if id > 5: response.status_code = status.HTTP_404_NOT_FOUND return {'error': f'Blog {id} not found'} else: response.status_code = status.HTTP_200_OK return {'message': f"Blog with id {id}"}
Tags structure a documentation in a very nice way
@app.get( "/blog/all", tags=['blog'] ) def get_blogs(page=1, page_size: Optional[int] = None): return {'message': f"All {page_size} blogs on page {page}"}
@app.get("/blog/{id}", status_code=status.HTTP_200_OK, tags=['blog']) def get_blog(id: int, response: Response): if id > 5: response.status_code = status.HTTP_404_NOT_FOUND return {'error': f'Blog {id} not found'} else: response.status_code = status.HTTP_200_OK return {'message': f"Blog with id {id}"}
@app.get("/blog/{id}/comments/{comment_id}", tags=['blog', 'comment']) def get_comment(id: int, comment_id: int, valid: bool = True, username: Optional[str] = None): return {'message': f"Blog_id {id}, comment_id {comment_id}, valid {valid}, username {username}"}
Used for documentation
@app.get( "/blog/all", tags=['blog'], summary="Retrieve all blogs", description="This API call simulates fetching all blogs" ) def get_blogs(page=1, page_size: Optional[int] = None): return {'message': f"All {page_size} blogs on page {page}"}
@app.get("/blog/{id}/comments/{comment_id}", tags=['blog', 'comment']) def get_comment(id: int, comment_id: int, valid: bool = True, username: Optional[str] = None): """ Simulates retrieving a comment of a blog - **id** mandatory path parameter - **comment_id** mandatory path parameter - **valid** optional query parameter - **username** optional query parameter """ return {'message': f"Blog_id {id}, comment_id {comment_id}, valid {valid}, username {username}"}
Info about the output
@app.get( "/blog/all", tags=['blog'], summary="Retrieve all blogs", description="This API call simulates fetching all blogs", response_description="The list of available blogs" <--- ) def get_blogs(page=1, page_size: Optional[int] = None): return {'message': f"All {page_size} blogs on page {page}"}
See project # 463/3
from fastapi import FastAPI from routers.blog import blog_get from routers.blog import blog_post app = FastAPI() app.include_router(blog_get.router) app.include_router(blog_post.router) @app.get("/") def index(): return {'message': "Hello World!!!"}
from fastapi import APIRouter, Response, status from typing import Optional router = APIRouter( prefix='/blog', tags=['blog'] <-- tags for all paths ) @router.get( "/all", summary="Retrieve all blogs", description="This API call simulates fetching all blogs", response_description="The list of available blogs" ) def get_blogs(page=1, page_size: Optional[int] = None): return {'message': f"All {page_size} blogs on page {page}"}
from fastapi import APIRouter router = APIRouter( prefix='/blog', tags=['blog'] ) @router.post("/") def create_blog(): pass
def required_functionality(): return {'message': 'Learning FastAPI is important'}
from fastapi import Depends ... from routers.blog.blog_post import required_functionality
@router.get("/all") def get_blogs(page=1, page_size: Optional[int] = None, req_parameter: dict = Depends(required_functionality)): return { 'message': f"All {page_size} blogs on page {page}", 'info': req_parameter }
Based on Complete FastAPI masterclass from scratch 2023 on Udemy by Catalin Stefan
Step | Folder in the project | File |
---|---|---|
Open TablePlus software on the PC |
||
Prepare requirements.txt |
root | requirements.txt
fastapi uvicorn sqlalchemy passlib bcrypt |
Install Packages |
root |
deactivate . venv/Scripts/activate pip install -r requirements.txt uvicorn main:app --reload http://127.0.0.1:8000/docs |
Database definition |
db\ | database.py |
Model definitionCreation of a user in the database |
db\models\ | db_user_model.py |
Create database |
root | main.py |
Schema definition |
schemas\user\ | user_schema.py |
ORM functionality |
db\models\ | db_user.py |
API functionality |
routers\user\ | user.py |
|
root | fastapi-practice.db |
db\database.py
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SQLALCHEMY_DATABASE_URL = "sqlite:///./fastapi-practice.db" <-- name of db file engine = create_engine( SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() def get_db(): db = SessionLocal() # try to provide the db, close it at the end try: yield db finally: db.close()
db\models\db_user_model.py
from sqlalchemy.sql.sqltypes import Integer, String, Boolean from db.database import Base from sqlalchemy import Column class DbUser(Base): __tablename__ = 'users' id = Column(Integer, primary_key=True, index=True) # id = Column(Integer, primary_key=True, index=True) # will be automatically incremented when we add a new line in the table username = Column(String) email = Column(String) password = Column(String)
main.py
from fastapi import FastAPI from routers.user import user from routers.blog import blog_get from routers.blog import blog_post from db.models.user import db_user_modal from db.database import engine app = FastAPI() app.include_router(user.router) app.include_router(blog_get.router) app.include_router(blog_post.router) @app.get("/") def index(): return {'message': "Hello World!!!"} db_user_modal.Base.metadata.create_all(engine)
schemas\user\user_schema.py
from pydantic import BaseModel from typing import List, Dict, Optional class UserBase(BaseModel): username: str email: str password: str class UserDisplay(BaseModel): # what will be returned by API username: str email: str class Config(): # allows the system automatically to return database data type (DbUser) into format we provided in UserDisplay class orm_mode = True # object relational mapping - map what we have in the database to this class
db\db_user.py
from sqlalchemy.orm.session import Session from schemas.user.user_schema import UserBase from db.models.user.db_user_modal import DbUser from db.hash import Hash
def create_user(db: Session, request: UserBase): new_user = DbUser( username = request.username, email = request.email, password = Hash.bcrypt(request.password) ) db.add(new_user) db.commit() # actually send the operation to the database db.refresh(new_user) # will get user info with the id created in the database return new_user
def get_all_users(db: Session): return db.query(DbUser).all()
from fastapi import HTTPException, status ... def get_user(db: Session, id: int): # Example of several filters: # db.query(DbUser).filter(DbUser.id == id).filter(DbUser.email == email).first() user = db.query(DbUser).filter(DbUser.id == id).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f'User with id {id} not found') return user
def update_user(db: Session, id: int, request: UserBase): user = db.query(DbUser).filter(DbUser.id == id) user.update({ DbUser.username: request.username, DbUser.email: request.email, DbUser.password: Hash.bcrypt(request.password) }) db.commit() return 'OK'
from fastapi import HTTPException, status...
def delete_user(db: Session, id: int): user = db.query(DbUser).filter(DbUser.id == id).first() if not user: raise HTTPException( status_code = status.HTTP_404_NOT_FOUND, detail = f'User with id {id} not found' ) db.delete(user) db.commit() return {'message': 'OK'}
routers\user\user.py
from fastapi import APIRouter, Response, status, Depends from sqlalchemy.orm import Session from typing import List from schemas.user.user_schema import UserBase from schemas.user.user_schema import UserDisplay from db.database import get_db from db import db_user router = APIRouter( prefix='/user', tags=['user'] )
@router.post('/', response_model=UserDisplay) def create_user(request: UserBase, db: Session = Depends(get_db)): return db_user.create_user(db, request)
@router.get('/', response_model=List[UserDisplay]) def get_all_users(db: Session = Depends(get_db)): return db_user.get_all_users(db)
@router.get('/{id}', response_model=UserDisplay) def get_user(id: int, db: Session = Depends(get_db)): return db_user.get_user(db, id)
@router.post('/{id}') def update_user(id: int, request: UserBase, db: Session = Depends(get_db)): return db_user.update_user(db, id, request)
@router.delete('/{id}') def delete_user(id: int, db: Session = Depends(get_db)): return db_user.delete_user(db, id)
. venv/Scripts/activate uvicorn main:app --reload
As a result a new file named fastapi-practice.db
will be created in the project's root folder. The name is specified in db\database.py
file.
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) detail=f"User with id {id} not found")
exceptions.py
class StoryException(Exception): <--- def __init__(self, name: str): self.name = name
db\db_user.py
from exceptions import StoryException ... # Creation of a user in the database def create_user(db: Session, request: UserBase): if request.username.startswith("myuser"): raise StoryException("Here some text of the exception") <--- new_user = DbUser( username = request.username, email = request.email, password = Hash.bcrypt(request.password) ) db.add(new_user) db.commit() # actually send the operation to the database db.refresh(new_user) # will get user info with the id created in the database return new_user
main
from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from routers.user import user...
from db.models.user import db_user_modal from db.database import engine from exceptions import StoryException <---
app = FastAPI() app.include_router(user.router) ...
@app.get("/") def index(): return {'message': "Hello World!!!"} @app.exception_handler(StoryException) <--- def story_exception_handler(request: Request, exc: StoryException): return JSONResponse( status_code = 418, content={'detail': exc.name} ) db_user_modal.Base.metadata.create_all(engine)
from fastapi.responses import JSONResponse, PlainTextResponse ...
@app.get("/") def index(): return {'message': "Hello World!!!"}...
# Intercept all the exceptions <---- @app.exception_handler(HTTPException) def custom_handler(request: Request, exc: StoryException): return PlainTextResponse(str(exc), status_code=400) db_user_modal.Base.metadata.create_all(engine)
routers\product\product.py
from fastapi import APIRouter from fastapi.responses import Response router = APIRouter( prefix='/product', tags=['product'] ) products = ['watch', 'camera', 'phone'] @router.get("/all") def get_all_products(): # return products # Example of returning data as a plain text data = " ".join(products) # response is: watch camera phone return Response(content=data, media_type="text/plain")
main.py
... from routers.product import product
app = FastAPI() app.include_router(product.router)
product.py
from fastapi.responses import Response, HTMLResponse, PlainTextResponse
@router.get("/{id}", responses={ 200: { "content": { "text/html": { "example": "Product>" }, }, "description": "Return the HTML for an object" }, 404: { "content": { "text/plain": { "example": "Product not found" }, }, "description": "A clear text error message" } }) def get_product(id: int): if id > len(products): # Example: return a plain text instead of HTML result = "Product not found" return PlainTextResponse(status_code=404, content=result, media_type="text/plain") else: product = products[id] result = f""" <head> <style> .product{{ width: px; height: 30px; border: 2px inset green; background-color: lightblue; text-align: center; }} </style> </head> <body> <div class="product">{product}</div> </body> """ return HTMLResponse(content=result, media_type="text/html")
@router.get('/') def fun(custom_header: Optional [str] = Header(None))
@router.get('/') def fun(custom_header: Optional [List[str]] = Header(None))
The list of values are separated by a comma
custom-header
def get_all_users(response: Response): response.headers['c-custom-header'] = 'abc'
from fastapi import APIRouter, Header from fastapi.responses import Response, HTMLResponse, PlainTextResponse from typing import List, Optional
... @router.get("/withheader") def get_products( response: Response, custom_header: Optional[List[str]]=Header(None) ): response.headers['custom_response_header'] = ", ".join(custom_header) return products
response.set_cookie(key="test_cookie", value='test_cookie_value')
test_cookie: Optional[str] = Cookie(None)
product.py
from fastapi import APIRouter, Header, Cookie from fastapi.responses import Response, HTMLResponse, PlainTextResponse from typing import List, Optional ...
@router.get("/all") def get_all_products(): # return products # Example of returning data as a plain text data = " ".join(products) # response is: watch camera phone response = Response(content=data, media_type="text/plain") response.set_cookie(key="test_cookie", value="test_cookie_value") return response
@router.get("/withheader") def get_products( response: Response, custom_header: Optional[List[str]] = Header(None), test_cookie: Optional[str] = Cookie(None) ): if custom_header: response.headers['custom_response_header'] = ", ".join(custom_header) return { 'data': products, 'custom_header': custom_header, 'my_cookie': test_cookie }
python-multipart
package is requiredfrom fastapi import APIRouter, Header, Form, Cookie def create_product(name: str = Form(...)):
product.py
from fastapi import APIRouter, Header, Form, Cookie @router.post('/new') def create_product(product_name: str = Form(...)): products.append(product_name) return products
app.add_middleware( CORSMiddleware, allow_origins=['http://127.0.0.1:3000'], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], )
main.py
from fastapi.middleware.cors import CORSMiddleware ... origins = [ 'http://127.0.0.1:3000' # should not contain '/' at the end ] # Should be at the end app.add_middleware( CORSMiddleware, allow_origins = origins, allow_credentials = True, allow_methods = ["*"], # GET, POST, ... allow_headers = ["*"] # everything )
python-jose
package is requiredrouters\blog.py
from fastapi import APIRouter, Response, status, Depends from auth.oauth2 import get_current_user ...
# Get specific blog based on currently logged in user @router.get("/{id}", status_code=status.HTTP_200_OK) def get_blog(id: int, response: Response, current_user: UserBase = Depends(get_current_user)): # TODO - return user without the password by creating a new class for the response setting response_class= . Search for response_class=FileResponse return { 'data': f"Blog with id {id}", 'current_user': current_user }
openssl rand -hex 32
auth\oauth2.py
from fastapi.security import OAuth2PasswordBearer from fastapi.param_functions import Depends from typing import Optional from datetime import datetime, timedelta from jose import jwt # token end point should be the same in # auth\authentication.py # @router.post('/token') oauth2_schema = OAuth2PasswordBearer(tokenUrl="token") # Generate a unique key for current API in the terminal by running: # openssl rand -hex 32 API_PRIVATE_SECRET_KEY = '89407c7339a6c00544e51af1101c4abb4aea2a31157ca5f7dfd87da02a628107' ALGORITHM = 'HS256' ACCESS_TOKEN_EXPIRE_MINUTES = 30 def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, API_PRIVATE_SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt
auth\authentication.py
from fastapi import APIRouter, HTTPException, status from fastapi.param_functions import Depends from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm.session import Session from db.database import get_db from db.models.user.db_user_modal import DbUser from db.hash import Hash from auth import oauth2 router = APIRouter( tags=['authentication'] ) @router.post('/token') def get_token(request: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): user = db.query(DbUser).filter(DbUser.username == request.username).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Invalid credentials") if not Hash.verify(user.password, request.password): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Incorrect password") access_token = oauth2.create_access_token(data={'sub: user.username'}) return { 'access_token': access_token, 'token_type': 'bearer', 'user_id': user.id, 'username': user.username }
def get_current_user(token: str = Depends(oauth2_schema), db: Session = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail='Could not validate credentials', headers={"WWW-Authenticate": "Bearer"} ) try: payload = jwt.decode(token, API_PRIVATE_SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = db_user.get_user_by_username(db, username) if user is None: raise credentials_exception return user
main.py
from auth import authentication from routers.user import user
app = FastAPI() app.include_router(authentication.router) app.include_router(user.router)
db\db_user.py
# Read one user with specified username from the database def get_user_by_username(db: Session, username: str): # Example of several filters: # db.query(DbUser).filter(DbUser.id == id).filter(DbUser.email == email).first() user = db.query(DbUser).filter(DbUser.username == username).first() if not user: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f'User with username {username} not found') return user
main.py
from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from database.database import engine from database.models.post import db_post_model from routers.post import post
app = FastAPI() app.include_router(post.router) origins = [ # should not contain '/' at the end 'http://127.0.0.1:3000' ] app.add_middleware( CORSMiddleware, allow_origins = origins, allow_credentials = True, allow_methods = ["*"], # GET, POST, ... allow_headers=["*"] # everything ) app.mount('/images', StaticFiles(directory='uploads'), name='images') db_post_model.Base.metadata.create_all(engine)
routers\post\post.py
import string import random import shutil from fastapi import APIRouter, UploadFile, File, Depends
@router.post('/image') def upload_image(image: UploadFile = File(...)): letters = string.ascii_letters # generate 6 random letters from this collection rand_letters = ''.join(random.choice(letters) for i in range(6)) random_string = f'_{rand_letters}.' # setting the maxsplit parameter to 1, will return a list with 2 elements # Example: original image name is cat.jpg # final image name is cat_BtFbgZ.jpg filename = random_string.join(image.filename.rsplit('.', 1)) path = f'uploads/{filename}' # Store the file in the path # Override the file if it exist, create the file if it doesn't exist with open(path, "w+b") as buffer: shutil.copyfileobj(image.file, buffer) # return a filename full path return { 'filename': path, 'type': image.content_type }
cat.jpg
{ "filename": "uploads/cat_ZFeKbI.jpg", "type": "image/jpeg" }
def get_file(file: bytes = File(...))
file\file.py
from fastapi import APIRouter, File, Header, Form, Cookie from fastapi.responses import Response, HTMLResponse, PlainTextResponse from typing import List, Optional router = APIRouter( prefix='/file', tags=['file'] ) # Assumptions # - the file contains a text # - the file small because it's stored in memory @router.post('/smallfile') def get_file(file: bytes = File(...)): content = file.decode('utf-8') lines = content.split('\n') return {'lines': lines}
main.py
from routers.file import file
app = FastAPI() app.include_router(file.router)
def get_upload_file(upload_file: UploadFile = File(...))
file\file.py
from fastapi import APIRouter, File, UploadFile import shutil # to play around files on local machine
# Create 'uploaded_files' folder in the root of the project. # All uploaded file will go to this folder @router.post('/uploadfile') def get_uploadfile(upload_file: UploadFile = File(...)): path = f"uploaded_files/{upload_file.filename}" with open(path, "w+b") as buffer: # override the file if it exist, create the file if it doesn't exist shutil.copyfileobj(upload_file.file, buffer) return { 'filename': path, 'type': upload_file.content_type }
# Just for testing @router.post('/uploadfile') def get_uploadfile(upload_file: UploadFile = File(...)): return { 'filename': upload_file.filename }
aiofiles
package is requiredpic.jpg
in uploaded_files
folderfile\file.py
# Create 'uploaded_files' folder in the root of the project. # All uploaded file will go to this folder @router.post('/uploadfile') def get_uploadfile(upload_file: UploadFile = File(...)): path = f"uploaded_files/{upload_file.filename}" with open(path, "w+b") as buffer: # override the file if it exist, create the file if it doesn't exist shutil.copyfileobj(upload_file.file, buffer) return { 'filename': path, 'type': upload_file.content_type }
main.py
from fastapi.staticfiles import StaticFiles app.add_middleware( CORSMiddleware, allow_origins = origins, allow_credentials = True, allow_methods = ["*"], # GET, POST, ... allow_headers=["*"] # everything ) app.mount('/files', StaticFiles(directory="uploaded_files"), name='files')
@router.get("/download/{name}", response_class=FileResponse) def download_file(name: str): ... return path
file\file.py
# Allow to download file located in uploaded_files # TODO - check downloading Excel files @router.get('/download/{name}', response_class=FileResponse) def get_file(name: str): path = f"uploaded_files/{name}" return path
requests
and pytest
packagesimport requests def test_get_all_blogs(): response = requests.get('http://127.0.0.1:8000/blog/all') assert response.status_code == 200
pytest
logger\custom_log.py
def log(tag="", log_level="", message=""): with open("log.txt", "a+") as log: # create a file if doesn't exists log.write(f"{tag}: {log_level} {message} \n")
routers\product\product.py
from logger.custom_log import log @router.get("/all") def get_all_products(): ... log("MyAPI", "INFO", "Call to get all products") return response
log.txt
at the root folder of the project.await
means the process can be paused
async
defines a function with suspendable points
routers\product\product.py
import time async def time_consuming_functionality(): time.sleep(5) # sleep 5 seconds return 'ok'
@router.get("/all") async def get_all_products(): await time_consuming_functionality() data = " ".join(products) response = Response(content=data, media_type="text/plain") return response
@app.middleware("http") async def add_middleware(request: Request, call_next): ... response = await call_next(request) ... return response
main.py
import time ... db_user_modal.Base.metadata.create_all(engine) # Add duration time of each request as part of header @app.middleware("http") async def add_middleware(request: Request, call_next): start_time = time.time() # give current time response = await call_next(request) duration = time.time() - start_time response.headers['duration'] = str(duration) # duration in seconds return response origins = [ 'http://127.0.0.1:3000' # should not contain '/' at the end ]
@router.get("/{id}") def read_item(id: str, bt: BackgroundTasks): bt.add_task(some_functionality, params)
routes\product\product.py
from fastapi import APIRouter, BackgroundTasks from logger.custom_log import log...
def log_product_call(message: str): log("MyAPI", "INFO", message)...
@router.get("/{id}", responses={ ... }) def get_product( id: int, bt: BackgroundTasks ): message = f"Product with id {id} was requested" bt.add_task(log_product_call, message) ...
def get_items(headers = Depends(common_func)): return { 'headers': headers }
main.py
from routers.dependencies import dependencies...
app = FastAPI() app.include_router(dependencies.router)
routers\dependencies\dependencies.py
from fastapi import APIRouter from fastapi.requests import Request from fastapi.param_functions import Depends router = APIRouter( prefix='/dependencies', tags=['dependencies'] )
routers\dependencies\dependencies.py
# Retrieve the request that is being call def convert_header(request: Request): out_headers = [] for key, value in request.headers.items(): out_headers.append(f"{key} -- {value}") return out_headers
# request is gonna be automatically passed to get_items because it is an end point # and end point receive a request by default # as a result, the request will be passed as a parameter # to convert_header @router.get('') def get_items(headers=Depends(convert_header)): return { 'items': ['a', 'b', 'c'], 'headers': headers } @router.post('') def create_item(headers=Depends(convert_header)): return { 'result': 'New item created', 'headers': headers }
def convert_header(request: Request, separator: str = '--'): out_headers = [] for key, value in request.headers.items(): out_headers.append(f"{key} {separator} {value}") return out_headers @router.get('') def get_items(separator: str = '--', headers=Depends(convert_header)): return { 'items': ['a', 'b', 'c'], 'headers': headers } @router.post('') def create_item(headers=Depends(convert_header)): return { 'result': 'New item created', 'headers': headers }
routers\dependencies\dependencies.py
from fastapi import APIRouter from fastapi.requests import Request from fastapi.param_functions import Depends
class Account: def __init__(self, name: str, email: str): self.name = name self.email = email # Creates an account and retrieves an information of the account @router.post('/user') def create_user(name: str, email: str, password: str, account: Account = Depends()): # Same as # def create_user(name: str, email: str, password: str, account: Account = Depends(Account)): # account - perform whatever operations return { 'name': account.name, 'email': account.email }
routers\dependencies\dependencies.py
def convert_params(request: Request, separator: str): query = [] for key, value in request.query_params.items(): query.append(f"{key} {separator} {value}") return query def convert_header(request: Request, separator: str = '--', query = Depends(convert_params)): out_headers = [] for key, value in request.headers.items(): out_headers.append(f"{key} {separator} {value}") return { 'headers': out_headers, 'query': query } @router.get('') def get_items(separator: str = '--', headers=Depends(convert_header)): return { 'items': ['a', 'b', 'c'], 'headers': headers }
router = APIRouter( prefix='/dependencies', tags=['dependencies'], dependencies=[Depends(common_functionality)] )
logger\custom_log.py
from fastapi.requests import Request def log(tag="MyApp", log_level="INFO", message="", request: Request = None): with open("log.txt", "a+") as log: # create a file if doesn't exists log.write(f"{tag}: {log_level} {message} \n") log.write(f"\t{request.url} \n")
routers\dependencies\dependencies.py
from fastapi import APIRouter from fastapi.requests import Request from fastapi.param_functions import Depends from logger.custom_log import log router = APIRouter( prefix='/dependencies', tags=['dependencies'], dependencies=[Depends(log)] )
main.py
app = FastAPI(dependencies=[Depends(common_functionality)])
pip install freeze
pip freeze
requirements-dev.txt
fastapi==0.94.1
When package was installed outside the virtual environment
which python
If it shows general computer folder of Python installation, means there is a problem.
In case everything OK, it shows a folder of the virtual environment and not the general computer folder of Python installation
deactivate
python -m venv venv
. venv/Scripts/activate
which python
It should show a folder of the virtual environment and not the general computer folder of Python installation.
pip install -r requirements.txt