Update with Extra Data (Hashed Passwords) with FastAPI¶
In the previous chapter I explained to you how to update data in the database from input data coming from a FastAPIpath operation.
Now I'll explain to you how to add extra data, additional to the input data, when updating or creating a model object.
This is particularly useful when you need to generate some data in your code that is not coming from the client, but you need to store it in the database. For example, to store a hashed password.
fromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:int|None=Field(default=None,index=True)classHero(HeroBase,table=True):id:int|None=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:str|None=Nonesecret_name:str|None=Noneage:int|None=Nonepassword:str|None=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportOptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportList,OptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=List[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
When a client is creating a new hero, they will send the password in the request body.
And when they are updating a hero, they could also send the password in the request body to update it.
The app will receive the data from the client using the HeroCreate model.
This contains the password field with the plain text password, and we cannot use that one. So we need to generate a hash from it.
# Code above omitted 👆defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"# Code here omitted 👈@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)# Code below omitted 👇
# Code above omitted 👆defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"# Code here omitted 👈@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)# Code below omitted 👇
# Code above omitted 👆defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"# Code here omitted 👈@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)# Code below omitted 👇
👀 Full file preview
fromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:int|None=Field(default=None,index=True)classHero(HeroBase,table=True):id:int|None=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:str|None=Nonesecret_name:str|None=Noneage:int|None=Nonepassword:str|None=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportOptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportList,OptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=List[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
Let's pause for a second to check this, when working with dictionaries, there's a way to update a dictionary with extra data from another dictionary, something like this:
Similar to how dictionaries have an update method, SQLModel models have a parameter update in Hero.model_validate() that takes a dictionary with extra data, or data that should take precedence:
fromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:int|None=Field(default=None,index=True)classHero(HeroBase,table=True):id:int|None=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:str|None=Nonesecret_name:str|None=Noneage:int|None=Nonepassword:str|None=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportOptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportList,OptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=List[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
Now, db_hero (which is a table modelHero) will extract its values from hero (which is a data modelHeroCreate), and then it will update its values with the extra data from the dictionary extra_data.
It will only take the fields defined in Hero, so it will not take the password from HeroCreate. And it will also take its values from the dictionary passed to the update parameter, in this case, the hashed_password.
If there's a field in both hero and the extra_data, the value from the extra_data passed to update will take precedence.
Now let's say we want to update a hero that already exists in the database.
The same way as before, to avoid removing existing data, we will use exclude_unset=True when calling hero.model_dump(), to get a dictionary with only the data sent by the client.
fromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:int|None=Field(default=None,index=True)classHero(HeroBase,table=True):id:int|None=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:str|None=Nonesecret_name:str|None=Noneage:int|None=Nonepassword:str|None=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportOptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportList,OptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=List[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
Now, this hero_data dictionary could contain a password. We need to check it, and if it's there, we need to generate the hashed_password.
Then we can put that hashed_password in a dictionary.
And then we can update the db_hero object using the method db_hero.sqlmodel_update().
It takes a model object or dictionary with the data to update the object and also an additional update argument with extra data.
fromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:int|None=Field(default=None,index=True)classHero(HeroBase,table=True):id:int|None=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:str|None=Nonesecret_name:str|None=Noneage:int|None=Nonepassword:str|None=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportOptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=list[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
fromtypingimportList,OptionalfromfastapiimportFastAPI,HTTPException,QueryfromsqlmodelimportField,Session,SQLModel,create_engine,selectclassHeroBase(SQLModel):name:str=Field(index=True)secret_name:strage:Optional[int]=Field(default=None,index=True)classHero(HeroBase,table=True):id:Optional[int]=Field(default=None,primary_key=True)hashed_password:str=Field()classHeroCreate(HeroBase):password:strclassHeroPublic(HeroBase):id:intclassHeroUpdate(SQLModel):name:Optional[str]=Nonesecret_name:Optional[str]=Noneage:Optional[int]=Nonepassword:Optional[str]=Nonesqlite_file_name="database.db"sqlite_url=f"sqlite:///{sqlite_file_name}"connect_args={"check_same_thread":False}engine=create_engine(sqlite_url,echo=True,connect_args=connect_args)defcreate_db_and_tables():SQLModel.metadata.create_all(engine)defhash_password(password:str)->str:# Use something like passlib herereturnf"not really hashed {password} hehehe"app=FastAPI()@app.on_event("startup")defon_startup():create_db_and_tables()@app.post("/heroes/",response_model=HeroPublic)defcreate_hero(hero:HeroCreate):hashed_password=hash_password(hero.password)withSession(engine)assession:extra_data={"hashed_password":hashed_password}db_hero=Hero.model_validate(hero,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero@app.get("/heroes/",response_model=List[HeroPublic])defread_heroes(offset:int=0,limit:int=Query(default=100,le=100)):withSession(engine)assession:heroes=session.exec(select(Hero).offset(offset).limit(limit)).all()returnheroes@app.get("/heroes/{hero_id}",response_model=HeroPublic)defread_hero(hero_id:int):withSession(engine)assession:hero=session.get(Hero,hero_id)ifnothero:raiseHTTPException(status_code=404,detail="Hero not found")returnhero@app.patch("/heroes/{hero_id}",response_model=HeroPublic)defupdate_hero(hero_id:int,hero:HeroUpdate):withSession(engine)assession:db_hero=session.get(Hero,hero_id)ifnotdb_hero:raiseHTTPException(status_code=404,detail="Hero not found")hero_data=hero.model_dump(exclude_unset=True)extra_data={}if"password"inhero_data:password=hero_data["password"]hashed_password=hash_password(password)extra_data["hashed_password"]=hashed_passworddb_hero.sqlmodel_update(hero_data,update=extra_data)session.add(db_hero)session.commit()session.refresh(db_hero)returndb_hero
Tip
The method db_hero.sqlmodel_update() was added in SQLModel 0.0.16. 😎
You can use the update parameter in Hero.model_validate() to provide extra data when creating a new object and Hero.sqlmodel_update() to provide extra data when updating an existing object. 🤓