본문 바로가기
AI/MLOps

[Workflow] Airflow Workflow Management 실습

by merona99 2024. 11. 16.
반응형
Workflow

Airflow Workflow Management 실습

참고강의: 10개의 프로젝트로 한 번에 끝내는 MLOps 파이프라인 구현 초격차 패키지 Online.

 

workflow management의 종류 중 하나인 Airflow의 사용법을 알아보려고 한다.

 

 

[목차]

  1. 실습 소개
  2. 실습 환경 구축
  3. Airflow에서 Dag란?
  4. 실습

 

 


 

 

1. 실습 소개

Apache Airflow는 이전 포스팅에서 다루었던 것처럼 python으로 작성된 오픈 소스 워크플로우 관리 도구이다.

데이터 엔지니어링 및 머신러닝 파이프라인의 스케줄링과 관리에 유용하기에 Airflow를 사용해서 실습을 진행해보자.

 

 

Airflow에는 중요한 컴포넌트인 Dag가 있다.

Airflow를 사용해서 Dag를 생성하는 두 가지의 실습을 진행해보려고 한다.

  • Hello dag 실습
  • ML LifeCycle workflow를 management하는 dag 실습

 

 

2. 실습 환경 구축

실습 환경은 pip install을 통해 local에 설정하거나 Docker container를 사용해서 docker image로 실행하는 방법이 있다.

docker를 사용하면 독립적인 공간에서 수행이 가능하기 때문에 확장성과 유연성이 좋다.

일반적으로 docker를 사용해서 인프라 환경을 구축한다.

 

 

[실습 환경 구축]

  1. Docker image 생성을 진행
  2. Dockerfile을 정의
  3. docker build -t my_airflow_image .
  4. docker run --name my_airflow_image -d -p 8080:8080 my_airflow:latest

[실습 경로 설정]

실습 경로는 기존 경로에서 airflow_fastcampus 라는 폴더를 만든 후 이동해주었다.

해당 경로에 vim Dockerfile 명령어를 사용해서 Dockerfile을 생성하자.

 

 

[Dockerfile]

FROM python:3.8-slim

ENV AIRFLOW_HOME=/usr/local/airflow

RUN apt-get update && \
    apt-get install -y gcc libc-dev vim && \
    rm -rf /var/lib/apt/lists/*

RUN pip install apache-airflow


RUN mkdir -p $AIRFLOW_HOME
WORKDIR $AIRFLOW_HOME
RUN airflow db init

COPY my_dag.py $AIRFLOW_HOME/dags/

EXPOSE 8080

CMD airflow webserver -p 8080 & airflow scheduler

 

 

[Dag 파일 생성]

Dockerfile에 설정해준 대로 이어서 my_dag.py라는 Dag 파일을 생성해 볼 차례이다.

간단하게 os정보를 출력하는 파일을 만들었다.

import os

print(os)

 

 

[Docker Image 생성]

성공적으로 docker Image가 생성된 모습이다.

Dockerfile에 작성한 총 7개의 순번대로 코드가 실행되고 빌드됬음을 눈으로 확인 할 수 있다.

 

 

[생성된 Docker Image 확인]

docker images

my_airflow_fastcampus라는 이름의 image가 방금 생성됬음을 볼 수 있다.

 

 

[Docker 실행]

docker run --name my_airflow -d -p 8080:8080 my_airflow_fastcampus

8080포트로 my_airflow_fastcampus라는 docker container를 실행시켜 준다.

 

 

[로컬서버 접속]

Apach airflow

로컬주소로 들어가보면 아파치 서버가 성공적으로 떠있음을 확인할 수 있다.


이제 해당 서버의 계정을 생성해보도록 하자.

 

[docker 접속]

docker exec -it [CONTAINER ID] bash
  • 해당 container로 접속하는 명령어
  • bash : bash로 실행

container ID는 식별이 될 정도로 일부만 작성해도 무방하다.

 

 

[airflow 계정 생성]

airflow users create --username admin --firstname [NAME] --lastname [NAME]  --role Admin --email [EMAIL]

 

나의 경우 admin 권한으로 admin이라는 user를 생성하였다.

admin의 대소문자 구분을 유의하자.

원하는 비밀번호까지 두 차례 입력하면 계정 생성과정이 끝난다.

 

 

[airflow 로그인]

설정했던 id와 pw(admin / admin)를 입력하면 로그인이 되면서 위 화면이 나타난다.

 

홈 화면에 보이는 모든 목록들은 Airflow 예제들이다.

아무거나 하나를 클릭해서 들어가보자.

이렇게 해당 Dag의 상세정보를 확인할 수 있고,

 

해당 Dag의 flow를 확인할 수도 있다.

 

 

3. Airflow에서 Dag 란?

DAG의 정의

  • DAG (Directed Acyclic Graph)이며 Airflow에서 워크플로우를 정의하는 주요 구성 요소. DAG는 방향성을 가진 비순환 그래프이며, 여러 작업(Task)들과 이들 간의 의존성을 나타냄
  • 작업(Task): DAG 내에서 실행되는 개별 단위. ex) 데이터 다운로드, 변환/모델 학습 등을 작업
  • 의존성(Dependency): 한 작업이 다른 작업에 의존하는 관계를 의미 ex) 데이터 변환 작업을 데이터 다운로드 작업이 완료된 후에 실행되야 함
  • Python 스크립트로 정의

 

DAG의 구성 요소

  • Operator: Airflow에서 작업을 수행하는 객체. 다양한 유형의 Operator가 있으며, 각 특정 작업을 수행
  • Task: Operator의 인스턴스로 DAG 내에서 실행되는 개별 작업을 나타냄
  • Task Instance: 특정 시점에서 Task의 실행 인스턴스
  • Workflow: 전체적인 작업의 흐름을 나타내며, 하나 이상의 DAG로 구성

 

작업 순서 및 의존성 설정

  • 작업 순서와 의존성은 DAG 내에서 >> 또는 << 연산자를 사용하여 정의
# task1이 task2보다 먼저 실행되도록 설정
task1 >> task2

 

 

DAG 주의 사항

  • 의존성 순환: DAG에서는 순환 의존성을 가질 수 없음. 즉, 어떤 작업도 직접적이거나 간접적으로 자기 자신에게 의존할 수 없음
  • 스케줄링: 'start_date'와 'schedule_interval'을 적절히 설정하여 작업이 예상대로 실행되도록 해야함
  • 오류 처리: 각 Task는 실패 할 수 있으므로, 오류 처리 로직을 고려해야함

 

 

4. 실습

4-1) Hello Airflow DAG 실습

Hello Airflow DAG Docker Container

 

[실습순서]

1단계 : dag 작성

2단계 : dag 실행

3단계 : dag 실행 결과 확인

 

처음 실습으로 "hello airflow dag. I'm merona99! we can do it."과 같은 문장이 있을 때, 단어별로 순차적으로 출력하는 dag를 생성해보자.

task가 여러개 있고, 각각은 python operator로 구성되어있고 출력해주도록 만들어보자.

 

 

[경로]

airflow docker안으로 진입해서 dag라는 경로로 가주자.

이곳에 우리의 hello dag를 만들어 볼 것이다.

 

 

[hello DAG 생성]

vim hello_airflow_dag.py

 

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

# DAG 정의
default_args = {
    'owner': 'airflow',
    'depends_on_past': False, # 과거에 dependence가 있는지
    'start_date': datetime(2023, 11, 25),
    'email_on_failure': False, # email로 실패여부 보낸건지
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5), # 특정 workflow가 잘못됬을시 5분뒤 재실행
}

dag = DAG(
    'hello_airflow_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1), # 하루 한번 스케줄링 실행 
)

# 출력 함수 정의
def print_word(word):
    print(word)

# 문장
sentence = "hello airflow dag. fast campus lecture! we can do it."

# 각 단어를 순차적으로 출력하는 Task 생성
prev_task = None
for i, word in enumerate(sentence.split()):
    task = PythonOperator(
        task_id=f'print_word_{i}',
        python_callable=print_word,
        op_kwargs={'word': word},
        dag=dag,
    )

    if prev_task:
        prev_task >> task

    prev_task = task

 

이후 sync를 한번 쳐준 후

sync

 

reserialize를 진행한다.

airflow dags reserialize

 

 

[airflow 서버에서 확인]

그러면 이렇게 airflow서버에서 내가 만들었던 hello_airflow_dag가 나타난다.

owner도 설정해줬던 airflow로 잘 나오고,

startday도 지정한 날로부터 1Day만큼 스케줄러가 돌도록 잘 설정된 것을 확인할 수 있다.

 

[graph]

총 10개의 python operator가 생성됬다.

아까 설정했던 문장이 총 10개의 단어로 쪼개지기 때문이다.

 

[DAG 실행]

화면에서 상단 우측에 있는 화살표 버튼을 클릭하면 된다.

 

[DAG 실행되는 화면]

 

 

상단 좌측을보면 실행되는 모습이 보인다.

 

[하나의 task 선택]

 

해당 task를 마우스로 클릭하면 된다.

 

[출력확인]

Logs탭을 눌러보면 첫 단어였던 hello가 잘 출력된 것이 보인다.

 

❗️주의할 점
특정 dag를 실행할 때, 지정했던 start_date부터 실행이 된다.
즉 위에 작성했던 코드는 하루에 한번 돌아가도록 되어있으므로 2023년11월25일부터 현재까지 해당되는 기간만큼 task가 실행된다는 소리다.

그래서 위 task를 나타내는 그래프 모양이 끝도없이 이어지는 것을 볼 수 있다.
이렇게 말이다ㅎㅎ;

기간을 잘 설정해서 실행하도록 하자.

 


4-2) ML Development 과정의 DAG 실습

이번에는 본격적으로 머신러닝을 활용한 dag 실습을 해 볼 차례이다.

 

[실습순서]

1단계 : dag 작성

2단계 : dag 실행

3단계 : dag 실행 결과 확인

 

feature engineering을 하고 이를 기반으로 traing을 진행해 볼 것이다.

training 방법은 random forest, gb를 기반으로 하고 둘 중에 성능이 좋은 모델을 선택하도록 구성해보자.

 

 

[경로]

경로는 기존의 docker 안에서 dags 폴더에서 시작한다.

 

 

[ml_development_dags.py]

vim ml_development_dags.py

 

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score
from airflow.models import Variable

default_args = {
    'owner': 'merona99',
    'depends_on_past': False,
    'start_date': datetime(2024, 11, 18),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'model_training_selection',
    default_args=default_args,
    description='A simple DAG for model training and selection',
    schedule_interval=timedelta(days=1),
)


def feature_engineering(**kwargs):
    from sklearn.datasets import load_iris
    import pandas as pd

    iris = load_iris()
    X = pd.DataFrame(iris.data, columns=iris.feature_names)
    y = pd.Series(iris.target)

    # 데이터 분할
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3)

    # XCom을 사용하여 데이터 저장
    ti = kwargs['ti']
    ti.xcom_push(key='X_train', value=X_train.to_json())
    ti.xcom_push(key='X_test', value=X_test.to_json())
    ti.xcom_push(key='y_train', value=y_train.to_json(orient='records'))
    ti.xcom_push(key='y_test', value=y_test.to_json(orient='records'))

def train_model(model_name, **kwargs):
    ti = kwargs['ti']
    X_train = pd.read_json(ti.xcom_pull(key='X_train', task_ids='feature_engineering'))
    X_test = pd.read_json(ti.xcom_pull(key='X_test', task_ids='feature_engineering'))
    y_train = pd.read_json(ti.xcom_pull(key='y_train', task_ids='feature_engineering'), typ='series')
    y_test = pd.read_json(ti.xcom_pull(key='y_test', task_ids='feature_engineering'), typ='series')

    if model_name == 'RandomForest':
        model = RandomForestClassifier()
    elif model_name == 'GradientBoosting':
        model = GradientBoostingClassifier()
    else:
        raise ValueError("Unsupported model: " + model_name)

    model.fit(X_train, y_train)
    predictions = model.predict(X_test)
    performance = accuracy_score(y_test, predictions)

    ti.xcom_push(key=f'performance_{model_name}', value=performance)

def select_best_model(**kwargs):
    ti = kwargs['ti']
    rf_performance = ti.xcom_pull(key='performance_RandomForest', task_ids='train_rf')
    gb_performance = ti.xcom_pull(key='performance_GradientBoosting', task_ids='train_gb')

    best_model = 'RandomForest' if rf_performance > gb_performance else 'GradientBoosting'
    print(f"Best model is {best_model} with performance {max(rf_performance, gb_performance)}")

    return best_model





with dag:
    t1 = PythonOperator(
        task_id='feature_engineering',
        python_callable=feature_engineering,
    )

    t2 = PythonOperator(
        task_id='train_rf',
        python_callable=train_model,
        op_kwargs={'model_name': 'RandomForest'},
        provide_context=True,
    )

    t3 = PythonOperator(
        task_id='train_gb',
        python_callable=train_model,
        op_kwargs={'model_name': 'GradientBoosting'},
        provide_context=True,
    )

    t4 = PythonOperator(
        task_id='select_best_model',
        python_callable=select_best_model,
        provide_context=True,
    )

    t1 >> [t2, t3] >> t4

이렇게 코드를 작성해보았다.

 

dag를 만들기에 앞서서 다시 bash창에서 python코드 안에서 import했던 모듈을 pip명령어로 설치해주는 작업이 필요하다.

pip install pandas 
pip install scikit-learn

 

이후

sync

한번 입력해준 후 

airflow dags reserialize

reserialize를 진행해준다.

 

 

[airflow 서버]

성공적으로 dag가 생성되었다.

 

 

[Graph]

 

 

[dag 실행]

실행버튼을 눌러 실행해보자.

success가 잘 떨어졌다.

어제 날짜부터 시작하도록 했으니 총 두 번의 dag가 실행됬음을 옆의 막대그래프에서 쉽게 볼 수 있다.

 

 

[Logs]

select_best_model이라는 task의 log를 확인해보면

이렇게 두 모델 중 좋은 성능의 수치가 출력되는 것을 확인할 수 있다.

 

select_best_model의 코드내용은 아래와 같다.

 

이렇게해서 이번에 배운 dags를 적용해서 다수의 모델을 원할 때 간편하게 돌릴 수 있는 airflow를 맛보기로 살펴보았다.

속성으로 필요한 내용만 빠르게 배운 느낌이다.

추가적으로 상세한 내용은 구글링하면서 알아가면 될 것 같고, 기본적인 지식은 강의에서 다룬 것 같기에 프로젝트에 접목하기엔 무리는 없을 것 같다.

 

 

 

 

 


 

 

 

// 이전에 ai 프로젝트를 여러개 진행했을 때 모델의 성능을 위해서 계속 모델을 갈아끼우곤 했었다.

크롤링하고, 전처리하고, 모델 여러개 돌려서 비교하고..튜닝하고..

그래서 프로젝트 후기에 (무한 노가다...)라는 웃픈 코멘트를 적어놓았던 적이 있다ㅋㅋ

 

그 과정들을 자동화로 싹다 돌려놓으면 속이 시원할 것 같다..^^

 

반응형

'AI > MLOps' 카테고리의 다른 글

[Jenkins] CI/CD 실습  (0) 2024.11.22
[Jenkins] CI/CD 이론  (0) 2024.11.21
[Workflow] Workflow Management 이론  (0) 2024.11.15
[쿠버네티스] Orchestrator 실습  (0) 2024.11.14
[쿠버네티스] Orchestarator 이론  (0) 2024.11.11

댓글