티스토리 뷰
Read the Docs
Prefect 교양 지식(3) - Define Workflow, Task Runner, Design Considerations
onaeonae1 2025. 10. 10. 10:57
Define/Configure workflow
from prefect import flow
@flow
def my_workflow() -> str:
return "Hello, world!"
앞서 설명했듯 정의하는 것 자체는 매우 간단하다. 위와 같이 지원되는 함수에 데코레이터를 붙이기만 하면 기본적인 정의는 된다.
지원되는 함수 ⇒ 일반 함수, classmethod, instance_method, generator, …
Flow Configuration
flow 를 정의할 때 decorator 에 넣어줄 수 있는 configuration 은 다음과 같다.
참고로 모두 optional 이다.
- description
- name → flow 의 이름
- retries ⇒ failure 시 재시도를 최대 몇 번이나 할 건지 (기본값 0)
- retry_delay_seconds ⇒ 각 retry 마다 얼마나 대기할지
- flow_run_name
- task_runner ⇒ flow 내 task 들을 돌릴 환경(기본 값 ThreadPoolExecutor)
- timeout_seconds ⇒ flow 에 대해 최대 실행할 시간. 그 이상이면 failed
- validate_parameters ⇒ flow 의 param 으로 들어오는 것에 대해 pydantic 기반 타입 검증 할건지(bool)
- version
Task Configuration
task를 정의할 때 decorator 에 넣어줄 수 있는 configuration 은 다음과 같다.
참고로 모두 optional 이다.
- name → task 의 이름 (참고로 변수를 받을 수 있음)
- description
- tags → task의 분류 같은 건데, 주로 concurrency limit 을 설정할 때 자주 씀
- timeout_seconds
- cache_key_fn ⇒ task 의 결과에 대한 cache_key를 어떻게 생성할지
- cache_policy ⇒ cache_key_fn 대신 사전에 정의된 cache_key 생성 policy 명시 (’+’ 를 사용해서 여러 policy 혼합 가능)
- cache_expiration ⇒ 캐시 유효기간
- retries
- retry_delay_seconds
- retry_condition_fn ⇒ retry 시 호출하는, bool을 반환하는 함수 ⇒ true 면 task retry 함
- log_prints ⇒ print 를 log 로 자동으로 기록할건지
Case: automatically rerun a workflow when it fails
Workflow 단에 실패 시 재시도를 거는 것은 retry 관련 설정들만 잘 해주면 된다
- flow/task 단에 retry를 설정
- task 단에 retry_condition_fn 을 설정하는 등
- 혹은 prefect config 단에 전역적으로 retry 관련 설정을 해주는 것으로 처리 가능
Task Runners (for concurrent task in flow)
task 를 concurrent 하게 돌리는 상황은 어디까지나 flow 안에서이다.
프로세스 안에서 concurrent 하게 실행시키려면 thread 를 사용하는 것처럼 prefect 에서는 이때 task runner 라는 것을 사용한다
따라서 Flow 를 정의할 때 task runner 을 정의하면, 이후 .submit() .map() 로 task 실행 요청시 그 쪽에서 처리된다
가능한 task runner 목록
- ThreadPoolTaskRunner ⇒ 따로 지정 안할 경우 기본적으로 사용. thread-safe 한 task 작성 필요
- ProcessPoolTaskRunner ⇒ cpu-intensive task 들 돌릴 때 좋음 (python multiprocessing에 기반함)
- DaskTaskRunner ⇒ Dask 를 사용함
- RayTaskRunner ⇒ Ray 를 사용함
어느쪽을 사용하건 Flow 에서는 .submit(), .map() 을 통해 실행을 요청. Future 가 반환됨. future.result() 를 통해 결과에 접근은 동일
Desigin Considerations
- Task 를 “잘” 작성해야함 ⇒ 너무 복잡하지 않으면서도 너무 작지 않게(=context switch 로 인한 overhead)
- 자원 제약 ⇒ map 같은거 한번에 많이 돌리면 메모리 초과되기 좋음 ⇒ BATCH 성으로 처리 하길
- data transfer ⇒ task 단에 large data 를 보내는 것은 io performance 안좋음. 그냥 ref 전달하는게 좋음
- parallelism ⇒ 진짜 병렬성을 원한다면 ray, dask를 쓰는 것을 추천함(나머지는 concurrent 임)
- unsafe global state 의 접근 조심 ⇒ thread-safe 한 코드를 작성하는 것과 일맥상통함
'Read the Docs' 카테고리의 다른 글
| Kafka partition : consumer group instance (0) | 2025.10.10 |
|---|---|
| Prefect 교양 지식(2) - Tasks (0) | 2025.10.03 |
| Prefect 교양 지식(1) - Flow (0) | 2025.09.25 |
| [Read the Docs?]: Three Dot (2) | 2022.01.19 |
| [Read the Docs]: MDN Javascript Objects (0) | 2022.01.19 |
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- Remote
- PREFECT
- jwt
- SQL
- cipher suite
- Python
- 최대한 간략화하기
- django testcase
- endl을절대쓰지마
- 위상정렬
- 프로그래머스
- 우선순위큐
- Til
- 삽질
- kafka쓰고싶어요
- 코딩테스트
- 불필요한 값 무시하기
- requests
- 이것도모르면바보
- Javascript
- docker-compose update
- 힙
- 스택
- SSL
- BOJ
- 백준
- 회고
- 그리디
- vscode
- 파이썬
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | |||||
| 3 | 4 | 5 | 6 | 7 | 8 | 9 |
| 10 | 11 | 12 | 13 | 14 | 15 | 16 |
| 17 | 18 | 19 | 20 | 21 | 22 | 23 |
| 24 | 25 | 26 | 27 | 28 | 29 | 30 |
| 31 |
글 보관함