<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
    <channel>
        <title>jaekyu_lim.log</title>
        <link>https://velog.io/</link>
        <description>공부 기록</description>
        <lastBuildDate>Mon, 11 Sep 2023 08:37:39 GMT</lastBuildDate>
        <docs>https://validator.w3.org/feed/docs/rss2.html</docs>
        <generator>https://github.com/jpmonette/feed</generator>
        <copyright>Copyright (C) 2019. jaekyu_lim.log. All rights reserved.</copyright>
        <atom:link href="https://v2.velog.io/rss/jaekyu_lim" rel="self" type="application/rss+xml"/>
        <item>
            <title><![CDATA[[8월 미니프로젝트] Spark 사용해보기 - 2]]></title>
            <link>https://velog.io/@jaekyu_lim/8%EC%9B%94-%EB%AF%B8%EB%8B%88%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-2</link>
            <guid>https://velog.io/@jaekyu_lim/8%EC%9B%94-%EB%AF%B8%EB%8B%88%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-2</guid>
            <pubDate>Mon, 11 Sep 2023 08:37:39 GMT</pubDate>
            <description><![CDATA[<h2 id="쇼핑몰-데이터-eda">쇼핑몰 데이터 EDA</h2>
<blockquote>
<p>캐글에서 가지고 온 쇼핑몰 로그 데이터(8,9월 자료)를 병합하여 탐색한 내용</p>
</blockquote>
<h3 id="데이터-합치기">데이터 합치기</h3>
<p>스파크 세션을 띄우고 스키마를 자동으로 설정하게 하고, 헤더가 있는 자료라는 옵션을 주어 불러왔다. 이후 <code>union</code>을 이용하여 합쳤다.</p>
<ul>
<li><code>union</code> - 데이터를 합치고 중복된 행을 제거</li>
<li><code>unionAll</code> - 데이터를 합치고 중복된 행을 유지</li>
</ul>
<p><code>printSchema</code>를 활용하여 스키마 구조 탐색</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/ebe4c323-ebbd-4b9b-9fe2-7d24dd35f232/image.png" alt=""></p>
<h3 id="category_code-전처리">category_code 전처리</h3>
<p>csv파일에 <code>category_code</code> 칼럼이 <code>electronics.audio.headphone</code>과 같은 형식으로 <code>.</code>을 기준으로 대, 중, 소분류가 되어있었다.</p>
<p>이를 나눠서 탐색하고자 <code>.</code>을 기준으로 나누고 새로운 칼럼을 생성했다.
major_category, intermediate_category, minor_category 각각 대, 중, 소 분류 칼럼으로 생성</p>
<pre><code class="language-python">from pyspark.sql.functions import split, col

# DataFrame의 category_code 칼럼을 대, 중, 소 분류로 분할
shop = result_df.withColumn(&quot;category_list&quot;, split(col(&quot;category_code&quot;), &quot;\.&quot;))

# 대분류, 중분류, 소분류 칼럼 생성
shop = shop.withColumn(&quot;major_category&quot;, col(&quot;category_list&quot;)[0])
shop = shop.withColumn(&quot;intermediate_category&quot;, col(&quot;category_list&quot;)[1])
shop = shop.withColumn(&quot;minor_category&quot;, col(&quot;category_list&quot;)[2])

# 중복되는 칼럼 제거
shop = shop.drop(&quot;category_list&quot;)
# 결과 확인
shop.show()
</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/a6984d96-9402-42ad-a74d-2496462dbcff/image.png" alt=""></p>
<p>카테고리를 대상으로 데이터를 분석하고 싶어 카테고리가 <code>null</code>값인 경우를 제외하였다.</p>
<pre><code class="language-python">shop = shop.filter(col(&quot;category_code&quot;).isNotNull())</code></pre>
<p>이후 데이터의 수를 확인해보니 아래와 같이 나왔다.</p>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/5aa94018-0a21-4e4a-81de-85dd5d344090/image.png">


<h3 id="eda">EDA</h3>
<blockquote>
<p>”저희 고객사별로 이번 분기 프로모션 지원 대상좀 추려보려고 하는데, 현재 <strong>카테고리</strong> 별로 <strong>전환율</strong>이랑 <strong>매출 현황</strong>좀 분석해주세요” 라는 물음을 배경으로 하여
전환율 현황 분석 리포트를 위한 탐색을 했다.</p>
</blockquote>
<p>합친 데이터를 임시 뷰를 만들어 EDA 해보려 한다.</p>
<pre><code class="language-python">shop.createOrReplaceTempView(&quot;shop&quot;)</code></pre>
<h4 id="매출">매출</h4>
<ol>
<li>대분류의 카테고리 중 가장 많은 구매 카운트 상위 10건 출력</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2b5808a0-5400-43c3-9a8c-84f49d5cc9c8/image.png" alt=""></p>
<ul>
<li>전자제품, 가구, 컴퓨터 순으로 많았다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a18df3fd-b784-45b5-a84a-d65fa26f64e7/image.png" alt=""></li>
</ul>
<ol start="2">
<li>가장 많이 팔린 품목인 전자 제품을 판 브랜드 중 상위 10개 브랜드는?</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/cb640452-4166-42bd-a4c5-632789253ab1/image.png" alt=""></p>
<ul>
<li>삼성, 애플, 샤오미 순이었다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/cfc89212-d3af-4acf-a365-4334f4d9e81b/image.png" alt=""></li>
</ul>
<h4 id="전환율">전환율</h4>
<ol start="3">
<li>event_type 확인
view, cart, purchase로 보기, 카트 담기, 구매로 나눠져 있었다.</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/138db5f6-779c-4db9-adb8-cbffa6961100/image.png" alt=""></p>
<ol start="4">
<li><p>쇼핑몰에서 뷰만 하던 이용자 중에 카트로 담은 비율 확인
4%로 그리 큰 비율이 아니었다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/0f3ddfb4-5431-4528-b2bd-dafc875df9a1/image.png" alt=""></p>
</li>
<li><p>카트로 담은 이용자 중 구매로 넘어가는 비율 확인
40%로 상대적으로 큰 비율이었다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/9a169b24-80f8-46fe-b391-c086bbd52958/image.png" alt=""></p>
</li>
<li><p>구매하는 경로인 <code>뷰&gt;카트&gt;구매</code>와 <code>뷰&gt; 구매</code> 확인
보기만 하고 바로 구매하는 경우도 있었지만 17%밖에 되지 않았고, 카트에 담은 후에 구매로 전환되는 비율은 41%로 상대적으로 더 컸다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a4731a0d-e6e8-4f85-ba91-12dd2e783f21/image.png" alt=""></p>
</li>
</ol>
<h3 id="차트-시각화">차트 시각화</h3>
<p><code>view to cart</code>, <code>cart to purchase</code>, <code>view to purchase</code> 3가지 항목을 비율화하여 파이차트로 표시해 본 결과 카트에 담아 구매로 넘어가는 전환율이 압도적으로 높았다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/929a256b-dead-49d9-9410-c8b9b17855b7/image.png" alt=""></p>
<h3 id="추론">추론</h3>
<blockquote>
<p>사람들이 구매하는 과정에서 여러 제품을 많이 구경한다. 상품 비교를 많이 해보는 것으로 추정된다.
하지만 많은 사람들이 본다고 해서 카트 담기로 전환하는 비율은 크지 않다.
카트 담기로 전환된 제품들은 구매로 이어질 확률이 보다가 구매하는 것보다 크게 증가한다.</p>
</blockquote>
<blockquote>
<p>카트 담기에 대한 프로모션을 진행한다면 구매 유도하는데 더 도움이 되지 않을까 추측한다.
예) 카트 담기 시 할인 쿠폰을 제공 (기간 설정)</p>
</blockquote>
<h3 id="추후">추후</h3>
<blockquote>
<p>매출과 전환율을 관련지어 구매 예측 모델을 만들어 보려한다.</p>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트] 하둡 클러스터 사용하기- Spark]]></title>
            <link>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0-Spark</link>
            <guid>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0-Spark</guid>
            <pubDate>Sun, 27 Aug 2023 17:50:29 GMT</pubDate>
            <description><![CDATA[<h2 id="노트북-8대를-이용하여-클러스터를-구축">노트북 8대를 이용하여 클러스터를 구축</h2>
<blockquote>
<p><strong>openVPN</strong>을 통해 학원IP가 아니더라도 연결하여 작업을 수행할 수 있게 만들어 주었다.</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/56febc6b-806f-43fd-80b8-723fa9bac019/image.png" alt=""></p>
<h2 id="spark-클러스터-모드로-데이터-전처리-하기">Spark 클러스터 모드로 데이터 전처리 하기</h2>
<pre><code class="language-python">from pyspark.sql import SparkSession

from pyspark.sql.functions import col
from pyspark.sql.functions import explode
from pyspark.sql.functions import count
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import when, array_contains, coalesce, udf, StringType

df = spark.read.parquet(&quot;hdfs:/killv2_/&quot;)

df.count()</code></pre>
<p><code>df.printSchema()</code>을 확인해보니 아래와 같이 되어있었다.</p>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a59790f9-643d-476d-8591-8a7d579d5d26/image.png">




<p>총 322656건이 자료가 조회됐다.</p>
<pre><code class="language-python">df = df.withColumn(&quot;victim_weapon&quot;, regexp_replace(&quot;victim_weapon&quot;, r&quot;(?:^Weapon|^Weap)|_C.*$&quot;, &quot;&quot;))
df = df.withColumn(&quot;killer_weapon&quot;, regexp_replace(&quot;killer_weapon&quot;, r&quot;(?:^Weapon|^Weap)|_C.*$&quot;, &quot;&quot;))

df = df.withColumn(&quot;무기분류&quot;, 
                   when(df[&#39;killer_weapon&#39;].isin([&#39;SKS&#39;, &#39;SLR&#39;, &#39;미니 14&#39;, &#39;MK12&#39;, &#39;MK14&#39;, &#39;QBU&#39;, &#39;VSS&#39;]), &#39;DMR&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;AWM&#39;, &#39;KAR98K&#39;, &#39;링스 AMR&#39;, &#39;M24&#39;, &#39;모신 나강&#39;, &#39;WIN94&#39;]), &#39;SR&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;토미 건&#39;, &#39;PP-19 비존&#39;, &#39;마이크로 UZI&#39;, &#39;MP5K&#39;, &#39;MP9&#39;, &#39;P90&#39;, &#39;UMP45&#39;, &#39;벡터&#39;]), &#39;SMG&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;DP-28&#39;, &#39;M249&#39;, &#39;MG3&#39;]), &#39;LMG&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;S12K&#39;, &#39;DBS&#39;, &#39;012&#39;, &#39;S1897&#39;, &#39;S686&#39;, &#39;소드 오프&#39;]), &#39;SG&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;DEAGLE&#39;, &#39;P18C&#39;, &#39;P1911&#39;, &#39;P92&#39;, &#39;R1895&#39;, &#39;R45&#39;, &#39;스콜피온&#39;]), &#39;Pistol&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;석궁&#39;]), &#39;MISC&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;M416&#39;, &#39;G36C&#39;, &#39;ACE32&#39;, &#39;AKM&#39;, &#39;AUG&#39;, &#39;FAMAS&#39;, &#39;그로자&#39;, &#39;K2&#39;, &#39;M16A4&#39;, &#39;베릴 M762&#39;, &#39;MK47 뮤턴트&#39;, &#39;QBZ&#39;, &#39;SCAR-L&#39;]), &#39;AR&#39;)
                   .otherwise(&quot;Unknown&quot;)
                  )


 weapon_name_mapping = {
        &#39;SKS&#39;: &#39;SKS&#39;, &#39;FNFal&#39;: &#39;SLR&#39;, &#39;Mini14&#39;: &#39;미니14&#39;, &#39;Mk12&#39;: &#39;MK12&#39;, &#39;Mk14&#39;: &#39;MK14&#39;,
        &#39;QBU88&#39;: &#39;QBU&#39;, &#39;VSS&#39;: &#39;VSS&#39;, &#39;AWM&#39;: &#39;AWM&#39;, &#39;Kar98k&#39;: &#39;KAR98K&#39;, &#39;L6&#39;: &#39;링스 AMR&#39;,
        &#39;M24&#39;: &#39;M24&#39;, &#39;Mosin&#39;: &#39;모신 나강&#39;, &#39;Win1894&#39;: &#39;WIN94&#39;, &#39;Thompson&#39;: &#39;토미 건&#39;,
        &#39;BizonPP19&#39;: &#39;PP-19 비존&#39;, &#39;UZI&#39;: &#39;마이크로 UZI&#39;, &#39;MP5K&#39;: &#39;MP5K&#39;, &#39;MP9&#39;: &#39;MP9&#39;,
        &#39;P90&#39;: &#39;P90&#39;, &#39;UMP&#39;: &#39;UMP45&#39;, &#39;Vector&#39;: &#39;벡터&#39;, &#39;DP28&#39;: &#39;DP-28&#39;, &#39;M249&#39;: &#39;M249&#39;,
        &#39;MG3&#39;: &#39;MG3&#39;, &#39;Saiga12&#39;: &#39;S12K&#39;, &#39;DP12&#39;: &#39;DBS&#39;, &#39;OriginS12&#39;: &#39;O12&#39;,
        &#39;Winchester&#39;: &#39;S1897&#39;, &#39;Berreta686&#39;: &#39;S686&#39;, &#39;Sawnoff&#39;: &#39;소드 오프&#39;,
        &#39;DesertEagle&#39;: &#39;Deagle&#39;, &#39;G18&#39;: &#39;P18C&#39;, &#39;M1911&#39;: &#39;P1911&#39;, &#39;M9&#39;: &#39;P92&#39;,
        &#39;NagantM1895&#39;: &#39;R1895&#39;, &#39;Rhino&#39;: &#39;R45&#39;, &#39;vz61Skorpion&#39;: &#39;스콜피온&#39;,
        &#39;Crossbow_1&#39;: &#39;석궁&#39;, &#39;HK416&#39;: &#39;M416&#39;, &#39;G36C&#39;: &#39;G36C&#39;, &#39;ACE32&#39;: &#39;ACE32&#39;,
        &#39;AK47&#39;: &#39;AKM&#39;, &#39;AUG&#39;: &#39;AUG&#39;, &#39;Mk47Mutant&#39;: &#39;MK47 뮤턴트&#39;, &#39;FamasG2&#39;: &#39;FAMAS&#39;,
        &#39;G36C&#39;: &#39;G36C&#39;, &#39;K2&#39;: &#39;K2&#39;, &#39;M16A4&#39;: &#39;M16A4&#39;, &#39;BerylM762&#39;: &#39;베릴 M762&#39;,
        &#39;QBZ95&#39;: &#39;QBZ&#39;, &#39;SCAR-L&#39;: &#39;SCAR-L&#39;, &#39;Groza&#39;: &#39;그로자&#39;
    }

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def map_weapon(weapon):
    return weapon_mapping.get(weapon, weapon)

map_weapon_udf = udf(map_weapon, StringType())
df_exploded = df.withColumn(&quot;killer_weapon&quot;, map_weapon_udf(df[&quot;killer_weapon&quot;]))</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/04369c35-8846-4ea4-a936-171c2a07f8f3/image.png" alt=""></p>
<blockquote>
<p>매치 데이터가 많아져 전처리를 한꺼번에 수행하려 하면 로컬에서 실행할 경우에 커널이 죽어버리는 현상이 발생했는데, 클러스터를 형성하여 파케이 파일들을 다 불러모아 실행하니 커널이 죽지 않고 전처리가 잘 됐다.</p>
</blockquote>
<ul>
<li>간혹 파케이 파일의 스키마 형식이 잘못되어 오류가 발생하는 경우가 있었다.
victim_weapon의 형태가 string이 기댓값인데, int32로 되어있는 경우가 있었기 때문이다.
그래서 아래와 같은 코드로 형태를 변형시켜서 적용하였다.</li>
</ul>
<pre><code class="language-python">from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import re

# Spark 세션 생성
spark = SparkSession.builder.appName(&quot;WeaponPreprocessing&quot;).getOrCreate()

# 여러 개의 Parquet 파일을 동시에 로드
df = spark.read.parquet(&quot;hdfs:/killv2_/&quot;)

# victim_weapon 컬럼 데이터 타입 변환
df = df.withColumn(&quot;victim_weapon&quot;, col(&quot;victim_weapon&quot;).cast(&quot;string&quot;))

# killer_weapon 컬럼 값 전처리
df = df.withColumn(&quot;killer_weapon&quot;, regexp_replace(&quot;killer_weapon&quot;, r&quot;(?:^Weapon|^Weap)|_C.*$&quot;, &quot;&quot;))
df = df.withColumn(&quot;killer_weapon&quot;,
                   when(df[&#39;killer_weapon&#39;] == &#39;SKS&#39;, &#39;SKS&#39;)
                   .when(df[&#39;killer_weapon&#39;] == &#39;FNFal&#39;, &#39;SLR&#39;)
                   .when(df[&#39;killer_weapon&#39;] == &#39;Mini14&#39;, &#39;미니14&#39;)
                   .when(df[&#39;killer_weapon&#39;] == &#39;Mk12&#39;, &#39;MK12&#39;)
                   .when(df[&#39;killer_weapon&#39;] == &#39;Mk14&#39;, &#39;MK14&#39;)
                   # 다른 경우도 추가해주세요
                   .otherwise(df[&#39;killer_weapon&#39;])
                   )

# 무기분류 컬럼 생성
df = df.withColumn(&quot;무기분류&quot;, 
                   when(df[&#39;killer_weapon&#39;].isin([&#39;SKS&#39;, &#39;SLR&#39;, &#39;미니14&#39;, &#39;MK12&#39;, &#39;MK14&#39;]), &#39;DMR&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;AWM&#39;, &#39;KAR98K&#39;, &#39;링스 AMR&#39;, &#39;M24&#39;, &#39;모신 나강&#39;, &#39;WIN94&#39;]), &#39;SR&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;토미 건&#39;, &#39;PP-19 비존&#39;, &#39;마이크로 UZI&#39;, &#39;MP5K&#39;, &#39;MP9&#39;, &#39;P90&#39;, &#39;UMP45&#39;, &#39;벡터&#39;]), &#39;SMG&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;DP-28&#39;, &#39;M249&#39;, &#39;MG3&#39;]), &#39;LMG&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;S12K&#39;, &#39;DBS&#39;, &#39;012&#39;, &#39;S1897&#39;, &#39;S686&#39;, &#39;소드 오프&#39;]), &#39;SG&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;DEAGLE&#39;, &#39;P18C&#39;, &#39;P1911&#39;, &#39;P92&#39;, &#39;R1895&#39;, &#39;R45&#39;, &#39;스콜피온&#39;]), &#39;Pistol&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;석궁&#39;]), &#39;MISC&#39;)
                   .when(df[&#39;killer_weapon&#39;].isin([&#39;M416&#39;, &#39;G36C&#39;, &#39;ACE32&#39;, &#39;AKM&#39;, &#39;AUG&#39;, &#39;FAMAS&#39;, &#39;그로자&#39;, &#39;K2&#39;, &#39;M16A4&#39;, &#39;베릴 M762&#39;, &#39;MK47 뮤턴트&#39;, &#39;QBZ&#39;, &#39;SCAR-L&#39;]), &#39;AR&#39;)
                   .otherwise(&quot;Unknown&quot;)
                  )

# 변환 결과 확인
df.show()

# 이제 df를 사용하여 원하는 분석 작업을 수행할 수 있습니다.

# Spark 세션 종료
spark.stop()
</code></pre>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트] 3. 데이터 분석하기(2)
]]></title>
            <link>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-3.-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%B6%84%EC%84%9D%ED%95%98%EA%B8%B02</link>
            <guid>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-3.-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%B6%84%EC%84%9D%ED%95%98%EA%B8%B02</guid>
            <pubDate>Sun, 27 Aug 2023 16:30:05 GMT</pubDate>
            <description><![CDATA[<h2 id="완성-화면">완성 화면</h2>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/46d2a273-d248-4a3f-83b6-1c1a24e5c05b/image.png">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/0b88e26d-23af-457c-95ca-eec65a6f1e0b/image.png">

<blockquote>
<p><strong>데이터 분석 과정</strong>
무기 상성과 무기 티어표</p>
</blockquote>
<h2 id="1-무기-상성">1. 무기 상성</h2>
<blockquote>
<p><strong>개인전</strong>의 경우 killer_weapon과 finisher_weapon이 같아 문제가 없었지만, <strong>다인전(듀오나 스쿼드)</strong> 의 경우 교전한 데이터인 killer_weapon과 마지막 한발만을 친(막타) finsiher_weapon에 차이가 있었다.
게임 내에서 교전 시에 승률을 알고 싶었기 때문에 마지막에 쏜 것이 아닌 교전을 하며 싸운 killer_weapon을 사용하고자 했다.
교전에 사용된 주무기들을 추출하여 교전 횟수가 일정 횟수 이상인 데이터만을 찾아 상대적 승률을 나타내고자 했다.
AI유저의 데이터는 반영하지 않았고, 동일 무기에 대해서도 반영하지 않았다.
주무기 만의 교전 결과만을 반영하였다.</p>
</blockquote>
<pre><code class="language-python"># 전체 대상 무기 추출
target = result_df[&#39;killer_weapon&#39;].unique()

result = {}
for weapon in target:
    # 필터링: total_count가 20 이상인 데이터만 선택합니다.
    filtered_df = result_df[(result_df[&#39;killer_weapon&#39;] == weapon) &amp; (result_df[&#39;total_count&#39;] &gt;= 20)].sort_values(&#39;win_rate&#39;, ascending=False)

    high_weapons = filtered_df.iloc[:3] if filtered_df.shape[0] &gt;= 3 else filtered_df
    low_weapons = filtered_df.iloc[-3:] if filtered_df.shape[0] &gt;= 3 else filtered_df

    result[weapon] = {
        &quot;highs&quot;: high_weapons,
        &quot;lows&quot;: low_weapons,
    }

result_list = []

for weapon in result.keys():
    result_list.append({
        &quot;weapon_name&quot;: weapon,
        **{f&quot;easy_weapon_{i+1}&quot;: row[&quot;victim_weapon&quot;] if row[&quot;victim_weapon&quot;] else None for i, row in result[weapon][&quot;highs&quot;].reset_index(drop=True).iterrows()},
        **{f&quot;easy_percent_{i+1}&quot;: row[&quot;win_rate&quot;] if row[&quot;win_rate&quot;] else None for i, row in result[weapon][&quot;highs&quot;].reset_index(drop=True).iterrows()},
        **{f&quot;hard_weapon_{i+1}&quot;: row[&quot;victim_weapon&quot;] if row[&quot;victim_weapon&quot;] else None for i, row in result[weapon][&quot;lows&quot;].reset_index(drop=True).iterrows()},
        **{f&quot;hard_percent_{i+1}&quot;: row[&quot;win_rate&quot;] if row[&quot;win_rate&quot;] else None for i, row in result[weapon][&quot;lows&quot;].reset_index(drop=True).iterrows()},
    })

final_df = pd.DataFrame(result_list)

final_df = final_df.fillna(0)</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/cf02bc6d-8804-48df-a58f-bba1a97ea4a7/image.png" alt=""></p>
<ul>
<li>이 코드를 실행하여 적용하여 보니 문제가 생겼다. 슬라이싱을 활용해 승률로만 정리하다보니 교전한 무기가 3개 미만인 경우에 상대하기 어려운 무기와 쉬운 무기에 똑같은 무기가 들어가는 경우가 발생했다.<img src="https://velog.velcdn.com/images/jaekyu_lim/post/08677c16-ae17-4e1d-b191-a5cba633f010/image.png">

</li>
</ul>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/1638ab90-c90e-4e50-942c-614d0e61671c/image.png">

<img src="https://velog.velcdn.com/images/jaekyu_lim/post/fb91b3ef-aec1-4c84-aaff-b6bbcc4d33df/image.png">

<img src="https://velog.velcdn.com/images/jaekyu_lim/post/09d71d69-e584-46e9-9477-a92a9042bde5/image.png">


<blockquote>
<p><strong>수정된 코드</strong>
코드를 승률 50퍼 기준으로 아예 다르게 변수에 저장하였다.</p>
</blockquote>
<pre><code class="language-python"># 전체 경기 수를 계산합니다.
result_df[&#39;total_count&#39;] = result_df[&#39;count&#39;] + result_df[&#39;reverse_count&#39;]

# 승률을 계산합니다. (승리 횟수 / 전체 경기 횟수)
result_df[&#39;win_rate&#39;] = result_df[&#39;count&#39;] * 100 / result_df[&#39;total_count&#39;]

# None 값과 무기가 서로 같은 경우를 제거합니다.
result_df = result_df[(result_df[&#39;victim_weapon&#39;].notnull()) &amp; (result_df[&#39;killer_weapon&#39;] != result_df[&#39;victim_weapon&#39;])]

# 전체 대상 무기 추출
target = result_df[&#39;killer_weapon&#39;].unique()

result = {}
for weapon in target:
    filtered_df = result_df[(result_df[&#39;killer_weapon&#39;] == weapon) &amp; (result_df[&#39;total_count&#39;] &gt;= 20)]
    filtered_df[&quot;high&quot;] = filtered_df[&quot;win_rate&quot;] &gt;= 50  # 승률 50%를 기준으로 새로운 &#39;high&#39; 컬럼에 True 또는 False 값 저장합니다.

    high_weapons = filtered_df[filtered_df[&quot;high&quot;]].sort_values(&#39;win_rate&#39;, ascending=False)[:3]
    low_weapons = filtered_df[~filtered_df[&quot;high&quot;]].sort_values(&#39;win_rate&#39;, ascending=False)[-3:]

    high_weapons.drop(&#39;high&#39;, axis=1, inplace=True)
    low_weapons.drop(&#39;high&#39;, axis=1, inplace=True)

    result[weapon] = {
        &quot;highs&quot;: high_weapons,
        &quot;lows&quot;: low_weapons,
    }


result_list = []

for weapon in result.keys():
    highs = result[weapon][&quot;highs&quot;].reset_index(drop=True) if result[weapon][&quot;highs&quot;] is not None else pd.DataFrame(columns=[&#39;victim_weapon&#39;, &#39;win_rate&#39;])
    lows = result[weapon][&quot;lows&quot;].reset_index(drop=True) if result[weapon][&quot;lows&quot;] is not None else pd.DataFrame(columns=[&#39;victim_weapon&#39;, &#39;win_rate&#39;])

    result_list.append({
        &quot;weapon_name&quot;: weapon,
        **{f&quot;easy_weapon_{i + 1}&quot;: row[&quot;victim_weapon&quot;] if not pd.isna(row[&quot;victim_weapon&quot;]) else None for i, row in highs.iterrows()},
        **{f&quot;easy_percent_{i + 1}&quot;: row[&quot;win_rate&quot;] if not pd.isna(row[&quot;win_rate&quot;]) else None for i, row in highs.iterrows()},
        **{f&quot;hard_weapon_{i + 1}&quot;: row[&quot;victim_weapon&quot;] if not pd.isna(row[&quot;victim_weapon&quot;]) else None for i, row in lows.iterrows()},
        **{f&quot;hard_percent_{i + 1}&quot;: row[&quot;win_rate&quot;] if not pd.isna(row[&quot;win_rate&quot;]) else None for i, row in lows.iterrows()},
    })

final_df = pd.DataFrame(result_list)
final_df = final_df.fillna(0)
</code></pre>
<ul>
<li>결과값<blockquote>
<p>중복데이터도 사라지고, 승률이 높아야 할 easy_percent가 50 아래인 경우도 없어졌다.</p>
</blockquote>
</li>
</ul>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/6e716200-fa7c-43bc-9a36-7abf2b12802a/image.png">



<blockquote>
<p><strong>최종 dag코드</strong>
read_data_task : 이전 포스트에서 저장했던 여러 매치 데이터에서 파싱한 무기데이터 불러오기
process_data_task : 무기 상성 및 무기 티어 데이터 처리
update_database_task : 처리한 데이터를 DB에 최신화</p>
</blockquote>
<pre><code class="language-python">from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
from google.cloud import storage
import numpy as np
import pandas as pd
from io import BytesIO
import io
import pyarrow
# slack_notifications.py
from slack_notifications import SlackAlert
from airflow.models import Variable
import pymysql
from config import DB_CONFIG

KEY_PATH = &quot;./playdata-2-1e60a2f219de.json&quot;
os.environ[&quot;GOOGLE_APPLICATION_CREDENTIALS&quot;] = KEY_PATH

slack_api_token = Variable.get(&quot;slack_api_token&quot;)
alert = SlackAlert(&#39;#message&#39;, slack_api_token) # 메세지를 보낼 슬랙 채널명을 파라미터로 넣어줍니다.


dag = DAG(
    dag_id=&quot;load_weapon_data&quot;,
    description=&quot;무기_분석&quot;,
    start_date=datetime(2023, 7, 1, 0, 0),
    schedule_interval=&#39;0 16 * * *&#39;,
    on_success_callback=alert.success_msg,
    on_failure_callback=alert.fail_msg
)

def _read_data_from_gcp_storage(**kwargs):
    bucket_name = &quot;playdata2&quot;
    file_path = &quot;logs_weapon/&quot;
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    parquet_data = []
    blobs = bucket.list_blobs(prefix=file_path)

    for blob in blobs:

        # parquet 형식의 파일인지 확인
        if blob.name.endswith(&quot;.parquet&quot;):
            # 객체를 바이트 스트림으로 다운로드
            byte_stream = io.BytesIO(blob.download_as_bytes())

            # parquet 데이터를 pandas DataFrame으로 읽기
            df = pd.read_parquet(byte_stream)
            parquet_data.append(df)


    # 개별 DataFrame들을 하나의 DataFrame으로 합치기
    concat_data = pd.concat(parquet_data, axis=0, ignore_index=True)
    kwargs[&#39;ti&#39;].xcom_push(key=&#39;parquet_data&#39;, value=concat_data)


def _process_weapon_data(**kwargs):
    # GCP Storage에서 데이터 읽어오기
    parquet_data = kwargs[&#39;ti&#39;].xcom_pull(key=&#39;parquet_data&#39;)
    kv3 = pd.concat(parquet_data, axis=0, ignore_index=True)
    result_df = kv3.groupby([&#39;killer_weapon&#39;, &#39;victim_weapon&#39;]).size().reset_index(name=&#39;count&#39;)
    reverse_combinations = result_df.rename(columns={&#39;killer_weapon&#39;: &#39;victim_weapon&#39;, &#39;victim_weapon&#39;: &#39;killer_weapon&#39;, &#39;count&#39;: &#39;reverse_count&#39;})
    result_df = result_df.merge(reverse_combinations, on=[&#39;killer_weapon&#39;, &#39;victim_weapon&#39;], how=&#39;outer&#39;)
    result_df.fillna(0, inplace=True)

    result_df[&#39;total_count&#39;] = result_df[&#39;count&#39;] + result_df[&#39;reverse_count&#39;]
    result_df[&#39;win_rate&#39;] = result_df[&#39;count&#39;] * 100 / result_df[&#39;total_count&#39;]
    result_df = result_df[(result_df[&#39;victim_weapon&#39;].notnull()) &amp; (result_df[&#39;killer_weapon&#39;] != result_df[&#39;victim_weapon&#39;])]

    target = result_df[&#39;killer_weapon&#39;].unique()
    result = {}

    for weapon in target:
        filtered_df = result_df[(result_df[&#39;killer_weapon&#39;] == weapon) &amp; (result_df[&#39;total_count&#39;] &gt;= 20)].sort_values(&#39;win_rate&#39;, ascending=False)
        high_weapons = filtered_df.iloc[:3] if filtered_df.shape[0] &gt;= 3 else filtered_df
        low_weapons = filtered_df.iloc[-3:] if filtered_df.shape[0] &gt;= 3 else filtered_df

        result[weapon] = {
            &quot;highs&quot;: high_weapons,
            &quot;lows&quot;: low_weapons,
        }

    result_list = []

    for weapon in result.keys():
        result_list.append({
            &quot;weapon_name&quot;: weapon,
            **{f&quot;easy_weapon_{i + 1}&quot;: row[&quot;victim_weapon&quot;] if row[&quot;victim_weapon&quot;] else None for i, row in result[weapon][&quot;highs&quot;].reset_index(drop=True).iterrows()},
            **{f&quot;easy_percent_{i + 1}&quot;: row[&quot;win_rate&quot;] if row[&quot;win_rate&quot;] else None for i, row in result[weapon][&quot;highs&quot;].reset_index(drop=True).iterrows()},
            **{f&quot;hard_weapon_{i + 1}&quot;: row[&quot;victim_weapon&quot;] if row[&quot;victim_weapon&quot;] else None for i, row in result[weapon][&quot;lows&quot;].reset_index(drop=True).iterrows()},
            **{f&quot;hard_percent_{i + 1}&quot;: row[&quot;win_rate&quot;] if row[&quot;win_rate&quot;] else None for i, row in result[weapon][&quot;lows&quot;].reset_index(drop=True).iterrows()},
        })

    final_df = pd.DataFrame(result_list)
    final_df = final_df.fillna(0)

    games_threshold = 30
    weapon_summary2 = result_df.groupby(&#39;killer_weapon&#39;).agg({&#39;total_count&#39;: &#39;sum&#39;, &#39;count&#39;: &#39;sum&#39;}).reset_index()
    weapon_summary2[&#39;win_rate&#39;] = weapon_summary2[&#39;count&#39;] * 100 / weapon_summary2[&#39;total_count&#39;]
    valid_weapons = weapon_summary2[weapon_summary2[&#39;total_count&#39;] &gt;= games_threshold]

    count_weight = 0.2
    win_rate_weight = 0.8
    weapon_summary2[&#39;score&#39;] = (weapon_summary2[&#39;total_count&#39;] * count_weight) + (weapon_summary2[&#39;win_rate&#39;] * win_rate_weight)

    quantiles = weapon_summary2[&#39;score&#39;].quantile([.9, .8, .5, .3, 0]).values
    tier_1 = weapon_summary2[weapon_summary2[&#39;score&#39;] &gt;= quantiles[0]]
    tier_2 = weapon_summary2[(weapon_summary2[&#39;score&#39;] &gt;= quantiles[1]) &amp; (weapon_summary2[&#39;score&#39;] &lt; quantiles[0])]
    tier_3 = weapon_summary2[(weapon_summary2[&#39;score&#39;] &gt;= quantiles[2]) &amp; (weapon_summary2[&#39;score&#39;] &lt; quantiles[1])]
    tier_4 = weapon_summary2[(weapon_summary2[&#39;score&#39;] &gt;= quantiles[3]) &amp; (weapon_summary2[&#39;score&#39;] &lt; quantiles[2])]
    tier_5 = weapon_summary2[weapon_summary2[&#39;score&#39;] &lt; quantiles[3]]

    tier_1_sorted = tier_1.sort_values(&#39;score&#39;, ascending=True)[&#39;killer_weapon&#39;].tolist()
    tier_2_sorted = tier_2.sort_values(&#39;score&#39;, ascending=True)[&#39;killer_weapon&#39;].tolist()
    tier_3_sorted = tier_3.sort_values(&#39;score&#39;, ascending=True)[&#39;killer_weapon&#39;].tolist()
    tier_4_sorted = tier_4.sort_values(&#39;score&#39;, ascending=True)[&#39;killer_weapon&#39;].tolist()
    tier_5_sorted = tier_5.sort_values(&#39;score&#39;, ascending=True)[&#39;killer_weapon&#39;].tolist()

    for idx, row in weapon_summary2.iterrows():
        if row[&#39;killer_weapon&#39;] in tier_1_sorted:
            weapon_summary2.loc[idx, &#39;tier&#39;] = 1
        elif row[&#39;killer_weapon&#39;] in tier_2_sorted:
            weapon_summary2.loc[idx, &#39;tier&#39;] = 2
        elif row[&#39;killer_weapon&#39;] in tier_3_sorted:
            weapon_summary2.loc[idx, &#39;tier&#39;] = 3
        elif row[&#39;killer_weapon&#39;] in tier_4_sorted:
            weapon_summary2.loc[idx, &#39;tier&#39;] = 4
        else:
            weapon_summary2.loc[idx, &#39;tier&#39;] = 5

    weapon_summary2[&#39;tier&#39;] = weapon_summary2[&#39;tier&#39;].astype(int)

    final_df = final_df.merge(weapon_summary2[[&#39;killer_weapon&#39;, &#39;tier&#39;]], left_on=&#39;weapon_name&#39;, right_on=&#39;killer_weapon&#39;, how=&#39;left&#39;).drop(columns=[&#39;killer_weapon&#39;])

    output_filename = &quot;result.csv&quot;
    final_df.to_csv(output_filename, index=False)

    return final_df

def _update_database(final_df):
    conn = pymysql.connect(**DB_CONFIG)
    cur = conn.cursor()

    for index, row in final_df.iterrows():
        weapon_name = row[&#39;weapon_name&#39;]
        easy_weapon_1 = row[&#39;easy_weapon_1&#39;]
        easy_weapon_2 = row[&#39;easy_weapon_2&#39;]
        easy_weapon_3 = row[&#39;easy_weapon_3&#39;]
        easy_percent_1 = row[&#39;easy_percent_1&#39;]
        easy_percent_2 = row[&#39;easy_percent_2&#39;]
        easy_percent_3 = row[&#39;easy_percent_3&#39;]
        hard_weapon_1 = row[&#39;hard_weapon_1&#39;]
        hard_weapon_2 = row[&#39;hard_weapon_2&#39;]
        hard_weapon_3 = row[&#39;hard_weapon_3&#39;]
        hard_percent_1 = row[&#39;hard_percent_1&#39;]
        hard_percent_2 = row[&#39;hard_percent_2&#39;]
        hard_percent_3 = row[&#39;hard_percent_3&#39;]
        weapon_tier = row[&#39;tier&#39;]

    update_query = &quot;&quot;&quot;
        UPDATE services_weapons SET weapon_tier = %s,
                            first_easy_weapon = %s,
                            first_easy_percent = %s,
                            second_easy_weapon = %s,
                            second_easy_percent = %s,
                            third_easy_weapon = %s,
                            third_easy_percent = %s,
                            first_hard_weapon = %s,
                            first_hard_percent = %s,
                            second_hard_weapon = %s,
                            second_hard_percent= %s,
                            third_hard_weapon = %s,
                            third_hard_percent = %s WHERE weapon_name = %s

        &quot;&quot;&quot;
    cur.execute(update_query, (weapon_tier, easy_weapon_1, easy_percent_1, easy_weapon_2, easy_percent_2, easy_weapon_3, easy_percent_3,
                                hard_weapon_1, hard_percent_1, hard_weapon_2, hard_percent_2, hard_weapon_3, hard_percent_3,
                                weapon_name))

    conn.commit()    


read_data_task = PythonOperator(
    task_id=&#39;read_data&#39;,
    python_callable=_read_data_from_gcp_storage,
    provide_context=True
)

process_data_task = PythonOperator(
    task_id=&#39;process_data&#39;,
    python_callable=_process_weapon_data,
    provide_context=True
)

update_database_task = PythonOperator(
    task_id=&#39;update_database&#39;,
    python_callable=_update_database,
    provide_context=True
)

read_data_task &gt;&gt; process_data_task &gt;&gt; update_database_task
</code></pre>
<blockquote>
<p><strong>회고</strong>
배틀그라운드라는 게임을 많이 해보지 않았지만, 이번 프로젝트를 수행하며 배틀그라운드 게임도 많이 해보고, 특히 무기 관련해서 많이 보게 되었다.
처음에는 어느 총이 좋은 총인지도 몰라 해메었지만, 이번 분석을 통해 어느 정도 티어가 확립되고 나니 나에게 맞는 총, 그 총에 대한 정보를 알 수 있었다.
<strong>아쉬운점</strong>
아쉬운 점이 있다면, 주무기만을 한정해서 만든게 좀 아쉬웠다. 데이터를 파싱하는 데에 시간이 많이 들어 범위를 축소해서 분석을 수행했었다. 시간이 더 있었다면 다른 종류에 무기들에 대해서도 조사해보고 싶다. ( 투척무기, 근접무기, 권총 등)</p>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트] 2. 데이터 분석하기
]]></title>
            <link>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-2.-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%B6%84%EC%84%9D%ED%95%98%EA%B8%B0</link>
            <guid>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-2.-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%B6%84%EC%84%9D%ED%95%98%EA%B8%B0</guid>
            <pubDate>Sun, 27 Aug 2023 15:53:08 GMT</pubDate>
            <description><![CDATA[<h2 id="무기-데이터-분석하기">무기 데이터 분석하기</h2>
<blockquote>
<p>나는 이 프로젝트에서 <strong>무기 데이터</strong> 부분을 맡았다. RAW DATA에서 무기 데이터 관련만 파싱하는 코드, 그 데이터를 가지고 <strong>무기 티어, 무기 별 상성</strong>을 최신화 하는 airflow dag 코드를 작성했다.</p>
</blockquote>
<h2 id="완성-화면">완성 화면</h2>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/46d2a273-d248-4a3f-83b6-1c1a24e5c05b/image.png">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/0b88e26d-23af-457c-95ca-eec65a6f1e0b/image.png">

<h2 id="과정">과정</h2>
<blockquote>
<p>앞서서 유저 정보 페이지에서 유저가 가장 많이 사용하는 숙련도 top3을 보여주어 자신이 가장 잘 다룰 수 있는 무기를 보여주었다.
무기들 중 주무기(소총, 저격총 등등)을 대상으로 자기가 잘 다루는 무기가 어떤 무기에는 유리하고, 또 어떤 무기에는 불리한 지 알고 싶어졌다.
1주간에 매치 데이터를 모아 이 데이터를 기반으로 무기별 상성에 대해 알아보고자 했다.
또 이를 보기 쉽게 티어표로 나타내고자 하였다.</p>
</blockquote>
<h3 id="데이터-확인-후-작업-과정">데이터 확인 후 작업 과정</h3>
<blockquote>
<p>여러개의 매치 데이터가 모아 파케이 파일 형태로 저장하였다.
그 저장된 데이터를 불러모아 칼럼들을 살펴보았다.</p>
</blockquote>
<ul>
<li>로그데이터에 _T 칼럼에서 LogPlayerKillV2인 것에서 주요 칼럼으로
victim_weapon, killer_weapon, finisher_weapon을 찾았다.
개인전의 경우 killer_weapon과 finisher_weapon이 같아 문제가 없었지만, 듀오나 스쿼드의 경우 교전한 데이터인 killer_weapon과 마지막 한발만을 친(막타) finsiher_weapon에 차이가 있었다. 무기 상성에서는 victim_weapon과 killer_weapon을 사용하기로 했다.</li>
</ul>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/cf044398-36b5-4752-b83d-9c477f0698fa/image.png">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/500ed96a-f5e5-4baa-822d-206008838676/image.png">

<ul>
<li>로그 데이터에서 아까 가져오기로 한 데이터들만을 가져오는 코드를 만들었다.
ai를 제외하고 유저의 데이터만을 가져왔다.</li>
</ul>
<pre><code class="language-python">def _process_logs_weapon(combined_data):
tmp = [data for data2 in combined_data[&#39;logs&#39;] for data in data2 if &#39;_T&#39; in data and data[&#39;_T&#39;] == &#39;LogPlayerKillV2&#39;]

kv2 = []

for log in tmp:

    # 무기 관련 데이터 파싱
    if log[&#39;_T&#39;] == &#39;LogPlayerKillV2&#39;:
        try :
            if &#39;ai&#39; not in log[&#39;victim&#39;][&#39;accountId&#39;] and &#39;ai&#39; not in log[&#39;killer&#39;][&#39;accountId&#39;] and &#39;ai&#39; not in log[&#39;finisher&#39;][&#39;accountId&#39;] :
                try :
                    v2row = {&#39;victim_weapon&#39; : None if len(log[&#39;victimWeapon&#39;]) == 0 else log[&#39;victimWeapon&#39;],
                            &#39;victim_account_id&#39; :log[&#39;victim&#39;][&#39;accountId&#39;],
                            &#39;victim_parts&#39; : None if len(log[&quot;victimWeaponAdditionalInfo&quot;]) == 0 else log[&quot;victimWeaponAdditionalInfo&quot;],
                            &#39;killer_weapon&#39; : log[&#39;killerDamageInfo&#39;][&#39;damageCauserName&#39;],
                            &#39;killer_account_id&#39; : log[&#39;killer&#39;][&#39;accountId&#39;],
                            &#39;killer_parts&#39; : None if len(log[&#39;killerDamageInfo&#39;][&#39;additionalInfo&#39;]) == 0 else log[&#39;killerDamageInfo&#39;][&#39;additionalInfo&#39;],
                            &#39;killer_distance&#39; :log[&#39;killerDamageInfo&#39;][&#39;distance&#39;],
                            &#39;finisher_weapon&#39; : log[&#39;finishDamageInfo&#39;][&#39;damageCauserName&#39;],
                            &#39;finisher_account_id&#39; : log[&#39;finisher&#39;][&#39;accountId&#39;],
                            &#39;finisher_parts&#39; : None if len(log[&#39;finishDamageInfo&#39;][&#39;additionalInfo&#39;]) == 0 else log[&#39;finishDamageInfo&#39;][&#39;additionalInfo&#39;],
                            &#39;finisher_distance&#39; : log[&#39;finishDamageInfo&#39;][&#39;distance&#39;],
                            }
                except :
                    v2row = {&#39;victim_weapon&#39; : None,
                            &#39;victim_account_id&#39; : None,
                            &#39;victim_parts&#39; : None,
                            &#39;killer_weapon&#39; : None,
                            &#39;killer_account_id&#39; : None,
                            &#39;killer_parts&#39; : None,
                            &#39;killer_distance&#39; : None,
                            &#39;finisher_weapon&#39; : None,
                            &#39;finisher_account_id&#39; : None,
                            &#39;finisher_parts&#39; : None,
                            &#39;finisher_distance&#39; : None,
                            }
            kv2.append(v2row)
        except :
            pass

kv3 = pd.DataFrame(kv2)
return kv3

</code></pre>
<ul>
<li>결과값<blockquote>
<p>Weapon data에 전처리가 필요해보였다.
라이선스 만료로 인해 변수명이 실제 게임 내에 사용되는 무기명과 다른 경우가 있었고,
&#39;weap&#39; &#39;<em>C</em>2&#39;등의 파츠 구분으로 인해 같은 총기 구분을 해줘야 할 필요성을 느꼈다.</p>
</blockquote>
</li>
</ul>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/2a68ef9a-9bfe-4339-8692-e1955316e286/image.png">

<ul>
<li><p>전처리 코드</p>
<pre><code class="language-python">  def remove_part(text):
      return re.sub(r&#39;(?:^Weapon|^Weap)|_C.*$&#39;, &#39;&#39;, text)

  kv3[&#39;victim_weapon&#39;] = kv3[&#39;victim_weapon&#39;].astype(str)
  kv3[&#39;victim_weapon&#39;] = kv3[&#39;victim_weapon&#39;].apply(remove_part)
  kv3[&#39;killer_weapon&#39;] = kv3[&#39;killer_weapon&#39;].astype(str)
  kv3[&#39;killer_weapon&#39;] = kv3[&#39;killer_weapon&#39;].apply(remove_part)
  kv3[&#39;finisher_weapon&#39;] = kv3[&#39;finisher_weapon&#39;].astype(str)
  kv3[&#39;finisher_weapon&#39;] = kv3[&#39;finisher_weapon&#39;].apply(remove_part)

  weapon_name_mapping = {
      &#39;SKS&#39;: &#39;SKS&#39;, &#39;FNFal&#39;: &#39;SLR&#39;, &#39;Mini14&#39;: &#39;미니14&#39;, &#39;Mk12&#39;: &#39;MK12&#39;, &#39;Mk14&#39;: &#39;MK14&#39;,
      &#39;QBU88&#39;: &#39;QBU&#39;, &#39;VSS&#39;: &#39;VSS&#39;, &#39;AWM&#39;: &#39;AWM&#39;, &#39;Kar98k&#39;: &#39;KAR98K&#39;, &#39;L6&#39;: &#39;링스 AMR&#39;,
      &#39;M24&#39;: &#39;M24&#39;, &#39;Mosin&#39;: &#39;모신 나강&#39;, &#39;Win1894&#39;: &#39;WIN94&#39;, &#39;Thompson&#39;: &#39;토미 건&#39;,
      &#39;BizonPP19&#39;: &#39;PP-19 비존&#39;, &#39;UZI&#39;: &#39;마이크로 UZI&#39;, &#39;MP5K&#39;: &#39;MP5K&#39;, &#39;MP9&#39;: &#39;MP9&#39;,
      &#39;P90&#39;: &#39;P90&#39;, &#39;UMP&#39;: &#39;UMP45&#39;, &#39;Vector&#39;: &#39;벡터&#39;, &#39;DP28&#39;: &#39;DP-28&#39;, &#39;M249&#39;: &#39;M249&#39;,
      &#39;MG3&#39;: &#39;MG3&#39;, &#39;Saiga12&#39;: &#39;S12K&#39;, &#39;DP12&#39;: &#39;DBS&#39;, &#39;OriginS12&#39;: &#39;O12&#39;,
      &#39;Winchester&#39;: &#39;S1897&#39;, &#39;Berreta686&#39;: &#39;S686&#39;, &#39;Sawnoff&#39;: &#39;소드 오프&#39;,
      &#39;DesertEagle&#39;: &#39;Deagle&#39;, &#39;G18&#39;: &#39;P18C&#39;, &#39;M1911&#39;: &#39;P1911&#39;, &#39;M9&#39;: &#39;P92&#39;,
      &#39;NagantM1895&#39;: &#39;R1895&#39;, &#39;Rhino&#39;: &#39;R45&#39;, &#39;vz61Skorpion&#39;: &#39;스콜피온&#39;,
      &#39;Crossbow_1&#39;: &#39;석궁&#39;, &#39;HK416&#39;: &#39;M416&#39;, &#39;G36C&#39;: &#39;G36C&#39;, &#39;ACE32&#39;: &#39;ACE32&#39;,
      &#39;AK47&#39;: &#39;AKM&#39;, &#39;AUG&#39;: &#39;AUG&#39;, &#39;Mk47Mutant&#39;: &#39;MK47 뮤턴트&#39;, &#39;FamasG2&#39;: &#39;FAMAS&#39;,
      &#39;G36C&#39;: &#39;G36C&#39;, &#39;K2&#39;: &#39;K2&#39;, &#39;M16A4&#39;: &#39;M16A4&#39;, &#39;BerylM762&#39;: &#39;베릴 M762&#39;,
      &#39;QBZ95&#39;: &#39;QBZ&#39;, &#39;SCAR-L&#39;: &#39;SCAR-L&#39;, &#39;Groza&#39;: &#39;그로자&#39;
  }

  # 무기 이름 변환
  kv3[&#39;victim_weapon&#39;] = kv3[&#39;victim_weapon&#39;].map(weapon_name_mapping)
  kv3[&#39;killer_weapon&#39;] = kv3[&#39;killer_weapon&#39;].map(weapon_name_mapping)
  kv3[&#39;finisher_weapon&#39;] = kv3[&#39;finisher_weapon&#39;].map(weapon_name_mapping)

  # 무기 분류를 위한 조건과 값 설정
  conditions = [
      kv3[&#39;killer_weapon&#39;].isin([&#39;SKS&#39;, &#39;SLR&#39;, &#39;미니14&#39;, &#39;MK12&#39;, &#39;MK14&#39;, &#39;QBU&#39;, &#39;VSS&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;AWM&#39;, &#39;KAR98K&#39;, &#39;링스 AMR&#39;, &#39;M24&#39;, &#39;모신 나강&#39;, &#39;WIN94&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;토미 건&#39;, &#39;PP-19 비존&#39;, &#39;마이크로 UZI&#39;, &#39;MP5K&#39;, &#39;MP9&#39;, &#39;P90&#39;, &#39;UMP45&#39;, &#39;벡터&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;DP-28&#39;, &#39;M249&#39;, &#39;MG3&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;S12K&#39;, &#39;DBS&#39;, &#39;O12&#39;, &#39;S1897&#39;, &#39;S686&#39;, &#39;소드 오프&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;Deagle&#39;, &#39;P18C&#39;, &#39;P1911&#39;, &#39;P92&#39;, &#39;R1895&#39;, &#39;R45&#39;, &#39;스콜피온&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;석궁&#39;]),
      kv3[&#39;killer_weapon&#39;].isin([&#39;M416&#39;, &#39;G36C&#39;, &#39;ACE32&#39;, &#39;AKM&#39;, &#39;AUG&#39;, &#39;FAMAS&#39;, &#39;그로자&#39;, &#39;K2&#39;, &#39;M16A4&#39;, &#39;베릴 M762&#39;, &#39;MK47 뮤턴트&#39;, &#39;QBZ&#39;, &#39;SCAR-L&#39;])
  ]

  values = [&#39;DMR&#39;, &#39;SR&#39;, &#39;SMG&#39;, &#39;LMG&#39;, &#39;SG&#39;, &#39;PISTOL&#39;, &#39;MISC&#39;, &#39;AR&#39;]

  kv3[&#39;weapon_classification&#39;] = np.select(conditions, values)
  kv3.reset_index(drop=True, inplace=True)</code></pre>
</li>
<li><p>전처리 후 모습</p>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/728396e7-1748-458f-b75b-b8632a50123d/image.png">

</li>
</ul>
<blockquote>
<p><strong>dag코드</strong> 
read_data_task : Raw 데이터 불러오기
process_logs_weapon_task : 필요한 무기 데이터 파싱 및 전처리
upload_data_task : 전처리 한 결과를 저장
dag가 잘도는 지 확인하기 위해 slack을 통해 알림을 받았다. <a href="https://velog.io/@jaekyu_lim/airflow-%EC%98%A4%EB%A5%98%EB%A9%94%EC%8B%9C%EC%A7%80-%EB%B3%B4%EB%82%B4%EA%B8%B0-slack">슬랙알림보내기</a> &lt;&lt; 링크 확인</p>
</blockquote>
<pre><code class="language-python">from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os
from google.cloud import storage
import re
import numpy as np
import pandas as pd
import json
from io import BytesIO
import io
import pyarrow
# slack_notifications.py
from slack_notifications import SlackAlert
from airflow.models import Variable

KEY_PATH = &quot;./playdata-2-1e60a2f219de.json&quot;
os.environ[&quot;GOOGLE_APPLICATION_CREDENTIALS&quot;] = KEY_PATH

slack_api_token = Variable.get(&quot;slack_api_token&quot;)
alert = SlackAlert(&#39;#message&#39;, slack_api_token) # 메세지를 보낼 슬랙 채널명을 파라미터로 넣어줍니다.


dag = DAG(
    dag_id=&quot;parsing_weapon_data&quot;,
    description=&quot;무기_분석&quot;,
    start_date=datetime(2023, 7, 2, 0, 0),
    schedule_interval=&#39;0 14 * * *&#39;,
    on_success_callback=alert.success_msg,
    on_failure_callback=alert.fail_msg
)

def _read_data_from_gcp_storage():
    bucket_name = &quot;playdata2&quot;
    file_path = &quot;&quot;
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    parquet_data = []
    blobs = bucket.list_blobs(prefix=file_path)

    for blob in blobs:

        # parquet 형식의 파일인지 확인
        if blob.name.endswith(&quot;.parquet&quot;):
            # 객체를 바이트 스트림으로 다운로드
            byte_stream = io.BytesIO(blob.download_as_bytes())

            # parquet 데이터를 pandas DataFrame으로 읽기
            df = pd.read_parquet(byte_stream)
            parquet_data.append(df)


    # 개별 DataFrame들을 하나의 DataFrame으로 합치기
    concat_data = pd.concat(parquet_data, axis=0, ignore_index=True)
    return concat_data



def _process_logs_weapon(**kwargs):
    ti = kwargs[&#39;ti&#39;]
    concat_data = ti.xcom_pull(task_ids=&#39;read_data&#39;)

    tmp = [data for data2 in concat_data[&#39;logs&#39;] for data in data2 if &#39;_T&#39; in data and data[&#39;_T&#39;] == &#39;LogPlayerKillV2&#39;]

    kv2 = []

    for log in tmp:

        # 무기 관련 데이터 파싱
        if log[&#39;_T&#39;] == &#39;LogPlayerKillV2&#39;:
            try :
                if &#39;ai&#39; not in log[&#39;victim&#39;][&#39;accountId&#39;] and &#39;ai&#39; not in log[&#39;killer&#39;][&#39;accountId&#39;] and &#39;ai&#39; not in log[&#39;finisher&#39;][&#39;accountId&#39;] :
                    try :
                        v2row = {&#39;victim_weapon&#39; : None if len(log[&#39;victimWeapon&#39;]) == 0 else log[&#39;victimWeapon&#39;],
                                &#39;victim_account_id&#39; :log[&#39;victim&#39;][&#39;accountId&#39;],
                                &#39;victim_parts&#39; : None if len(log[&quot;victimWeaponAdditionalInfo&quot;]) == 0 else log[&quot;victimWeaponAdditionalInfo&quot;],
                                &#39;killer_weapon&#39; : log[&#39;killerDamageInfo&#39;][&#39;damageCauserName&#39;],
                                &#39;killer_account_id&#39; : log[&#39;killer&#39;][&#39;accountId&#39;],
                                &#39;killer_parts&#39; : None if len(log[&#39;killerDamageInfo&#39;][&#39;additionalInfo&#39;]) == 0 else log[&#39;killerDamageInfo&#39;][&#39;additionalInfo&#39;],
                                &#39;killer_distance&#39; :log[&#39;killerDamageInfo&#39;][&#39;distance&#39;],
                                &#39;finisher_weapon&#39; : log[&#39;finishDamageInfo&#39;][&#39;damageCauserName&#39;],
                                &#39;finisher_account_id&#39; : log[&#39;finisher&#39;][&#39;accountId&#39;],
                                &#39;finisher_parts&#39; : None if len(log[&#39;finishDamageInfo&#39;][&#39;additionalInfo&#39;]) == 0 else log[&#39;finishDamageInfo&#39;][&#39;additionalInfo&#39;],
                                &#39;finisher_distance&#39; : log[&#39;finishDamageInfo&#39;][&#39;distance&#39;],
                                }
                    except :
                        v2row = {&#39;victim_weapon&#39; : None,
                                &#39;victim_account_id&#39; : None,
                                &#39;victim_parts&#39; : None,
                                &#39;killer_weapon&#39; : None,
                                &#39;killer_account_id&#39; : None,
                                &#39;killer_parts&#39; : None,
                                &#39;killer_distance&#39; : None,
                                &#39;finisher_weapon&#39; : None,
                                &#39;finisher_account_id&#39; : None,
                                &#39;finisher_parts&#39; : None,
                                &#39;finisher_distance&#39; : None,
                                }
                kv2.append(v2row)
            except :
                pass

    kv3 = pd.DataFrame(kv2)



    def remove_part(text):
        return re.sub(r&#39;(?:^Weapon|^Weap)|_C.*$&#39;, &#39;&#39;, text)

    kv3[&#39;victim_weapon&#39;] = kv3[&#39;victim_weapon&#39;].astype(str)
    kv3[&#39;victim_weapon&#39;] = kv3[&#39;victim_weapon&#39;].apply(remove_part)
    kv3[&#39;killer_weapon&#39;] = kv3[&#39;killer_weapon&#39;].astype(str)
    kv3[&#39;killer_weapon&#39;] = kv3[&#39;killer_weapon&#39;].apply(remove_part)
    kv3[&#39;finisher_weapon&#39;] = kv3[&#39;finisher_weapon&#39;].astype(str)
    kv3[&#39;finisher_weapon&#39;] = kv3[&#39;finisher_weapon&#39;].apply(remove_part)

    weapon_name_mapping = {
        &#39;SKS&#39;: &#39;SKS&#39;, &#39;FNFal&#39;: &#39;SLR&#39;, &#39;Mini14&#39;: &#39;미니14&#39;, &#39;Mk12&#39;: &#39;MK12&#39;, &#39;Mk14&#39;: &#39;MK14&#39;,
        &#39;QBU88&#39;: &#39;QBU&#39;, &#39;VSS&#39;: &#39;VSS&#39;, &#39;AWM&#39;: &#39;AWM&#39;, &#39;Kar98k&#39;: &#39;KAR98K&#39;, &#39;L6&#39;: &#39;링스 AMR&#39;,
        &#39;M24&#39;: &#39;M24&#39;, &#39;Mosin&#39;: &#39;모신 나강&#39;, &#39;Win1894&#39;: &#39;WIN94&#39;, &#39;Thompson&#39;: &#39;토미 건&#39;,
        &#39;BizonPP19&#39;: &#39;PP-19 비존&#39;, &#39;UZI&#39;: &#39;마이크로 UZI&#39;, &#39;MP5K&#39;: &#39;MP5K&#39;, &#39;MP9&#39;: &#39;MP9&#39;,
        &#39;P90&#39;: &#39;P90&#39;, &#39;UMP&#39;: &#39;UMP45&#39;, &#39;Vector&#39;: &#39;벡터&#39;, &#39;DP28&#39;: &#39;DP-28&#39;, &#39;M249&#39;: &#39;M249&#39;,
        &#39;MG3&#39;: &#39;MG3&#39;, &#39;Saiga12&#39;: &#39;S12K&#39;, &#39;DP12&#39;: &#39;DBS&#39;, &#39;OriginS12&#39;: &#39;O12&#39;,
        &#39;Winchester&#39;: &#39;S1897&#39;, &#39;Berreta686&#39;: &#39;S686&#39;, &#39;Sawnoff&#39;: &#39;소드 오프&#39;,
        &#39;DesertEagle&#39;: &#39;Deagle&#39;, &#39;G18&#39;: &#39;P18C&#39;, &#39;M1911&#39;: &#39;P1911&#39;, &#39;M9&#39;: &#39;P92&#39;,
        &#39;NagantM1895&#39;: &#39;R1895&#39;, &#39;Rhino&#39;: &#39;R45&#39;, &#39;vz61Skorpion&#39;: &#39;스콜피온&#39;,
        &#39;Crossbow_1&#39;: &#39;석궁&#39;, &#39;HK416&#39;: &#39;M416&#39;, &#39;G36C&#39;: &#39;G36C&#39;, &#39;ACE32&#39;: &#39;ACE32&#39;,
        &#39;AK47&#39;: &#39;AKM&#39;, &#39;AUG&#39;: &#39;AUG&#39;, &#39;Mk47Mutant&#39;: &#39;MK47 뮤턴트&#39;, &#39;FamasG2&#39;: &#39;FAMAS&#39;,
        &#39;G36C&#39;: &#39;G36C&#39;, &#39;K2&#39;: &#39;K2&#39;, &#39;M16A4&#39;: &#39;M16A4&#39;, &#39;BerylM762&#39;: &#39;베릴 M762&#39;,
        &#39;QBZ95&#39;: &#39;QBZ&#39;, &#39;SCAR-L&#39;: &#39;SCAR-L&#39;, &#39;Groza&#39;: &#39;그로자&#39;
    }

    # 무기 이름 변환
    kv3[&#39;victim_weapon&#39;] = kv3[&#39;victim_weapon&#39;].map(weapon_name_mapping)
    kv3[&#39;killer_weapon&#39;] = kv3[&#39;killer_weapon&#39;].map(weapon_name_mapping)
    kv3[&#39;finisher_weapon&#39;] = kv3[&#39;finisher_weapon&#39;].map(weapon_name_mapping)

    # 무기 분류를 위한 조건과 값 설정
    conditions = [
        kv3[&#39;killer_weapon&#39;].isin([&#39;SKS&#39;, &#39;SLR&#39;, &#39;미니14&#39;, &#39;MK12&#39;, &#39;MK14&#39;, &#39;QBU&#39;, &#39;VSS&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;AWM&#39;, &#39;KAR98K&#39;, &#39;링스 AMR&#39;, &#39;M24&#39;, &#39;모신 나강&#39;, &#39;WIN94&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;토미 건&#39;, &#39;PP-19 비존&#39;, &#39;마이크로 UZI&#39;, &#39;MP5K&#39;, &#39;MP9&#39;, &#39;P90&#39;, &#39;UMP45&#39;, &#39;벡터&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;DP-28&#39;, &#39;M249&#39;, &#39;MG3&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;S12K&#39;, &#39;DBS&#39;, &#39;O12&#39;, &#39;S1897&#39;, &#39;S686&#39;, &#39;소드 오프&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;Deagle&#39;, &#39;P18C&#39;, &#39;P1911&#39;, &#39;P92&#39;, &#39;R1895&#39;, &#39;R45&#39;, &#39;스콜피온&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;석궁&#39;]),
        kv3[&#39;killer_weapon&#39;].isin([&#39;M416&#39;, &#39;G36C&#39;, &#39;ACE32&#39;, &#39;AKM&#39;, &#39;AUG&#39;, &#39;FAMAS&#39;, &#39;그로자&#39;, &#39;K2&#39;, &#39;M16A4&#39;, &#39;베릴 M762&#39;, &#39;MK47 뮤턴트&#39;, &#39;QBZ&#39;, &#39;SCAR-L&#39;])
    ]

    values = [&#39;DMR&#39;, &#39;SR&#39;, &#39;SMG&#39;, &#39;LMG&#39;, &#39;SG&#39;, &#39;PISTOL&#39;, &#39;MISC&#39;, &#39;AR&#39;]

    kv3[&#39;weapon_classification&#39;] = np.select(conditions, values)
    kv3.reset_index(drop=True, inplace=True)

    return kv3


def _upload_data_to_gcp_storage(**kwargs):
    ti = kwargs[&#39;ti&#39;]
    df = ti.xcom_pull(task_ids=&#39;process_logs_weapon&#39;)
    storage_client = storage.Client()

    bucket_name = &quot;playdata2&quot;
    folder_name = &quot;logs_weapon&quot;

    date = datetime.today().strftime(&quot;%Y%m%d&quot;)

    buffer = BytesIO()
    df.to_parquet(buffer, engine=&#39;pyarrow&#39;, index=False)
    buffer.seek(0)

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(f&quot;{folder_name}/{date}_logs_weapon.parquet&quot;)
    blob.upload_from_file(buffer)

    buffer.close()

read_data_task = PythonOperator(
    task_id=&#39;read_data&#39;,
    python_callable=_read_data_from_gcp_storage,
    provide_context=True,
    dag=dag,
)

process_logs_weapon_task = PythonOperator(
    task_id=&#39;process_logs_weapon&#39;,
    python_callable=_process_logs_weapon,
    provide_context=True,
    dag=dag,
)

upload_data_task = PythonOperator(
    task_id=&#39;upload_data&#39;,
    python_callable=_upload_data_to_gcp_storage,
    provide_context=True,
    dag=dag,
)

read_data_task &gt;&gt; process_logs_weapon_task &gt;&gt; upload_data_task</code></pre>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/49e77672-e9cc-4564-bcdd-bcb2da56737a/image.png">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/8cde89ca-6506-4493-a38b-b74478611d79/image.png">



]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트] 1. 데이터 가져오기]]></title>
            <link>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EA%B0%80%EC%A0%B8%EC%98%A4%EA%B8%B0</link>
            <guid>https://velog.io/@jaekyu_lim/7%EC%9B%94%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EA%B0%80%EC%A0%B8%EC%98%A4%EA%B8%B0</guid>
            <pubDate>Sun, 27 Aug 2023 14:58:25 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p>7월 프로젝트로 진행하였던 배틀그라운드 API를 활용한 사이트(pd.gg)에 웹 기능 구상도
내가 주로 다루었던 페이지는 빨간색으로 표시한 <strong>무기 분석 페이지</strong>다.</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/0d2b5bf6-11c3-4f4d-b872-7dd733e810b2/image.png" alt=""></p>
<ol>
<li>필요한 데이터 파싱하기</li>
</ol>
<p><a href="https://developer.pubg.com/">배틀그라운드 개발자 센터</a> &lt;&lt; 개인당 2개의 API Key를 받을 수 있음 (1분당 10회에 요청 제한이 있음)</p>
<h3 id="무기-정보-조회-데이터">무기 정보 조회 데이터</h3>
<blockquote>
<p><strong>유저 정보 페이지</strong>에서 본인의 <strong>무기 숙련도 Top3</strong>을 보여줄 때 사용</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/99d3097b-6b88-4a74-a145-5b6e6cb3f2f7/image.png" alt=""></p>
<h3 id="유저-간-전투-데이터">유저 간 전투 데이터</h3>
<blockquote>
<p><strong>무기 분석 페이지</strong>와 <strong>무기 상세 페이지</strong>에서 무기 티어와 상성 간 승률을 나타내기 위해 사용</p>
</blockquote>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/8cad473e-4759-4536-b9aa-7b2171aabcac/image.png">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/7e833dce-7457-4f89-9324-200e9cdd8e06/image.png">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/e2af5a83-9695-47f2-abaf-d23b312b3bfc/image.png">


<h3 id="erd">ERD</h3>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/7023a790-dc2b-45e9-81bc-40ef1c548a54/image.png">

<h2 id="무기-마스터리-데이터-가져와서-숙련도-top3-보여주기">무기 마스터리 데이터 가져와서 숙련도 TOP3 보여주기</h2>
<pre><code class="language-python">import requests

# 개인 정보 변수
api_key = &#39;YOUR_PUBG_API_KEY&#39;  # 여기에 본인의 PUBG API 키를 입력하세요
player_id = &#39;YOUR_PLAYER_ID&#39;   # 여기에 플레이어 ID를 입력하세요
platform = &#39;kakao&#39;             # 플랫폼 정보 (예: kakao, steam 등)

def get_top3_weapon_mastery(api_key, platform, player_id):

    url = f&quot;https://api.pubg.com/shards/{platform}/players/{player_id}/weapon_mastery&quot;
    headers = {
        &quot;Authorization&quot;: API_KEY,
        &quot;Accept&quot;: &quot;application/vnd.api+json&quot;
    }
    response = requests.get(url, headers=headers)

    # 응답 처리
    if response.status_code == 200:
        data = response.json()
        weapon_mastery = data[&#39;data&#39;][&#39;attributes&#39;][&#39;weaponSummaries&#39;]

        # 경험치를 기준으로 무기 정렬
        sorted_weapons = sorted(weapon_mastery.items(), key=lambda x: x[1][&#39;XPTotal&#39;], reverse=True)

        # 상위 3개 무기 출력
        top3_weapons = sorted_weapons[:3]

        for idx, weapon in enumerate(top3_weapons, start=1):
            weapon_name = weapon[0].split(&#39;_&#39;)[-2] # Split the string to get the weapon name
            experience = weapon[1][&#39;XPTotal&#39;]
            print(f&quot;Top {idx}: {weapon_name} - 경험치: {experience}&quot;)
    else:
        print(&quot;API 요청이 실패했습니다. 상태 코드:&quot;, response.status_code)

player_id = player_id  # 플레이어 ID
api_key = api_key

get_top3_weapon_mastery(api_key, platform, player_id)</code></pre>
<ul>
<li>결과값<img src="https://velog.velcdn.com/images/jaekyu_lim/post/23707d62-2472-4f28-a586-38b389d41b1b/image.png">


</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[[8월 미니프로젝트] Spark 사용해보기]]></title>
            <link>https://velog.io/@jaekyu_lim/8%EC%9B%94-%EB%AF%B8%EB%8B%88%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0</link>
            <guid>https://velog.io/@jaekyu_lim/8%EC%9B%94-%EB%AF%B8%EB%8B%88%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0</guid>
            <pubDate>Thu, 24 Aug 2023 14:31:06 GMT</pubDate>
            <description><![CDATA[<h2 id="쇼핑몰-로그-데이터-분석해보기">쇼핑몰 로그 데이터 분석해보기</h2>
<blockquote>
<p>8대로 구성된 하둡, 스파크 클러스터를 활용하여 데이터 분석</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/47377fb4-8e1e-4313-9a6c-df8e82ec7e37/image.png" alt=""></p>
<h1 id="흐름도-예상안">흐름도 (예상안)</h1>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/594387cc-ca1f-490f-8b2b-d8fcac89a5a1/image.png" alt=""></p>
<ul>
<li><a href="https://www.kaggle.com/datasets/mkechinov/ecommerce-behavior-data-from-multi-category-store?datasetId=411512">데이터셋</a> -  (eCommerce behavior data from multi category store)</li>
</ul>
<h3 id="데이터셋-구조">데이터셋 구조</h3>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d1f4dc8f-1d73-46de-88e7-68b96d4bfb14/image.png" alt=""></p>
<h2 id="데이터-로드">데이터 로드</h2>
<p>캐글 사이트에 접속 후 데이터를 다운로드 받았다. (위의 데이터셋 링크 참조)</p>
<p>데이터의 압축 해제 후 <code>hdfs dfs -put ./2019-Nov.csv /08</code></p>
<p>HDFS 08폴더에 저장하였다.</p>
<h2 id="데이터-전처리">데이터 전처리</h2>
<p>데이터 불러오기
<code>df = spark.read.option(&quot;header&quot;, &quot;true&quot;).csv(&quot;hdfs:/08/2019-Nov2.csv&quot;)</code></p>
<p>스파크 세션 생성
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/5a26596e-74aa-4e60-bfc3-213f19dc78e1/image.png" alt=""></p>
<ul>
<li><strong><code>Raw Data</code>.ver</strong></li>
</ul>
<table>
<thead>
<tr>
<th>컬럼명</th>
<th>컬럼 설명</th>
<th>예시</th>
</tr>
</thead>
<tbody><tr>
<td>event_time</td>
<td>이벤트 발생 시간</td>
<td>2019-11-01 00:00:00 UTC</td>
</tr>
<tr>
<td>event_type</td>
<td>이벤트 유형</td>
<td>view / cart / purchase</td>
</tr>
<tr>
<td>product_id</td>
<td>상품 id</td>
<td>9800515</td>
</tr>
<tr>
<td>category_id</td>
<td>카테고리 id</td>
<td>2053013558920217191</td>
</tr>
<tr>
<td>category_code</td>
<td>카테고리 분류</td>
<td>appliances.kitchen.oven</td>
</tr>
<tr>
<td>brand</td>
<td>브랜드명</td>
<td>samsung</td>
</tr>
<tr>
<td>price</td>
<td>(상품)가격</td>
<td>489.07</td>
</tr>
<tr>
<td>user_id</td>
<td>유저 id</td>
<td>520088904</td>
</tr>
<tr>
<td>user_session</td>
<td>유저 세션</td>
<td>4d3b30da-a5e4-49df-b1a8-ba5943f1dd33</td>
</tr>
</tbody></table>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/788cee79-0cab-423c-9a93-745446b8536f/image.png" alt=""></p>
<ul>
<li><strong><code>Modified</code>.ver</strong> (전처리 후 버전 —&gt; 카테고리 대/중/소 분할 등 처리)<blockquote>
<p>&quot;.&quot;을 기준으로 칼럼 나누기
split_col = split(df[&quot;category_code&quot;], &quot;\\.&quot;)
df = df.withColumn(&quot;major&quot;, split_col.getItem(0))
df = df.withColumn(&quot;intermediate&quot;, split_col.getItem(1))
df = df.withColumn(&quot;minor&quot;, split_col.getItem(2))</p>
</blockquote>
</li>
</ul>
<table>
<thead>
<tr>
<th>컬럼명</th>
<th>컬럼 설명</th>
<th>예시</th>
</tr>
</thead>
<tbody><tr>
<td>event_time</td>
<td>이벤트 발생 시간</td>
<td>2019-11-01 00:00:00 UTC</td>
</tr>
<tr>
<td>event_type</td>
<td>이벤트 유형</td>
<td>view / cart / purchase</td>
</tr>
<tr>
<td>product_id</td>
<td>상품 id</td>
<td>9800515</td>
</tr>
<tr>
<td>category_id</td>
<td>카테고리 id</td>
<td>2053013558920217191</td>
</tr>
<tr>
<td>category_code</td>
<td>카테고리 분류</td>
<td>appliances.kitchen.oven</td>
</tr>
<tr>
<td>brand</td>
<td>브랜드명</td>
<td>samsung</td>
</tr>
<tr>
<td>price</td>
<td>(상품)가격</td>
<td>489.07</td>
</tr>
<tr>
<td>user_id</td>
<td>유저 id</td>
<td>520088904</td>
</tr>
<tr>
<td>user_session</td>
<td>유저 세션</td>
<td>4d3b30da-a5e4-49df-b1a8-ba5943f1dd33</td>
</tr>
<tr>
<td>major</td>
<td>대분류</td>
<td>appliances</td>
</tr>
<tr>
<td>intermediate</td>
<td>중분류</td>
<td>kitchen</td>
</tr>
<tr>
<td>minor</td>
<td>소분류</td>
<td>oven</td>
</tr>
</tbody></table>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/934dbff3-d675-4a86-8c69-83d62cf4ac6a/image.png" alt=""></p>
<h2 id="데이터-전처리-후-데이터-마트-생성">데이터 전처리 후 데이터 마트 생성</h2>
<p>데이터에 category_code가 null값인 것을 제외하고 카테고리를 .을 기준으로 나눠 대, 중, 소분류 칼럼을 만들어 google bigquery에 적재하였다.</p>
<pre><code class="language-sql">from pyspark.sql.functions import col
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas as pd
import pyarrow

# category_code가 널이 아닌 레코드 필터링
filtered_df = df.filter(col(&quot;category_code&quot;).isNotNull())

# GCP 서비스 계정 키 파일 경로
key_path = &quot;키파일경로&quot; json 파일

# 프로젝트 및 데이터셋 ID
project_id = &quot;프로젝트 ID&quot;
dataset_id = &quot;프로젝트 ID.데이터셋 ID&quot;

# BigQuery 클라이언트 설정
client = bigquery.Client.from_service_account_json(key_path)

# 데이터 프레임을 팬더스 데이터 프레임으로 변환(데이터 시리얼라이즈)
# pandas_df = df.select(&quot;*&quot;).toPandas()
pandas_df = df.limit(100000).toPandas()
# BigQuery로 데이터 프레임 전송
table_id = &quot;프로젝트 ID.데이터셋 ID.테이블&quot;

job_config = bigquery.LoadJobConfig(
    # 스키마 자동 감지
    schema=[],
    autodetect=True,
    # 데이터 쓰기 방식 선택
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,  # 이전 테이블 데이터 대체
)

# BigQuery에 데이터 프레임 불러오기
job = client.load_table_from_dataframe(
    pandas_df, table_id, job_config=job_config
)

# 작업 완료 시간 기록
job.result()</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/87282442-f2d8-4175-ae89-6147f21ad40e/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/fe34d3b6-ad5d-48cf-a3b5-2f92b7498834/image.png" alt=""></p>
<h2 id="데이터-분석">데이터 분석</h2>
<ol>
<li>카테고리 별 판매 상품 개수 집계 (TOP10)</li>
</ol>
<blockquote>
<p>스파크를 사용한 것과 사용하지 않은 것에 차이가 59초 차이
약 5.54배 정도에 성능을 얻을 수 있었다.</p>
</blockquote>
<ul>
<li><p>Spark 사용
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/7c7afa61-9c0d-44ae-b4ad-877d15bb1e0b/image.png" alt="">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/9bc1ff2e-6bc3-4643-bfe2-2e186d237502/image.png" alt=""></p>
</li>
<li><p>로컬
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/fd0f1eb1-559e-4e2b-97f6-7ad18195218b/image.png" alt=""></p>
</li>
</ul>
<blockquote>
<p>추후 작업
<strong><code>매출, 전환율 현황 분석 리포트</code></strong>를 
<strong>카테고리 별로 나누어</strong> 보기 좋게 제작하고자 한다!</p>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트] 하둡 클러스터 구축하기 (3) - Spark ]]></title>
            <link>https://velog.io/@jaekyu_lim/%EC%B5%9C%EC%A2%85%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-3-Spark</link>
            <guid>https://velog.io/@jaekyu_lim/%EC%B5%9C%EC%A2%85%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-3-Spark</guid>
            <pubDate>Thu, 24 Aug 2023 11:13:32 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p>이전 포스트에서 다뤘던  spark 세션을 띄우고자 했지만, 실패한 것에 해결에 대한 기록</p>
</blockquote>
<p><code>pyspark --master yarn --num-executors 5</code> 명령어를 이용하여 YARN으로 PySpark를 실행하였지만, 이건 <code>pyspark kernel</code>이 아니라 <code>Python 3</code> 이었다.</p>
<p>단일 노드에서 spark를 사용하는 로컬 세션을 생성했던 것이었다.
어쩐지 경로 보려고 tab키를 눌렀는데, 로컬 경로가 나왔었다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/10d12a23-6155-42dc-be1d-23f2b0df395c/image.png" alt=""></p>
<hr>
<h2 id="해결">해결</h2>
<blockquote>
<p>크게는 2가지 <strong>pyspark 세션 생성. python 버전을 낮추는 것이었다.</strong></p>
</blockquote>
<h2 id="해결-과정">해결 과정</h2>
<ol>
<li>Pyspark 커널 만들기</li>
</ol>
<blockquote>
<p>pyspark를 CLI환경이 아니라 Jupyter를 사용해서 작업 환경을 만들고 싶었기 때문에
 PySpark 커널의 명령 줄 인수, 사용할 프로그래밍 언어, 추가 메타데이터, 환경 변수 등을 정의하였다.</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/fe773ac2-5627-47f0-b311-1193c0ff2da4/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/edefd3c0-c83f-46be-8960-146d411da5be/image.png" alt=""></p>
<ol start="2">
<li>shell.py 만들기</li>
</ol>
<blockquote>
<p>Pyspark 실행 환경을 설정하고 초기화 하는 코드를 가지고 있는 py파일</p>
</blockquote>
<pre><code class="language-python">import atexit

import os

import platform

import warnings

​

from pyspark.context import SparkContext

from pyspark.sql import SparkSession

​

if os.environ.get(&quot;SPARK_EXECUTOR_URI&quot;):

    SparkContext.setSystemProperty(&quot;spark.executor.uri&quot;, os.environ[&quot;SPARK_EXECUTOR_URI&quot;])

​

SparkContext._ensure_initialized()  # type: ignore

​

try:

    spark = SparkSession._create_shell_session()  # type: ignore

except Exception:

    import sys

    import traceback

    warnings.warn(&quot;Failed to initialize Spark session.&quot;)

    traceback.print_exc(file=sys.stderr)

    sys.exit(1)

​

sc = spark.sparkContext

sql = spark.sql

atexit.register((lambda sc: lambda: sc.stop())(sc))

​

# for compatibility

sqlContext = spark._wrapped

sqlCtx = sqlContext

​

print(r&quot;&quot;&quot;Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  &#39;_/

   /__ / .__/\_,_/_/ /_/\_\   version %s

      /_/

&quot;&quot;&quot; % sc.version)

print(&quot;Using Python version %s (%s, %s)&quot; % (

    platform.python_version(),

    platform.python_build()[0],

    platform.python_build()[1]))

print(&quot;Spark context Web UI available at %s&quot; % (sc.uiWebUrl))

print(&quot;Spark context available as &#39;sc&#39; (master = %s, app id = %s).&quot; % (sc.master, sc.applicationId))

print(&quot;SparkSession available as &#39;spark&#39;.&quot;)

​

# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,

# which allows us to execute the user&#39;s PYTHONSTARTUP file:

_pythonstartup = os.environ.get(&#39;OLD_PYTHONSTARTUP&#39;)

if _pythonstartup and os.path.isfile(_pythonstartup):

    with open(_pythonstartup) as f:
               code = compile(f.read(),_pythonstartup, &#39;exec&#39;)
               exec(code)
</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/37d5324b-450a-498a-b2e3-683f0c020512/image.png" alt=""></p>
<ol start="3">
<li>Python 버전 낮추기</li>
</ol>
<blockquote>
<p>💡 Tip
venv는 설치된 파이썬 버전으로만 가상환경을 생성할 수 있습니다.
예를 들어, 여러분이 시스템에 Python 3.9 버전을 설치했는데
3.8 버전으로 프로젝트를 진행하고자 한다면 venv로는 이 문제를 해결할 수 없습니다.
파이썬 3.8 버전도 여러분의 PC에 설치를 해야합니다.
이와 달리 아나콘다의 conda는 아나콘다 배포판의 파이썬이 3.9 버전이라고 하더라도 
3.8 버전으로 가상환경을 만들 수 있습니다.</p>
</blockquote>
<blockquote>
<p>python 3.10 버전을 사용중이었는데, Spark와 호환이 안된다는 에러 문구가 나와 가상환경을 이용하여 3.8버전으로 낮춰주었다.</p>
</blockquote>
<pre><code class="language-bash">conda create -n py38 python=3.8 # py38 이름을 가진 가상환경을 만들어 python 3.8버전 설치

conda env list 가상환경 생성되었나 확인

conda activate py38 py38 가상환경 접속

</code></pre>
<p>.bashrc 수정</p>
<pre><code>export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS=&#39;notebook --ip=0.0.0.0&#39;
export PKG_CONFIG_PATH=&quot;/path/to:$PKG_CONFIG_PATH&quot;</code></pre><hr>
<p>이후에는 py38 가상환경으로 접속하여 <code>notebook --ip=0.0.0.0</code> 명령어를 통해
스파크를 실행하는데 성공하였다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/47131b6c-39ae-45cb-98d4-9e10efb041d4/image.png" alt=""></p>
<p><strong>해결한 후 화면</strong>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a8d07f7a-8f46-47ce-afeb-fffd9acd1b0f/image.png" alt=""></p>
<hr>
<p>배틀그라운드 데이터 하둡에 저장하기</p>
<p>팀원들이 개인 pc나 kict 인프라를 통해 적재해온 배틀그라운드 로그 데이터를
client pc에 모두 옮겨담았다.
<code>hdfs dfs -put ./hdfs/b/parquets/killv2/* /killv2</code> 명령어를 사용하여 client 로컬에 있던 파일들을 모두 하둡에 저장하였다.</p>
<ul>
<li>저장
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/f7e7cf1e-b809-4863-9ba7-33e312b1cfe5/image.png" alt=""></li>
</ul>
<p><code>(base) hadoop@client:~/hdfs/b/parquets/killv2$ ls -l | grep &quot;^-&quot; | wc -l</code></p>
<p>193595개의 파케이 파일을 업로드하여 데이터 준비를 마쳤다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트] airflow 알림 메시지 보내기 - slack]]></title>
            <link>https://velog.io/@jaekyu_lim/airflow-%EC%98%A4%EB%A5%98%EB%A9%94%EC%8B%9C%EC%A7%80-%EB%B3%B4%EB%82%B4%EA%B8%B0-slack</link>
            <guid>https://velog.io/@jaekyu_lim/airflow-%EC%98%A4%EB%A5%98%EB%A9%94%EC%8B%9C%EC%A7%80-%EB%B3%B4%EB%82%B4%EA%B8%B0-slack</guid>
            <pubDate>Mon, 07 Aug 2023 03:01:04 GMT</pubDate>
            <description><![CDATA[<h2 id="slack_sdk를-활용하여-airflow-알리미-만들기">slack_sdk를 활용하여 airflow 알리미 만들기</h2>
<blockquote>
<p><strong>목표</strong> : 프로젝트 중 무기 데이터의 집계를 하기 위한 데이터를 주기적으로 받아 GCP 스토리지에 저장해야 하는 작업이 필요했다. Raw Data를 받아와 우리가 사용할 데이터로 전처리하고 파싱하는 dag를 짰고, 이것이 잘 수행 중인지에 대해 slack bot을 통해 알림을 받고자 한다.</p>
</blockquote>
<h2 id="봇-만들기">봇 만들기</h2>
<ol>
<li><a href="https://api.slack.com/">https://api.slack.com/</a> 사이트에서 로그인 후 Your apps</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/015d98e8-5075-4a5d-998c-1ee95ad817cf/image.png" alt=""></p>
<ol start="2">
<li>Create New App 버튼 클릭 후 첫번째 클릭 (From scratch)</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d1c514c4-0415-4263-8268-c3fb0a18b8b0/image.png" alt=""></p>
<ol start="3">
<li>앱이름을 설정 한 후 본인이 사용할 워크스페이스를 선택</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/7a68dce8-3322-4a68-b781-9a18f9ca59a2/image.png" alt=""></p>
<ol start="4">
<li>왼쪽 화면에서 권한설정하는 곳으로 가서 Scopes에서 권한을 부여한다.</li>
</ol>
<table>
<thead>
<tr>
<th><img src="https://velog.velcdn.com/images/jaekyu_lim/post/ae55a6af-5527-4e74-89c8-9fe7f2e4da97/image.png" alt=""></th>
<th><img src="https://velog.velcdn.com/images/jaekyu_lim/post/4cdf4a6c-37e6-4877-b4bd-90c7da2556a0/image.png" alt=""></th>
</tr>
</thead>
</table>
<ol start="5">
<li>소켓모드 활성화<blockquote>
<p><strong>Slack API 소켓모드란?</strong>
실시간 메시징 및 이벤트 기능을 구현하는 데 사용되는 기술로, 기본적으로 Slack API의 웹 소켓 연결을 통해 실기산으로 메시지 및 이벤트를 주고 받을 수 있게 해주며 이를 통해 애플리케이션은 더 빠르게 사용자와 상호작용하고, 이벤트를 감지하며, 실시간 업데이트를 제공
소켓 모드를 사용하면 회사 방화벽 뒤에서 허용되지 않을 수 있는 공용 HTTP 끝점을 노출하지 않고 봇이 작업 공간에서 상호  작용이 가능</p>
</blockquote>
</li>
</ol>
<table>
<thead>
<tr>
<th><img src="https://velog.velcdn.com/images/jaekyu_lim/post/fe3f1dc5-3945-45b5-90b2-904f141381e4/image.png" alt=""></th>
<th><img src="https://velog.velcdn.com/images/jaekyu_lim/post/4f4e2eba-9203-4bf6-bfc1-d5ca232a4e86/image.png" alt=""></th>
</tr>
</thead>
</table>
<ol start="6">
<li><p>워크스페이스에 앱 설치
처음 만들고 나면 Install to Workspace에 눌러 자신이 원하는 워크스페이스에 등록하면 된다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/49f13ca4-252c-4901-b329-7e58636f8938/image.png" alt="">
권한 수정과 같이 수정하였을 때는 Reinstall을 이용하여 수정 해주면 된다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/e6a16364-aa55-4e62-b888-5e2362012926/image.png" alt=""></p>
</li>
<li><p>채널에 봇 추가하기</p>
</li>
</ol>
<p><code>@봇이름</code> 을 하여 채널에 참여 시킨다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/9c8423c9-4b39-4b7a-95de-662b80462db2/image.png" alt=""></p>
<h2 id="airflow-세팅">Airflow 세팅</h2>
<p>airflow를 켠 후 admin - Variavles 설정</p>
<table>
<thead>
<tr>
<th><img src="https://velog.velcdn.com/images/jaekyu_lim/post/7db8d1ba-d595-408b-8d93-cab2c8e56c94/image.png" alt=""></th>
<th><img src="https://velog.velcdn.com/images/jaekyu_lim/post/448bd3c6-256a-4b2a-a7e5-7985f5041f39/image.png" alt=""></th>
</tr>
</thead>
</table>
<p>key값에 본인이 쓸 token 변수명을 적고, val에 토큰값을 입력 (xoxb로 시작하는 토큰값)</p>
<h2 id="slack_nofiticationspy-작성">slack_nofitications.py 작성</h2>
<blockquote>
<p>여러 개의 dag에 쉽게 적용 시키기 위해서 slack_nofitications.py 작성</p>
</blockquote>
<pre><code class="language-python">from slack_sdk import WebClient
from datetime import datetime

class SlackAlert:
    # 클래스 인스턴스 초기화
    # 채널정보와 slack 인증 토큰을 인자로 받음
    def __init__(self, channel, token):
        self.channel = channel
        self.client = WebClient(token=token)

    def success_msg(self, msg):
        # 성공메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
        text = f&quot;&quot;&quot;
            date : {datetime.today().strftime(&#39;%Y-%m-%d&#39;)}
            alert : 
                Success! 
                    task id : {msg.get(&#39;task_instance&#39;).task_id}, 
                    dag id : {msg.get(&#39;task_instance&#39;).dag_id}, 
                    log url : {msg.get(&#39;task_instance&#39;).log_url}
            &quot;&quot;&quot;
        self.client.chat_postMessage(channel=self.channel, text=text)

    def fail_msg(self, msg):
        # 실패메시지를 작성하고 일자와 task id, dag id, log url을 슬랙 메세지로 출력
        text = f&quot;&quot;&quot;
            date : {datetime.today().strftime(&#39;%Y-%m-%d&#39;)}  
            alert : 
                Fail! 
                    task id : {msg.get(&#39;task_instance&#39;).task_id}, 
                    dag id : {msg.get(&#39;task_instance&#39;).dag_id}, 
                    log url : {msg.get(&#39;task_instance&#39;).log_url}
        &quot;&quot;&quot;
        self.client.chat_postMessage(channel=self.channel, text=text)</code></pre>
<h2 id="내-dag에-적용하기">내 dag에 적용하기</h2>
<p>앞서 slack_notifications.py를 이용하여 SlackAlert 함수 사용
토큰 값에 노출을 막기 위해 Variable 지정</p>
<p>Variable.get을 통해 지정해두었던 slack_api_token을 불러옴
SlackAlert에 첫째 인자는 메시지를 보낼 채널명을 입력하고, 두번째 인자에는 slack token을 입력</p>
<p>dag에 on_success_callback과 on_failure_callback에
각각 slack_notifications.py에서 만든 함수인 sucess_msg, fail_msg를 넣어준다.
 <img src="https://velog.velcdn.com/images/jaekyu_lim/post/a3681935-bd8f-49b4-8be6-62516178d2c6/image.png" alt=""></p>
<h2 id="성공화면">성공화면</h2>
<p>message 채널에 notice-bot이 성공 여부를 알리는 메시지를 보내준다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/bf8ca5d2-4619-42f0-a6d3-69f988925dd9/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/230df61a-1d2f-42de-bb13-7d6646c7262a/image.png" alt=""></p>
<p>Raw data를 분석하기 위해 데이터 파싱 및 전처리를 한 후 파케이 파일로 logs_weapon에 넣는 것이 목적
logs_weapon파일에 가보니 잘 들어온 것을 확인할 수 있었음
gcp 스토리지에도 파케이 파일로 잘 들어온 모습</p>
<p> <img src="https://velog.velcdn.com/images/jaekyu_lim/post/e2aeabe7-f502-42fc-8c15-4c5d93cc6de3/image.png" alt=""></p>
<blockquote>
<p>파케이 파일을 꺼내서 본 모습
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/3aaa01a3-41f4-4117-82ba-8ac750b0ff1e/image.png" alt=""></p>
</blockquote>
<p> 참고 blog
 <a href="https://www.twilio.com/blog/how-to-build-a-slackbot-in-socket-mode-with-python">https://www.twilio.com/blog/how-to-build-a-slackbot-in-socket-mode-with-python</a></p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트]하둡 클러스터 구축하기 (2) - Spark]]></title>
            <link>https://velog.io/@jaekyu_lim/%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-2-Spark</link>
            <guid>https://velog.io/@jaekyu_lim/%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-2-Spark</guid>
            <pubDate>Wed, 26 Jul 2023 04:35:54 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p>Spark 설치 후 SparkSession 띄우기</p>
</blockquote>
<p>이전에 작성했던 걸 보고 client에서 PySpark를 설치했다.
설치 후 datanode1~5에 spark 파일을 복사하였다.
<a href="https://velog.io/@jaekyu_lim/Hadoop-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0">Spark 설치하기</a> &lt;-</p>
<p>spark_encore.tar.gz로 압축하여 각 노드에 ssh로 접속 후 압축 해제하는 과정을 진행했다.
<code>scp ./spark_encore.tar.gz datanode1:/home/hadoop/</code>
<code>scp ./spark_encore.tar.gz datanode2:/home/hadoop/</code>
<code>scp ./spark_encore.tar.gz datanode3:/home/hadoop/</code>
<code>scp ./spark_encore.tar.gz datanode4:/home/hadoop/</code>
<code>scp ./spark_encore.tar.gz datanode5:/home/hadoop/</code></p>
<hr>
<h2 id="spark_home과-jupyter를-웹브라우저에서-접속할-수-있도록-인터페이스-설정">SPARK_HOME과 Jupyter를 웹브라우저에서 접속할 수 있도록 인터페이스 설정</h2>
<p> <code>vim .bashrc</code> 수정</p>
<p><code>export SPARK_HOME=&quot;/home/hadoop/spark&quot;</code>
<code>export PYSPARK_DRIVER_PYTHON=jupyter</code>
<code>export PYSPARK_DRIVER_PYTHON_OPTS=&#39;notebook --ip=0.0.0.0&#39;</code></p>
<hr>
<h2 id="주피터-노트북으로-스파크-세션이-뜨지-않는-오류">주피터 노트북으로 스파크 세션이 뜨지 않는 오류</h2>
<blockquote>
<p><code>client:8888</code>에 접속하여 jupyter notebook을 켜서
<code>from pyspark.sql import SparkSession</code> 명령어를 실행하면 실행이 되지 않는 현상 발생</p>
</blockquote>
<ul>
<li>현상 이유 : excutor 5개가 뜨는 시간이 너무 오래 걸려 timeout 에러 발생</li>
</ul>
<blockquote>
<p>기존에 스파크 드라이버 메모리를 4096 MB로 설정했었는데, 8GB로 늘리고, EXCUTOR_MEMORY도 2048 MB에서 4GB로 늘렸다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/93d4bb1b-e937-4c9e-a579-e95962fd750c/image.png" alt=""></p>
</blockquote>
<blockquote>
<p>pyspark --master yarn --num-executors 1로 줄여서 실행하여 보니 실행되었다.</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/856595b1-972a-4914-ab1b-a7f020464628/image.png" alt=""></p>
<p>설정을 그대로 하여 </p>
<p><code>pyspark --master yarn --num-executors 5</code>도 실행해봤다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/90c8ffaa-842e-4fa2-9801-d0e6e737f091/image.png" alt=""></p>
<blockquote>
<p><em>나중에 알아본 결과 이건, python3으로 단일노드로 실행한 것이었다.
내가 원하는 것은 서버를 구성한 클러스터를 모두 사용하여 분산 처리를 하려고 했던 것이었다.
pyspark 커널을 띄워 pyspark를 제대로 사용하는 건 다음 포스트에 작성하려고 한다.</em></p>
</blockquote>
<hr>
<blockquote>
<p>추후에 할 일</p>
</blockquote>
<ul>
<li>현재 프로젝트 진행 중인 데이터를 hdfs에 담아서 spark로 병렬처리해보기</li>
<li>온프레미스 환경으로 구축된 클러스터를 작업 중인 컴퓨터와 같은 와이파이를 사용하지 않아도 접속가능하도록 만들기 (open VPN)</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[[7월프로젝트]하둡 클러스터 구축하기 (1)]]></title>
            <link>https://velog.io/@jaekyu_lim/%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B01</link>
            <guid>https://velog.io/@jaekyu_lim/%ED%95%98%EB%91%A1-%ED%81%B4%EB%9F%AC%EC%8A%A4%ED%84%B0-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B01</guid>
            <pubDate>Wed, 12 Jul 2023 08:58:44 GMT</pubDate>
            <description><![CDATA[<blockquote>
<h3 id="개요">개요</h3>
<p><strong>윈도우 8대 노트북을 우분투 22.04 LTS를 이용하여 각각의 서버로 만들어 하둡 구성하기</strong>
최종 프로젝트를 준비하던 중 데이터가 많아 결과를 보기까지가 오래걸려
분산처리를 한다면 얼마나 시간을 단축할 수 있을까?에 대한 생각으로 온프레미스 환경으로
노트북 8대를 연결하여 Hadoop 클러스터를 구성하였다. 데이터를 HDFS에 밀어넣어
PySpark를 통해 데이터 전처리와 분석을 해보려 한다.</p>
</blockquote>
<hr>
<p>🛠️ 노트북 스펙
OS : Windows 10 HOME
프로세서 : Intel(R) Core(TM) i5-7200U CPU
RAM : 16GB</p>
<hr>
<p>🛠️ 설정 환경
OS : Ubuntu 22.04
Hadoop : 3.2.1
jdk : 1.8.0
spark : 3.2.4
python : 3.10.10 (miniconda 환경)</p>
<p>프로젝트 인프로 설계도 (변경 가능)
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/7fa16dd7-5455-4cda-a5ec-48733d45b3f6/image.png" alt=""></p>
<blockquote>
<p>목표: 여러 대의 노트북으로 하둡 클러스터 구성해보기
 8개의 노트북으로 client, namenode, secondnode, datanode 1~5를 구성</p>
</blockquote>
<h2 id="ubuntu-2204-설치">Ubuntu 22.04 설치</h2>
<blockquote>
<p>노트북에 기존에 깔려있던 윈도우 운영체제 삭제 후, Ubuntu 운영체제 설치 (USB 이용)</p>
</blockquote>
<blockquote>
<p>※ 컴퓨터 몇 대가 USB를 통해 Ubuntu 운영체제를 설치하려고 해도 설치되지 않는 현상 발생
기존에 windows 운영체제 파티션 충돌 문제로 인해 설치가 되지 않았다.
수동으로 파티션에 있는 걸 모두 비어준 다음 설치하니 제대로 작동이 되었다.</p>
</blockquote>
<h2 id="우분투-한글-설정-방법">우분투 한글 설정 방법</h2>
<p>터미널을 열어
<code>ibus-setup</code> 명령어 입력</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/3cb477f0-7fb2-4912-944e-fb2abe2a189c/image.png" alt=""></p>
<ol>
<li><p>Settings &gt; Keyboard &gt; + 버튼을 누르고 &gt; Korean 검색 후 &gt; Korean(Hangul)을 Add</p>
</li>
<li><p>여기까지 진행하면 우측상단에서 Korean(Hangul)이 추가되어서 수정할 수 있다.</p>
<ul>
<li>한글, 영어 수정 방법은 Shift+Space 키 혹은 한/영 키를 누르기</li>
</ul>
</li>
</ol>
<h2 id="hadoop-user-생성하기">hadoop user 생성하기</h2>
<p><code>sudo adduser hadoop</code>  hadoop 계정 생성</p>
<p>비밀번호는 hadoop으로 설정</p>
<h2 id="ssh-접속을-위한-키-생성">SSH 접속을 위한 키 생성</h2>
<blockquote>
<p>노트북끼리 서로 원활하게 소통하기 위해 키 생성 
퍼블릭 키를 통해 서로의 노트북에 접속 가능케 하기 위함</p>
</blockquote>
<p><code>ssh-keygen -t rsa</code> 키 생성</p>
<blockquote>
<p>SSH이란? (What is SSH?)
시큐어 셀(Secure SHell, SSH)은 네트워크 상의 다른 컴퓨터에 로그인하거나 원격 시스템에서 명령을 실행하고 다른 시스템으로 파일을 복사할 수 있도록 해주는 응용 프로그램 또는 그 프로토콜을 가리킨다.</p>
</blockquote>
<p>서로 ssh 명령어로 접속이 가능하도록 openssh-server 설치
<code>sudo apt install openssh-server</code></p>
<h2 id="호스트-이름-변경">호스트 이름 변경</h2>
<blockquote>
<p>노트북 8대를
client, namenode, secondnode, datanode1, datanode2, datanode3, datanode4, datanode5 설정</p>
</blockquote>
<p><code>sudo hostnamectl set-hostname client</code>
<code>sudo hostnamectl set-hostname namenode</code>
<code>sudo hostnamectl set-hostname secondnode</code>
<code>sudo hostnamectl set-hostname datanode1</code>
<code>sudo hostnamectl set-hostname datanode2</code>
<code>sudo hostnamectl set-hostname datanode3</code>
<code>sudo hostnamectl set-hostname datanode4</code>
<code>sudo hostnamectl set-hostname datanode5</code></p>
<h2 id="root계정으로-hadoop-계정-권한-설정">root계정으로 hadoop 계정 권한 설정</h2>
<blockquote>
<p>sudo visudo 명령은 리눅스 시스템에서 /etc/sudoers 파일을 수정하기 위해 사용되는 명령</p>
</blockquote>
<p><code>sudo visudo</code></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d90b4334-2197-4a54-9728-3b7ec5e64e3f/image.png" alt=""></p>
<p>root 밑에 hadoop 계정 추가해주기
계정을 추가하면 sudo 명령어를 사용할 수 있게 됨</p>
<h2 id="방화벽-해제">방화벽 해제</h2>
<blockquote>
<p>방화벽을 해제하여 서로 연결하는데 방해요소가 없도록 설정</p>
</blockquote>
<p><code>sudo apt install ufw</code> 방화벽 설치</p>
<p><code>sudo systemctl stop ufw</code> 방화벽 멈추기</p>
<p><code>sudo systemctl status ufw</code> 방화벽 상태 확인</p>
<h2 id="ip-고정-설정">Ip 고정 설정</h2>
<blockquote>
<p>노트북에 전부 랜선을 꽂을 환경이 되지 않아 Wi-Fi로 접속하였고,
서로 지속적으로 통신을 해야 되기 때문에 IP주소를 고정하였다.</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/89e3694f-f5a6-4316-b862-1c7f4dd0a09a/image.png" alt=""></p>
<h2 id="노드-간-통신-설정">노드 간 통신 설정</h2>
<blockquote>
<p>퍼블릭 키를 authorized_keys에 적어 scp 명령어를 통해 모든 노드에 복사
권한 설정을 하여 서로 ssh를 통해 접속 가능하게 설정</p>
</blockquote>
<p><code>cd ~/.ssh</code> .ssh로 이동</p>
<p><code>cat id_rsa.pub &gt;&gt; authorized_keys</code> authorized_keys에 노트북 7대에 퍼블릭 키를 받아 작성</p>
<p><code>chmod 600 ./authorized_keys</code> 소유자에게만 읽기,쓰기 권한 부여</p>
<p><code>scp ./authorized_keys client:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys namenode:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys secondnode:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys datanode1:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys datanode2:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys datanode3:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys datanode4:/home/hadoop/.ssh/</code> 
<code>scp ./authorized_keys datanode5:/home/hadoop/.ssh/</code> </p>
<p>7대에 퍼블릭 키를 모아둔 파일을 모든 노드에 복사</p>
<p>위의 명령어를 이용하여 모든 노드에서 <strong>키 공유</strong></p>
<p><code>sudo apt install vim</code> vim 설치</p>
<p><code>vim /etc/hosts</code> host 파일 수정</p>
<p>192.168.80.14   datanode1
192.168.80.150  datanode2
192.168.80.169  datanode3
192.168.80.160  datanode4
192.168.80.155  datanode5
192.168.80.170    client
192.168.80.4    namenode
192.168.80.28     secondnode</p>
<p>↑ 추가</p>
<hr>
<p>ssh를 통해 모든 컴퓨터에 접속 가능</p>
<p><code>ssh client</code>
<code>ssh namenode</code>
<code>ssh secondnode</code>
<code>ssh datanode1</code>
<code>ssh datanode2</code>
<code>ssh datanode3</code>
<code>ssh datanode4</code>
<code>ssh datanode5</code></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/97bccb46-251e-4693-8ba6-132fe14b350f/image.png" alt=""></p>
<hr>
<h2 id="java-설치-open-jdk-180">JAVA 설치 (open jdk 1.8.0)</h2>
<blockquote>
<p>하둡이 자바로 구성돼있기 때문에 java 설치</p>
</blockquote>
<p><code>sudo apt-get update</code></p>
<p><code>sudo apt-get install openjdk-8-jdk</code></p>
<p><code>java -version</code> 버전 확인</p>
<h3 id="java_home-설정">JAVA_HOME 설정</h3>
<p><code>readlink -f $(which java)</code> JAVA가 설치된 경로 확인</p>
<p><code>sudo vi /etc/profile</code>  profile 편집  // <code>.bashrc</code>에 설정해도 된다.</p>
<p><code>export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64</code> 자바 홈 경로 추가
<code>export PATH=$PATH:$JAVA_HOME/bin</code> 경로 지정</p>
<ul>
<li><code>:$JAVA_HOME/bin</code> 의미 : 기존 PATH에 JAVA_HOME을 추가<ul>
<li>$PATH를 제외 하고 입력할 시에 PATH가 JAVA_HOME만 남기 때문에 JAVA는 이상이 없지만 다른 명령어는 먹히지 않음</li>
</ul>
</li>
</ul>
<blockquote>
<h3 id="bashrc와-profile의-차이점">.bashrc와 .profile의 차이점</h3>
</blockquote>
<ul>
<li>.bashrc : 터미널에서 실행될 때마다 실행되는 파일로서, 새로운 쉘을 시작할 때마다 사용자의 환경을 설정. 사용자의 특정 프로그램에 관한 환경 변수를 정의하거나 에일리어스(Alias; 단축 명령어)를 설정하는 등의 작업을 수행. 이 파일은 로그인이 아닌 쉘에서 실행됩니다.</li>
<li>.profile : 사용자가 로그인할 때 실행되는 파일. 로그인 쉘에서만 실행되며, 사용자의 환경을 설정하는 중요한 파일. 이 파일에서는 PATH 환경변수를 정의하거나, PS1환경 변수를 설정하고, 로그인 후 실행해야 할 명령어를 작성할 수 있음</li>
</ul>
<hr>
<p>일반적으로 .profile 파일은 종료되면 재시작해야하는 작업을 수행하는 것과 같은 시스템 전체의 기본적인 설정을 정의하고, .bashrc 파일은 로그인 후 사용자가 사용 가능한 작업을 정의하는 데 사용됩니다.</p>
<h2 id="miniconda-설치">miniconda 설치</h2>
<p><code>wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh</code> 미니콘다 설치(wget)</p>
<p><code>sh ./Miniconda3-latest-Linux-x86_64.sh</code>  쉘스크립트 설치 파일 실행</p>
<p><code>source ~/.bashrc</code> 적용</p>
<ul>
<li><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2dae6372-06ae-46c5-85b3-687ddcbb1f13/image.png" alt=""></li>
</ul>
<h2 id="hadoop-설치">Hadoop 설치</h2>
<p>hadoop 계정 <code>cd ~</code>에서</p>
<p><code>wget https://archive.apache.org/dist/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz</code>  하둡 파일 다운로드</p>
<p><code>tar xzf hadoop-3.2.1.tar.gz</code> hadoop tar 파일 압축 해제</p>
<p><code>mv ./hadoop-3.2.1 ./hadoop</code> 파일명 hadoop으로 변경</p>
<h3 id="하둡-설정">하둡 설정</h3>
<blockquote>
<p>hadoop의 conf 디렉토리에 있는 파일들을 수정하였다.
아래 파일들을 설정했다.</p>
</blockquote>
<ul>
<li><img src="https://velog.velcdn.com/images/jaekyu_lim/post/b70626de-ab12-42fa-a8e8-51bd06826d1d/image.png" alt=""></li>
</ul>
<hr>
<p><code>cd ~</code>에서 다른 노트북에도 hadoop 설치를 위해 파일 scp를 이용해 복사</p>
<p><code>scp -r ./hadoop namenode:/home/hadoop/</code> </p>
<blockquote>
<ul>
<li>쉘  스크립트 만들어보기</li>
</ul>
</blockquote>
<ul>
<li><p>여러 노드에 명령어를 계속치지 않으려 쉘 스크립트로 만들어 보았다. (feat. chatGPT)</p>
<pre><code class="language-bash">#!/bin/bash
</code></pre>
</li>
</ul>
<p>local_directory=&quot;./hadoop&quot;  # 로컬 디렉토리 경로
remote_hosts=(&quot;datanode2&quot; &quot;datanode3&quot; &quot;datanode4&quot; &quot;datanode5&quot; &quot;client&quot; &quot;namenode&quot; &quot;secondnode&quot;)  # 원격 호스트 이름 &gt; 목록
remote_directory=&quot;/home/hadoop/&quot;  # 원격 디렉토리 경로</p>
<p>for host in &quot;${remote_hosts[@]}&quot;; do
    scp -r &quot;$local_directory&quot; &quot;$host:$remote_directory&quot;
done</p>
<p>```</p>
<p>한개의 파일에 경우는 scp로 만으로도 복사가 되지만,
대량의 파일을 전송할 경우에는 -r 옵션을 사용해야 함</p>
<hr>
<h2 id="추후-진행할-작업">추후 진행할 작업</h2>
<blockquote>
<ol>
<li>pyspark 설치</li>
<li>jupyter 설치</li>
<li>HDFS에 데이터 업로드</li>
</ol>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Spark] 사용해보기 - 실습예제]]></title>
            <link>https://velog.io/@jaekyu_lim/Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-%EC%8B%A4%EC%8A%B5%EC%98%88%EC%A0%9C</link>
            <guid>https://velog.io/@jaekyu_lim/Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-%EC%8B%A4%EC%8A%B5%EC%98%88%EC%A0%9C</guid>
            <pubDate>Sun, 09 Jul 2023 15:17:58 GMT</pubDate>
            <description><![CDATA[<h2 id="스파크-세션-열기">스파크 세션 열기</h2>
<pre><code class="language-sql">from pyspark.sql import SparkSession</code></pre>
<pre><code class="language-sql">from pyspark.sql.types import StructField, StructType, StringType, LongType
</code></pre>
<blockquote>
</blockquote>
<p><strong>스키마</strong>는 DataFrame의 컬럼명과 데이터 타입을 정의 </p>
<ul>
<li>CSV나 JSON 같은 일반 텍스트 파일을 사용하면 다소 느릴 수 있음 </li>
<li>하지만 Long 데이터 타입을 Integer 데이터 타입으로 잘못 인식하는 등 정밀도 문제가 발생할 수 있음 </li>
<li>따라서 운영 환경에서 추출, 변환, 적재를 수행하는 ETL 작업에 스파크를 사용한다면 직접 스키마를 정의해야 함</li>
<li>ETL 작업 중에 데이터 타입을 알기 힘든 CSV나 JSON 등의 데이터소스를 사용하는 경우 스키마 추론 과정에서 읽어 들인 샘플 데이터의 타입에 따라 스키마를 결정해 버릴 수 있음</li>
</ul>
<blockquote>
<p><strong>스키마</strong>는 여러 개의 StructField 타입 필드로 구성된 StructType 객체 </p>
</blockquote>
<ul>
<li>StructField는 이름, 데이터 타입, 컬럼이 값이 없거나 null 일 수 있는지 지정하는 불리언값을 가짐 </li>
<li>필요한 경우 컬럼과 관련된 메타데이터를 지정할 수 있음 </li>
</ul>
<blockquote>
<ul>
<li>pyspark.sql.types: PySpark에서 구조화된 데이터를 다루기 위한 데이터 타입 및 스키마를 정의하는 모듈</li>
</ul>
</blockquote>
<ul>
<li>StructField: 구조화된 데이터의 필드 또는 열(column)을 정의하는 클래스입니다. 각 필드는 이름(name), 데이터 타입(dataType), 널 허용 여부(nullable) 등의 속성을 가지고 있다.</li>
<li>StructType: 구조화된 데이터의 스키마(schema)를 정의하는 클래스입니다. 스키마는 여러 StructField 객체로 구성되어 있으며, 각 필드의 이름과 데이터 타입을 정의하여 DataFrame의 열 구조를 설명</li>
<li>StringType: 문자열 데이터 타입을 나타내는 클래스입니다. PySpark에서 문자열은 StringType으로 표현</li>
<li>LongType: 정수형 데이터 타입 중 하나인 Long을 나타내는 클래스입니다. PySpark에서 64비트 정수는 LongType으로 표현</li>
</ul>
<h2 id="hdfs에-csv파일에-밀어넣기">HDFS에 csv파일에 밀어넣기</h2>
<p>filezila를 통해 강사님이 주신 json폴더에 csv파일 모두 다운로드 받은 후
홈에다가 옮기기</p>
<p>홈에 옮긴 후
<code>cd json</code></p>
<p><code>hdfs dfs -put *.csv ./json</code> csv파일 밀어넣기</p>
<p><code>hdfs dfs -ls ./json</code> 잘 들어갔는지 확인</p>
<h2 id="spark-실습해보기">Spark 실습해보기</h2>
<pre><code class="language-sql">myManualSchema = StructType([
  StructField(&quot;DEST_COUNTRY_NAME&quot;, StringType(), True),
  StructField(&quot;ORIGIN_COUNTRY_NAME&quot;, StringType(), True),
  StructField(&quot;count&quot;, LongType(), False, metadata={&quot;hello&quot;:&quot;world&quot;})
])
df = spark.read.format(&quot;json&quot;).schema(myManualSchema)\
  .load(&quot;/json/2015-summary.json&quot;)</code></pre>
<blockquote>
<p>StructField 클래스는 PySpark에서 구조화된 데이터의 필드 또는 열(column)을 정의하는 데 사용됩니다. 각 필드는 이름(name), 데이터 타입(dataType), 널 허용 여부(nullable) 등의 속성을 가지고 있다.</p>
</blockquote>
<ul>
<li>name: 필드의 이름을 나타내는 문자열. 필드의 이름은 해당 필드를 식별하는 데 사용</li>
<li>dataType: 필드의 데이터 타입을 나타내는 클래스. 데이터 타입은 PySpark의 pyspark.sql.types 모듈에서 제공하는 클래스를 사용하여 지정할 수 있습니다. 예를 들어, 문자열 데이터 타입은 StringType()으로, 정수형 데이터 타입은 LongType()으로 지정할 수 있다.</li>
<li>nullable: 필드의 널 허용 여부를 나타내는 부울 값입니다. 기본값은 True이며, True로 설정되면 해당 필드는 널(null) 값을 가질 수 있다.</li>
<li>metadata: 필드의 메타데이터를 나타내는 맵입니다. 메타데이터는 추가 정보를 필드에 연결하기 위해 사용될 수 있습니다. 예를 들어, 필드에 대한 설명이나 태그 등의 정보를 메타데이터로 설정할 수 있습니다. 메타데이터는 선택적이며, 필요에 따라 사용할 수 있다.<pre><code class="language-sql"># 기본폼
StructField(name, dataType, nullable=True, metadata=None)</code></pre>
</li>
</ul>
<blockquote>
<p><strong>metadata={&quot;hello&quot;:&quot;world&quot;}</strong>의 해석
메타데이터는 일반적으로 키-값 형태의 맵(map)으로 표현
여기서는 hello가 키, world가 값이다. 키와 값은 문자열 형태로 지정함
메타 데이터의 키와 값은 개발자가 데이터에 대한 부가적인 정보를 제공하기 위해 사용하는 것이므로, 어떤 값을 사용할 지는 개발자의 재량에 따라 결정됨
예제에서는 설명보단, 임의의 값을 넣었음</p>
</blockquote>
<p><code>df.explain()</code> 명령어를 입력해 데이터프레임의 실행 계획을 출력 (아래는 출력값)</p>
<blockquote>
<p>== Physical Plan ==
FileScan json [DEST_COUNTRY_NAME#0,ORIGIN_COUNTRY_NAME#1,count#2L] Batched: false, DataFilters: [], Format: JSON, Location: InMemoryFileIndex(1 paths)[hdfs://namenode:8020/json/2015-summary.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct&lt;DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:bigint&gt;</p>
</blockquote>
<p><code>df.show(5)</code> df 5건만 확인</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/89b6d701-4480-4b13-9b6b-2ee5792cb066/image.png" alt=""></p>
<h2 id="select와-selectexpr">Select와 SelectExpr</h2>
<blockquote>
<h3 id="select와-selectexpr-1">select와 selectExpr</h3>
</blockquote>
<ul>
<li><p>select와 selectExpr 메서드를 사용하면 데이터 테이블에 SQL을 실행하는 것처럼 DataFrame에서 SQL을 사용할 수 있음</p>
</li>
<li><p>sql과 같은 형태의 쿼리로 여러 칼럼을 선택해 출력 가능
<code>df.select(&quot;dest_country_name&quot;, &quot;origin_country_name&quot;).show(3)</code></p>
</li>
</ul>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/1ede7385-29d3-46c6-b63a-1f603c4d9376/image.png" alt=""></p>
<hr>
<pre><code class="language-sql">from pyspark.sql.functions import expr, col, column

# 각각 expr, col, column 함수를 사용하여 dest_country_name열을 선택 후 출력
df.select( expr(&quot;dest_country_name&quot;), col(&quot;dest_country_name&quot;), column(&quot;dest_country_name&quot;)).show(3)</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/af199e0b-469b-4924-a58b-a8c40eac426b/image.png" alt=""></p>
<h2 id="칼럼명-변경-as-alias">칼럼명 변경 (as, .alias)</h2>
<pre><code class="language-sql"># dest_country_name을 destination으로 변경
df.select(expr(&quot;dest_country_name as destination&quot;)).show(2)</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2e9b4a4c-1e47-4476-8360-145b806eb8a1/image.png" alt=""></p>
<pre><code class="language-sql"># destination로 변경된 칼럼명을 alias를 이용해 dest_country_name로 변경
df.select(expr(&quot;dest_country_name as destination&quot;).alias(&quot;dest_country_name&quot;)).show(2)</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/096c8bba-d678-4229-a72c-b11bfcaf9c19/image.png" alt=""></p>
<h2 id="selectexpr">selectExpr</h2>
<blockquote>
<p>selectExpr 메서드는 스파크의 진정한 능력을 보여줌 </p>
</blockquote>
<ul>
<li>새로운 DataFrame을 생성하는 복잡한 표현식을 간단하게 만드는 도구 </li>
<li>모든 유효한 비집계형(non-aggregating) SQL 구문을 지정할 수 있음 </li>
<li>단, 컬럼을 식별할 수 있어야 함 </li>
</ul>
<pre><code class="language-sql">df.selectExpr(&quot;*&quot;, &quot;(dest_country_name = origin_country_name) as withinCountry&quot;).show()</code></pre>
<blockquote>
<p>&quot;df&quot; DataFrame의 모든 열을 선택하고, &quot;dest_country_name&quot;과 &quot;origin_country_name&quot;이 동일한지 비교하여 &quot;withinCountry&quot; 열을 추가한 뒤, 결과를 출력하는 작업을 수행합니다. &quot;withinCountry&quot; 열은 동일한 국가 내에서 여행한 여부를 나타내는 불리언(Boolean) 값으로 출력</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/dd74164a-bfd3-467e-80c4-c6a8b7ea8aa2/image.png" alt=""></p>
<pre><code class="language-sql">df.selectExpr(&quot;avg(count)&quot;, &quot;count(distinct(dest_country_name))&quot;).show(2)</code></pre>
<blockquote>
<p>&quot;df&quot; DataFrame에서 &quot;count&quot; 열의 평균과 &quot;dest_country_name&quot; 열의 고유한 값 개수를 계산한 뒤, 결과를 출력하는 작업을 수행</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/6283d61b-4958-4405-92a1-c8f14e8ce6b4/image.png" alt=""></p>
<h2 id="spark-데이터-타입으로-변환하기">Spark 데이터 타입으로 변환하기</h2>
<blockquote>
<ul>
<li>새로운 컬럼이 아닌 명시적인 값을 스파크에 전달해야 함 </li>
</ul>
</blockquote>
<ul>
<li>명시적인 값은 상수 값일 수 있고, 추후 비교에 사용할 무언가가 될 수도 있음 </li>
<li>이때 literal을 사용 </li>
<li>리터럴은 프로그래밍 언어의 리터럴값을 스파크가 이해할 수 있는 값으로 변환 </li>
</ul>
<pre><code class="language-sql">df.withColumn(&quot;numberOne&quot;, lit(1)).show(2)</code></pre>
<blockquote>
<p>&quot;df&quot; DataFrame에 &quot;numberOne&quot;이라는 새로운 열을 추가하고, 해당 열의 모든 행에 값 1을 할당한 뒤, 결과를 출력하는 작업을 수행
withColumn 메서드 - DataFrame에 신규 컬럼을 추가하는 방법
.lit 함수를 사용하여 1이라는 상수 할당하고, 출력</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/199565a3-ff40-43b3-bc7e-887f223880a0/image.png" alt=""></p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Spark] 사용해보기 - DB와 데이터 주고 받기]]></title>
            <link>https://velog.io/@jaekyu_lim/Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-DB%EC%99%80-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%A3%BC%EA%B3%A0-%EB%B0%9B%EA%B8%B0</link>
            <guid>https://velog.io/@jaekyu_lim/Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0-DB%EC%99%80-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%A3%BC%EA%B3%A0-%EB%B0%9B%EA%B8%B0</guid>
            <pubDate>Sun, 09 Jul 2023 10:40:40 GMT</pubDate>
            <description><![CDATA[<h2 id="virtualbox-실행">VirtualBox 실행</h2>
<blockquote>
<p>VirtualBox를 켜서 가상머신 4개 창을 계속 띄우니 화면이 복잡함
<strong>bat파일을 통해 창을 백그라운드에서 실행</strong></p>
</blockquote>
<blockquote>
<p><strong>bat 파일이란?</strong></p>
</blockquote>
<ul>
<li>BAT 파일은 Batch 파일의 줄임말로, 윈도우 기반 컴퓨터에서 실행되는 스크립트 파일 </li>
<li>이 파일 형식은 .bat 확장자를 가지며, 한 개 이상의 명령어를 포함하여 일련의 작업을 자동화할 수 있다.</li>
</ul>
<p>메모장에 다음과 같이 입력 후 hadoop.bat로 저장</p>
<ul>
<li>가상머신 이름 (hadoop client / hadoop namenode / hadoop secondnode / hadoop datanode3)</li>
</ul>
<pre><code>&quot;C:\Program Files\Oracle\VirtualBox\VBoxManage.exe&quot; startvm &quot;hadoop client&quot; --type headless
&quot;C:\Program Files\Oracle\VirtualBox\VBoxManage.exe&quot; startvm &quot;hadoop namenode&quot; --type headless
&quot;C:\Program Files\Oracle\VirtualBox\VBoxManage.exe&quot; startvm &quot;hadoop secondnode&quot; --type headless
&quot;C:\Program Files\Oracle\VirtualBox\VBoxManage.exe&quot; startvm &quot;hadoop datanode3&quot; --type headless</code></pre><p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/5fd10f75-d007-457a-8529-450157aaf847/image.png" alt=""></p>
<hr>
<h2 id="superputty-접속-client">Superputty 접속 (client)</h2>
<p>192.168.56.11 client 접속</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2ae498c1-6300-49e0-b312-4862e7fc0f85/image.png" alt=""></p>
<p>hadoop 계정에서 별칭으로 지정한 명령어 실행</p>
<p><code>start_dfs</code>
<code>start_yarn</code>
<code>start_mr</code></p>
<p>start-dfs.sh - 분산 파일 시스템(Distributed File System)을 시작
start-yarn.sh - YARN(Yet Another Resource Negotiator) 시작
start-mr.sh - MapReduce 작업 이력 서버 시작</p>
<blockquote>
<h3 id="용어정리-">*<em>용어정리 *</em></h3>
</blockquote>
<hr>
<p><strong>YARN</strong>
YARN은 Hadoop 클러스터에서 리소스 관리와 작업 스케줄링을 담당
<strong>HDFS</strong>
HDFS는 대용량의 데이터를 분산 저장하는 분산 파일 시스템
<strong>MapReduce</strong>
MapReduce는 데이터 처리 작업을 분산하여 실행하는 분산 프로그래밍 모델</p>
<h2 id="spark-실행">Spark 실행</h2>
<p><code>pyspark --master yarn --num-executors 3</code> 명령어 실행</p>
<p><code>client:8888</code> jupyter notebook 접속</p>
<pre><code class="language-sql"># 스파크 세션 띄우기
from pyspark.sql import SparkSession</code></pre>
<pre><code class="language-sql">spark = SparkSession.builder.appName(&quot;PySpark to MySQL&quot;).config(&quot;spark.jars&quot;, &quot;mysql-connector-java-8.0.21.jar&quot;).getOrCreate()</code></pre>
<blockquote>
<p><strong>코드 해석</strong></p>
</blockquote>
<ul>
<li><code>SparkSession.builder</code> : SparkSession을 생성하기 위한 빌더 객체를 생성</li>
<li><code>appName(&quot;PySpark to MySQL&quot;)</code> : Spark 애플리케이션의 이름을 &quot;PySpark to MySQL&quot;로 지정</li>
<li><code>config(&quot;spark.jars&quot;, &quot;mysql-connector-java-8.0.21.jar&quot;)</code> : Spark의 환경 설정 중 &quot;spark.jars&quot; 옵션을 설정하여, &quot;mysql-connector-java-8.0.21.jar&quot;라는 JAR 파일을 Spark에 제공<ul>
<li>JAR 파일은 MySQL과의 연결을 위해 필요한 JDBC 드라이버</li>
</ul>
</li>
<li><code>getOrCreate()</code> : 설정된 옵션을 기반으로 SparkSession 객체를 생성하거나, 이미 존재하는 SparkSession 객체를 반환함. 이미 생성된 SparkSession이 있는 경우 새로운 SparkSession을 생성하지 않고 기존의 SparkSession을 사용</li>
</ul>
<pre><code class="language-sql">df = spark.read.format(&quot;csv&quot;)\
  .option(&quot;header&quot;, &quot;true&quot;)\
  .option(&quot;inferSchema&quot;, &quot;true&quot;)\
  .load(&quot;/data/retail-data/*&quot;)</code></pre>
<blockquote>
<p><strong>코드 해석</strong></p>
</blockquote>
<ul>
<li><code>spark.read.format(&quot;csv&quot;)</code> : SparkSession을 사용하여 CSV 파일을 읽기 위한 데이터소스 형식을 설정</li>
<li><code>.option(&quot;header&quot;, &quot;true&quot;)</code> : CSV 파일의 첫 번째 줄을 헤더로 처리하도록 옵션을 설정</li>
<li><code>.option(&quot;inferSchema&quot;, &quot;true&quot;)</code> : 데이터프레임의 스키마를 자동으로 추론하도록 옵션을 설정</li>
<li><code>.load(&quot;/data/retail-data/*&quot;)</code> : 지정된 경로에서 CSV 파일을 로드하여 데이터프레임을 생성 <code>/data/retail-data/*</code>는 <code>/data/retail-data/</code> 디렉토리에 있는 모든 CSV 파일을 로드하도록 지정</li>
</ul>
<pre><code class="language-sql"># PySpark에서 데이터프레임을 임시 뷰로 등록하는 코드
df.createOrReplaceTempView(&quot;dfTable&quot;)</code></pre>
<pre><code class="language-sql"># dfTable에 있는 모든 칼럼을 보여줘 (상위 5개만 표시)
spark.sql(&quot;select * from dfTable&quot;).show(5)</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d7041373-1f1b-4177-904f-ac33987d92b6/image.png" alt=""></p>
<h2 id="데이터-베이스와-자료-교환해보기">데이터 베이스와 자료 교환해보기</h2>
<blockquote>
<p><strong>JDBC(Java Database Connectivity)란?</strong></p>
</blockquote>
<ul>
<li>JDBC는 Java 프로그래밍 언어를 사용하여 데이터베이스와 연결하고 상호 작용하기 위한 API(응용 프로그래밍 인터페이스)입니다. JDBC를 사용하면 Java 애플리케이션에서 다양한 데이터베이스 시스템에 접속하여 데이터베이스와 통신할 수 있다.</li>
<li>JDBC를 사용하면 데이터베이스 관리 시스템(DBMS)에 대한 특정 드라이버를 로드하고, 연결을 설정하고, SQL 쿼리를 실행하고, 결과를 검색하는 등의 작업을 수행할 수 있다. JDBC는 일관된 방식으로 다양한 데이터베이스와 상호 작용할 수 있도록 표준화된 API를 제공한다.</li>
</ul>
<pre><code class="language-sql"># 우리 조의 데이터 베이스의 접속 (테이블명은 battleGround)
jdbc_url = &quot;jdbc:mysql:// [ip주소]:3306/[테이블명]?serverTimezone=UTC&quot;  # 데이터베이스 URL
table_name = &quot;jk&quot;  # 테이블 이름
properties = {
    &quot;user&quot;: &quot;[사용자명]&quot;,  # MySQL 사용자명
    &quot;password&quot;: &quot;[비밀번호]&quot;  # MySQL 비밀번호
}</code></pre>
<blockquote>
<p><strong>코드해석</strong></p>
</blockquote>
<ul>
<li>jdbc_url: 데이터베이스의 URL을 나타냅니다. 주어진 URL은 jdbc:mysql://[IP주소]:3306/battleGround?serverTimezone=UTC로, MySQL 데이터베이스에 접속하기 위한 URL입니다. [IP 주소]는 데이터베이스 서버의 IP 주소이고, 3306은 MySQL의 기본 포트 번호입니다. battleGround은 데이터베이스의 이름을 나타냅니다. serverTimezone=UTC는 데이터베이스 서버의 시간대를 UTC로 설정</li>
<li>table_name: 작업을 수행할 테이블의 이름을 나타냅니다. 주어진 코드에서는 jk라는 테이블을 사용합니다.</li>
<li>properties: 데이터베이스 연결을 위한 추가 속성을 포함하는 맵입니다. 여기서는 user와 password를 설정하고 있습니다. user는 MySQL의 사용자명을 나타내며, password는 해당 사용자의 비밀번호를 나타냅니다.</li>
<li>이러한 설정을 사용하여 코드에서는 주어진 JDBC URL을 통해 MySQL 데이터베이스에 연결하고, jk라는 테이블에서 작업을 수행할 수 있게 된다.</li>
</ul>
<h3 id="dataframe을-db에-저장하기">DataFrame을 DB에 저장하기</h3>
<pre><code class="language-sql"># 앞서 불러왔던 retail-data에 모든 csv를 battleGround DB에 만든 jk테이블로 저장
df.write.jdbc(url=jdbc_url, table=table_name, mode=&quot;append&quot;, properties=properties)</code></pre>
<p>명령어 이후 DB 확인 (아래 사진)
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/01f16ccd-baf5-4274-b4aa-5eede170ca62/image.png" alt=""></p>
<h3 id="db에-있는-데이터-불러오기">DB에 있는 데이터 불러오기</h3>
<pre><code class="language-sql">battleGround DB에 있는 position 테이블 읽기
battle_position = spark.read.jdbc(url=jdbc_url, table=&#39;position&#39;, properties=properties)</code></pre>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Vagrant] Vagrantfile로 가상머신 만들기(간단)]]></title>
            <link>https://velog.io/@jaekyu_lim/Vagrant-Vagrantfile%EB%A1%9C-%EA%B0%80%EC%83%81%EB%A8%B8%EC%8B%A0-%EB%A7%8C%EB%93%A4%EA%B8%B0%EA%B0%84%EB%8B%A8</link>
            <guid>https://velog.io/@jaekyu_lim/Vagrant-Vagrantfile%EB%A1%9C-%EA%B0%80%EC%83%81%EB%A8%B8%EC%8B%A0-%EB%A7%8C%EB%93%A4%EA%B8%B0%EA%B0%84%EB%8B%A8</guid>
            <pubDate>Wed, 05 Jul 2023 02:21:27 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p><strong>Vagrant</strong>란?</p>
</blockquote>
<hr>
<p><strong>가상화(Virtualization)</strong>는 실제 운영체제 위에 가상화 소프트웨어를 설치한 후에 소프트웨어를 통해 하드웨어(CPU, Memory, Disk, NIC 등)를 에뮬레이션한 후에 이 위에 운영체제(Guest OS)를 설치하는 것을 의미합니다. 가상화를 해 주는 소프트웨어를 하이퍼바이저(Hypervisor) 라고 하며 종류로는 이 책에서 사용하는 VirtualBox, 그리고 VMWare, Xen 등이 있습니다.</p>
<ul>
<li>에뮬레이션은 컴퓨터 또는 기타 여러 주변 장치의 기능을 다른 컴퓨터에서 구현하는 것을 의미</li>
<li>프로그래밍 언어: 루비</li>
</ul>
<h2 id="vagrant-설치">Vagrant 설치</h2>
<p>아래 사이트에 접속하여 윈도우 버전을 설치
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/61d2753c-6c2d-4d7c-9d80-4d9c6e9aec07/image.png" alt=""></p>
<h2 id="cmd창-열기">CMD창 열기</h2>
<p><code>mkdir test</code> test파일을 만들어
<code>vagrant init</code> vagrant 초기화</p>
<p>Vagrantfile 파일을 열어서 <code>config.vm.box =</code> 에 내용을
깔고자 하는 centos7 입력 후 저장</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/281d8e77-dfd2-4233-9d88-15b6f00f62e0/image.png" alt=""></p>
<p>이후
<code>vagrant up</code> 명령어를 입력하면
centos 7 가상머신이 하나 만들어짐</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/35dd5376-e2d3-4647-a406-56e58558309a/image.png" alt=""></p>
<p><code>vagrant ssh</code> 가상머신 접속 </p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/8eb4205c-3ce0-4ac8-9a95-7be5ac360cb1/image.png" alt=""></p>
<p><code>vagrant halt</code> 가상 머신 종료</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Hadoop] 로컬 환경에서 만들어보기(2) feat.Virtual Box]]></title>
            <link>https://velog.io/@jaekyu_lim/Hadoop-%EB%A1%9C%EC%BB%AC-%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-%EB%A7%8C%EB%93%A4%EC%96%B4%EB%B3%B4%EA%B8%B02-feat.Virtual-Box</link>
            <guid>https://velog.io/@jaekyu_lim/Hadoop-%EB%A1%9C%EC%BB%AC-%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-%EB%A7%8C%EB%93%A4%EC%96%B4%EB%B3%B4%EA%B8%B02-feat.Virtual-Box</guid>
            <pubDate>Wed, 28 Jun 2023 02:26:48 GMT</pubDate>
            <description><![CDATA[<h2 id="host-이름-변경">host 이름 변경</h2>
<p><code>hostnamectl set-hostname client</code> clinet로 이름 변경</p>
<ul>
<li><img src="https://velog.velcdn.com/images/jaekyu_lim/post/c00686dd-85c8-4d68-8311-d946ebd075d5/image.png" alt=""></li>
</ul>
<p><code>sestatus</code>  운영체제 보안 설정 확인 </p>
<ul>
<li><img src="https://velog.velcdn.com/images/jaekyu_lim/post/b1a99a60-e4c5-4cbf-941d-b327366756bb/image.png" alt=""></li>
</ul>
<p><code>sudo sed -i &#39;s/^SELINUX=enforcing$/SELINUX=permissive/&#39; /etc/selinux/config</code></p>
<blockquote>
<p><strong>chatGPT 명령어정리</strong>
<code>sudo</code>: 명령어를 관리자 권한으로 실행하기 위해 sudo를 사용합니다. 사용자에게 관리자 액세스 권한이 필요한 경우 비밀번호를 입력하라는 프롬프트가 나타날 수 있습니다.
<code>sed -i &#39;s/^SELINUX=enforcing$/SELINUX=permissive/&#39; /etc/selinux/config</code>: sed는 텍스트 스트림에서 패턴을 찾아 다른 패턴으로 치환하는 유틸리티입니다. 이 명령어는 /etc/selinux/config 파일에서 &quot;SELINUX=enforcing&quot;이라는 패턴을 찾아 &quot;SELINUX=permissive&quot;로 변경합니다. -i 옵션은 변경된 내용을 원본 파일에 바로 적용하도록 지시합니다.</p>
</blockquote>
<blockquote>
<p>hadoop 계정 생성 
<code>adduser hadoop</code> 하둡 유저 생성
<code>passwd hadoop</code>  패스워드 설정</p>
</blockquote>
<blockquote>
<p><code>su hadoop</code> 하둡 유저 진입
<code>ssh-keygen -t rsa</code> RSA 알고리즘을 사용하여 SSH 키를 생성</p>
</blockquote>
<blockquote>
<p><code>cat id_rsa.pub &gt;&gt; authorized_keys</code> 
SSH 서버에서 사용자 인증을 위해 허용되는 공개키를 저장하는 파일</p>
</blockquote>
<p>처음에 들어가보면 ~가 아니라 root로 설정이 되어있는데, ~로 오도록 설정</p>
<p><code>vim ~/.bashrc</code>  편집기에서
<code>cd ~</code> 추가 후 wq</p>
<h2 id="root에서-권한-부여">root에서 권한 부여</h2>
<ul>
<li><p>visudo </p>
<p><code>:100</code> 백번째 줄로 가서
101번 라인에 아래 추가
<code>hadoop  ALL=(ALL)       ALL</code> 추가
hadoop유저에 root 권한 부여</p>
</li>
</ul>
<h2 id="hadoop-java-설치">hadoop Java 설치</h2>
<p><code>sudo yum install java-1.8.0-openjdk ant -y</code></p>
<p><code>cd /usr/lib/jvm</code>
<code>ls</code> </p>
<h3 id="자바-홈-설정">자바 홈 설정</h3>
<p><code>vim ~/.bashrc</code> </p>
<p><code>export JAVA_HOME=&quot;/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64&quot;</code>    </p>
<ul>
<li><img src="https://velog.velcdn.com/images/jaekyu_lim/post/761cf345-f1ce-4fea-a4fc-ca9646a005f0/image.png" alt=""></li>
</ul>
<h4 id="wget-설치">wget 설치</h4>
<p><code>sudo yum install wget</code> 
<code>cd ~</code> 에서</p>
<h2 id="하둡-파일-설치">하둡 파일 설치</h2>
<p><code>wget https://archive.apache.org/dist/hadoop/common/hadoop-3.2.1/hadoop-3.2.1.tar.gz</code> </p>
<p><code>tar xzf hadoop-3.2.1.tar.gz</code> 압축해제</p>
<p><code>mv ./hadoop-3.2.1 ./hadoop</code> hadoop으로 이름바꾸기</p>
<h3 id="filezilla에서">FileZilla에서</h3>
<p><code>/home/hadoop/hadoop/etc/hadoop</code> 경로에
강사님이 올려주신 5개 파일 넣기 
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a67cd9f9-5109-43d4-a45f-4ec6045b87bb/image.png" alt=""></p>
<p><code>vim ~/.bashrc</code> 에서 아래 내용 추가
hadoop설정 - 전에 ec2환경에서 만든 하둡 환경 복사해서 만듦</p>
<pre><code class="language-bash">export SPARK_HOME=/home/hadoop/spark
export HADOOP_HOME=/home/hadoop/hadoop
export HIVE_HOME=/home/hadoop/hive
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export HADOOP_INSTALL=$HADOOP_HOME
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export HADOOP_YARN_HOME=$HADOOP_HOME
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS=&quot;-Djava.library.path=$HADOOP_HOME/lib/native&quot;
export PATH=$PATH:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$HIVE_HOME/bin</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/c9b57755-1d6e-40a2-8414-ac967d969a4f/image.png" alt=""></p>
<h2 id="하둡-이미지를-굽기-위해-종료">하둡 이미지를 굽기 위해 종료</h2>
<ul>
<li>현재 바로 off 
<code>sudo shutdown -h now</code> </li>
</ul>
<h2 id="이미지-복제-방법-1">이미지 복제 방법 1</h2>
<blockquote>
<p>하둡을 누른 후 복제를 이용하여 복사</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/280a4e37-a7ff-4339-b1fb-333edea45bb7/image.png" alt=""></p>
<h2 id="이미지-복제-방법-2">이미지 복제 방법 2</h2>
<blockquote>
<p> OCI로 내보내기 - 설정 그대로 해서 생성
가상 시스템 가져오기 해서 설정 내보낸 거 불러오기</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/e7e5e9ec-dc48-4103-a515-aa3ca287e134/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d3a9776e-b622-4496-adca-261f329664c2/image.png" alt=""></p>
<p>방법 2가지 중 하나 선택하여 4개 만들기
각각의 이름 Client, namenode, Secondnode, Datanode3으로 지정하기</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Hadoop] 로컬 환경에서 만들어보기(1) feat.Virtual Box]]></title>
            <link>https://velog.io/@jaekyu_lim/Virtual-Box-%EB%A1%9C%EC%BB%AC%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-%ED%95%98%EB%91%A1-%EA%B5%AC%EC%B6%95%ED%95%B4%EB%B3%B4%EA%B8%B01</link>
            <guid>https://velog.io/@jaekyu_lim/Virtual-Box-%EB%A1%9C%EC%BB%AC%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-%ED%95%98%EB%91%A1-%EA%B5%AC%EC%B6%95%ED%95%B4%EB%B3%B4%EA%B8%B01</guid>
            <pubDate>Tue, 27 Jun 2023 07:03:05 GMT</pubDate>
            <description><![CDATA[<h2 id="virtual-box-설치">Virtual Box 설치</h2>
<blockquote>
<p>실행 시 오류 메세지를 보고 설치
c++ 2019 redistributable 검색 후 파일 다운받기</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/f2939466-6ca0-4fcc-8056-d71d9cf09f7a/image.png" alt=""></p>
<h2 id="centos-설치">CentOS 설치</h2>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d1c8ec37-800e-4257-844e-4fc1374eeee3/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/c35c0fd1-15e4-458b-8e19-32d9dad6fdbf/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/a080cd92-68e8-4158-ba3d-d96e8b70cbd9/image.png" alt=""></p>
<h2 id="virtual-box-실행">Virtual Box 실행</h2>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/4375f634-cbbd-48e6-a1f0-23b1b5926e27/image.png" alt=""></p>
<h2 id="하둡-설치를-위한-설정하기">하둡 설치를 위한 설정하기</h2>
<blockquote>
<p><strong><em>가상 머신 만들기</em></strong>
이름 hadoop</p>
</blockquote>
<p>iso image 다운로드 받은거 넣기 </p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/396cb0b2-8c48-4175-a3be-577ed131ba1f/image.png" alt=""></p>
<blockquote>
<p><strong>ip addr</strong></p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/3c70bcbe-5fa2-4ab6-9949-8dc5fef162bb/image.png" alt=""></p>
<h2 id="vim-설치">vim 설치</h2>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/bf5e20db-8cbf-44fd-90b3-5a711ced12fd/image.png" alt=""></p>
<h2 id="네트워크-설정">네트워크 설정</h2>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/719f66e9-7427-4666-b112-285b552393b3/image.png" alt=""></p>
<h3 id="vim-ifcfg-enp0s8">vim ifcfg-enp0s8</h3>
<blockquote>
<p><strong>고정ip 설정</strong>
아래 내용 추가
수정한 부분 - 노란색</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/538bb1cb-a24a-4f00-a6e6-bda516b49368/image.png" alt=""></p>
<blockquote>
<p><strong>명령어정리 ft. chatGPT</strong></p>
</blockquote>
<ul>
<li>&quot;bootproto=none&quot;을 설정하면 해당 시스템은 네트워크 부팅 프로토콜을 사용하지 않는다. 즉, 시스템은 로컬 저장 장치에서 운영체제를 부팅하여 시작하게 된다. 이 설정은 일반적으로 개별 컴퓨터에서 사용되며, 네트워크 부팅이 필요하지 않은 경우에 유용하다.</li>
<li>IPADDR = 192.168.56.11:
이 설정은 해당 네트워크 인터페이스에 할당된 IP 주소를 나타냅니다. 여기서는 192.168.56.11이라는 IP 주소가 할당되었다는 것을 의미합니다. 이 IP 주소는 해당 네트워크에서 해당 인터페이스를 식별하는 데 사용됩니다.</li>
<li>NETMASK = 255.255.255.0:
서브넷 마스크는 네트워크 ID와 호스트 ID를 구분하기 위해 사용되는 값입니다. 여기서는 255.255.255.0이라는 서브넷 마스크가 설정되었습니다. 이것은 해당 네트워크의 IP 주소에서 처음 24비트가 네트워크 ID를 나타내고, 나머지 8비트가 호스트 ID를 나타내는 것을 의미합니다.</li>
<li>GATEWAY = 192.168.56.1:
게이트웨이는 네트워크 간에 데이터를 전송하는 데 사용되는 네트워크 장치입니다. 여기서는 192.168.56.1이라는 IP 주소가 게이트웨이로 설정되었습니다. 이것은 해당 네트워크에서 다른 네트워크로 데이터를 전송할 때 패킷이 이 IP 주소를 통해 라우팅되도록 설정되었다는 것을 의미합니다.</li>
<li>&quot;ONBOOT=YES&quot;를 설정하면 해당 네트워크 인터페이스가 자동으로 활성화되어 네트워크 서비스에 연결됩니다. 이렇게 하면 사용자가 수동으로 인터페이스를 활성화하지 않고도 시스템이 부팅될 때마다 네트워크 연결이 자동으로 설정됩니다.</li>
</ul>
<p>설정후에     wq 저장</p>
<blockquote>
<p>ip addr을 보면 enp0s8에 설정해둔 192.168.56.11/24가 설정돼있는걸 확인할 수 있음
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/7f15565a-eaec-4c0a-a6ec-73ed0135eaca/image.png" alt=""></p>
</blockquote>
<blockquote>
<p>ping 명령어를 통해 통신되는거 확인
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/0e477e7d-f387-411c-9c3f-ba7b9c5077b6/image.png" alt=""></p>
</blockquote>
<blockquote>
<p>putty 접속
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/bd3f0b04-ac40-43c5-ac6b-a7616edd9680/image.png" alt=""></p>
</blockquote>
<blockquote>
<p><strong><em>용어 정리</em></strong></p>
</blockquote>
<ul>
<li>IP 주소 (IP address):
IP 주소는 인터넷 프로토콜 (IP)을 사용하는 컴퓨터나 네트워크 장치를 식별하는 데 사용되는 숫자로 된 주소입니다. IP 주소는 네트워크 상에서 각각의 기기에 고유하게 할당됩니다. IP 주소는 IPv4 (32비트)와 IPv6 (128비트) 두 가지 버전이 있습니다.</li>
<li>서브넷 마스크 (Subnet Mask):
서브넷 마스크는 IP 주소를 네트워크 부분과 호스트 부분으로 구분하는 데 사용되는 숫자로 된 값입니다. 서브넷 마스크는 IP 주소의 네트워크 ID 부분을 나타내는 역할을 합니다. 예를 들어, 192.168.0.0/24의 경우, 24비트 서브넷 마스크를 사용하고 있으며, 처음 24비트가 네트워크 ID를 나타내고 나머지 8비트가 호스트 ID를 나타냅니다.</li>
<li>게이트웨이 (Gateway):
게이트웨이는 네트워크 간에 데이터를 전송하는 데 사용되는 네트워크 장치입니다. 게이트웨이는 패킷을 받아서 목적지 네트워크로 전달하거나 다른 네트워크로 경로를 설정하는 역할을 합니다. 일반적으로 게이트웨이는 네트워크의 첫 번째 IP 주소로 설정되며, 로컬 네트워크 외부로 나가는 패킷을 전달하는 역할을 수행합니다.</li>
<li>CIDR 표기법 (CIDR notation):
CIDR 표기법은 IP 주소와 네트워크 세그먼트를 표현하는 데 사용되는 표기법입니다. 예를 들어, &quot;192.168.0.0/24&quot;는 IP 주소가 192.168.0.0이며, 24비트의 서브넷 마스크를 가지는 네트워크를 의미합니다. CIDR 표기법은 IP 주소와 서브넷 마스크를 조합하여 네트워크를 명확하게 식별하는 데 도움을 줍니다.</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Spark] 사용해보기(2)]]></title>
            <link>https://velog.io/@jaekyu_lim/Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B02</link>
            <guid>https://velog.io/@jaekyu_lim/Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B02</guid>
            <pubDate>Thu, 22 Jun 2023 03:27:18 GMT</pubDate>
            <description><![CDATA[<h2 id="준비">준비</h2>
<p><code>putty</code> 접속</p>
<ul>
<li><p>순서대로 실행
<code>su hadoop</code>
<code>start_dfs</code>
<code>start_yarn</code>
<code>start_mr</code></p>
</li>
<li><p>Spark 실행
<code>nohup pyspark --master yarn --num-executors 3 &amp;</code>
웹사이트에 <code>client:8888</code>로 접속 (client가 탄력적 ip가 아니라면 ec2에서 퍼블릭 Ipv4 주소 복사후 <code>:8888</code> 해주기</p>
</li>
</ul>
<p>주피터 노트북 켠 후 아래 명령어 입력 ↓
<code>from pyspark.sql import SparkSession</code></p>
<p>명령어 입력 후 client:4040 접속시 아래 화면 볼 수 있음</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/67cc29f4-325f-4a49-b4e4-ded1bf1b98bf/image.png" alt=""></p>
<hr>
<blockquote>
<p>실행중인 프로세스 확인해보기
<code>ssh [NodeName] jps</code>
namenode, secondnode, datanode3 확인해보면 아래와 같이 실행되고 있음</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/f3748f7f-d84f-445c-aac7-6a8e278b94ea/image.png" alt=""></p>
<hr>
<blockquote>
<p><strong><em>스파크 아키텍처(Spark Architecture)</em></strong>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/9b8e217e-8fc4-40de-ac0a-3c92cc27d384/image.png" alt=""></p>
</blockquote>
<hr>
<h2 id="데이터셋-준비">데이터셋 준비</h2>
<p><code>cd ~</code> 에서
<code>wget https://mydatahive.s3.ap-northeast-2.amazonaws.com/mnm_dataset.csv</code> 파일 다운로드</p>
<blockquote>
<ul>
<li>hdfs dfs 별명 붙이기 (계속 적을 때 간단히 사용하기 위해서)
<code>vim ~/.bashrc</code> 
<code>alias hd=&quot;hdfs dfs&quot;</code> 약어 입력해주기
<code>source ~/.bashrc</code> 적용 
<code>hd -ls /</code> 확인</li>
</ul>
</blockquote>
<p><code>hd -put mnm_dataset.csv /mydata/</code> 
로컬 파일 시스템에서 Hadoop 분산 파일 시스템(HDFS)로 파일을 업로드하기 위해 사용되는 HDFS 명령어</p>
<p><code>hd -ls /mydata</code> 파일 업로드 확인</p>
<hr>
<h2 id="jupyter-notebook">Jupyter Notebook</h2>
<blockquote>
<ul>
<li>from pyspark.sql import SparkSession 스파크세션 실행</li>
</ul>
</blockquote>
<ul>
<li>mnm_df = spark \<pre><code>  .read \
  .format(&quot;csv&quot;) \
  .option(&quot;header&quot;, &quot;true&quot;) \
  .option(&quot;inferSchema&quot;, &#39;true&#39;) \
  .load(&quot;/mydata/mnm_dataset.csv&quot;)</code></pre>read format - csv 형식의 파일 불러오기 / option - 첫줄이 헤더, 스키마는 스파크가 알아서 설정 / load 뒤에는 경로</li>
</ul>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/27d6fb45-c724-461d-ac09-2a555dbbea36/image.png" alt=""></p>
<blockquote>
<p><strong><em>Spark의 특징 지연연산/액션</em></strong>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/3f691ece-a68b-4923-a86a-5fb1642b7e4c/image.png" alt=""></p>
</blockquote>
<ul>
<li><code>mnm_df.show(n=5)</code> <em>액션 명령을 내려야 결과 표시</em>
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/23a8730c-30ed-40db-ba86-48aee0c80b8b/image.png" alt=""></li>
</ul>
<hr>
<ul>
<li>이후 실습<blockquote>
<p>mnm_df에서 각 주의 색상별 합계를 계산한 다음, 합계를 기준으로 오름차순 정렬</p>
</blockquote>
</li>
</ul>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/031d18d8-2dc6-4454-afe3-3c7d63d6f740/image.png" alt="">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/757df9a2-5b49-47d8-9df3-72283c200eee/image.png" alt=""></p>
<blockquote>
<p>mnm_df에서 캘리포니아(CA) 주의 색상별 합계를 계산한 다음, 합계를 기준으로 오름차순 정렬</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2b4880ff-c439-4d17-9e5a-86a68686a0fc/image.png" alt=""></p>
<hr>
<ul>
<li>다른 예제</li>
</ul>
<p>putty client <code>cd ~</code>에서
<code>wget https://mydatahive.s3.ap-northeast-2.amazonaws.com/fakefriends.csv</code> 예제 파일 받기</p>
<p>하둡 디렉토리에 업로드 시키기
<code>hd -put fakefriends.csv /mydata/</code></p>
<h4 id="jupyter-notebook-1">Jupyter notebook</h4>
<blockquote>
<p><strong><em>파일 불러오기</em></strong>
첫줄이 헤더는 아니고, 스키마는 알아서</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/e1189e07-49ad-41f8-9414-f10812d88c1c/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/d2f4c4f9-e6d5-4cd4-941c-b50384b7b87e/image.png" alt=""></p>
<blockquote>
<p>열이름 바꾸기 <code>selectExpr</code></p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/493a76a4-70c9-4f42-b799-908e1a576f1b/image.png" alt=""></p>
<blockquote>
<p>age별 count 평균</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/abeb419f-3919-4e95-82e3-4bcae9a953d9/image.png" alt=""></p>
<blockquote>
<p>df_result로 변수 저장후 result.csv파일로 저장</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/08db3fcc-735e-42b4-8e32-b8a2f6ee5ad1/image.png" alt=""></p>
<blockquote>
<p>namenode:50070 파일디렉토리에서 파일 확인</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/912f42ab-ff7b-4e98-972f-f902d4d4b902/image.png" alt=""></p>
<blockquote>
<p><code>hd -cat /mydata/result.csv/*</code> 터미널에서 확인하기</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/3411c471-3e3b-4ece-8b52-00351726cd82/image.png" alt=""></p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Spark] Spark 사용해보기]]></title>
            <link>https://velog.io/@jaekyu_lim/Hadoop-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0</link>
            <guid>https://velog.io/@jaekyu_lim/Hadoop-Spark-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0</guid>
            <pubDate>Wed, 21 Jun 2023 05:28:15 GMT</pubDate>
            <description><![CDATA[<h2 id="spark-설치">Spark 설치</h2>
<p><code>wget https://dlcdn.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz</code> 해서 다운로드 받기</p>
<p><code>tar xzf spark-3.2.4-bin-hadoop3.2.tgz</code> 압축 해제</p>
<p><code>mv ./spark-3.2.4-bin-hadoop3.2 ./spark</code> 이름 바꾸기</p>
<h3 id="conf-파일-업로드">conf 파일 업로드</h3>
<p><code>/home/hadoop/spark/conf</code> 경로에 강사님이 올려주신 파일 3개 넣기</p>
<ul>
<li>Fillzilla 이용
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/ee05e0ee-5aa1-42c0-8346-0e7dc8071e64/image.png" alt=""></li>
</ul>
<h3 id="bashrc-설정">bash.rc 설정</h3>
<ul>
<li>스파크 환경변수, 실행경로 설정
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/0420ecd9-44dc-4d13-a15e-532bd0bcdb81/image.png" alt=""></li>
</ul>
<p>설정후 <code>source ~/.bashrc</code>써서 적용시키기</p>
<h2 id="spark-실행">spark 실행</h2>
<p><code>pyspark</code> 명령어를 실행</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/e432db13-16ac-41f8-9171-4aec9b6d1207/image.png" alt=""></p>
<p>exit()로 나오고 다음 세팅 준비</p>
<h2 id="jupyter-세팅">jupyter 세팅</h2>
<pre><code class="language-bash">pip install jupyter  #주피터 설치
jupyter notebook --generate-config  </code></pre>
<ul>
<li><code>jupyter notebook --generate-config</code><ul>
<li>jupyter notebook --generate-config 명령어를 실행하면 기본값으로 설정된 Jupyter Notebook 설정 파일(jupyter_notebook_config.py)이 생성</li>
<li>이 파일이 생성되면 사용자는 해당 파일을 편집하여 Jupyter Notebook의 IP 주소, 포트 번호, 실행 모드(인라인 플롯 등), 보안 설정(비밀번호, SSL, 토큰 등) 및 기타 설정을 변경할 수 있음</li>
</ul>
</li>
</ul>
<h2 id="jupyter-실행">jupyter 실행</h2>
<p><code>ipython</code> 실행</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/83cce6b3-0cf4-4ff3-81b4-400155834093/image.png" alt=""></p>
<p><code>from notebook.auth import passwd</code> </p>
<ul>
<li>자신의 노트북 암호 만들기 
<code>passwd()</code> - 자신의 원하는 암호 입력 / 입력하고 나오는 문자열 복사해두기(이후에 편집기에서 넣어야 됨)
<code>vim ~/.jupyter/jupyter_notebook_config.py</code> 편집기 아래와 같이 수정</li>
</ul>
<blockquote>
<p>136번 줄  : 모든 도메인에서 접속 가능하도록 설정
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/cb8b010a-e164-4ad8-8f33-bd511e848768/image.png" alt="">
450번 줄 : 노트북 디렉토리 설정
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/8726f97e-9309-4420-bbab-be2427fcc03a/image.png" alt="">
458번 줄 : 주피터 노트북이 시작될 때 웹 브라우저를 자동으로 열지 않도록 함
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/dc846e70-5d00-4475-9839-0c067de7b02c/image.png" alt="">
469번 줄 :  주피터 노트북에 암호를 설정 (아까 복사해둔 문자열 복사)
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/7ad82da4-2b41-4080-8009-78648a20d7ec/image.png" alt=""></p>
</blockquote>
<h4 id="작업폴더-만들기">작업폴더 만들기</h4>
<p><code>mkdir workspace</code></p>
<p><code>pyspark --master yarn --num-executors 3</code></p>
<ul>
<li>pyspark --master yarn --num-executors 3 명령어는<ul>
<li>pyspark: PySpark 명령어로, PySpark 애플리케이션을 실행한다.<ul>
<li>PySpark은 Apache Spark를 사용하여 파이썬으로 효율적인 빅 데이터 처리를 가능하게 해준다.</li>
</ul>
</li>
<li>--master yarn: 이 옵션은 애플리케이션의 클러스터 관리자를 YARN(Yet Another Resource Negotiator)으로 설정합니다.<ul>
<li>YARN은 클러스터에서 자원 관리를 담당하며, 맵리듀스 및 다양한 애플리케이션 프레임워크가 하둡에서 실행될 수 있도록 한다.</li>
</ul>
</li>
<li>--num-executors 3: 이 옵션은 클러스터에서 사용할 Spark Executor의 개수를 3으로 설정한다. <ul>
<li>Executor는 Spark 애플리케이션에서 작업 작업(load data, process, store)을 수행하는 프로세스</li>
</ul>
</li>
</ul>
</li>
</ul>
<hr>
<p><code>nohup pyspark --master yarn --num-executors 3 &amp;</code></p>
<ul>
<li>nohup: 터미널이 종료된 후에도 계속 실행되도록 설정, 터미널과의 연결이 끊겨도 프로세스가 중단되지 않음</li>
<li>&amp;: 주어진 명령어를 백그라운드에서 실행</li>
</ul>
<p>위 명령어 실행 후 화면에 아래와 같은 것이 보이면 <code>http://client:8888</code>로 들어간다.</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/60d2cd74-ce7e-495d-8eaf-0a2d37cd7b96/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/669d510b-d243-480c-be0d-da5f9c7dac58/image.png" alt=""></p>
<hr>
<p>이후, 아래 명령어 실행</p>
<p><code>from pyspark.sql import SparkSession</code></p>
<p><code>SparkSession</code></p>
<blockquote>
<ul>
<li>SparkSession은 Apache Spark 애플리케이션을 시작하는데 필요한 프로그래밍 진입점(entry point)이다.</li>
</ul>
</blockquote>
<ul>
<li>이를 통해 DataFrame과 Dataset의 생성과 작업을 관리하고, cluster manager와 application name, executor memory, number of executor cores 등의 Spark 애플리케이션 설정을 구성할 수 있다.</li>
<li>또한, SparkSession은 다양한 데이터 소스와 직접 연결할 수 있는 read 및 write 메서드를 지원한다.</li>
</ul>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/74b693dc-2044-4c16-84c2-130bbef4229a/image.png" alt=""></p>
<hr>
<ul>
<li>secondnode:8088이나 client 퍼블릭v4 ip주소(탄력적 ip설정했다면 탄력적ip):4040를 보면 아래와 같은 화면을 볼 수 있음</li>
</ul>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/1a5dfc3d-cbdc-4ed9-9b8b-13009fcd73df/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/94a8adb5-067f-4fbb-a00a-ad0a8528fe7f/image.png" alt=""></p>
<hr>
<p>jupyter Directory에 spark 예제.ipynb와 hflight.csv 파일 업로드</p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/ce8f47c0-f61e-4791-af17-343cc1764bbd/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/995eba20-ad39-440a-b926-75ab3cac1fc5/image.png" alt=""></p>
<blockquote>
<p><strong><em>참고</em></strong> </p>
</blockquote>
<ul>
<li>spark는 코드를 다 만들고 실행하면 실행 계획만!</li>
<li>show나 action을 통해 실행이 일어남!
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a90b4643-d9df-48da-a9fa-32a619dee702/image.png" alt="">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/01a665f8-3020-49c7-bab8-d0561ffb0ef4/image.png" alt=""></li>
</ul>
<h2 id="용어-정리">용어 정리</h2>
<blockquote>
<p>다음시간에 사용할 파일 </p>
</blockquote>
<blockquote>
<p>Parquet이란?</p>
</blockquote>
<ul>
<li>대량의 데이터를 효율적으로 저장하고 처리하기 위한 열 지향(컬럼 기반) 파일 형식이다. Parquet 파일은 대규모 데이터 처리를 위해 설계되었으며, Hadoop 기반의 분산 데이터 처리 시스템인 Apache Hadoop, Apache Spark, Apache Hive 등에서 널리 사용된다.</li>
</ul>
<blockquote>
<p>Parquet은 다양한 데이터 처리 작업에 적합한 형식을 제공한다. 일반적으로 데이터 웨어하우스, 비즈니스 인텔리전스 및 대규모 분석 시나리오에서 사용되며, 특히 대규모 데이터 세트에서의 쿼리 성능을 향상시킬 수 있다. Parquet 파일은 데이터를 컬럼 기반으로 저장하므로 필요한 컬럼만 읽거나 필터링할 수 있어서 읽기 작업의 효율성을 높인다. 또한 압축 및 직렬화 알고리즘을 사용하여 저장 공간을 절약하고 데이터 전송 속도를 높일 수 있다.</p>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Hive] 사용해보기]]></title>
            <link>https://velog.io/@jaekyu_lim/Hive-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0</link>
            <guid>https://velog.io/@jaekyu_lim/Hive-%EC%82%AC%EC%9A%A9%ED%95%B4%EB%B3%B4%EA%B8%B0</guid>
            <pubDate>Tue, 20 Jun 2023 06:24:07 GMT</pubDate>
            <description><![CDATA[<h2 id="hive-예제-사용해보기-영화-크롤링-파일">Hive 예제 사용해보기 (영화 크롤링 파일)</h2>
<blockquote>
<p>Hive를 이용하여 영화 크롤링 파일 mapreduce 해보기</p>
</blockquote>
<ul>
<li><p>hadoop 유저 접속</p>
<pre><code class="language-bash">su hadoop
# 별칭으로 지정했던 명령어 실행하기
start_dfs
start_yarn
start_mr</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/c4001540-babd-436e-8303-ed76096d1b0c/image.png" alt=""></p>
</li>
<li><p>제공해준 파일 tmdb.zip 파일 받기</p>
</li>
</ul>
<p>clinet 에서 실행</p>
<pre><code class="language-bash">cd ~
wget https://mydatahive.s3.ap-northeast-2.amazonaws.com/tmdb.zip</code></pre>
<ul>
<li><p>unzip 설치</p>
<pre><code class="language-bash">sudo yum install unzip #unzip설치
# tmdb 파일 만들어서 tmdb.zip 파일을 tmdb파일안에 압축해제
mkdir tmdb &amp;&amp; unzip ./tmdb.zip -d ./tmdb
# 하둡에 tmdb파일 만들고 csv파일 붓기
hdfs dfs -mkdir /tmdb
hdfs dfs -put ./*.csv /tmdb</code></pre>
</li>
<li><p>잘들어갔는지 확인 (namenode:50070에서 확인)
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/d9545d4b-1df9-4821-b510-56fdb56fecdc/image.png" alt=""></p>
</li>
<li><p>putty에서 확인하는 방법</p>
<pre><code class="language-bash">hdfs dfs -ls /tmdb</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/a0299cde-a343-4de3-bb8f-cc9e84b7b5dd/image.png" alt=""></p>
</li>
</ul>
<h2 id="hive-맵리듀스-해보기">Hive 맵리듀스 해보기</h2>
<ul>
<li>받았던 csv파일을 sql 쿼리를 날려 테이블 생성</li>
</ul>
<pre><code class="language-sql">CREATE EXTERNAL TABLE IF NOT EXISTS movie(
    Movie_ID  STRING,
    Adult  STRING,
    Backdrop_Path  STRING,
    Genres  STRING,
    Homepage  STRING,
    Original_Language  STRING,
    Original_Title  STRING,
    Overview  STRING,
    Popularity  STRING,
    Poster_Path  STRING,
    Production_Companies  STRING,
    Production_Countries  STRING,
    Release_Date  STRING,
    Revenue  STRING,
    Runtime  STRING,
    Spoken_Languages  STRING,
    Status  STRING,
    Tagline  STRING,
    Title  STRING,
    Vote_Average  FLOAT,
    Vote_Count  INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY &#39;,&#39;
LOCATION &#39;/tmdb&#39;;</code></pre>
<ul>
<li>잘 적용됐는지 확인해보기</li>
</ul>
<pre><code class="language-sql">SELECT * FROM movie limit 5;</code></pre>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2711ccf5-bf87-4759-92fc-61dc56c936a7/image.png" alt=""></p>
<h3 id="장르별로-집계해보기">장르별로 집계해보기</h3>
<pre><code class="language-sql">SELECT Genres , AVG(Vote_Count) FROM movie GROUP BY Genres </code></pre>
<ul>
<li><p>map 과 reduce 과정
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/47c30ab3-7c05-494f-a0cb-fa721e06b97b/image.png" alt=""></p>
</li>
<li><p>결과창
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/1fc4573d-ac94-4e81-9eff-1489d1322803/image.png" alt=""></p>
</li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Hadoop] 하둡 WordCount(예제)]]></title>
            <link>https://velog.io/@jaekyu_lim/Hadoop-%ED%95%98%EB%91%A1-WordCount%EC%98%88%EC%A0%9C</link>
            <guid>https://velog.io/@jaekyu_lim/Hadoop-%ED%95%98%EB%91%A1-WordCount%EC%98%88%EC%A0%9C</guid>
            <pubDate>Mon, 19 Jun 2023 08:21:46 GMT</pubDate>
            <description><![CDATA[<h2 id="hadoop에서-wordcount-하기">Hadoop에서 wordcount 하기</h2>
<p>putty 접속 후</p>
<pre><code class="language-bash">su hadoop

#bashrc 명령어 실행
$ start dfs
$ start_yarn
$ start_mr</code></pre>
<ol>
<li><p>hdfs에 새로운 폴더 생성</p>
<pre><code class="language-bash">$ hdfs dfs -mkdir /mydata</code></pre>
</li>
<li><p>hdfs에 데이터 넣기</p>
<pre><code class="language-bash">$ hdfs dfs -put ~/hadoop/etc/hadoop/`*.xml` /mydata</code></pre>
</li>
<li><p>Hadoop Cluster에서 텍스트 파일 검색하기</p>
<pre><code class="language-bash">$ hadoop jar ~/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.1.jar grep /mydata /output2 &#39;dfs[a-z.]+&#39;</code></pre>
</li>
</ol>
<p>Hadoop MapReduce의 예제 중 하나인 grep 실행
하둡 클러스터에서 텍스트 파일을 검색</p>
<blockquote>
<p>/mydata 경로에 있는 텍스트 파일을
dfs[a-z.]+라는 문법 (dfs가 들어가는 거를 wordcount)을 통해서 작업할거고, 그 아웃풋을 /output2 에 담겠다.</p>
</blockquote>
<ol start="4">
<li><p>MapReduce 성공 결과 세부 내역 확인
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/fa02eb95-719e-4060-91da-4cf632d12171/image.png" alt="">
노란색 url 경로를 들어가면 아래 그림과 같이 확인할 수 있다.
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/d32ecaba-bc04-4d3d-a970-b57e25da4949/image.png" alt=""></p>
</li>
<li><p>결과 확인</p>
<pre><code class="language-bash">$ hdfs dfs -cat /output2/*</code></pre>
</li>
</ol>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/42df7179-9e09-46d2-93c7-463236b1ef90/image.png" alt=""></p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[Hadoop] 하둡 설정하기(2)]]></title>
            <link>https://velog.io/@jaekyu_lim/Hadoop-%ED%95%98%EB%91%A1-%EC%84%A4%EC%A0%95%ED%95%98%EA%B8%B02</link>
            <guid>https://velog.io/@jaekyu_lim/Hadoop-%ED%95%98%EB%91%A1-%EC%84%A4%EC%A0%95%ED%95%98%EA%B8%B02</guid>
            <pubDate>Mon, 19 Jun 2023 08:11:51 GMT</pubDate>
            <description><![CDATA[<h3 id="저장소-지정workers-하기">저장소 지정(workers) 하기</h3>
<pre><code class="language-bash">cd ~/hadoop/etc/hadoop에서 vim workers</code></pre>
<p>datanode1, datanode2, datanode3 입력</p>
<p>workers 파일 복사(scp 사용법)</p>
<pre><code class="language-bash">scp [파일대상][서버][서버의 파일경로]

scp ./workers namenode:/home/hadoop/hadoop/etc/hadoop/ (namenode, secondnode, datanode3 모두 같은 명령어 실행)</code></pre>
<p>다른 서버끼리 미리 ssh 연결되어 있어야 scp 명령어 사용 가능
이 작업을 완료하면 worker 노드를 식별하고 클러스터의 구성을 설정, 작업 분산을 할 수 있음</p>
<h3 id="하둡-서버의-resource-map">하둡 서버의 resource map</h3>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/801dada0-0cd6-4e87-983c-9b5fa2a9304b/image.png" alt=""></p>
<h3 id="ec2-보안그룹-설정">ec2 보안그룹 설정</h3>
<p>인바운드 규칙 추가
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/a7fd2810-8376-483b-8c84-385da1d9693a/image.png" alt="">
hadoop이 형성된 인스턴스의 보안그룹으로 설정하면 ping datanode3을 했을 때, 핑이 날라가는 걸 확인할 수 있음</p>
<h3 id="hadoop-format">hadoop format</h3>
<pre><code class="language-bash">ssh namenode 
hadoop namenode -format #namenode 초기화</code></pre>
<p>hadoop 클러스터를 처음 설정할때 또는 데이터를 삭제하고 새로운 클러스터를 구성할 때 사용
namenode에 어떤 파일이 어디에 저장되고 정보를 저장하는 장부 역할을 수행
그 장부를 초기화해서 내용을 쓸 준비를 하는 과정</p>
<h4 id="start-dfs">start-dfs</h4>
<pre><code>sssh namenode
start-dfs.sh : 분산파일시스템(HDFS)의 서비스를 시작</code></pre><p>jps : 자바프로세스를 알려주는 명령어
jps를 입력하면 Namenode, Jps, Datanode가 출력됨</p>
<h4 id="start-yarn">start-yarn</h4>
<pre><code class="language-bash">ssh secondnode
start-yarn.sh</code></pre>
<p>yarn은 지원부서같은 개념, 한 node가 빡세면 다른곳으로 보내주는 개념
jps를 입력하면 Datanode, Nodemanager, Jps, Resourcemanager, secondaryNamenode가 출력됨</p>
<h4 id="start-mr">start-mr</h4>
<p>mr-jobhistory-daemon.sh start historyserver
jps입력 시 Datanode, Jps, Nodemanager가 출력됨</p>
<h3 id="hadoop-상태-확인-페이지">hadoop 상태 확인 페이지</h3>
<blockquote>
<p>보안그룹 인바운드 규칙 추가
50070 포트로 소스는 0.0.0.0/0으로 시정하고 규칙 추가
hadoop 상태 확인하는 페이지 열기
주소창에 namenode public IP:50070 입력</p>
</blockquote>
<p><img src="https://velog.velcdn.com/images/jaekyu_lim/post/2ed3cb61-24ae-4e1f-bf6c-1857956bfbd7/image.png" alt="">
<img src="https://velog.velcdn.com/images/jaekyu_lim/post/59f9d884-4070-47ff-b07f-fd4e718a27f1/image.png" alt=""></p>
<h3 id="하둡-로그-확인">하둡 로그 확인</h3>
<p>cd hadoop/logs에서 log를 확인할 수 있음</p>
<blockquote>
<p>대문자 G - 맨끝으로가기
소문자 gg - 맨 처음으로 가기</p>
</blockquote>
<p>tail을 맨 앞에 붙이면 해당 파일의 끝부분만 보여줌, 끝에 3줄만 보고 싶으면 명령어 맨 뒤에 -n 3를 붙여줌</p>
<p>log파일을 vim으로 열어서 error가 뜨는 부분을 확인</p>
<h4 id="명령어-쉽게-사용하기">명령어 쉽게 사용하기</h4>
<p>ssh namenode stop-dfs.sh, ssh secondnode start-yarn.sh같은 명령어를 쉽게 사용하기 위해 별칭 설정</p>
<pre><code class="language-bash">vim ~/.bashrc 입력하고 아래 부분을 추가
alias start-dfs=&quot;ssh namenode start-dfs.sh&quot;
alias start-yarn=&quot;ssh secondnode start-yarn.sh&quot;
alias stop-dfs=&quot;ssh namenode stop-dfs.sh&quot;
alias stop-yarn=&quot;ssh secondnode stop-yarn.sh&quot;
alias start-mr=&quot;ssh namenode mr-jobhistory-daemon.sh start historyserver&quot;
alias stop-mr=&quot;ssh namenode mr-jobhistory-daemon.sh stop historyserver&quot;</code></pre>
]]></description>
        </item>
    </channel>
</rss>