티스토리 뷰
회사에서 prefect 를 쓸 일이 생겼다. 공식 문서를 읽으면서 정리한 것이다.
Prefect 의 주요 개념으로는 다음이 존재한다.
- Workflows
- flows
- tasks
- Deployments
- Configuration
- Automations
- Prefect Cloud 가 있다.
여기서 Flow 부분까지 정리해보자
1. Flow
from prefect import Flow
@flow(log_prints=True)
def explain_flows():
print("run python code")
### magic stuff
print("encapsulated")
Flow 는 Python Function 으로 정의되며, Prefect 에서 관리되는 작업의 기본적인 단위(unit) 임
기본적으로 python function 과 매우 유사하나, 다음의 추가적인 특징을 가진다
- 실행에 대한 metadata
- state 가 그 예시이며, 대부분 prefect 에서 자동으로 관리된다
- parameter 사용 가능(당연하게도)
- workflow 에서 작업 호출할 때 타입 검증 해줌(pydantic based)
- runtime context (현 실행에 대한 정보) global access 가능함
- 횟수 제한을 설정할 수 있는 실패 시 재시도 기능
- 작업 일체에 대한 타임아웃 설정 가능
- (Prefect Server) API를 통해 각각의 task 를 등록가능
- 각 Flow 는 서로를 호출할 수 있음(필수적이진 않음)
요약하자면, Prefect 에서 관리되는 함수 단위로 보면 된다.
flow 생명 주기
그럼 이것의 생명주기는 어떻게 될까?
아까 말한 metadata 에 state 가 존재하는데, 함수가 실행되면서 이것으로 관리한다

근데 저건 모든 인프라가 잘 동작할 때의 이야기이고 다음의 3가지 요소에 의한 interrupt 까지는 잡아줌
- manual/accidental 한 flow run의 취소
- flow 가 실행되는 host infrastructure 의 부재/오작동
- 기타 네트워크 이슈와 같은 것들
이 경우들에 대해서는 최대한 종료를 보장하려고 해줌. → 여기 해당 안되는 사유들은 ZOMBIE FLOW RUN
Flow 를 그래서 어케 실행시키는데?
일단 함수에 decorator 를 붙여준다면 다음과 같이 실행시킬 수 있다.
- 함수 호출
- 진짜 그냥 호출
- crontab, modal 같은 스케줄링 써서 함수 호출
- prefect server(혹은 prefect cloud) 에서 메뉴얼하게 호출
- prefect server 를 구축한 후 schedule 이나 automations에서 등록
여기서 중요한 건 저 어노테이션이 붙은 한, Prefect 에서는 이것에 대해 관리를 해준다는 것임
- 모니터링, 상태 변경 같은 모든 것들
subflows and tasks
flow 는 process 같은 거라고 할 수 있음
이걸 보조하는 thread 같은 개념이 task 임
참고로 subflow 는 그냥 다른 flow를 flow 안에서 호출하는 것임
아니 근데 애초에 함수를 쓰면 다른 함수들을 정의,호출하면서 기능을 분리하는데 그런 개념으로 보면 됨
만약 한 Flow 에 모든 기능을 몰빵해놓으면 prefect 에서 관리하기가 어려워짐
retry, failure ⇒ 어디서 뭐가 터진지 모름, 재시도를 맨 처음부터 다 해야함
task 는 주로 다음의 특징을 갖는 작업들을 배치하면 된다
- cachable
- retryable
- transactional semantics
- (optional) concurrent
nested task 에는 주의가 필요한데 이건 구현해보기 전엔 약간 감이 안옴 따라서 생략
가능한 “함수” 들
정말 다양하다
- sync/async functions
- instance methods
- class methods
- static methods
- generators
일단은 가독성을 위해 동기 함수들을 반환하는 것부터 해보는게 좋겠다
final statue determination
flow, task는 언제나 state 를 갖는다.
flow의 실행이 완료되는 경우, state 가 어떻게 결정되는 것일까?
다음의 규칙을 따라 state 가 결정된다
- 함수 안에서 exception 이 raise 된 경우 ⇒ FAILED
- 함수 안에서 state 를 직접 반환하는 경우 ⇒ 그것을 사용
- iterable 한 status 들을 반환하면서 FAILED 가 포함된 경우 ⇒ FAILED
- 그 외에 에러 없이 return 한 경우(status 말고) ⇒ COMPLETED
future 를 반환하는 경우
from prefect import flow, task
@task
def always_fails_task():
raise ValueError("I fail successfully")
@task
def always_succeeds_task():
print("I'm fail safe!")
return "success"
@flow
def main_flow():
x = always_fails_task.submit().result(raise_on_failure=False)
y = always_succeeds_task.submit(wait_for=[x])
return y
if __name__ == "__main__":
main_flow()
main_flow의 최종 state 는 COMPLETED 이다. 왜냐하면?
x에서 raise_on_failure 를 무시해서 main_flow 까지 exception 이 전파되지 않고
always_succeededs_task (=future) 라는 항상 성공하는 futrure 를 반환하기 때문이다.
multiple states or futures
from prefect import task, flow
@task
def always_fails_task():
raise ValueError("I am bad task")
@task
def always_succeeds_task():
return "foo"
@flow
def always_succeeds_flow():
return "bar"
@flow
def main_flow():
x = always_fails_task()
y = always_succeeds_task()
z = always_succeeds_flow()
return x, y, z
이건 (x,y,z) 라는 future 모음을 반환하는데 이때 무조건 실패하는 future 가 포함되어 있으므로 FAILED
return a manual state
from prefect import task, flow
from prefect.states import Completed, Failed
@task
def always_fails_task():
raise ValueError("I fail successfully")
@task
def always_succeeds_task():
print("I'm fail safe!")
return "success"
@flow
def main_flow():
x = always_fails_task.submit()
y = always_succeeds_task.submit()
if y.result() == "success":
return Completed(message="I am happy with this result")
else:
return Failed(message="How did this happen!?")
여기서 main_flow 는 항상 COMPLETED . 왜냐하면
항상 result 로 “success”를 반환하는 future 의 실행 결과에 대한 if 문에서 직접 state 를 반환하므로
참고로 flow 의 return 을 저렇게 x, y 와 같이 직접 접근하면 언제나 future 이다.
뭔가 literal 한 값을 반환하는 경우에는 future.result() 로 접근해야 한다.
custom named states
from prefect import flow
from prefect.states import Completed
@flow
def my_flow(work_to_do: bool):
if not work_to_do:
return Completed(message="No work to do 💤", name="Skipped")
else:
return Completed(message="Work was done 💪")
manual 하게 상태를 반환하되, 거기에 name 을 명시하는게 가능하다 이정도
'Read the Docs' 카테고리의 다른 글
| Kafka partition : consumer group instance (0) | 2025.10.10 |
|---|---|
| Prefect 교양 지식(3) - Define Workflow, Task Runner, Design Considerations (0) | 2025.10.10 |
| Prefect 교양 지식(2) - Tasks (0) | 2025.10.03 |
| [Read the Docs?]: Three Dot (2) | 2022.01.19 |
| [Read the Docs]: MDN Javascript Objects (0) | 2022.01.19 |
- Total
- Today
- Yesterday
- jwt
- 우선순위큐
- django testcase
- endl을절대쓰지마
- 힙
- 위상정렬
- cipher suite
- vscode
- SSL
- kafka쓰고싶어요
- 불필요한 값 무시하기
- BOJ
- 그리디
- 프로그래머스
- Til
- 최대한 간략화하기
- requests
- Remote
- 삽질
- PREFECT
- 스택
- 파이썬
- 회고
- Python
- 백준
- docker-compose update
- 코딩테스트
- Javascript
- SQL
- 이것도모르면바보
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | |||||
| 3 | 4 | 5 | 6 | 7 | 8 | 9 |
| 10 | 11 | 12 | 13 | 14 | 15 | 16 |
| 17 | 18 | 19 | 20 | 21 | 22 | 23 |
| 24 | 25 | 26 | 27 | 28 | 29 | 30 |
| 31 |