카테고리 없음

[GCP-10] Cloud Composer

솜정 2022. 1. 20. 09:00

0. 지난 글

[GCP-1] 구글 클라우드 기본 개념

https://ejtag.tistory.com/23

[GCP-2] 구글 클라우드 실습 준비

https://ejtag.tistory.com/24

[GCP-3] 구글 클라우드 IAM

https://ejtag.tistory.com/25

[GCP-4] Compute Engine

https://ejtag.tistory.com/26

[GCP-5] VPC

https://ejtag.tistory.com/27

[GCP-6] Cloud Load Balancing

https://ejtag.tistory.com/29

[GCP-7] Google Cloud Storage

https://ejtag.tistory.com/30

[GCP-8] Cloud SQL

https://ejtag.tistory.com/31

[GCP-9] BigQuery

https://ejtag.tistory.com/33

 

11. Cloud Composer

cloud composer는 파이프라인을 작성하여 예약 및 모니터링 할 수 있는 통합 워크플로 관리 서비스이다.

cloud composer는 Apache airflow를 기반으로 한 워크플로 통합서비스이다.

구글 쿠버네티스 엔진을 기반으로 한 배포 환경을 가지고 있으며, cloud SQL을 이용하여 메타 데디터를 저장하고, 앱 엔진을 활용하여 에어플로 웹 서버를 호스팅하며, 로그 관리는 stackdriver를 이용한다.

cloud omposer는 python을 기바으로 DAG와 Task에 대한 코드를 작성할 수 있으며, 분산 환경 및 웹 UI 기반의 강력한 모니터링 기능을 제공해 간편하게 모니터링을 할 수 있다. 또한 멀티 클라우드를 지원해 다른 클라우드 서비스와 온프레미스와 교차하는 워크플로를 작성할 수 있다.

cloud composer 내부 아키택처 (참고 : https://www.bespinglobal.com/cloud-composer/)

11.2 Apache Airflow

Apache Airflow는 에어비엔비에서 개발된 워크플로 통합 도구로 현재는 아파치 재단에서 인큐베이팅하고 있는 프로젝트이다.

장점 : python을 기보능로 태스크에 대한 코드 작성 가능하다. 웹 기반 UI를 제공한다.

 

11.3 Cloud Composer 주요 개념

11.3.1 DAG이란?

Cloud Composer 워크플로는 DAG이라는 비순환 그래프로 표현된다. 하나의 DAG가 하나의 워크플로이다.

워크플로 안에 Operator를 이용해 태스크를 만들어 담을 수 있다.

 

11.3.2 Operator와 Task

Operator는 DAG 안에 정의되는 작업 함수이며, 이를 이용해 Task를 만든다.

오퍼레이터 종류

- Bash Operator : Bash 내 쉘 명령어를 Task로 사용할 수 있게 해주는 오퍼레이터

- Python Operator : Python으로 만든 함수를 Task로 사용할 수 있게 해주는 오퍼레이터

참고 : http://ww38.airflow.apach.org/howto/operator.html?highlight=operator 

 

apach.org

 

ww38.airflow.apach.org

 

11.3.3 데이터 저장

cloud composer는 생성 시 자동으로 Google Cloud Storage 내에 버킷을 만들고, 클라우드 스토리지 FUSE를 사용하여 에어플로 인스턴스와 GCS 버킷을 서로 매핑한다.

 

[실습 11.1 - Cloud Composer에 워크플로 만들어서 올리기]

메뉴 > Composer 에 접근하여 CREATE ENVIRONMENT에서 composer1을 클릭한다.
composer가 만들어진다.
composer를 만드니 kubernetes 클러스터가 저절로 생성된다.
composer를 만드니vm 인스턴스도 저절로 3개 생성된다.

python 파일을 넣어보는 작업을 해보자.

DAG 폴더를 클릭해보자.
버킷이 하나 생성된다.

hello_world.py 파일을 아래와 같이 하나 만들어 버킷에 업로드 한다.

============================================================================

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
  
def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator

============================================================================

 

다시 컴포저로 돌아와서 airflow를 누른다.

hello_world 댁이 업로드 되었다.
작업이 실행되었다.

참고 : https://aldente0630.github.io/data-engineering/2018/06/17/developing-workflows-with-apache-airflow.html

 

[실습 11.2 - python 패키지 설치하기]

composer > 세부정보 > PYPI PACKAGES 에서 수정버튼 클릭한다.

위와 같이 추가하고 싶은 패키지 이름을 추가하면 된다.

 

composer에서 사용할 패키지 찾는방법을 알아보자

composer는 Py-PI 기반의 패키지 설치가 가능하기 대문에 https://pypi.org에서 서 찾고자 하는 패키지가 있는지 확인하자

위에 노란색 음영이 된 부분 이름으로 composer에서 설치 가능하다.