728x90
๋ฐ์ํ

๐ฐ FastAPI ๊ณต์๋ฌธ์๋ฅผ ๋ณด๋ฉด์ ๊ฐ์ธ์ ์ผ๋ก ์ ๋ฆฌํ ๊ธ ์
๋๋ค.
FastAPI์์ Back Ground Task๋ ๋ฌด๊ฑฐ์ด ์์ ์ Back Ground๋ก ๋๋ ค์ ์ฒ๋ฆฌํ ๋ ์ฌ์ฉํ๋ค.
ex. ์์ ์ํํ ์ด๋ฉ์ผ ์ ์ก, ๋ฐ์ดํฐ ์ฒ๋ฆฌ
์ด๋ฌํ ์์ ๋ค์ ๋ณ๋๋ก ์ฒ์ฒํ ์ฒ๋ฆฌํด๋ ๋๋ ์์ ๋ค์๊ฒ ์ฌ์ฉํ๋ค.
* ์์ฒญ ๋ฌด๊ฑฐ์ด ์์ ๋ค์ ์์ ๋ณ๋(Multi Processing)๋ก ์ฒ๋ฆฌํ๋ ๊ฒ์ด ์ข๋ค.
์ฌ์ฉ๋ฐฉ๋ฒ์ fastapi์ BackgroundTasks ๊ฐ์ฒด์ add_task()์ ์์ (ํจ์)์ ๋งค๊ฐ๋ณ์๋ก ์ ๋ฌํ๋ค.
๋ฐฑ๊ทธ๋ผ์ด๋ ์์ ์ ๋ํ ์์กด์ฑ๋ ์ถ๊ฐํ ์ ์๋ค.
from typing import Union
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}

๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ฉด background ์์ ์ ๋ํ log๊ฐ ์ ์ฅ๋๋ค.

728x90
๋ฐ์ํ
'Back-end & Server > FastAPI' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[FastAPI] Metadata Docs, Testing, Debugging (0) | 2024.02.24 |
---|---|
[FastAPI] Template (0) | 2024.02.23 |
[FastAPI] ํ์ผ ๋ถํ (0) | 2024.02.23 |
[FastAPI] Relational Database (0) | 2024.02.22 |
[FastAPI] ๋ฏธ๋ค์จ์ด (0) | 2024.02.20 |