<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
    <channel>
        <title>data-graphy</title>
        <link>https://velog.io/</link>
        <description>키보드 한 자루로 시작하는 데이터 엔지니어링 삽질기</description>
        <lastBuildDate>Mon, 24 Jun 2024 13:16:49 GMT</lastBuildDate>
        <docs>https://validator.w3.org/feed/docs/rss2.html</docs>
        <generator>https://github.com/jpmonette/feed</generator>
        <image>
            <title>data-graphy</title>
            <url>https://images.velog.io/images/graphy-young/profile/f01b497c-3cd8-446b-9fcb-85a80db2eebd/social.jpeg</url>
            <link>https://velog.io/</link>
        </image>
        <copyright>Copyright (C) 2019. data-graphy. All rights reserved.</copyright>
        <atom:link href="https://v2.velog.io/rss/graphy-young" rel="self" type="application/rss+xml"/>
        <item>
            <title><![CDATA[Apache Airflow Custom Operator Template]]></title>
            <link>https://velog.io/@graphy-young/Apache-Airflow-Custom-Operator-Template</link>
            <guid>https://velog.io/@graphy-young/Apache-Airflow-Custom-Operator-Template</guid>
            <pubDate>Mon, 24 Jun 2024 13:16:49 GMT</pubDate>
            <description><![CDATA[<h1 id="apache-airflow-custom-operator">Apache Airflow Custom Operator</h1>
<h2 id="01-apache-airflow의-operator란-무엇인가">01. Apache Airflow의 Operator란 무엇인가?</h2>
<hr>
<ul>
<li>Apache Airflow의 <code>DAG</code> 내 구성요소, 사전 정의된 Task의 템플릿 (객체지향 프로그래밍에서의 Class와 객체의 관계와 비슷합니다)</li>
<li>DAG 내에서 선언적으로 정의할 수 있으며, 파이프라인에서 데이터가 어떻게 처리되는지에 대한 로직을 포함하고 있습니다.</li>
<li>기본적으로 제공되는 Operator 외에 <a href="https://airflow.apache.org/docs/apache-airflow-providers/">Provider Package</a>를 통해 확장할 수 있으며, 필요에 따라서 <a href="https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator"><code>airflow.models.baseoperator.BaseOperator</code></a> 또는 특정 Operator를 상속받아 Custom Operator를 작성할 수 있습니다.</li>
<li>Custom Operator는 상속 후 <code>execute(self, context)</code> 함수 내에 동작할 코드를 작성하며, <code>on_kill(self)</code>과 같은 사전 정의된 Method를 Override하여 추가적인 동작을 설정할 수 있습니다.</li>
</ul>
<blockquote>
<p><strong>[주의사항]</strong> Custom Operator 작성 시, <code>__init__()</code> 함수에 과도한 연산을 발생시킬 경우 해당 Scheduler에서 해당 Operator를 사용한 Task마다 연산이 발생할 수 있기 때문에 서버에 부담이 발생할 수 있으므로 가급적 가볍게 작성할 것</p>
</blockquote>
<h2 id="02-custom-operator-template">02. Custom Operator Template</h2>
<hr>
<pre><code class="language-python">from typing import *

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults # Don&#39;t need to use this decorator in recent versions due to its deprecation
from airflow.utils.context import Context


class MyCustomOperator(BaseOperator):


    # To use Jinja template, consider the field names present in the template_fields attribute.
    template_fields: Sequence[str] = (&quot;param_name&quot;, )
    template_ext: Optional[str] = &quot;.sql&quot; # If the parameter contains file name, set the file extension here.
    template_fields_renderers: Optional[dict] = {
        &quot;param_name&quot;: &quot;py&quot;,
        &quot;another_param&quot;: &quot;json&quot;
    } # This defines in what style the value from template field renders in Web UI

    @apply_defaults
    def __init__(
            self,
            param : str,
            *args, **kwargs
    ) -&gt; None

        &quot;&quot;&quot;
        Description

        :param param: parameter description
        :returns: None
        :raises AssertionError: if param is not an str
        &quot;&quot;&quot;

        super().__init__(*args, **kwargs)

        self.param = param

        assert isinstance(self.param, str), f&quot;param is not str, it is {type(self.param).__name__}.&quot;

    # This function is called ahead of execute() function.
    def pre_execute(self, context: Context) -&gt; None:
        # Statements here
        super().pre_execute() # Optional

    # Defines the operator&#39;s actual work.
    def execute(self, context: Context) -&gt; None:
        # Statements here
        super().execute(context) # If you want to execute the parent class&#39;s execute method, call it explicitly.

    # This function is called when the task instance is killed.
    def on_kill(self) -&gt; None:
        # Statements here
        super().on_kill() # Optional</code></pre>
<h3 id="설명">설명</h3>
<h4 id="class">Class</h4>
<ul>
<li><code>airflow.models.baseoperator.BaseOperator</code><ul>
<li>모든 Airflow의 Operator의 기초 모델링이 포함된 클래스</li>
<li>일반적으로 Airflow Operator를 개발할 때 해당 클래스를 상속받아 작성</li>
<li>이 클래스는 작업의 예약, 실행, 상태 업데이트 등의 기본적인 동작을 정의하고 있음</li>
</ul>
</li>
<li><code>airflow.utils.decorators.apply_defaults</code><ul>
<li>Airflow 1.X 버전에서 Operator 작성 시, 클래스 초기화 메소드(<strong>init</strong>)의 기본 매개변수 값을 설정하고 상속된 클래스의 기본값을 포함한 인수들을 자동으로 초기화하는데 사용되었으며, 현재는 사용하지 않아도 상관없음</li>
<li>현재는 사용 중단(deprecated)되었으며, 대신 명시적인 매개변수 초기화를 사용하는 것이 권장됨</li>
</ul>
</li>
</ul>
<h4 id="class-variable">Class variable</h4>
<ul>
<li><code>template fields</code>: <ul>
<li>Airflow에서 지원하는 <code>Jinja2</code> 템플릿을 사용하려면 해당 클래스 변수에 변수명을 지정해야 하며, 지정된 변수명에 해당하는 매개변수에서 템플릿이 렌더링 됨</li>
<li>이를 통해 실행 시점에 동적으로 값을 설정할 수 있음</li>
</ul>
</li>
<li><code>template_ext</code><ul>
<li>템플릿 파일의 확장자를 지정할 수 있음</li>
<li>예시: [&#39;.sql&#39;, &#39;.hql&#39;]와 같이 설정하면 SQL 및 HiveQL 템플릿 파일을 사용할 수 있음</li>
</ul>
</li>
<li><code>template_fields_renderers</code>: <ul>
<li>특정 템플릿 필드에 대한 렌더링 방식을 지정</li>
<li>예시: <code>{&#39;field_name&#39;: &#39;json&#39;}</code>와 같이 설정하면 해당 필드가 JSON 형식으로 렌더링됨</li>
</ul>
</li>
</ul>
<h4 id="method">Method</h4>
<ul>
<li><code>pre_execute(self, context: Context)</code>: <ul>
<li>Operator가 실행되기 전에 호출되는 메소드로, 주로 실행 전 준비 작업을 수행</li>
<li>작업 실행 전후의 작업 흐름을 조절할 때 유용함</li>
</ul>
</li>
<li><code>execute(self, context: Context)</code><ul>
<li>Operator가 동작할 때 실행되는 실제 메소드로, 작업의 주요 로직이 구현됨</li>
<li>이 메소드 내에서 작업이 수행되고, 작업 결과가 반환</li>
</ul>
</li>
<li><code>on_kill(self)</code><ul>
<li>작업이 강제 종료될 때 호출되는 메소드로종료 작업이나 클린업 작업을 수행</li>
<li>작업이 중단될 때 필요한 정리 작업을 정의하는 데 사용됨</li>
</ul>
</li>
</ul>
<blockquote>
<p>그 외 부가적인 속성 및 메소드는 <a href="https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator">공식 문서</a> 참고</p>
</blockquote>
<h2 id="참고자료">참고자료</h2>
<hr>
<ul>
<li><a href="https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html">Apache Airflow Documentation - Creating a custom Operator</a></li>
<li><a href="https://www.astronomer.io/docs/learn/airflow-importing-custom-hooks-operators">Astronomer Documenation - Custom hooks and operators</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Apache Airflow의 DAG별 접근 제어 구현하기]]></title>
            <link>https://velog.io/@graphy-young/Apache-Airflow%EC%9D%98-DAG%EB%B3%84-%EC%A0%91%EA%B7%BC-%EC%A0%9C%EC%96%B4-%EB%B0%A9%EB%B2%95</link>
            <guid>https://velog.io/@graphy-young/Apache-Airflow%EC%9D%98-DAG%EB%B3%84-%EC%A0%91%EA%B7%BC-%EC%A0%9C%EC%96%B4-%EB%B0%A9%EB%B2%95</guid>
            <pubDate>Mon, 27 May 2024 14:53:03 GMT</pubDate>
            <description><![CDATA[<h2 id="01-개요">01. 개요</h2>
<p>만약 하나의 Apache Airflow에서 다양한 조직의 사용자가 존재하며, DAG의 종류 또는 개별 접근 제어(Access Control)를 구현해야한다면 어떻게 해야할까?
Airflow는 Resource 및 DAG-level 기반의 권한을 Role을 통해 권한을 정의한 후, 이를 사용자 계정에 할당함으로써 사용자별 접근 제어를 구현할 수 있다.</p>
<ol>
<li><p>Resource-base permission (2.0+)
 자원 기반 권한은 Airflow 내 다음 객체를 대상으로 지정할 수 있다.</p>
<ul>
<li><p><code>Dag</code></p>
</li>
<li><p><code>DagRun</code></p>
</li>
<li><p><code>Task</code></p>
</li>
<li><p><code>Connection</code></p>
</li>
<li><p>그 외 특정 Menu, Profile 등</p>
<p>또한 상기 자원에 대해 CRUD Action을 제어할 수 있다.</p>
</li>
<li><p><code>can_create</code></p>
</li>
<li><p><code>can_read</code></p>
</li>
<li><p><code>can_edit</code></p>
</li>
<li><p><code>can_delete</code></p>
</li>
</ul>
</li>
<li><p>DAG-level permission
 DAG 단계에서의 권한은 Role 내에서 전체 DAG(<code>DAGs.[action]</code>) 또는 특정 DAG(<code>DAG:[dag_id].[action]</code>)을 지정하여 설정할 수 있으며, DAG를 정의할 때 <code>airflow.DAG</code> 객체 내 <code>access_control</code> 속성을 통해 Role별로 Action을 지정할 수 있다.</p>
<pre><code class="language-python"> from airflow import DAG
 from airflow.utils.dates import days_ago

 DAG(dag_id=&quot;permission_example_dag&quot;,
     start_date=days_ago(1),
     access_control={
         &quot;Viewer&quot;: {&quot;can_edit&quot;, &quot;can_read&quot;, &quot;can_delete&quot;}, 
         # access_control 속성은 {”role_name”: {”action_1”, “action_2”}, …}와 같은 형식의 dict형으로 입력
     },
 )</code></pre>
<h2 id="02-실험">02. 실험</h2>
</li>
<li><p>요구사항</p>
<ol>
<li><strong>Apache Airflow 2.0</strong> 이상 버전의 실험 환경</li>
</ol>
</li>
</ol>
<ol>
<li><p>실험에 사용할 Airflow 사용자 계정 및 Role 생성</p>
<ul>
<li>테스트에 사용할 Role(<code>taylor_role</code>) 생성 및 Airflow Webserver에 로그인할 수 있도록 Website에 대한 읽기 권한 부여<pre><code class="language-bash">airflow roles create taylor_role 
airflow roles add-perms taylor_role -r Website -a can_read</code></pre>
</li>
<li>테스트에 사용할 2가지 사용자 계정 생성 - <code>taylor</code> (user), <code>trey</code> (admin)<pre><code class="language-bash">airflow users create --username taylor --password taylor --role taylor_role --email taylor@example.com --firstname taylor --lastname kim</code></pre>
<pre><code class="language-bash">airflow users create --username trey --password trey --role Admin --email trey@example.com --firstname trey --lastname yi</code></pre>
</li>
</ul>
</li>
<li><p>실험에 사용할 DAG를 Airflow의 DAG 경로(일반적으로 <code>$AIRFLOW_HOME/dags</code>)에 생성</p>
<pre><code class="language-python"> from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.dates import days_ago

 with DAG(
     dag_id=&#39;test_role_dag&#39;,
     tags=[&#39;test&#39;],
     start_date=days_ago(1),
     schedule_interval=None,
     default_args={
         &#39;owner&#39;: &#39;airflow&#39;,
         &#39;retries&#39;: 0
     },
     catchup=False,
     is_paused_upon_creation=False
 ) as dag:

     task_for_user1 = DummyOperator(task_id=&#39;task&#39;)</code></pre>
</li>
</ol>
<ol start="3">
<li><p><code>taylor</code>로 로그인하여 생성한 예시 DAG(<code>test_role_dag</code>)가 Airflow Webserver에서 보이는지 확인
 <img src="https://velog.velcdn.com/images/graphy-young/post/7cf9ef05-d236-43a1-ba02-a6ed560aca6f/image.png" alt=""></p>
<blockquote>
<p><code>taylor</code> 계정에는 어떠한 읽기 권한이 없기 때문에 해당 계정에서는 아무런 DAG도 보이지 않는다.</p>
</blockquote>
</li>
<li><p><code>taylor</code>와 비교하기 위해 <code>Admin</code> role을 가진 <code>trey</code>로 로그인
<img src="https://velog.velcdn.com/images/graphy-young/post/50714390-96c5-461e-a15d-c4bbe391e7be/image.png" alt=""></p>
<blockquote>
<p><code>trey</code> 계정은 Apache Airflow에서 기본 제공되는 <code>Admin</code> Role의 권한 내 DAGs(DAG 전체)에 대한 <code>can_read</code> 권한을 가지고 있으므로 별도의 설정 없이 모든 DAG를 확인할 수 있음.</p>
</blockquote>
</li>
<li><p><code>taylor</code>에 특정(또는 모든) DAG(<code>DAG(s)</code>)에 대한 읽기 권한(<code>can_read</code>)을 부여</p>
<pre><code class="language-bash"> airflow roles add-perms taylor_role -r DAGs -a can_read

 # or

 airflow roles add-perms taylor_role -r DAG:test_role_dag -a can_read</code></pre>
<p> <img src="https://velog.velcdn.com/images/graphy-young/post/f5c3c6cc-955a-46c7-981d-0cf879d962a9/image.png" alt=""></p>
</li>
<li><p><code>test_role_dag</code> DAG 내 <code>access_control</code> 속성에 권한을 지정해보기</p>
<blockquote>
<p>(글 순서대로 진행했을 경우) <code>taylor_role</code>에 <code>DAG:test_role_dag</code> 또는 DAGs에 대한 <code>can_read</code> 권한이 있을 경우 제거 필요</p>
</blockquote>
<pre><code class="language-python"> from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.dates import days_ago

 with DAG(
     dag_id=&#39;test_role_dag&#39;,
     tags=[&#39;test&#39;],
     start_date=days_ago(1),
     schedule_interval=None,
     default_args={
         &#39;owner&#39;: &#39;airflow&#39;,
         &#39;retries&#39;: 0
     },
     catchup=False,
     is_paused_upon_creation=False
 ) as dag:

     task_for_user1 = DummyOperator(task_id=&#39;task&#39;)</code></pre>
<p> <img src="https://velog.velcdn.com/images/graphy-young/post/94c208e4-2d8f-44bd-b7dc-c2a4552bcf72/image.png" alt=""></p>
<blockquote>
<p>Role에 지정된 권한이 없어도 DAG에서 지정된 권한을 통해 <code>taylor</code> 계정에서 정상적으로 표시됨.</p>
</blockquote>
</li>
<li><p>테스트에 사용한 계정 및 Role 정리</p>
<pre><code class="language-bash"> airflow users delete --username trey
 airflow users delete --username taylor
 airflow roles delete taylor_role</code></pre>
</li>
</ol>
<h3 id="03-결론">03. 결론</h3>
<ul>
<li>Apache Airflow 2.X 버전에서는 DAG 객체의 owner argument를 통한 접근 권한 관리가 불가능하다.</li>
<li>사용자 계정에 포함되는 Role에 DAG 전체 또는 개별 DAG에 대한 권한 명시를 통해 ACL 가능, 그러나 변경이나 추가 발생시 수행해야하는 CLI 또는 SQL을 자동화하는 것은 복잡할 수 있다.</li>
<li>Role에 각 DAG에 대한 개별적 관리를 정의하는것보다 DAG 객체에 <code>access_control</code> 속성에 Role에 대한 권한을 명시하고 관리하는 것이 관리적인 측면에서 효율적일 것이다.</li>
</ul>
<h3 id="99-appendix">99. Appendix</h3>
<h4 id="airflow에-okta를-연동하여-ssosingle-sign-on-구축하기">Airflow에 Okta를 연동하여 SSO(Single Sign-On) 구축하기</h4>
<blockquote>
<p><a href="https://tech.scribd.com/blog/2021/integrating-airflow-and-okta.html">Integrating Airflow with Okta</a></p>
</blockquote>
<ul>
<li>Apache Airflow는 공식적으로 Okta integration을 지원하지 않지만, 아래 링크와 같은 사례가 있음</li>
<li>Okta API Integration은 별도 추가 비용이 발생하지 않으므로(사용자 계정 당 비용 발생) 외부 팀 사용자가 늘어날 경우 Okta 연동과 본 글에서 소개한 ACL을 통해 접근 제어 구현을 고려해볼 수 있음</li>
</ul>
<h3 id="참고자료">참고자료</h3>
<ol>
<li>Apache Airflow Documentation<ul>
<li><a href="https://airflow.apache.org/docs/apache-airflow-providers-fab/stable/auth-manager/access-control.html#access-control">Access Control</a></li>
<li><a href="https://airflow.apache.org/docs/apache-airflow/1.10.13/security.html#rbac-ui-security">Security &gt; RBAC UI Security</a></li>
</ul>
</li>
</ol>
]]></description>
        </item>
        <item>
            <title><![CDATA[Docker로 Python Django 개발하기]]></title>
            <link>https://velog.io/@graphy-young/Docker%EB%A1%9C-Python-Django-%EA%B0%9C%EB%B0%9C%ED%95%98%EA%B8%B0</link>
            <guid>https://velog.io/@graphy-young/Docker%EB%A1%9C-Python-Django-%EA%B0%9C%EB%B0%9C%ED%95%98%EA%B8%B0</guid>
            <pubDate>Fri, 10 Nov 2023 17:41:05 GMT</pubDate>
            <description><![CDATA[<h1 id="developing-python-django-with-docker">Developing Python Django with Docker</h1>
<p><img src="https://velog.velcdn.com/images/graphy-young/post/f13be46d-d59c-4eb2-9ea8-2b7f37145a31/image.png" alt="Django"></p>
<h2 id="개요">개요</h2>
<hr>
<p>2023년을 기준으로 Python에서 웹 개발을 위한 프레임워크를 생각해보면 Django와 Flask가 아직까지도 양대산맥인 것 같다.
물론 타 언어의 여러 프레임워크와 비교했을때 Python이 주류가 될 만큼 인기 있는 프로젝트들은 아니지만 큰 규모가 아니라면 충분히 걱정 안하고 써도 될만큼 훌륭한 수단이라고 생각한다.</p>
<p>최근 어느날 Docker로 Django를 개발해보고자 별 생각없이 <a href="https://hub.docker.com/">Docker Hub</a>를 찾았으나 공식 이미지가 2016년부터 Deprecated 되어있었다.</p>
<p><img src="https://velog.velcdn.com/images/graphy-young/post/4388b324-1b3e-42ff-9137-9031ca300376/image.png" alt=""></p>
<p>만약 Docker를 통해 Django를 개발하고자 한다면 기본 Python 이미지에 Django를 설치하여 사용하라고 안내가 되어있었으며, 본문에 친절하게 코드가 제시되어있다.</p>
<p>공식 이미지가 있다면 더 좋았겠으나, ARM64 아키텍쳐가 지원되지 않기 때문에 Python 기본 이미지를 쓰면 CPU 아키텍쳐 문제에서도 어느정도 자유로울 수 있기에 Python 이미지를 베이스로 Dockerfile을 통해 Django 이미지를 빌드하면 M 시리즈의 프로세서를 사용하는 macOS에서도 개발이 가능했다.</p>
<h3 id="dockerfile">Dockerfile</h3>
<pre><code class="language-dockerfile"># Use the official Python 3.8 image as the base image
FROM python:3.8

# Set the default value for the &#39;app_name&#39; argument
ARG app_name=django
ENV APP_NAME $app_name  

# Set the PYTHONUNBUFFERED environment variable to ensure Python prints directly to terminal
ENV PYTHONUNBUFFERED 1

# Set the working directory inside the container to &#39;/django&#39;
WORKDIR /django

# Install Django using pip
RUN pip install Django

# RUN apt-get update &amp;&amp; apt-get install -y mariadb-client

# Start a new Django project named as specified in the &#39;app_name&#39; argument
RUN django-admin startproject $app_name

# Update the &#39;ALLOWED_HOSTS&#39; setting in the Django project&#39;s settings.py file
RUN sed -i &quot;s/ALLOWED_HOSTS = \[\]/ALLOWED_HOSTS = \[&#39;*&#39;\]/g&quot; ./${APP_NAME}/${APP_NAME}/settings.py

# Expose port 8000 to allow external access
EXPOSE 8000

# Specify the command to run the Django development server when the container starts
CMD [&quot;sh&quot;, &quot;-c&quot;, &quot;python ./${APP_NAME}/manage.py runserver 0.0.0.0:8000&quot;]</code></pre>
<ul>
<li><code>app_name</code> 변수는 자유롭게 변경해도 되며, 해당 코드는 별도의 DBMS를 사용하지 않는 기본 내장 <code>sqlite</code>를 사용하여 Django 서버를 사용한다.</li>
<li>만약 mariaDB 또는 PostgreSQL을 사용하고자 하면 주석처리된 mariadb-client 설치 코드를 주석 해제하여 사용하며, 별도의 컨테이너 또는 서버에 연결하는 작업이 추가로 필요하다</li>
<li>개발용 서버이기에 <code>settings.py</code>에 <code>ALLOWED_HOST</code>를 별도로 제한하지 않았으며, 확장하여 개발 후 실제로 사용하고자 한다면 해당 설정은 반드시 변경해야한다.</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Apache Airflow의 동시성 설정 이해하기]]></title>
            <link>https://velog.io/@graphy-young/Apache-Airflow%EC%9D%98-%EB%8F%99%EC%8B%9C%EC%84%B1-%EC%84%A4%EC%A0%95-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B8%B0-%EC%9D%B4%EB%A1%A0%ED%8E%B8</link>
            <guid>https://velog.io/@graphy-young/Apache-Airflow%EC%9D%98-%EB%8F%99%EC%8B%9C%EC%84%B1-%EC%84%A4%EC%A0%95-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B8%B0-%EC%9D%B4%EB%A1%A0%ED%8E%B8</guid>
            <pubDate>Sun, 05 Feb 2023 15:17:23 GMT</pubDate>
            <description><![CDATA[<h2 id="0-서론">0. 서론</h2>
<hr>
<p>모던 데이터 엔지니어링에서 필수적인 기술 스택 중 하나는 바로 Workflow Orchestration이다. <a href="https://hadoop.apache.org/">Apache Hadoop</a> <a href="https://www.geeksforgeeks.org/hadoop-ecosystem/">Ecosystem</a>의 <a href="https://oozie.apache.org/">Oozie</a>부터 AWS의 <a href="https://aws.amazon.com/ko/step-functions/">Step Functions</a>, GCP의 <a href="https://cloud.google.com/composer">Cloud Composer</a>와 같은 클라우드 네이티브 서비스도 한 축을 차지하고 있으며, 더욱 더 모던한 기능들을 갖춘 <a href="https://www.prefect.io/">Prefect</a> 등 다양한 Workflow 플랫폼이나 어플리케이션이 시장에 퍼져있다. <a href="https://airflow.apache.org/">Apache Airflow</a>는 최근 가장 많이 쓰이는 워크플로우 플랫폼으로, 오픈소스 재단인 Apache Foundation의 프로젝트로 공개되어 있다.
필자의 팀 역시 Apache Airflow로 ETL과 머신러닝 자동화가 구성되어 있는데, 어느날부터 평화로웠던(?) Airflow가 새로운 프로젝트들로 인해 수집, 프로파일과 모델 파이프라인들이 쏟아져나오며 비명을 지르기 시작했다. 갑작스럽게 병렬 처리되는 Task가 급격하게 증가하기 시작하여 스케쥴러의 대기열에 경합이 일어나기 시작했고, 이는 수행되어야 할 작업들을 오랜 시간 대기 상태에 빠트려 장애를 발생시키기 시작했다. 결과적으로 추천과 검색 서비스의 데이터 최신성에 지연을 가져오게 되었으며, 서비스에 영향을 미치는 사안인 만큼 긴급하게 해결할 이슈로 부상했다.
이 이슈의 해결 과정에서 Airflow에서 동시성(Concurrency)에 관여하는 옵션에 대해 알 수 있었다. 이 계기를 통해 알게 된 것들을 정리하고, 이어서 작성할 예제편을 통해 해당 옵션들이 어떻게 작동하는지 확인해 볼 것이다.</p>
<h2 id="1-airflow-동시성-제어-구조-개요">1. Airflow 동시성 제어 구조 개요</h2>
<hr>
<p><img src="https://velog.velcdn.com/images/graphy-young/post/c49ceac1-af00-4019-81ec-dfbb9402f8e2/image.png" alt=""></p>
<p>Airflow Cluster 내에서 동시성에 영향을 줄 수 있는 요소들은 크게 다음으로 나눌 수 있다.</p>
<ul>
<li>Scheduler 단계에서 설정 가능한 <strong>Parallelism</strong>과 <strong>Pools</strong></li>
<li>Celery Worker 사용시에 설정 가능한 <strong>Worker Concurrency</strong></li>
<li>Scheduler~Metadata Database 사이의 Connection 설정</li>
</ul>
<h2 id="2-설정값-이해하기">2. 설정값 이해하기</h2>
<hr>
<p><img src="https://velog.velcdn.com/images/graphy-young/post/55788a87-980f-4312-8dee-fd6b88a153c1/image.png" alt=""></p>
<blockquote>
<p><strong>최근 버전의 경우 위 그림에서 설정값들이 이름이 변경되었으므로 참고!</strong></p>
</blockquote>
<ul>
<li><code>max_active_runs</code> -&gt; <code>max_active_runs_per_dag</code></li>
<li><code>task_concurrency</code> -&gt; <code>max_active_tis_per_dag</code></li>
<li><code>concurrency</code> -&gt; <code>max_active_tasks_per_dag</code></li>
</ul>
<h3 id="21-airflow-configuration">2.1. Airflow configuration</h3>
<ol>
<li><a href="https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parallelism"><code>core.parallelism</code></a> (<code>AIRFLOW__CORE__PARALLELISM</code>)
 Parallelism(병렬성)은, Scheduler별로 Worker 수와 관계 없이 Airflow Cluster 내에서 동시에 수행될 수 있는(= Runnning state에 진입할 수 있는) Task instance의 전체 갯수
 기본 설정 값은 <code>32</code>이며, 최상위의 설정값이므로 후술할 <strong>Pool이나 Worker Concurrency가 아무리 높은 값을 가지고 있더라도 동시 실행이 가능한 최대 Task 수는 반드시 이 값을 초과할 수 없다.</strong><blockquote>
<p>2.3.3 버전 이전 문서에서는 Scheduler 당이 아닌 &quot;<em>Scheduler의 수와 상관없이(= 전체가 같은 값을 공유하는 뉘앙스)</em>&quot;로 표현되었으나 표현오류로 수정되었음 (<a href="https://airflow.apache.org/docs/apache-airflow/2.3.3/release_notes.html#doc-only-changes">링크</a>)</p>
</blockquote>
</li>
</ol>
<ol start="2">
<li><p><a href="https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag"><code>core.max_active_tasks_per_dag</code></a> (2.2.0 이전의 <a href="https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag"><code>dag_concurrency</code></a>와 같음)
DAG 당 최대로 실행 가능한 Task 수 (1개의 DAG 내 모든 실행이 값을 공유)</p>
</li>
<li><p><a href="max_active_runs_per_dag"><code>core.max_active_runs_per_dag</code></a> (<code>AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG</code>)
각 DAG 당 실행중인 최대 DAG Run의 수 </p>
</li>
<li><p><a href="https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#worker-concurrency"><code>celery.worker_concurrency</code></a> (<code>AIRFLOW__CELERY__WORKER_CONCURRENCY</code>)
Celery Worker 사용 시, 각 Worker가 처리할 수 있는 최대 Task instance 수</p>
<blockquote>
<p>그러나 <a href="https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#worker-concurrency"><code>core.worker_autoscale</code></a> 옵션이 활성화될 경우 이 옵션은 적용되지 않음</p>
</blockquote>
<h4 id="적용방법">적용방법</h4>
<ul>
<li><p><strong>$AIRFLOW_HOME/airflow.cfg</strong> &gt;&gt; 해당 옵션 값 변경 (<a href="https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#parallelism">공식 문서</a>)</p>
<pre><code class="language-bash">...
[core]
...
paralellism = 32
</code></pre>
</li>
<li><p>Airflow Scheduler 재시작</p>
</li>
</ul>
</li>
</ol>
<h3 id="22-pools">2.2. Pools</h3>
<p>Pool은 Airflow에서 동시 실행을 제어하기 위한 실행 대기열의 개념이다. Parallelism은 Airflow 환경 내의 전체 Task 실행의 최대 수를 제한할 수 있는 Scheduler의 옵션이며, Configuration에서만 설정할 수 있기 때문에 Scheduler 내의 Configuration에 접근해야만 이 값을 변경할 수 있다. 
하지만 pool은 Webserver GUI에서 설정과 관리가 가능하며, 여러 개의 pool을 정의하고, Task를 사용자 임의의 기준으로 분류하여 각 pool의 Slot(concurrency)을 제한하여 pool에 따라 개별적인 동시 실행을 제어할 수 있다.
이 Slot의 개념은 다른 동시성 관련 설정값과 다른 특징이 하나 있는데, 바로 <strong>각 Task가 가지는 slot 값은 사용자가 유동적으로 조절할 수 있다.</strong> Task의 경중에 상관없이 Concurrency에서는 1개의 task로써 동시성을 제어받게 된다.
예를 들어 <code>Task A</code>는 부하 <strong>1</strong>, <code>Task B</code>는 부하 <strong>3</strong>을 가지고 있다고 가정해보자. 각 Task의 20개가 동시에 수행될 경우 Server와 Worker에게는 차원이 다른 부하가 될 것이다. 이를 다른 동시성 설정으로는 통제할 수 없지만, Pool을 사용할 경우 slot 값을 부여하여 작업의 무게(Weight)를 반영한 동시성 제어를 수행할 수 있게 한다.
Airflow는 <code>default_pool</code> 이라는 기본 pool을 가지고 있으며, 기본 설정값은 <code>128</code>이다. 이 기본 값은 <code>airflow.cfg</code>의 <code>core.default_pool_task_slot_count</code>을 변경하여 조정할 수 있다.</p>
<h4 id="적용-방법">적용 방법</h4>
<ul>
<li>DAG 내 Task 정의 시 <code>pool</code> 및 <code>pool_slots</code> 매개변수 삽입<pre><code class="language-python">  task_name = BashOperator(
      task_id=&quot;test_task&quot;,
      pool=&quot;pool_name&quot;, # Pool 이름 입력. 기본값은 &quot;default_pool&quot;
      pool_slots=2, # 해당 Task가 사용할 Slot 갯수. 기본값은 1
      bash_command=&quot;sleep 10&quot;,
      dag=dag,
  )</code></pre>
</li>
</ul>
<h4 id="주의사항">주의사항</h4>
<ul>
<li><code>default_pool</code>은 삭제 불가능</li>
<li><em>1개의 Task는 1개의 Pool에만 등록 가능</em></li>
<li>SubDAG을 사용할 경우 SubDAG 안의 Task들에만 Pool 적용 가능. <code>SubDAGOperator</code> 자체는 Pool에 등록되지 않음</li>
<li>잘못된 Pool 이름 입력 시 Task가 작동하지 않으므로 오타로 인해 작동불능이 되지 않도록 주의할 것</li>
</ul>
<h3 id="23-operator-weight-rule">2.3. Operator Weight Rule</h3>
<p>만약 여러개의 DAG가 실행중이며, 수많은 Task가 당신의 Workflow에 몰려들고 있다면, 어떤 Task를 먼저 실행시킬 것인가? Airflow는 이를 Priority Weights라는 개념을 통해 Task의 우선도를 결정하고 실행 순서를 지정한다.
Priority Weights는 Executor 내 Queue에서 실행 순서를 지정하는데 사용하는 정수값이다. Priority Weights를 산정하는 방법인 <a href="https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html"><code>weight_rule</code></a>은 <a href="https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py"><code>BaseOperator</code></a> 객체 내의 인수에 의해 정의되어 모든 Operator에 상속된다.
Weight rule은 <a href="https://github.com/apache/airflow/blob/51d96334b8a688851d3ed44f57519fc795a7a509/airflow/utils/weight_rule.py#L25"><code>airflow.utils.WeightRule</code></a>에 정의되어 있으며 다음과 같이 3가지가 존재한다.</p>
<ul>
<li><code>UPSTREAM</code>: 처음 실행되는 Task부터 낮은 값을 가지며, 마지막에 가까운 Task일수록 높은 값을 가진다.</li>
<li><code>DOWNSTREAM</code>: <code>BaseOperator</code>의 기본 값. 선순위 Task부터 높은 값을 가지며, 후순위 Task는 낮은 값을 가진다(내림차순)</li>
<li><code>ABSOLUTE</code>: 모든 Task가 동일한 실행 우선순위를 가진다.</li>
</ul>
<p>기본값인 <code>DOWNSTREAM</code>의 경우 DAG 앞에서 실행되는 Task는 높은 실행 우선순위를 갖게 되는 반면, 뒤쪽에서 실행되는 Task는 낮은 실행 순위를 가지게 되어 전반적인 DAG 실행시간이 길어질 수 있어 DAG의 동시성에 영향을 줄 수 있다.</p>
<h4 id="적용방법-1">적용방법</h4>
<ul>
<li><code>$AIRFLOW_HOME/models/baseoperator.py</code> 또는 각 Operator의 Class 내 매개변수 수정<pre><code class="language-python">  class BaseOperator:
      def __init__(self, ...):
          ...
          weight_rule=WeightRule.DOWNSTREAM # UPSTREAM 또는 ABSOLUTE
          ...
</code></pre>
</li>
</ul>
<h4 id="주의사항-1">주의사항</h4>
<ul>
<li>Priority Weight의 기본 값은 1</li>
</ul>
<h2 id="참고자료">참고자료</h2>
<hr>
<ul>
<li>Apache Airflow Documentation<ul>
<li><a href="https://airflow.apache.org/docs/apache-airflow/stable/index.html">https://airflow.apache.org/docs/apache-airflow/stable/index.html</a></li>
<li><a href="https://airflow.apache.org/docs/apache-airflow/2.0.2/configurations-ref.html">https://airflow.apache.org/docs/apache-airflow/2.0.2/configurations-ref.html</a></li>
</ul>
</li>
<li>Astronomer - <a href="https://docs.astronomer.io/learn">https://docs.astronomer.io/learn</a></li>
<li>쏘카 데이터 그룹 - <a href="https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html">https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html</a></li>
<li>LINE ENGINEERING - <a href="https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/">https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[macOS에서 Docker를 통한 Linux 개발 시작하기 (작성중)]]></title>
            <link>https://velog.io/@graphy-young/Docker%EB%A5%BC-%ED%86%B5%ED%95%9C-Linux-%EA%B0%80%EC%83%81-%EB%A8%B8%EC%8B%A0VM-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0-%EC%9E%91%EC%84%B1%EC%A4%91</link>
            <guid>https://velog.io/@graphy-young/Docker%EB%A5%BC-%ED%86%B5%ED%95%9C-Linux-%EA%B0%80%EC%83%81-%EB%A8%B8%EC%8B%A0VM-%EC%8B%9C%EC%9E%91%ED%95%98%EA%B8%B0-%EC%9E%91%EC%84%B1%EC%A4%91</guid>
            <pubDate>Tue, 09 Aug 2022 15:49:54 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p><strong>작성중인 글입니다. 내용은 틈틈히 추가할 예정입니다.</strong></p>
</blockquote>
<h1 id="starting-developing-linux-via-docker-container-in-macos">Starting developing Linux via Docker container in macOS</h1>
<blockquote>
<p>현재 글은 Intel 기반의 macOS Monterey 12.5 환경에서 진행되었습니다.</p>
</blockquote>
<h2 id="0-시작하며">0. 시작하며</h2>
<p>그 동안은 AWS 클라우드 환경을 나름 비용 제한만 피하면 자율적으로 쓸 수 있었으나, 이제는 그렇지 못한 상황이 되어 로컬 개발 환경이 어느정도 필요해졌다.</p>
<h2 id="1-docker-설치">1. Docker 설치</h2>
<h3 id="command-line에서-docker-설치하기">Command Line에서 Docker 설치하기</h3>
<blockquote>
<p>만약 Homebrew가 설치되지 않았을 경우 <a href="https://brew.sh">Homebrew 홈페이지</a>를 참고하여 사전 설치하시거나 <a href="https://www.docker.com/products/docker-desktop/">Docker 공식 홈페이지</a>에서 설치파일을 다운로드 받아 직접 설치할 수도 있습니다.</p>
</blockquote>
<ul>
<li>Homebrew가 설치되어있을 경우<pre><code class="language-bash">brew install docker</code></pre>
</li>
<li>Homebrew가 설치되지 않았거나 macOS 외에 다른 운영체제를 사용 중인 경우<pre><code class="language-bash">sudo wget -qO- https://get.docker.com/ | sh</code></pre>
<h2 id="2-docker-사용하기">2. Docker 사용하기</h2>
<h3 id="search---docker-hub에서-ubuntu-image-찾기"><code>search</code> - Docker Hub에서 Ubuntu image 찾기</h3>
</li>
<li><code>docker search &lt;image_name&gt;</code><pre><code class="language-bash">docker search ubuntu</code></pre>
<img src="https://velog.velcdn.com/images/graphy-young/post/c7c7313a-66b8-4c67-b7ef-8c56b021ecbf/image.png" alt="Docker Hub에 Ubuntu image를 검색한 결과"><h3 id="pull---ubuntu-image-다운로드"><code>pull</code> - Ubuntu image 다운로드</h3>
</li>
<li><code>docker pull &lt;image_name&gt;[:tag]</code></li>
<li>tag를 지정하여 원하는 버전의 이미지를 받을 수도 있습니다.<pre><code>docker pull ubuntu:latest</code></pre><img src="https://velog.velcdn.com/images/graphy-young/post/7b74dc10-7178-4fd8-9be3-40bc4aab911b/image.png" alt=""></li>
</ul>
<h3 id="images---가지고-있는-image-확인하기"><code>images</code> - 가지고 있는 image 확인하기</h3>
<ul>
<li><code>docker images [image_name]</code></li>
<li>image 이름을 입력하면 해당 이름을 가진 여러 태그의 이미지들을 보여줌<pre><code>docker images</code></pre><img src="https://velog.velcdn.com/images/graphy-young/post/72fc29aa-7f08-405c-9e73-e6e4fb958f42/image.png" alt=""><h3 id="run---image로-컨테이너를-생성하여-실행"><code>run</code> - Image로 컨테이너를 생성하여 실행</h3>
</li>
<li><code>docker run [options] &lt;image_name or image_id&gt;</code><pre><code>docker run -it --name test_ubuntu ubuntu /bin/bash</code></pre></li>
<li><code>-i</code>: Interactive, 표준 입력(stdin)을 활성화</li>
<li><code>-t</code>: Pseudo-tty, TTY 모드를 사용해 Shell 표시</li>
<li><code>--name &lt;container_name&gt;</code>: Container의 이름을 지정하는 옵션</li>
<li><code>/bin/bash/</code>: Container의 Bash shell 지정 (보통 위의 두 옵션과 같이 사용)</li>
<li><code>-it</code> 옵션과 Bash shell은 보통 같이 사용하며, <code>-i</code> 옵션이 있어야 Bash에 명령을 입력할 수 있고, <code>-t</code> 옵션을 활성화해야 Shell이 표시된다</li>
</ul>
<p><img src="https://velog.velcdn.com/images/graphy-young/post/5efa008c-396c-41d0-b4ad-3dca9a6f8018/image.png" alt=""></p>
<ul>
<li><code>ls</code> 명령어를 입력해 Container 내부에 명령이 전달된 것을 확인할 수 있었다.</li>
<li>종료 등 내용 확인<h2 id="references">References</h2>
</li>
<li>Docker 기본 사용법 (이재홍) - <a href="http://pyrasis.com/Docker/Docker-HOWTO">http://pyrasis.com/Docker/Docker-HOWTO</a></li>
</ul>
]]></description>
        </item>
    </channel>
</rss>