티스토리 뷰

Read the Docs

Prefect 교양 지식(2) - Tasks

onaeonae1 2025. 10. 3. 14:26

회사에서 Prefect 를 쓸 일이 생겼다. 본격적인 업무 투입 전 공식문서를 읽고 개념들을 파악해보자.

 

지난 시간에는 Flow의 개념에 대해서 파악했고, 오늘 알아볼 것은 Task이다.

 

Tasks

Def) tasks are atomic units of work with transactional semantics

flow 와 비슷하게 decorator 를 사용해서 정의한다

from prefect import task

@task(log_prints=True)
def explain_tasks():
    print("run any python code here!")
    print("but maybe just a little bit")

주로 Flow 보다 작은 기능들을 커버할 때 사용된다(thread 같은 거라고 보면 됨)

일반적으로 flow 처럼 python function 이랑 동작하는게 매우 비슷하나 다음의 특징을 가진다.

일단 flow 와 동일한 특징은 다음과 같다.

  1. metadata (state 포함) 관리됨
  2. 실패 시 retry (횟수 조정 가능)
  3. timeout 을 설정할 수 있다.

flow 에는 없는 특징은 다음과 같다.

  1. 각 task 가 특정 state 에 진입할 때마다 기록됨 ⇒ state based logic 가능
  2. upstream task 가 뿌려지는 future 를 downstream task 에서 resolve 가능함
  3. cachable ⇒ result 의 reuse 가 가능하다
  4. concurrency ⇒ .submit(), .map()
  5. background _task ⇒ .serve(), .delay()
  6. task_key (hash composed of the task_name and fqdn of the function) 로 식별됨

여기서 중요한 것만 뽑아보면 아마 다음과 같을 것이다.

  1. concurrency (.submit, .map)
  2. background_task (.serve, .delay)
  3. cachable

task 의 생명 주기

 

flow 처럼 task는 각각의 state lifecycle 을 갖는다

참고로 .delay() 를 사용해서 task 를 background 로 돌리고자 할 경우, “Scheduled” 라는 state 를 처음에 가짐

 

task 를 실행시키는 방법

간단히 요약하자면, 다음의 3가지 방법으로 실행시킬 수 있음

  1. 그냥 함수 호출 ⇒ BLOCKING 발생하므로 권장되지 않음
    1. task decorator 에서는 call 을 따로 override 하지 않음
  2. task_runner 에 .submit() 혹은 .map() ⇒ concurrent 하게 돌리는 방법
  3. task_worker 에 delay() 혹은 serve() ⇒ background로 돌리는 방법

Concurrent Task

다시 짚고 넘어가자면, FLOW는 어디까지나 단위임

세부적인 runtime 단을 handling 할 때는 task를 조작해서 움직임

이제 그럼 이걸 어떻게 concurrent 하게 돌리는지 알아보자

.submit() 을 사용하는 방법

기본적으로, FLOW 안에서 task 들을 .submit() 하면 task_runner 에 전달되어 concurrent 하게 동작함

import time

from prefect import flow, task
from prefect.futures import wait

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow
def elevator():
    floors = []
    for floor in range(10, 0, -1):
        floors.append(stop_at_floor.submit(floor))
    wait(floors)

이렇게 하면 “기본적으로” ThreadPoolTaskRunner” 라는 task_runner에 Task 를 던지고 거기서 실행하도록 한다

  • thread pool executor 를 사용하는 것과 동일함

submit 한 task 가 돌아갈 runner 종류마다 어느정도 주의가 필요하다.

일단 이 runner 에서는 Thread-Safe 한 task 만이 가능하다는 점만 짚고 넘어가자

https://docs.prefect.io/v3/concepts/task-runners#design-considerations

 

Task runners - Prefect

Learn about task runners for concurrent, parallel or distributed execution of tasks.

docs.prefect.io

 

다른 task_runner 에 돌리려면? ⇒ flow 단에서 지정 필요

이때 의존성을 설치해줘야 한다. 예를 들면 아래의 코드에서는 prefect[dask] 를 설치해줘야 한다

import time

from prefect import flow, task
from prefect.futures import wait
from prefect_dask.task_runners import DaskTaskRunner

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow(task_runner=DaskTaskRunner())
def elevator():
    floors = []
    for floor in range(10, 0, -1):
        floors.append(stop_at_floor.submit(floor))
    wait(floors)

 

handling futures

task 는 항상 future 임. 그러니까 a = task() 이렇게 하면 PrefectFuture 이런게 a 로 받아짐

그래서 그 task 가 실제로 return 할 것이라고 기대되는 무언가를 받으려면 .result() 로 접근 필요

그럼 이게 끝날때까지 기다리고(await) 결과를 획득해줌

from prefect import task, flow

@task
def cool_task():
    return "sup"
    
@flow
def my_workflow():
    future = cool_task.submit()
    result = future.result()
    print(result) # "sup

당장의 결과엔 관심없고 그냥 완료되기를 기대한다? 그러면 future.wait() 혹은 wait(futures) 사용

from prefect import task, flow
from prefect.futures import wait

@task
def cool_task():
    return "sup"
    
    
@flow
def my_workflow_multiple():
    futures = [cool_task.submit() for _ in range(10)]
    wait(futures)
    
@flow
def my_workflow_single():
    future = cool_task.submit()
    future.wait()

.map() 을 사용하는 방법

iterable 하게 task 를 submit 하려면 map 을 쓰면 됨 여러개 .submit 하는 것과 약간 차이 존재

그냥 일반적으로 map 쓰는것과 비슷하다고 보면 됨

import time

from prefect import flow, task
from prefect.futures import wait

@task
def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    time.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow
def elevator():
    floors = list(range(10, 0, -1))
    floors = stop_at_floor.map(floors)
    wait(floors)

map을 쓰면 일종의 lambda 식 처럼 처리되게 되는데, (x: iteralbe item 이런식으로)

만약 static iterable 을 전달하고 싶다면 다음과 같이 unmapped 를 사용하면 됨

from prefect import flow, task, unmapped

@task
def sum_plus(x, static_iterable):
    return x + sum(static_iterable)

@flow
def sum_it(numbers, static_iterable):
    futures = sum_plus.map(numbers, unmapped(static_iterable))
    return futures.result()

resulting_sum = sum_it([4, 5, 6], [1, 2, 3])
assert resulting_sum == [10, 11, 12]

wait_for

task 여러개를 flow 에서 실행하는데 서로 간의 의존관계를 맺어주고 싶다. ⇒ wait_for 사용

from prefect import flow, task
@task
def task_a():
    pass

@task
def task_b():
    pass

@task
def task_c():
    pass
    
@task
def task_d():
    pass

@flow
def my_flow():
    a = task_a.submit()
    b = task_b.submit()
    # Wait for task_a and task_b to complete
    c = task_c.submit(wait_for=[a, b])
    # task_d will wait for task_c to complete
    # Note: If waiting for one task it must still be in a list.
    d = task_d(wait_for=[c])

일종의 DAG 같은 거 그린다고 생각하면 되는데 여기서는

a,b 일단 던지고 c는 둘다 완료되기 전까지 대기, d는 c 완료되기 전까지 대기

handling failures in concurrent work

concurrent 한 task 중 하나가 터졌다고 해서 workflow 단이 다 터지면 안됨 → 애초에 그렇게 되어있음

암튼 실패한 task 들을 추적하고 싶을 수 있는데 그건 wait 을 사용해주면 되긴 함 (물론 workflow 단에서 다 추적해주긴 하나 코드 단에서 잡고 싶음을 가정)

from typing import Any

from prefect import flow, task
from prefect.futures import wait
from prefect.states import State

@task
def process_data(item: int) -> str:
    if item < 0:
        raise ValueError(f"Cannot process negative value: {item}")
    return f"Processed {item}"

@flow
def batch_processing():
    items = [1, -2, 3, -4, 5]
    
    # Submit all tasks and return the futures
    futures = process_data.map(items)
    
    # Wait for all futures to complete concurrently
    done, not_done = wait(futures)
    
    # Check each future's state
    successful: list[Any] = []
    failed: list[State] = []
    
    for future in done:
        if future.state.is_completed():
            successful.append(future.result())
        else:
            failed.append(future.state)
    
    print(f"Processed {len(successful)} items successfully")
    print(f"Failed to process {len(failed)} items")
    
    return successful

wait 은 단순히 blocking 만 하는게 아니라 done, not_done 이라는 futures tuple 반환함

각 future 에는 state property 가 있어서 inspect 가능함..

이 방법이 무조건 왕도는 아니라 그냥 참고용임 failure handling 은 하기 나름임

asyncio 사용

import asyncio

from prefect import flow, task

@task
async def stop_at_floor(floor: int):
    print(f"elevator moving to floor {floor}")
    await asyncio.sleep(floor)
    print(f"elevator stops on floor {floor}")

@flow
async def elevator():
    floors = list(range(10, 0, -1))
    await asyncio.gather(*[stop_at_floor(floor) for floor in floors])

if __name__ == "__main__":
    asyncio.run(elevator())
 
  • task 가 비동기 코드로만 작성되어야 하고
  • flow 에서 task 들을 .gather 와 같은 비동기 호출용 코드여야 하며
  • flow는 asyncio.run 으로 커버해줘야 함

Mapped Task Real world example

Prefect 에서 mapped task는 다음의 상황들에서 추천하고 있다.

  1. Machine learning model evaluation → 같은 datasource 로 여러 model 트레이닝 필요
  2. ETL pipelines ⇒ 다양한 datasource 로 다양한 transform 필요
  3. API data enrichment ⇒ 여러 Record 들에 대해 여러 외부 호출 필요

요약하자면, 그냥 map 의 장점과 똑같다. 벡터 느낌으로 concurrent 하게 접근할 때 필요하단 뜻이다.

import random
from dataclasses import dataclass

from prefect import flow, task
from prefect.futures import PrefectFuture, wait

@dataclass
class Dataset:
    name: str

@dataclass
class ModelConfig:
    name: str

@task(task_run_name="train on {dataset.name} with {model_config.name}")
def train_model(dataset: Dataset, model_config: ModelConfig) -> dict:
    return {
        "dataset": dataset.name,
        "model": model_config.name,
        "score": random.random(),
    }

@flow
def evaluate_models(datasets: list[Dataset], model_configs: list[ModelConfig]):
    all_futures: list[PrefectFuture[dict[str, object]]] = []
    for dataset in datasets:
        futures = train_model.map(
            dataset=dataset,
            model_config=model_configs,
        )
        all_futures.extend(futures)

    results = [future.result() for future in wait(all_futures).done]

    print(f"\\nBest model: {max(results, key=lambda r: r['score'])}")

evaluate_models(
    datasets=[
        Dataset("customers"), Dataset("products"), Dataset("orders")
    ],
    model_configs=[
        ModelConfig("random_forest"), ModelConfig("gradient_boosting")
    ],
)

Background Task

background task 에 대해 먼저 알아야 한다.

backgorund task 는 일단 어떻게든 “떠 있다가” trigger 되는 그런 개념

concurrent task 와 다른 점은 이건 실행 위치가 workflow 바깥이라는 것

Task를 background 로 띄워보자

from prefect import task

@task(log_prints=True)
def add(a: int, b: int):
    print(f"{a} + {b} = {a + b}")

add.serve()

이렇게 하면 실행은 안되고 떠 있을 준비는 되는 것

add.delay(1,2) 

이렇게 하면 실제로 로직이 백그라운드에서 동작함

celery 로 간단히 비유하자면

  • serve ⇒ shared_task annotation 으로 celery task 정의 및 생성
  • delay ⇒ shared_task를 백그라운드로 직접 실행시키는 것

좀 더 깔끔하게 비유하자면,

  • serve ⇒ message queue 를 소비할 Consumer 등록하여 Consume 시작.
  • delay ⇒ message queue 에 토픽을 Produce.
from prefect import task
from prefect.task_worker import serve

@task(log_prints=True)
def add(a: int, b: int):
    print(f"{a} + {b} = {a + b}")

@task(log_prints=True)
def multiply(a: int, b: int):
    print(f"{a} * {b} = {a * b}")

# Both these invocations will run once the worker is started
add.delay(1, 2)
multiply.delay(1, 4)

serve(add, multiply)

이 경우 queue 에 메시지가 2개 쌓여 있다가 serve 로 뜨는 순간 바로 소비됨 (serve 안하면 소비 안됨)

Task Result Cache

설정에 따라 각 Task 의 결과를 caching 할 수 있다고 함

실질적인 캐시 저장 위치는 기본적으로 prefect 에서 local 에 저장하고 있음

  • ~/.prefect/storeages/
from prefect import task, flow

@task(persist_result=True)
def add_one(x: int):
    return x + 1

@flow
def my_flow():
    add_one(1) # will not be cached
    add_one(1) # will be cached
    add_one(2) # will not be cached

if __name__ == "__main__":
    my_flow()

이런 식으로 task 를 선언할 때 cache 여부를 지정해주면 사용가능함

이때 cache policy 는 DEFAULT 를 사용한다. (여러 종류 존재함)

참고로 DEFAULT = {INPUTS && TASK_SOURCE && RUN_ID} 를 key 로 갖는다.

즉, 같은 input 으로 같은 parent 로부터 같은 task 를 호출할 경우 result 는 cached

세부적인 behaviour 를 살펴보자

Cache Policy & Expiration

from prefect import task
from prefect.cache_policies import INPUTS

import time
from datetime import timedelta

@task(cache_policy=INPUTS, cache_expiration=timedelta(seconds=10))
def my_stateful_task(x: int):
    print('sleeping')
    time.sleep(10)
    return x + 1

my_stateful_task(x=1) # sleeps
my_stateful_task(x=1) # does not sleep
# ... 10 seconds pass ...
my_stateful_task(x=1) # sleeps again

cache_policy, cache_expiration 등을 그냥 설정하면 됨

Cache in a distributed Environment

from prefect import task
from prefect.cache_policies import INPUTS

from prefect_aws import AwsCredentials, S3Bucket

s3_bucket = S3Bucket(
    credentials=AwsCredentials(
        aws_access_key_id="my-access-key-id",
        aws_secret_access_key="my-secret-access-key",
    ),
    bucket_name="my-bucket",
)
# save the block to ensure it is available across machines
s3_bucket.save("my-cache-bucket")

@task(cache_policy=INPUTS, result_storage=s3_bucket)
def my_cached_task(x: int):
    return x + 42

아까 설명했든 cache 는 로컬 파일로 저장됨 (sqlite 든 json 이든) 그래서 저장 위치만 잘 조정해주면 분산된 환경에서 일관된 cache 를 사용가능함

위의 예제에서는 s3 에 업로드하는 것으로 이를 커버함

 

 

다음 시간에는 Configure Workflow 및 Task Runner, Design Consideration 에 대해서 알아보도록 하자.

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/05   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함