๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

Back-end & Server/FastAPI

[FastAPI] Background Task

728x90
๋ฐ˜์‘ํ˜•

 

๐Ÿฐ FastAPI ๊ณต์‹๋ฌธ์„œ๋ฅผ ๋ณด๋ฉด์„œ ๊ฐœ์ธ์ ์œผ๋กœ ์ •๋ฆฌํ•œ ๊ธ€ ์ž…๋‹ˆ๋‹ค.

 

FastAPI์—์„œ Back Ground Task๋Š” ๋ฌด๊ฑฐ์šด ์ž‘์—…์„ Back Ground๋กœ ๋Œ๋ ค์„œ ์ฒ˜๋ฆฌํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.

ex. ์ž‘์—… ์ˆ˜ํ–‰ํ›„ ์ด๋ฉ”์ผ ์ „์†ก, ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ

์ด๋Ÿฌํ•œ ์ž‘์—…๋“ค์€ ๋ณ„๋„๋กœ ์ฒœ์ฒœํžˆ ์ฒ˜๋ฆฌํ•ด๋„ ๋˜๋Š” ์ž‘์—…๋“ค์—๊ฒŒ ์‚ฌ์šฉํ•œ๋‹ค.

 

* ์—„์ฒญ ๋ฌด๊ฑฐ์šด ์ž‘์—…๋“ค์€ ์™„์ „ ๋ณ„๋„(Multi Processing)๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ์ข‹๋‹ค.

 

 

์‚ฌ์šฉ๋ฐฉ๋ฒ•์€ fastapi์— BackgroundTasks ๊ฐœ์ฒด์— add_task()์— ์ž‘์—…(ํ•จ์ˆ˜)์„ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ „๋‹ฌํ•œ๋‹ค.

 

๋ฐฑ๊ทธ๋ผ์šด๋“œ ์ž‘์—…์— ๋Œ€ํ•œ ์˜์กด์„ฑ๋„ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋‹ค.

from typing import Union

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Union[str, None] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

 

๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋ฉด background ์ž‘์—…์— ๋Œ€ํ•œ log๊ฐ€ ์ €์žฅ๋œ๋‹ค.

 

 

 

728x90
๋ฐ˜์‘ํ˜•

'Back-end & Server > FastAPI' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

[FastAPI] Metadata Docs, Testing, Debugging  (0) 2024.02.24
[FastAPI] Template  (0) 2024.02.23
[FastAPI] ํŒŒ์ผ ๋ถ„ํ•   (0) 2024.02.23
[FastAPI] Relational Database  (0) 2024.02.22
[FastAPI] ๋ฏธ๋“ค์›จ์–ด  (0) 2024.02.20