티스토리 뷰
회사에서 Prefect 를 쓸 일이 생겼다. 본격적인 업무 투입 전 공식문서를 읽고 개념들을 파악해보자.
지난 시간에는 Flow의 개념에 대해서 파악했고, 오늘 알아볼 것은 Task이다.
Tasks
Def) tasks are atomic units of work with transactional semantics
flow 와 비슷하게 decorator 를 사용해서 정의한다
from prefect import task
@task(log_prints=True)
def explain_tasks():
print("run any python code here!")
print("but maybe just a little bit")
주로 Flow 보다 작은 기능들을 커버할 때 사용된다(thread 같은 거라고 보면 됨)
일반적으로 flow 처럼 python function 이랑 동작하는게 매우 비슷하나 다음의 특징을 가진다.
일단 flow 와 동일한 특징은 다음과 같다.
- metadata (state 포함) 관리됨
- 실패 시 retry (횟수 조정 가능)
- timeout 을 설정할 수 있다.
flow 에는 없는 특징은 다음과 같다.
- 각 task 가 특정 state 에 진입할 때마다 기록됨 ⇒ state based logic 가능
- upstream task 가 뿌려지는 future 를 downstream task 에서 resolve 가능함
- cachable ⇒ result 의 reuse 가 가능하다
- concurrency ⇒ .submit(), .map()
- background _task ⇒ .serve(), .delay()
- task_key (hash composed of the task_name and fqdn of the function) 로 식별됨
여기서 중요한 것만 뽑아보면 아마 다음과 같을 것이다.
- concurrency (.submit, .map)
- background_task (.serve, .delay)
- cachable
task 의 생명 주기

flow 처럼 task는 각각의 state lifecycle 을 갖는다
참고로 .delay() 를 사용해서 task 를 background 로 돌리고자 할 경우, “Scheduled” 라는 state 를 처음에 가짐
task 를 실행시키는 방법
간단히 요약하자면, 다음의 3가지 방법으로 실행시킬 수 있음
- 그냥 함수 호출 ⇒ BLOCKING 발생하므로 권장되지 않음
- task decorator 에서는 call 을 따로 override 하지 않음
- task_runner 에 .submit() 혹은 .map() ⇒ concurrent 하게 돌리는 방법
- task_worker 에 delay() 혹은 serve() ⇒ background로 돌리는 방법
Concurrent Task
다시 짚고 넘어가자면, FLOW는 어디까지나 단위임
세부적인 runtime 단을 handling 할 때는 task를 조작해서 움직임
이제 그럼 이걸 어떻게 concurrent 하게 돌리는지 알아보자
.submit() 을 사용하는 방법
기본적으로, FLOW 안에서 task 들을 .submit() 하면 task_runner 에 전달되어 concurrent 하게 동작함
import time
from prefect import flow, task
from prefect.futures import wait
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow
def elevator():
floors = []
for floor in range(10, 0, -1):
floors.append(stop_at_floor.submit(floor))
wait(floors)
이렇게 하면 “기본적으로” ThreadPoolTaskRunner” 라는 task_runner에 Task 를 던지고 거기서 실행하도록 한다
- thread pool executor 를 사용하는 것과 동일함
submit 한 task 가 돌아갈 runner 종류마다 어느정도 주의가 필요하다.
일단 이 runner 에서는 Thread-Safe 한 task 만이 가능하다는 점만 짚고 넘어가자
https://docs.prefect.io/v3/concepts/task-runners#design-considerations
Task runners - Prefect
Learn about task runners for concurrent, parallel or distributed execution of tasks.
docs.prefect.io
다른 task_runner 에 돌리려면? ⇒ flow 단에서 지정 필요
이때 의존성을 설치해줘야 한다. 예를 들면 아래의 코드에서는 prefect[dask] 를 설치해줘야 한다
import time
from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow(task_runner=DaskTaskRunner())
def elevator():
floors = []
for floor in range(10, 0, -1):
floors.append(stop_at_floor.submit(floor))
wait(floors)
handling futures
task 는 항상 future 임. 그러니까 a = task() 이렇게 하면 PrefectFuture 이런게 a 로 받아짐
그래서 그 task 가 실제로 return 할 것이라고 기대되는 무언가를 받으려면 .result() 로 접근 필요
그럼 이게 끝날때까지 기다리고(await) 결과를 획득해줌
from prefect import task, flow
@task
def cool_task():
return "sup"
@flow
def my_workflow():
future = cool_task.submit()
result = future.result()
print(result) # "sup
당장의 결과엔 관심없고 그냥 완료되기를 기대한다? 그러면 future.wait() 혹은 wait(futures) 사용
from prefect import task, flow
from prefect.futures import wait
@task
def cool_task():
return "sup"
@flow
def my_workflow_multiple():
futures = [cool_task.submit() for _ in range(10)]
wait(futures)
@flow
def my_workflow_single():
future = cool_task.submit()
future.wait()
.map() 을 사용하는 방법
iterable 하게 task 를 submit 하려면 map 을 쓰면 됨 여러개 .submit 하는 것과 약간 차이 존재
그냥 일반적으로 map 쓰는것과 비슷하다고 보면 됨
import time
from prefect import flow, task
from prefect.futures import wait
@task
def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
time.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow
def elevator():
floors = list(range(10, 0, -1))
floors = stop_at_floor.map(floors)
wait(floors)
map을 쓰면 일종의 lambda 식 처럼 처리되게 되는데, (x: iteralbe item 이런식으로)
만약 static iterable 을 전달하고 싶다면 다음과 같이 unmapped 를 사용하면 됨
from prefect import flow, task, unmapped
@task
def sum_plus(x, static_iterable):
return x + sum(static_iterable)
@flow
def sum_it(numbers, static_iterable):
futures = sum_plus.map(numbers, unmapped(static_iterable))
return futures.result()
resulting_sum = sum_it([4, 5, 6], [1, 2, 3])
assert resulting_sum == [10, 11, 12]
wait_for
task 여러개를 flow 에서 실행하는데 서로 간의 의존관계를 맺어주고 싶다. ⇒ wait_for 사용
from prefect import flow, task
@task
def task_a():
pass
@task
def task_b():
pass
@task
def task_c():
pass
@task
def task_d():
pass
@flow
def my_flow():
a = task_a.submit()
b = task_b.submit()
# Wait for task_a and task_b to complete
c = task_c.submit(wait_for=[a, b])
# task_d will wait for task_c to complete
# Note: If waiting for one task it must still be in a list.
d = task_d(wait_for=[c])
일종의 DAG 같은 거 그린다고 생각하면 되는데 여기서는
a,b 일단 던지고 c는 둘다 완료되기 전까지 대기, d는 c 완료되기 전까지 대기
handling failures in concurrent work
concurrent 한 task 중 하나가 터졌다고 해서 workflow 단이 다 터지면 안됨 → 애초에 그렇게 되어있음
암튼 실패한 task 들을 추적하고 싶을 수 있는데 그건 wait 을 사용해주면 되긴 함 (물론 workflow 단에서 다 추적해주긴 하나 코드 단에서 잡고 싶음을 가정)
from typing import Any
from prefect import flow, task
from prefect.futures import wait
from prefect.states import State
@task
def process_data(item: int) -> str:
if item < 0:
raise ValueError(f"Cannot process negative value: {item}")
return f"Processed {item}"
@flow
def batch_processing():
items = [1, -2, 3, -4, 5]
# Submit all tasks and return the futures
futures = process_data.map(items)
# Wait for all futures to complete concurrently
done, not_done = wait(futures)
# Check each future's state
successful: list[Any] = []
failed: list[State] = []
for future in done:
if future.state.is_completed():
successful.append(future.result())
else:
failed.append(future.state)
print(f"Processed {len(successful)} items successfully")
print(f"Failed to process {len(failed)} items")
return successful
wait 은 단순히 blocking 만 하는게 아니라 done, not_done 이라는 futures tuple 반환함
각 future 에는 state property 가 있어서 inspect 가능함..
이 방법이 무조건 왕도는 아니라 그냥 참고용임 failure handling 은 하기 나름임
asyncio 사용
import asyncio
from prefect import flow, task
@task
async def stop_at_floor(floor: int):
print(f"elevator moving to floor {floor}")
await asyncio.sleep(floor)
print(f"elevator stops on floor {floor}")
@flow
async def elevator():
floors = list(range(10, 0, -1))
await asyncio.gather(*[stop_at_floor(floor) for floor in floors])
if __name__ == "__main__":
asyncio.run(elevator())
- task 가 비동기 코드로만 작성되어야 하고
- flow 에서 task 들을 .gather 와 같은 비동기 호출용 코드여야 하며
- flow는 asyncio.run 으로 커버해줘야 함
Mapped Task Real world example
Prefect 에서 mapped task는 다음의 상황들에서 추천하고 있다.
- Machine learning model evaluation → 같은 datasource 로 여러 model 트레이닝 필요
- ETL pipelines ⇒ 다양한 datasource 로 다양한 transform 필요
- API data enrichment ⇒ 여러 Record 들에 대해 여러 외부 호출 필요
요약하자면, 그냥 map 의 장점과 똑같다. 벡터 느낌으로 concurrent 하게 접근할 때 필요하단 뜻이다.
import random
from dataclasses import dataclass
from prefect import flow, task
from prefect.futures import PrefectFuture, wait
@dataclass
class Dataset:
name: str
@dataclass
class ModelConfig:
name: str
@task(task_run_name="train on {dataset.name} with {model_config.name}")
def train_model(dataset: Dataset, model_config: ModelConfig) -> dict:
return {
"dataset": dataset.name,
"model": model_config.name,
"score": random.random(),
}
@flow
def evaluate_models(datasets: list[Dataset], model_configs: list[ModelConfig]):
all_futures: list[PrefectFuture[dict[str, object]]] = []
for dataset in datasets:
futures = train_model.map(
dataset=dataset,
model_config=model_configs,
)
all_futures.extend(futures)
results = [future.result() for future in wait(all_futures).done]
print(f"\\nBest model: {max(results, key=lambda r: r['score'])}")
evaluate_models(
datasets=[
Dataset("customers"), Dataset("products"), Dataset("orders")
],
model_configs=[
ModelConfig("random_forest"), ModelConfig("gradient_boosting")
],
)
Background Task
background task 에 대해 먼저 알아야 한다.
backgorund task 는 일단 어떻게든 “떠 있다가” trigger 되는 그런 개념
concurrent task 와 다른 점은 이건 실행 위치가 workflow 바깥이라는 것
Task를 background 로 띄워보자
from prefect import task
@task(log_prints=True)
def add(a: int, b: int):
print(f"{a} + {b} = {a + b}")
add.serve()
이렇게 하면 실행은 안되고 떠 있을 준비는 되는 것
add.delay(1,2)
이렇게 하면 실제로 로직이 백그라운드에서 동작함
celery 로 간단히 비유하자면
- serve ⇒ shared_task annotation 으로 celery task 정의 및 생성
- delay ⇒ shared_task를 백그라운드로 직접 실행시키는 것
좀 더 깔끔하게 비유하자면,
- serve ⇒ message queue 를 소비할 Consumer 등록하여 Consume 시작.
- delay ⇒ message queue 에 토픽을 Produce.
from prefect import task
from prefect.task_worker import serve
@task(log_prints=True)
def add(a: int, b: int):
print(f"{a} + {b} = {a + b}")
@task(log_prints=True)
def multiply(a: int, b: int):
print(f"{a} * {b} = {a * b}")
# Both these invocations will run once the worker is started
add.delay(1, 2)
multiply.delay(1, 4)
serve(add, multiply)
이 경우 queue 에 메시지가 2개 쌓여 있다가 serve 로 뜨는 순간 바로 소비됨 (serve 안하면 소비 안됨)
Task Result Cache
설정에 따라 각 Task 의 결과를 caching 할 수 있다고 함
실질적인 캐시 저장 위치는 기본적으로 prefect 에서 local 에 저장하고 있음
- ~/.prefect/storeages/
from prefect import task, flow
@task(persist_result=True)
def add_one(x: int):
return x + 1
@flow
def my_flow():
add_one(1) # will not be cached
add_one(1) # will be cached
add_one(2) # will not be cached
if __name__ == "__main__":
my_flow()
이런 식으로 task 를 선언할 때 cache 여부를 지정해주면 사용가능함
이때 cache policy 는 DEFAULT 를 사용한다. (여러 종류 존재함)
참고로 DEFAULT = {INPUTS && TASK_SOURCE && RUN_ID} 를 key 로 갖는다.
즉, 같은 input 으로 같은 parent 로부터 같은 task 를 호출할 경우 result 는 cached
세부적인 behaviour 를 살펴보자
Cache Policy & Expiration
from prefect import task
from prefect.cache_policies import INPUTS
import time
from datetime import timedelta
@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=10))
def my_stateful_task(x: int):
print('sleeping')
time.sleep(10)
return x + 1
my_stateful_task(x=1) # sleeps
my_stateful_task(x=1) # does not sleep
# ... 10 seconds pass ...
my_stateful_task(x=1) # sleeps again
cache_policy, cache_expiration 등을 그냥 설정하면 됨
Cache in a distributed Environment
from prefect import task
from prefect.cache_policies import INPUTS
from prefect_aws import AwsCredentials, S3Bucket
s3_bucket = S3Bucket(
credentials=AwsCredentials(
aws_access_key_id="my-access-key-id",
aws_secret_access_key="my-secret-access-key",
),
bucket_name="my-bucket",
)
# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")
@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
return x + 42
아까 설명했든 cache 는 로컬 파일로 저장됨 (sqlite 든 json 이든) 그래서 저장 위치만 잘 조정해주면 분산된 환경에서 일관된 cache 를 사용가능함
위의 예제에서는 s3 에 업로드하는 것으로 이를 커버함
다음 시간에는 Configure Workflow 및 Task Runner, Design Consideration 에 대해서 알아보도록 하자.
'Read the Docs' 카테고리의 다른 글
| Kafka partition : consumer group instance (0) | 2025.10.10 |
|---|---|
| Prefect 교양 지식(3) - Define Workflow, Task Runner, Design Considerations (0) | 2025.10.10 |
| Prefect 교양 지식(1) - Flow (0) | 2025.09.25 |
| [Read the Docs?]: Three Dot (2) | 2022.01.19 |
| [Read the Docs]: MDN Javascript Objects (0) | 2022.01.19 |
- Total
- Today
- Yesterday
- 불필요한 값 무시하기
- BOJ
- 힙
- docker-compose update
- Til
- 스택
- PREFECT
- 최대한 간략화하기
- Python
- jwt
- endl을절대쓰지마
- cipher suite
- 삽질
- Javascript
- 그리디
- 우선순위큐
- 위상정렬
- Remote
- 파이썬
- django testcase
- kafka쓰고싶어요
- 코딩테스트
- 이것도모르면바보
- vscode
- SSL
- 백준
- 회고
- SQL
- 프로그래머스
- requests
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | |||||
| 3 | 4 | 5 | 6 | 7 | 8 | 9 |
| 10 | 11 | 12 | 13 | 14 | 15 | 16 |
| 17 | 18 | 19 | 20 | 21 | 22 | 23 |
| 24 | 25 | 26 | 27 | 28 | 29 | 30 |
| 31 |