<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
    <channel>
        <title>2h-kim.log</title>
        <link>https://velog.io/</link>
        <description>열심히 정리하는 습관 기르기..</description>
        <lastBuildDate>Sun, 19 Nov 2023 12:13:43 GMT</lastBuildDate>
        <docs>https://validator.w3.org/feed/docs/rss2.html</docs>
        <generator>https://github.com/jpmonette/feed</generator>
        <image>
            <title>2h-kim.log</title>
            <url>https://velog.velcdn.com/images/2h-kim/profile/4ba44d60-74ba-49df-ae63-f643d5027afc/image.jpeg</url>
            <link>https://velog.io/</link>
        </image>
        <copyright>Copyright (C) 2019. 2h-kim.log. All rights reserved.</copyright>
        <atom:link href="https://v2.velog.io/rss/2h-kim" rel="self" type="application/rss+xml"/>
        <item>
            <title><![CDATA[[RaspberryPI] Kubernetes 구축]]></title>
            <link>https://velog.io/@2h-kim/RaspberryPI-Kubernetes-%EA%B5%AC%EC%B6%95</link>
            <guid>https://velog.io/@2h-kim/RaspberryPI-Kubernetes-%EA%B5%AC%EC%B6%95</guid>
            <pubDate>Sun, 19 Nov 2023 12:13:43 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<ul>
<li>2023년 11월 19일 기준 라즈베리파이로 docker 기반 쿠버네티스를 구축 진행</li>
<li>이런 저러한 삽질의 결과물로, 라즈베리 파이를 이용하여 구축할 때 참고시 도움이 될 것으로 보임</li>
</ul>
<p><strong>결과</strong></p>
<pre><code class="language-bash">$ kubectl get node -o wide
NAME            STATUS   ROLES           AGE     VERSION   INTERNAL-IP    EXTERNAL-IP   OS-IMAGE             KERNEL-VERSION      CONTAINER-RUNTIME
raspberrypi01   Ready    control-plane   3h16m   v1.28.2   192.168.0.x    &lt;none&gt;        Ubuntu 22.04.3 LTS   5.15.0-1034-raspi   docker://24.0.7
raspberrypi02   Ready    &lt;none&gt;          3h11m   v1.28.2   192.168.0.x    &lt;none&gt;        Ubuntu 22.04.3 LTS   5.15.0-1034-raspi   docker://24.0.7
raspberrypi03   Ready    &lt;none&gt;          3h2m    v1.28.2   192.168.0.x    &lt;none&gt;        Ubuntu 22.04.3 LTS   5.15.0-1034-raspi   docker://24.0.7
raspberrypi04   Ready    &lt;none&gt;          3h2m    v1.28.2   192.168.0.x    &lt;none&gt;        Ubuntu 22.04.3 LTS   5.15.0-1034-raspi   docker://24.0.7</code></pre>
<h3 id="environment">Environment</h3>
<ul>
<li>Raspberry PI 4B<ul>
<li>Memory : 8 GB</li>
<li>Storage : 32 GB</li>
<li>Arm 기반 64 bit</li>
<li>OS : Ubuntu 22.04 LTS</li>
</ul>
</li>
<li>Docker 24.0.7</li>
<li>Kubernetes Version 1.28.2</li>
</ul>
<h1 id="설치-과정">설치 과정</h1>
<h2 id="✅-all-node-환경-설정">✅ All Node 환경 설정</h2>
<hr>
<h3 id="croups를-위한-설정">croups를 위한 설정</h3>
<ul>
<li>cgroup이란 ? : <a href="https://sonseungha.tistory.com/535">https://sonseungha.tistory.com/535</a></li>
<li>About cgroup v2 : <a href="https://kubernetes.io/docs/concepts/architecture/cgroups/">https://kubernetes.io/docs/concepts/architecture/cgroups/</a><pre><code class="language-bash">$ sudo vi /boot/firmware/cmdline.txt
# cgroup_enable=memory cgroup_memory=1 을 앞에 추가</code></pre>
</li>
</ul>
<h3 id="linux-패키지-최신화">Linux 패키지 최신화</h3>
<pre><code class="language-bash"># 패키지를 최신 버전으로 갱신하고 리부팅한다.
$ sudo apt update &amp;&amp; sudo apt -y full-upgrade
[ -f /var/run/reboot-required ] &amp;&amp; sudo reboot -f</code></pre>
<h3 id="swapoff-처리">SwapOff 처리</h3>
<p>주로 아래와 같은 이유로 swap off를 함</p>
<ul>
<li>쿠버네티스 노드의 안정성 향상</li>
<li>컨테이너 환경에서의 메모리 관리</li>
<li>스왑 사용 시 성능 저하 방지<pre><code class="language-bash">sudo swapoff -a &amp;&amp; sudo sed -i &#39;/ swap / s/^/#/&#39; /etc/fstab</code></pre>
</li>
</ul>
<h3 id="docker-설치">Docker 설치</h3>
<pre><code class="language-bash">$ sudo apt -y install curl apt-transport-https net-tools vim git curl wget software-properties-common
$ curl -sSL https://get.docker.com | sh
$ sudo usermod -aG docker $USER # sudo를 안쓰기 위해
$ sudo docker run hello-world # 잘 나오는지 테스트를 위해</code></pre>
<h3 id="cri-dockered-설치">cri-dockered 설치</h3>
<ul>
<li>1.24 버전 이후로 도커심 대신 cri-dockered를 통해 도커 엔진을 사용할 수 있음<pre><code class="language-bash">$ VER=$(curl -s https://api.github.com/repos/Mirantis/cri-dockerd/releases/latest|grep tag_name | cut -d &#39;&quot;&#39; -f 4|sed &#39;s/v//g&#39;) # for latest
$ echo $VER # 0.3.7
$ wget https://github.com/Mirantis/cri-dockerd/releases/download/v${VER}/cri-dockerd-${VER}.arm64.tgz
$ tar xvf cri-dockerd-${VER}.arm64.tgz
$ sudo mv cri-dockerd/cri-dockerd /usr/local/bin/
$ cri-dockerd --version # cri-dockerd 0.3.7 
$ wget https://raw.githubusercontent.com/Mirantis/cri-dockerd/master/packaging/systemd/cri-docker.service
$ wget https://raw.githubusercontent.com/Mirantis/cri-dockerd/master/packaging/systemd/cri-docker.socket
$ sudo mv cri-docker.socket cri-docker.service /etc/systemd/system/
$ sudo sed -i -e &#39;s,/usr/bin/cri-dockerd,/usr/local/bin/cri-dockerd,&#39; /etc/systemd/system/cri-docker.service
$ sudo systemctl daemon-reload
$ sudo systemctl enable cri-docker.service
$ sudo systemctl enable --now cri-docker.socket
$ sudo systemctl restart docker &amp;&amp; sudo systemctl restart cri-docker
$ sudo systemctl status cri-docker.socket --no-pager
$ sudo mkdir /etc/docker
$ cat &lt;&lt;EOF | sudo tee /etc/docker/daemon.json
{
  &quot;exec-opts&quot;: [&quot;native.cgroupdriver=systemd&quot;],
  &quot;log-driver&quot;: &quot;json-file&quot;,
  &quot;log-opts&quot;: {
      &quot;max-size&quot;: &quot;100m&quot;
  },
  &quot;storage-driver&quot;: &quot;overlay2&quot;
}
EOF
$ sudo systemctl restart docker &amp;&amp; sudo systemctl restart cri-docker
$ sudo docker info | grep Cgroup</code></pre>
</li>
</ul>
<h3 id="cri-o-설치">cri-o 설치</h3>
<ul>
<li>현재는 어떤 건지는 모르겠으나, 해당 내용이 없을 경우 <code>core-dns</code> POD가 Creating 단계에서 멈춰버림<pre><code class="language-bash">$ OS_NAME=xUbuntu_22.04
$ CRIO_VER=1.28
$ echo &quot;deb https://download.opensuse.org/repositories/devel:/kubic:/libcontainers:/stable/${OS_NAME}/ /&quot;|sudo tee /etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list
$ echo &quot;deb http://download.opensuse.org/repositories/devel:/kubic:/libcontainers:/stable:/cri-o:/${CRIO_VER}/${OS_NAME}/ /&quot;|sudo tee /etc/apt/sources.list.d/devel:kubic:libcontainers:stable:cri-o:$CRIO_VER.list
$ curl -L https://download.opensuse.org/repositories/devel:kubic:libcontainers:stable:cri-o:$CRIO_VER/$OS_NAME/Release.key | sudo apt-key add -
$ curl -L https://download.opensuse.org/repositories/devel:/kubic:/libcontainers:/stable/$OS_NAME/Release.key | sudo apt-key add -
$ sudo apt update
$ sudo apt -y install cri-o cri-o-runc
$ crio version</code></pre>
<h3 id="kubelet-kubeadm-kubectl-설치">kubelet, kubeadm, kubectl 설치</h3>
<pre><code class="language-bash">$ curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
$ echo &quot;deb http://apt.kubernetes.io/ kubernetes-xenial main&quot; | sudo tee -a /etc/apt/sources.list.d/kubernetes.list
$ sudo apt update
$ sudo apt -y install kubeadm kubelet kubectl
$ sudo apt-mark hold kubelet kubeadm kubectl # 버전 고정
$ kubectl version --client &amp;&amp; kubeadm version # 잘 설치 되었는지 확인</code></pre>
</li>
</ul>
<h3 id="ipv4를-포워딩하여-iptables가-브리지된-트래픽을-보게-하기">IPv4를 포워딩하여 iptables가 브리지된 트래픽을 보게 하기</h3>
<pre><code class="language-bash">$ cat &lt;&lt;EOF | sudo tee /etc/modules-load.d/k8s.conf
  overlay
  br_netfilter
  EOF

$ sudo modprobe overlay
$ sudo modprobe br_netfilter

# 필요한 sysctl 파라미터를 설정하면, 재부팅 후에도 값이 유지된다.
$ cat &lt;&lt;EOF | sudo tee /etc/sysctl.d/k8s.conf
  net.bridge.bridge-nf-call-iptables  = 1
  net.bridge.bridge-nf-call-ip6tables = 1
  net.ipv4.ip_forward                 = 1
  EOF

# 재부팅하지 않고 sysctl 파라미터 적용하기
$ sudo sysctl --system</code></pre>
<p>잘 적용되었는지 확인하는 방법은 <a href="https://velog.io/@sororiri/k8s-kubeadm-%EC%84%A4%EC%B9%98-big2bz1i#-step-3-bridge-%EB%84%A4%ED%8A%B8%EC%9B%8C%ED%81%AC%EB%A5%BC-%ED%86%B5%ED%95%B4-%EC%86%A1%EC%88%98%EC%8B%A0-%EB%90%98%EB%8A%94-%ED%8C%A8%ED%82%B7%EC%BB%A8%ED%85%8C%EC%9D%B4%EB%84%88-%ED%8C%A8%ED%82%B7%EC%9D%B4-iptables-%EC%84%A4%EC%A0%95%EC%97%90-%EB%94%B0%EB%9D%BC-%EC%A0%9C%EC%96%B4%EB%90%98%EB%8F%84%EB%A1%9D-%EC%84%A4%EC%A0%95">여기</a>에 잘 정리 되어 있음</p>
<hr>
<h2 id="✅-master-node에서만-적용">✅ Master Node에서만 적용</h2>
<h3 id="마스터-노드-초기화-작업">마스터 노드 초기화 작업</h3>
<pre><code class="language-bash">$ sudo systemctl enable kubelet # kubelet enable
$ sudo kubeadm config images pull --cri-socket unix:///run/cri-dockerd.sock
$ mkdir -p ~/tmp
$ cat &gt; ~/tmp/kube-admin-config.yaml &lt;&lt;EOF
---
apiVersion: &quot;kubeadm.k8s.io/v1beta3&quot;
kind: InitConfiguration
nodeRegistration:
  criSocket: &quot;unix:///run/cri-dockerd.sock&quot;
---
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
failSwapOn: false
featureGates:
  NodeSwap: true
memorySwap:
  swapBehavior: LimitedSwap
---
apiVersion: kubeadm.k8s.io/v1beta3
kind: ClusterConfiguration
networking:
  podSubnet: &quot;172.24.0.0/24&quot; # --pod-network-cidr
EOF
sudo kubeadm init --config ~/tmp/kube-admin-config.yaml # 해당 결과 복사 (kubeadm join 부분)</code></pre>
<h3 id="kubectl-설정">kubectl 설정</h3>
<pre><code class="language-bash">$ mkdir -p $HOME/.kube
$ sudo cp -f /etc/kubernetes/admin.conf $HOME/.kube/config
$ sudo chown $(id -u):$(id -g) $HOME/.kube/config</code></pre>
<p>자동 완성을 위해 아래처럼 실행(<a href="https://kubernetes.io/ko/docs/tasks/tools/included/optional-kubectl-configs-bash-linux/">출처</a>)</p>
<pre><code class="language-bash">$ echo &#39;source &lt;(kubectl completion bash)&#39; &gt;&gt;~/.bashrc # 현재 사용자만
$ kubectl completion bash | sudo tee /etc/bash_completion.d/kubectl &gt; /dev/null # 시스템 전체 적용</code></pre>
<h3 id="networkplugin-설치">NetworkPlugin 설치</h3>
<pre><code class="language-bash">$ curl -O https://raw.githubusercontent.com/projectcalico/calico/v3.26.4/manifests/tigera-operator.yaml
$ curl -O https://raw.githubusercontent.com/projectcalico/calico/v3.26.4/manifests/custom-resources.yaml
$ kubectl create -f tigera-operator.yaml
$ sed -ie &#39;s/192.168.0.0/172.24.0.0/g&#39; custom-resources.yaml # CIDR 수정
$ kubectl create -f custom-resources.yaml
$ kubectl get pods --all-namespaces -w # 정상 설치 확인
$ kubectl taint nodes --all  node-role.kubernetes.io/control-plane- # 격리 해제
$ kubectl get nodes # 확인</code></pre>
<hr>
<h2 id="✅-workernode에서만-적용">✅ WorkerNode에서만 적용</h2>
<pre><code class="language-bash">$ sudo kubeadm join master_node_host:6443 --token we7mc5.************ \
    --discovery-token-ca-cert-hash sha256:61d59b805bb8a9c3649f004e28d8bed***************** \
    --cri-socket unix:///run/cri-dockerd.sock</code></pre>
<hr>
<h1 id="reference">Reference</h1>
<ul>
<li>kubernetes : <a href="https://velog.io/@sororiri/k8s-kubeadm-%EC%84%A4%EC%B9%98-big2bz1i">https://velog.io/@sororiri/k8s-kubeadm-%EC%84%A4%EC%B9%98-big2bz1i</a></li>
<li>cri-dockered : <a href="https://tech.hostway.co.kr/2022/08/30/1374/">https://tech.hostway.co.kr/2022/08/30/1374/</a></li>
<li>cri-o : <a href="https://tech.hostway.co.kr/2022/05/12/1029/">https://tech.hostway.co.kr/2022/05/12/1029/</a></li>
<li><a href="https://truelifer.medium.com/raspberry-pi-4b-%EC%97%90-kubernetes-cluster-%EC%84%A4%EC%B9%98%ED%95%98%EA%B8%B0-35a1c7be3cbd">https://truelifer.medium.com/raspberry-pi-4b-%EC%97%90-kubernetes-cluster-%EC%84%A4%EC%B9%98%ED%95%98%EA%B8%B0-35a1c7be3cbd</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[SQL로 데이터 추출하기(개인적인 정리)]]></title>
            <link>https://velog.io/@2h-kim/SQL%EB%A1%9C-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B6%94%EC%B6%9C%ED%95%98%EA%B8%B0%EA%B0%9C%EC%9D%B8%EC%A0%81%EC%9D%B8-%EC%A0%95%EB%A6%AC</link>
            <guid>https://velog.io/@2h-kim/SQL%EB%A1%9C-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B6%94%EC%B6%9C%ED%95%98%EA%B8%B0%EA%B0%9C%EC%9D%B8%EC%A0%81%EC%9D%B8-%EC%A0%95%EB%A6%AC</guid>
            <pubDate>Tue, 25 Apr 2023 14:16:07 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 데분당태 챌린지를 진행하는 겸, 개인적으로 쿼리를 작성할 때 접근을 어떻게 했는지 정리하고자 작성한 문서입니다.</p>
<h1 id="요약">요약</h1>
<ul>
<li>개인적으로 SQL로 데이터를 추출할 때 주로 아래와 같이 진행한다.
요구사항 정리 → 필요한 테이블 선정 → 테이블 내용 확인 → 쿼리 작성 → EXPLAIN → 결과 확인</li>
</ul>
<h1 id="문서-내용">문서 내용</h1>
<p>맨 처음 SQL을 이용해서 데이터를 추출할 때, 어떻게 쿼리를 작성해야 원하는 값을 뽑을 수 있는지 고민을 많이 한 경험이 있습니다. 이러한 경험을 살려서 현재 제가 쿼리를 작성할 때 어떻게 접근하고 있는지 정리하고자 합니다. 물론 이는 지극히 주관적이므로 다를 수 있음을 미리 알려 드립니다.</p>
<br/>

<h2 id="예제-데이터">예제 데이터</h2>
<p>예제 데이터의 경우, 제가 아직 한번도 풀어보지 않고 정답률이 낮은 데이터로 시작하면 좋을 것 같아 아래 예제를 선정했습니다.</p>
<ul>
<li>사이트 : 프로그래머스 </li>
<li>문제 : 자동차 대여 기록 별 대여 금액 구하기(작성일 2023.04.25 기준 정답률 34%)
<img src="https://velog.velcdn.com/images/2h-kim/post/95ca6835-15e3-483e-8bcf-1c8189773978/image.png" alt=""></li>
</ul>
<br/>
<br/>


<h2 id="요구사항-정리">요구사항 정리</h2>
<p>맨 처음, 제가 진행하는 내용은 어떤 데이터를 뽑아야 하는가 입니다. 현재 주어진 문제는 아래와 같습니다.</p>
<blockquote>
<p><code>CAR_RENTAL_COMPANY_CAR</code> 테이블과 <code>CAR_RENTAL_COMPANY_RENTAL_HISTORY</code> 테이블과 <code>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</code> 테이블에서 자동차 종류가 &#39;트럭&#39;인 자동차의 대여 기록에 대해서 대여 기록 별로 대여 금액(컬럼명: <code>FEE</code>)을 구하여 대여 기록 ID와 대여 금액 리스트를 출력하는 SQL문을 작성해주세요. 결과는 대여 금액을 기준으로 내림차순 정렬하고, 대여 금액이 같은 경우 대여 기록 ID를 기준으로 내림차순 정렬해주세요.</p>
</blockquote>
<p>제가 생각했을 때 이를 정리하면 아래와 같습니다.</p>
<ul>
<li>자동차 종류가 <code>트럭</code>인 자동차의 대여 기록을 구한다.</li>
<li>대여 기록에는 금액(할인 적용된)이 들어가야 한다.</li>
<li>최종 결과에는 두 가지 컬럼 <code>대여 기록 ID(HISTORY_ID)</code>, <code>대여 금액(FEE)</code>만 있으면 된다.</li>
<li>최종 결과는 대여 기록과 금액에 대해 정렬되어야 한다.(FEE DESC, HISTORY_ID DESC) </li>
</ul>
<br/>

<h2 id="필요한-테이블-선정">필요한 테이블 선정</h2>
<p>우선 테이블에 무엇이 있어야 하는지를 알아야 합니다. 현재 문제 설명에 테이블에 대한 설명이 아래와 같이 있습니다.</p>
<ul>
<li><p><code>CAR_RENTAL_COMPANY_CAR</code> </p>
<ul>
<li>대여 중인 자동차들의 정보</li>
<li>컬럼 : CAR_ID, CAR_TYPE, DAILY_FEE, OPTIONS
(자동차 ID, 자동차 종류, 일일 대여 요금(원), 자동차 옵션 리스트)</li>
</ul>
</li>
<li><p><code>CAR_RENTAL_COMPANY_RENTAL_HISTORY</code></p>
<ul>
<li>테이블과 자동차 대여 기록 정보</li>
<li>컬럼 : HISTORY_ID, CAR_ID, START_DATE, END_DATE 
(자동차 대여 기록 ID, 자동차 ID, 대여 시작일, 대여 종료일)</li>
</ul>
</li>
<li><p><code>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</code></p>
<ul>
<li>자동차 종류 별 대여 기간 종류 별 할인 정책 정보</li>
<li>컬럼 : PLAN_ID, CAR_TYPE, DURATION_TYPE, DISCOUNT_RATE 
(요금 할인 정책 ID, 자동차 종류, 대여 기간 종류, 할인율(%))</li>
</ul>
</li>
</ul>
<p>현업에서도 잘 정리되어 있으면 좋겠으나, 정리되어 있지 않다면 우선 시도해 볼 것은 아래의 쿼리 입니다.</p>
<pre><code class="language-sql">SELECT
    table_name, column_name, column_comment
FROM
    information_schema.columns
WHERE
    table_schema = &#39;schema name&#39;
    AND table_name = &#39;table name&#39;;</code></pre>
<p> 이렇게 하면 처음 테이블을 생성했을 때, 입력한 코멘트 들을 확인할 수 있습니다. 물론 이렇게 해도 알 수 없는 경우가 종종 있습니다. 이럴 때는 테이블의 내용 확인 + 서비스 코드 확인 + 담당자와 미팅 등으로 파악할 수 있습니다.(시간이 진짜 오래걸립니다.)</p>
<p> 우선 여기서 우리가 요금 계산에 사용해야 할 것은 <code>CAR_RENTAL_COMPANY_CAR</code>의 <code>DAILY_FEE</code>컬럼과 <code>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</code>의 <code>DISCOUNT_RATE</code>로 보입니다. 왜냐하면 기간 별 할인율이 있고, 그렇다면 할인율을 적용한 금액이 대여 기간의 금액일 것이기 때문입니다.</p>
<p> 그렇다면 자동차 종류가 <code>트럭</code>인 자동차의 대여 기록은 어떻게 구해야할까요? 저의 생각은 <code>CAR_RENTAL_COMPANY_RENTAL_HISTORY</code>와 <code>CAR_RENTAL_COMPANY_CAR</code>를 사용하면 될 것으로 보입니다. 대여 기록에서의 <code>자동차 ID</code>를 <code>CAR_RENTAL_COMPANY_CAR</code>에서 찾아 트럭인지 파악하면 될 것으로 보입니다.</p>
 <br/>


<h2 id="테이블-내용-확인">테이블 내용 확인</h2>
<p> 항상 쿼리를 짜기 전 각 테이블에 대해서 아래와 같은 쿼리들을 실행하는 습관이 있습니다. 프로그래밍으로 치면 디버깅이라고 볼 수 있는데, 항상 LIMIT로 내가 생각하는 구조로 잘 있는지 확인하는 습관이 생겼습니다. 얼마나 데이터가 있고, 인덱스가 어디에 걸려 있는지 등 파악하는데 도움이 됩니다.</p>
<pre><code class="language-sql"> -- table 내용 확인
SELECT *
FROM CAR_RENTAL_COMPANY_CAR
LIMIT 10;

 -- 인덱스 확인
 SHOW INDEX FROM CAR_RENTAL_COMPANY_CAR;

 -- 대략적인 ROWS수
 SELECT table_name, table_rows
FROM information_schema.tables
WHERE table_schema = &#39;sql_runner_run&#39;
    AND table_name = &#39;CAR_RENTAL_COMPANY_CAR&#39;;</code></pre>
 <br/>



<h2 id="쿼리-작성">쿼리 작성</h2>
<p> 우리가 구해야할 내용은 아래와 같이 정리되었습니다.</p>
<ul>
<li>자동차 종류가 <code>트럭</code>인 자동차의 대여 기록을 구한다.</li>
<li>대여 기록에는 금액(할인 적용된)이 들어가야 한다.</li>
</ul>
<br/>

<h4 id="1자동차-종류가-트럭인-자동차의-대여-기록을-구한다">1.자동차 종류가 <code>트럭</code>인 자동차의 대여 기록을 구한다.</h4>
<p>그렇다면 제일 먼저 구해야 할 것은 대여 기록 중 <code>트럭</code>인 자동차인 대여기록입니다. 하지만 대여기록인 <code>CAR_RENTAL_COMPANY_RENTAL_HISTORY</code>에는 대여한 차의 ID가 있으나 차종에 대한 정보가 없습니다. 이를 위해 <code>CAR_RENTAL_COMPANY_CAR</code>와 연결이 필요합니다.
연결 할 수 있는 방법에는 무엇이 있는가 고민이  필요한데, 저는 서브쿼리로 작성하는것을 선호하지 않으므로 JOIN방식을 사용하고자 합니다. 그렇다면 어떤 JOIN이 필요할지 고민이 필요합니다.</p>
<p>만약에 특정 CAR_ID가 <code>CAR_RENTAL_COMPANY_RENTAL_HISTORY</code>에 존재하지만 <code>CAR_RENTAL_COMPANY_CAR</code>에 존재하지 않는다고 가정해봅시다. (물론 이런일은 없어야 겠지만... 현실에는 그런경우가 존재하죠...). 이 때 저희는 해당 CAR_ID가 <code>트럭</code>인지 판단할 수가 없습니다. 그렇다면 요청 사항인 <code>자동차 종류가 &#39;트럭&#39;인 자동차의 대여 기록을 구한다.</code>에 어긋납니다. 그래서 저희는 그런 CAR_ID는 필요가 없게 됩니다. 따라서 여기서 우리는 INNER JOIN을 사용하면 되는구나!를 알 수 있습니다.(물론 EXISTS 방식도 있습니다.)
이를 이용해서 쿼리를 작성하면 아래와 같습니다.</p>
<pre><code class="language-sql">SELECT *
FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
    ON rental_history.car_id = car_information.car_id
LIMIT 10;</code></pre>
<p><code>LIMIT 10</code>을 입력함으로써 제가 원하는 결과가 나왔는지 확인해봅니다. 각 기록에 따라 <code>car_type</code>이 붙은 것을 확인할 수 있었습니다. 여기서 필터링과 필요 컬럼을 선정하여 1차 적으로 원하는 데이터를 뽑을 수 있습니다.(언제나 인덴트로 쿼리가 읽기 편하게 만들어주세요)</p>
<pre><code class="language-sql">SELECT 
    history_id, 
    rental_history.car_id, 
    car_type, 
    start_date, 
    end_date, 
    daily_fee
FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
    ON rental_history.car_id = car_information.car_id
WHERE car_information.car_type = &#39;트럭&#39;
LIMIT 10;</code></pre>
<p>여기서 <code>daily_fee</code>를 포함한 이유는 최종적으로 필요한 것은 <code>금액</code>이기 때문입니다. 또한 <code>car_type</code>, <code>start_date</code>, <code>end_date</code>를 포함한 이유는 차종, 대여 기간에 따라서 금액이 다르기 때문입니다.
<code>LIMIT 10</code>을 계속해서 붙여주는 이유는 체크포인트처럼 현재 내가 원하는 내용대로 나왔는지 쿼리를 실행하고자 함입니다. </p>
<p>이쯤에서 쿼리 코스트를 한번 확인합니다. 이 때 EXPLAIN을 이용하는데 아래와 같습니다.</p>
<pre><code class="language-sql">EXPLAIN
SELECT 
    history_id, 
    rental_history.car_id, 
    car_type, 
    start_date, 
    end_date, 
    daily_fee
FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
    ON rental_history.car_id = car_information.car_id
WHERE car_information.car_type = &#39;트럭&#39;</code></pre>
<p>현재 결과는 아래와 같이 나옵니다. 여기서 type <code>ALL</code>부분은 최대한 피해야합니다. <code>ALL</code>이 의미하는 바는 풀스캔이라서 리소스가 많이 듭니다. 하지만, 현재 예제에서는 <code>car_type</code>이 인덱스가 걸려져 있지 않으나, 트럭으로 필터링 할 수 있는 부분이 없어서 이 부분은 넘어갑니다.</p>
<table class="console-sql-result"><thead><tr><th>id</th><th>select_type</th><th>table</th><th>partitions</th><th>type</th><th>possible_keys</th><th>key</th><th>key_len</th><th>ref</th><th>rows</th><th>filtered</th><th>Extra</th></tr></thead><thead></thead><tbody><tr><td>1</td><td>SIMPLE</td><td>car_information</td><td></td><td>ALL</td><td>PRIMARY</td><td></td><td></td><td></td><td>30</td><td>10</td><td>Using where</td></tr><tr><td>1</td><td>SIMPLE</td><td>rental_history</td><td></td><td>ALL</td><td></td><td></td><td></td><td></td><td>162</td><td>10</td><td>Using where; Using join buffer (hash join)</td></tr></tbody></table>

<hr>
<h4 id="2-대여-기록에-할인율을-구해야-한다">2. 대여 기록에 할인율을 구해야 한다.</h4>
<p>이제 구해야 할것은 각 기록마다 할인된 금액을 구해야 합니다. <code>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</code>에는 차종과 대여 기간 동안 얼마나 할인이 되는지 정리되어 있습니다. 그렇다면 우리는 각 기록에서 차량의 대여 기간을 알아야합니다. 
위에서 진행한 내용에서 대여 시작일인 <code>start_date</code>, 대여 종료일인 <code>end_date</code>가 있습니다. 이로 우리는 몇일 빌렸는지 확인하기 위해서는 날짜 차이를 구해야합니다. 이때 구글에서 <code>MySQL 날짜 차이 구하는 법</code>으로 검색해보니 <code>DATEDIFF</code>라는 함수를 찾았습니다. 이를 적용해봅니다.</p>
<pre><code class="language-sql">SELECT 
    history_id, 
    rental_history.car_id, 
    car_type, 
    start_date, 
    end_date, 
    daily_fee,
    DATEDIFF(end_date, start_date) AS rental_duration
FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
    ON rental_history.car_id = car_information.car_id
WHERE car_information.car_type = &#39;트럭&#39;
LIMIT 10;</code></pre>
<p>결과를 확인해보니 저희가 생각하는 값과 다르다는 것을 확인했습니다. 예를 들어 <code>2022-08-03</code> 이 <code>start_date</code>이고, <code>2022-08-04</code>이 <code>end_date</code>이라하면 <code>DATEDIFF</code>함수는 <code>1</code>이라는 값을 내뱉습니다. 하지만 대여 기간의 경우는 2일이 되어야 합니다. 따라서 아래와 같이 <code>DATEDIFF</code>의 결과에 1을 더해줍니다.</p>
<pre><code class="language-sql">SELECT 
    history_id, 
    rental_history.car_id, 
    car_type, 
    start_date, 
    end_date, 
    daily_fee,
    (DATEDIFF(end_date, start_date) + 1)AS rental_duration
FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
    ON rental_history.car_id = car_information.car_id
WHERE car_information.car_type = &#39;트럭&#39;
LIMIT 10;</code></pre>
<p>이제 이를 이용해서 할인율을 계산하면 될것으로 보입니다. 그런데, <code>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</code>을 보게되면 <code>DURATION_TYPE</code>이 <code>N일 이상</code>이런식으로 작성되어 있는 것을 확인할 수 있습니다. <code>DURATION_TYPE</code> 에는 <code>&#39;7일 이상&#39; (대여 기간이 7일 이상 30일 미만인 경우), &#39;30일 이상&#39; (대여 기간이 30일 이상 90일 미만인 경우), &#39;90일 이상&#39; (대여 기간이 90일 이상인 경우)</code>의 경우만 존재한다고 합니다. 따라서 위의 결과를 아래와 같이 변경해 줄 수 있습니다.</p>
<pre><code class="language-sql">SELECT 
    history_id, 
    rental_history.car_id, 
    car_type, 
    start_date, 
    end_date, 
    daily_fee,
    (DATEDIFF(end_date, start_date) + 1)AS rental_duration,
    CASE
        WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 90
            THEN &#39;90일 이상&#39;
        WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 30
            THEN &#39;30일 이상&#39;
        WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 7
            THEN &#39;7일 이상&#39;
        ELSE &#39;7일 미만&#39;
    END AS rental_duration
FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
    ON rental_history.car_id = car_information.car_id
WHERE car_information.car_type = &#39;트럭&#39;
LIMIT 10;</code></pre>
<hr>
<h4 id="3-대여-기록에는-금액할인-적용된이-들어가야-한다">3. 대여 기록에는 금액(할인 적용된)이 들어가야 한다.</h4>
<p>이제 대여 기록과 할인 정보를 연결하면 되는데 연결 조건에 대해 고민이 필요합니다. 1번의 결과와 <code>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</code>을 연결하기 위해서는 조건이 아래와 같습니다.</p>
<ul>
<li>차종이 같다.</li>
<li>대여 기간 종류가 같다.</li>
</ul>
<p>위의 2가지 조건을 만족하는 것이 ON절로 가면 됩니다. 그렇다면 JOIN의 형태는 어떻게 되어야 할까요? 예를 들어 대여 기록에 <code>트럭</code>이 있지만, 할인 기록에는 해당 차종이 없다고 해봅시다. 이 때 대여 기록에 대한 데이터는 삭제되면 안됩니다. 따라서 INNER가 아닌 LEFT가 사용 되어야 합니다. 이 때 중요한 것은 조건절을 잘 설정해야 1:1 대응이 되어서 데이터가 중복이 일어나지 않게됩니다. 이를 쿼리로 바꾸면 아래와 같습니다.(제가 사용하기 편한 서브쿼리로 진행했습니다.)</p>
<pre><code class="language-sql">SELECT
    *
FROM (
    SELECT 
        history_id, 
        rental_history.car_id, 
        car_type, 
        start_date, 
        end_date, 
        daily_fee,
        (DATEDIFF(end_date, start_date) + 1)AS rental_duration,
        CASE
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 90
                THEN &#39;90일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 30
                THEN &#39;30일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 7
                THEN &#39;7일 이상&#39;
            ELSE &#39;7일 미만&#39;
        END AS duration_type
    FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
    INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
        ON rental_history.car_id = car_information.car_id
    WHERE car_information.car_type = &#39;트럭&#39;
) AS rental_history
LEFT JOIN (
    SELECT 
        car_type,
        duration_type,
        discount_rate
    FROM CAR_RENTAL_COMPANY_DISCOUNT_PLAN 
) AS rental_discount_plan
ON rental_history.car_type = rental_discount_plan.car_type
    AND rental_history.duration_type = rental_discount_plan.duration_type
LIMIT 10;</code></pre>
<p>이렇게 했을 때, 저희가 생각한 대로 데이터 연결이 잘 이루어짐을 확인하였습니다.</p>
<p>이제 할인된 금액을 구하기 위해서는 <code>daily_fee</code>에서 <code>(100 - discount_rate) / 100</code>을 곱해주면 됩니다. 그 후, 렌탈한 기간 <code>rental_duration</code>을 곱해주면 원하는 할인된 금액을 구할 수 있게 됩니다.
이를 쿼리로 나타내면 아래와 같습니다.</p>
<pre><code class="language-sql">SELECT
    *,
    (100 - discount_rate) AS non_discount_rate,
    rental_duration * daily_fee * ((100 - discount_rate) / 100) AS fee
FROM (
    SELECT 
        history_id, 
        rental_history.car_id, 
        car_type, 
        start_date, 
        end_date, 
        daily_fee,
        (DATEDIFF(end_date, start_date) + 1)AS rental_duration,
        CASE
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 90
                THEN &#39;90일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 30
                THEN &#39;30일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 7
                THEN &#39;7일 이상&#39;
            ELSE &#39;7일 미만&#39;
        END AS duration_type
    FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
    INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
        ON rental_history.car_id = car_information.car_id
    WHERE car_information.car_type = &#39;트럭&#39;
) AS rental_history
LEFT JOIN (
    SELECT 
        car_type,
        duration_type,
        discount_rate
    FROM CAR_RENTAL_COMPANY_DISCOUNT_PLAN 
) AS rental_discount_plan
ON rental_history.car_type = rental_discount_plan.car_type
    AND rental_history.duration_type = rental_discount_plan.duration_type
LIMIT 10;</code></pre>
<p>여기서 보이는 점은 <code>discount_rate</code>이 NULL인 경우 fee를 구할 수 없음입니다. 이를 해결하기 위해서는 null일 경우 <code>discount_rate</code>를 0으로 치환하면 됩니다. 또한 소숫점으로 나오게 되는데 이를 정수형으로 치환하면 원하는 결과가 나오게 됩니다. 이를 쿼리로 변환하면 아래와 같습니다.</p>
<pre><code class="language-sql">SELECT
    *,
    (100 - discount_rate) AS non_discount_rate,
    CAST(
        rental_duration * daily_fee * ((100 - ifnull(discount_rate,0)) / 100) AS DECIMAL
    ) AS fee
FROM (
    SELECT 
        history_id, 
        rental_history.car_id, 
        car_type, 
        start_date, 
        end_date, 
        daily_fee,
        (DATEDIFF(end_date, start_date) + 1)AS rental_duration,
        CASE
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 90
                THEN &#39;90일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 30
                THEN &#39;30일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 7
                THEN &#39;7일 이상&#39;
            ELSE &#39;7일 미만&#39;
        END AS duration_type
    FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
    INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
        ON rental_history.car_id = car_information.car_id
    WHERE car_information.car_type = &#39;트럭&#39;
) AS rental_history
LEFT JOIN (
    SELECT 
        car_type,
        duration_type,
        discount_rate
    FROM CAR_RENTAL_COMPANY_DISCOUNT_PLAN 
) AS rental_discount_plan
ON rental_history.car_type = rental_discount_plan.car_type
    AND rental_history.duration_type = rental_discount_plan.duration_type
LIMIT 10;</code></pre>
<hr>
<h4 id="4-최종-결과에는-두-가지-컬럼-대여-기록-idhistory_id-대여-금액fee만-있으면-된다">4. 최종 결과에는 두 가지 컬럼 <code>대여 기록 ID(HISTORY_ID)</code>, <code>대여 금액(FEE)</code>만 있으면 된다.</h4>
<p>이제 3의 결과에서 불필요한 컬럼을 날리면 아래와 같습니다.</p>
<pre><code class="language-sql">SELECT
    history_id,
    CAST(
        rental_duration * daily_fee * ((100 - ifnull(discount_rate,0)) / 100) AS DECIMAL
    ) AS fee
FROM (
    SELECT 
        history_id, 
        rental_history.car_id, 
        car_type, 
        start_date, 
        end_date, 
        daily_fee,
        (DATEDIFF(end_date, start_date) + 1)AS rental_duration,
        CASE
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 90
                THEN &#39;90일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 30
                THEN &#39;30일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 7
                THEN &#39;7일 이상&#39;
            ELSE &#39;7일 미만&#39;
        END AS duration_type
    FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
    INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
        ON rental_history.car_id = car_information.car_id
    WHERE car_information.car_type = &#39;트럭&#39;
) AS rental_history
LEFT JOIN (
    SELECT 
        car_type,
        duration_type,
        discount_rate
    FROM CAR_RENTAL_COMPANY_DISCOUNT_PLAN 
) AS rental_discount_plan
ON rental_history.car_type = rental_discount_plan.car_type
    AND rental_history.duration_type = rental_discount_plan.duration_type;</code></pre>
<hr>
<h4 id="5-최종-결과는-대여-기록과-금액에-대해-정렬되어야-한다fee-desc-history_id-desc">5. 최종 결과는 대여 기록과 금액에 대해 정렬되어야 한다.(FEE DESC, HISTORY_ID DESC)</h4>
<p>이제 마지막으로 결과에서 정렬하면 됩니다. 저의 경우는 주로 컬럼명 보다 번호로 진행을 합니다. 이는 좋지 않은 습관일 수도 있습니다...ㅎㅎ</p>
<pre><code class="language-sql">SELECT
    history_id,
    CAST(
        rental_duration * daily_fee * ((100 - ifnull(discount_rate,0)) / 100) AS DECIMAL
    ) AS fee
FROM (
    SELECT 
        history_id, 
        rental_history.car_id, 
        car_type, 
        start_date, 
        end_date, 
        daily_fee,
        (DATEDIFF(end_date, start_date) + 1)AS rental_duration,
        CASE
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 90
                THEN &#39;90일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 30
                THEN &#39;30일 이상&#39;
            WHEN (DATEDIFF(end_date, start_date) + 1) &gt;= 7
                THEN &#39;7일 이상&#39;
            ELSE &#39;7일 미만&#39;
        END AS duration_type
    FROM CAR_RENTAL_COMPANY_RENTAL_HISTORY AS rental_history
    INNER JOIN  CAR_RENTAL_COMPANY_CAR AS car_information
        ON rental_history.car_id = car_information.car_id
    WHERE car_information.car_type = &#39;트럭&#39;
) AS rental_history
LEFT JOIN (
    SELECT 
        car_type,
        duration_type,
        discount_rate
    FROM CAR_RENTAL_COMPANY_DISCOUNT_PLAN 
) AS rental_discount_plan
ON rental_history.car_type = rental_discount_plan.car_type
    AND rental_history.duration_type = rental_discount_plan.duration_type
ORDER BY 2 DESC, 1 DESC;</code></pre>
<hr>
<h2 id="explain으로-쿼리-효율-확인">EXPLAIN으로 쿼리 효율 확인</h2>
<p>해당 쿼리는 ALL로 풀스캔이지만, 인덱스 탈 수 없는 부분이 있어서 해당 부분은 넘어갔습니다.</p>
<table class="console-sql-result"><thead><tr><th>id</th><th>select_type</th><th>table</th><th>partitions</th><th>type</th><th>possible_keys</th><th>key</th><th>key_len</th><th>ref</th><th>rows</th><th>filtered</th><th>Extra</th></tr></thead><thead></thead><tbody><tr><td>1</td><td>SIMPLE</td><td>car_information</td><td></td><td>ALL</td><td>PRIMARY</td><td></td><td></td><td></td><td>30</td><td>10</td><td>Using where; Using temporary; Using filesort</td></tr><tr><td>1</td><td>SIMPLE</td><td>rental_history</td><td></td><td>ALL</td><td></td><td></td><td></td><td></td><td>162</td><td>10</td><td>Using where; Using join buffer (hash join)</td></tr><tr><td>1</td><td>SIMPLE</td><td>CAR_RENTAL_COMPANY_DISCOUNT_PLAN</td><td></td><td>ALL</td><td></td><td></td><td></td><td></td><td>15</td><td>100</td><td>Using where; Using join buffer (hash join)</td></tr></tbody></table>

<br/>

<h2 id="결과">결과</h2>
<p><img src="https://velog.velcdn.com/images/2h-kim/post/6c9ad1e4-1e42-4dc4-b9c6-68ccac2cca36/image.png" alt=""></p>
]]></description>
        </item>
        <item>
            <title><![CDATA[이항계수]]></title>
            <link>https://velog.io/@2h-kim/%EC%9D%B4%ED%95%AD%EA%B3%84%EC%88%98</link>
            <guid>https://velog.io/@2h-kim/%EC%9D%B4%ED%95%AD%EA%B3%84%EC%88%98</guid>
            <pubDate>Tue, 28 Mar 2023 15:32:07 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 수리통계학을 공부하면서, 해당 내용이 실생활에 어떻게 쓰이는지 알고 싶어서 각 항목에 따라 실생활에 대한 사용을 정리할 예정이다.</p>
<blockquote>
<p>실생활에서  사용 대부분 chatgpt의 질문을 기반으로 작성되었습니다.</p>
</blockquote>
<h1 id="이항계수">이항계수</h1>
<blockquote>
<p><strong>$(x+y)^{n}$의 이항 전개의 $x^{n-r}\cdot y^{r}$의 계수를 $\binom{n}{r}$이라고 한다.</strong>
<em>출처 : John E. Freund의 수리통계학 실제와 응용</em></p>
</blockquote>
<p>이항계수는 이항식을 이항정리로 전개했을 때 각 항의 계수를 나타낸 것으로, 조합을 통해 이항계수를 구할 수 있다.</p>
<h2 id="실생활에서-사용">실생활에서 사용</h2>
<h3 id="유전학">유전학</h3>
<p>유전학에서는 이항계수를 특정 유전자형을 구성하는 대립 유전자의 빈도를 고려하여 모집단에서 특정 유전자형이 발생할 확률을 계산하는데 사용한다.</p>
<h3 id="금융">금융</h3>
<p>금융에서 이항계수는, 특정 투자전략에 대한 가능한 결과의 수를 계산하는데 사용할 수 있으며 거래의 수와 각 거래의 손익 발생 확률에 기초한다.</p>
<h3 id="컴퓨터-과학">컴퓨터 과학</h3>
<p>컴퓨터 과학에서는 항복 집합의 가능한 조합의 수를 계산하는 데 사용될 수 있으며, 암호 생성 및 데이터 암호화와 같은 작업에 유용하다.</p>
<h3 id="일상-생활">일상 생활</h3>
<p>이항 계수 사용에 대해 대표적인 예시로는 품질 관리이다. 이항 계수는 제품 표본에서 특정 개수의 불량품 확률을 계산하는데 사용될 수 있다.
예를 들어 1000개의 제품을 생산하고, 단일 품목이 불량일 확률이 0.01이라고 가정하자. 그렇다면, 100개의 표본을 랜덤하게 선택할 경우, 정확하게 2개의 불량품을 뽑을 확률은 아래와 같다.
$$P(X=2) = \binom{2}{100} (0.01)^{2} \cdot \binom{98}{98}(1-0.01)^{98} = 4950\cdot0.0001\cdot0.3735=0.1848825$$
이 결과로 100개의 품목을 추출한 표본에서 정확하게 2개의 불량품을 찾을 확률은 약 18.5%라는 것을 얻을 수 있고, 이항 계수는 품질 관리에서 샘플 검사 결과를 기반으로 제품 배치의 적합성에 대한 실시간 결정을 내리는 데 사용된다.</p>
<h1 id="기타">기타</h1>
<blockquote>
<p>책에서의 연습문제 등에서 나온 이론</p>
</blockquote>
<h2 id="스털링의-공식">스털링의 공식</h2>
<p>n이 클 때, n!은 다음과 같은 방법으로 근사할 수 있다.
$$n! \ \approx \sqrt{2\pi n}\big(\frac{n}{e}\big)^{n}$$</p>
<h2 id="점유-이론occupancy-theory">점유 이론(occupancy theory)</h2>
<p>고정된 수의 객체를 고정된 수의 컨테이너에 분배하는 동시에 각 컨테이너에 적어도 하나의 객체가 있는지 확인하는 방법을 다루는 수학 문제이다.
예를 들어, 6개의 서로 다른 색상의 공과 3개의 구별할 수 없는 상자가 있다고 가정했을 때, 각 상자에 적어도 하나의 공이 있도록 공을 상자에 넣는 방법은 몇가지 일까?
해당 답은 점유 공식에 의해 제공되는데, k개의 컨테이너에 n개의 객체를 분배하는 방법의 수는 아래와 같은 공식에 의해 10가지 방법이 나온다. 따라서 공을 상자에 분배하는 10가지 방법이 있으므로 각 상자에 공이 적어도 하나 있다.
$$O(n, k) = \binom{n-1}{k-1}$$</p>
<h3 id="실생활-사용">실생활 사용</h3>
<p>점유 이론의 실제 사용의 예는 생태학의 점유 데이터 분석에 있다. 생태학는 점유 데이터를 이용하여 다양한 서식지에서 종의 분포와 풍부함을 연구하고자 한다. 이러한 상황에서 점유 이론은 종 발생 패턴을 모델링하고 주어진 지역에서 종의 수를 추정하는 데 유용한 프레임워크를 제공한다.</p>
<p>예를 들어 일련의 조사에서 각 종이 발견된 횟수를 기반으로 숲에 있는 새 종의 수를 추정한다고 가정하자. 각 종이 각 조사에 존재할 확률이 있다고 가정하면 이항 분포를 사용하여 탐지 과정을 모델링할 수 있다. 그런 다음 점유 이론을 사용하여 관찰된 데이터를 기반으로 숲의 총 종 수를 추정할 수 있다.</p>
<p>구체적으로 각 종의 탐지 확률이 같다고 가정하고 적어도 하나의 종이 탐지된 n개 조사 중 k개를 관찰하면 점유 공식을 사용하여 추정된 종 수(S)를 계산할 수 있다.
$$S = k \cdot \ln(\frac{n}{k})$$</p>
<p>이 공식은 추정된 종의 수는 전체 조사 수뿐만 아니라 적어도 하나의 종이 감지된 조사 수에 따라 다르다는 것을 의미한다. 이 공식을 사용하여 생태학자는 숲에 있는 새 종의 수를 추정하고 다양한 서식지와 지역에 걸쳐 추정치를 비교할 수 있다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[RDD]]></title>
            <link>https://velog.io/@2h-kim/RDD</link>
            <guid>https://velog.io/@2h-kim/RDD</guid>
            <pubDate>Sun, 26 Feb 2023 09:32:39 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 RDD에 대해 정리하기 위해 작성된 문서이다.</p>
<h1 id="rddresilient-distributed-dataset">RDD(resilient distributed dataset)</h1>
<p>RDD의 경우, Spark 1.0 부터 지원하던 기본적인 데이터 구조로 아래와 같은 성질이 있다.</p>
<ul>
<li>Immutable : 읽기 전용</li>
<li>Resilient : 장애 내성</li>
<li>Distributed : 분산 저장된 데이터 셋</li>
</ul>
<p>한 번 생성된 RDD의 경우, 불변의 성질이 있으며 변환된 RDD의 경우 overwrite 방식이 아닌 새로운 RDD 객체를 가진다. 또한, RDD의 경우 복원성을 부여하는데, 노드에 장애가 발생하여도 유실된 RDD를 원래대로 복구할 수 있다.</p>
<blockquote>
<p>e.g.</p>
</blockquote>
<ul>
<li>RDD 불변성<pre><code class="language-python">data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# type distData(ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274)
distData.map(lambda x: x*2)
# type PythonRDD[1] at RDD at PythonRDD.scala:53
distData.collect()
# return [1, 2, 3, 4, 5]</code></pre>
</li>
</ul>
<h2 id="rdd-연산자">RDD 연산자</h2>
<p>Spark의 경우, 기본적으로 Lazy evaluation 개념이 존재한다. 이는 행동 연산자를 호출하기 전까지는 실제로 실행하지 않는다는 것으로, 만약 행동 연산자 호출이 일어나게 되면 RDD 계보를 확인하고 이를 바탕으로 연산 그래프를 작성 및 계산한다.</p>
<h3 id="변환-연산자">변환 연산자</h3>
<p>RDD의 데이터를 조작해 새로운 RDD를 생성(filter, map ...)</p>
<h3 id="행동-연산자">행동 연산자</h3>
<p>계산 결과 반환 또는 RDD에 특정 작업을 수행하려고 실제 계산(count, foreach...)</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[PySpark TestCode]]></title>
            <link>https://velog.io/@2h-kim/PySpark-TestCode</link>
            <guid>https://velog.io/@2h-kim/PySpark-TestCode</guid>
            <pubDate>Mon, 20 Feb 2023 16:07:58 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 PySpark를 이용하여 UnitTest 하는 방법을 정리 하기 위해 작성된 문서이다.</p>
<h1 id="unittest">UnitTest</h1>
<ul>
<li>단위 테스트는 모듈 또는 응용 프로그램 내의 개별 코드 단위가 예상대로 잘 작동하는지 검증하는 절차</li>
<li>Python에서 UnitTest의 경우, <code>unittest.TestCase</code>를 상속받는 클래스 형태로 정의한다.</li>
</ul>
<h2 id="unittest를-이용한-testcode">UnitTest를 이용한 TestCode</h2>
<pre><code class="language-python">import unittest
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame

def assert_test_data_ignore_ordering(
    df1: DataFrame, df2: DataFrame
):
    print(&#39;test assert_test_data_ignore_ordering&#39;)
    df3 = df1.subtract(df2)
    df4 = df2.subtract(df1)
    df3_count = df3.count()
    df4_count = df4.count()
    if (df3_count == 0) and (df4_count == 0):
        assert True
    else:      
        assert False

def assert_test_data_with_ordering(
    df1: DataFrame, df2: DataFrame
):
    print(&#39;test assert_test_data_with_ordering&#39;)
    df1_collect = df1.collect()
    df2_collect = df2.collect()

    for row_index in range(len(df1_collect)):
        for column_name in df1.columns:
            left_cell = df1_collect[row_index][column_name]
            right_cell = df2_collect[row_index][column_name]
            if left_cell == right_cell:
                assert True
            elif left_cell is None and right_cell is None:
                assert True

            else:
                msg = f&quot;Data mismatch\n\nRow = {row_index + 1} : Column = {column_name}\n\nACTUAL: {left_cell}\nEXPECTED: {right_cell}\n&quot;
                assert False, msg


class WordCountSparkTestCode(unittest.TestCase):
    def setUp(self) -&gt; None:
        self.spark = SparkSession.builder \
            .master(&#39;local[*]&#39;).appName(&#39;WordCountTest&#39;) \
            .getOrCreate()

    def tearDown(self) -&gt; None:
        self.spark.stop()

    def test_word_count(self):
        word_count = self.spark.createDataFrame(
            data= [
                (&#39;Hello World&#39;,),
                (&#39;The world is wide&#39;,),
                (&#39;This is for word count test&#39;,)
            ],
            schema=[&#39;text&#39;]
        )

        word_count_2 = word_count.select(
            F.split(F.lower(&#39;text&#39;), &#39; &#39;).alias(&#39;word&#39;)
        ).select(F.explode(&#39;word&#39;).alias(&#39;word&#39;)).groupBy(
            F.trim(&#39;word&#39;).alias(&#39;word&#39;)
        ).count().orderBy(
            [&#39;count&#39;, &#39;word&#39;], ascending=[False, True]
        )

        result_1 = self.spark.createDataFrame(
            data = [
                (&#39;is&#39;, 2),
                (&#39;world&#39;, 2),
                (&#39;count&#39;, 1),
                (&#39;for&#39;, 1),
                (&#39;hello&#39;, 1),
                (&#39;test&#39;, 1),
                (&#39;the&#39;, 1),
                (&#39;this&#39;, 1),
                (&#39;wide&#39;, 1),
                (&#39;word&#39;, 1),
            ],
            schema=[&#39;word&#39;, &#39;count&#39;]
        )

        assert_test_data_ignore_ordering(word_count_2, result_1)
        assert_test_data_with_ordering(word_count_2, result_1)

if __name__ == &#39;__main__&#39;:
    unittest.main()</code></pre>
]]></description>
        </item>
        <item>
            <title><![CDATA[YARN]]></title>
            <link>https://velog.io/@2h-kim/YARN</link>
            <guid>https://velog.io/@2h-kim/YARN</guid>
            <pubDate>Fri, 06 Jan 2023 07:00:08 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 EMR에서 resource manager capacity scheduler 부분에서 queue 부분이 막혀서, 이 부분을 해결하기 위해 공부한 내용을 정리하고자 작성된 문서이다. 해당 내용은 <code>하둡 완벽 가이드</code>책을 기반으로 정리한 내용이다.</p>
<blockquote>
<p><a href="https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=103031150">https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=103031150</a></p>
</blockquote>
<h1 id="yarnyet-anoter-resource-negotiator">YARN(Yet Anoter Resource Negotiator)</h1>
<p>하둡 클러스터 자원 관리 시스템으로, 맵리듀스의 성능을 높이기 위해 하둡2에서 처음 도입되었으나, 맵리듀스뿐만 아니라 다른 분산 컴퓨팅 도구도 지원한다.
클러스터의 자원을 요청하고 사용하기 위한 API를 제공하지만, 사용자 코드에서 직접 해당 API를 사용할 수 없다. YARN이 내장된 분산 컴퓨팅 프레임워크에서 고수준 API를 작성해야 하며, 사용자는 자원 관리의 자세한 내용은 알 수 없다.
애플리케이션은 YARN과 HDFS/HBase 위에서 YARN 애플리케이션을 실행한다.
<img src="https://velog.velcdn.com/images/2h-kim/post/843af5f6-8b61-453b-9c40-c2e7bd9cc132/image.png" alt=""></p>
<h2 id="yarn-application-수행">YARN Application 수행</h2>
<p><strong>YARN 구성</strong></p>
<ul>
<li>Resource Manager
클러스터에 유일하게 실행되며, 클러스터 전체 자원의 사용량을 관리</li>
<li>Node Manager
모든 머신에서 실행되며, 컨테이너를 구동하고 모니터링</li>
</ul>
<p>YARN 자체는 애플리케이션이 서로 통신하는 기능을 제공하지 않고, 하둡의 RPC와 같은 원격 호출 방식을 이용하여 상태 변경을 전달하고 클라이언트로부터 결과를 받는데, 애플리케이션마다 다르다.<br><img src="https://velog.velcdn.com/images/2h-kim/post/b79c471b-e4ff-45ec-93d2-985d0ae3c5b9/image.png" alt=""></p>
<h2 id="yarn-스캐줄링">YARN 스캐줄링</h2>
<h3 id="scheduler-option">Scheduler Option</h3>
<ul>
<li>FIFO<ul>
<li>애플리케이션을 큐에 하나씩 넣고 제출된 순서에 따라 순차적으로 실행</li>
<li>대형 애플리케이션이 수행될 때 클러스터의 모든 자원을 점유해버릴 수 있기 때문에 클러스터 환경에서는 적합하지 않음.</li>
</ul>
</li>
<li>Capacity<ul>
<li>트리 형태의 큐를 선언하고, 큐별로 사용 가능한 용량을 할당하여 YARN 자원을 관리 하는 것으로, 각 큐는 서로 분리되어 있으며 단일 큐 내부에서는 FIFO 방식으로 스케쥴링이 된다.</li>
</ul>
</li>
<li>Fair<ul>
<li>모든 잡의 자원을 동적으로 분배하기 때문에 미리 자원의 가용량을 예약할 필요가 없음.</li>
<li>대형 애플리케이션이 시작되면 클러스터의 모든 자원을 사용하다가, 다른 애플리케이션이 추가로 시작되면 클러스터 자원의 절반을 이 애플리케이션에 할당하여 각 애플리케이션은 클러스터의 자원을 공평하게 사용할 수 있게 된다.</li>
</ul>
</li>
</ul>
<p><img src="https://velog.velcdn.com/images/2h-kim/post/f9ad314a-2a4f-4355-ab33-6b2feef58452/image.png" alt=""></p>
<blockquote>
<p><a href="https://tutorials.freshersnow.com/hadoop-tutorial/hadoop-schedulers/">https://tutorials.freshersnow.com/hadoop-tutorial/hadoop-schedulers/</a></p>
</blockquote>
<h4 id="capacity-scheduler">Capacity Scheduler</h4>
<blockquote>
<p><strong>Capactity Scheduler 적용 방법</strong>
<code>conf/yarn-site.xml</code>에서 아래와 같이 설정 필요</p>
</blockquote>
<pre><code class="language-xml">&lt;property&gt;
  &lt;name&gt;yarn.resourcemanager.scheduler.class&lt;/name&gt;
  &lt;value&gt;org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler&lt;/value&gt;
&lt;/property&gt;</code></pre>
<p>Capacity Scheduler를 사용하면 조직 체계에 맞게 하둡 클러스터를 공유할 수 있다. 각 큐마다 전체 클러스터의 지정된 가용량을 미리 할당 받고, 분리된 전용 큐에서 가용량의 지정된 부분을 사용하도록 설정할 수 있다.
하나의 단일 애플리케이션은 큐의 가용량을 넘는 자원을 사용할 수 없으나, 큐 탄력성(queue elasticty)를 이용하여 다수의 애플리케이션이 존재하고 현재 가용할 수 있는 자원이 클러스터에 남아 있다면 스케줄러는 여분의 자원 할당할 수 있다.
<code>capacity-scheduler.xml</code>을 이용하여 capacity scheduler의 설정을 진행하는데, property별 의미는 아래와 같다.</p>
<ul>
<li><a href="https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html">https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html</a><blockquote>
<p>참고 사항</p>
<ul>
<li>leaf끼리의 min capacity의 합은 100이 되어야 함</li>
</ul>
</blockquote>
<h5 id="capacity-scheduler-example">Capacity Scheduler Example</h5>
아래 내용을 기반으로 capacity-scheduler.xml 작성 예시
<img src="https://velog.velcdn.com/images/2h-kim/post/9c639651-be96-442f-925e-1614da2af11d/image.png" alt=""></li>
</ul>
<pre><code class="language-xml">&lt;configuration&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.queues&lt;/name&gt;
    &lt;value&gt;analysis,workflow,sparksql&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.queues&lt;/name&gt;
    &lt;value&gt;worker01,worker02&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.queues&lt;/name&gt;
    &lt;value&gt;etl,ingest&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.capacity&lt;/name&gt;
    &lt;value&gt;20&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.maximum-capacity&lt;/name&gt;
    &lt;value&gt;50&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.capacity&lt;/name&gt;
    &lt;value&gt;60&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.maximum-capacity&lt;/name&gt;
    &lt;value&gt;90&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.sparksql.capacity&lt;/name&gt;
    &lt;value&gt;20&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.sparksql.maximum-capacity&lt;/name&gt;
    &lt;value&gt;80&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.worker01.capacity&lt;/name&gt;
    &lt;value&gt;50&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.worker01.maximum-capacity&lt;/name&gt;
    &lt;value&gt;80&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.worker02.capacity&lt;/name&gt;
    &lt;value&gt;50&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.analysis.worker02.maximum-capacity&lt;/name&gt;
    &lt;value&gt;80&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.etl.capacity&lt;/name&gt;
    &lt;value&gt;65&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.etl.maximum-capacity&lt;/name&gt;
    &lt;value&gt;80&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.ingest.capacity&lt;/name&gt;
    &lt;value&gt;35&lt;/value&gt;
  &lt;/property&gt;
  &lt;property&gt;
    &lt;name&gt;yarn.scheduler.capacity.root.workflow.ingest.maximum-capacity&lt;/name&gt;
    &lt;value&gt;50&lt;/value&gt;
  &lt;/property&gt;
&lt;/configuration&gt;</code></pre>
<p><strong>적용된 큐 확인</strong></p>
<pre><code class="language-bash">mapred queue -list</code></pre>
<h4 id="fair-scheduler">Fair Scheduler</h4>
<p>페어 스케줄러는 실행 중인 모든 애플리케이션에서 동일하게 자원을 할당한다.
활성화 방법으로는 <code>conf/yarn-site.xml</code>에서 아래와 같이 설정 필요</p>
<pre><code class="language-xml">&lt;property&gt;
  &lt;name&gt;yarn.resourcemanager.scheduler.class&lt;/name&gt;
  &lt;value&gt;org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler&lt;/value&gt;
&lt;/property&gt;</code></pre>
]]></description>
        </item>
        <item>
            <title><![CDATA[PySpark on mac]]></title>
            <link>https://velog.io/@2h-kim/PySpark-on-mac</link>
            <guid>https://velog.io/@2h-kim/PySpark-on-mac</guid>
            <pubDate>Sun, 27 Nov 2022 08:30:30 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 m1 맥북프로에서 pyspark 환경 설정을 진행한 내용을 정리하기 위해 작성된 문서입니다.(보통 로컬 설치 안하지만 필요시를 위해)</p>
<blockquote>
<ul>
<li>MacBook Pro</li>
</ul>
</blockquote>
<ul>
<li>14형, 2021년 모델</li>
<li>Apple M1 Pro 칩</li>
<li>메모리 32GB</li>
</ul>
<h1 id="python-install">python install</h1>
<pre><code class="language-bash"># https://docs.conda.io/en/latest/miniconda.html
$ wget https://repo.anaconda.com/miniconda/Miniconda3-py38_4.12.0-MacOSX-arm64.sh
$ ./Miniconda3-py38_4.12.0-MacOSX-arm64.sh</code></pre>
<h1 id="java-install">java install</h1>
<ul>
<li>출처 : <a href="https://bcp0109.tistory.com/302">https://bcp0109.tistory.com/302</a><pre><code class="language-bash">$ brew tap adoptopenjdk/openjdk
$ brew install --cask adoptopenjdk11</code></pre>
아래 명령어로 잘 설치되었는지 확인<pre><code class="language-bash">java --version
openjdk 11.0.11 2021-04-20
OpenJDK Runtime Environment AdoptOpenJDK-11.0.11+9 (build 11.0.11+9)
OpenJDK 64-Bit Server VM AdoptOpenJDK-11.0.11+9 (build 11.0.11+9, mixed mode)</code></pre>
</li>
</ul>
<h1 id="spark-install">spark install</h1>
<ul>
<li>spark 버전 및 하둡버전은 직접 선택(<a href="https://spark.apache.org/downloads.html">https://spark.apache.org/downloads.html</a>)<pre><code class="language-bash">$ wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
$ tar xvf spark-3.3.1-bin-hadoop3.tgz
# ~/.zshrc에 아래 정보 추가
SPARK_HOME=$CURRENT_DIR/spark-3.3.1-bin-hadoop3
HADOOP_HOME=$CURRENT_DIR/spark-3.3.1-bin-hadoop3
PATH=CURRENT_DIR/spark-3.3.1-bin-hadoop3/bin:$PATH</code></pre>
</li>
</ul>
<p>아래처럼 나오면 setting 완료</p>
<pre><code class="language-bash">$ spark-submit --version
22/11/27 16:44:25 WARN Utils: Your hostname, hyunhoui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.20.10.6 instead (on interface en0)
22/11/27 16:44:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  &#39;_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.1
      /_/

Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.11
Branch HEAD
Compiled by user yumwang on 2022-10-15T09:47:01Z
Revision fbbcf9434ac070dd4ced4fb9efe32899c6db12a9
Url https://github.com/apache/spark
Type --help for more information.</code></pre>
<h1 id="pyspark-env">pyspark env</h1>
<pre><code class="language-bash">$ conda create -n pyspark python=3.8 -y
$ conda activate pyspark
$ pip install pyspark</code></pre>
<p>잘 생성되었는지 확인</p>
<pre><code class="language-bash">$ python
&gt;&gt;&gt; from pyspark.sql import SparkSession
&gt;&gt;&gt; spark = SparkSession.builder.appName(&#39;test-spark&#39;).master(&#39;local&#39;).getOrCreate()
&gt;&gt;&gt; data = spark.read.csv(CSV_PATH, header=True)
&gt;&gt;&gt; data.show(1, False, vertical=True)</code></pre>
<h1 id="aws-s3와-연결할-수-있게-설정">aws s3와 연결할 수 있게 설정</h1>
<ul>
<li>다운로드 : <a href="https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws">https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws</a></li>
<li>참고 문서 : <a href="https://jhleeeme.github.io/read-aws-s3-data-on-spark/">https://jhleeeme.github.io/read-aws-s3-data-on-spark/</a></li>
</ul>
<p>결과 확인</p>
<pre><code class="language-python">from pyspark.sql import SparkSession


spark = SparkSession.builder.appName(&#39;test-spark&#39;).master(&#39;local&#39;).getOrCreate()

data = spark.read.csv(&quot;s3a://test-hyunho/data/OnlineRetail.csv&quot;, header=True)
data.show(1, False, vertical=True)
-RECORD 0-----------------------------------------
 InvoiceNo   | 536365
 StockCode   | 85123A
 Description | WHITE HANGING HEART T-LIGHT HOLDER
 Quantity    | 6
 InvoiceDate | 12/1/2010 8:26
 UnitPrice   | 2.55
 CustomerID  | 17850
 Country     | United Kingdom
only showing top 1 row</code></pre>
]]></description>
        </item>
        <item>
            <title><![CDATA[Terraform - EMR]]></title>
            <link>https://velog.io/@2h-kim/Terraform-EMR</link>
            <guid>https://velog.io/@2h-kim/Terraform-EMR</guid>
            <pubDate>Sat, 26 Nov 2022 11:43:55 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 terraform으로 EMR 구축에 대해 테스트해보고 정리하기 위해 작성된 문서입니다.</p>
<h2 id="terraform-이란">Terraform 이란</h2>
<blockquote>
<p>Terraform은 클라우드 및 온프리미엄 리소스를 안전하고 효율적으로 구축, 변경 및 버전화할 수 있는 코드 도구로서의 인프라입니다.
<a href="https://www.terraform.io/">https://www.terraform.io/</a></p>
</blockquote>
<h2 id="terraform-installmac">Terraform install[mac]</h2>
<pre><code class="language-bash">brew install terraform</code></pre>
<h2 id="code">Code</h2>
<blockquote>
<p>terraform 공부 겸, 작성된 코드라 틀릴 수도 있습니다.</p>
</blockquote>
<ul>
<li><a href="https://github.com/2h-kim/emr-terraform-test">https://github.com/2h-kim/emr-terraform-test</a></li>
</ul>
<h1 id="참고-문서">참고 문서</h1>
<ul>
<li><a href="https://www.terraform.io/">https://www.terraform.io/</a></li>
<li><a href="https://medium.com/idealo-tech-blog/using-terraform-to-quick-start-pyspark-on-aws-2bc8ce9dcac">https://medium.com/idealo-tech-blog/using-terraform-to-quick-start-pyspark-on-aws-2bc8ce9dcac</a></li>
<li><a href="https://github.com/idealo/terraform-emr-pyspark">https://github.com/idealo/terraform-emr-pyspark</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka Source Connector - Debezium]]></title>
            <link>https://velog.io/@2h-kim/Kafka-Source-Connector-Debezium</link>
            <guid>https://velog.io/@2h-kim/Kafka-Source-Connector-Debezium</guid>
            <pubDate>Sat, 26 Nov 2022 10:28:25 GMT</pubDate>
            <description><![CDATA[<h2 id="debezium">Debezium</h2>
<blockquote>
<p>Stream changes from your database.</p>
<p>Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.</p>
</blockquote>
<ul>
<li>데이터베이스의 변경사항을 캡처하는 분산 서비스</li>
<li>각 데이터베이스 테이블 내의 모든 행 수준 변경을 변경 이벤트 스트림에 기록하고, 애플리케이션은 이러한 스트림을 읽기만 하면 변경 이벤트가 발생한 동일한 순서로 변경 이벤트를 볼 수 있다.</li>
</ul>
<h3 id="debezium-with-docker">Debezium with docker</h3>
<p>기존에 진행했던 도커 이미지와 연결되는 kafka source connector server를 제작하고자 한다.</p>
<h4 id="mysql-setting">MySQL setting</h4>
<p>Debezium 설정 전, mysql을 docker로 띄워서 진행할 것이며, cdc가 이루어 질 수 있는 환경을 설정하고자 한다.</p>
<h5 id="docker-image-생성">Docker image 생성</h5>
<pre><code class="language-bash">docker run \
    --name mysql-container \
    -e MYSQL_ROOT_PASSWORD=admin \
    -e MYSQL_USER=debezium \
    -e MYSQL_PASSWORD=debezium \
    -d -p 3306:3306 \
    mysql:latest</code></pre>
<h5 id="docker-image-접속--database-table-생성">Docker image 접속 &amp; database, table 생성</h5>
<pre><code class="language-bash"># exec docker image
docker exec -it mysql-container /bin/bash

## in docker bash
mysql -u root -p 
</code></pre>
<h6 id="이진-로그-설정-여부">이진 로그 설정 여부</h6>
<pre><code class="language-sql">SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name=&#39;log_bin&#39;;</code></pre>
<pre><code class="language-bash">+----------------+
| variable_value |
+----------------+
| ON             |
+----------------+</code></pre>
<h6 id="이진-로깅-형식-확인">이진 로깅 형식 확인</h6>
<pre><code class="language-sql">SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name=&#39;binlog_format&#39;;</code></pre>
<pre><code class="language-bash">+----------------+
| variable_value |
+----------------+
| ROW            |
+----------------+</code></pre>
<h6 id="database-및-table-생성">database 및 table 생성</h6>
<pre><code class="language-sql">CREATE DATABASE debezium_test;
USE debezium_test;
CREATE TABLE IF NOT EXISTS TestBinLogStream(
    id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    status varchar(30),
    createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
                     ON UPDATE CURRENT_TIMESTAMP
);</code></pre>
<h5 id="기존-kafka-클러스터와-네트워크-연결쉽게-접속을-위해">기존 kafka 클러스터와 네트워크 연결(쉽게 접속을 위해)</h5>
<p>(실제로는 kafka-source-connector과 mysql, source-connector과 kafka클러스터 이렇게 네트워크가 연결되면 될 것 같다.)</p>
<pre><code class="language-bash">docker network connect test-network mysql-container</code></pre>
<h4 id="debezium-docker-image-생성">Debezium Docker image 생성</h4>
<ul>
<li><a href="https://github.com/2h-kim/kafka-personal-study/tree/main/kafka-source-debezium">https://github.com/2h-kim/kafka-personal-study/tree/main/kafka-source-debezium</a></li>
</ul>
<h3 id="연결-test">연결 Test</h3>
<h4 id="sql-실행">SQL 실행</h4>
<pre><code class="language-sql">INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=1;
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=2;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=1;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=2;
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=3;
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=4;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=3;
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=5;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=4;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=5;
DELETE FROM TestBinLogStream WHERE id = 3;</code></pre>
<h4 id="결과-확인">결과 확인</h4>
<pre><code class="language-bash">$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
debezium_test
debezium_test.debezium_test.TestBinLogStream
kafka.client.tutorial
kafka.client.tutorial.consumer
schemahistory.debezium_test
schemahistory.testdb
stream_log
streams-application-__assignor-__leader
test_kafka
topic_kafka_test
# comsumer 
$KAFKA_HOME/kafka_2.12-3.3.1/bin/kafka-console-consumer.sh  --bootstrap-server kafka_tutorial:9092  --topic debezium_test.debezium_test.TestBinLogStream  --from-beginning
{&quot;schema&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;before&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;after&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;version&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;connector&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;name&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.data.Enum&quot;,&quot;version&quot;:1,&quot;parameters&quot;:{&quot;allowed&quot;:&quot;true,last,false,incremental&quot;},&quot;default&quot;:&quot;false&quot;,&quot;field&quot;:&quot;snapshot&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;db&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;sequence&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;table&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;server_id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;gtid&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;file&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;pos&quot;},{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;row&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;thread&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;query&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;io.debezium.connector.mysql.Source&quot;,&quot;field&quot;:&quot;source&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;op&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;total_order&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;data_collection_order&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;event.block&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;transaction&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Envelope&quot;,&quot;version&quot;:1},&quot;payload&quot;:{&quot;before&quot;:null,&quot;after&quot;:{&quot;id&quot;:1,&quot;status&quot;:&quot;completed&quot;,&quot;createdAt&quot;:&quot;2022-11-26T10:12:19Z&quot;,&quot;updated_at&quot;:&quot;2022-11-26T10:12:19Z&quot;},&quot;source&quot;:{&quot;version&quot;:&quot;2.0.0.Final&quot;,&quot;connector&quot;:&quot;mysql&quot;,&quot;name&quot;:&quot;debezium_test&quot;,&quot;ts_ms&quot;:1669458279000,&quot;snapshot&quot;:&quot;first&quot;,&quot;db&quot;:&quot;debezium_test&quot;,&quot;sequence&quot;:null,&quot;table&quot;:&quot;TestBinLogStream&quot;,&quot;server_id&quot;:0,&quot;gtid&quot;:null,&quot;file&quot;:&quot;binlog.000002&quot;,&quot;pos&quot;:19791,&quot;row&quot;:0,&quot;thread&quot;:null,&quot;query&quot;:null},&quot;op&quot;:&quot;r&quot;,&quot;ts_ms&quot;:1669458279803,&quot;transaction&quot;:null}}
{&quot;schema&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;before&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;after&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;version&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;connector&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;name&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.data.Enum&quot;,&quot;version&quot;:1,&quot;parameters&quot;:{&quot;allowed&quot;:&quot;true,last,false,incremental&quot;},&quot;default&quot;:&quot;false&quot;,&quot;field&quot;:&quot;snapshot&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;db&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;sequence&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;table&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;server_id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;gtid&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;file&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;pos&quot;},{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;row&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;thread&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;query&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;io.debezium.connector.mysql.Source&quot;,&quot;field&quot;:&quot;source&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;op&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;total_order&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;data_collection_order&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;event.block&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;transaction&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Envelope&quot;,&quot;version&quot;:1},&quot;payload&quot;:{&quot;before&quot;:null,&quot;after&quot;:{&quot;id&quot;:2,&quot;status&quot;:&quot;completed&quot;,&quot;createdAt&quot;:&quot;2022-11-26T10:12:19Z&quot;,&quot;updated_at&quot;:&quot;2022-11-26T10:12:19Z&quot;},&quot;source&quot;:{&quot;version&quot;:&quot;2.0.0.Final&quot;,&quot;connector&quot;:&quot;mysql&quot;,&quot;name&quot;:&quot;debezium_test&quot;,&quot;ts_ms&quot;:1669458279000,&quot;snapshot&quot;:&quot;true&quot;,&quot;db&quot;:&quot;debezium_test&quot;,&quot;sequence&quot;:null,&quot;table&quot;:&quot;TestBinLogStream&quot;,&quot;server_id&quot;:0,&quot;gtid&quot;:null,&quot;file&quot;:&quot;binlog.000002&quot;,&quot;pos&quot;:19791,&quot;row&quot;:0,&quot;thread&quot;:null,&quot;query&quot;:null},&quot;op&quot;:&quot;r&quot;,&quot;ts_ms&quot;:1669458279804,&quot;transaction&quot;:null}}
{&quot;schema&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;before&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;after&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;version&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;connector&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;name&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.data.Enum&quot;,&quot;version&quot;:1,&quot;parameters&quot;:{&quot;allowed&quot;:&quot;true,last,false,incremental&quot;},&quot;default&quot;:&quot;false&quot;,&quot;field&quot;:&quot;snapshot&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;db&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;sequence&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;table&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;server_id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;gtid&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;file&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;pos&quot;},{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;row&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;thread&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;query&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;io.debezium.connector.mysql.Source&quot;,&quot;field&quot;:&quot;source&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;op&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;total_order&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;data_collection_order&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;event.block&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;transaction&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Envelope&quot;,&quot;version&quot;:1},&quot;payload&quot;:{&quot;before&quot;:null,&quot;after&quot;:{&quot;id&quot;:4,&quot;status&quot;:&quot;completed&quot;,&quot;createdAt&quot;:&quot;2022-11-26T10:12:19Z&quot;,&quot;updated_at&quot;:&quot;2022-11-26T10:12:19Z&quot;},&quot;source&quot;:{&quot;version&quot;:&quot;2.0.0.Final&quot;,&quot;connector&quot;:&quot;mysql&quot;,&quot;name&quot;:&quot;debezium_test&quot;,&quot;ts_ms&quot;:1669458279000,&quot;snapshot&quot;:&quot;true&quot;,&quot;db&quot;:&quot;debezium_test&quot;,&quot;sequence&quot;:null,&quot;table&quot;:&quot;TestBinLogStream&quot;,&quot;server_id&quot;:0,&quot;gtid&quot;:null,&quot;file&quot;:&quot;binlog.000002&quot;,&quot;pos&quot;:19791,&quot;row&quot;:0,&quot;thread&quot;:null,&quot;query&quot;:null},&quot;op&quot;:&quot;r&quot;,&quot;ts_ms&quot;:1669458279805,&quot;transaction&quot;:null}}
{&quot;schema&quot;:{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;before&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;status&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;createdAt&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.time.ZonedTimestamp&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;updated_at&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Value&quot;,&quot;field&quot;:&quot;after&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;version&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;connector&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;name&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;name&quot;:&quot;io.debezium.data.Enum&quot;,&quot;version&quot;:1,&quot;parameters&quot;:{&quot;allowed&quot;:&quot;true,last,false,incremental&quot;},&quot;default&quot;:&quot;false&quot;,&quot;field&quot;:&quot;snapshot&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;db&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;sequence&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;table&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;server_id&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;gtid&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;file&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;pos&quot;},{&quot;type&quot;:&quot;int32&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;row&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;thread&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;query&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;io.debezium.connector.mysql.Source&quot;,&quot;field&quot;:&quot;source&quot;},{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;op&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:true,&quot;field&quot;:&quot;ts_ms&quot;},{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;type&quot;:&quot;string&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;id&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;total_order&quot;},{&quot;type&quot;:&quot;int64&quot;,&quot;optional&quot;:false,&quot;field&quot;:&quot;data_collection_order&quot;}],&quot;optional&quot;:true,&quot;name&quot;:&quot;event.block&quot;,&quot;version&quot;:1,&quot;field&quot;:&quot;transaction&quot;}],&quot;optional&quot;:false,&quot;name&quot;:&quot;debezium_test.debezium_test.TestBinLogStream.Envelope&quot;,&quot;version&quot;:1},&quot;payload&quot;:{&quot;before&quot;:null,&quot;after&quot;:{&quot;id&quot;:5,&quot;status&quot;:&quot;completed&quot;,&quot;createdAt&quot;:&quot;2022-11-26T10:12:19Z&quot;,&quot;updated_at&quot;:&quot;2022-11-26T10:12:19Z&quot;},&quot;source&quot;:{&quot;version&quot;:&quot;2.0.0.Final&quot;,&quot;connector&quot;:&quot;mysql&quot;,&quot;name&quot;:&quot;debezium_test&quot;,&quot;ts_ms&quot;:1669458279000,&quot;snapshot&quot;:&quot;last&quot;,&quot;db&quot;:&quot;debezium_test&quot;,&quot;sequence&quot;:null,&quot;table&quot;:&quot;TestBinLogStream&quot;,&quot;server_id&quot;:0,&quot;gtid&quot;:null,&quot;file&quot;:&quot;binlog.000002&quot;,&quot;pos&quot;:19791,&quot;row&quot;:0,&quot;thread&quot;:null,&quot;query&quot;:null},&quot;op&quot;:&quot;r&quot;,&quot;ts_ms&quot;:1669458279805,&quot;transaction&quot;:null}}</code></pre>
<h1 id="참고-문서">참고 문서</h1>
<ul>
<li><a href="https://wecandev.tistory.com/109">https://wecandev.tistory.com/109</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka connect]]></title>
            <link>https://velog.io/@2h-kim/Kafka-connect</link>
            <guid>https://velog.io/@2h-kim/Kafka-connect</guid>
            <pubDate>Fri, 18 Nov 2022 05:39:05 GMT</pubDate>
            <description><![CDATA[<h2 id="kafka-connect">kafka connect</h2>
<p>카프카 커넥트는 카프카 오픈소스에 포함된 툴 중 하나로 데이터 파이프라인 생성 시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다. 파이프라인마다 프로듀서, 컨슈머를 개발하는 것은 비효율적이다. 커넥트는 특정 작업 형태를 템플릿으로 만들어놓은 커넥터를 실행함으로써 반복 작업을 줄일 수 있다.
커넥터의 경우 소스 커넥터, 싱크 커넥터 2가지로 나뉘며 소스의 경우 데이터를 카프카 토픽으로 전송하는 프로듀서 역할, 싱크의 경우 토픽의 데이터를 파일로 저장하는 컨슈머 역할을 한다.</p>
<h3 id="커넥트-실행-방법">커넥트 실행 방법</h3>
<ul>
<li>단일 모드 커넥트<ul>
<li>단일 애플리케이션으로 실행</li>
<li>커넥터를 정의하는 파일을 작성하고 해당 파일을 참조하는 단일 모드 커넥트를 실행함을써 파이프라인 생성</li>
<li>1개 프로세스만 실행되기 때문에 단일 장애점이 될 수 있음</li>
<li>개발환경이나 중요도가 낮은 파이프라인을 운영할 때 사용</li>
</ul>
</li>
<li>분산 모드 커넥트<ul>
<li>2대 이상의 서버에서 클러스터 형태로 운영</li>
<li>단일 모드 커넥트 대비 안전하게 운영할 수 있음</li>
<li>데이터 처리양의 변환에도 유연하게 대응 가능</li>
</ul>
</li>
</ul>
<h4 id="단일-모드-커넥트">단일 모드 커넥트</h4>
<h5 id="connect-stadaloneproperties">connect-stadalone.properties</h5>
<p>단일 모드 커넥트를 참조하는 설정 파일</p>
<pre><code class="language-bash"># These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors</code></pre>
<ul>
<li><code>bootstrap.servers</code>
커넥트와 연동할 카프카 클러스터의 호스트-포트. 2개 이상의 브로커라면 ,로 구분하여 적으면 된다.</li>
<li><code>key.converter</code>, <code>value.converter</code>
데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는 데에 사용
JsonConverter, StringConverter, ByteArrayConverter를 기본으로 제공
만약 사용하고 싶지 않다면 <code>key.converter.schemas.enable=false</code> or <code>value.converter.schemas.enable=false</code>로 설정</li>
<li><code>offset.storage.file.filename</code>
단일 모드 커넥터는 로컬 파일에 오프셋 정보를 저장하며, 이 정보는 소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용
해당 정보는 다른 사용자나 시스템이 접근하지 않도록 주의해야 함</li>
<li><code>offset.flush.interval.ms</code>
태스크가 처리 완료한 오프셋을 커밋하는 주기</li>
<li><code>plugin.path</code>
플러그인 형태로 추가할 커넥터의 디렉토리 주소로, 커넥터의 jar파일이 위치하는 디렉토리 값을 입력
커넥터 이외에도 직접 컨버터, 트랜스폼도 플러그인으로 추가할 수 있음</li>
</ul>
<h5 id="connect-file-sourceprofile">connect-file-source.profile</h5>
<p>파일 소스 커넥터</p>
<pre><code class="language-bash">name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test</code></pre>
<ul>
<li><code>name</code> 
커넥터의 이름으로 유일해야 함</li>
<li><code>connector.class</code>
사용할 커넥터의 클래스 이름</li>
<li><code>task.max</code>
커넥터로 실행할 태스크 개수로, 늘려서 병렬처리 가능</li>
<li><code>file</code>
읽을 파일의 위치</li>
<li><code>topic</code>읽은 파일의 데이터를 저장할 토픽 이름</li>
</ul>
<h5 id="실행">실행</h5>
<pre><code class="language-bash">$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties \
$KAFKA_HOME/config/connect-file-source.properties </code></pre>
<h4 id="분산-모드-커넥트">분산 모드 커넥트</h4>
<p>단일 모드 커넥트와 다르게 2개 이상의 프로세스가 1개의 그룹으로 묶여서 운영된다.</p>
<h5 id="connect-distributedproperties">connect-distributed.properties</h5>
<pre><code class="language-bash"># A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-configs
config.storage.replication.factor=1

status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

offset.flush.interval.ms=10000

# List of comma-separated URIs the REST API will listen on. The supported protocols are HTTP and HTTPS.
# Specify hostname as 0.0.0.0 to bind to all interfaces.
# Leave hostname empty to bind to default interface.
# Examples of legal listener lists: HTTP://myhost:8083,HTTPS://myhost:8084&quot;
#listeners=HTTP://:8083

# The Hostname &amp; Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
# If not set, it uses the value for &quot;listeners&quot; if configured.
#rest.advertised.host.name=
#rest.advertised.port=
#rest.advertised.listener=

plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,</code></pre>
<ul>
<li><code>bootstrap.servers</code>
커넥트와 연동할 카프카 클러스터의 호스트 이름과 포트</li>
<li><code>group.id</code>
다수의 커넥트 프로세스들을 묶을 그룹 이름
동일한 group.id로 지정된 경우 커넥트들은 같은 그룹으로 인식
같은 그룹으로 지정된 커넥트들에서 커넥터가 실행되면 커넥트들에 분산되어 실행</li>
<li><code>key.converter</code>, <code>value.converter</code>
데이터를 카프카에 저장할 때 혹은 카프카에서 데이터를 가져올 때 변환하는 데에 사용
JsonConverter, StringConverter, ByteArrayConverter를 기본으로 제공
만약 사용하고 싶지 않다면 <code>key.converter.schemas.enable=false</code> or <code>value.converter.schemas.enable=false</code>로 설정</li>
<li><code>offset.storage.topic</code>
분산 모드 커넥트의 경우 카프카 내부 토픽에 오프셋 정보를 저장
소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용
실제로 운영시 복제개수를 3이상으로 설정하는 것이 좋음</li>
<li><code>offset.flush.interval.ms</code>
태스크가 처리 완료한 오프셋을 커밋하는 주기</li>
<li><code>plugin.path</code>
플러그인 형태로 추가할 커넥터의 디렉토리 주소로, 커넥터의 jar파일이 위치하는 디렉토리 값을 입력
커넥터 이외에도 직접 컨버터, 트랜스폼도 플러그인으로 추가할 수 있음</li>
</ul>
<h3 id="소스-커넥터">소스 커넥터</h3>
<p>소스 커넥터는 소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할을 한다.
필요한 클래스는 2개로 아래와 같다.</p>
<ul>
<li>SourceConnector<ul>
<li>테스크를 실행하기 전 커넥터 설정파일을 초기화</li>
<li>어떤 테스크 클래스를 사용할 것인지 정의하는데 사용</li>
</ul>
</li>
<li>SourceTask<ul>
<li>실제로 데이터를 다루는 클래스</li>
<li>소스 애플리케이션 또는 소스 파일로부터 데이터를 가져와 토픽으로 데이터를 보내는 역할을 수행</li>
<li>토픽에서 사용하는 오프셋이 아닌 자체적으로 사용하는 오프셋을 사용</li>
</ul>
</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafa Streams]]></title>
            <link>https://velog.io/@2h-kim/Kafa-Streams</link>
            <guid>https://velog.io/@2h-kim/Kafa-Streams</guid>
            <pubDate>Tue, 15 Nov 2022 10:47:03 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p>Code의 경우, Python을 이용하여 컨버팅하려고 하였으나, faust에서 Null 키에 대해 오류가 많이 발생하여 우선은 코드 변환 패스</p>
</blockquote>
<h2 id="카프카-스트림즈">카프카 스트림즈</h2>
<p>카프카 스트림즈는 토픽에 적재된 데이터를 상태/비상태 기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리이다. 컨슈머와 프로듀서의 조합으로 스트림즈 제공하는 기능과 유사하게 만들 수는 있으나, 장애 허용 시스템, 데이터 처리등의 특징 들은 2개의 조합으로 완벽하게 구현하기 어렵다. 하지만 스트림즈에 없는 기능의 경우 위의 2개를 이용해서 개발할 수도 있다.
스트림즈에서 task는 스트림즈 애플리케이션을 실행하면 생기는 데이터 처리 최소 단위이다. 만약 3개의 파티션으로 이루어진 토픽을 처리하는 스트림즈 애플리케이션을 실행하면 내부에 3개의 태스크가 생긴다.
실제 운영환경에서는 실행되는 서버 장애가 발생하더라도 안전하게 스트림 처리를 할 수 있게 2개 이상의 서버로 구성하여 스트림즈 애플리케이션을 운영한다. 
카프카 스트림즈에서 토폴로지를 이루는 노드를 processor, 노드와 노드를 잇는 선을 stream이라고 부른다. 스트림의 경우, 토픽의 데이터를 뜻하며 프로듀서 컨슈머에서 활용했던 레코드와 동일하다. 프로세서는 총 3개로 나뉘며 아래와 같다.</p>
<ul>
<li>소스 프로세서
데이터를 처리하기 위해 최초로 선언해야 하는 노드로, 하나 이상의 토픽에서 데이터를 가져오는 역할</li>
<li>스트림 프로세서
다른 프로세서가 반환한 데이터를 처리하는 역할(변환, 분기 처리 등과 같은 로직)</li>
<li>싱크 프로세서
데이터를 특정 카프카 토픽으로 저장하는 역할</li>
</ul>
<p>스트림즈의 경우 2가지 방법으로 개발 가능하며 데이터 처리 예시는 아래와 같다.</p>
<ul>
<li>스트림즈 DSL<ul>
<li>메시지 값을 기반으로 토픽 분기 처리</li>
<li>지난 10분간 들어온 데이터의 개수 집계</li>
<li>토픽과 다른 토픽의 결합으로 새로운 데이터 생성</li>
</ul>
</li>
<li>프로세서 API<ul>
<li>메시지 값의 종류에 따라 토픽을 가변적으로 전송</li>
<li>일정한 시간 간격으로 데이터 처리</li>
</ul>
</li>
</ul>
<h3 id="streams-dsldomain-specific-language">Streams DSL(Domain Specific Language)</h3>
<h4 id="kstream">KStream</h4>
<p>KStream은 레코드의 흐름을 표현한 것으로 메시지 키와 값으로 구성되어 있다. KStream으로 데이터를 조회하면 토픽에 존재하는 모든 레코드가 출력되며, 컨슈머로 토픽을 구독하는 것과 동일한 선상에서 사용하는 것이라고 볼 수 있다.
(약간 CDC 데이터와 같은 느낌 같음..)</p>
<h4 id="ktable">KTable</h4>
<p>KTable은 메시지 키를 기준으로 묶어서 사용한다. 토픽의 모든 레코드를 조회할 수 있으나 유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용한다. 따라서 KTable로 데이터를 조회시 메시지 키를 기준으로 가장 최신에 추가한 레코드의 데이터가 출력된다.
(약간 python의 dict형식 같음)</p>
<h4 id="globalktable">GlobalKTable</h4>
<p>GlobalKTable은 KTable과 동일하게 메시지 키를 기준으로 묶어서 사용하지만, KTable로 선언된 토픽은 1개 파티션이 1개 테스크에 할당되어 사용되고, GlobalKTable로 선언된 토픽은 모든 파티션 데이터가 각 테스크에 할당되어 사용된다는 차이가 있다.
GlobalKTable 설명의 가장 좋은 예는 KStream과 KTable이 join을 수행할 때다. 이 때 반드시 co-partitioning이 되어 있어야 하는데, 이는 join을 하고자 하는 2개의 데이터의 파티션 개수 및 partitioning strategy(파티셔닝 전략)을 동일하게 맞추는 작업이다.(코파티셔닝이 안된 2개의 토픽 join 시 ToplologyException 발생)
만약 KStream, KTable이 코파티셔닝이 되어 있지 않으면 repartitioning이 이루어져야 한다. 이 때 데이터 중복 생성 뿐만 아니라 프로세싱하는 과정도 거쳐야 한다. KTable을 GlobalKTable로 선언하여 사용하게 되면, GlobalKTable로 정의된 데이터는 스트림즈 애플리케이션의 모든 테스크에 동일하게 공유되어 사용되기 때문에 코파티셔닝 되지 않은 KStream과 데이터 조인이 가능하다. 
GlobalKTable을 사용하면 GlobalKTable로 정의된 모든 데이터를 저장하고 사용하기 때문에 스트림즈 애플리케이션의 로컬 스토리지의 사용량이 증가하고 네트워크, 브로커에 부하가 생기므로 되도록 작은 용량의 데이터일 경우에만 사용하는 것이 좋다.(만약, 많은 양의 데이터를 가진 토픽으로 조인할 경우 리파티션을 통해 KTable로 사용하는 것을 권장)</p>
<h3 id="processor-api">Processor API</h3>
<p>스트림즈 DSL보다 투박한 코드를 가지지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서는 동일한 역할을 한다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka client]]></title>
            <link>https://velog.io/@2h-kim/Kafka-client</link>
            <guid>https://velog.io/@2h-kim/Kafka-client</guid>
            <pubDate>Sun, 30 Oct 2022 09:06:19 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p>해당 문서의 경우, 책에서는 모두 java로 짜여져 있으나, 작성자의 경우 python을 주 언어로 사용하기 때문에 해당 책의 코드를 모두 python으로 변경했습니다.
    &gt; python version : python 3.8.13</p>
</blockquote>
<h2 id="kafka-client">Kafka client</h2>
<p>카프카 클라이언트 라이브러리를 사용하여 카프카 클러스터에 명령을 내리거나 데이터를 송수신을 하며, 이를 이용하여 애플리케이션을 개발한다.</p>
<h3 id="kafka-with-python">Kafka with Python</h3>
<p>크게 3가지로 나뉘는데, 여기서 진행할 때에는 <code>kafka-python</code>을 사용하고자 한다.</p>
<ul>
<li>confluent-kafka-python: 퍼포먼스가 가장 좋음. confluent의 공식 클라이언트.</li>
<li>kafka-python: pure python. confluent-python에 비해서는 속도가 느림</li>
<li>pykafka: 2018년 이후 업데이트가 잘 안 된다고 한다.</li>
</ul>
<h4 id="install">install</h4>
<pre><code class="language-bash">pip install kafka-python</code></pre>
<hr>
<h3 id="producer">Producer</h3>
<p>카프카의 데이터 시작점은 프로듀서라고 보면 된다. 프로듀서 애플리케이션은 필요 데이터 선언 및 브로커의 특정 토픽의 파티션에 전송한다. 데이터를 전송할 떄 리더 파티션을 가지고 있는 카프카 브로커와 직접 통신하고, 직렬화하여 카프카 브로커로 보낸다. 직렬화로 동영상, 이미지와 같은 바이너리 데이터도 프로듀서를 통해 전송할 수 있다.</p>
<h4 id="simple-kafka-producer">simple-kafka-producer</h4>
<h5 id="code">Code</h5>
<pre><code class="language-python">from kafka.producer import KafkaProducer

TOPIC_NAME = &quot;kafka.client.tutorial&quot; # producer는 생성한 레코드를 전송하기 위해 전송하고자 하는 토픽을 알고 있어야 한다.
BOOTSTRAP_SERVER_HOST = &quot;kafka_tutorial:9092&quot; # 전송하고자 하는 카프카 클러스터 서버의 host와 IP를 지정
KEY_SERIALIZER = str.encode
VALUE_SERIALIZER = str.encode


producer = KafkaProducer(
    bootstrap_servers=[BOOTSTRAP_SERVER_HOST],
    key_serializer=KEY_SERIALIZER,
    value_seriakuzer=VALUE_SERIALIZER
)
# test message without key
test_message_value = &quot;testMessage for python with Kafka&quot;

# send는 즉각 전송이 아닌 배치 전송이다.
producer.send(topic=TOPIC_NAME, value=test_message_value)

# flush를 통해 내부 버퍼에 가지고 있던 레코드 배치를 브로커에 전송
producer.flush()

# producer 리소스 종료
producer.close()</code></pre>
<h5 id="실행">실행</h5>
<pre><code class="language-bash"># topic 생성
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --create --topic kafka.client.tutorial --partitions 3
## WARNING: Due to limitations in metric names, topics with a period (&#39;.&#39;) or underscore (&#39;_&#39;) could collide. To avoid issues it is best to use either, but not both.
## Created topic kafka.client.tutorial.

# producer 실행
python ${PYTHON_CODE_PATH}/simple-kafka-producer/simple-kafka-producer.py

# consumer에 제대로 들어왔는지 확인
bin/kafka-console-consumer.sh --bootstrap-server kafka_tutorial:9092 --topic kafka.client.tutorial --from-beginning           
## testMessage for python with Kafka
</code></pre>
<br/>

<p>책에 있는 코드를 한 번 python으로 바꿔봄..</p>
<ul>
<li><a href="https://github.com/2h-kim/kafka-personal-study/tree/main/simple-kafka-producer">https://github.com/2h-kim/kafka-personal-study/tree/main/simple-kafka-producer</a></li>
</ul>
<br/>
<hr/>

<h3 id="consumer">Consumer</h3>
<p>producer가 전송한 데이터는 broker에 적재되며, consumer는 broker로부터 데이터를 가져와 필요한 처리를 한다. </p>
<h4 id="컨슈머-운영-방법">컨슈머 운영 방법</h4>
<p><strong>컨슈머 그룹으로 운영</strong>
컨슈머를 각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식
토픽의 1개 이상 파티션들에 할당되어 데이터를 가져갈 수 있으며, 컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈 때, 1개의 파티션은 최대 1개의 컨슈머에 할당 가능하여, 컨슈머 그룹의 컨슈머 개수는 토픽의 파티션 개수보다 같거나 작아야 한다.
컨슈머 그룹은 다른 컨슈머 그룹과 격리되는 특징을 가지고 있어서, 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다.
만약, 컨슈머 그룹의 컨슈머에 장애가 발생하면 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권을 넘기는 리밸런싱이 이루어진다. 리밸런싱은 크게 2가지 상황에서 일어나는데, 컨슈머가 추가되는 상황과 제외되는 상황에서 발생한다. 리밸런싱은 유용하지만 자주 일어나서는 안되는데, 리밸런싱이 발생할 때 파티션의 소유권을 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없기 때문이다.</p>
<h4 id="컨슈머-작동-방식">컨슈머 작동 방식</h4>
<p>컨슈머는 커밋을 통해 브로커로부터 데이터를 어디까지 가져갔는지 기록한다. <code>__consumer_offsets</code>를 통해 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 브로커 내부에서 가용되는 내부토픽에 기록된다. 만약 <code>__consumer_offsets</code>토픽에 어느 레코드 까지 읽어갔는지 오프셋 커핏이 기록되지 못했다며느, 데이터 처리가 중복이 발생하기 때문에 컨슈머 애플리케이션이 오프셋 커밋을 정상적으로 처리했는지 검증해야 한다.
오프셋 커밋의 경우 명시적, 비명시적으로 수행할 수 있다. 기본 옵션은 <code>poll()</code>메소드가 수행될 때 일정 간경마다 오프셋을 커밋하도록 <code>enable.auto.commit=true</code>로 설정 되어 있는데 이렇게 일정 간격마다 자동으로 커밋되는 것을 비명시 오프셋 커밋이라고 부른다. <code>poll()</code> 메서드가 <code>auto.commit.interval.ms</code>에 설정된 값 이상이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋하는 방식인데, 코드상에서 따로 커밋 관련 코드를 작성할 필요가 없어 편리하지만 호출 이후에 리벨런싱 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있는 취약점이 있다. <strong>데이터 중복이나 유실을 허용하지 않는 서비스라면 이를 사용해서는 안 된다.</strong>
명시적으로 오프셋 커밋의 경우 <code>commitSync()</code>메소드를 호출하면 된다. 해당 메소드의 경우, <code>poll()</code>메소드를 통해 반환된 레코드의 가장 마지막 오프셋을 기준으로 커밋을 수행한다. 이는 브로커에 커밋 요청을 보내고, 정상 처리되었는지 응답까지 기다리는데 이는 컨슈머의 처리량에 영향을 끼친다. 이를 위해 <code>commitAsnc()</code>메서드를 사용할 수 있는데, 이는 커밋 요청을 전송하고 응답이 오기 전까지 데이터 처리를 수행할 수 있다. 이러한 비동기 커밋은 처리 중인 데이터의 순서를 보장하지 않으며 데이터의 중복 처리가 발생할 수도 있다.
컨슈머의 내부 구조를 보게 되면, 컨슈머 애플리케이션을 실행하게 되면 내부에서 Fetcher 인스턴스가 생성되어 <code>poll()</code>메서드를 호출하기 전에 미리 레코드들을 내부 큐로 가져오고, 메서드를 호출하면 그때 내부 큐에 있는 레코드들을 반환받아 처리를 수행한다.</p>
<br/>

<p>책에 있는 코드를 한 번 python으로 바꿔봄..
여기서 컨슈머의 안전한 종료인 <code>kafka-consumer-sync-offset-commit-shutdown-hook.py</code>의 경우, 현재 내가 알고 있는 지식으로 작성했기 때문에 정확하지 않고 수정이 필요합니다.</p>
<blockquote>
<p>Thread safety</p>
</blockquote>
<ul>
<li><p>The KafkaProducer can be used across threads without issue, unlike the KafkaConsumer which cannot.</p>
</li>
<li><p>While it is possible to use the KafkaConsumer in a thread-local manner, multiprocessing is recommended.</p>
</li>
<li><p><a href="https://github.com/2h-kim/kafka-personal-study/tree/main/simple-kafka-consumer">https://github.com/2h-kim/kafka-personal-study/tree/main/simple-kafka-consumer</a></p>
</li>
</ul>
<hr/>

<h3 id="admin-api">Admin API</h3>
<p>실제 운영환경에서 프로듀서와 컨슈머를 통해 데이터를 주고받는 것 외에 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 중요하다. AdminClient를 통해 클러스터 옵션 관련된 부부을 자동화할 수 있다.</p>
<blockquote>
<p>admin api를 사용할 때 클러스터의 버전과 클라이언트의 버전을 맞춰서 사용해줘야 한다.</p>
</blockquote>
<ul>
<li><a href="https://github.com/2h-kim/kafka-personal-study/tree/main/simple-kafka-admin-client">https://github.com/2h-kim/kafka-personal-study/tree/main/simple-kafka-admin-client</a></li>
</ul>
<hr/>

<h2 id="요약">요약</h2>
<ul>
<li>python을 이용하여 kafka producer, consumer, admin api를 진행하였다.</li>
<li>Producer<ul>
<li>프로듀서는 브로커로 데이터를 전송할 때 내부적으로 파티셔너, 배치 생성 단계를 거친다.</li>
<li>프로듀서 인스턴스가 send() 메서드를 호출하면 ProducerRecord는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해지며, 따로 설정하지 않으면 DefaultPartitioner로 설정된다.</li>
<li>Accumulator에 데이터를 전송하기 전에 버퍼로 쌓아놓고 발송하며 배치로 묶어 전송함으로써 처리량을 향상시키는데 도움을 준다.</li>
<li>압축 옵션을 통해 브로커로 전송 시 압축방식을 정할 수 있으나, 네트워크 처리량에 이득을 볼 수 있으나 CPU/메모리 리소스를 사용하므로 적절한 압축 옵션을 사용해야 한다.</li>
</ul>
</li>
<li>Consumer<ul>
<li>컨슈머늘 브로커에 적재된 데이터를 가져와 처리를 한다.</li>
<li>컨슈머 그룹을 이용하게 되면 각 컨슈머 그룹끼리 격리된 환경에서 안정적으로 운영할 수 있도록 도와주며, 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야 한다.</li>
<li>컨슈머 그룹의 컨슈머에 장애 발생 시 리밸런싱이 일어나는데, 자주 일어날 경우 행이 걸릴 수도 있다.(리밸런싱 과정에서 컨슈머들은 토픽의 데이터를 읽을 수 없기 때문)</li>
<li>오프셋 커밋을 이용하여 컨슈머는 어느 오프셋까지 데이터를 가져갔는 지 확인 가능하다.</li>
<li>자동 오프셋 커밋의 경우 사용에 편리하지만, 데이터 중복/유실에 대한 가능성이 있다.(async 방식은 중복 처리 발생 가능성이 있음)</li>
<li>컨슈머 애플리케이션은 안전하게 종료되어야 하며, 정상적 종료되지 않은 컨슈머는 세션 타임아웃이 발생할때까지 컨슈머 그룹에 남게 된다.</li>
</ul>
</li>
<li>Admin API<ul>
<li>내부 옵션 확인 및 토픽 추가 등을 할 수 있다.</li>
<li>카프카 CLI의 경우 일회성 작업에 그치지만, 이를 이용하면 자동화가 가능하다.</li>
</ul>
</li>
</ul>
<h2 id="참고-문서">참고 문서</h2>
<ul>
<li><a href="https://seulcode.tistory.com/559">https://seulcode.tistory.com/559</a></li>
<li><a href="https://boying-blog.tistory.com/87">https://boying-blog.tistory.com/87</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[토픽 · 파티션 · 레코드]]></title>
            <link>https://velog.io/@2h-kim/%ED%86%A0%ED%94%BD-%ED%8C%8C%ED%8B%B0%EC%85%98-%EB%A0%88%EC%BD%94%EB%93%9C</link>
            <guid>https://velog.io/@2h-kim/%ED%86%A0%ED%94%BD-%ED%8C%8C%ED%8B%B0%EC%85%98-%EB%A0%88%EC%BD%94%EB%93%9C</guid>
            <pubDate>Mon, 24 Oct 2022 17:10:00 GMT</pubDate>
            <description><![CDATA[<h2 id="topic">Topic</h2>
<p>토픽은 카프카에서 데이터를 구분하기 위해 사용하는 단위로 데이터가 들어갈 수 있는 공간이라고 보면 된다. 토픽은 여러개 생성 가능하며, 약간 데이터베이스의 테이블 또는 파일 시스템의 폴더와 같은 느낌이라고 볼 수 있다. 
토픽은 이름을 가질 수 있으며, 무슨 데이터를 담는지 명확하게 명시해야 유지보수에 도움이 된다. 
토픽 이름의 경우 데이터의 얼굴이라고 보고 어떤 개발환경에서 사용되는 것인지 판단 가능해야 하고 어떤 애플리케이션에서 어떤 데이터 타입으로 사용되는지 유추할 수 있어야 한다.</p>
<h4 id="토픽-이름-제약-조건">토픽 이름 제약 조건</h4>
<ul>
<li>빈 문자열 토픽 이름은 지원하지 않는다.</li>
<li>토픽 이름은 마침표 하나 또는 파침표 둘로 생성될 수 없다.</li>
<li>토픽 이름의 길이는 249자 미만으로 생성되어야 한다.</li>
<li>토픽 이름은 영어 대소문자, 숫자, . , _ , - 조합으로 생성할 수 있으며, 이외의 문자열이 포함된 토픽 이름은 생성 불가하다.</li>
<li><code>__consumer_offsets</code>, <code>__transaction_state</code>는 카프카 내부 로직 관리 목적으로 사용되므로 생성 불가능하다.</li>
<li>이름에 마침표와 언더바가 동시에 들어갈 수 없다.</li>
<li>이미 생성된 토픽 이름의 마침표와 언더바를 바꾼 내역은 같다고 보아 생성 불가능하다.
(e.g. <code>to.pic</code> 이름이 생성되어 있으면 <code>to_pic</code>은 생성 불가)</li>
</ul>
<h4 id="토픽-작명-템플릿과-예시">토픽 작명 템플릿과 예시</h4>
<p>제일 중요한 것은 토픽 이름에 대해 규칙을 사전에 정의하고 그 규칙을 따르는 것이다.</p>
<ul>
<li><code>&lt;환경&gt;.&lt;팀명&gt;.&lt;애플리케이션명&gt;.&lt;메시지타입&gt;</code>
<code>prod.data-engineer.data-cdc.json</code></li>
<li><code>&lt;프로젝트명&gt;.&lt;서비스명&gt;.&lt;환경&gt;.&lt;이벤트명&gt;</code>
<code>commerence.payment.prod.notification</code></li>
</ul>
<br/>


<h3 id="토픽과-파티션">토픽과 파티션</h3>
<p>토픽은 1개 이상의 파티션을 소유하고 있으며, 0으로 시작된다. 하나의 파티션은 큐와 같이 파티션 끝에서 부터 차곡 차곡 쌓이며, 토픽에 컨슈머를 연결하면 오래된 순서로 데이터를 가져간다. 다른 메시지 플랫폼과 달리 데이터를 가져다고 레코드를 삭제하지 않는다. 파티션에는 프로듀서가 보낸 데이터들이 들어가 저장되는데 이것을 <strong>레코드</strong>라고 부른다.
파티션은 카프카의 병렬처리 핵심으로, 컨슈머들이 레코드를 병렬로 처리할 수 있도록 매칭된다. 컨슈머와 파티션을 늘림으로써 처리량을 증가 시킬 수 있다.
<img src="https://velog.velcdn.com/images/2h-kim/post/199cb393-d1bd-458e-a968-b20866245a2d/image.png" alt=""></p>
<br/>

<hr>
<h2 id="레코드">레코드</h2>
<p>레코드는 타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성되어 있다. 브로커에 한번 적재된 레코드는 수정할 수 없고 로그 리텐션 기간 또는 용량에 따라서만 삭제된다.
타임스탬프의 경우 프로듀서에서 해당 레코드가 생성된 시점의 유닉스타임이 설정(이는 임의의 타임스탬프 값 또는 브로커에 적재된 시간 등으로 변경 가능)되며, 컨슈머는 레코드의 타임스탬프를 통해 레코드가 언제 생성되었는지 알 수 있다. 
메시지 키는 메시지 값을 순서대로 처리하거나 값의 종류를 나타내기 위해 사용한다. 메시지 키를 사용하면 메시지 키의 해시값을 토대로 파티션을 지정하여, 동일한 메시지 키라면 동일 파티션에 들어가게 된다. 어느 파티션에 지정될지는 알 수 없고 파티션의 개수가 변경되면 키와 파티션 매칭이 달라지게 된다. 메시지 키를 선정하지 않으면 null로 설정되고 라운드로빈으로 파티션을 지정한다.
메시지 값에는 실질적으로 처리할 데이터가 들어있고, 키와 값은  직렬화되어 브로커로 전송된다. 따라서 컨슈머를 이용할 때 역직렬화를 수행해야 하며 반드시 동일한 형태로 처리해야 한다.
오프셋의 경우 0이상의 숫자로 이루어져 있으며, 사용자가 직접 지정할 수 없고 이전에 전송된 레코드의 오프셋 + 1 값으로 생성된다. 이 오프셋으로 컨슈머들이 파티션의 데이터를 어디까지 가져갔는지 명확히 지정할 수 있다.
헤더는 레코드의 추가적인 정보를 담는 메타데이터 저장소 용도로 사용되며 키/값 형태로 데이터를 추가하여 레코드의 속성을 저장 컨슈머에서 참조할 수 있다.</p>
<br/>

<hr>
<h2 id="요약">요약</h2>
<ul>
<li>토픽 <ul>
<li>카프카에서 데이터를 구분하기 위해 사용하는 단위로 데이터가 들어갈 수 있는 공간</li>
<li>토픽의 이름은 무슨 데이터를 담는지 명확하게 명시되어야 한다.</li>
</ul>
</li>
<li>파티션<ul>
<li>토픽은 1개 이상의 파티션이 있으며, 파티션은 저장소 안에 데이터를 저장하는 공간</li>
<li>파티션의 구조는 큐와 같이 FIFO이며, 레코드를 가지고 가더라도 삭제되지 않는다</li>
<li>컨슈머 개수를 늘림과 동시에 파티션 개수를 늘리면 처리량이 증가하는 효과를 얻을 수 있다.</li>
</ul>
</li>
<li>레코드<ul>
<li>타임스탬프, 메시지 키, 메시지 값, 오프셋, 헤더로 구성되어 있다.<ul>
<li>타임스탬프 : 프로듀서에서 해당 레코드가 생성된 시점(설정에 따라 바뀔 수 있음)</li>
<li>메시지 키 : 메시지 값을 처리하거나 종류를 나타내기 위해 사용
메시지 키가 같으면 같은 partition 매칭이 된다.(partition 수를 변경하면 이 조건이 깨짐)</li>
<li>메시지 값 : 실질적으로 처리할 데이터가 들어있으며 직렬화 되어 브로커로 전송</li>
<li>오프셋 : 0 이상의 숫자로 컨슈머들이 파티션의 데이터를 어디까지 가져갔는지 명확히 지정할 수 있다.</li>
<li>헤더 : 레코드의 추가적인 메타데이터 저장소 용도로 사용</li>
</ul>
</li>
</ul>
</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[브로커 · 클러스터 · 주키퍼]]></title>
            <link>https://velog.io/@2h-kim/%EB%B8%8C%EB%A1%9C%EC%BB%A4-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EC%A3%BC%ED%82%A4%ED%8D%BC</link>
            <guid>https://velog.io/@2h-kim/%EB%B8%8C%EB%A1%9C%EC%BB%A4-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EC%A3%BC%ED%82%A4%ED%8D%BC</guid>
            <pubDate>Mon, 24 Oct 2022 15:53:56 GMT</pubDate>
            <description><![CDATA[<h2 id="kafka-broker">Kafka Broker</h2>
<ul>
<li>카프카 클라이언트와 데이터를 주고받기 위해 사용하는 주체이자, 데이터를 분산 저장하여 장애가 발생하여도 안전하게 사용할 수 있도록 도와주는 애플리케이션</li>
<li>브로커 서버 1대로도 기본 기능이 실행되지만, 데이터를 안전하게 보관하고 처리하기 위해 3대 이상의 브로커 서버를 1개의 클러스터로 묶어서 운영<ul>
<li>카프카 클러스터로 묶인 브로커들은 프로듀서가 보낸 데이터를 안전하게 분산 저장하고 복제하는 역할을 수행</li>
</ul>
</li>
</ul>
<h3 id="역할">역할</h3>
<h4 id="데이터-저장전송">데이터 저장/전송</h4>
<p>카프카 브로커는 프로듀서로부터 전달받은 데이터를 토픽의 파티션에 저장하고, 컨슈머가 요청하면 파티션에 저장된 데이터를 전달한다. 전달받은 데이터는 파일 시스템에 저장되고, <code>config/server.properties</code>에서 <code>log.dir</code>옵션에 정의된 디렉토리에 저장된다.<br><a href="https://velog.io/@2h-kim/Kafka-tutorial">Kafka-tutorial</a>에서 진행했던 server를 보면 아래와 같이 나타난다. 아래를 보면 알겠지만 <code>topic_kafka_test-0</code>와 같이 토픽이름과 파티션번호의 조합으로 하위 디렉토리를 생성하여 데이터를 저장한다.</p>
<pre><code class="language-bash">$ ls /tmp/kafka-logs/
__consumer_offsets-0   ...
__consumer_offsets-17  __consumer_offsets-30  __consumer_offsets-44  replication-offset-checkpoint
__consumer_offsets-18  __consumer_offsets-31  __consumer_offsets-45  topic_kafka_test-0
__consumer_offsets-19  __consumer_offsets-32  __consumer_offsets-46  topic_kafka_test-1
__consumer_offsets-2   __consumer_offsets-33  __consumer_offsets-47  topic_kafka_test-2
__consumer_offsets-20  __consumer_offsets-34  __consumer_offsets-48  topic_kafka_test-3
__consumer_offsets-21  __consumer_offsets-35  __consumer_offsets-49  topic_kafka_test-4</code></pre>
<p>각 파티션에는 아래와 같은 데이터를 확인할 수 있으며 각 파일의 내용은 아래와 같다.</p>
<ul>
<li><code>log</code> : 메시지와 메타데이터를 저장
(특정 버전 이상 부터 metadata를 <code>partition.metadata</code>와 같이 나누어 저장하는 것으로 보임)</li>
<li><code>index</code> : 메시지의 오프셋을 인덱싱한 정보</li>
<li><code>timeindex</code> : 메시지에 포함된 timestamp 값을 기준으로 인덱싱한 정보<ul>
<li>0.10.0.0 버전 이후로 메시지에 timestamp 값이 포함이 되며, 이 값으로 브로커가 적재한 데이터를 삭제하거나 압축하는데 사용한다<pre><code class="language-bash">$ ls /tmp/kafka-logs/topic_kafka_test-0
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint</code></pre>
<br/>  

</li>
</ul>
</li>
</ul>
<p>카프카는 파일 시스템에 저장하지만 <strong>페이지 캐시</strong>를 사용하여 디스크 입출력 속도를 높여서 느린 부분을 해결하였다. 페이지 캐시는 OS에서 파일 입출력 성능 향상을 위해 만들어 놓은 메모리 영역을 의미한다. (자세한 설명은 <a href="https://ddongwon.tistory.com/56">여기</a>를 참고)
페이지 캐시를 통해 한번 접근한 파일의 내용은 메모리의 페이지 캐시 영역에 저장하여, 동일한 파일의 접근이 일어나면 메모리에서 읽는 방식이다. </p>
<h4 id="데이터-복제-및-싱크">데이터 복제 및 싱크</h4>
<p>데이터 복제는 카프카를 장애 허용 시스템(fault tolerant system)으로 동작하도록 하는 원동력이다. 복제를 통해 클러스터로 묶인 브로커 중 일부에 장애가 발생하여도 데이터를 유실하지 않고 안전하게 사용하게 된다.
카프카의 데이터 복제는 파티션 단위로 이루어지고, 토픽을 생성할 때 파티션의 복제 개수도 같이 설정되는데, 선택하지 않으면 브로커에 설정된 옵션을 따른다. 복제 개수는 1~ 브로커의 개수만큼 설정 가능하다.</p>
<p>복제된 파티션의 경우, 리더와 팔로워로 구성되는데 프로듀서나 컨슈머와 직접적으로 통신하는 파티션을 리더, 나머지 복제 데이터를 가지고 있는 파티션을 팔로워라고 한다. 여기서 팔로우 파티션들은 리더 파티션의 오프셋과 자신의 오프셋을 비교하여 차이가 발생하는 경우 리더 파티션으로부터 데이터를 가져와서 자신의 파티션에 저장하는데, 이를 <strong>복제</strong>라고 한다.
복제 시 나머지 브로커에도 파티션의 데이터가 복제되므로 복제 개수만큼 저장 용량이 증가한다는 단점이 있으나, 데이터를 안전하게 사용할 수 있다는 장점이 있다. 
리더 파티션에 장애가 발생할 겨우, 다른 팔로워 파티션 중 하나가 리더 파티션 지위를 넘겨받아 데이터가 유실되지 않고 컨슈머나 프로듀서와 데이터를 주고받을 수 있다. 
<img src="https://velog.velcdn.com/images/2h-kim/post/e2422806-4268-4e39-b385-22d6608cb005/image.png" alt=""></p>
<blockquote>
<p>If replica 1, partition 1</p>
<ul>
<li>broker 사망 시 복구 불가  </li>
</ul>
<p>If replica 2</p>
<ul>
<li>broker 사망해도 복제본에서 복구 가능</li>
<li>folower broker가 leader broker 승계</li>
</ul>
</blockquote>
<h4 id="컨트롤러">컨트롤러</h4>
<p>한 개의 브로커는 컨트롤러 역할을 하는데, 이는 다른 브로커들의 상태를 체크하고 브로커가 클러스터에서 빠지는 경우 해당 브로커에 존재하는 리더 파티션을 재분배 한다.
컨트롤러 역할을 하는 브로커에 장애가 생기면 다른 브로커가 컨트롤러 역할을 한다.</p>
<h4 id="데이터-삭제">데이터 삭제</h4>
<p>카프카의 경우, 컨슈머가 데이터를 소비해도 토픽의 데이터는 삭제되지 않으며 컨슈머나 프로듀서가 데이터 삭제를 요청할 수도 없다. 브로커 만이 데이터 삭제할 수 있는데, 데이터 삭제는 파일 단위로 이루어지며 이것을 <strong>로그 세그먼트</strong>라고 한다. 로그 세그먼트에는 다수의 데이터가 들어가 있어, 일반적인 DB처럼 특정 데이터를 선별해 삭제 불가능하다. 브로커에 <code>log.segment.bytes</code> 또는 <code>log.segment.ms</code>값이 주어지게 되면 세그먼트 파일이 닫히게 된다. 너무 작은 용량으로 하게 될경우 부하가 발생할 수 있으므로 주의해야 한다. <code>log.retention.bytes</code> 또는 <code>log.retention.ms</code> 옵션에 따라 닫힌 세그먼트 파일을 관리하고 이 기간/용량이 넘으면 삭제한다.</p>
<h4 id="컨슈머-오프셋-저장">컨슈머 오프셋 저장</h4>
<p>컨슈머 그룹은 토픽이 특정 파티션으로부터 데이터를 가져가서 처리하고 어느 레코드 까지 가져갔는디 확인하기 위해 오프셋을 커밋한다. 이 때 커밋한 오프셋은 <code>_consumer_offset</code>토픽에 저장되고, 이를 토대로 컨슈머 그룹은 다음 레코드를 가서 처리한다.</p>
<h4 id="코디네이터">코디네이터</h4>
<p>한 대의 브로커는 코디네이터의 역할을 수행한다. 코디네이터는 컨슈머 그룹의 상태를 체크하고, 파티션을 컨슈머와 매칭되도록 분배하는 역할을 한다. <strong>리밸런스</strong>라는 과정을 거치는데 만약 컨슈머가 컨슈머 그룹에서 빠지면 매칭되지 않은 파티션을 정상 동작하는 컨슈머로 할당하여 끊임 없이 데이터가 처리되도록 도와준다.</p>
<h2 id="zookeeper">Zookeeper</h2>
<p>카프카의 메타데이터를 관리하는데 사용된다.
카프카 클러스터로 묶인 브로커들은 동일한 경로의 주키퍼 경로로 선언해야 같은 카프카 브로커 묶음이되며, 만약 클러스터를 여러 개로 운영한다면 한 개의 주피터에 다수의 카프카 클러스터를 연결해서 사용할 수 있다.</p>
<h3 id="connect-zookeeper">connect zookeeper</h3>
<pre><code class="language-bash"># connect zookeeper
# znode를 조회하고 수정할 수 있음
bin/zookeeper-shell.sh  kafka_tutorial:2181

Connecting to kafka_tutorial:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
</code></pre>
<h4 id="zookeper-command">zookeper command</h4>
<ul>
<li>root znode의 하위 znode들을 확인<pre><code class="language-bash">ls /</code></pre>
</li>
<li>kafka 브로커에 대한 정보를 확인<pre><code class="language-bash">get /brokers/ids/0</code></pre>
</li>
<li>어느 브로커가 컨트롤러인지 확인<pre><code class="language-bash">get /controller</code></pre>
</li>
<li>카프카에 저장된 토픽들을 확인
여기서 <code>__consumer_offset</code>은 컨슈머 오프셋을 저장하기 위한 용도로 사용되는 토픽<pre><code class="language-bash">ls /brokers/topics</code></pre>
</li>
</ul>
<blockquote>
<p><strong>Zookeeper에서 다수의 카프카 클러스터 사용 방법</strong>
주키퍼의 서로 다른 znode에 카프카 클러스터를 설정하면 된다.
znode는 주키퍼에서 사용하는 데이터 저장 단위로, 파일 시스템 처럼 znode 간에 계층 구조를 가진다. 
2개 이상의 카프카 클러스터를 구축할 때 root node가 아닌 한 단계 아래의 znode를 카프카 브로커 옵션으로 지정하도록 한다. 서로 다른 하위 znode에 연결한 카프카 클러스터는 각 클러스의 데이터에 영향을 미치지 않고 정상 동작한다.</p>
</blockquote>
<h2 id="요약">요약</h2>
<ul>
<li>브로커<ul>
<li>카프카 클라이언트와 데이터를 주고 받기 위해 사용하는 주체이며, 카프카가 설치되어 있는 서버 단위 이다.</li>
<li>보통 3개 이상 브로커로 구성하는 것을 권장한다.</li>
<li>역할은 아래와 같다<ul>
<li>데이터 저장, 전송</li>
<li>데이터 복제, 싱크</li>
<li>컨트롤러</li>
<li>데이터 삭제</li>
<li>컨슈머 오프셋 저장</li>
<li>코디네이터</li>
</ul>
</li>
</ul>
</li>
<li>주키퍼<ul>
<li>카프카의 메타데이터를 관리하는데 사용</li>
<li>카프카 브로커 묶음이 되려면 동일한 경로의 주키퍼 경로로 선언 해야 한다.</li>
<li>2개 이상의 카프카 클러스터를 구축할 때는 root znode가 아닌 한 단계 아래로 지정하도록 한다.</li>
</ul>
</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka tutorial]]></title>
            <link>https://velog.io/@2h-kim/Kafka-tutorial</link>
            <guid>https://velog.io/@2h-kim/Kafka-tutorial</guid>
            <pubDate>Wed, 19 Oct 2022 12:35:34 GMT</pubDate>
            <description><![CDATA[<p>Kafka tutorial을 위해 책에서는 EC2를 이용하였으나, Docker를 이용해서 진행하고자 한다.</p>
<h2 id="kafka-with-docker">Kafka with Docker</h2>
<h3 id="docker로-amazon-linux-접속">Docker로 amazon linux 접속</h3>
<pre><code class="language-bash"># amazon linux install
docker pull amazonlinux

# port 9092, 2181 열어서 실행
docker run -it -p 9092:9092 -p 2181:2181 --name kafka_tutorial amazonlinux /bin/bash</code></pre>
<br/>

<h3 id="java-설치">Java 설치</h3>
<pre><code class="language-bash"># openjdk 1.8
yum install -y java-1.8.0-openjdk-devel.x86_64
# install 확인
java -version</code></pre>
<p>아래와 같이 나오면 성공</p>
<pre><code>openjdk version &quot;1.8.0_342&quot;
OpenJDK Runtime Environment (build 1.8.0_342-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)</code></pre><br/>

<h3 id="kafka-설치-및-실행">Kafka 설치 및 실행</h3>
<p>최근 버전을 좋아하므로... 최근 버전으로 진행해보고자 한다.(물론 production에서는 stable 버전을 사용해야함)</p>
<pre><code class="language-bash"># wget, tar install
yum install -y wget tar
# kafka 3.3.1, scalar 2.12 version
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
# unzip
tar xvf kafka_2.12-3.3.1.tgz</code></pre>
<h4 id="브로커-힙-메모리-설정">브로커 힙 메모리 설정</h4>
<p>kafka 브로커는 레코드의 내용은 페이지 캐시로 시스템 메모리를 사용하고, 나머지 객체들은 힙 메모리에 저장하여 사용한다는 특징이 있다.
그래서 kafka 브로커의 경우 힙 메모리를 6GB 이상 설정하지 않는 것을 권장한다.
책에서는 설정 값을 400m로 설정 하던데, 도커이니 한번 2GB를 줘서 실행해보고자 한다.</p>
<pre><code class="language-bash"># Xmx : Java 힙의 최대 크기를 지정하는 것
# Xms : Java 힙의 최초 크기를 지정하는 것
export KAFKA_HEAP_OPTS=&quot;-Xmx2G -Xms2G&quot;</code></pre>
<h4 id="브로커-실행옵션">브로커 실행옵션</h4>
<p><code>config/server.properties</code>에서 카프카 브로커가 클러스터 운영에 필요한 옵션들을 지정할 수 있다. 이번 튜토리얼에서 <code>advertised.listeners=PLAINTEXT://172.19.0.2:9092</code>로 수정(docker container ip)
<code>config/server.properties</code>에 있는 대표적인 옵션은 아래와 같다.</p>
<ul>
<li><code>broker.id</code>
실행하는 카프카 브로커의 번호로, 클러스터를 구축할 때 브로커들은 구분하기 위해 identity한 번호로 설정해야 한다. 만약, 다른 카프카 브로커와 동일한 id를 가질 경우 비정상적인 동작이 발생할 수 있다.</li>
<li><code>listeners</code>
카프카 브로커가 통신을 위해 열어둘 인터페이스 IP, port, protocol을 설정할 수 있으며, 설정을 따로 하지 않으면 모든 IP와 port에서 접속할 수 있다.</li>
<li><code>advertised.listners</code>
카프카 클라이언트 또는 카프카 커맨드 라인 툴에서 접속할 때 사용할 IP와 port정보</li>
<li><code>listener.security.protocol.map</code>
SASL_SSL, SASL_PLAIN 보안 설정 시 프로토콜 매핑을 위한 설정</li>
<li><code>num.network.threads</code>
네트워크를 통한 처리를 할 때 사용할 네트워크 스레드 개수</li>
<li><code>num.io.threads</code>
카프카 브로커 내부에서 사용할 스레드 개수</li>
<li><code>log.dirs</code>
카프카의 경우, 모든 메시지를 로그 세그먼트 파일에 모아서 디스크에 저장한다. 이를 위해 통신을 통해 가져온 데이터를 파일로 저장할 디렉토리의 위치로 경로를 쉼표로 구분해서 여러개의 경로에 저장할 수 있다.</li>
<li><code>num.partitions</code>
파티션의 개수로, 파티션의 수가 많아지면 병렬처리 데이터의 양이 증가 한다.</li>
<li><code>log.retention.hours</code>
브로커가 메시지가 삭제되기까지 걸리는 시간으로 <code>log.retention.minutes</code> 또는 <code>log.retention.ms</code>로도 사용할 수 있다. <code>log.retention.ms</code>값을 설정하여 운영하는 것을 추천하며, <code>log.retention.ms=-1</code>이면 영원히 삭제되지 않는다.</li>
<li><code>log.segment.bytes</code>
브로커가 메시지의 최대 크기를 지정하는 것으로, 데이터양이 많아 이 크기를 채우게 되면 새로운 파일이 생성된다. </li>
<li><code>log.retention.check.interval.ms</code>
브로커가 메시지를 삭제하기 위해 체크하는 간격</li>
<li><code>zookeeper.connect</code>
브로커와 연동할 주키퍼의 IP와 port</li>
<li><code>zookeeper.connection.timeout.ms</code>
주키퍼의 세션 타임아웃 시간으로, 주키퍼에 연결할 때 시간 제한을 두고 그 이상이 되면 더 이상 시도를 하지 않는다</li>
</ul>
<h4 id="주키퍼-실행">주키퍼 실행</h4>
<p>카프카를 실행하기 위해서는 주키퍼가 반드시 필요하다. 주키퍼는 클러스터 설정 리더 정보, 컨트롤러 정보를 담고 있으며, 상용 환경에서는 안전하게 3대 이상의 서버로 구성하여 사용하여야 한다.
여기서는 1대만 실행할 것이며, 이를  <code>Quick-and-dirty single-node</code>라고 부른다고 한다.</p>
<pre><code class="language-bash"># start zookeeper (daemon to run background, if you want to run foreground remove that argument)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# check zookeeper run normally
jps -vm</code></pre>
<blockquote>
<p>주키퍼란? <a href="https://velog.io/@2h-kim/Zookeeper">https://velog.io/@2h-kim/Zookeeper</a></p>
</blockquote>
<h4 id="카프카-브로커-실행">카프카 브로커 실행</h4>
<pre><code class="language-bash"># start kafka broker (daemon to run background, if you want to run foreground 
bin/kafka-server-start.sh -daemon config/server.properties

# check kafka run normally
jps -m

# show broker log
tail -f logs/server.log</code></pre>
<h4 id="local과의-통신-확인">local과의 통신 확인</h4>
<blockquote>
<p>윈도우에서는 wsl로 실행</p>
</blockquote>
<pre><code class="language-bash"># kafka download
curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz

# unzip
tar -xvf kafka.tgz
cd kafka_2.12-3.3.1/
</code></pre>
<blockquote>
<p>docker과 wsl 네트워크 통신이 안 되는 것 같아서 아래와 같이 진행(여기서 삽질 엄청 함...ㅎㅎㅎ)</p>
</blockquote>
<pre><code class="language-bash"># network 생성
docker network create --driver bridge test-network
# container network 연결
docker network connect test-network kafka_tutorial
# network 연결된 docker container 생성
docker run -itd --name kafka_connect_test_ubuntu --net=test-network ubuntu /bin/bash
# exec
docker exec -it kafka_connect_test_ubuntu /bin/bash
# kafka download
curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz
# unzip
tar -xvf kafka.tgz
cd kafka_2.12-3.3.1/
# run
bin/kafka-broker-api-versions.sh --bootstrap-server kafka_tutorial:9092 </code></pre>
<blockquote>
<p>결과 적으로 아래와 같이 connection 성공</p>
</blockquote>
<pre><code class="language-bash">172.19.0.2:9092 (id: 0 rack: null) -&gt; (
        Produce(0): 0 to 9 [usable: 9],
        Fetch(1): 0 to 13 [usable: 13],
        ListOffsets(2): 0 to 7 [usable: 7],
        ...
)</code></pre>
<br/>
<br/>

<hr>
<h3 id="kafka-command-line-tool">Kafka command-line tool</h3>
<p>카프카 커맨드 라인 툴들은 카프카를 운영할 때 가장 많이 접하는 도구로, 커맨드 라인 툴을 이용하여, 브로커 운영에 필요한 다양한 명령을 내릴 수 있다.</p>
<blockquote>
<p><strong><code>--bootstrap-server</code> vs <code>--zookeeper</code></strong>
kafka 2.1버전 포함 이전 버전에서는 kafka command-line tool이 주키퍼와 직접 통신하여 명령을 실행하였으나, 2.2 버전 이후부터는 카프카를 통해 토픽과 관련된 명령을 실행할 수 있게 되었다.</p>
</blockquote>
<br/>

<h4 id="kafka-topicssh">kafka-topics.sh</h4>
<h5 id="토픽-생성">토픽 생성</h5>
<ul>
<li><code>--create</code>로 topic을 생성하라는 명령어를 날림</li>
<li><code>--bootstrap-server</code>로 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 적는다.</li>
<li><code>--topic</code>에서 토픽 이름을 적으며, 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천한다.</li>
<li>(Option) <code>--partitions</code>로 파티션의 개수를 지정할 수 있으며, default는 <code>server.properties의 num.partitions</code>이다.</li>
<li>(Option) <code>--replication-factor</code>로 토픽의 파티션을 복제할 복제 개수를 적는다. 1의 경우는 복제를 하지 않는 것이며, 2이면 1개의 복제본을 사용하겠다는 의미이다. max값은 브로커의 개수며, 명시하지 않으면 <code>default.replication.factor</code>옵션의 값을 따른다.</li>
<li>(Option) <code>--config</code>를 통해 추가적인 설정을 할 수 있다.<pre><code class="language-bash">bin/kafka-topics.sh \
--create \
--bootstrap-server kafka_tutorial:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=86400000 \
--topic topic_kafka_test</code></pre>
<pre><code class="language-bash">Created topic topic_kafka_test.</code></pre>
<blockquote>
<p><strong>토픽 생성</strong>
토픽을 생성하는 방법은 크게 2가지로 아래와 같다.
토픽 생성 시 토픽에 들어오는 데이터 양이나, 병렬로 처리되어야 하는 용량, 보관 기간 등 잘 파악하여 생성하는 것이 중요하다.</p>
</blockquote>
</li>
<li>카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 요청할 때</li>
<li>커맨드 라인 툴로 명시적으로 토픽을 생성(이것을 추천)</li>
</ul>
<h5 id="토픽-리스트-조회">토픽 리스트 조회</h5>
<pre><code class="language-bash">bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --list
</code></pre>
<h5 id="토픽-상세-조회">토픽 상세 조회</h5>
<p>파티션이 몇개인지, 복제된 파티션이 위치한 브로커의 번호, 구성하는 설정 등을 출력한다. 또한, 토픽이 가진 파티션의 리더가 현재 어느 브로커에 있는지 확인 가능하다.</p>
<pre><code class="language-bash">bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --describe --topic topi
c_kafka_test</code></pre>
<pre><code class="language-bash">Topic: topic_kafka_test TopicId: TI_ehzIXQzS0bg6CL2d2-w PartitionCount: 3       ReplicationFactor: 1    Configs: retention.ms=864000
        Topic: topic_kafka_test Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_kafka_test Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: topic_kafka_test Partition: 2    Leader: 0       Replicas: 0     Isr: 0</code></pre>
<h5 id="토픽-옵션-수정">토픽 옵션 수정</h5>
<p>파티션의 개수 수정은 <code>kafka-topic.sh</code>, 토픽 리텐션 기간 변경은 <code>kafka-configs.sh</code>를 사용하여야 한다.</p>
<pre><code class="language-bash"># kafka partition count change
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --alter \
 --partitions 5

# kafka topic configure change
# if configure is already exists then change configure, else add configure 
bin/kafka-configs.sh --bootstrap-server kafka_tutorial:9092 \
 --entity-type topics \
 --entity-name topic_kafka_test \
 --alter --add-config retention.ms=86400000</code></pre>
<br/>

<h4 id="kafka-console-producersh">kafka-console-producer.sh</h4>
<p>토픽에 데이터를 넣을 수 있는 명령어로, 토픽에 넣는 데이터는 record라고 부르며 key,value 로 이루어져 있다.
<strong>키 값 없이 전송</strong>
이 때, 메세지 키는 null로 기본 설정되어 브로커로 전송된다.</p>
<pre><code class="language-bash">bin/kafka-console-producer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test

&gt;hello kafka
&gt;test
&gt;1
&gt;2
&gt;3
&gt;4
&gt;5
&gt;;;</code></pre>
<p><strong>메시지 키를 가지는 레코드 전송</strong>
만약 key.separator에 해당하는 값을 전송하지 않으면 <code>org.apache.kafka.common.KafkaException: No key separator found on line number 1: &#39;k-v&#39;</code>에러 발생</p>
<pre><code class="language-bash"># default of key.separator = \t
bin/kafka-console-producer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --property &quot;parse.key=true&quot; \
 --property &quot;key.seperator=:&quot;</code></pre>
<br/>

<h4 id="kafka-console-consumersh">kafka-console-consumer.sh</h4>
<p>토픽으로 전송한 데이터를 가져올 수 있는 명령어로 <code>--from-beginning</code>옵션을 통해 토픽에 저장된 가장 처음 데이터부터 출력할 수 있다.
<code>--group</code>을 이용하여 컨슈머 그룹을 생성할 수 있다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어졌으며, 해당 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져갔다고 commit을 한다. commit은 내가 여기까지 가져갔어라고 브로커에 오프셋 번호를 저장하는 것으로, <code>__consumer_offsets</code>이름의 내부 토픽에 저장된다.</p>
<pre><code class="language-bash">bin/kafka-console-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --from-beginning

# if want to see message key, value
bin/kafka-console-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --property print.key=true \
 --property key.separator=&quot;-&quot; \
 --from-beginning

# consumer group
bin/kafka-console-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --topic topic_kafka_test \
 --property print.key=true \
 --group consumer-group-test \
 --from-beginning</code></pre>
<p>output of below command-line</p>
<pre><code class="language-bash">null    hello kafka
null    test
null    1
null    2
null    3
null    4
null    5
null    ;;
k2      v2
k1      v1
k3      v3</code></pre>
<blockquote>
<p>partition 수가 2개 이상일 경우, 토픽에 넣은 데이터의 순서를 보장하지 못한다. 만약 데이터의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 만드는 것이다.</p>
</blockquote>
<br/>

<h4 id="kafka-consumer-groupssh">kafka-consumer-groups.sh</h4>
<p><strong>Consumer group list</strong></p>
<pre><code class="language-bash">bin/kafka-consumer-groups.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --list</code></pre>
<p><strong>특정 컨슈머 그룹 세부 정보</strong></p>
<pre><code class="language-bash"># describe consumer group which name is consumer-group-test 
bin/kafka-consumer-groups.sh  \
 --bootstrap-server kafka_tutorial:9092 \
 --group consumer-group-test \
 --describe</code></pre>
<p>output</p>
<ul>
<li>current-offset : 토픽의 가장 최신 오프셋이 몇 번인지 나타냄</li>
<li>log-end-offset : 컨슈머 그룹의 컨슈머가 어디까지 커밋했는지</li>
<li>lag : 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는 데에 얼마나 지연이 발생하였는지<pre><code class="language-bash">GROUP               TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
consumer-group-test topic_kafka_test 1          1               1               0               -               -               -
consumer-group-test topic_kafka_test 4          8               8               0               -               -               -
consumer-group-test topic_kafka_test 0          0               0               0               -               -               -
consumer-group-test topic_kafka_test 3          1               1               0               -               -               -
consumer-group-test topic_kafka_test 2          1               1               0               -               -    </code></pre>
</li>
</ul>
<br/>

<h4 id="kafka-verifiable-producer-counsumersh">kafka-verifiable-producer, counsumer.sh</h4>
<p>kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 string 타입 메시지 값을 코드 없이 주고 받을 수 있다. 간단한 네트워크 통신 테스트를 할 때 유용하다.</p>
<p><strong>kafka-verifiable-consumer</strong></p>
<pre><code class="language-bash">bin/kafka-verifiable-producer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --max-messages 10 \
 --topic topic_kafka_test</code></pre>
<p><strong>kafka-verifiable-producer</strong></p>
<pre><code class="language-bash">bin/kafka-verifiable-consumer.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --group-id consumer-group-test \
 --topic topic_kafka_test</code></pre>
<br/>

<h4 id="kafka-delete-recordssh">kafka-delete-records.sh</h4>
<p>이미 적재된 토픽의 데이터를 지우는 방법으로, 이미 적재된 토픽의 데이터 중 가장 오래된 데이터 부터 특정 시점의 오프셋까지 삭제가 가능하다.
파티션에 저장된 특정 데이터만 삭제할 수 없다는 점에서 유의가 필요하다.
~/test-delete-record.json</p>
<pre><code class="language-json">{
  &quot;partitions&quot; : [
    {
      &quot;topic&quot; : &quot;topic_kafka_test&quot;,
      &quot;partition&quot;: 4,
      &quot;offset&quot;: 5
    }
    ],
  &quot;version&quot;: 1
}</code></pre>
<p><strong>bash</strong></p>
<pre><code class="language-bash">bin/kafka-delete-records.sh \
 --bootstrap-server kafka_tutorial:9092 \
 --offset-json-file ~/test-delete-record.json</code></pre>
<p>성공 시</p>
<pre><code class="language-bash">Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-4   low_watermark: 5</code></pre>
<p>현재 record보다 더 많은 offset 요구시</p>
<pre><code>Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-0   error: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.</code></pre><br/>
<br/>

<hr>
<h3 id="요약">요약</h3>
<ul>
<li>zookeeper의 경우, kafka-cluster 설정 리더 정보, 컨트롤러 정보를 담고 있어 kafka를 실행하는 데 필수 애플리케이션이다.</li>
<li>producer를 이용해 데이터를 집어넣고 consumer를 통해 데이터를 가져오며, consumer-group을 이용하여 consumer가 어느 데이터 까지 가져갔는지 체크포인트를 남길 수 있다.</li>
<li>kafka command-line tool의 경우 운영 시 자주 사용하므로 손으로 익히는 것이 좋다.</li>
</ul>
<hr>
<h3 id="추가-출처">추가 출처</h3>
<ul>
<li><a href="https://data-engineer-tech.tistory.com/11">https://data-engineer-tech.tistory.com/11</a></li>
</ul>
<p>###</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka 특징]]></title>
            <link>https://velog.io/@2h-kim/Kafka-%ED%8A%B9%EC%A7%95</link>
            <guid>https://velog.io/@2h-kim/Kafka-%ED%8A%B9%EC%A7%95</guid>
            <pubDate>Wed, 19 Oct 2022 08:06:01 GMT</pubDate>
            <description><![CDATA[<p>Kafka가 데이터 파이프라인으로 적합한 이유는 아래와 같다.</p>
<p><strong>높은 처리량</strong>
프로듀서가 브로커에게 데이터를 보낼 때, 컨슈머가 브로커에게 데이터를 받을 때 모두 배치형식으로 이루어진다.
대량의 데이터를 송수신할 때 네트워크 비용은 무시할 수 없는 규모가 되며, 동일한 양의 데이터를 보낼 때 통신횟수를 줄여 동일 시간 내에 더 많은 데이터를 전송할 수 있다.
많은 양의 데이터를 묶음 단위인 배치로 처리할 수 있기 때문에 대용량의 실시간 로그 데이터를 처리하는 데에 적합하다.
또한, 파티션 단위를 통해 동일한 목적의 데이터를 여러 파티션에 분배하고 데이터를 병렬처리 가능하다. 컨슈머의 개수를 늘려서 동일 시간당 데이터 처리량을 늘릴 수 있다.</p>
<p><strong>확장성</strong>
카프카는 가변적인 상황에서 안정적으로 확장 가능하도록 설계가 되어있다. 카프카 클러스터는 데이터가 작을 경우 scale-in으로 브로커의 수를 줄이고, 데이터가 많아지면 scale-out으로 브로커의 개수를 자연스럽게 늘린다.
카프카의 스케일 인/아웃은 무중단 운영을 지원하여 계속해서 데이터를 처리해야 하는 기업에서도 사용 가능하다.</p>
<p><strong>영속성</strong>
카프카의 경우 다른 메시지 플랫폼(e.g. redis)와 달리 전송받은 데이터를 파일 시스템에 저장한다. 페이지 캐시 메모리 영역을 사용하여 한번 읽은 파일 내용은 메모리에 저장시켰다가 다시 사용하는 방식이기 때문에 데이터를 저장, 전송하여도 처리량이 높다. 심지어 장애 발생으로 인해 급작스럽게 종료되더라도 프로세스를 재시작하여 안전하게 데이터를 다시 처리할 수 있다.</p>
<blockquote>
<p><em>영속성 : 데이터를 생성한 프로그램이 종료되더라도 사라지지 않은 데이터의 특성</em></p>
</blockquote>
<p><strong>고가용성</strong>
3개 이상의 서버들로 운영되는 카프카 클러스터는 서버 장애가 발생하여도 무중단으로 안전하고 지속적으로 데이터를 처리할 수 있다. 
클러스터로 이루어진 카프카는 데이터의 복제를 통해 프로듀서로 받은 데이터를 1대의 브로커에만 저장하는 것이 아닌 다른 브로커에도 저장을 한다. 이로 인해 한 브로커에 장애가 나더라도 복제된 데이터가 다른 브로커에 저장되어 있어 지속적으로 데이터 처리가 가능하다.</p>
<blockquote>
<p>브로커는 3대 이상으로 구성해야 하는 것이 좋다. 1대는 서비스 장애가 발생하면 서비스의 장애로 이어지므로 테스트에서만 진행하는 것이 좋으며, 2대의 경우 데이터 복제되는 시간 차이로 일부 데이터 유실 가능성이 있다. </p>
</blockquote>
<h3 id="요약">요약</h3>
<ul>
<li>Kafka는 아래와 같은 이유 때문에 대용량 데이터 파이프라인을 안전하고 확장성 높게 운영할 수 있도록 설계되었다.<ul>
<li>높은 처리량</li>
<li>확장성</li>
<li>영속성</li>
<li>고가용성</li>
</ul>
</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka의 등장]]></title>
            <link>https://velog.io/@2h-kim/Kafka%EC%9D%98-%EB%93%B1%EC%9E%A5</link>
            <guid>https://velog.io/@2h-kim/Kafka%EC%9D%98-%EB%93%B1%EC%9E%A5</guid>
            <pubDate>Wed, 19 Oct 2022 07:28:41 GMT</pubDate>
            <description><![CDATA[<p>kafka가 있기 전까지는 단방향 통신을 통해 source application에서 target application으로 연동하는 코드를 작성하였고, 아키텍처가 복잡하지 않아 운영에 큰 문제가 없었다.<br>하지만 시간이 지남에 따라 source 및 target 애플리케이션 수가 증가하였고, 이로 인해 파이프라인 개수가 많아지면서 코드 및 버전관리에 한계가 생기게 되었다. 또한, target 애플리케이션의 장애가 source 애플리케이션에 그대로 전달되기도 하였다.
<img src="https://velog.velcdn.com/images/2h-kim/post/5f0a0d70-de8f-4908-ba2e-4a0d7b367c6e/image.png" alt=""></p>
<br/>
<br/>
<br/>

<p>이를 해결하기 위해, 링크드인 데이터팀은 <strong>Apache Kafka</strong>라는 시스템을 만들었다.<br>카프카는 중앙집중화를 통해 여러 source의 데이터를 한 곳에서 실시간으로 관리할 수 있게 되었다. 
즉, 기업의 대용량 데이터를 수집하고 이를 사용자들이 실시간 스트림으로 소비할 수 있게 만들어 주는 중추 신경으로 동작한다.<br>카프카를 중앙에 배치함으로, sourc 애플리케이션과 target 애플리케이션의 커플링을 완화하였다.(의존도를 낮춤)</p>
<p><img src="https://velog.velcdn.com/images/2h-kim/post/e8c02d23-7b07-48c0-908b-10640dd877be/image.png" alt=""></p>
<br/>
<br/>
<br/>

<p>카프카 내부에 데이터가 저장되는 파티션의 동작은 FIFO방식인 큐와 유사하며, 큐에 데이터를 보내는 것이 프로듀서, 가져오는 것이 컨슈머다.
카프카에 전달할 수 있는 데이터 포멧은 사실상 제한이 없으며, 카프카 클라이언트에서는 기본적으로 ByteArray, ByteBuffer, Long, Double, String 타입에 대해 serializer, deserializer 클래스가 제공된다.
만약 필요 시, 커스텀 클래스를 생성하여 사용하면 된다.</p>
<p>카프카는 최소 3대 이상의 서버에서 분산 운영하여 프로듀서를 통해 전송받은 데이터를 파일 시스템에 안전하게 기록한다. 카프카 클러스터 중 일부 서버에 장애가 발생하더라도 데이터를 지속적으로 복제하여 안전하게 운영 가능하다.</p>
<p>카프카는 현재 많은 기업에서 사용하고 있으며, 현재 오픈 소스로 제공되고 있다.</p>
<h3 id="요약">요약</h3>
<ul>
<li>Before Kafka<ul>
<li>source 애플리케이션에서 target 애플리케이션으로 단방향 통신</li>
<li>시간이 지남에 따라 source, target 애플리케이션 증가</li>
<li>데이터 전송라인의 증가로 배포, 장애 대응 힘듦</li>
<li>데이터 전송 시 Protocal foramat의 파편화 심화</li>
</ul>
</li>
<li>After Kafka<ul>
<li>source와 target간의 커플링을 약하게 만듦</li>
<li>분산 및 복제 운영으로 서버 장애 시에도 안전하게 사용 가능</li>
<li>낮은 지연과 높은 처리량을 지님</li>
</ul>
</li>
<li>Producer와 Consumer로 구성<ul>
<li>Producer : kafka에 데이터를 보내는 아이</li>
<li>Consumer : kafka에서 데이터를 가져가는 아이</li>
</ul>
</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[Kafka 개인 정리]]></title>
            <link>https://velog.io/@2h-kim/Kafka-%EA%B0%9C%EC%9D%B8-%EC%A0%95%EB%A6%AC</link>
            <guid>https://velog.io/@2h-kim/Kafka-%EA%B0%9C%EC%9D%B8-%EC%A0%95%EB%A6%AC</guid>
            <pubDate>Wed, 19 Oct 2022 07:26:49 GMT</pubDate>
            <description><![CDATA[<p>해당 시리즈의 경우, 데브원영님의 유튜브와 아파치 카프카 애플리케이션 프로그래밍 with 자바라는 책을 기반으로 작성된 글입니다.</p>
<p>데브원영님 유튜브 링크</p>
<ul>
<li><a href="https://www.youtube.com/c/%EB%8D%B0%EB%B8%8C%EC%9B%90%EC%98%81DevWonYoung">https://www.youtube.com/c/%EB%8D%B0%EB%B8%8C%EC%9B%90%EC%98%81DevWonYoung</a></li>
</ul>
<p>아파치 카프카 애플리케이션 프로그래밍 with 자바</p>
<ul>
<li><a href="https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=268985828">https://www.aladin.co.kr/shop/wproduct.aspx?ItemId=268985828</a></li>
</ul>
<p>관련 인프런</p>
<ul>
<li><a href="https://www.inflearn.com/course/%EC%95%84%ED%8C%8C%EC%B9%98-%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D">https://www.inflearn.com/course/%EC%95%84%ED%8C%8C%EC%B9%98-%EC%B9%B4%ED%94%84%EC%B9%B4-%EC%95%A0%ED%94%8C%EB%A6%AC%EC%BC%80%EC%9D%B4%EC%85%98-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[데이터 로드]]></title>
            <link>https://velog.io/@2h-kim/%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%A1%9C%EB%93%9C</link>
            <guid>https://velog.io/@2h-kim/%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%A1%9C%EB%93%9C</guid>
            <pubDate>Thu, 06 Oct 2022 16:32:49 GMT</pubDate>
            <description><![CDATA[<h1 id="데이터-로드">데이터 로드</h1>
<h2 id="redshift-warehouse에-데이터-로드">Redshift warehouse에 데이터 로드</h2>
<ul>
<li>S3에서 Redshift로 데이터를 로드하는 가장 효율적인 방법은 COPY 명령을 사용하는 것이다.</li>
<li>COPY는 로드 중인 데이터를 대상 테이블의 기존행에 추가한다.</li>
<li>Insert 권한 필요</li>
</ul>
<p><strong>COPY 명령어</strong></p>
<pre><code class="language-sql">COPY table_name
[ column_list ]
FROM source_file
authorization
[ [ FORMAT ] [ AS ] data_format ]
[ parameter [ argument ] [ ,.. ] ]</code></pre>
<ul>
<li><a href="https://docs.aws.amazon.com/ko_kr/redshift/latest/dg/r_COPY.html">https://docs.aws.amazon.com/ko_kr/redshift/latest/dg/r_COPY.html</a></li>
</ul>
<p>e.g.</p>
<pre><code class="language-sql">COPY my_schema.my_table
FROM &#39;s3://bucket-name/file.csv&#39;
iam_role &#39;&lt;my-arn&gt;&#39;;</code></pre>
<h3 id="증분-및-전체-로드">증분 및 전체 로드</h3>
<ul>
<li>immutable 데이터이다면, 한 번 로드하면 이제 수정 필요 없음</li>
<li>만약 증분 데이터가 아닌 전체 데이터일 경우 truncate로 데이터를 날린 뒤, 로드해야 함<ul>
<li><a href="https://docs.aws.amazon.com/ko_kr/redshift/latest/dg/r_TRUNCATE.html">https://docs.aws.amazon.com/ko_kr/redshift/latest/dg/r_TRUNCATE.html</a></li>
</ul>
</li>
<li>증분데이터인 경우, 기록을 모두 가지고 있는게 이상적</li>
</ul>
<h3 id="cdc-로그에서-추출한-데이터-로드">CDC 로그에서 추출한 데이터 로드</h3>
<ul>
<li>아래와 같이 이벤트 타입과 함께 데이터를 적재하고, transform 파이프라인 딴에서 변환단계를 거치는 것이 좋음<table>
<tr>
  <th>
    EventType
  </th>
  <th>
    OrderId
  </th>
  <th>
    OrderStatus
  </th>
  <th>
    LastUpdated
  </th>
</tr>
<tr>
  <td>
    insert
  </td>
  <td>
    1
  </td>
  <td>
    assign
  </td>
  <td>
    2022-10-01 06:00
  </td>
</tr>
<tr>
  <td>
    update
  </td>
  <td>
    1
  </td>
  <td>
    pickup
  </td>
  <td>
    2022-10-01 12:00
  </td>
</tr>
<tr>
  <td>
    delete
  </td>
  <td>
    1
  </td>
  <td>
    pickup
  </td>
  <td>
    2022-10-01 12:01
  </td>
</tr>
</table>

</li>
</ul>
<br/>

<hr>
<h2 id="snowflake-warehouse를-대상으로-구성">Snowflake warehouse를 대상으로 구성</h2>
<p>이 부분은 snowflake를 직접 써봐야 이해가 될 것 같아서 지금은 패스</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[데이터 추출 ]]></title>
            <link>https://velog.io/@2h-kim/%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B6%94%EC%B6%9C</link>
            <guid>https://velog.io/@2h-kim/%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B6%94%EC%B6%9C</guid>
            <pubDate>Thu, 06 Oct 2022 11:22:53 GMT</pubDate>
            <description><![CDATA[<h1 id="문서-목적">문서 목적</h1>
<p>해당 문서는 <code>데이터 파이프라인 핵심 가이드</code>를 읽고 필요 내용을 정리한 문서이다.</p>
<h1 id="데이터-추출">데이터 추출</h1>
<p>언어는 모두 python으로 진행
샘플 데이터는 아래 데이터로 진행</p>
<ul>
<li><a href="https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx">https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx</a><h2 id="mysql">MySQL</h2>
<h3 id="table-create">table create</h3>
<h4 id="create-table">Create table</h4>
<pre><code class="language-sql">DROP TABLE IF EXISTS OnlineRetail;
CREATE TABLE OnlineRetail(
  voiceNo varchar(10),
  stockCode varchar(10),
  description varchar(200),
  quantity int,
  invoiceDate timestamp,
  unitprice double,
  customerId int,
  country varchar(20)
);</code></pre>
<h4 id="generate-data">Generate data</h4>
아래와 같이 script로 밀어넣음<pre><code class="language-python">import pymysql
import pandas as pd
</code></pre>
</li>
</ul>
<p>data = pd.read_excel(&#39;./Online Retail.xlsx&#39;)</p>
<p>host = &quot;localhost&quot;
port = &quot;3306&quot;
username = &quot;root&quot;
passwd = &quot;password&quot;
db_name = &#39;datapipeline&#39;</p>
<p>db = pymysql.connect(user=username, passwd=passwd, host=host, db=db_name, charset=&#39;utf8&#39;)</p>
<p>cur = db.cursor(pymysql.cursors.DictCursor)
data_2 = data.where(data.notnull(), None)
data_value = data_2.values.tolist()</p>
<p>insert_sql = &quot;INSERT INTO <code>OnlineRetail</code> VALUES (%s, %s, %s, %s, %s, %s, %s, %s);&quot;
cur.executemany(insert_sql, data_value)
db.commit()</p>
<p>cur.close()
db.close()</p>
<pre><code>
---

### SQL 사용 추출
구현하기 간단하지만, 대규모 데이터셋에서는 확장성이 떨어진다.
전체 추출과 증분 추출 사이에도 트레이드 오프가 있음
#### 전체 추출
이 부분은 말그대로 테이블 전체를 가져오는 것으로, 대용량 테이블의 경우 실행하는데 오랜 시간이 걸릴 수 있음
목적지에 있는 대상 테이블을 먼저 삭제하고 새로 추출된 데이터를 대상 데이터에 로드
##### e.g
```sql
select *
from OnlineRetail</code></pre><h4 id="증분-추출">증분 추출</h4>
<p>추출 작업의 마지막 실행 이후 변경되거나 추가된 원본 테이블의 레코드만 추출
목적지에 있는 대상 테이블 데이터에 추가한다. 추가함으로써 기존 데이터 뿐만 아니라 업데이트된 기록도 모두 가지게 된다.</p>
<h5 id="eg">e.g.</h5>
<pre><code class="language-sql">select *
from OnlineRetail
where invoiceDate &gt;= {{ last_update_date }}</code></pre>
<h4 id="한계점">한계점</h4>
<ol>
<li>삭제된 행은 캡처되지 않음
원본 테이블에서 행이 삭제되면 알 수 없으며 해당 레코드는 대상 테이블에서 아무것도 변경되지 않은 것처럼 남아 있음</li>
<li>원본 테이블에는 마지막 업데이트된 시간에 대한 신뢰할 수 있는 타임스탬프가 있어야 한다.
소스 시스템 테이블에서 해당 컬럼이 없거다 업데이트되지 않는다면 소싱 불가능하다.<br/>

</li>
</ol>
<h4 id="example-code">example code</h4>
<pre><code class="language-python">import csv
import pymysql

last_update_date = &#39;2011-11-01&#39;
query = &quot;&quot;&quot;
SELECT *
FROM OnlineRetail
WHERE  invoiceDate &gt;= %s;
&quot;&quot;&quot;
output_file_name = &#39;online_reatail_{}.csv&#39;.format(
    last_update_date.replace(&#39;-&#39;, &#39;&#39;)
)

# mysql connect
host = &quot;localhost&quot;
port = &quot;3306&quot;
username = &quot;root&quot;
passwd = &quot;password&quot;
db_name = &#39;datapipeline&#39;

db = pymysql.connect(user=username, passwd=passwd, port=int(port), host=host, db=db_name, charset=&#39;utf8&#39;)

if conn is None:
    raise Error(&quot;Connection Error&quot;)

cur = db.cursor()
cur.execute(query, (last_update_date, ))

result = cur.fetchall()

with open(output_file_name, &#39;w&#39;) as f:
    csv_w = csv.writer(f, delimiter=&#39;|&#39;)
    csv_w.writeroows(result)

cur.close()
conn.close()
</code></pre>
<hr>
<h3 id="이진-로그-복제">이진 로그 복제</h3>
<p>구현이 복잡하지만, 원본 테이블의 변경되는 데이터 볼륨이 크거나 MySQL 소스에서 데이터를 더 자주 수집해야 하는 경우에 적합
이진 로그의 경우 CDC의 한 형태로, 데이터 베이스에서 수행된 모든 작업에 대한 기록을 보관하는 로그이다.
원래 이진 로그의 목적은 MySQL 인스턴스로 데이터를 복제하기 위한 것(레플리카 DB)이지만, 이 것을 데이터 웨어하우스로 수집하는데 사용 가능하다.
복제 순서로는 아래와 같다.(e.g. mysql)</p>
<ol>
<li>이진 로그 활성화 및 구성</li>
<li>초기 전체 테이블 추출 실행 및 로드</li>
<li>지속적 이진 로그 추출</li>
<li>추출된 이진 로그를 데이터 웨어하우스로 변환 로드</li>
</ol>
<blockquote>
<p>이진 로그 구성을 변경하기 전, 항상 데이터베이스 소유자와 논의하고 진행</p>
</blockquote>
<br/>

<h4 id="이진-로깅-활성화-여부-조회">이진 로깅 활성화 여부 조회</h4>
<pre><code class="language-sql">SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name=&#39;log_bin&#39;;</code></pre>
<pre><code class="language-bash">+----------------+
| variable_value |
+----------------+
| ON             |
+----------------+</code></pre>
<br/>

<h4 id="이진-로깅-형식-확인">이진 로깅 형식 확인</h4>
<p>MySQL 최신 버전에서는 아래와 같이 구분</p>
<ul>
<li><p>STATEMENT</p>
<ul>
<li>이진 로그에 행을 삽입하거나 수정하는 행동들에 대해 SQL문 자체 기록</li>
<li>다른 MySQL 데이터 베이스로 복제하는 경우, 이게 유용하지만 데이터 웨어하우스와 호환되지 않을 수 있음</li>
</ul>
</li>
<li><p>ROW </p>
<ul>
<li>테이블의 행에 대한 모든 변경 사항이 행 자체의 데이터로 이진 로그 행에 표시</li>
</ul>
</li>
<li><p>MIXED</p>
<ul>
<li>STATEMENT + ROW</li>
</ul>
</li>
</ul>
<h5 id="확인-방법">확인 방법</h5>
<pre><code class="language-sql">SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name=&#39;binlog_format&#39;;</code></pre>
<pre><code>+----------------+
| variable_value |
+----------------+
| ROW            |
+----------------+</code></pre><blockquote>
<p>이진 로그 형식의 변화의 경우, my.cnf 파일에서 설정된다.</p>
</blockquote>
<h4 id="test를-위한-table-생성">test를 위한 table 생성</h4>
<pre><code class="language-sql">CREATE TABLE IF NOT EXISTS TestBinLogStream(
    id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    status varchar(30),
    createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
                     ON UPDATE CURRENT_TIMESTAMP
);

INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=1;
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
INSERT INTO TestBinLogStream (status) VALUES (&quot;submitted&quot;);
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=2;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=1;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=2;
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=3;
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=4;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=3;
UPDATE TestBinLogStream SET status = &#39;pickedUp&#39; WHERE id=5;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=4;
UPDATE TestBinLogStream SET status = &#39;completed&#39; WHERE id=5;
DELETE FROM TestBinLogStream WHERE id = 3;</code></pre>
<h4 id="python">python</h4>
<h5 id="library">library</h5>
<pre><code class="language-bash">pip install mysql-replication</code></pre>
<h5 id="code">code</h5>
<pre><code class="language-python">import pymysqlreplication
from pymysqlreplication import BinLogStreamReader, row_event


def event_action_parsing(event):
    if isinstance(event, row_event.DeleteRowsEvent):
        return &quot;delete&quot;
    elif isinstance(event, row_event.UpdateRowsEvent):
        return &quot;update&quot;
    elif isinstance(event, row_event.WriteRowsEvent):
        return &quot;insert&quot;
    else:
        raise ValueError(f&quot;Unknown type tracked : {type(event)}&quot;)

mysql_settings = {
        &quot;host&quot; : host,
        &quot;port&quot; : int(port),
        &quot;user&quot;: username,
        &quot;passwd&quot;: passwd
    }


bin_log_stream = BinLogStreamReader(    
    connection_settings=mysql_settings,
    server_id=0,
    only_tables=(&#39;TestBinLogStream&#39;), # target table
    only_events=(
            row_event.DeleteRowsEvent,
            row_event.UpdateRowsEvent,
            row_event.WriteRowsEvent
        )    
)

database_event = []

for event in bin_log_stream:
    action_type = event_action_parsing(event)
    for row in event.rows:
        temp_row = {}
        temp_row[&#39;action&#39;] = action_type
        if action_type == &quot;update&quot;:
            temp_row.update(row[&quot;after_values&quot;].items())
        else:
            temp_row.update(row[&quot;values&quot;].items())
        database_event.append(temp_row)

bin_log_stream.close()
</code></pre>
<h5 id="result">result</h5>
<pre><code class="language-python">[
    {
        &quot;action&quot;: &quot;insert&quot;,
        &quot;id&quot;: 1,
        &quot;status&quot;: &quot;submitted&quot;,
        &quot;createdAt&quot;: &quot;2022-10-06 19:15:21&quot;,
        &quot;updated_at&quot;: &quot;2022-10-06 19:15:21&quot;
    }, 
    ...
    {
        &quot;action&quot;: &quot;update&quot;,
        &quot;id&quot;: 5,
        &quot;status&quot;: &quot;completed&quot;,
        &quot;createdAt&quot;: &quot;2022-10-06 19:22:32&quot;,
        &quot;updated_at&quot;: &quot;2022-10-06 19:24:25&quot;
    },
    {
        &quot;action&quot;: &quot;delete&quot;,
        &quot;id&quot;: 3,
        &quot;status&quot;: &quot;completed&quot;,
        &quot;createdAt&quot;: &quot;2022-10-06 19:20:45&quot;,
        &quot;updated_at&quot;: &quot;2022-10-06 19:24:23&quot;
    }
]</code></pre>
<hr>
<br/>

<h2 id="kafka-및-debezium을-이용한-스트리밍-데이터-수집">Kafka 및 Debezium을 이용한 스트리밍 데이터 수집</h2>
<h3 id="debezium">Debezium</h3>
<blockquote>
<p>Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.
<a href="https://debezium.io/">공식문서</a></p>
</blockquote>
<ul>
<li>CDC 시스템에서 행 수준 변경을 캡처한 후 다른 시스템에서 사용할 수 있는 이벤트로 스트리밍 해주는 시스템</li>
<li>아래와 같이 세가지 주요 구성 요소가 있음<ul>
<li>zookeeper : 분산 환경을 관리, 각 서비스의 구성을 처리</li>
<li>kafka : 확장성이 뛰어난 데이터 파이프라인을 구축하는 데 일반적으로 사용되는 분산 스트리밍 플랫폼</li>
<li>kafka conect : 데이터를 카프카를 통해 쉽게 스트리밍할 수 있도록 카프카를 다른 시스템과 연결하는 도구. CDC 시스템의 데이터를 카프카 토픽으로 변환</li>
</ul>
</li>
</ul>
<h3 id="flow-example">Flow example</h3>
<p><img src="https://velog.velcdn.com/images/2h-kim/post/780d0765-e72c-4ff2-92e4-64373854d2b6/image.png" alt=""></p>
]]></description>
        </item>
    </channel>
</rss>