티스토리 뷰

Read the Docs

Prefect 교양 지식(1) - Flow

onaeonae1 2025. 9. 25. 15:37

 

회사에서 prefect 를 쓸 일이 생겼다. 공식 문서를 읽으면서 정리한 것이다.

 

Prefect 의 주요 개념으로는 다음이 존재한다.

  • Workflows
    • flows
    • tasks
  • Deployments
  • Configuration
  • Automations
  • Prefect Cloud 가 있다.

여기서 Flow 부분까지 정리해보자

 

 

1. Flow

from prefect import Flow

@flow(log_prints=True)
def explain_flows():
    print("run python code")
    ### magic stuff
    print("encapsulated")

Flow 는 Python Function 으로 정의되며, Prefect 에서 관리되는 작업의 기본적인 단위(unit) 임

기본적으로 python function 과 매우 유사하나, 다음의 추가적인 특징을 가진다

  1. 실행에 대한 metadata
    1. state 가 그 예시이며, 대부분 prefect 에서 자동으로 관리된다
  2. parameter 사용 가능(당연하게도)
    1. workflow 에서 작업 호출할 때 타입 검증 해줌(pydantic based)
    2. runtime context (현 실행에 대한 정보) global access 가능함
  3. 횟수 제한을 설정할 수 있는 실패 시 재시도 기능
  4. 작업 일체에 대한 타임아웃 설정 가능
  5. (Prefect Server) API를 통해 각각의 task 를 등록가능
  6. 각 Flow 는 서로를 호출할 수 있음(필수적이진 않음)

요약하자면, Prefect 에서 관리되는 함수 단위로 보면 된다.

flow 생명 주기

그럼 이것의 생명주기는 어떻게 될까?

아까 말한 metadata 에 state 가 존재하는데, 함수가 실행되면서 이것으로 관리한다

근데 저건 모든 인프라가 잘 동작할 때의 이야기이고 다음의 3가지 요소에 의한 interrupt 까지는 잡아줌

  1. manual/accidental 한 flow run의 취소
  2. flow 가 실행되는 host infrastructure 의 부재/오작동
  3. 기타 네트워크 이슈와 같은 것들

이 경우들에 대해서는 최대한 종료를 보장하려고 해줌. → 여기 해당 안되는 사유들은 ZOMBIE FLOW RUN

Flow 를 그래서 어케 실행시키는데?

일단 함수에 decorator 를 붙여준다면 다음과 같이 실행시킬 수 있다.

  1. 함수 호출
    1. 진짜 그냥 호출
    2. crontab, modal 같은 스케줄링 써서 함수 호출
  2. prefect server(혹은 prefect cloud) 에서 메뉴얼하게 호출
  3. prefect server 를 구축한 후 schedule 이나 automations에서 등록

여기서 중요한 건 저 어노테이션이 붙은 한, Prefect 에서는 이것에 대해 관리를 해준다는 것임

  • 모니터링, 상태 변경 같은 모든 것들

subflows and tasks

flow 는 process 같은 거라고 할 수 있음

이걸 보조하는 thread 같은 개념이 task 임

참고로 subflow 는 그냥 다른 flow를 flow 안에서 호출하는 것임

아니 근데 애초에 함수를 쓰면 다른 함수들을 정의,호출하면서 기능을 분리하는데 그런 개념으로 보면 됨

만약 한 Flow 에 모든 기능을 몰빵해놓으면 prefect 에서 관리하기가 어려워짐

retry, failure ⇒ 어디서 뭐가 터진지 모름, 재시도를 맨 처음부터 다 해야함

task 는 주로 다음의 특징을 갖는 작업들을 배치하면 된다

  1. cachable
  2. retryable
  3. transactional semantics
  4. (optional) concurrent

nested task 에는 주의가 필요한데 이건 구현해보기 전엔 약간 감이 안옴 따라서 생략

가능한 “함수” 들

정말 다양하다

  1. sync/async functions
  2. instance methods
  3. class methods
  4. static methods
  5. generators

일단은 가독성을 위해 동기 함수들을 반환하는 것부터 해보는게 좋겠다

final statue determination

flow, task는 언제나 state 를 갖는다.

flow의 실행이 완료되는 경우, state 가 어떻게 결정되는 것일까?

다음의 규칙을 따라 state 가 결정된다

  1. 함수 안에서 exception 이 raise 된 경우 ⇒ FAILED
  2. 함수 안에서 state 를 직접 반환하는 경우 ⇒ 그것을 사용
  3. iterable 한 status 들을 반환하면서 FAILED 가 포함된 경우 ⇒ FAILED
  4. 그 외에 에러 없이 return 한 경우(status 말고) ⇒ COMPLETED

future 를 반환하는 경우

from prefect import flow, task

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def main_flow():
    x = always_fails_task.submit().result(raise_on_failure=False)
    y = always_succeeds_task.submit(wait_for=[x])
    return y

if __name__ == "__main__":
    main_flow()

main_flow의 최종 state 는 COMPLETED 이다. 왜냐하면?

x에서 raise_on_failure 를 무시해서 main_flow 까지 exception 이 전파되지 않고

always_succeededs_task (=future) 라는 항상 성공하는 futrure 를 반환하기 때문이다.

multiple states or futures

from prefect import task, flow

@task
def always_fails_task():
    raise ValueError("I am bad task")

@task
def always_succeeds_task():
    return "foo"

@flow
def always_succeeds_flow():
    return "bar"

@flow
def main_flow():
    x = always_fails_task()
    y = always_succeeds_task()
    z = always_succeeds_flow()
    return x, y, z

이건 (x,y,z) 라는 future 모음을 반환하는데 이때 무조건 실패하는 future 가 포함되어 있으므로 FAILED

return a manual state

from prefect import task, flow
from prefect.states import Completed, Failed

@task
def always_fails_task():
    raise ValueError("I fail successfully")

@task
def always_succeeds_task():
    print("I'm fail safe!")
    return "success"

@flow
def main_flow():
    x = always_fails_task.submit()
    y = always_succeeds_task.submit()
    if y.result() == "success":
        return Completed(message="I am happy with this result")
    else:
        return Failed(message="How did this happen!?")

여기서 main_flow 는 항상 COMPLETED . 왜냐하면

항상 result 로 “success”를 반환하는 future 의 실행 결과에 대한 if 문에서 직접 state 를 반환하므로

참고로 flow 의 return 을 저렇게 x, y 와 같이 직접 접근하면 언제나 future 이다.

뭔가 literal 한 값을 반환하는 경우에는 future.result() 로 접근해야 한다.

custom named states

from prefect import flow
from prefect.states import Completed

@flow
def my_flow(work_to_do: bool):
    if not work_to_do:
        return Completed(message="No work to do 💤", name="Skipped")
    else:
        return Completed(message="Work was done 💪")

manual 하게 상태를 반환하되, 거기에 name 을 명시하는게 가능하다 이정도

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/05   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함