티스토리 뷰

 

Define/Configure workflow

from prefect import flow

@flow
def my_workflow() -> str:
    return "Hello, world!"

앞서 설명했듯 정의하는 것 자체는 매우 간단하다. 위와 같이 지원되는 함수에 데코레이터를 붙이기만 하면 기본적인 정의는 된다.

지원되는 함수 ⇒ 일반 함수, classmethod, instance_method, generator, …

Flow Configuration

flow 를 정의할 때 decorator 에 넣어줄 수 있는 configuration 은 다음과 같다.

참고로 모두 optional 이다.

  • description
  • name → flow 의 이름
  • retries ⇒ failure 시 재시도를 최대 몇 번이나 할 건지 (기본값 0)
  • retry_delay_seconds ⇒ 각 retry 마다 얼마나 대기할지
  • flow_run_name
  • task_runner ⇒ flow 내 task 들을 돌릴 환경(기본 값 ThreadPoolExecutor)
  • timeout_seconds ⇒ flow 에 대해 최대 실행할 시간. 그 이상이면 failed
  • validate_parameters ⇒ flow 의 param 으로 들어오는 것에 대해 pydantic 기반 타입 검증 할건지(bool)
  • version

Task Configuration

task를 정의할 때 decorator 에 넣어줄 수 있는 configuration 은 다음과 같다.

참고로 모두 optional 이다.

  • name → task 의 이름 (참고로 변수를 받을 수 있음)
  • description
  • tags → task의 분류 같은 건데, 주로 concurrency limit 을 설정할 때 자주 씀
  • timeout_seconds
  • cache_key_fn ⇒ task 의 결과에 대한 cache_key를 어떻게 생성할지
  • cache_policy ⇒ cache_key_fn 대신 사전에 정의된 cache_key 생성 policy 명시 (’+’ 를 사용해서 여러 policy 혼합 가능)
  • cache_expiration ⇒ 캐시 유효기간
  • retries
  • retry_delay_seconds
  • retry_condition_fn ⇒ retry 시 호출하는, bool을 반환하는 함수 ⇒ true 면 task retry 함
  • log_prints ⇒ print 를 log 로 자동으로 기록할건지

Case: automatically rerun a workflow when it fails

Workflow 단에 실패 시 재시도를 거는 것은 retry 관련 설정들만 잘 해주면 된다

  1. flow/task 단에 retry를 설정
  2. task 단에 retry_condition_fn 을 설정하는 등
  3. 혹은 prefect config 단에 전역적으로 retry 관련 설정을 해주는 것으로 처리 가능

Task Runners (for concurrent task in flow)

task 를 concurrent 하게 돌리는 상황은 어디까지나 flow 안에서이다.

프로세스 안에서 concurrent 하게 실행시키려면 thread 를 사용하는 것처럼 prefect 에서는 이때 task runner 라는 것을 사용한다

따라서 Flow 를 정의할 때 task runner 을 정의하면, 이후 .submit() .map() 로 task 실행 요청시 그 쪽에서 처리된다

가능한 task runner 목록

  1. ThreadPoolTaskRunner ⇒ 따로 지정 안할 경우 기본적으로 사용. thread-safe 한 task 작성 필요
  2. ProcessPoolTaskRunner ⇒ cpu-intensive task 들 돌릴 때 좋음 (python multiprocessing에 기반함)
  3. DaskTaskRunner ⇒ Dask 를 사용함
  4. RayTaskRunner ⇒ Ray 를 사용함

어느쪽을 사용하건 Flow 에서는 .submit(), .map() 을 통해 실행을 요청. Future 가 반환됨. future.result() 를 통해 결과에 접근은 동일

Desigin Considerations

  1. Task 를 “잘” 작성해야함 ⇒ 너무 복잡하지 않으면서도 너무 작지 않게(=context switch 로 인한 overhead)
  2. 자원 제약 ⇒ map 같은거 한번에 많이 돌리면 메모리 초과되기 좋음 ⇒ BATCH 성으로 처리 하길
  3. data transfer ⇒ task 단에 large data 를 보내는 것은 io performance 안좋음. 그냥 ref 전달하는게 좋음
  4. parallelism ⇒ 진짜 병렬성을 원한다면 ray, dask를 쓰는 것을 추천함(나머지는 concurrent 임)
  5. unsafe global state 의 접근 조심 ⇒ thread-safe 한 코드를 작성하는 것과 일맥상통함

'Read the Docs' 카테고리의 다른 글

Kafka partition : consumer group instance  (0) 2025.10.10
Prefect 교양 지식(2) - Tasks  (0) 2025.10.03
Prefect 교양 지식(1) - Flow  (0) 2025.09.25
[Read the Docs?]: Three Dot  (2) 2022.01.19
[Read the Docs]: MDN Javascript Objects  (0) 2022.01.19
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/05   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함