<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
    <channel>
        <title>Taegong_s</title>
        <link>https://velog.io/</link>
        <description>가치를 창출하는 개발자! 가 목표입니다</description>
        <lastBuildDate>Thu, 31 Oct 2024 12:56:24 GMT</lastBuildDate>
        <docs>https://validator.w3.org/feed/docs/rss2.html</docs>
        <generator>https://github.com/jpmonette/feed</generator>
        <image>
            <title>Taegong_s</title>
            <url>https://velog.velcdn.com/images/taegong_s/profile/9d3840f8-86cd-4854-8d4e-8f20df03970d/image.jpg</url>
            <link>https://velog.io/</link>
        </image>
        <copyright>Copyright (C) 2019. Taegong_s. All rights reserved.</copyright>
        <atom:link href="https://v2.velog.io/rss/taegong_s" rel="self" type="application/rss+xml"/>
        <item>
            <title><![CDATA[[십자말풀이] PyTest를 활용한 인증, 퍼즐 서비스 테스트코드 작성]]></title>
            <link>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-PyTest%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%9D%B8%EC%A6%9D-%ED%8D%BC%EC%A6%90-%EC%84%9C%EB%B9%84%EC%8A%A4-%ED%85%8C%EC%8A%A4%ED%8A%B8%EC%BD%94%EB%93%9C-%EC%9E%91%EC%84%B1</link>
            <guid>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-PyTest%EB%A5%BC-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%9D%B8%EC%A6%9D-%ED%8D%BC%EC%A6%90-%EC%84%9C%EB%B9%84%EC%8A%A4-%ED%85%8C%EC%8A%A4%ED%8A%B8%EC%BD%94%EB%93%9C-%EC%9E%91%EC%84%B1</guid>
            <pubDate>Thu, 31 Oct 2024 12:56:24 GMT</pubDate>
            <description><![CDATA[<p>작업 레포지토리 : <a href="https://github.com/fnzksxl/word-puzzle">https://github.com/fnzksxl/word-puzzle</a></p>
<p>예비군 동미참 다녀오느라 업로드가 조금 느렸습니다..
PyTest 라이브러리를 활용해 이 때까지 개발했던 서비스 코드들에 대한 테스트 코드를 작성해보겠습니다.</p>
<p>서비스 코드에 몇 가지 변경사항이 있으니, 그대로 따라해보실 분들께서는 레포지토리의 test-puzzle_auth 브랜치를 참고해주시길 바랍니다.</p>
<h2 id="1-pytest-세팅">1. Pytest 세팅</h2>
<pre><code>pip install pytest-asyncio</code></pre><p>pytest-asyncio 라이브러리를 설치하면 비동기로 테스트를 진행할 수 있습니다.</p>
<p><strong>테스트 폴더 구조</strong></p>
<pre><code>ROOT
ㄴ test/
  ㄴ __init__.py
  ㄴ conftest.py
    ㄴ domain1/
      ㄴ conftest.py
      ㄴ ...
    ㄴ domain2/
      ㄴ conftest.py
      ㄴ ...

    ...</code></pre><p>위와 같은 형식으로 테스트 코드를 추가해나가겠습니다.</p>
<h3 id="test-환경-설정">Test 환경 설정</h3>
<p>테스트 할 때 실사용중인 DB에서 진행하면 안 되겠죠?
그리고 각 테스트 케이스에 독립성을 부여하기 위해 Test DB를 따로 사용하겠습니다.</p>
<pre><code class="language-python"># app/config.py

# Test Settings, Settings Class에 추가
TESTING = True if os.getenv(&quot;TESTING&quot;) == &quot;true&quot; else False
TEST_DB_NAME = os.getenv(&quot;TEST_DB_NAME&quot;)</code></pre>
<pre><code class="language-python"># app/database.py

engine = create_engine(
    &quot;mysql+pymysql://{username}:{password}@{host}:{port}/{name}&quot;.format(
        username=settings.DB_USERNAME,
        password=settings.DB_PASSWORD,
        host=settings.DB_HOST,
        port=settings.DB_PORT,
        name=settings.TEST_DB_NAME if settings.TESTING else settings.DB_NAME,
    )
)</code></pre>
<p>테스트 중인지 아닌지를 서버세팅에 추가하고, 그 값에 따라 어떤 DB를 사용할 지 나누어주었습니다.</p>
<pre><code class="language-python"># test/conftest.py

@pytest_asyncio.fixture(scope=&quot;session&quot;)
def app():
    if not settings.TESTING:
        raise SystemError(&quot;Testing Environment setting must be &#39;TRUE&#39;&quot;)

    return main.app


@pytest_asyncio.fixture
async def session():
    db = next(get_db())
    try:
        yield db
    finally:
        db.close()


@pytest_asyncio.fixture
async def client(app):
    async with AsyncClient(app=app, base_url=&quot;http://test/api/v1&quot;) as ac:
        models.Base.metadata.drop_all(bind=engine, tables=tables_to_drop)
        models.Base.metadata.create_all(bind=engine)
        yield ac</code></pre>
<blockquote>
<ol>
<li>테스트 설정이 아니라면 pytest를 실행시켜도 SystemError가 발생</li>
<li>session fixture로 test db에 접근</li>
<li>client fixture로 각 테스트케이스마다 DB를 초기화 시켜 독립성 유지</li>
</ol>
</blockquote>
<p>app fixture의 scope가 &quot;session&quot;으로 pytest를 실행할 때 최초 호출 후 캐시에서 불러오기 때문에 한 번만 실행된다는 점이 눈여겨볼만한 요소입니다.</p>
<h2 id="2-puzzle-test-case-작성">2. Puzzle Test Case 작성</h2>
<h3 id="퍼즐-생성-테스트">퍼즐 생성 테스트</h3>
<p>퍼즐 생성 엔드포인트를 테스트하기 위해서는 WordInfo(단어정보) 테이블에 데이터가 들어가 있어야합니다.</p>
<p>하지만 우리가 위에서 작성한 conftest의 client fixture로 테스트를 진행하면 늘 DB가 초기화 된 상태로 진행되기 때문에 데이터가 들어있지 않습니다.</p>
<p>가장 간단해보이는 방법으로는 wordinfo fixture를 만들어 Test DB에 단어 데이터를 넣어주는 수가 있습니다만, 이 방법을 사용할 시에는 삽입해주는 데이터 양에 따라서 테스트 케이스 하나에 많은 비용이 들어갈 것 입니다.</p>
<p>단어 데이터를 모킹해주는 방법은 어떨까요?
테이블의 구조와 로직을 생각해보면 퍼즐 생성 후 저장하는 과정에서
WordInfo.id가 외래키로 걸려있으므로 테이블에 데이터를 삽입하는 과정이 불가피합니다.</p>
<blockquote>
<p>그래서 어떻게 시간을 줄일거냐?</p>
</blockquote>
<p>나름대로의 꼼수를 부려봤습니다.
현재 로직을 생각해보면 WordInfo 테이블에 단어 데이터를 정제하면서 삽입한 이후에 데이터의 수정을 요구하지 않습니다.</p>
<p>따라서, WordInfo 테이블에 데이터 삽입을 <strong>한번만</strong> 실행하고
client에서 WordInfo의 데이터를 드랍시키지 않는 방법을 떠올렸습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/8426a93c-87a0-48fd-ab48-242163932999/image.png" alt=""></p>
<p>Pytest를 시작했을 때 TEST DB에 WordInfo 데이터 유무에 따라
데이터 삽입을 결정하도록 하는 로직을 그림으로 표현해봤습니다.
아래 코드는 위 그림을 구현한 것입니다.</p>
<pre><code>pip install pandas</code></pre><p>단어 데이터를 csv로 관리하고 DB에 추가하기 위해 pandas 라이브러리를 설치해줬습니다.</p>
<pre><code class="language-python">
test/conftest.py

@pytest_asyncio.fixture(scope=&quot;session&quot;)
async def inserted_wordinfo_into_db(app):
    with engine.connect() as connection:
        result = connection.execute(text(&quot;SELECT COUNT(*) FROM wordinfo&quot;))
        row_count = result.scalar()

        if row_count &gt; 494047:
            connection.execute(text(&quot;DELETE FROM wordinfo&quot;))
            connection.execute(text(&quot;ALTER TABLE wordinfo AUTO_INCREMENT = 1&quot;))
        if row_count == 0:
            df = pd.read_csv(&quot;test/example/wordinfo_backup.csv&quot;)
            df.to_sql(&quot;wordinfo&quot;, con=engine, index=False, if_exists=&quot;append&quot;)


@pytest_asyncio.fixture
async def client(app, inserted_wordinfo_into_db):
    async with AsyncClient(app=app, base_url=&quot;http://test/api/v1&quot;) as ac:
        tables_to_drop = [
            table for table in models.Base.metadata.sorted_tables if table.name != &quot;wordinfo&quot;
        ]

        models.Base.metadata.drop_all(bind=engine, tables=tables_to_drop)
        models.Base.metadata.create_all(bind=engine)
        yield ac</code></pre>
<p>단어 데이터 확인 fixture의 scope를 session으로 설정해 이 후 다른 테스트 케이스에서 client를 불러도 데이터 확인 로직이 한 번만 실행하도록 구성했습니다.</p>
<p>또한, client fixture에서 drop하는 테이블에서 WordInfo 테이블만 제외시켜 데이터를 보존하는 동시에 각 테스트 케이스의 독립성을 유지해줬습니다.</p>
<p>이제 테스트 케이스를 작성해봅시다.</p>
<pre><code class="language-python"># test/puzzle/test_puzzle.py

@pytest.mark.asyncio
async def test_create_puzzle(client):
    r = await client.get(&quot;/puzzle&quot;)
    data = r.json()

    assert r.status_code == 200
    assert type(data.get(&quot;map&quot;, None)) == list


@pytest.mark.asyncio
async def test_create_puzzle_error_with_size(client):
    r = await client.get(&quot;/puzzle?size=11&quot;)

    assert r.status_code == 422</code></pre>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/1a5b7d42-4a0c-4249-b16e-3dbf2133014b/image.png" alt=""></p>
<p>터미널창에서 pytest -v를 입력해 확인해본 결과 잘 나왔습니다.</p>
<h3 id="퍼즐-조회-테스트">퍼즐 조회 테스트</h3>
<p>퍼즐 조회 테스트를 하기 위해서는 퍼즐 데이터가 존재해야겠죠?
puzzle fixture를 작성해줍시다.</p>
<pre><code class="language-python"># test/puzzle/conftest.py

@pytest_asyncio.fixture
async def puzzle(session):
    with open(&quot;test/example/example_map.json&quot;, &quot;r&quot;, encoding=&quot;utf-8&quot;) as f:
        json_data = json.load(f)
    _map = json_data.get(&quot;map&quot;, None)
    _answers = json_data.get(&quot;desc&quot;, None)

    map_row = models.Puzzle(puzzle=_map)
    session.add(map_row)
    session.flush()

    insert_data = [
        {&quot;puzzle_id&quot;: map_row.id, &quot;word_id&quot;: desc[&quot;id&quot;], &quot;num&quot;: desc[&quot;num&quot;]} for desc in _answers
    ]

    session.bulk_insert_mappings(models.PuzzleAnswer, insert_data)
    session.commit()</code></pre>
<p>퍼즐 샘플이 담긴 json 파일을 불러와 TEST DB에 저장하는 fixture입니다.</p>
<pre><code class="language-python">@pytest.mark.asyncio
async def test_get_puzzle(client, puzzle):
    r = await client.get(&quot;/puzzle/1&quot;)

    assert r.status_code == 200


@pytest.mark.asyncio
async def test_get_puzzle_error_with_wrong_id(client):
    r = await client.get(&quot;/puzzle/2&quot;)

    assert r.status_code == 404</code></pre>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/502bbdf1-05d0-43cb-92a1-81da3744bc3d/image.png" alt=""></p>
<h2 id="3-auth-test-case-작성">3. Auth Test Case 작성</h2>
<h3 id="일반-로그인">일반 로그인</h3>
<pre><code class="language-python"># test/auth/conftest.py

@pytest_asyncio.fixture
async def user(session):
    user_dict = {
        &quot;email&quot;: &quot;test@test.com&quot;,
        &quot;nickname&quot;: &quot;testname&quot;,
    }
    raw_password = &quot;test1234&quot;
    salt_value = bcrypt.gensalt()
    password = bcrypt.hashpw(raw_password.encode(), salt_value)

    user_dict.update({&quot;password&quot;: password})

    user_row = models.User(**user_dict)
    session.add(user_row)
    session.commit()

    return user_row</code></pre>
<p>conftest에 user fixture를 만들어줍시다.</p>
<pre><code class="language-python"># test/auth/test_auth.py

@pytest.mark.asyncio
async def test_general_register(client, session):
    body = {
        &quot;email&quot;: &quot;test@test.com&quot;,
        &quot;password&quot;: &quot;test1234&quot;,
        &quot;nickname&quot;: &quot;testname&quot;,
    }
    r = await client.post(&quot;/auth/general-register&quot;, data=json.dumps(body))

    assert r.status_code == 200

    user = session.query(models.User).filter(models.User.email == body.get(&quot;email&quot;)).first()

    assert body.get(&quot;email&quot;) == user.email
    assert body.get(&quot;nickname&quot;) == user.nickname

@pytest.mark.asyncio
async def test_general_login(client, user):
    body = {&quot;email&quot;: &quot;test@test.com&quot;, &quot;password&quot;: &quot;test1234&quot;}
    r = await client.post(&quot;/auth/general-login&quot;, data=json.dumps(body))

    assert r.status_code == 200

@pytest.mark.asyncio
async def test_check_duplicated_email(client, user):
    email = &quot;test@test.com&quot;
    r = await client.get(f&quot;/auth/duplicated?email={email}&quot;)
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;is_duplicated&quot;)</code></pre>
<p>일반 회원가입과 로그인 테스트 및 중복 이메일 테스트 코드입니다.
이제 응답에 딸려온 토큰이 잘 생성되었는지 테스트 해봅시다.</p>
<pre><code class="language-python"># test/auth/conftest.py

@pytest_asyncio.fixture
async def token(user):
    jwt_service = JWTService()
    user_dict = user.as_dict()
    user_dict.pop(&quot;password&quot;)
    user_dict.pop(&quot;created_at&quot;)
    user_dict.pop(&quot;updated_at&quot;)
    access_token = jwt_service.create_access_token(user_dict)

    return access_token</code></pre>
<p>user fixture를 주입받아 token을 생성하는 fixture입니다.</p>
<pre><code class="language-python"># test/auth/test_auth.py

@pytest.mark.asyncio
async def test_get_user_by_token(client, token):
    cookies = {&quot;access&quot;: token}
    r = await client.get(&quot;/auth/get-user&quot;, cookies=cookies)
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;email&quot;, None) == &quot;test@test.com&quot;</code></pre>
<p>토큰 정보로 유저 정보를 반환하는 테스트 코드입니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/5ad92ded-dd1a-4c4c-8047-051224b04179/image.png" alt=""></p>
<h3 id="소셜-로그인">소셜 로그인</h3>
<p>소셜 로그인은 일반 로그인과 다르게 외부 API를 이용하는 로직이 있습니다.
이는 모킹을 이용해 테스트해보겠습니다.</p>
<p>파이썬에서 기본으로 제공하는 unittest 라이브러리의 patch 기능을 사용할 수 있습니다.</p>
<p>테스트 할 엔드포인트의 함수를 모킹하여 원하는 동작을 하도록 바꿔주는 기능이 patch입니다.</p>
<pre><code class="language-python"># test/auth/mock.py

async def mock_post_token_request(*args, **kwargs):
    return {&quot;access_token&quot;: &quot;mock_access_token&quot;}

async def mock_get_userinfo(*args, **kwargs):
    return {&quot;email&quot;: &quot;test@test.com&quot;, &quot;name&quot;: &quot;testuser&quot;}</code></pre>
<p>위 함수는 구글 API에서 Access Token을 받아오는 메소드,
아래 함수는 구글 API에서 유저정보를 받아오는 메소드를 모킹할 함수입니다.</p>
<pre><code class="language-python">@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_token&quot;, new=mock.mock_post_token_request)
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_userinfo&quot;, new=mock.mock_get_userinfo)</code></pre>
<p>테스트 코드에 위와 같은 데코레이터를 붙여주면 모킹 기능이 작동합니다.</p>
<pre><code class="language-python"># test/auth/conftest.py

@pytest_asyncio.fixture
async def oauth_google_user(session):
    user = models.User(email=&quot;test@test.com&quot;, nickname=&quot;test&quot;)
    session.add(user)
    session.flush()

    oauth_info = models.OAuth(user_id=user.id, email=user.email, provider=&quot;google&quot;)
    session.add(oauth_info)
    session.commit()</code></pre>
<p>소셜 로그인 유저 fixture를 작성한 뒤 테스트 코드도 작성해봅시다.</p>
<pre><code class="language-python"># test/auth/test_auth.py

@pytest.mark.asyncio
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_token&quot;, new=mock.mock_post_token_request)
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_userinfo&quot;, new=mock.mock_get_userinfo)
async def test_google_register(client, session):
    code = &quot;dummy code&quot;
    r = await client.get(f&quot;/auth/oauth-register/google/callback?code={code}&quot;)

    assert r.status_code == 200

    user = session.query(models.User).filter(models.User.email == &quot;test@test.com&quot;).first()

    assert &quot;test&quot; == user.nickname

    oauth_user = session.query(models.OAuth).filter(models.OAuth.id == user.id).first()

    assert oauth_user.email == user.email


@pytest.mark.asyncio
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_token&quot;, new=mock.mock_post_token_request)
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_userinfo&quot;, new=mock.mock_get_userinfo)
async def test_google_register_failed_by_duplicated_email(client, user):
    code = &quot;dummmy code&quot;
    r = await client.get(f&quot;/auth/oauth-register/google/callback?code={code}&quot;)
    data = r.json()

    assert r.status_code == 400
    assert data.get(&quot;detail&quot;) == &quot;중복된 이메일입니다.&quot;


@pytest.mark.asyncio
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_token&quot;, new=mock.mock_post_token_request)
@patch(&quot;app.api.v1.auth.service.GoogleOAuthService.get_userinfo&quot;, new=mock.mock_get_userinfo)
async def test_google_login(client, oauth_google_user):
    code = &quot;dummmy code&quot;
    r = await client.get(f&quot;/auth/oauth-register/google/callback?code={code}&quot;)
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;email&quot;) == &quot;test@test.com&quot;
    assert data.get(&quot;nickname&quot;) == &quot;test&quot;</code></pre>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/230a92bf-7941-4664-8977-b56fb317f8d2/image.png" alt=""></p>
<p>테스트가 잘 되는 군요.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[십자말풀이] 회원가입/로그인과 소셜 로그인 JWT로 구현하기]]></title>
            <link>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-%ED%9A%8C%EC%9B%90%EA%B0%80%EC%9E%85%EB%A1%9C%EA%B7%B8%EC%9D%B8%EA%B3%BC-%EC%86%8C%EC%85%9C-%EB%A1%9C%EA%B7%B8%EC%9D%B8-JWT%EB%A1%9C-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0</link>
            <guid>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-%ED%9A%8C%EC%9B%90%EA%B0%80%EC%9E%85%EB%A1%9C%EA%B7%B8%EC%9D%B8%EA%B3%BC-%EC%86%8C%EC%85%9C-%EB%A1%9C%EA%B7%B8%EC%9D%B8-JWT%EB%A1%9C-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0</guid>
            <pubDate>Sat, 26 Oct 2024 09:43:41 GMT</pubDate>
            <description><![CDATA[<p>작업 레포지토리 : <a href="https://github.com/fnzksxl/word-puzzle">https://github.com/fnzksxl/word-puzzle</a></p>
<p>이번 포스트에서는 FastAPI 서버에 회원가입/로그인과 소셜 로그인을 구현하고 JWT로 인증하는 기능을 작성해보도록 하겠습니다.</p>
<h4 id="jwt와-쿠키를-사용하는-이유">JWT와 쿠키를 사용하는 이유</h4>
<p><a href="https://velog.io/@blessoms2017/JWT%EC%99%80-%EC%BF%A0%ED%82%A4%EB%A5%BC-%EA%B0%99%EC%9D%B4-%EC%82%AC%EC%9A%A9%ED%95%98%EB%8A%94-%EC%9D%B4%EC%9C%A0">참고 링크</a> 잘 정리되어있는 블로그 글이 있어서 링크를 걸어두도록 하겠습니다.</p>
<h3 id="프로젝트-구조-변경">프로젝트 구조 변경</h3>
<p>역할 별로 폴더를 나누던 구조에서 기능 별로 폴더를 나누는 구조로 변경했습니다.</p>
<p><strong>변경 전</strong></p>
<pre><code>ROOT
ㄴ app/
  ㄴ utils/
   ㄴ puzzle.py
   ㄴ ...
 ㄴ api/
  ㄴ v1/
   ㄴ puzzle.py
   ㄴ ...
 ㄴ schema/

...</code></pre><p><strong>변경 후</strong></p>
<pre><code>ROOT
ㄴ app/
  ㄴ api/
   ㄴ v1/
    ㄴ puzzle/
     ㄴ controller.py
     ㄴ schema.py
     ㄴ ...
    ㄴ auth/
     ㄴ controller.py
     ㄴ service.py
     ㄴ ...

...
</code></pre><h2 id="1-회원가입로그인-구현">1. 회원가입/로그인 구현</h2>
<pre><code class="language-python"># jwt.py
# models.py
class BaseDate(BaseMin):
    created_at = Column(DATETIME, default=func.now())
    updated_at = Column(DATETIME, default=func.now(), onupdate=func.now())

class User(BaseDate, Base):
    __tablename__ = &quot;user&quot;

    email = Column(VARCHAR(20), unique=True, nullable=False)
    password = Column(VARCHAR(70), nullable=True)
    nickname = Column(VARCHAR(10), nullable=False)
    solved = Column(Integer, default=0)

    def as_dict(self):
        return {column.name: getattr(self, column.name) for column in self.__table__.columns}</code></pre>
<p>User 테이블을 추가해주겠습니다.</p>
<p>JWT 암호화 및 해독을 담당할 관련 서비스와
쿠키에 토큰을 달아줄 서비스를 작성하겠습니다.</p>
<pre><code class="language-python">class JWTService:
    &quot;&quot;&quot;
    JWT 암호화 및 해독에 필요한 기능을 제공하는 클래스
    &quot;&quot;&quot;

    def __init__(self):
        self.algorithm = settings.ALGORITHM
        self.secret_key = settings.SECRET_KEY
        self.access_expire_time = settings.ACCESS_EXPIRE_TIME
        self.refresh_expire_time = settings.REFRESH_EXPIRE_TIME

    def _encode(self, data: dict, expires_delta: int) -&gt; str:
        &quot;&quot;&quot;
        JWT 토큰으로 암호화한다.

        Args:
            data (dict): 암호화 할 데이터
            expires_delta (int): JWT 만료기간
        Returns:
            str: JWT 토큰
        &quot;&quot;&quot;
        to_encode = data.copy()
        expire = datetime.now(ZoneInfo(&quot;Asia/Seoul&quot;)) + timedelta(minutes=expires_delta)
        to_encode.update({&quot;exp&quot;: expire})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def _decode(self, token: str) -&gt; Optional[Dict]:
        &quot;&quot;&quot;
        JWT 토큰을 해독한다.

        Args:
            token (str): JWT 토큰
        Returns:
            Dict or None: 해독 성공 시 정보를 담은 사전, 실패 시 None
        &quot;&quot;&quot;
        try:
            return jwt.decode(token, self.secret_key, algorithms=self.algorithm)
        except JWTError:
            return None

    def _create_token(self, data: dict, expires_delta: int) -&gt; str:
        &quot;&quot;&quot;
        JWT 토큰 암호화 래퍼함수
        &quot;&quot;&quot;
        return self._encode(data, expires_delta)

    def create_access_token(self, data) -&gt; str:
        &quot;&quot;&quot;
        Access Token 생성 함수
        &quot;&quot;&quot;
        return self._create_token(data, self.access_expire_time)

    def create_refresh_token(self, data) -&gt; str:
        &quot;&quot;&quot;
        Refresh Token 생성 함수
        &quot;&quot;&quot;
        return self._create_token(data, self.refresh_expire_time)

    def check_is_expired(self, token: str) -&gt; Optional[Dict]:
        &quot;&quot;&quot;
        JWT 토큰 해독 래퍼함수
        &quot;&quot;&quot;
        payload = self._decode(token)

        now = datetime.timestamp(datetime.now(ZoneInfo(&quot;Asia/Seoul&quot;)))
        if payload and payload[&quot;exp&quot;] &lt; now:
            return None

        return payload

# cookie.py
class Cookie:
    &quot;&quot;&quot;
    응답 객체의 쿠키 관련 작업을 담당하는 클래스
    &quot;&quot;&quot;

    async def attach_token_into_cookie(
        self, response: JSONResponse, access_token: str, refresh_token: str
    ) -&gt; JSONResponse:
        &quot;&quot;&quot;
        쿠키에 Access, Refresh Token을 부착하는 메소드

        Args:
            response (JSONResponse): 쿠키 생성해줄 response 객체
            access_token (str): Access Token JWT
            refresh_token (str): Refresh Token JWT
        Returns:
            JSONResponse: 쿠키가 부착된 JSONResponse 객체
        &quot;&quot;&quot;
        response.set_cookie(
            key=&quot;access&quot;,
            domain=&quot;localhost&quot;,
            samesite=&quot;None&quot;,
            value=access_token,
        )
        response.set_cookie(
            key=&quot;refresh&quot;,
            domain=&quot;localhost&quot;,
            samesite=&quot;None&quot;,
            value=refresh_token,
        )
        return response</code></pre>
<p>config.py와 .env 파일에 필요한 설정들을 추가되어 있는게 전제입니다.
위에 작성된 두 클래스를 Auth 서비스에 주입해 내부 메소드처럼 사용할 예정입니다.</p>
<pre><code class="language-python"># auth.py
class AuthBase(ABC):
    &quot;&quot;&quot;
    AuthService의 기초가 되는 추상클래스
    &quot;&quot;&quot;

    def __init__(self):
        self.jwt_service = JWTService()
        self.cookie_service = Cookie()

    @abstractmethod
    async def register(self):
        &quot;&quot;&quot;
        회원가입을 담당하는 추상 메소드
        &quot;&quot;&quot;
        pass

    @abstractmethod
    async def login(self):
        &quot;&quot;&quot;
        로그인을 담당하는 추상 메소드
        &quot;&quot;&quot;
        pass

    async def _get_login_response(self, user) -&gt; JSONResponse:
        &quot;&quot;&quot;
        로그인 성공 후 응답을 생성하는 메소드
        Args:
            user (User): 로그인한 유저의 User 객체
        Returns:
            JSONResponse: Access, Refresh Token이 쿠키에 들어간 응답
        &quot;&quot;&quot;
        self.user_dict = user.as_dict()
        self.user_dict.pop(&quot;password&quot;)
        self.user_dict.pop(&quot;created_at&quot;)
        self.user_dict.pop(&quot;updated_at&quot;)

        response = JSONResponse(content=self.user_dict)
        return await self._attach_token(response, self.user_dict)

    async def _attach_token(self, response, data) -&gt; JSONResponse:
        &quot;&quot;&quot;
        응답 쿠키에 Access, Refresh Token을 부착하는 메소드
        Args:
            response (JSONResponse): content로 유저 정보가 들어간 응답 객체
            data: Access, Refresh Token
        Returns:
            JSONResponse: Access, Refresh Token이 쿠키에 들어간 응답
        &quot;&quot;&quot;
        access_token = self.jwt_service.create_access_token(data)
        refresh_token = self.jwt_service.create_refresh_token(data)
        return await self.cookie_service.attach_token_into_cookie(
            response, access_token, refresh_token
        )</code></pre>
<p>Auth 서비스의 추상 클래스를 먼저 선언해줬습니다.</p>
<p>JWTService, Cookie 클래스가 init 함수에서 주입되었고,
로그인 후 응답을 생성하는 메소드와 응답 쿠키에 토큰을 달아주는 메소드는
정의되어 있는 상태입니다.</p>
<pre><code class="language-python"># exception.py
class LoginNotValidIDPWException(HTTPException):
    def __init__(self, detail: str = &quot;입력한 ID 또는 비밀번호가 일치하지 않습니다.&quot;):
        super().__init__(status_code=400, detail=detail)


# service.py
class GeneralAuthService(AuthBase):
    &quot;&quot;&quot;
    아이디(이메일), 비밀번호로 회원가입/로그인, 인증 서비스 클래스
    &quot;&quot;&quot;

    def __init__(self, email: str, password: str, db: Session, nickname: Optional[str] = None):
        &quot;&quot;&quot;
        입력받은 이메일, 비밀번호, 닉네임을 초기화한다.
        &quot;&quot;&quot;
        self.email = email
        self.password = password
        self.nickname = nickname
        self.db = db
        super().__init__()

    async def register(self) -&gt; Dict:
        &quot;&quot;&quot;
        비밀번호를 해쉬화해서 데이터베이스에 유저 정보를 저장하는 메소드

        Returns:
            dict: 필요한 유저 정보만 담아 반환되는 딕셔너리 자료
        &quot;&quot;&quot;
        user = User(email=self.email, password=await self._hash_pw(), nickname=self.nickname)
        self.db.add(user)
        self.db.commit()

        return await self._get_login_response(user)

    async def login(self) -&gt; JSONResponse:
        &quot;&quot;&quot;
        이메일과 비밀번호로 유저 정보를 확인하여 로그인하는 메소드

        Returns:
            JSONResponse: 쿠키에 토큰 정보, 콘텐츠에 유저 정보를 담은 response 객체
        &quot;&quot;&quot;
        user = self.db.query(User).filter(User.email == self.email).first()
        if (
            user
            and user.password
            and bcrypt.checkpw(self.password.encode(), user.password.encode())
        ):
            return await self._get_login_response(user)
        raise LoginNotValidIDPWException()

    async def _hash_pw(self) -&gt; str:
        &quot;&quot;&quot;
        비밀번호를 해쉬화 해 반환한다.

        Returns:
            str: 해쉬화 된 비밀번호
        &quot;&quot;&quot;
        salt_value = bcrypt.gensalt()
        return bcrypt.hashpw(self.password.encode(), salt_value)
</code></pre>
<p>저번에 작성했던 PuzzleCreateService와 init 함수 부분이 미묘하게 다릅니다.
퍼즐 서비스에서는 DB 세션을 바로 DI 받았었는데, 여기서는 안 받고있네요.</p>
<p>퍼즐 서비스에서는 SIZE 파라미터를 <strong>쿼리</strong> 형태로 받아왔습니다.
하지만, 회원가입/로그인할 때 사용할 데이터를 쿼리로 받아올 수는 없겠죠.</p>
<p>저는 JSON Body 형태로 필요 데이터를 받아오기 위해서 의존성 주입 함수를 따로 작성해줬습니다.</p>
<pre><code class="language-python"># schema.py
class GeneralLoginModel(BaseModel):
    email: str
    password: str


class GeneralRegisterModel(GeneralLoginModel):
    nickname: str

# dependancy.py
def get_general_auth_service_register(
    auth_data: GeneralRegisterModel, db: Session = Depends(get_db)
) -&gt; GeneralAuthService:
    return GeneralAuthService(
        email=auth_data.email, password=auth_data.password, nickname=auth_data.nickname, db=db
    )


def get_general_auth_service_login(
    auth_data: GeneralLoginModel, db: Session = Depends(get_db)
) -&gt; GeneralAuthService:
    return GeneralAuthService(email=auth_data.email, password=auth_data.password, db=db)</code></pre>
<p>이 함수들을 엔드포인트에서 DI 해주면, JSON Body으로 데이터를 받아오는 동시에 DB 세션을 서비스에 주입해줄 수 있습니다.</p>
<pre><code class="language-python"># controller.py

@router.post(
    &quot;/general-register&quot;, status_code=status.HTTP_201_CREATED, description=&quot;EMAIL/PW로 가입&quot;
)
async def general_register(
    auth_service: GeneralAuthService = Depends(get_general_auth_service_register),
):
    return await auth_service.register()


@router.post(&quot;/general-login&quot;, status_code=status.HTTP_200_OK, description=&quot;EMAIL/PW로 로그인&quot;)
async def general_login(auth_service: GeneralAuthService = Depends(get_general_auth_service_login)):
    return await auth_service.login()</code></pre>
<h4 id="test">TEST</h4>
<p>회원가입/로그인 후에 JWT가 생성되어 쿠키에 담겨 반환되었는지 알아보기 위해
POSTMAN을 사용해서 테스트 해보도록 하겠습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/16574c1b-9c13-4e75-8637-f46092883fef/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/7a490402-d80f-4d71-ac32-545bf51d2b0c/image.png" alt=""></p>
<p>쿠키 access, refresh에 데이터가 잘 들어와있는 모습을 볼 수 있습니다.</p>
<h2 id="2-소셜-로그인">2. 소셜 로그인</h2>
<p>소셜 로그인 기능 구현에 앞서 소셜 로그인의 큰 틀을 먼저 살펴보겠습니다.</p>
<p>구글 소셜 로그인에 대한 예시를 그림으로 그려보겠습니다.
<img src="https://velog.velcdn.com/images/taegong_s/post/12d9672e-5794-42d1-b045-be63d939c6db/image.png" alt=""></p>
<p>General Auth와 비교하면 클라이언트에게서 인가 코드를 전달받고 구글에 access token과 유저 정보를 요청하는 로직이 추가되었다고 볼 수 있겠습니다.</p>
<pre><code class="language-python"># auth.py
class OAuthBase(AuthBase):
    &quot;&quot;&quot;
    GeneralAuth 서비스를 상속받아 OAuth 서비스의 기초가 되는 추상클래스
    &quot;&quot;&quot;

    def __init__(self):
        super().__init__()

    @abstractmethod
    async def get_token(self):
        &quot;&quot;&quot;
        OAuth 유저 정보 획득에 필요한 Token을 획득하는 추상 메소드
        &quot;&quot;&quot;
        pass

    @abstractmethod
    async def get_userinfo(self):
        &quot;&quot;&quot;
        Token으로 OAuth 유저 정보를 획득하는 추상 메소드
        &quot;&quot;&quot;
        pass</code></pre>
<p>기능 확장에 용이함을 주기 위해서 소셜 로그인에서 반드시 필요한 과정인
get_token과 get_userinfo 메소드를 추상메소드로 생성하는
OAuthBase 추상클래스를 작성해줍니다.</p>
<pre><code class="language-python"># exception.py

class EmailDuplicatedException(HTTPException):
    def __init__(self, detail: str = &quot;중복된 이메일입니다.&quot;):
        super().__init__(status_code=400, detail=detail)


class GoogleGetTokenException(HTTPException):
    def __init__(self, detail: str = &quot;구글 토큰 획득 과정에서 오류가 발생했습니다.&quot;):
        super().__init__(status_code=400, detail=detail)


class GoogleGetUserInfoException(HTTPException):
    def __init__(self, detail: str = &quot;구글 유저 정보 과정에서 오류가 발생했습니다.&quot;):
        super().__init__(status_code=400, detail=detail)


class GoogleRegisterException(HTTPException):
    def __init__(self, detail: str = &quot;구글 소셜 회원가입 과정에서 오류가 발생했습니다.&quot;):
        super().__init__(status_code=400, detail=detail)

# service.py

class GoogleOAuthService(OAuthBase):
    def __init__(self, code: str, db: Session = Depends(get_db)):
        &quot;&quot;&quot;
        구글 소셜 로그인에 필요한 정보를 초기화 한다.
        &quot;&quot;&quot;
        super().__init__()
        self.provider = &quot;google&quot;
        self.client_id = settings.GOOGLE_CLIENT_ID
        self.redirect_uri = settings.GOOGLE_REDIRECT_URI
        self.client_secret = settings.GOOGLE_CLIENT_SECRET
        self.db = db
        self.code = code
        self.token_request_url = &quot;https://oauth2.googleapis.com/token&quot;
        self.userinfo_endpoint = &quot;https://www.googleapis.com/userinfo/v2/me&quot;

    async def register(self) -&gt; JSONResponse:
        try:
            email = self.oauth_user_info.get(&quot;email&quot;, None)
            nickname = email.split(&quot;@&quot;)[0]
            user = User(email=email, nickname=nickname)
            self.db.add(user)
            self.db.flush()

            oauth = OAuth(user_id=user.id, email=user.email, provider=self.provider)
            self.db.add(oauth)

            self.db.commit()
        except Exception:
            raise GoogleRegisterException()

        return await self._get_login_response(user)

    async def login(self) -&gt; JSONResponse:
        &quot;&quot;&quot;
        소셜 로그인 로직을 수행하는 기초 메소드,
        데이터베이스에 유저 존재 -&gt; 정보 반환
        존재 X -&gt; 저장(회원가입) 후 정보 반환

        Returns:
            JSONResponse: 쿠키에 토큰 정보, 콘텐츠에 유저 정보를 담은 response 객체
        &quot;&quot;&quot;
        user = await self.is_registered()

        if user is None:
            return await self.register()

        return await self._get_login_response(user)

    async def get_token(self) -&gt; str:
        &quot;&quot;&quot;
        유저 정보 조회에 필요한 토큰을 받아온다.

        Returns:
            str: 구글 OAuth 서비스에서 받은 토큰
        &quot;&quot;&quot;
        token_request_payload = {
            &quot;grant_type&quot;: &quot;authorization_code&quot;,
            &quot;client_id&quot;: self.client_id,
            &quot;redirect_uri&quot;: self.redirect_uri,
            &quot;code&quot;: self.code,
            &quot;client_secret&quot;: self.client_secret,
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(self.token_request_url, data=token_request_payload)
        result = response.json()

        if &quot;access_token&quot; in result:
            return result[&quot;access_token&quot;]
        else:
            raise GoogleGetTokenException()

    async def get_userinfo(self, token) -&gt; Dict:
        &quot;&quot;&quot;
        구글에서 받아온 토큰으로 유저 정보를 요청하고 반환한다.

        Args:
            token (str): 구글에서 받아온 토큰
        Returns:
            dict:
        &quot;&quot;&quot;
        headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

        async with httpx.AsyncClient() as client:
            response = await client.get(self.userinfo_endpoint, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            raise GoogleGetUserInfoException()

    async def is_registered(self) -&gt; Optional[User]:
        &quot;&quot;&quot;
        가입된 유저라면 유저 정보를, 아니라면 None을 반환한다.

        Returns:
            User or None: 정보가 있다면 User 객체, 아니면 None 반환
        &quot;&quot;&quot;
        token = await self.get_token()
        user_info = await self.get_userinfo(token)
        return await self.get_user_from_db(user_info)

    async def get_user_from_db(self, user_info):
        &quot;&quot;&quot;
        구글에서 받은 유저 정보를 데이터베이스에 검색하고
        존재하면 User 객체, 아니면 None을 반환한다.

        Args:
            user_info (dict): 구글에서 받은 유저 정보
        Returns:
            User or None: 정보가 있다면 User 객체, 아니면 None 반환
        &quot;&quot;&quot;

        user = self.db.query(User).filter(User.email == user_info.get(&quot;email&quot;)).first()
        if user:
            oauth_user_info = (
                self.db.query(OAuth)
                .filter(OAuth.provider == self.provider, OAuth.email == user.email)
                .first()
            )
            if oauth_user_info:
                return user
            else:
                raise EmailDuplicatedException()
        else:
            self.oauth_user_info = user_info
            return None</code></pre>
<p>그림의 로직대로 진행하되 code를 쿼리 파라미터로 입력받고,
DB 내 유저 정보의 유무에 따라 조회만 할지 저장 할지 결정합니다.</p>
<pre><code class="language-python"># controller.py

@router.get(
    &quot;/oauth-register/google/callback&quot;,
    status_code=status.HTTP_201_CREATED,
    description=&quot;구글 소셜 로그인&quot;,
)
async def kakao_callback(auth_service: GoogleOAuthService = Depends(GoogleOAuthService)):
    return await auth_service.login()</code></pre>
<pre><code class="language-html">&lt;!DOCTYPE html&gt;
&lt;html lang=&quot;en&quot;&gt;
  &lt;head&gt;
    &lt;meta charset=&quot;UTF-8&quot; /&gt;
    &lt;meta name=&quot;viewport&quot; content=&quot;width=device-width, initial-scale=1.0&quot; /&gt;
    &lt;title&gt;Login Page&lt;/title&gt;
    &lt;style&gt;
      body {
        font-family: Arial, sans-serif;
        background-color: #f5f5f5;
        display: flex;
        align-items: center;
        justify-content: center;
        height: 100vh;
        margin: 0;
      }

      .login-container {
        background-color: #fff;
        padding: 20px 30px;
        border-radius: 8px;
        box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        text-align: center;
      }

      .google-login-btn img {
        width: 200px;
        border-radius: 3px;
        transition: opacity 0.3s;
      }

      .google-login-btn:hover img {
        opacity: 0.8;
      }
    &lt;/style&gt;
  &lt;/head&gt;

  &lt;body&gt;
    &lt;div class=&quot;login-container&quot;&gt;
      &lt;a
        href=&quot;https://accounts.google.com/o/oauth2/v2/auth?client_id=639700898145-445d540qksvfnm3tg29mkht55ufkfeuv.apps.googleusercontent.com&amp;redirect_uri=http://localhost:8000/api/v1/auth/oauth-register/google/callback&amp;response_type=code&amp;scope=email profile&quot;
        class=&quot;github-login-btn&quot;
      &gt;
        &lt;img
          src=&quot;https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcReH1nivRV_9yG4wz04xIz1EEh-J69U_2JRaA&amp;s&quot;
          alt=&quot;Social Login&quot;
        /&gt;
      &lt;/a&gt;
    &lt;/div&gt;
  &lt;/body&gt;
&lt;/html&gt;</code></pre>
<p>엔드포인트와 인가 코드를 받아올 간단한 html을 작성한 뒤 테스트 해보겠습니다.
<img src="https://velog.velcdn.com/images/taegong_s/post/1bc571ed-0788-4412-969c-f0c23986ca56/image.png" alt=""></p>
<p>작성한 html 파일을 오픈해서 구글 로고를 눌러줍시다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/aea6363a-0494-49c9-9a3b-d96b5161a3e7/image.png" alt=""></p>
<p>저는 이미 로그인 해놨었기 때문에, 데이터를 GeneralAuth와 동일한 형태로 받아오는 모습입니다.
로그인이 되어 있지 않다면 구글 계정을 선택하는 화면으로 넘어갈 것 입니다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[십자말풀이] 십자말풀이판 데이터베이스 삽입]]></title>
            <link>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4%ED%8C%90-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%B2%A0%EC%9D%B4%EC%8A%A4-%EC%82%BD%EC%9E%85</link>
            <guid>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4%ED%8C%90-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%B2%A0%EC%9D%B4%EC%8A%A4-%EC%82%BD%EC%9E%85</guid>
            <pubDate>Mon, 21 Oct 2024 18:37:06 GMT</pubDate>
            <description><![CDATA[<p>작업 레포지토리 : <a href="https://github.com/fnzksxl/word-puzzle">https://github.com/fnzksxl/word-puzzle</a></p>
<p>지난 포스트에서는 십자말풀이 게임판을 만드는 작업까지 진행했습니다.
이번 포스트에서는 생성된 십자말풀이 게임판을 DB에 삽입하고,
이를 검색해보는 작업까지 진행해보도록 하겠습니다.</p>
<h2 id="db에-게임판-삽입">DB에 게임판 삽입</h2>
<p>퍼즐과 정답을 저장할 테이블 구조를 먼저 정의해보겠습니다.</p>
<pre><code class="language-python">class Puzzle(BaseMin, Base):
    __tablename__ = &quot;puzzle&quot;

    puzzle = Column(JSON, nullable=False)
    name = Column(VARCHAR(50), nullable=False, default=lambda: str(uuid.uuid4()))


class PuzzleAnswer(BaseMin, Base):
    __tablename__ = &quot;puzzleanswer&quot;

    puzzle_id = Column(Integer, ForeignKey(&quot;puzzle.id&quot;), nullable=False)
    word_id = Column(Integer, ForeignKey(&quot;wordinfo.id&quot;), nullable=False)
    num = Column(Integer, nullable=False)</code></pre>
<h4 id="puzzle">Puzzle</h4>
<p>2차원 리스트로 만들어진 퍼즐은 JSON 형태로 테이블에 넣도록 하겠습니다.
name으로 클라이언트가 식별할 수 있는 컬럼을 하나 추가해줬습니다.
&lt; </p>
<h4 id="answer">Answer</h4>
<p>퍼즐에 할당된 정답은 puzzle_id와 word_id를 외래키로 잡아
어떤 게임판의 정답인지, 정답이 어떤 단어인지를 알 수 있게 만들었습니다.
함께 저장되는 num으로 최초에 게임판 생성할 때와 동일한 데이터를 만들 수 있도록 했습니다.</p>
<blockquote>
<p>word_id도 외래키로 저장되어있어 조회 시에 word 테이블과의 조인 작업이 불가피합니다. 추후에 데이터가 많이 쌓인다면 반정규화도 고려해볼만한 사항입니다.</p>
</blockquote>
<p>DB에 게임판을 삽입하는 로직은 생성 이후에 곧바로 이루어져야 하는 흐름입니다.
따라서, 게임판 생성 서비스에 함수를 추가하겠습니다.</p>
<pre><code class="language-python">async def insert_map_answer_into_db(self) -&gt; None:
        &quot;&quot;&quot;
        생성된 맵과 정답을 DB에 삽입한다.
        &quot;&quot;&quot;
        map_row = Puzzle(puzzle=self.map)
        self.db.add(map_row)
        self.db.flush()

        insert_data = [
            {&quot;puzzle_id&quot;: map_row.id, &quot;word_id&quot;: desc[&quot;id&quot;], &quot;num&quot;: desc[&quot;num&quot;]}
            for desc in self.desc
        ]

        self.db.bulk_insert_mappings(PuzzleAnswer, insert_data)
        self.db.commit()</code></pre>
<p>puzzle을 먼저 <strong>flush</strong>해서 puzzle_id를 추출합니다.
이후에 puzzle_id와 self.desc 속의 num, desc로
여러 row를 한 번에 bulk_insert 합니다.</p>
<h2 id="db에서-게임판-조회">DB에서 게임판 조회</h2>
<h3 id="조회-서비스-구현">조회 서비스 구현</h3>
<p>게임판 조회 서비스 클래스를 생성하고 조회하는 함수를 추가해보겠습니다.</p>
<pre><code class="language-python">class PuzzleReadService:
    def __init__(self, puzzle_id: int, db: Session = Depends(get_db)):
        self.db = db
        self.puzzle_id = puzzle_id

    async def read_puzzle_from_db_by_id(self) -&gt; Dict:
        &quot;&quot;&quot;
        데이터베이스에서 퍼즐 ID로 퍼즐과 정답 정보를 읽어와 반환한다.
        Args:
            puzzle_id (int): 반환할 퍼즐 ID
        Returns:
            Dict: 퍼즐, 정답 정보가 담긴 사전 데이터
        &quot;&quot;&quot;
        puzzle = self.db.query(Puzzle).filter(Puzzle.id == self.puzzle_id).first()
        if puzzle is None:
            raise PuzzleNotExistException()
        answer = (
            self.db.query(PuzzleAnswer.num, WordInfo.pos, WordInfo.desc, WordInfo.word)
            .filter(PuzzleAnswer.puzzle_id == self.puzzle_id)
            .join(WordInfo, PuzzleAnswer.word_id == WordInfo.id)
            .all()
        )

        answer_json = [
            {&quot;num&quot;: num, &quot;desc&quot;: {&quot;pos&quot;: pos, &quot;desc&quot;: desc, &quot;word&quot;: word}}
            for num, pos, desc, word in answer
        ]

        return {&quot;map&quot;: puzzle.puzzle, &quot;desc&quot;: answer_json}</code></pre>
<ol>
<li>puzzle_id을 클라이언트에서 입력받았을 때, 퍼즐만 간단하게 불러옵니다.
예외) puzzle_id인 puzzle이 존재하지 않을 때. </li>
<li>PuzzleAnswer.puzzle_id가 puzzle_id이면서, PuzzleAnswer.word_id가 WordInfo.id인 PuzzleAnswer 객체들을 모두 불러옵니다.</li>
</ol>
<h4 id="예외-처리">예외 처리</h4>
<pre><code class="language-python">from fastapi import HTTPException


class PuzzleNotExistException(HTTPException):
    def __init__(self, detail: str = &quot;존재하지 않는 퍼즐입니다.&quot;):
        super().__init__(status_code=404, detail=detail)
</code></pre>
<p>예외 처리를 담당하는 모듈을 따로 만들어 추가해주었습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/b611db15-5e25-4946-99b4-a90dc76a1cfd/image.png" alt=""></p>
<h4 id="다르게-puzzleanswer를-찾아오는-방법">다르게 PuzzleAnswer를 찾아오는 방법</h4>
<p>SQLAlchemy ORM에서 제공하는 RelationShip을 사용해서 가져올 수 있습니다.</p>
<pre><code class="language-python"># import relationship on models.py
from sqlalchemy.orm import relationship


class WordInfo(BaseMin, Base):
    __tablename__ = &quot;wordinfo&quot;

    word = Column(VARCHAR(5), nullable=False, index=True)
    desc = Column(VARCHAR(200), nullable=False)
    len = Column(Integer, nullable=False)
    pos = Column(VARCHAR(7), nullable=False)

    puzzle_answers = relationship(&quot;PuzzleAnswer&quot;, back_populates=&quot;word&quot;)


class PuzzleAnswer(BaseMin, Base):
    __tablename__ = &quot;puzzleanswer&quot;

    puzzle_id = Column(Integer, ForeignKey(&quot;puzzle.id&quot;), nullable=False)
    word_id = Column(Integer, ForeignKey(&quot;wordinfo.id&quot;), nullable=False)
    num = Column(Integer, nullable=False)

    word = relationship(&quot;WordInfo&quot;, back_populates=&quot;puzzle_answers&quot;)</code></pre>
<p>models.py에서 relationship을 추가해주고,</p>
<pre><code class="language-python">from sqlalchemy.orm import joinedload

answer = (
    self.db.query(PuzzleAnswer)
    .option(joinedload(PuzzleAnswer.wordinfo))
    .filter(PuzzleAnswer.puzzle_id == self.puzzle_id)
    .all()
)

answer_josn = [
    {
        &quot;num&quot;: answer.num,
        &quot;pos&quot;: answer.wordinfo.pos,
        &quot;desc&quot;: answer.wordinfo.desc,
        &quot;word&quot;: answer.wordinfo.word,
    }
    for answer in puzzle_answers
]</code></pre>
<p>위에서 작성한 쿼리문과 반복문을 수정해주었습니다.
joinedload는 eager loading 하도록 해주는 옵션으로
이를 사용해서 불러오지않으면, answer.wordinfo.* 할 때마다
새로운 쿼리를 날리게되므로 N+1문제가 발생할 수 있습니다.</p>
<h3 id="조회-엔드포인트-작성">조회 엔드포인트 작성</h3>
<pre><code class="language-python">@router.get(&quot;/{puzzle_id}&quot;)
async def read_puzzle(
    puzzle_id: int, puzzle_service: PuzzleReadService = Depends(PuzzleReadService)
):
    return await puzzle_service.read_puzzle_from_db_by_id()</code></pre>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/23894a5b-64ca-4cb9-86db-6464237407a8/image.png" alt=""></p>
<p>요청을 보내보면, 잘 받아오는 것을 알 수 있습니다.</p>
<p>다음번에는 회원가입/로그인 기능에 대해서 포스트해보겠습니다.
읽어주셔서 감사합니다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[십자말풀이] 퍼즐 생성 및 성능 개선]]></title>
            <link>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-%ED%8D%BC%EC%A6%90-%EC%83%9D%EC%84%B1-%EB%B0%8F-%EC%84%B1%EB%8A%A5-%EA%B0%9C%EC%84%A0</link>
            <guid>https://velog.io/@taegong_s/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-%ED%8D%BC%EC%A6%90-%EC%83%9D%EC%84%B1-%EB%B0%8F-%EC%84%B1%EB%8A%A5-%EA%B0%9C%EC%84%A0</guid>
            <pubDate>Thu, 17 Oct 2024 17:25:21 GMT</pubDate>
            <description><![CDATA[<p>작업 레포지토리 : <a href="https://github.com/fnzksxl/word-puzzle">https://github.com/fnzksxl/word-puzzle</a></p>
<p>지난 번 포스트에서 단어 데이터 추출 후 데이터베이스 삽입까지 했습니다.</p>
<p>이번 포스트에서는 단어 데이터들을 데이터베이스에서 선택해 십자말풀이 퍼즐을
만들어보도록 하겠습니다.</p>
<h4 id="모델-변경점">모델 변경점</h4>
<pre><code class="language-python">class WordInfo(BaseMin, Base):
    __tablename__ = &quot;wordinfo&quot;

    word = Column(VARCHAR(5), nullable=False)
    desc = Column(VARCHAR(200), nullable=False)
    len = Column(Integer, nullable=False)
    pos = Column(VARCHAR(7), nullable=False)
</code></pre>
<p>시작하기에 앞서 지난 번 모델에서 단어 의 최대 길이가 5로 감소했습니다.</p>
<h2 id="퍼즐-데이터-형식">퍼즐 데이터 형식</h2>
<blockquote>
<ol>
<li>퍼즐의 크기는 정사각형이다</li>
<li>단어는 위-&gt;아래, 왼-&gt;오 방향으로만 추가된다.</li>
<li>연결된 단어를 제외하고 단어끼리는 상,하,좌,우로 접하지 않는다.</li>
</ol>
</blockquote>
<p>위 세가지를 기본 전제로 퍼즐을 구성하기로 했습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/526af048-449a-4ffa-9df0-6ffad236abe9/image.png" alt="">
위 그림은 (7x7) 형태의 제가 생각하는 퍼즐의 최종 모습을 메모장으로 작성해본 샘플입니다.</p>
<p>이러한 데이터를 구성하기 위해서 어떤 데이터 형식을 취해볼까요?</p>
<p>저는 퍼즐의 각 단어 배치를 2차원 배열로 처리하기로 했습니다.
(7x7) 형태의 정수 0으로 이루어진 리스트를 생성하고, 단어가 추가될 때마다
숫자가 +1 되면서 마킹하기로 말이죠.</p>
<p>그리고 각 단어에 대해 {번호, 설명}, {번호, 단어} 자료형을 가지는 리스트로 관리해보겠습니다.</p>
<h2 id="퍼즐-생성">퍼즐 생성</h2>
<h3 id="phase1-첫-단어-배치">Phase1: 첫 단어 배치</h3>
<p>각 함수들이 의존성 주입 형태로 배치되어있기 때문에,
가장 아래 함수부터 매개변수를 거슬러올라가며 보는 게 읽기 쉬울 겁니다.</p>
<pre><code class="language-python">import random
from collections import deque
from fastapi import Depends
from sqlalchemy.orm.session import Session
from typing import List, Tuple, Deque, Dict
from app.models import WordInfo
from app.database import get_db


async def append_letter_into_queue(
    start_point: Tuple[int, int], length: int, word: str, queue: deque, dir: bool
) -&gt; Deque:
    &quot;&quot;&quot;
    시작점, 길이, 단어, 방향을 토대로 큐에 다음 단어 추가에 대한 힌트를 큐에 삽입 후 반환한다.
    Args:
        start_point (Tuple): 단어 첫 음절이 위치하는 곳
        length (int): 단어의 길이
        word (str): 단어
        queue (deque): 큐
        dir (bool): 단어 방향 가로, 세로 여부
    Returns:
        Deque: 다음 단어의 시작점, 시작 음절, 방향이 추가된 큐
    &quot;&quot;&quot;
    def add_to_queue(coord: Tuple[int, int], letter: str, dir: bool) -&gt; None:
        if 0 &lt;= coord[0] &lt; 11 and 0 &lt;= coord[1] &lt; 11:
            queue.append(((coord[0], coord[1]), letter, not (dir)))
    if dir:
        add_to_queue((start_point[0], start_point[1] + length - 1), word[-1], dir)
    else:
        add_to_queue((start_point[0] + length - 1, start_point[1]), word[-1], dir)
    if length &gt; 2:
        if length &gt;= 5:
            if dir:
                add_to_queue((start_point[0], start_point[1] + 2), word[2], dir)
            else:
                add_to_queue((start_point[0] + 2, start_point[1]), word[2], dir)
        add_to_queue(start_point, word[0], dir)
    return queue

async def create_map() -&gt; List[str]:
    &quot;&quot;&quot;
    0으로 초기화 된 맵을 생성해 반환한다.
    Args:
        None
    Returns:
        List[str]
    &quot;&quot;&quot;
    map = [[0 for _ in range(11)] for _ in range(11)]
    return map

async def find_first_word_info(db: Session = Depends(get_db)) -&gt; WordInfo:
    &quot;&quot;&quot;
    DB에서 랜덤으로 하나의 단어를 찾아 반환한다.
    Args:
        db (Session): 커넥션
    Returns:
        WordInfo: 랜덤으로 가져온 WordInfo Row 객체
    &quot;&quot;&quot;
    random_idx = random.randint(1, 494047)
    return db.query(WordInfo).filter(WordInfo.id == random_idx).first()

async def create_puzzle_phase1(
    word: WordInfo = Depends(find_first_word_info), map: List[str] = Depends(create_map)
) -&gt; Dict:
    &quot;&quot;&quot;
    0으로 초기화 된 맵에 첫 단어를 추가,
    이 후 추가될 단어와 그 위치에 대한 힌트를 큐에 삽입,
    첫 단어 설명 추가 후 반환한다.
    Args:
        word (WordInfo): 단어 정보
        map (List[str]): 0으로 초기화 된 맵
    Returns:
        Dict: {map(List[str]), queue(deque), desc(dict)}
    &quot;&quot;&quot;
    start_y, start_x = 0, 0
    queue = deque()
    desc = []
    words = []
    for i in range(word.len):
        map[start_y][start_x + i] = 1
    words.append({&quot;num&quot;: 1, &quot;word&quot;: word.word})
    desc.append({&quot;num&quot;: 1, &quot;desc&quot;: word.desc})
    queue = await append_letter_into_queue((start_y, start_x), word.len, word.word, queue, True)
    return {&quot;map&quot;: map, &quot;queue&quot;: queue, &quot;desc&quot;: desc, &quot;words&quot;: words}</code></pre>
<p>위 함수들을 간단하게 요약하자면, 10x10 퍼즐에 (0,0), 최좌상단에서 가로 방향으로 단어를 하나 배치하고, 단어의 길이에 따라 큐에 다음 단어의 시작점 좌표를 추가해줬다는 겁니다.
단어 배치 시 0으로 초기화 된 맵에 단어 위치에 해당하는 좌표를 1로 바꿨습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/993ebd92-9d81-45d6-813b-ddf19ccefbfc/image.png" alt=""></p>
<p>위 상황을 그림으로 표현해봤습니다. 오른쪽 그림의 빨간 좌표가 현재 큐에 들어가있는 좌표입니다. 세로로 표기된 것은 편의상 배치한 것이고, 실제 코드 상에서는 세로는 False, 가로는 True 입니다.</p>
<h3 id="phase2-첫-단어에서-이어지는-단어들-배치">Phase2: 첫 단어에서 이어지는 단어들 배치</h3>
<p>Phase1의 첫 단어 배치가 끝나면 이제 첫 단어에서 이어지도록 단어를 계속 배치해야합니다.</p>
<p>위 그림을 예시로 들면 &quot;고&quot;와&quot;래&quot;로 시작하는 단어가 되겠네요.</p>
<pre><code class="language-python">async def find_word_info_start_with(start_word: str, db: Session) -&gt; WordInfo:
    &quot;&quot;&quot;
    DB에서 {start_word}로 시작하는 단어를 찾아 반환한다.
    Args:
        db (Session): 커넥션
        start_word (str): 시작 어절
    Returns:
        WordInfo: {start_word}로 시작하는 WordInfo Row 객체
    &quot;&quot;&quot;
    return (
        db.query(WordInfo)
        .filter(WordInfo.word.like(f&quot;{start_word}%&quot;))
        .order_by(func.rand())
        .first()
    )</code></pre>
<p>위 함수는 DB에서 {start_word}로 시작하는 단어를 검색해서 랜덤 추출합니다.</p>
<pre><code class="language-python">async def create_puzzle_phase2(
    db: Session = Depends(get_db), map_queue_desc: Dict = Depends(create_puzzle_phase1)
) -&gt; Dict:
    &quot;&quot;&quot;
    단어 하나가 삽입된 상태에서, 시작 어절이 같은 단어를 찾아 추가한다.
    단, 진행방향의 삼면에 이미 다른 단어의 어절이 없어야 한다.
    이 함수는 두 번째 페이즈로, 처음 추가된 단어와 이어지는 경우에만 단어를 추가한다.
    Args:
        db (Session): 커넥션
        map_queue_desc (Dict): 맵, 큐, 설명 및 단어가 들어있는 사전형 자료
    Returns:
        Dict: 맵, 큐, 설명 및 단어가 들어있는 사전형 자료
    &quot;&quot;&quot;
    map = map_queue_desc.get(&quot;map&quot;)
    queue = map_queue_desc.get(&quot;queue&quot;)
    desc = map_queue_desc.get(&quot;desc&quot;)
    words = map_queue_desc.get(&quot;words&quot;)
    num = 2
    while queue:
        point, start_word, dir = queue.popleft()
        next_word = await find_word_info_start_with(start_word, db)
        if not next_word:
            continue
        if (next_word) and (
            (dir and point[1] + next_word.len &lt; 11) or (not dir and point[0] + next_word.len &lt; 11)
        ):
            if point[1] &gt;= 1:
                if dir and map[point[0]][point[1] - 1] != 0:
                    continue
            if point[0] &gt;= 1:
                if not dir and map[point[0] - 1][point[1]] != 0:
                    continue
            original_value = map[point[0]][point[1]]
            for i in range(next_word.len):
                if dir:
                    if i and map[point[0]][point[1] + i] != 0:
                        for j in range(i):
                            map[point[0]][point[1] + j] = original_value
                        break
                    if i and (
                        (point[1] + i + 1 &lt; 11 and map[point[0]][point[1] + i + 1] != 0)
                        or (point[0] - 1 &gt;= 0 and map[point[0] - 1][point[1] + i] != 0)
                        or (point[0] + 1 &lt; 11 and map[point[0] + 1][point[1] + i] != 0)
                    ):
                        for j in range(1, i):
                            map[point[0]][point[1] + j] = 0
                        map[point[0]][point[1]] = original_value
                        break
                    map[point[0]][point[1] + i] = num
                else:
                    if i and map[point[0] + i][point[1]] != 0:
                        for j in range(i):
                            map[point[0] + j][point[1]] = original_value
                        break
                    if i and (
                        (point[1] - 1 &gt;= 0 and map[point[0] + i][point[1] - 1] != 0)
                        or (point[1] + 1 &lt; 11 and map[point[0] + i][point[1] + 1] != 0)
                        or (point[0] + i + 1 &lt; 11 and map[point[0] + i + 1][point[1]] != 0)
                    ):
                        for j in range(1, i):
                            map[point[0] + j][point[1]] = 0
                        map[point[0]][point[1]] = original_value
                        break
                    map[point[0] + i][point[1]] = num
            else:
                words.append({&quot;num&quot;: num, &quot;word&quot;: next_word.word})
                desc.append({&quot;num&quot;: num, &quot;desc&quot;: next_word.desc})
                num += 1
                queue = await append_letter_into_queue(
                    point, next_word.len, next_word.word, queue, dir
                )
    return {&quot;map&quot;: map, &quot;desc&quot;: desc, &quot;words&quot;: words}</code></pre>
<p>이 함수에서 검색하는 조건은 Docstring에 적혀있는 바와 동일합니다.
어차피 갈아엎을 함수이니 자세히 읽진 않아도 됩니다.
왜 갈아엎어야 할까요? 조건식이 너무 복잡해보여서?
그것도 맞지만 이 로직에는 커다란 문제점이 하나 있습니다.
조건식이 아닌 부분에서 잠시 생각해보시길 바랍니다.</p>
<br>
<br>
<br>
<br>
......
<br>
<br>
<br>
<br>


<h4 id="불필요한-db-호출">불필요한 DB 호출</h4>
<p>create_puzzle_phase2 함수에서 DB호출하는 부분을 한 번 확인해보시길 바랍니다.</p>
<blockquote>
<ol>
<li>큐에서 꺼내 단어를 DB에서 가져온다.</li>
<li>현재 퍼즐에 단어를 넣을 수 있는지 조건을 검사한다.</li>
</ol>
</blockquote>
<p>위의 로직은 DB에서 단어 데이터를 불러온 뒤에 그 단어가 퍼즐에 추가할 수 있는지 조건을 검사하고 있습니다.</p>
<p>하지만 순서를 바꿔 큐에서 꺼낸 좌표에서 <strong>가능한 단어의 길이를 먼저 계산</strong>한 후에 DB에서 해당 단어를 가져오는 로직이라면 성능적으로 이득을 볼 수 있다고 판단이 됩니다.</p>
<p>조건 계산 로직을 분리하고 변경된 로직에 맞게 코드를 수정해보죠.</p>
<pre><code class="language-python">async def find_word_info_start_with(start_word: str, limit: int, db: Session) -&gt; WordInfo:
        &quot;&quot;&quot;
        DB에서 {start_word}로 시작하면서 길이가 limit 이하인 단어를 찾아 반환한다.
        Args:
            start_word (str): 시작 어절
            limit (int): 가능한 최대 길이
        Returns:
            WordInfo: {start_word}로 시작하면서 길이가 limit 이하인 WordInfo Row 객체
        &quot;&quot;&quot;
        return (
            db.query(WordInfo)
            .filter(WordInfo.word.like(f&quot;{start_word}%&quot;))
            .filter(WordInfo.len &lt;= limit)
            .order_by(func.rand())
            .first()
        )


async def inspect_possible_length(map: List[int], point: Tuple[int, int], dir: bool) -&gt; int:
    &quot;&quot;&quot;
    추가할 수 있는 단어의 길이를 반환해주는 함수
    Args:
        map (List[int]): 퍼즐 맵 정보
        point (Tuple[int, int]): 새로운 단어의 시작 지점
        dir (bool): 가로,세로 여부
    Returns:
        Int: 추가할 수 있는 단어의 길이
    &quot;&quot;&quot;
    if point[1] &gt;= 1:
        if dir and map[point[0]][point[1] - 1] != 0:
            return 0
    if point[0] &gt;= 1:
        if not dir and map[point[0] - 1][point[1]] != 0:
            return 0
    if dir:
        for i in range(1, 5):
            if (
                (point[0] - 1 &gt;= 0 and map[point[0] - 1][point[1] + i] != 0)
                or (point[0] + 1 &lt; len(map) and map[point[0] + 1][point[1] + i] != 0)
                or (point[1] + i &lt; len(map[0]) - 1 and map[point[0]][point[1] + i + 1] != 0)
                or (point[1] + i == len(map[0]) - 1)
            ):
                return i
    else:
        for i in range(1, 5):
            if (
                (point[1] - 1 &gt;= 0 and map[point[0] + i][point[1] - 1] != 0)
                or (point[1] + 1 &lt; len(map[0]) and map[point[0] + i][point[1] + 1] != 0)
                or (point[0] + i &lt; len(map) - 1 and map[point[0] + i + 1][point[1]] != 0)
                or (point[0] + i == len(map) - 1)
            ):
                return i
    return 5

async def create_puzzle_phase2(db: Session = Depends(get_db), map_queue_desc: Dict = Depends(create_puzzle_phase1)) -&gt; Dict:
    &quot;&quot;&quot;
    단어 하나가 삽입된 상태에서, 시작 어절이 같은 단어를 찾아 추가한다.
    단, 진행방향의 삼면에 이미 다른 단어의 어절이 없어야 한다.
    이 함수는 두 번째 페이즈로, 처음 추가된 단어와 이어지는 경우에만 단어를 추가한다.

    Args:
        db (Session): 커넥션
        map_queue_desc (Dict): 맵, 큐, 설명 및 단어가 들어있는 사전형 자료
    Returns:
        Dict: 맵, 큐, 설명 및 단어가 들어있는 사전형 자료
    &quot;&quot;&quot;
    map = map_queue_desc.get(&quot;map&quot;)
    queue = map_queue_desc.get(&quot;queue&quot;)
    desc = map_queue_desc.get(&quot;desc&quot;)
    words = map_queue_desc.get(&quot;words&quot;)
    num = 2
    while queue:
        point, start_word, dir = queue.popleft()
        limit = await inspect_possible_length(map, point, dir)

        if limit &lt;= 1:
            continue
        next_word = await find_word_info_start_with(start_word, limit, db)
        if not next_word:
            continue
        for i in range(next_word.len):
            if dir:
                map[point[0]][point[1] + i] = num
            else:
                map[point[0] + i][point[1]] = num

        words.append({&quot;num&quot;: num, &quot;word&quot;: next_word.word})
        desc.append({&quot;num&quot;: num, &quot;desc&quot;: next_word.desc}) 
        num += 1

        queue = await append_letter_into_queue(point, next_word.len, next_word.word, queue, dir)

    return {&quot;map&quot;: map, &quot;desc&quot;: desc, &quot;words&quot;: words}</code></pre>
<p>이렇게 바꾸는게 성능적으로 실제로 이득이 있는지 확인해보겠습니다.</p>
<pre><code class="language-python">from fastapi import APIRouter, status, Depends
from typing import Dict
from app.utils.puzzle import create_puzzle_phase2

router = APIRouter(tags=[&quot;PuzzleV1&quot;], prefix=&quot;/puzzle&quot;)

@router.get(&quot;&quot;, status_code=status.HTTP_200_OK)
async def create_puzzle(puzzle: Dict = Depends(create_puzzle_phase2)):
    return await puzzle</code></pre>
<p>엔드포인트 하나 만들어주고,</p>
<pre><code class="language-python"># main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request
import time

from app.api import router


@asynccontextmanager
async def lifespan(app: FastAPI):
    from app import models
    from app.database import engine

    models.Base.metadata.create_all(bind=engine)

    yield

    pass


app = FastAPI(lifespan=lifespan)
app.include_router(router)


@app.middleware(&quot;http&quot;)
async def log_request_time(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    process_time = time.time() - start_time
    response.headers[&quot;X-Process-Time&quot;] = str(process_time)

    return response</code></pre>
<p>main.py에 response time을 측정할 수 있도록 미들웨어를 하나 달아주겠습니다.
FastAPI에서 자동으로 생성해주는 Swagger 문서에서 시간 차이를 보겠습니다.</p>
<h4 id="변경-전-로직">변경 전 로직</h4>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/b516998c-8daa-4dc3-a4f0-729e3d0f9dc5/image.png" alt="">
단어 12개 배치에 약 4.8초 정도 걸렸네요.</p>
<h4 id="변경-후-로직">변경 후 로직</h4>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/03021e7b-103b-4396-8883-cfc73839c7ec/image.png" alt="">
단어 20개 배치에 약 4.9초 정도 걸렸습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/c31ff929-4cc5-4118-8bd7-2a9243e4cbee/image.png" alt="">
같은 단어 수인 12개를 배치할 때 약 2.6초 정도 걸려, 45% 정도의 향상률을 보여줍니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/3702b796-e44a-443b-a0fc-bdcb4d18fb98/image.png" alt=""></p>
<p>로직을 이해하기 쉽도록 그림으로 표현해봤습니다.</p>
<h4 id="참고-함수형---클래스로-변경">참고) 함수형 -&gt; 클래스로 변경</h4>
<p>클래스의 생성자도 Depends로 종속성 주입이 가능합니다.
PuzzleCreateService 생성자에 Optional[int] = None을 선언해
Query 파라미터로 퍼즐의 크기를 입력받을 수 있게 했습니다.
<img src="https://velog.velcdn.com/images/taegong_s/post/096efe71-409d-4656-892d-411f6144bb05/image.png" alt=""></p>
<pre><code class="language-python">class PuzzleCreateService:
    def __init__(self, size: Optional[int] = None, db: Session = Depends(get_db)):
        &quot;&quot;&quot;
        퍼즐의 크기를 받아 맵을 생성한다. 기본 값은 7x7
        Args:
            size (int or None): 퍼즐의 크기 + 1
        &quot;&quot;&quot;
        self.db = db
        self.map_size = size + 1 if size is not None else 8
        self.map = self.create_map()
        self.queue = deque()
        self.desc = []
        self.words = []
        self.num = 2
        self.word_size = 494047
    def create_map(self) -&gt; List[str]:
        &quot;&quot;&quot;
        0으로 초기화 된 맵을 생성해 반환한다.
        &quot;&quot;&quot;
        return [[0 for _ in range(self.map_size)] for _ in range(self.map_size)]
    async def append_letter_into_queue(
        self, start_point: Tuple[int, int], length: int, word: str, dir: bool
    ) -&gt; Deque:
        &quot;&quot;&quot;
        시작점, 길이, 단어, 방향을 토대로 큐에 다음 단어 추가에 대한 힌트를 큐에 삽입 후 반환한다.
        Args:
            start_point (Tuple): 단어 첫 음절이 위치하는 곳
            length (int): 단어의 길이
            word (str): 단어
            dir (bool): 단어 방향 가로, 세로 여부
        Returns:
            Deque: 다음 단어의 시작점, 시작 음절, 방향이 추가된 큐
        &quot;&quot;&quot;
        def add_to_queue(coord: Tuple[int, int], letter: str, dir: bool) -&gt; None:
            if 0 &lt;= coord[0] &lt; self.map_size and 0 &lt;= coord[1] &lt; self.map_size:
                self.queue.append(((coord[0], coord[1]), letter, not (dir)))
        if dir:
            add_to_queue((start_point[0], start_point[1] + length - 1), word[-1], dir)
        else:
            add_to_queue((start_point[0] + length - 1, start_point[1]), word[-1], dir)
        if length &gt; 2:
            if length &gt;= 5:
                if dir:
                    add_to_queue((start_point[0], start_point[1] + 2), word[2], dir)
                else:
                    add_to_queue((start_point[0] + 2, start_point[1]), word[2], dir)
            add_to_queue(start_point, word[0], dir)
        return self.queue
    async def find_first_word_info(self) -&gt; WordInfo:
        &quot;&quot;&quot;
        DB에서 랜덤으로 하나의 단어를 찾아 반환한다.
        Returns:
            WordInfo: 랜덤으로 가져온 WordInfo Row 객체
        &quot;&quot;&quot;
        random_idx = random.randint(1, self.word_size)
        return self.db.query(WordInfo).filter(WordInfo.id == random_idx).first()
    async def find_word_info_start_with(self, start_word: str, limit: int) -&gt; WordInfo:
        &quot;&quot;&quot;
        DB에서 {start_word}로 시작하면서 길이가 limit 이하인 단어를 찾아 반환한다.
        Args:
            start_word (str): 시작 어절
            limit (int): 가능한 최대 길이
        Returns:
            WordInfo: {start_word}로 시작하면서 길이가 limit 이하인 WordInfo Row 객체
        &quot;&quot;&quot;
        return (
            self.db.query(WordInfo)
            .filter(WordInfo.word.like(f&quot;{start_word}%&quot;))
            .filter(WordInfo.len &lt;= limit)
            .order_by(func.rand())
            .first()
        )
    async def create_puzzle_phase1(self) -&gt; None:
        &quot;&quot;&quot;
        0으로 초기화 된 맵에 첫 단어를 추가,
        이 후 추가될 단어와 그 위치에 대한 힌트를 큐에 삽입,
        첫 단어 설명 추가
        &quot;&quot;&quot;
        word = await self.find_first_word_info()
        start_y, start_x = 0, 0
        for i in range(word.len):
            self.map[start_y][start_x + i] = 1
        self.words.append({&quot;num&quot;: 1, &quot;word&quot;: word.word})
        self.desc.append({&quot;num&quot;: 1, &quot;desc&quot;: word.desc})
        self.queue = await self.append_letter_into_queue(
            (start_y, start_x), word.len, word.word, True
        )
    async def inspect_possible_length(self, point: Tuple[int, int], dir: bool) -&gt; int:
        &quot;&quot;&quot;
        단어의 시작 좌표와 방향으로 가능한 단어의 최대 길이를 반환한다.
        Args:
            point (Tuple[int,int]): 단어가 시작될 좌표
            dir (bool): 단어의 가로, 세로 여부
        Returns:
            int: 가능한 단어의 길이
        &quot;&quot;&quot;
        y, x = point
        if x &gt;= 1:
            if dir and self.map[y][x - 1] != 0:
                return 0
        if y &gt;= 1:
            if not dir and self.map[y - 1][x] != 0:
                return 0
        if dir:
            for i in range(1, 5):
                if (
                    (y - 1 &gt;= 0 and self.map[y - 1][x + i] != 0)
                    or (y + 1 &lt; self.map_size and self.map[y + 1][x + i] != 0)
                    or (x + i &lt; self.map_size - 1 and self.map[y][x + i + 1] != 0)
                    or (x + i == self.map_size - 1)
                ):
                    return i
        else:
            for i in range(1, 5):
                if (
                    (x - 1 &gt;= 0 and self.map[y + i][x - 1] != 0)
                    or (x + 1 &lt; self.map_size and self.map[y + i][x + 1] != 0)
                    or (y + i &lt; self.map_size - 1 and self.map[y + i + 1][x] != 0)
                    or (y + i == self.map_size - 1)
                ):
                    return i
        return 5
    async def create_puzzle_phase2(self) -&gt; Dict:
        &quot;&quot;&quot;
        단어 하나가 삽입된 상태에서, 시작 어절이 같은 단어를 찾아 추가한다.
        단, 진행방향의 삼면에 이미 다른 단어의 어절이 없어야 한다.
        이 함수는 두 번째 페이즈로, 처음 추가된 단어와 이어지는 경우에만 단어를 추가한다.
        Args:
            db (Session): 커넥션
            map_queue_desc (Dict): 맵, 큐, 설명 및 단어가 들어있는 사전형 자료
        Returns:
            Dict: 맵, 큐, 설명 및 단어가 들어있는 사전형 자료
        &quot;&quot;&quot;
        await self.create_puzzle_phase1()
        while self.queue:
            point, start_word, dir = self.queue.popleft()
            limit = await self.inspect_possible_length(point, dir)
            if limit &lt;= 1:
                continue
            next_word = await self.find_word_info_start_with(start_word, limit)
            if not next_word:
                continue
            for i in range(next_word.len):
                if dir:
                    self.map[point[0]][point[1] + i] = self.num
                else:
                    self.map[point[0] + i][point[1]] = self.num
            self.words.append({&quot;num&quot;: self.num, &quot;word&quot;: next_word.word})
            self.desc.append({&quot;num&quot;: self.num, &quot;desc&quot;: next_word.desc})
            self.num += 1
            self.queue = await self.append_letter_into_queue(
                point, next_word.len, next_word.word, dir
            )
        return {&quot;map&quot;: self.map, &quot;desc&quot;: self.desc, &quot;words&quot;: self.words}</code></pre>
<h3 id="phase3-남은-공간-채우기">Phase3: 남은 공간 채우기</h3>
<p>남은 공간을 최대한 채우기 위해 퍼즐을 순회하며 아직 0인 좌표에 대해서 단어를 추가할 수 있는지 검증한 뒤 Phase2처럼 가능한 이어서 붙이기로 했습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/a6aaa114-edf3-4756-a8fe-2f0d45d7c5e1/image.png" alt=""></p>
<pre><code class="language-python"># 가능한 단어 길이 검사 함수 로직 추가
async def inspect_possible_length(
        self, point: Tuple[int, int], dir: bool, phase: int = 2
    ) -&gt; int:
        &quot;&quot;&quot;
        단어의 시작 좌표와 방향으로 가능한 단어의 최대 길이를 반환한다.

        Args:
            point (Tuple[int,int]): 단어가 시작될 좌표
            dir (bool): 단어의 가로, 세로 여부
            phase (int): 퍼즐 생성 단계
        Returns:
            int: 가능한 단어의 길이
        &quot;&quot;&quot;
        y, x = point

        if phase == 3:
            if y &gt; 0 and self.map[y - 1][x] != 0:
                return 0
            if y &lt; self.map_size - 1 and self.map[y + 1][x] != 0:
                return 0
            if x &gt; 0 and self.map[y][x - 1] != 0:
                return 0
            if x &lt; self.map_size - 1 and self.map[y][x + 1] != 0:
                return 0
        if x &gt;= 1:
            if dir and self.map[y][x - 1] != 0:
                return 0
        if y &gt;= 1:
            if not dir and self.map[y - 1][x] != 0:
                return 0</code></pre>
<p>Phase3에서는 단어 시작하는 곳에서도 상하좌우를 확인해야합니다.</p>
<pre><code class="language-python">async def fill_puzzle_until_queue_empty(self) -&gt; None:
        &quot;&quot;&quot;
        큐가 빌 때까지 퍼즐에 단어를 추가시킨다.
        &quot;&quot;&quot;
        while self.queue:
            point, start_word, dir = self.queue.popleft()
            limit = await self.inspect_possible_length(point, dir)

            if limit &lt;= 1:
                continue
            next_word = await self.find_word_info_start_with(start_word, limit)

            if not next_word:
                continue
            for i in range(next_word.len):
                if dir:
                    self.map[point[0]][point[1] + i] = self.num
                else:
                    self.map[point[0] + i][point[1]] = self.num

            self.words.append({&quot;num&quot;: self.num, &quot;word&quot;: next_word.word})
            self.desc.append({&quot;num&quot;: self.num, &quot;desc&quot;: next_word.desc})
            self.num += 1

            self.queue = await self.append_letter_into_queue(
                point, next_word.len, next_word.word, dir
            )

    async def create_puzzle_phase2(self) -&gt; None:
        &quot;&quot;&quot;
        단어 하나가 삽입된 상태에서, 시작 어절이 같은 단어를 찾아 추가한다.
        단, 진행방향의 삼면에 이미 다른 단어의 어절이 없어야 한다.
        이 함수는 두 번째 페이즈로, 처음 추가된 단어와 이어지는 경우에만 단어를 추가한다.
        &quot;&quot;&quot;
        await self.create_puzzle_phase1()
        await self.fill_puzzle_until_queue_empty()</code></pre>
<p>Phase2에서 큐가 빌 때까지 단어 추가해주던 로직을 추출했고,</p>
<pre><code class="language-python">async def create_puzzle_phase3(self) -&gt; Dict:
        &quot;&quot;&quot;
        퍼즐의 비어있는 공간에 단어를 추가한다.
        Returns:
            Returns:
            Dict: 맵, 설명 및 단어가 들어있는 사전형 자료
        &quot;&quot;&quot;
        await self.create_puzzle_phase2()
        for i in range(self.map_size - 1):
            for j in range(self.map_size - 1):
                if self.map[i][j] == 0:
                    hor = await self.inspect_possible_length((i, j), True, phase=3)
                    ver = await self.inspect_possible_length((i, j), False, phase=3)
                    dir = True if hor &gt; ver else False
                    if not (hor and ver):
                        continue
                    else:
                        word = await self.find_word_info_start_with(&quot;&quot;, hor if hor &gt; ver else ver)
                        if not word:
                            continue
                        for k in range(word.len):
                            if dir:
                                self.map[i][j + k] = self.num
                            else:
                                self.map[i + k][j] = self.num

                        self.words.append({&quot;num&quot;: self.num, &quot;word&quot;: word.word})
                        self.desc.append({&quot;num&quot;: self.num, &quot;desc&quot;: word.desc})
                        self.num += 1
                        self.queue = await self.append_letter_into_queue(
                            (i, j), word.len, word.word, dir
                        )
                        await self.fill_puzzle_until_queue_empty()

        return {&quot;map&quot;: self.map, &quot;desc&quot;: self.desc, &quot;words&quot;: self.words}</code></pre>
<p>추출한 로직을 Phase3에서도 사용하도록 구성했습니다.</p>
<p>저는 이전에 Class형태로 의존성을 주입하도록 코드를 리팩토링 했기 때문에
엔드포인트도 수정이 필요합니다.</p>
<pre><code class="language-python">from fastapi import APIRouter, status, Depends
from app.utils.puzzle import PuzzleCreateService

router = APIRouter(tags=[&quot;PuzzleV1&quot;], prefix=&quot;/puzzle&quot;)


@router.get(&quot;&quot;, status_code=status.HTTP_200_OK)
async def create_puzzle(puzzle: PuzzleCreateService = Depends(PuzzleCreateService)):
    return await puzzle.create_puzzle_phase3()</code></pre>
<h2 id="추가적인-성능-개선">추가적인 성능 개선</h2>
<p>Phase3까지 모두 돌아갔을 때 어느정도 시간이 걸리는지 확인해볼까요?
위의 시간 테스트에서 맵 크기를 10x10으로 했기 때문에 동일하게 하겠습니다. 직접 해보실 분들은 Swagger 문서에서 size 파라미터에 10 적고 해보시면 됩니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/cb0d7489-4d09-4ef4-b00e-7d352d2ce442/image.png" alt=""></p>
<p>단어가 총 29개나 들어가있네요. 단어 수가 많기도 하지만 게임판을 생성하는데에 약 8.5초나 걸리는건 꽤나 아쉽습니다.</p>
<p><strong>성능을 한 번 더 개선해봅시다.</strong></p>
<pre><code class="language-sql">EXPLAIN SELECT * FROM wordinfo
WHERE word LIKE &#39;마%&#39; 
AND len &lt;= 5
ORDER BY RAND()
LIMIT 1;</code></pre>
<p>&quot;마&quot;로 시작하면서 길이가 5 이하인 단어를 찾을 때,
SQLAlchemy ORM을 통해 생성될 SQL ROW Query의 실행 계획을 확인해봅시다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/ea10b5ce-8de0-40b9-8d7b-b1e079fb9ace/image.png" alt=""></p>
<p>확인해봐야할 row가 47만개가 넘어가면서, 탐색해야할 범위가 큰 것을 알 수 있습니다. 실제로 저 SQL문을 실행하면 약 0.25초가 걸립니다.</p>
<p>탐색 범위를 좁히기 위해 인덱스를 테이블에 걸어주는 방법이 있습니다.
우리는 word와 len에 조건을 걸어서 탐색하고 있습니다.
그런데 인덱스는 카디널리티가 클 수록 성능이 좋아지는 경향이 있습니다.
len은 2~5까지의 범위뿐이라 카디널리티가 매우 작습니다.
따라서, word에 인덱스를 걸어보도록 하죠.</p>
<pre><code class="language-python">class WordInfo(BaseMin, Base):
    __tablename__ = &quot;wordinfo&quot;

    word = Column(VARCHAR(5), nullable=False, index=True)
    desc = Column(VARCHAR(200), nullable=False)
    len = Column(Integer, nullable=False)
    pos = Column(VARCHAR(7), nullable=False)</code></pre>
<p>모델을 수정하고 alembic으로 마이그레이션하는 방법이 있고,</p>
<pre><code class="language-sql">CREATE INDEX ix_wordinfo_word ON wordinfo(word);</code></pre>
<p>MySQL에서 직접 인덱스를 생성하는 방법도 있습니다.</p>
<p>인덱스 생성 후에 실행 계획을 다시 살펴보겠습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/d8b7a5e1-4b8f-4b1b-93c5-ccd9f92ea9b2/image.png" alt=""></p>
<p>탐색해야 할 row가 현저히 줄어들었습니다. 위에서는 null 값이었던 key에 저희가 생성해준 인덱스도 들어있는 것을 확인할 수 있습니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/e9dff4fc-7dff-43a8-ac34-f0abe94062a2/image.png" alt=""></p>
<p>실제로 조회 성능이 0.25초에서 0.015초로 무려 약 94%나 향상됐네요.
다시 한 번 퍼즐을 생성해봅시다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/0d374ae6-1193-4a8a-b964-f89515e358fa/image.png" alt=""></p>
<p>단어 수가 1개 적은 28개이긴 하지만, 약 3.4초 정도로 나왔습니다.
인덱스를 생성 이전 8.5초에서 생성 후 3.4초로 약 60%의 향상률을 보였습니다.</p>
<p>다음 포스트에서는 결과적으로 생성된 데이터의 형태를 알아보고
데이터베이스에 추가하는 작업을 해보겠습니다.
읽어주셔서 감사합니다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[십자말풀이] FastAPI 서버 초기 세팅 및 정제 데이터 데이터베이스 삽입]]></title>
            <link>https://velog.io/@taegong_s/%EC%84%9C%EB%B2%84-%EC%B4%88%EA%B8%B0-%EC%84%B8%ED%8C%85-%EB%B0%8F-%EC%A0%95%EC%A0%9C-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%B2%A0%EC%9D%B4%EC%8A%A4-%EC%82%BD%EC%9E%85</link>
            <guid>https://velog.io/@taegong_s/%EC%84%9C%EB%B2%84-%EC%B4%88%EA%B8%B0-%EC%84%B8%ED%8C%85-%EB%B0%8F-%EC%A0%95%EC%A0%9C-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%B2%A0%EC%9D%B4%EC%8A%A4-%EC%82%BD%EC%9E%85</guid>
            <pubDate>Tue, 15 Oct 2024 16:58:38 GMT</pubDate>
            <description><![CDATA[<p>작업 레포지토리 : <a href="https://github.com/fnzksxl/word-puzzle">https://github.com/fnzksxl/word-puzzle</a></p>
<p>지난 번에 정제했던 데이터를 데이터베이스에 삽입해보도록 하겠습니다.</p>
<p>프론트엔드까지 구현해볼지는 모르겠지만 백엔드 서비스 형태로
게임을 제공해보기 위해서 FastAPI 프레임워크를 사용하기로 했습니다.</p>
<p><del>MySQL이 설치가 되어있다는 전제하에 진행해보겠습니다.</del></p>
<pre><code class="language-python"># 프로젝트 구조
ROOT FOLDER
ㄴ app
  ㄴ main.py
  ㄴ config.py
  ㄴ database.py
  ㄴ models.py
ㄴ word-data
    ㄴ json files... 로우 데이터 JSON 파일들
ㄴ extract_word.py 데이터 정제 스크립트
ㄴ runserver.py 서버 실행 스크립트</code></pre>
<h2 id="초기-서버-설정">초기 서버 설정</h2>
<h4 id="실행환경-구성">실행환경 구성</h4>
<pre><code class="language-python"># 실행환경 구성
python -m venv venv 가상환경 설정
pip install uvicorn[standard] fastapi sqlalchemy pymysql</code></pre>
<pre><code class="language-python"># .env
# 각자의 환경에 맞게 .env 파일을 설정해주면 됩니다.
DB_USERNAME={USERNAME}
DB_PASSWORD={PASSWORD}
DB_HOST={HOST}
DB_PORT={PORT}
DB_NAME={NAME}

# config.py
from dotenv import load_dotenv
from functools import lru_cache

import os

load_dotenv()


class Settings:
    # DB Settings
    DB_USERNAME = os.getenv(&quot;DB_USERNAME&quot;)
    DB_HOST = os.getenv(&quot;DB_HOST&quot;)
    DB_PASSWORD = os.getenv(&quot;DB_PASSWORD&quot;)
    DB_NAME = os.getenv(&quot;DB_NAME&quot;)
    DB_PORT = os.getenv(&quot;DB_PORT&quot;)


@lru_cache
def get_settings():
    return Settings()


settings = get_settings()</code></pre>
<p>코드 전역에서 사용할 서버 세팅을 관리해주는 파일입니다.
lru_cache를 사용해 settings를 사용할 때 추가연산을 하지 않도록 했습니다.</p>
<pre><code class="language-python"># database.py

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.config import settings

engine = create_engine(
    &quot;mysql+pymysql://{username}:{password}@{host}:{port}/{name}&quot;.format(
        username=settings.DB_USERNAME,
        password=settings.DB_PASSWORD,
        host=settings.DB_HOST,
        port=settings.DB_PORT,
        name=settings.DB_NAME,
    )
)
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
)

Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()</code></pre>
<p>서버와 MySQL 서버를 연결해주는 세션을 생성하는 파일입니다.
API에서 사용할 때 get_db 함수를 Depends 해줄 수 있게 미리 선언했습니다.</p>
<pre><code class="language-python"># main.py

from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    from app import models
    from app.database import engine

    models.Base.metadata.create_all(bind=engine)

    yield

    pass

app = FastAPI(lifespan=lifespan)

# runserver.py
import uvicorn

if __name__ == &quot;__main__&quot;:
    uvicorn.run(&quot;app.main:app&quot;, host=&quot;0.0.0.0&quot;, port=8000, reload=True)</code></pre>
<p>서버 실행과 관련된 파일입니다. 서버 실행 시에 데이터베이스와 연결되도록 lifespan으로 관리해줍니다.</p>
<p>이전에는 on_event를 사용해서 서버 시작, 종료 시 이벤트를 관리했는데
이는 사장될 문법이므로 지양하는게 좋습니다.</p>
<h4 id="db-테이블-생성">DB 테이블 생성</h4>
<pre><code class="language-python"># models.py
from sqlalchemy import Column, Integer, VARCHAR

from app.database import Base


class BaseMin:
    id = Column(Integer, primary_key=True, index=True)

class WordInfo(BaseMin, Base):
    __tablename__ = &quot;wordinfo&quot;

    # 단어 이름
    word = Column(VARCHAR(7), nullable=False)
    # 단어 설명
    desc = Column(VARCHAR(200), nullable=False)
    # 단어 길이
    len = Column(Integer, nullable=False)
    # 단어 품사
    pos = Column(VARCHAR(7), nullable=False)</code></pre>
<p>BaseMin 클래스를 선언해서 이후 테이블에서 id를 Primary Key로 반복 선언하지 않게 따로 빼줬습니다.</p>
<h2 id="데이터-db-삽입">데이터 DB 삽입</h2>
<pre><code class="language-python"># extract_word.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.models import WordInfo

import re
import os
import json
from app.config import settings


DIR = os.getcwd()
DATA_DIR = DIR + &quot;/word-data&quot;
json_list = os.listdir(DATA_DIR)

def process_word(word):
    &quot;&quot;&quot;
    자음이 포함된 단어 제거, 단어에 한글을 제외한 문자 및 공백 제거, 지정 길이를 벗어나는 단어 제거
    위 세 가지 일을 한 뒤 단어와 그 길이를 반환합니다

    Args:
        word (str): 정제할 단어
    Returns:
        dict: {단어, 단어의 길이}
    &quot;&quot;&quot;
    word = word.strip()
    consonant_mixed = re.search(r&#39;[ㄱ-ㅎ]&#39;, word)

    if consonant_mixed:
        return None

    result = re.sub(r&#39;[^가-힣]&#39;, &#39;&#39;, word)
    length = len(result)
    if length &gt; 7 or length &lt; 2:
        return None

    return {&quot;word&quot;: result, &quot;length&quot;: length}

def process_senseinfo(senseinfo):
    &quot;&quot;&quot;
    단어의 품사가 없거나 &#39;품사 없음&#39;일 시 제거, 방언 및 북한어 제거, 설명에 특이점이 있으면 제거
    위 세 가지 일을 한 뒤 설명과 단어의 품사를 반환합니다

    Args:
        senseinfo (dict): 품사, 설명 데이터가 들어있는 딕셔너리
    Returns:
        dict: {설명, 품사}
    &quot;&quot;&quot;
    definition = senseinfo[&quot;definition&quot;].strip()
    word_type = senseinfo[&quot;type&quot;].strip()
    pos = senseinfo.get(&quot;pos&quot;, None)

    if pos is None or pos == &quot;품사 없음&quot;:
        return None

    if word_type == &quot;방언&quot; or word_type == &quot;북한어&quot;:
        return None

    if len(definition) &gt; 200 or &quot;&amp;&quot; in definition or &quot;img&quot; in definition or &quot;&lt;FL&gt;&quot; in definition or &quot;규범 표기&quot; in definition or &quot;준말&quot; in definition or &quot;옛말&quot; in definition or &quot;-&quot; in definition:
        return None

    return {&quot;definition&quot;: definition, &quot;pos&quot;: pos}

if __name__ == &quot;__main__&quot;:
    &quot;&quot;&quot;
    JSON 파일로부터 단어 로우 데이터를 읽어들이고, 정제한 뒤 WordInfo 테이블에 추가합니다.
    &quot;&quot;&quot;
    engine = create_engine(
    &quot;mysql+pymysql://{username}:{password}@{host}:{port}/{name}&quot;.format(
        username=settings.DB_USERNAME,
        password=settings.DB_PASSWORD,
        host=settings.DB_HOST,
        port=settings.DB_PORT,
        name=settings.DB_NAME,
        )
    )

    Session = sessionmaker(bind=engine)
    db = Session()
    word_list = []

    for i, _json in enumerate(json_list):
        print(f&quot;{len(json_list)}/{i+1}번 째 파일, 파일명 : {_json}&quot;)
        with open(DATA_DIR+&quot;/&quot;+_json, encoding=&quot;utf-8&quot;) as f:
            json_data = json.load(f)
            for word_data in json_data[&quot;channel&quot;][&quot;item&quot;]:
                processed_word = process_word(word_data[&quot;wordinfo&quot;][&quot;word&quot;])
                processed_senseinfo = process_senseinfo(word_data[&quot;senseinfo&quot;])
                if processed_word and processed_senseinfo:
                    word_list.append({
                        &quot;word&quot;: processed_word[&quot;word&quot;],
                        &quot;desc&quot;: processed_senseinfo[&quot;definition&quot;],
                        &quot;pos&quot;: processed_senseinfo[&quot;pos&quot;],
                        &quot;len&quot;: processed_word[&quot;length&quot;]
                    })
                else:
                    continue
            print(f&quot;현재 단어 수 : {len(word_list)}&quot;)

    db.bulk_insert_mappings(WordInfo, word_list)
    db.commit()

    db.close()</code></pre>
<p>글 최상단의 프로젝트 구조대로 구성하고 위 스크립트를 실행하면
<img src="https://velog.velcdn.com/images/taegong_s/post/ec36ac12-671c-44ec-bfc3-4eef7b0d0cdc/image.png" alt=""></p>
<p>로그가 위와 같이 나오며 실행됩니다.
50만개가 넘는 row를 한 번에 삽입하는 경우에는 개별적으로 add하는게 아니라
bulk_insert 기능을 활용하는게 성능적으로 훨씬 우수합니다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/a9dde531-3955-4366-a5e5-81243684fc8f/image.png" alt=""></p>
<p>MySQL WorkBench에서 확인해본 결과입니다. 잘 들어가있죠?
다음 포스트에서는 십자말풀이 퍼즐 생성을 다뤄보겠습니다.
읽어주셔서 감사합니다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[[십자말풀이] 단어 데이터 정제]]></title>
            <link>https://velog.io/@taegong_s/%EB%8B%A8%EC%96%B4-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%A0%95%EC%A0%9C</link>
            <guid>https://velog.io/@taegong_s/%EB%8B%A8%EC%96%B4-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%A0%95%EC%A0%9C</guid>
            <pubDate>Tue, 15 Oct 2024 15:13:58 GMT</pubDate>
            <description><![CDATA[<p>십자말풀이 게임을 한 번 토이프로젝트로 진행해보고자 합니다.
게임을 진행하려고 할 때마다 새로운 게임판을 생성해주는 형식을 취해보기 위해
단어 데이터들을 수집해서 데이터베이스에 넣어두려합니다.</p>
<p>작업 레포지토리 : <a href="https://github.com/fnzksxl/word-puzzle">https://github.com/fnzksxl/word-puzzle</a></p>
<h2 id="정제-전-데이터-소스">정제 전 데이터 소스</h2>
<p>국립국어원의 <a href="https://opendict.korean.go.kr/main">우리말샘</a>의 사전 데이터를 로우 데이터로 삼았습니다.</p>
<p>그 중 JSON 형태로 제공되는 파일의 일부를 살펴보겠습니다.</p>
<p><strong>channel</strong> 아래 <strong>item</strong> 배열에 단어 데이터들이 들어있는 형태네요.</p>
<pre><code class="language-json">{
  &quot;channel&quot;: {
    &quot;total&quot;: 50000,
    &quot;title&quot;: &quot;사전 검색&quot;,
    &quot;description&quot;: &quot;사전 검색 결과&quot;,
    &quot;item&quot;: [
      {
        &quot;wordinfo&quot;: {
          &quot;conju_info&quot;: [
            {
              &quot;conjugation_info&quot;: {
                &quot;pronunciation_info&quot;: {
                  &quot;pronunciation&quot;: &quot;걷치레싱만&quot;
                },
                &quot;conjugation&quot;: &quot;겉치레식만&quot;
              }
            }
          ],
          &quot;pronunciation_info&quot;: [
            {
              &quot;pronunciation&quot;: &quot;걷치레식&quot;
            }
          ],
          &quot;word_unit&quot;: &quot;어휘&quot;,
          &quot;word&quot;: &quot;겉치레-식&quot;,
          &quot;original_language_info&quot;: [
            {
              &quot;original_language&quot;: &quot;겉치레&quot;,
              &quot;language_type&quot;: &quot;고유어&quot;
            },
            {
              &quot;original_language&quot;: &quot;式&quot;,
              &quot;language_type&quot;: &quot;한자&quot;
            }
          ],
          &quot;word_type&quot;: &quot;혼종어&quot;
        },
        &quot;group_order&quot;: 1,
        &quot;group_code&quot;: 46174,
        &quot;link&quot;: &quot;http://opendict.korean.go.kr/dictionary/view?sense_no=643318&quot;,
        &quot;target_code&quot;: 643318,
        &quot;senseinfo&quot;: {
          &quot;definition&quot;: &quot;겉으로 보기에만 좋게 꾸미어 드러내는 방식.&quot;,
          &quot;sense_no&quot;: &quot;001&quot;,
          &quot;type&quot;: &quot;일반어&quot;,
          &quot;example_info&quot;: [
            {
              &quot;source&quot;: &quot;노컷뉴스 2009년 7월&quot;,
              &quot;example&quot;: &quot;그러나 주변에서 일상적으로 듣는 {겉치레식} 격려는 아무런 힘을 발휘할 수 없다.&quot;
            },
            {
              &quot;source&quot;: &quot;서울신문 2013년 3월&quot;,
              &quot;example&quot;: &quot;“통과의례, {겉치레식의} 인사 청문회 제도에 대한 개선이 필요하다.”라는 지적도 나온다.&quot;
            }
          ],
          &quot;definition_original&quot;: &quot;겉으로 보기에만 좋게 꾸미어 드러내는 방식.&quot;,
          &quot;pos&quot;: &quot;명사&quot;
        }
      },</code></pre>
<hr>
<h2 id="데이터-정제">데이터 정제</h2>
<h3 id="1-단순-데이터-추출">1. 단순 데이터 추출</h3>
<pre><code class="language-python"># 단어 이름, 설명, 품사 추출하는 스크립트
import json
import os


DIR = os.getcwd()
DATA_DIR = DIR + &quot;/word-data&quot;
json_list = os.listdir(DATA_DIR)

for i, _json in enumerate(json_list):
        print(f&quot;{len(json_list)}/{i+1}번 째 파일, 파일명 : {_json}&quot;)
        with open(DATA_DIR+&quot;/&quot;+_json, encoding=&quot;utf-8&quot;) as f:
            json_data = json.load(f)
            for word_data in json_data[&quot;channel&quot;][&quot;item&quot;]:
                word = word_data[&quot;wordinfo&quot;][&quot;word&quot;]
                senseinfo = word_data[&quot;senseinfo&quot;]
                pos = senseinfo[&quot;pos&quot;]
                definition = senseinfo[&quot;definition&quot;]</code></pre>
<p>위 데이터 중 단어의 이름인 [&quot;wordinfo&quot;][&quot;word&quot;],
단어의 설명인 [&quot;senseinfo&quot;][&quot;definition&quot;],
단어의 품사인 [&quot;senseinfo&quot;][&quot;pos&quot;] 이렇게 세 개를 가져오기로 했습니다.</p>
<h3 id="2-데이터-확인">2. 데이터 확인</h3>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/c633604c-db88-4b1f-9edb-6524ddd6931f/image.png" alt=""></p>
<p>단어 설명에 img 나 FL 태그가 들어있는 경우도 있었고,</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/3eb37b50-7884-42a4-9d2a-c086521634a4/image.png" alt=""></p>
<p>어휘에 특수문자가 들어가거나,</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/546b5b3e-5f8a-4586-aed0-c3afd1298ed4/image.png" alt="">
자음만 존재하는 등 십자말풀이에서 쓰지 못할 데이터가 존재합니다.
이외에도 설명에 %lt %gt 같은 용어, 너무 긴 단어나 설명이 존재하기도 했습니다.</p>
<h3 id="3-데이터-정제">3. 데이터 정제</h3>
<p>위에서 확인해본 데이터를 배제함과 동시에 십자말풀이에 쓸 수 있도록
데이터를 정제해보겠습니다.</p>
<h4 id="단어">단어</h4>
<blockquote>
<ol>
<li>2 &lt;= 단어의 길이 &lt;= 7 인 경우만 추출할 것</li>
<li>단어에 자음만 존재하거나 한글을 제외한 다른 문자가 있으면 스킵할 것</li>
<li>단어에 띄워쓰기는 제거할 것</li>
</ol>
</blockquote>
<p>위 조건을 만족하도록 단어를 정제하는 함수를 정규표현식을 이용해 작성해봅시다.</p>
<pre><code class="language-python"># 원하는 조건으로 단어를 정제하는 함수
def process_word(word):
    word = word.strip()
    consonant_mixed = re.search(r&#39;[ㄱ-ㅎ]&#39;, word)

    if consonant_mixed:
        return None

    result = re.sub(r&#39;[^가-힣]&#39;, &#39;&#39;, word)
    length = len(result)
    if length &gt; 7 or length &lt; 2:
        return None

    return {&quot;word&quot;: result, &quot;length&quot;: length}</code></pre>
<ol>
<li>re.search(pattern, string) 함수는 패턴과 문자열이 일치하는 부분이 없으면 None, 있으면 그 부분을 반환해줍니다.</li>
<li>re.sub(pattern, replacement, string) 함수는 패턴과 문자열이 일치하는 모든 부분을 replacement로 치환해줍니다. r&#39;[^가-힣]&#39;의 패턴의 경우 한글을 제외한 모든 문자를 의미합니다. ^ 기호는 차집합의 의미로 생각하면 이해가 쉽습니다.</li>
</ol>
<p>다른 메타 문자나 정규표현식의 더 많은 함수는 <a href="https://docs.python.org/ko/3/howto/regex.html">파이썬에서 제공해주는 문서</a>에서 자세히 확인해볼 수 있습니다.</p>
<pre><code class="language-python"># EXAMPLE
word_list = [&quot;ㄱ-&quot;, &quot;게&quot;, &quot;게임&quot;, &quot;게이밍 모니터&quot;]
processed_word_list = []
for word in word_list:
    processed_word_list.append(process_word(word))
print(processed_word_list)</code></pre>
<pre><code class="language-python"># 출력 결과
[None, None, {&#39;word&#39;: &#39;게임&#39;, &#39;length&#39;: 2}, {&#39;word&#39;: &#39;게이밍모니터&#39;, &#39;length&#39;: 6}]</code></pre>
<h4 id="설명-및-품사">설명 및 품사</h4>
<blockquote>
<ol>
<li>품사는 반드시 있어야 할 것</li>
<li>방언과 북한어는 제외할 것</li>
<li>설명의 길이 &gt; 200 이거나 태그가 들어있으면 제외할 것</li>
<li>규범 표기, 옛말, 준말 등의 설명이 있으면 제외할 것</li>
<li>-가 들어가는 경우는 제외할 것</li>
</ol>
</blockquote>
<p>5번의 경우는 설명에서 이상한 부분이 있었지만 따로 캡쳐하지 못했습니다.</p>
<p>위 조건을 만족하도록 설명과 품사를 전제하는 함수를 만들어 봅시다.</p>
<pre><code class="language-python"># 원하는 조건으로 설명을 정제하는 함수
def process_senseinfo(senseinfo):
    definition = senseinfo[&quot;definition&quot;].strip()
    word_type = senseinfo[&quot;type&quot;].strip()
    pos = senseinfo.get(&quot;pos&quot;, None)

    if pos is None or pos == &quot;품사 없음&quot;:
        return None

    pos = pos.strip()

    if word_type == &quot;방언&quot; or word_type == &quot;북한어&quot;:
        return None

    if len(definition) &gt; 200 or &quot;&amp;&quot; in definition or &quot;img&quot; in definition or &quot;&lt;FL&gt;&quot; in definition or &quot;규범 표기&quot; in definition or &quot;준말&quot; in definition or &quot;옛말&quot; in definition or &quot;-&quot; in definition:
        return None

    return {&quot;definition&quot;: definition, &quot;pos&quot;: pos}</code></pre>
<pre><code class="language-python"># EXAMPLE
senseinfo_list = [{&quot;definition&quot;: &quot;&amp;&amp;안녕하세요&quot;, &quot;type&quot;: &quot;고유어&quot;, &quot;pos&quot;: &quot;명사&quot;},
                  {&quot;definition&quot;: &quot;안녕하세요&quot;, &quot;type&quot;: &quot;북한어&quot;, &quot;pos&quot;: &quot;명사&quot;},
                  {&quot;definition&quot;: &quot;안녕하세요&quot;, &quot;type&quot;: &quot;고유어&quot;, &quot;pos&quot;: &quot;명사&quot;},
                  {&quot;definition&quot;: &quot;안녕하세요&quot;, &quot;type&quot;: &quot;고유어&quot;, &quot;pos&quot;: &quot;품사 없음&quot;},
                  {&quot;definition&quot;: &quot;안녕하세요 &lt;img&quot;, &quot;type&quot;: &quot;고유어&quot;, &quot;pos&quot;: &quot;명사&quot;}]
processed_senseinfo_list = []
for senseinfo in senseinfo_list:
    processed_senseinfo_list.append(process_senseinfo(senseinfo))
print(processed_senseinfo_list)</code></pre>
<pre><code class="language-python"># 출력 결과
[None, None, {&#39;definition&#39;: &#39;안녕하세요&#39;, &#39;pos&#39;: &#39;명사&#39;}, None, None]</code></pre>
<p>다음 포스트에서는 정제한 단어 데이터들을 데이터베이스에 삽입해보겠습니다.
읽어주셔서 감사합니다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[DRF Celery를 사용한 이메일 인증 서비스 구현]]></title>
            <link>https://velog.io/@taegong_s/DRF-Celery%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%9D%B4%EB%A9%94%EC%9D%BC-%EC%9D%B8%EC%A6%9D-%EC%84%9C%EB%B9%84%EC%8A%A4-%EA%B5%AC%ED%98%84</link>
            <guid>https://velog.io/@taegong_s/DRF-Celery%EB%A5%BC-%EC%82%AC%EC%9A%A9%ED%95%9C-%EC%9D%B4%EB%A9%94%EC%9D%BC-%EC%9D%B8%EC%A6%9D-%EC%84%9C%EB%B9%84%EC%8A%A4-%EA%B5%AC%ED%98%84</guid>
            <pubDate>Fri, 24 May 2024 10:30:49 GMT</pubDate>
            <description><![CDATA[<p>개발중인 서비스에서 이메일 인증 서비스를 한 번 도입해보기로 했다.
널리 쓰이는 Gmail을 통해서 보내볼거고, 꽤나 오래 걸리는 작업이므로 Celery에 Task를 보내 백그라운드에서 처리할 것이다.</p>
<h2 id="gmail-설정하기">Gmail 설정하기</h2>
<p>장고와 같은 외부 앱에서 Gmail을 사용하려면 앱 비밀번호가 필요하다
<a href="https://myaccount.google.com/apppasswords">비밀번호 설정하기</a> &lt;= 이 링크를 타고 들어가서 비밀번호를 생성해주도록 하자.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/770e3a1e-998b-48a3-b2d8-288f439ebedd/image.png" alt="">
필자가 테스트용으로 만들어본 앱비밀번호 16자리이다.
이 화면에서 벗어나면 다시 확인할 수가 없으니 확인 버튼을 누르기 전에
꼭 미리 저장해놔야 한다.</p>
<p>위 비밀번호는 외부에 반출되면 안되므로 프로젝트의 env 파일에 잘 숨겨두자.</p>
<h2 id="이메일-인증-로직">이메일 인증 로직</h2>
<p>이메일을 보내기 전에 구현해볼 인증 로직을 먼저 설명해보도록 하겠다.</p>
<ol>
<li>유저에게서 이메일을 입력받고 주소로 랜덤 코드를 발송한다.</li>
<li>서버에서는 redis에 이메일:랜덤 코드 쌍으로 5분의 유효기간을 주고 저장한다.</li>
<li>유저가 코드 확인 후 검증 API로 확인한 코드를 전송한다.</li>
<li>서버에서 코드의 일치 여부를 확인하고 일치한다면 이 코드의 유효기간을 삭제한다.</li>
<li>코드를 클라이언트 측으로 반환해준다.</li>
<li>회원가입 시 클라이언트는 유저정보와 함께 이 코드를 같이 서버로 보내 이메일 확인이 되었음을 증명한다.</li>
<li>회원가입 성공 시 서버에서는 이메일:랜덤 코드를 redis에서 제거한다.</li>
</ol>
<h2 id="django에서-이메일-보내기">Django에서 이메일 보내기</h2>
<p>장고에서는 이메일을 보내는 클래스가 이미 존재한다.
(역시 batteries included 철학답다)</p>
<pre><code class="language-python">#project_name/settings.py

EMAIL_BACKEND = &quot;django.core.mail.backends.smtp.EmailBackend&quot;
EMAIL_HOST = &quot;smtp.gmail.com&quot;
EMAIL_PORT = 587
EMAIL_HOST_USER = env(&quot;EMAIL_ID&quot;)
EMAIL_HOST_PASSWORD = env(&quot;EMAIL_PW&quot;)
EMAIL_USE_TLS = True
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER</code></pre>
<p>프로젝트의 settings.py에 위 내용을 추가해주도록 하자.
env 파일에 이메일을 보내는 주체(EMAIL_ID), 위에서 생성한 비밀번호(EMAIL_PW)가 존재한다는 가정 하에 작성된 내용이다.</p>
<p>env 파일을 읽어오는 방법은 사용자에 따라 가지각색이므로 어떻게 불러와도 상관없다.</p>
<hr>
<p>위 이메일 인증 로직을 따라가면서 구현해보자.</p>
<h4 id="로직-1번">로직 1번</h4>
<p>먼저 랜덤 코드를 생성해 이메일로 발송해야한다.</p>
<pre><code class="language-python">#project_app/utils.py NEW

import strings
import secrets

class SendEmailHelper:
    def make_random_code_for_register():
        digit_and_alpha = string.ascii_letters + string.digits
        return &quot;&quot;.join(secrets.choice(digit_and_alpha) for _ in range(6))

sendEmailHelper = SendEmailHelper()</code></pre>
<p>이메일을 보내는 앱에 utils.py를 생성해 랜덤한 6자리의 코드를 생성해보자.
위 코드를 사용하면 알파벳+숫자로 이루어진 코드가 생성된다.</p>
<pre><code class="language-python">#project_app/views.py

from django.core.mail import EmailMessage
from .utils import sendEmailHelper

class EmailVerifyView(APIView):

    def post(self, request, *args, **kwargs):
        email = request.data.get(&quot;email&quot;)
        code = sendEmailHelper.make_random_code_for_register()
        message = code
        subject = &quot;EMAIL 제목&quot;
        to = [email]
        mail = EmailMessage(subject=subject, body=message, to=to)
        mail.content_subtype = &quot;html&quot; # html형태로 템플릿을 만들었을 때 필요함
        mail.send()

        return Response({&quot;detail&quot;: &quot;Success to send Email&quot;}, status=status.HTTP_202_ACCEPTED)
</code></pre>
<p>간단하게 유저로부터 이메일을 입력 받아 주소로 랜덤 코드 6자리를 보내는 코드이다.</p>
<p>message에는 이메일의 내용이, subject에는 제목이, to에는 받는 주소가 들어가야 한다.
(to에는 반드시 튜플이나 리스트의 형태의 데이터가 들어가야 함을 주의하자)</p>
<p>포스트맨으로 테스트를 한 번 해보자.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/0b13b497-4198-483b-8d02-a550d92788f5/image.png" alt=""></p>
<p>오! 이메일이 잘 들어왔고, 포스트맨을 보면?</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/420ec603-c4ac-42a6-b913-e9cbcb45f37e/image.png" alt=""></p>
<p>응답도 잘 들어와있으나 시간을 한 번 보시라. 4초에 가까운 시간이 걸리는데 이를 일반적인 HTTP 통신 상에서 처리하려고 하면 안 된다.</p>
<p>이를 <a href="https://velog.io/@taegong_s/DRF-redis-Celery%EB%A1%9C-%EC%A1%B0%ED%9A%8C%EC%88%98-%EA%B5%AC%ED%98%84">지난 포스트</a>에서 다루었던 Celery를 이용해 백그라운드로 넘겨서 진행해주자.</p>
<p>위 링크를 통해 기본적인 Celery 세팅은 다 되었다고 판단하고 진행하겠다.</p>
<pre><code class="language-python">#project_app/tasks.py NEW

from django.core.mail import EmailMessage

from wtnt.celery import app
from .utils import sendEmailHelper


@app.task
def send_email(email):
    code = sendEmailHelper.make_random_code_for_register()
    message = sendEmailHelper.get_template(code)
    subject = &quot;%s&quot; % &quot;[WTNT] 이메일 인증 코드 안내&quot;
    to = [email]
    mail = EmailMessage(subject=subject, body=message, to=to)
    mail.content_subtype = &quot;html&quot;
    mail.send()
    return &quot;Success to send email&quot;</code></pre>
<p>tasks.py에 이메일 보내는 함수를 따로 빼내어 작성했다.
필자는 sendEmailHelper에 이메일의 템플릿을 따로 작성해두어 이메일의 내용에 약간의 구색을 갖춰두었다.</p>
<pre><code class="language-python">#project_app/views.py

from .tasks import send_email

class EmailVerifyView(APIView):

    def post(self, request, *args, **kwargs):
        email = request.data.get(&quot;email&quot;)
        try:
            send_email.delay(email)
            return Response({&quot;detail&quot;: &quot;Success to send Email&quot;}, status=status.HTTP_202_ACCEPTED)
        except Exception as e:
            return Response({&quot;error&quot;: e}, status=status.HTTP_400_BAD_REQUEST)</code></pre>
<p>Celery에 task를 할당하려면 delay 함수를 사용하면 된다. 좀 더 자세히 알아보고 싶다면 공식 문서를 참고해보는게 좋다.</p>
<pre><code class="language-python">celery -A project worker -l INFO</code></pre>
<p>Celery를 켜주고 같은 API에 요청을 보내보자.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/266743a6-7f05-46bb-b1e9-267c2697462a/image.png" alt=""></p>
<p>Celery에 요청이 received되고 앞에서 포스트맨을 사용했을 때와 비슷한 시간이 걸려서 Task를 완수했다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/24b89df9-603d-4256-91c0-0a8c159669d8/image.png" alt=""></p>
<p>이메일도 예쁘게 도착한 모습.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/3df1cce6-814a-44a6-80c8-50e414bb8f63/image.png" alt=""></p>
<p>그리고 포스트맨에서는? 125ms 밖에 걸리지 않았다. 야호!</p>
<p>이제 로직의 1번을 달성했다.</p>
<h4 id="로직-2번">로직 2번</h4>
<p>redis에 이메일:랜덤 코드 쌍을 유효기간 5분을 두고 저장한다.
이는 매우 간단하다.</p>
<pre><code class="language-python">from django_redis import get_redis_connection
from django.core.mail import EmailMessage

from wtnt.celery import app
from .utils import sendEmailHelper

client = get_redis_connection(&quot;default&quot;)


@app.task
def send_email(email):
    code = sendEmailHelper.make_random_code_for_register()
    client.set(email, code, ex=300)
    message = sendEmailHelper.get_template(code)
    subject = &quot;%s&quot; % &quot;[WTNT] 이메일 인증 코드 안내&quot;
    to = [email]
    mail = EmailMessage(subject=subject, body=message, to=to)
    mail.content_subtype = &quot;html&quot;
    mail.send()
    return &quot;Success to send email&quot;</code></pre>
<p>tasks.py를 위와 같이 변경해주었다.</p>
<pre><code class="language-python">client.set(email, code, ex=3000)</code></pre>
<p>이 코드가 함수 안에 추가된 것인데, 이는 email 키에 code값을 300초(5분)동안 저장하겠다는 것이다. ex 값을 60으로 바꿔놓고 확인해보면 1분 뒤에 키가 자동으로 삭제되어있을 것이다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/4f32ee98-f3af-4883-b0f9-6266d6988f05/image.png" alt=""></p>
<p>이메일을 보낸 뒤 redis-cli로 확인해보면 이렇게 이메일 키 값이 생겨져있다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/05a8e7f8-d016-4c47-bdb2-ac2be7b4cbeb/image.png" alt=""></p>
<p>아무런 작업을 하지않고 ex만큼 시간이 지난 뒤 다시 확인해보면 키가 없어진 것을 볼 수 있다.</p>
<h4 id="로직-345번">로직 3,4,5번</h4>
<p>유저가 보낸 코드가 일치하는지 확인하고 redis 내부의 값을 변경해줘야한다.</p>
<pre><code class="language-python">#project_app/views.py

from django_redis import get_redis_connection

client = get_redis_connection()

def patch(self, request, *args, **kwargs):
        code = request.data.get(&quot;code&quot;)
        email = request.data.get(&quot;email&quot;)
        answer = client.get(email)
        if code == answer:
            client.set(email, code)
            return Response({&quot;code&quot;: code}, status=status.HTTP_200_OK)
        else:
            return Response({&quot;error&quot;: &quot;Code Not Matched&quot;}, status=status.HTTP_400_BAD_REQUEST)</code></pre>
<p>위에서 작성한 EmailVerifyView에 patch 메소드를 추가해줬다.
유저로부터 코드를 받아 일치하는지 확인하는 API다.</p>
<p>코드가 일치하면 다시 이메일:랜덤 코드로 set해주는데 이번에는 ex옵션이 없다.
이렇게 하면 유효기간을 없앨 수 있다.</p>
<p>이 후에 클라이언트 쪽으로 유저가 입력한 코드를 다시 보내준다.</p>
<h4 id="로직-67번">로직 6,7번</h4>
<pre><code class="language-python">#project_app/views.py

def post(self, request):
        code = request.data.get(&quot;code&quot;)
        email = request.data.get(&quot;email&quot;)
        if code != client.get(email):
            return Response({&quot;error&quot;: &quot;Code Not Matched&quot;}, status=status.HTTP_400_BAD_REQUEST)

        # ...

        client.delete(email)</code></pre>
<p>회원가입을 담당하는 뷰에서 code와 email을 추가로 입력받고 redis에서 일치하는지 확인해 유저가 이메일 인증 작업을 완료했는지 검증한다.</p>
<p>회원가입 로직이 완료되면 이메일 키를 삭제해준다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[DRF redis, Celery로 조회수 구현 ]]></title>
            <link>https://velog.io/@taegong_s/DRF-redis-Celery%EB%A1%9C-%EC%A1%B0%ED%9A%8C%EC%88%98-%EA%B5%AC%ED%98%84</link>
            <guid>https://velog.io/@taegong_s/DRF-redis-Celery%EB%A1%9C-%EC%A1%B0%ED%9A%8C%EC%88%98-%EA%B5%AC%ED%98%84</guid>
            <pubDate>Thu, 23 May 2024 13:05:45 GMT</pubDate>
            <description><![CDATA[<h2 id="초간단-구현">초간단 구현</h2>
<p>간단하게 포스트 디테일 페이지로 유저가 접근할 때마다 포스트의 조회수를 1씩 늘려주는 방법이 있다.</p>
<pre><code class="language-python">def get(self, request, *args, **kwargs):
        team_id = kwargs.get(&quot;team_id&quot;)
        try:
            team = Team.objects.get(id=team_id)
            team.view += 1
            team.save()
            teamSerializer = TeamCreateSerializer(team)
            response = createSerializerHelper.make_response(teamSerializer.data, request.user.id)

            return Response(response, status=status.HTTP_200_OK)

        except Team.DoesNotExist:
            return Response({&quot;error&quot;: &quot;No Content&quot;}, status=status.HTTP_404_NOT_FOUND)</code></pre>
<ul>
<li>문제점
API에 접근할 때마다 조회수를 <strong>무조건</strong> 올려주게 되어 있으므로 유저가 마음만 먹으면 무한정으로 조회수가 올라간다!</li>
</ul>
<h3 id="어떻게-해결해볼-수-있을까">어떻게 해결해볼 수 있을까?</h3>
<blockquote>
<p>이번에 조회수 기능을 개발하면서 목표는 <em>당일 중복 조회 불가</em>이다.
00시시가 되면 다시 조회수를 집계 해주겠다는 뜻</p>
</blockquote>
<p><strong>1. 쿠키에 조회 내역 정보를 담아서 보내준다</strong></p>
<p>예를 들어 id가 1인 유저가 10번, 20번 게시글에 접근했다면
쿠키에 view:1=10|20 같은 방식으로 저장해주는 것이다.</p>
<p>쿠키를 보낼 때, 쿠키의 생명주기를 당일 00시까지로 제한해 발급해주면
다음 날부터는 다시 조회수가 집계된다.</p>
<pre><code class="language-python">response.set_cookie(&quot;view&quot;, 조회 내역, expires=내일까지의 시간)</code></pre>
<p>이제 쿠키의 유무와 내용만 먼저 확인하면 조회수를 집계할 수 있다.
이 방법에는 문제가 없을까?</p>
<p>쿠키는 클라이언트가 관리할 수 있다.
즉, <strong>쿠키를 의도적으로 삭제</strong>할 수 있다는 것.</p>
<p>물론, 지금 개발하고 있는 서비스는 유튜브처럼 조회수가 비즈니스적으로 중요하지 않아 유저가 쿠키를 삭제해서 조회수를 올린다고 큰 문제가 되지는 않지만 쉽게 눈치챌 수 있는 기믹이다.</p>
<p><strong>2. 조회 내역 정보를 DB에 담아두고 서버에서 관리한다</strong></p>
<p>조회하는 유저와 게시글 정보 쌍을 DB에 저장해 조회수 중복을 방지할 수 있다.</p>
<p>user_id와 post_id를 저장하는 테이블이 있다면 가능한 얘기.
하지만 이 방법은 조회의 주체가 <strong>비회원</strong>이라면 관리가 불가능하다.
비회원의 id를 0으로 간주해 저장하는 방법도 있겠으나 다른 사람이 조회했을 때도 집계가 되지 않을 수 있다.</p>
<p>조회하는 유저의 IP정보를 저장하면 어떻게 될까?
서비스의 특성상 이 서비스는 주로 학교의 동아리 회원들이 학교에서 사용할 가능성이 높다. 즉, 같은 IP로 접근할 가능성이 꽤나 있다는 것.</p>
<p>따라서 user_id와 IP를 조합한 정보를 저장해보도록 하자.
게시글을 조회할 때마다 RDBMS에 I/O를 더 가져가기가 싫어서 이 정보를
redis에 저장해보기로 했다.</p>
<p>redis는 집합 자료형을 제공하므로 SADD 명령을 실행했을 때 반환 값을 통해 중복 여부를 체크하기에 용이하다.</p>
<p>또한 Celery-beat로 스케쥴링해주면 특정 키를 정해진 시간에 삭제해 하루가 지나면 다시 조회수를 카운팅해줄 수도 있다.</p>
<p><a href="https://medium.com/@miladev95/python-celery-cfff053ba1d1">Celery란 무엇일까?</a> 링크에 Celery의 개요와 장단점이 잘 설명되어 있다.</p>
<h3 id="로직">로직</h3>
<p>조회 관련 정보는 views:team_id 집합에 저장된다.</p>
<ol>
<li>유저가 게시글에 접근하면 redis에 유저id_IP 조합의 밸류값이 있는지 확인한다.</li>
<li>존재한다면 -&gt; PASS
 오늘 첫 방문이라면 -&gt; views:team_id 집합에 유저id_IP조합을 저장후 조회 수를 +1 해준다.</li>
<li>매일 자정마다 views:로 시작하는 키를 모두 <strong>제거</strong>한다.</li>
</ol>
<hr>
<p>2번 방법을 통해 한 번 구현해보도록 하자.
우선 Celery를 설치하고, Django 프로젝트에 이식하자.
(redis는 로컬에 설치되어 있거나 docker로 실행되어 있다고 가정한다.)</p>
<pre><code class="language-python"># Celery 관련 라이브러리 설치
pip install celery django-celery-beat django-celery-results</code></pre>
<pre><code class="language-python">#project_name/settings.py

INSTALLED_APPS = [
    # 기존 APP들 +
    &quot;celery&quot;,
    &quot;django_celery_beat&quot;,
    &quot;django_celery_results&quot;,
]

CELERY_BROKER_URL = &quot;redis://127.0.0.1:6379/0&quot;
CELERY_RESULT_BACKEND = &quot;django-db&quot;
CELERY_TIMEZONE = &quot;Asia/Seoul&quot;</code></pre>
<p>INSTALLED_APPS에 방금 설치한 내용을 업데이트,
Celery 설정을 추가해준다.
아래의 설정 파일을 따라해 Celery가 장고의 설정을 가져오는 경우에
RESULT_BACKEND를 저렇게 설정해 줄 수 있다.</p>
<pre><code class="language-python"># django_celery_results 관련 DB를 migrate 해준다.
python manage.py migrate</code></pre>
<pre><code class="language-python">#project_name/celery.py NEW

import os

from celery import Celery

os.environ.setdefault(&quot;DJANGO_SETTINGS_MODULE&quot;, &quot;#project_name.settings&quot;)
app = Celery(&quot;#project_name&quot;)

app.config_from_object(&quot;django.conf:settings&quot;, namespace=&quot;CELERY&quot;)
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f&quot;Request: {self.request!r}&quot;)</code></pre>
<blockquote>
<p>참고: <a href="https://velog.io/@jaewan/Celerywindow%EC%97%90%EC%84%9C-celery-task%EA%B0%80-%EC%8B%A4%ED%96%89%EB%90%98%EC%A7%80-%EC%95%8A%EB%8A%94-%EB%AC%B8%EC%A0%9C">Windows 환경이라면?</a> 링크를 참고해서 설정을 추가해야한다. 필자도 Windows 환경이었는데 도움을 많이 받았다.</p>
</blockquote>
<p>프로젝트 폴더 내에 celery.py 스크립트를 새로 생성한 뒤 위 내용을 적어주자.</p>
<p>위 내용은 장고 프로젝트의 세팅에서 celery세팅을 가져오고 이와 관련된 설정은 CELERY_로 시작한다는 뜻이다. namespace는 입맛대로 커스텀하면 된다.</p>
<p>app.autodiscover_tasks() 함수를 사용하면 각각의 앱 폴더에서 tasks.py 내부에 celery 데코레이터가 붙어있는 task를 모두 찾아서 적용할 수 있다.</p>
<pre><code class="language-python">#project_name/__init__.py

from .celery import app as celery_app

__all__ = (&quot;celery_app&quot;,)
</code></pre>
<p>위 설정을 추가하면 @shared_task 데코레이터를 사용해도 task를 app에 불러올 수 있다. (tasks.py에서 celery app을 import 안 해도 된다는 뜻)
이번에는 @app.task를 사용해줄 것이므로 해도 되고 안해도 된다.</p>
<pre><code class="language-python">#project_app/views.py

from django_redis import get_redis_connection

client = get_redis_connection()

def get(self, request, *args, **kwargs):
    team_id = kwargs.get(&quot;team_id&quot;)
    if not request.user.id:
        user_id = 0
    else:
        user_id = request.user.id
    try:
        redis_ans = client.sadd(f&quot;views:{team_id}&quot;, f&quot;{user_id}_{request.META.get(&#39;REMOTE_ADDR&#39;)}&quot;)
        team = Team.objects.get(id=team_id)
        if redis_ans:
            team.view += 1
            team.save()
        teamSerializer = TeamCreateSerializer(team)
        response = createSerializerHelper.make_response(teamSerializer.data, request.user.id)

        return Response(response, status=status.HTTP_200_OK)

    except Team.DoesNotExist:
        return Response({&quot;error&quot;: &quot;No Content&quot;}, status=status.HTTP_404_NOT_FOUND)</code></pre>
<p>비회원의 조회에 대해서 카운트해주기 위해서 비회원의 유저id는 0으로 고정했다.</p>
<p>유저가 API에 접근했을 때 views:team_id에 유저id_IP쌍을 SADD 해준다.
이 때, 집합 추가에 성공한다면 이 명령은 1을 반환하고 실패하면 0을 반환해준다.</p>
<p>따라서 이 값을 분기점으로 조회수를 추가해준다.</p>
<pre><code class="language-python">#project_app/tasks.py NEW

from django_redis import get_redis_connection

from #project_name.celery import app

client = get_redis_connection()


@app.task
def delete_view_history():
    cursor = &quot;0&quot;
    print(&quot;--- 팀 조회 히스토리 삭제 시작 ---&quot;)
    while cursor != 0:
        cursor, keys = client.scan(cursor=cursor, match=&quot;views:*&quot;)
        if keys:
            client.delete(*keys)
    print(&quot;--- 팀 조회 히스토리 삭제 완료 ---&quot;)</code></pre>
<p>celery에서 사용할 수 있는 task를 추가해주자. views:로 시작되는 키를 반복적으로 찾아내어 밸류값을 삭제하는 함수이다.</p>
<h3 id="여기서-잠깐">여기서 잠깐!</h3>
<p>필자는 장고의 캐시백엔드로 redis를 이미 사용하고 있다.
그런데 왜 django_redis에서 redis_connection을 따로 빼내어서 연결 설정을 해줬을까?</p>
<p>이유는 간단하다. django_redis의 cache.py를 보면 SADD에 관한 내용이 없다..
set, get, delete 등 뿐이고 set이나 sorted_set에 대한 내용을 지원해주지 않는다.</p>
<hr>
<p>task는 추가해줬으나 어떻게 자정마다 실행하게 할 수 있을까?
이를 가능케하는 것이 Celery-beat이다.</p>
<pre><code class="language-python">#project_name/celery.py

from celery.schedules import crontab

app.conf.beat_schedule = {
    &quot;delete-view-history-per-day&quot;: {&quot;task&quot;: &quot;project_app.tasks.delete_view_history&quot;, &quot;schedule&quot;: crontab(minute=0, hour=0)}
}</code></pre>
<p>celery.py로 돌아가 위 내용을 추가해주자. crontab을 통해 특정 시간을 지정해 task를 실행시킬 수 있게 해준다.</p>
<h2 id="실행">실행</h2>
<pre><code class="language-python">celery -A project_name worker -l INFO
celery -A project_name beat
python manage.py runserver</code></pre>
<p>각각의 터미널에서 명령을 입력해 실행해주자.</p>
<p><strong>celery worker</strong>
<img src="https://velog.velcdn.com/images/taegong_s/post/8cb10c72-7e41-4ee5-93e2-36126a8d7621/image.png" alt="">
user/tasks.send_email은 이메일 보내는 task를 따로 추가한 것이니 출력에 없어도 된다.</p>
<p><strong>celery beat</strong>
<img src="https://velog.velcdn.com/images/taegong_s/post/efa9723d-e3dc-4138-b852-998142e8c82a/image.png" alt=""></p>
<p>잘 실행되었다면 위와 같이 출력되어 있을 것이다.</p>
<p>Postman으로 조회를 한 번 해본 뒤에 redis-cli에서 확인해보면
아래와 같이 views:team_id 키가 생성되어있다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/9b0e2265-0082-4424-b686-e8e8e642ce26/image.png" alt=""></p>
<p>이게 자정에 삭제되는지 안 되는지 확인하기까지 시간이 많이 남았다면
celery.py의 crontab 설정을 바꿔볼 수 있다.</p>
<pre><code class="language-python">app.conf.beat_schedule = {
    &quot;delete-view-history-per-day&quot;: {&quot;task&quot;: &quot;project_app.tasks.delete_view_history&quot;, &quot;schedule&quot;: crontab()}
}</code></pre>
<p>이렇게 바꿔주면 매 분마다 task가 실행된다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/f4bf3b64-e889-492a-b473-5a3b665c83d6/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/47cbadc2-b740-4049-8f08-e508a47dbb3c/image.png" alt=""></p>
<p>잘 삭제가 된 것을 확인할 수 있다.</p>
<p>위 내용이 실행된 결과를 아까 따로 설치한 django-celery-results를 통해서도 알 수 있는데</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/0aa048f8-6ae8-46fd-a52d-7b5401c4a300/image.png" alt=""></p>
<p>두 번째 열에 해당하는 내용이 방금 우리가 해본 내용이다. celery를 실행한 쉘에서 None값이 반환되었으므로 django_celery_results_taskresult 테이블에는 null이 저장되는 것!</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[DRF에서 Blob 데이터 처리]]></title>
            <link>https://velog.io/@taegong_s/DRF%EC%97%90%EC%84%9C-Blob-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B2%98%EB%A6%AC</link>
            <guid>https://velog.io/@taegong_s/DRF%EC%97%90%EC%84%9C-Blob-%EB%8D%B0%EC%9D%B4%ED%84%B0-%EC%B2%98%EB%A6%AC</guid>
            <pubDate>Wed, 24 Apr 2024 01:35:36 GMT</pubDate>
            <description><![CDATA[<p>프로젝트를 진행하던 도중, 데이터베이스의 blob 필드를 사용해야 할 일이 생겼습니다.</p>
<p>처음에는 클라이언트가 작성한 글을 저장하려고 할 때 장고에서 CharField로 생성한 필드에 저장하려고 했으나, CharField는 max_length를 반드시 명시해줘야 하기 때문에 글이 한도 길이를 지정하고 싶지 않았고, 글의 길이가 짧을 경우 패딩 문자가 저장되는 용량이 아까웠습니다.</p>
<h3 id="그래서-어떻게-할-것인가">그래서 어떻게 할 것인가?</h3>
<p>클라이언트 측에서 보내준 글 내용을 Binary 값으로 바꾸어 데이터베이스의 Blob 필드에 저장하고자 합니다!</p>
<p>다르게 생각해본 방법은 파일로 만들어서 보관해볼까도 싶었지만, 서비스의 특성 상 읽어오는 작업이 많고 작성할 때의 작업이 너무 오래걸릴 것 같아 폐기했습니다.</p>
<h3 id="간단할-줄-알았는데">간단할 줄 알았는데..</h3>
<p>장고에서 모델을 설정할 때 CharField로 지정한 부분을 BinaryField로 바꾸고 저장하면 바로 끝나겠다 싶었습니다.</p>
<pre><code class="language-python"># Model 필드 BinaryField로 수정
explain = models.BinaryField()</code></pre>
<p>수정한 뒤에 Postman으로 데이터를 보내 테스트를 해봤으나,
serialzier의 is_valid()에서 계속 False가 떴습니다.</p>
<p>따로 알아본 결과 DRF의 Serializer에서는 BinaryField를 지원해주지 않는다! 라는 사실을 알게되었고, 어떻게 할까 생각하던 중 커스텀필드를 작성하자는 생각을 하게 되었습니다.</p>
<pre><code class="language-python">    def to_internal_value(self, data):
        &quot;&quot;&quot;
        Transform the *incoming* primitive data into a native value.
        &quot;&quot;&quot;
        raise NotImplementedError(
            &#39;{cls}.to_internal_value() must be implemented for field &#39;
            &#39;{field_name}. If you do not need to support write operations &#39;
            &#39;you probably want to subclass `ReadOnlyField` instead.&#39;.format(
                cls=self.__class__.__name__,
                field_name=self.field_name,
            )
        )

    def to_representation(self, value):
        &quot;&quot;&quot;
        Transform the *outgoing* native value into primitive data.
        &quot;&quot;&quot;
        raise NotImplementedError(
            &#39;{cls}.to_representation() must be implemented for field {field_name}.&#39;.format(
                cls=self.__class__.__name__,
                field_name=self.field_name,
            )
        )</code></pre>
<p>위 내용은 rest_framwork.fields의 Field 클래스 내부 함수입니다.
반드시 두 함수는 오버라이딩 되어 재정의 되어야 필드로 쓸 수 있죠!</p>
<blockquote>
<p>fields.py에서 미리 정의되어 있는 다른 필드들을 봐도 모두 오버라이딩 되어있습니다!</p>
</blockquote>
<h3 id="해결방법">해결방법</h3>
<p><strong>to_internal_value</strong> 함수는 외부에서 데이터베이스로, <strong>to_representation</strong> 함수는 데이터베이스에서 외부로 값을 보낼 때 호출되는 함수들입니다.</p>
<p>우리는 클라이언트에게서 str값을 받아 binary형태로 바꾸어 저장할 것이므로
<strong>to_internal_value</strong> 함수에서는 encode, <strong>to_representation</strong> 함수에서는 decode를 해주면 되겠죠?</p>
<pre><code class="language-python"># 제가 새로 생성한 바이너리 필드입니다!
class MyBinaryField(serializers.Field):
    def to_representation(self, value):
        return value.decode(&quot;utf-8&quot;)

    def to_internal_value(self, data):
        return data.encode(&quot;utf-8&quot;)</code></pre>
<p>이제 시리얼라이저에서 이 필드로 정의해주면 str값을 blob형태로 데이터베이스에 저장하고 blob 값을 str형태로 받아올 수 있습니다.</p>
<pre><code class="language-python">class TeamCreateSerializer(serializers.ModelSerializer):
    leader_id = serializers.IntegerField()
    explain = MyBinaryField()

    class Meta:
        model = Team
        fields = [&quot;id&quot;, &quot;leader_id&quot;, &quot;name&quot;, &quot;explain&quot;, &quot;genre&quot;, &quot;like&quot;, &quot;version&quot;, &quot;image&quot;]</code></pre>
<p>해결완료!</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[DRF 쿠키 samesite]]></title>
            <link>https://velog.io/@taegong_s/DRF-%EC%BF%A0%ED%82%A4-samesite</link>
            <guid>https://velog.io/@taegong_s/DRF-%EC%BF%A0%ED%82%A4-samesite</guid>
            <pubDate>Mon, 22 Apr 2024 18:45:13 GMT</pubDate>
            <description><![CDATA[<h2 id="문제상황">문제상황</h2>
<p>프론트와 로그인 기능 관련해서 API 연동 중에 도메인이 다름에 따라서 쿠키가 전달이 잘 안 됐다.</p>
<blockquote>
<p><strong>Chrome</strong>
기본적으로 크롬에서는 쿠키의 samesite 설정을 &quot;Lax&quot;로 해놓기 때문에 현 상황에서는 samesite를 None으로 지정해줄 필요가 있다!</p>
</blockquote>
<h3 id="첫-번째-코드">첫 번째 코드</h3>
<pre><code class="language-python">response.set_cookie(
                    &quot;access&quot;, response.headers.get(&quot;access&quot;, None), httponly=True, samesite=None, secure=True
                )</code></pre>
<p>빠르게 구글링을 통해 set_cookie 부분을 작성했다.
그런데 프론트 측에서 확인해본 결과 cookie의 samesite 속성이 제대로 지정되지 않았다. 크롬에서 기본으로 세팅되는 Lax도 안 되어 있었다는 뜻!</p>
<h4 id="왜-그랬을까요">왜 그랬을까요?</h4>
<p>이유를 알아보기 위해서 라이브러리를 파고 들어가봅시다.</p>
<pre><code class="language-python">def set_cookie(
        self,
        key,
        value=&quot;&quot;,
        max_age=None,
        expires=None,
        path=&quot;/&quot;,
        domain=None,
        secure=False,
        httponly=False,
        samesite=None,
    ):
        &quot;&quot;&quot;
        Set a cookie.

        ``expires`` can be:
        - a string in the correct format,
        - a naive ``datetime.datetime`` object in UTC,
        - an aware ``datetime.datetime`` object in any time zone.
        If it is a ``datetime.datetime`` object then calculate ``max_age``.

        ``max_age`` can be:
        - int/float specifying seconds,
        - ``datetime.timedelta`` object.
        &quot;&quot;&quot;
        self.cookies[key] = value
        if expires is not None:
            if isinstance(expires, datetime.datetime):
                if timezone.is_naive(expires):
                    expires = timezone.make_aware(expires, datetime.timezone.utc)
                delta = expires - datetime.datetime.now(tz=datetime.timezone.utc)
                # Add one second so the date matches exactly (a fraction of
                # time gets lost between converting to a timedelta and
                # then the date string).
                delta += datetime.timedelta(seconds=1)
                # Just set max_age - the max_age logic will set expires.
                expires = None
                if max_age is not None:
                    raise ValueError(&quot;&#39;expires&#39; and &#39;max_age&#39; can&#39;t be used together.&quot;)
                max_age = max(0, delta.days * 86400 + delta.seconds)
            else:
                self.cookies[key][&quot;expires&quot;] = expires
        else:
            self.cookies[key][&quot;expires&quot;] = &quot;&quot;
        if max_age is not None:
            if isinstance(max_age, datetime.timedelta):
                max_age = max_age.total_seconds()
            self.cookies[key][&quot;max-age&quot;] = int(max_age)
            # IE requires expires, so set it if hasn&#39;t been already.
            if not expires:
                self.cookies[key][&quot;expires&quot;] = http_date(time.time() + max_age)
        if path is not None:
            self.cookies[key][&quot;path&quot;] = path
        if domain is not None:
            self.cookies[key][&quot;domain&quot;] = domain
        if secure:
            self.cookies[key][&quot;secure&quot;] = True
        if httponly:
            self.cookies[key][&quot;httponly&quot;] = True
        if samesite:
            if samesite.lower() not in (&quot;lax&quot;, &quot;none&quot;, &quot;strict&quot;):
                raise ValueError(&#39;samesite must be &quot;lax&quot;, &quot;none&quot;, or &quot;strict&quot;.&#39;)
            self.cookies[key][&quot;samesite&quot;] = samesite</code></pre>
<p>위 코드는 장고의 HttpResponseBase 클래스의 set_cookie 함수 전문입니다.</p>
<p>인자를 받는 부분에서부터 samesite=None이 default 값이네요. 벌써 쎄하죠?</p>
<p>아래쪽으로 내려와서 samesite에 관련된 부분을 다시 한 번 봅시다.</p>
<pre><code class="language-python">if samesite:
            if samesite.lower() not in (&quot;lax&quot;, &quot;none&quot;, &quot;strict&quot;):
                raise ValueError(&#39;samesite must be &quot;lax&quot;, &quot;none&quot;, or &quot;strict&quot;.&#39;)
            self.cookies[key][&quot;samesite&quot;] = samesite</code></pre>
<p>아하, samesite 속성을 지정해줄 때 None 값으로 넣어서 if문 자체가 통과가 안 됐었네요..</p>
<p>내용을 참고해보니 none을 <strong>문자열</strong>로 넣어야 했습니다!</p>
<h4 id="참고-django-docs">참고 (Django Docs)</h4>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/6f95d1dc-5a0f-4e9c-8591-aed99acd1ae9/image.png" alt=""></p>
<p>여기서도 set_cookie에서 samesite를 어떻게 지정해야하는지 나오네요.
역시 공식 독스..</p>
<h3 id="수정-후-코드">수정 후 코드</h3>
<pre><code class="language-python">response.set_cookie(
                    &quot;access&quot;, response.headers.get(&quot;access&quot;, None), httponly=True, samesite=&quot;none&quot;, secure=True
                )</code></pre>
<p>간단한 수정으로 문제 해결 완료!</p>
<p>역시 가장 확실한 방법은 공식 docs를 확인하거나 직접 라이브러리를 까보는 것..
알량한 구글링이 아니라 내가 직접 체득하는게 최고다!</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[DRF JWT 인증/인가]]></title>
            <link>https://velog.io/@taegong_s/DRF-JWT-%EC%9D%B8%EC%A6%9D%EC%9D%B8%EA%B0%80</link>
            <guid>https://velog.io/@taegong_s/DRF-JWT-%EC%9D%B8%EC%A6%9D%EC%9D%B8%EA%B0%80</guid>
            <pubDate>Sat, 20 Apr 2024 23:32:00 GMT</pubDate>
            <description><![CDATA[<h4 id="이번에-새로운-프로젝트를-진행하고-있습니다">이번에 새로운 프로젝트를 진행하고 있습니다!</h4>
<p>FastAPI를 사용하다가 장고도 한 번 써보고 싶은 마음에 무작정 돌입하게 됐는데, 건드는게 생각보다 쉽지 않아 여기저기 공부하면서 하고 있네요.. 장고가 처음이다보니 부족한 지식으로 잘못 기술되거나 보셨을 때 개선의 여지가 있다면 코멘트 부탁드리겠습니다! (__) </p>
<p>이번 프로젝트에서 적용한 인증/인가 방식을 한 번 소개해볼까 합니다.
글에서는 단순히 로직만 소개하는 것이 아니라 나름대로 구현할 때 살펴봤던 점도 함께 기술해보겠습니다.</p>
<h2 id="흐름도">흐름도</h2>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/f1927c1c-ddf5-4abb-824a-a7b3b5576b71/image.png" alt=""></p>
<p>전체적인 흐름도입니다. 위에서는 FrontEnd만 표기되어 있지만, 사실 App과 통신할 때 Authorization 헤더를 사용하고 웹에서 프론트와 통신할 때는 쿠키를 사용했습니다. 글 읽으시다가 참고하기 좋으시라고 미리 올려두겠습니다.</p>
<blockquote>
<p>이번에 앱과 웹 동시에 통신하는 백엔드를 처음 개발해보는데 이게 통상적인 방법인지는 모르겠습니다만 커스텀 헤더를 추가해서 앱에서 온 요청인지 웹에서 온 요청인지 구분하고 있습니다. (api 따로 추가 X)</p>
</blockquote>
<h2 id="커스텀-미들웨어">커스텀 미들웨어</h2>
<p>App에서 통신할 때는 굳이 쿠키를 이용하지 않고 JWT를 주고 받는 방식,
Web에서 통신할 때는 쿠키에 JWT를 넣어 주고 받는 방식을 구현하고자 했는데요.
이를 구현하기 위해서 미들웨어를 따로 추가해줬습니다.</p>
<pre><code class="language-python">from django.utils.deprecation import MiddlewareMixin
from rest_framework import status


class AttachJWTFromHeaderToCookieMiddleware(MiddlewareMixin):
    def __init__(self, get_response):
        super().__init__(get_response)
        self.API = [&quot;github/login&quot;, &quot;callback/github&quot;]
        self.REFRESH = &quot;token/refresh&quot;

    def process_response(self, request, response):
        path = request.path_info
        is_valid = any(api in path for api in self.API)
        is_refresh = True if self.REFRESH in path else False
        if (
            is_valid
            or is_refresh
            and (response.status_code == status.HTTP_200_OK or response.status_code == status.HTTP_201_CREATED)
        ):
            if request.META.get(&quot;HTTP_X_FROM&quot;, None) == &quot;web&quot;:
                response.set_cookie(&quot;access&quot;, response.headers.get(&quot;access&quot;, None), httponly=True)
                if is_valid:
                    del response.headers[&quot;access&quot;]

                response.content = response.render().rendered_content

        return response


class AttachJWTFromCookieToHeaderMiddleware(MiddlewareMixin):
    def __init__(self, get_response):
        super().__init__(get_response)
        self.NOT_API = [&quot;github/login&quot;, &quot;callback/github&quot;]

    def process_request(self, request):
        path = request.path_info
        is_valid = any(api in path for api in self.NOT_API)

        if not is_valid:
            if request.META.get(&quot;HTTP_X_FROM&quot;, None) == &quot;web&quot;:
                request.META[&quot;HTTP_AUTHORIZATION&quot;] = f&quot;Bearer {request.COOKIES.get(&#39;access&#39;, None)}&quot;
</code></pre>
<h4 id="attachjwtfromheadertocookiemiddleware">AttachJWTFromHeaderToCookieMiddleware</h4>
<p>말 그대로 헤더에서 쿠키로 JWT를 붙여주는 미들웨어입니다.
서버에서 클라이언트 측으로 JWT를 보낼 때는 현재 회원가입/로그인, 토큰 리프레싱 API 밖에 없기 때문에 요청 Path에 위 태스크의 API 주소가 들어있다면 실행되도록 커스텀했습니다.</p>
<p>웹에서 온 요청일 때만 리스폰스의 헤더에서 쿠키로 JWT를 옮겨줍니다.</p>
<h4 id="attachjwtfromcookietoheadermiddleware">AttachJWTFromCookieToHeaderMiddleware</h4>
<p>위의 미들웨어와 정반대의 일을 하는 미들웨어입니다.
클라이언트 쪽에서 JWT가 들어오는 요청은 회원가입/로그인 요청을 제외한 모든 요청입니다.</p>
<p>웹에서 온 요청일 때만 리퀘스트의 쿠키에서 헤더로 JWT를 옮겨줍니다.</p>
<h3 id="왜-굳이-쿠키에서-헤더로-다시-보냈나요"><em>왜? 굳이 쿠키에서 헤더로 다시 보냈나요?</em></h3>
<p>라는 생각을 하신 분들이 계실 수도 있는데요.</p>
<p>DRF에서 JWT를 사용하면서 사용자를 식별하기 위해</p>
<pre><code class="language-python">rest_framework_simplejwt.authentication.JWTAuthentication</code></pre>
<p>를 사용합니다. 저도 이 인증방식을 기본으로 채택해놨구요.
이 JWTAuthentication을 조금 살펴봅시다.</p>
<pre><code class="language-python">def authenticate(self, request: Request) -&gt; Optional[Tuple[AuthUser, Token]]:
        header = self.get_header(request)

        if header is None:
            return None

        raw_token = self.get_raw_token(header)
        if raw_token is None:
            return None

        validated_token = self.get_validated_token(raw_token)

        return self.get_user(validated_token), validated_token

def get_header(self, request: Request) -&gt; bytes:
        &quot;&quot;&quot;
        Extracts the header containing the JSON web token from the given
        request.
        &quot;&quot;&quot;
        header = request.META.get(api_settings.AUTH_HEADER_NAME)
        if isinstance(header, str):
            # Work around django test client oddness
            header = header.encode(HTTP_HEADER_ENCODING)</code></pre>
<p>JWTAuthentication의 일부 함수들인데요.
음.. 쿠키로 인증하는 내용은 없고 읽어보니 기본 인증 헤더(Authorization)로부터 JWT를 가져오네요. 물론 JWTAuthentication을 상속받아 쿠키에서도 토큰을 가져오도록 커스텀 할 수도 있겠습니다만은, 저는 미들웨어에서 그냥 헤더로 넣어버리는 방법을 선택해봤습니다.</p>
<blockquote>
<p>JWTAuthentication를 통과하고나면 request.user에 유저 객체 또는 AnonymousUser가 들어갑니다. 이를 통해 APIView에서 permission_classes로 권한 설정을 쉽게 할 수 있었어요!</p>
</blockquote>
<pre><code class="language-python">    def get_validated_token(self, raw_token: bytes) -&gt; Token:
        &quot;&quot;&quot;
        Validates an encoded JSON web token and returns a validated token
        wrapper object.
        &quot;&quot;&quot;
        messages = []
        for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
            try:
                return AuthToken(raw_token)
            except TokenError as e:
                messages.append(
                    {
                        &quot;token_class&quot;: AuthToken.__name__,
                        &quot;token_type&quot;: AuthToken.token_type,
                        &quot;message&quot;: e.args[0],
                    }
                )

        raise InvalidToken(
            {
                &quot;detail&quot;: _(&quot;Given token not valid for any token type&quot;),
                &quot;messages&quot;: messages,
            }
        )
</code></pre>
<p>authenticate 함수 내에서 실행되는 토큰 확인 함수입니다. 여기서 정상적인 토큰인지, 토큰이 만료되었는지 확인하게 됩니다. simple_jwt 라이브러리에서 정의된 AuthToken 클래스에서 확인합니다. 여기서 위 흐름도의 401을 반환하는 경우의 수를 충족하겠죠? get_user 함수를 통해 유저 객체를 반환해줍니다.</p>
<h2 id="드디어-view로">드디어 View로..</h2>
<p>위에서 미리 설명한대로 APIView 속에서 유저의 권한을 쉽게 확인할 수 있습니다.
아래에 View를 두 개 올려둘게요.</p>
<pre><code class="language-python">class UserManageView(APIView):
    serializer_class = ApproveUserSerializer
    permission_classes = [IsAdminUser]

    def get(self, request):
        queryset = User.objects.filter(is_approved=False)
        if queryset:
            serializer = self.serializer_class(queryset, many=True)
            return Response(serializer.data)
        else:
            return Response({&quot;error&quot;: &quot;No Content&quot;}, status=status.HTTP_404_NOT_FOUND)

    def patch(self, request, *args, **kwargs):
        user_id = kwargs.get(&quot;user_id&quot;)
        user = User.objects.get(id=user_id)

        serializer = ApproveUserSerializer(user, data={&quot;is_approved&quot;: True}, partial=True)
        if serializer.is_valid():
            serializer.save()
            return Response({&quot;success&quot;: True}, status=status.HTTP_202_ACCEPTED)

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

class GithubOAuthCallBackView(APIView):
    permission_classes = [AllowAny]

    def get(self, request: Request):
        if code := request.GET.get(&quot;code&quot;):
            response = self.send_code_to_github_login_view(code)
            if response.status_code == 200:
                return Response(response.json(), status=status.HTTP_200_OK)
            return Response(
                {&quot;error&quot;: &quot;Failed to process with GithubLoginView&quot;},
                status=response.status_code,
            )

    def send_code_to_github_login_view(self, code: str):
        url = &quot;http://localhost:8000/api/auth/github/login&quot;
        payload = {&quot;code&quot;: code}
        headers = {&quot;Content-Type&quot;: &quot;application/json&quot;}
        response = requests.post(url, json=payload, headers=headers)

        return response</code></pre>
<p>두 뷰가 하는 일에 대해서는 우선 제쳐두고, permission_classes 부분을 보도록 합시다.</p>
<p>각각 AllowAny, IsAdminUser가 있죠?</p>
<p>APIView에서 메소드들에 접근하기 전에</p>
<pre><code class="language-python">def initial(self, request, *args, **kwargs):
        &quot;&quot;&quot;
        Runs anything that needs to occur prior to calling the method handler.
        &quot;&quot;&quot;
        self.format_kwarg = self.get_format_suffix(**kwargs)

        # Perform content negotiation and store the accepted info on the request
        neg = self.perform_content_negotiation(request)
        request.accepted_renderer, request.accepted_media_type = neg

        # Determine the API version, if versioning is in use.
        version, scheme = self.determine_version(request, *args, **kwargs)
        request.version, request.versioning_scheme = version, scheme

        # Ensure that the incoming request is permitted
        self.perform_authentication(request)
        self.check_permissions(request)
        self.check_throttles(request)</code></pre>
<p>이 곳을 먼저 지나갑니다. 저희가 이번에 볼 곳은 저 check_permissions인데요.
저희가 설정해놓은 permission_classes를 검증해보면서 권한을 확인해봅니다.</p>
<p>각각의 permission_classes 내부에 오버라이딩 되어 있는 has_permission 메소드를 통해 True 또는 False를 반환하게 되어 있어요.</p>
<pre><code class="language-python">class IsAdminUser(BasePermission):
    &quot;&quot;&quot;
    Allows access only to admin users.
    &quot;&quot;&quot;

    def has_permission(self, request, view):
        return bool(request.user and request.user.is_staff)</code></pre>
<p>위에 설정되어 있는 IsAdminUser의 모습입니다.
JWTAuthentication을 통해 식별된 유저가 request.user에 들어있을 거에요.
식별된 유저의 is_staff 컬럼이 True, False인지에 따라서 권한이 정해지는 방식입니다!</p>
<p>이 형식을 따라서 우리도 쉽게 권한 설정을 커스터마이징 할 수 있습니다.</p>
<pre><code class="language-python"># 이건 제가 사용하고 있는 커스텀 권한입니다!
from rest_framework.permissions import BasePermission


class IsApprovedUser(BasePermission):
    def has_permission(self, request, view):
        return bool(request.user and request.user.is_approved)
</code></pre>
<blockquote>
<p>has_permission 함수에서 False를 반환하면, 403 상태 코드를 반환하도록 설계되어 있습니다!</p>
</blockquote>
<hr>
<h2 id="토큰-리프레싱">토큰 리프레싱</h2>
<p>401 상태 코드를 <strong>처음</strong> 리턴 받았다면, 클라이언트 쪽에서는 토큰 리프레싱 해주는 API로 다시 통신을 시도하도록 했습니다.</p>
<p>제가 사용한 simple_jwt 라이브러리에서는 제공해주는 뷰를 통해 이를 쉽게 할 수 있는데요.</p>
<pre><code class="language-python">class WtntTokenRefreshView(TokenRefreshView):
    def post(self, request: Request, *args, **kwargs):
        _, access_token = request.META.get(&quot;HTTP_AUTHORIZATION&quot;).split(&quot; &quot;)
        user_id = AccessToken(access_token, verify=False).payload.get(&quot;user_id&quot;)

        refresh_token = cache.get(user_id)
        if not refresh_token:
            return Response({&quot;error&quot;: &quot;Expired Refresh Token&quot;}, status=status.HTTP_401_UNAUTHORIZED)
        serializer = self.get_serializer(data={&quot;refresh&quot;: refresh_token})

        try:
            serializer.is_valid(raise_exception=True)
        except TokenError as e:
            raise InvalidToken(e.args[0])

        token = serializer.validated_data
        response = Response({&quot;success&quot;: True}, status=status.HTTP_200_OK)

        response.headers[&quot;access&quot;] = token[&quot;access&quot;]

        return response</code></pre>
<p>원래 이 뷰에서는, 기본으로 지정되어 있는 TokenRefreshSerializer에 RefreshToken을 데이터로 넣어주기만 하면 토큰을 리프레싱 해줍니다. 하지만 이번 프로젝트를 진행하면서 RefreshToken은 클라이언트 측에 전해주지 않고 서버 측에서 관리하기로 했는데요. 따라서, AccessToken의 페이로드에 들어있는 유저 정보를 통해 RefreshToken을 식별해서 리프레싱해줘야 합니다.</p>
<p>simple-jwt에서는 AccessToken을 디코드할 때 만료기한이 지났을 경우 바로 401 상태 코드를 반환하게 되어 있습니다.</p>
<blockquote>
<p>유저가 보낸 AccessToken은 이미 만료되어 있기 때문에 이 뷰에 접근을 요청했잖아요!!</p>
</blockquote>
<p>네, 그래서 verify=False 옵션을 반드시 명시해서 만료된 AccessToken에서도 유저 정보를 식별할 수 있어야합니다.</p>
<h4 id="refreshtoken은-어디에서-받아-오나요">RefreshToken은 어디에서 받아 오나요?</h4>
<p>AccessToken의 만료기한을 30분으로 정해놨는데, RefreshToken을 현재 사용중인 MySQL의 테이블에 넣어놓고 관리하면 자주 I/O가 생길 것 같았고 RefreshToken의 만료기한이 지났을 때 따로 삭제해줘야 했습니다.(스케쥴러 등을 통해)</p>
<p>그래서 Redis에 토큰을 저장하고, expire로 만료기한을 지정하는 방법을 사용해보기로 했습니다.</p>
<p>AccessToken의 payload에서 가져온 유저의 식별정보로 RefreshToken을 DB에서 받아옵니다.</p>
<p>RefreshToken이 존재한다면 시리얼라이저를 통해 새로운 AccessToken을 발급해주고, 존재하지 않는다면 두 번째 401 상태 코드를 반환합니다.</p>
<p>이렇게 인증/인가 로직을 구현해봤습니다. 제 부족한 점이나 궁금하신 점이 있으시다면 편하게 의견 남겨주세요!!</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[캡스톤 디자인 마무리]]></title>
            <link>https://velog.io/@taegong_s/%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-%ED%9A%8C%EA%B3%A0</link>
            <guid>https://velog.io/@taegong_s/%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-%ED%9A%8C%EA%B3%A0</guid>
            <pubDate>Thu, 04 Jan 2024 12:34:14 GMT</pubDate>
            <description><![CDATA[<p>이번 글을 마지막으로 학교에서 진행했던 캡스톤 디자인 프로젝트 관련 포스트를 마무리 짓도록 하겠다!</p>
<h3 id="1-백엔드">1. 백엔드..?</h3>
<p>학부생 수준에서 AI를 조금 더 깊이 알아봤다고 할 정도의 얕은 지식과 더불어 백엔드 웹프레임워크에 대한 지식은 전무하다싶은 상황에서 처음으로 진행해본 프로젝트였다. BE 부분을 맡아서 해야한다는 사실이 내게는 트러블 그 자체였다. 다행히 FastAPI는 공식문서가 정리가 잘 되어있는 편이었고 파이썬 웹백엔드에 대한 강의를 따로 수강하면서 프로젝트를 진행했더니 어떻게든 완성한 것 같다.</p>
<h3 id="2-ai-완성도">2. AI 완성도</h3>
<p>자료 양이 방대해서 그런지 예상했던 것보다 더 번역이 잘 됐다 ㅋㅋ 데이터셋에 이런 문구가 존재했구나..? 싶었던 단어와 어미가 바뀌는 걸 보고 신기했다. 내가 해도 이정돈데 정말 전문적으로 공부한 사람들이 각 잡고 만들면 어떤 결과가 나올지 궁금하긴 하다.</p>
<blockquote>
<p>Ex) 그런건 여기 천지삐까리다! -&gt; 그런건 여기에 많이 있다!</p>
</blockquote>
<h3 id="3-결과물-스샷">3. 결과물 스샷</h3>
<h4 id="로그인창">로그인창 <img src="https://velog.velcdn.com/images/taegong_s/post/26fcc3c1-f14a-4199-82ea-7c81471f5dca/image.png" alt=""></h4>
<h4 id="로그인-후-번역기-창">로그인 후 번역기 창 <img src="https://velog.velcdn.com/images/taegong_s/post/ccb5d069-ce55-45f6-a16b-b6c8dba3ea82/image.png" alt=""></h4>
<h4 id="번역-및-검색-기록">번역 및 검색 기록<img src="https://velog.velcdn.com/images/taegong_s/post/673cf46f-daf3-491e-a412-d7f0a010ad67/image.png" alt=""></h4>
<h4 id="방명록">방명록<img src="https://velog.velcdn.com/images/taegong_s/post/a4c2f91c-9eec-4edc-aa97-f116540acfa0/image.png" alt=""><img src="https://velog.velcdn.com/images/taegong_s/post/e290f66f-47dc-4c06-910d-a78005908b0e/image.png" alt=""></h4>
<h3 id="4-아쉬웠던-점">4. 아쉬웠던 점</h3>
<ul>
<li><p>STT 기술도 활용해서 한국어 발화 데이터를 통해 사투리를 음성으로 입력 받아서 번역하는 기능까지 해보고 싶었는데 그 부분은 음성 데이터에 대한 지식 부족과 컴퓨팅 자원 부족으로 시도해보지 못 했다. 언젠가 시간과 자원이 갖춰진다면 시도해보고 싶다.</p>
</li>
<li><p>유저의 검색 기록 기반으로 데이터를 정리해서 추가적으로 모델을 계속 학습하는 파이프라인을 구축해보고 싶었으나 유저가 승인한 정보를 학습했을 때 AI에 긍정적으로 영향을 미칠지 확신하지 못해서 일단은 보류했다. (그저 검색 기록만으로 데이터를 구축하는 건 어려운 일이 아니다. 정보를 어떻게 정제할지 가이드라인을 정하지 못한 것..)</p>
</li>
<li><p>백엔드의 완성도가 꽤 부족한 것 같다. Github의 V1 부분을 보면 코드가 좀 많이 더럽다. 신경 쓰지 못한 부분과 몰랐던 부분이 많아서 가독성이 엄청 떨어진다. (그래서 V2로 리팩토링을 한 번 진행해봤다. 고수분들이 보시기엔 그래도 매~우 많이 부족할 것..)</p>
</li>
</ul>
<h3 id="5-결과">5. 결과!</h3>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/5b78ee5f-92d4-4999-bfc0-e517beb197fa/image.jpg" alt="">
<img src="https://velog.velcdn.com/images/taegong_s/post/98e0ef34-4706-4220-b9db-2660dd5b2872/image.jpg" alt=""></p>
<p>(다른 사람들 이름도 있어서 한 번에 다 가렸다..)</p>
<p>교수님들이 좋게 봐주셔서 다행히 단과대 및 학부 경진대회에서 모두 입상할 수 있었다!</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[테스트 코드 작성]]></title>
            <link>https://velog.io/@taegong_s/%ED%85%8C%EC%8A%A4%ED%8A%B8-%EC%BD%94%EB%93%9C-%EC%9E%91%EC%84%B1</link>
            <guid>https://velog.io/@taegong_s/%ED%85%8C%EC%8A%A4%ED%8A%B8-%EC%BD%94%EB%93%9C-%EC%9E%91%EC%84%B1</guid>
            <pubDate>Thu, 04 Jan 2024 12:03:50 GMT</pubDate>
            <description><![CDATA[<h1 id="test-코드-작성에-앞서--">Test 코드 작성에 앞서 -</h1>
<hr>
<h2 id="환경-설정-변경">환경 설정 변경</h2>
<h4 id="테스트는-서비스에-활용되는-db가-아닌-test용-db를-사용하는-것이-좋다">테스트는 서비스에 활용되는 DB가 아닌 TEST용 DB를 사용하는 것이 좋다.</h4>
<p>따라서, 테스트와 서비스 상황을 구분해서 환경 설정을 따로 해주도록 하자.</p>
<h4 id="configpy">config.py</h4>
<p>환경설정을 좀 더 깔끔하게 해보고 싶어서 pydantic의 BaseSettings로 바꿔봤다. 이를 사용하기 위해서는 라이브러리를 하나 더 설치해줘야 한다.</p>
<pre><code>pip install pydantic-settings</code></pre><pre><code class="language-python">from pydantic_settings import BaseSettings
from typing import ClassVar
from functools import lru_cache


class BaseConfig(BaseSettings):
    # DB
    DB_USERNAME: str
    DB_HOST: str
    DB_PASSWORD: str
    DB_PORT: int

    # CRED
    SECRET_KEY: str
    ALGORITHM: str

    # PAPAGO
    CLIENT_ID: str
    CLIENT_SECRET: str

    # Test Config
    TESTING: ClassVar[bool] = False

    class Config:
        extra = &quot;ignore&quot;
        env_file = &quot;.env&quot;
        env_file_encoding = &quot;utf-8&quot;


class testSettings(BaseConfig):
    TESTING: ClassVar[bool] = True
    DB_TEST_NAME: str


class Settings(BaseConfig):
    TESTING: ClassVar[bool] = False
    DB_NAME: str


class isMain(BaseSettings):
    IS_MAIN: bool

    class Config:
        extra = &quot;ignore&quot;
        env_file = &quot;.env&quot;
        env_file_encoding = &quot;utf-8&quot;


@lru_cache
def get_settings():
    if is_main.IS_MAIN:
        return Settings()
    else:
        return testSettings()


is_main = isMain()
settings = get_settings()
</code></pre>
<ul>
<li><p>테스트 환경에서 사용할 testSettings, 서비스 환경에서 사용할 Settings로 나눴다.</p>
</li>
<li><p>.env 파일에 IS_MAIN이라는 환경변수를 추가해서 IS_MAIN이 true인 경우에만 Settings를 사용한다.</p>
</li>
<li><p>class Config에서 extra = &quot;ignore&quot;를 설정해주지 않으면 에러가 나니 꼭 추가해주도록 하자.</p>
</li>
<li><p>만약 테스트와 서비스 환경이 데이터베이스 이름뿐 아니라 다른 것도 상이하다면 묶어둔 BaseConfig에서 분리해서 작성해줘야 한다.</p>
</li>
</ul>
<hr>
<h2 id="pytest">Pytest</h2>
<pre><code>pip install pytest-asyncio pytest-cov</code></pre><blockquote>
<p>Pytest 라이브러리는 말 그대로 Python 코드를 테스트해보기 위한 라이브러리이다. 우리는 비동기식으로 테스트 코드를 실행하고 html로 정리된 테스트 결과를 보기 위해서 위의 두 라이브러리를 설치하도록 하겠다.</p>
</blockquote>
<p>Pytest를 설치하고 나면 pytest 명령어를 콘솔에서 사용할 수 있는데, 명령어를 실행했을 때 test_*.py를 수집해서 안의 코드들을 테스트하게 된다.</p>
<hr>
<h1 id="test">Test</h1>
<blockquote>
<p>FASTAPI에서 TestClient를 제공해주지만, 필자가 알고 있는 지식(?)으로는 비동기 방식으로 지원해주지 않기 때문에 httpx로 AsyncClient를 생성해서 비동기 방식으로 테스트를 진행하겠다.</p>
</blockquote>
<h2 id="test-코드">Test 코드</h2>
<h3 id="conftestpy">conftest.py</h3>
<pre><code class="language-python">import bcrypt
import pytest_asyncio
import json
from httpx import AsyncClient

from app import main, models
from app.database import engine, get_db
from app.config import settings


@pytest_asyncio.fixture(scope=&quot;session&quot;)
def app():
    if not settings.TESTING:
        raise SystemError(&quot;TESTING environment must be set true&quot;)

    return main.app

@pytest_asyncio.fixture
async def session():
    db = next(get_db())
    try:
        yield db
    finally:
        db.close()


@pytest_asyncio.fixture
async def client(app):
    async with AsyncClient(app=app, base_url=&quot;http://test/v2&quot;) as ac:
        models.Base.metadata.drop_all(bind=engine)
        models.Base.metadata.create_all(bind=engine)

        yield ac</code></pre>
<ul>
<li><p>FIXTURE란 Pytest에서 반복적으로 생성되어야 하는 아이템을 매개변수로 불러와 생성할 수 있게 해주는 일종의 모듈이라고 볼 수 있다.</p>
</li>
<li><p>FIXTURE에서 FIXTURE를 불러와서 다른 FIXTURE를 만들 수도 있다.</p>
</li>
<li><p>위 세 개의 FIXTURE로 만들어놓은 API와 데이터베이스를 연결하고, client를 통해서 API에 접근할 수 있게 작성하였다.</p>
</li>
</ul>
<hr>
<h3 id="test_userpy">test_user.py</h3>
<blockquote>
</blockquote>
<p>conftest.py에 유저를 DB에 등록하는 FIXTURE를 생성하자.</p>
<pre><code class="language-python">@pytest_asyncio.fixture
def user(session) -&gt; models.User:
    salt_value = bcrypt.gensalt()
    pw = bcrypt.hashpw(&quot;testpw&quot;.encode(), salt_value)
    row = models.User(password=pw, email=&quot;test@sample.com&quot;)
    session.add(row)
    session.commit()
    return row</code></pre>
<pre><code class="language-python">import pytest
import json


# 유저 생성 테스트
@pytest.mark.asyncio
async def test_add_user(client):
    body = {&quot;email&quot;: &quot;sample@sample.com&quot;, &quot;password&quot;: &quot;samplepw&quot;}
    r = await client.post(&quot;/users/register&quot;, data=json.dumps(body))
    data = r.json()

    assert r.status_code == 201
    assert data.get(&quot;email&quot;) == body[&quot;email&quot;]

# 유저 생성 에러 테스트 (너무 긴 이메일)
@pytest.mark.asyncio
async def test_add_user_failed_by_long_email(client):
    body = {&quot;email&quot;: &quot;longlonglonglonglonglonglonglonglongmail@sample.com&quot;, &quot;password&quot;: &quot;samplepw&quot;}
    r = await client.post(&quot;/users/register&quot;, data=json.dumps(body))

    assert r.status_code == 422

# 유저 이메일 중복 확인 에러 테스트 (중복 이메일)
@pytest.mark.asyncio
async def test_email_duplicated_exist(client, user):
    email = &quot;test@sample.com&quot;
    r = await client.get(f&quot;/users/duplicated?email={email}&quot;)
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;duplicated&quot;)
    assert email == user.email

# 유저 이메일 중복 확인 테스트
@pytest.mark.asyncio
async def test_email_duplicated_no_exist(client):
    email = &quot;noexist@sample.com&quot;
    r = await client.get(f&quot;/users/duplicated?email={email}&quot;)
    data = r.json()

    assert r.status_code == 200
    assert not data.get(&quot;duplicated&quot;)

# 유저 로그인 테스트
@pytest.mark.asyncio
async def test_login_user(client, user):
    body = {&quot;email&quot;: user.email, &quot;password&quot;: &quot;testpw&quot;}
    r = await client.post(&quot;/users/login&quot;, data=json.dumps(body))
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;user_id&quot;) == user.id

# 유저 로그인 에러 테스트 (잘못된 패스워드)
@pytest.mark.asyncio
async def test_login_user_invalid_pw(client, user):
    body = {&quot;email&quot;: user.email, &quot;password&quot;: &quot;wrongpw&quot;}
    r = await client.post(&quot;/users/login&quot;, data=json.dumps(body))
    data = r.json()

    assert r.status_code == 403
    assert data.get(&quot;detail&quot;) == &quot;Incorrect Password&quot;

# 유저 로그인 에러 테스트 (잘못된 이메일)
@pytest.mark.asyncio
async def test_login_user_invalid_email(client):
    body = {&quot;email&quot;: &quot;noemail&quot;, &quot;password&quot;: &quot;testpw&quot;}
    r = await client.post(&quot;/users/login&quot;, data=json.dumps(body))
    data = r.json()

    assert r.status_code == 403
    assert data.get(&quot;detail&quot;) == &quot;No Email Found&quot;
</code></pre>
<h4 id="함수-두-개를-가져와서-찬찬히-살펴보도록-하자">함수 두 개를 가져와서 찬찬히 살펴보도록 하자.</h4>
<pre><code class="language-python"># 유저 이메일 중복 확인 테스트
@pytest.mark.asyncio
async def test_email_duplicated_no_exist(client):
    email = &quot;noexist@sample.com&quot;
    r = await client.get(f&quot;/users/duplicated?email={email}&quot;)
    data = r.json()

    assert r.status_code == 200
    assert not data.get(&quot;duplicated&quot;)

# 유저 이메일 중복 확인 에러 테스트 (중복 이메일)
@pytest.mark.asyncio
async def test_email_duplicated_exist(client, user):
    email = &quot;test@sample.com&quot;
    r = await client.get(f&quot;/users/duplicated?email={email}&quot;)
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;duplicated&quot;)
    assert email == user.email</code></pre>
<ol>
<li><p><strong>유저 이메일 중복 확인 테스트 (에러X)</strong>
이 코드는 이메일이 중복인지 아닌지 확인하는 엔드포인트의 테스트 코드다.
Fixture를 client만 받아왔으므로 User 테이블에는 어떠한 데이터도 들어있지 않을 것이므로, 어떠한 email을 검사하더라도 통과할 것이다.
따라서, status_code는 성공이므로 200, 반환값은 duplicated가 아니어야 한다. 이를 assert 함수를 통해서 검증하는 것.</p>
</li>
<li><p><strong>유저 이메일 중복 확인 테스트 (중복 이메일)</strong>
여기서는 user Fixture도 받아오게 되는데, 이러면 User 테이블에 Fixture에서 생성한 유저 데이터가 삽입되어 있는 상태이다. Fixture에서 생성한 유저의 이메일이 &quot;<a href="mailto:test@sample.com">test@sample.com</a>&quot;이므로 이와 같은 이메일로 엔드포인트에 접근해서 <strong>일부러</strong> 실패하는 것!</p>
</li>
</ol>
<hr>
<h3 id="test_translatepy">test_translate.py</h3>
<blockquote>
</blockquote>
<p>우리는 번역 작업을 로그인 시에만 가능하게 설정해놨으므로, 토큰 값도 같이 보내줘야한다. 따라서 conftest.py에 token을 반환해주는 FIXTURE를 추가하도록 하자.</p>
<pre><code class="language-python">@pytest_asyncio.fixture
async def token(client, user) -&gt; str:
    body = {&quot;email&quot;: user.email, &quot;password&quot;: &quot;testpw&quot;}
    r = await client.post(&quot;/users/login&quot;, data=json.dumps(body))
    return r.headers.get(&quot;access_token&quot;)</code></pre>
<pre><code class="language-python">import pytest
import json


@pytest.mark.asyncio
async def test_translate(client, token):
    body = {&quot;dialect&quot;: &quot;밥 뭇나?&quot;}
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}
    r = await client.post(&quot;/AI&quot;, data=json.dumps(body), headers=headers)
    data = r.json()

    assert r.status_code == 200
    assert data.get(&quot;dialect&quot;) == body[&quot;dialect&quot;]</code></pre>
<hr>
<h3 id="test_itempy">test_item.py</h3>
<blockquote>
<p>생성된 TsItem을 삭제하는 엔드포인트를 테스트해보기 위해서 데이터베이스에 Item을 삽입하는 FIXTURE와 다른 토큰을 생성하는 user2, token2 FIXTURE를 만들어주자.</p>
</blockquote>
<pre><code class="language-python">@pytest_asyncio.fixture
def item(session, user) -&gt; models.TsItem:
    row = models.TsItem(
        dialect=&quot;dialect&quot;,
        standard=&quot;standard&quot;,
        english=&quot;english&quot;,
        chinese=&quot;chinese&quot;,
        japanese=&quot;japanese&quot;,
        owner_id=user.id,
    )
    session.add(row)
    session.commit()
    return row</code></pre>
<pre><code class="language-python">@pytest_asyncio.fixture
def user2(session) -&gt; models.User:
    salt_value = bcrypt.gensalt()
    pw = bcrypt.hashpw(&quot;testpw&quot;.encode(), salt_value)
    row = models.User(password=pw, email=&quot;test2@sample.com&quot;)
    session.add(row)
    session.commit()
    return row</code></pre>
<pre><code class="language-python">@pytest_asyncio.fixture
async def token2(client, user2) -&gt; str:
    body = {&quot;email&quot;: user2.email, &quot;password&quot;: &quot;testpw&quot;}
    r = await client.post(&quot;/users/login&quot;, data=json.dumps(body))
    return r.headers.get(&quot;access_token&quot;)</code></pre>
<pre><code class="language-python">import pytest
import json


@pytest.mark.asyncio
async def test_add_item(client, token):
    body = {
        &quot;dialect&quot;: &quot;dialect&quot;,
        &quot;standard&quot;: &quot;standard&quot;,
        &quot;english&quot;: &quot;english&quot;,
        &quot;chinese&quot;: &quot;chinese&quot;,
        &quot;japanese&quot;: &quot;japanese&quot;,
    }
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.post(&quot;/items&quot;, data=json.dumps(body), headers=headers)
    data = r.json()

    assert r.status_code == 201
    assert data.get(&quot;dialect&quot;) == body[&quot;dialect&quot;]
    assert data.get(&quot;standard&quot;) == body[&quot;standard&quot;]
    assert data.get(&quot;english&quot;) == body[&quot;english&quot;]
    assert data.get(&quot;chinese&quot;) == body[&quot;chinese&quot;]
    assert data.get(&quot;japanese&quot;) == body[&quot;japanese&quot;]


@pytest.mark.asyncio
async def test_add_item_failed_by_long_text(client, token):
    body = {
        &quot;dialect&quot;: &quot;a&quot; * 256,
        &quot;standard&quot;: &quot;standard&quot;,
        &quot;english&quot;: &quot;english&quot;,
        &quot;chinese&quot;: &quot;chinese&quot;,
        &quot;japanese&quot;: &quot;japanese&quot;,
    }
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.post(&quot;/items&quot;, data=json.dumps(body), headers=headers)

    assert r.status_code == 422


@pytest.mark.asyncio
async def test_get_items(client, item):
    r = await client.get(&quot;/items&quot;)
    data = r.json()

    assert r.status_code == 200
    assert data[0].get(&quot;id&quot;) == item.id


@pytest.mark.asyncio
async def test_delete_item(client, item, token):
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.delete(f&quot;/items/{item.id}&quot;, headers=headers)

    assert r.status_code == 202


@pytest.mark.asyncio
async def test_delete_item_failed_by_invaild_id(client, item, token):
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.delete(f&quot;/items/{item.id+1}&quot;, headers=headers)
    data = r.json()

    assert r.status_code == 400
    assert data.get(&quot;detail&quot;) == &quot;No Item Found&quot;


@pytest.mark.asyncio
async def test_delete_item_failed_by_wrong_user(client, item, token2):
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token2}&quot;}

    r = await client.delete(f&quot;/items/{item.id}&quot;, headers=headers)
    data = r.json()

    assert r.status_code == 403
    assert data.get(&quot;detail&quot;) == &quot;It&#39;s not owner&quot;
</code></pre>
<hr>
<h3 id="test_guestbookpy">test_guestbook.py</h3>
<blockquote>
<p> 방명록을 생성하는 Fixture도 추가해주자.</p>
</blockquote>
<pre><code class="language-python">@pytest_asyncio.fixture
def guestbook(session, user) -&gt; models.GuestBook:
    row = models.GuestBook(message=&quot;test&quot;, message_owner=&quot;test_owner&quot;, owner_id=user.id)
    session.add(row)
    session.commit()
    return row</code></pre>
<pre><code class="language-python">import pytest
import json


@pytest.mark.asyncio
async def test_add_guestbook(client, token):
    body = {&quot;message&quot;: &quot;test&quot;, &quot;message_owner&quot;: &quot;testowner&quot;}
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.post(&quot;/guestbooks&quot;, data=json.dumps(body), headers=headers)
    data = r.json()

    assert r.status_code == 201
    assert data.get(&quot;message&quot;) == body[&quot;message&quot;]


@pytest.mark.asyncio
async def test_add_guestbook_failed_by_long_owner(client, token):
    body = {&quot;message&quot;: &quot;test&quot;, &quot;message_owner&quot;: &quot;a&quot; * 25}
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.post(&quot;/guestbooks&quot;, data=json.dumps(body), headers=headers)

    assert r.status_code == 422


@pytest.mark.asyncio
async def test_get_guestbooks(client, guestbook):
    r = await client.get(&quot;/guestbooks&quot;)
    data = r.json()

    assert r.status_code == 200
    assert data[0].get(&quot;id&quot;) == guestbook.id


@pytest.mark.asyncio
async def test_update_guestbook(client, token, guestbook):
    body = {&quot;message&quot;: &quot;modified_message&quot;}
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.put(f&quot;/guestbooks/{guestbook.id}&quot;, data=json.dumps(body), headers=headers)
    data = r.json()

    assert r.status_code == 202
    assert data.get(&quot;message&quot;) == body[&quot;message&quot;]


@pytest.mark.asyncio
async def test_update_guestbook_failed_by_invalid_id(client, token, guestbook):
    body = {&quot;message&quot;: &quot;modified_message&quot;}
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token}&quot;}

    r = await client.put(f&quot;/guestbooks/{guestbook.id+1}&quot;, data=json.dumps(body), headers=headers)
    data = r.json()

    assert r.status_code == 400
    assert data.get(&quot;detail&quot;) == &quot;Guestbook Not Found&quot;


@pytest.mark.asyncio
async def test_update_guestbook_failed_by_wrong_user(client, token2, guestbook):
    body = {&quot;message&quot;: &quot;modified_message&quot;}
    headers = {&quot;Authorization&quot;: f&quot;Bearer {token2}&quot;}

    r = await client.put(f&quot;/guestbooks/{guestbook.id}&quot;, data=json.dumps(body), headers=headers)
    data = r.json()

    assert r.status_code == 403
    assert data.get(&quot;detail&quot;) == &quot;It&#39;s not owner&quot;
</code></pre>
<hr>
<h2 id="test-결과">Test 결과</h2>
<h3 id="test-시작에-앞서--">Test 시작에 앞서 -</h3>
<pre><code>최상위폴더
ㄴ .coveragerc
ㄴ pytet.ini</code></pre><h4 id="테스트에서-제외하고-싶은-파일이-있다면">테스트에서 제외하고 싶은 파일이 있다면?</h4>
<pre><code># .coveragerc

[run]
omit =
    app/config.py</code></pre><p>이곳에 스크립트 파일을 추가해주면 테스트에서 제외된다!</p>
<h4 id="테스트-설정">테스트 설정</h4>
<pre><code># pytest.ini

[pytest]
addopts = --cov=app --cov-report term --cov-report html</code></pre><ol>
<li><p>--cov=app
테스트를 진행할 범위를 지정하는 곳이다.</p>
</li>
<li><p>--cov-report term
pytest를 실행하는 터미널에서 결과를 출력하게 해주는 옵션이다.</p>
</li>
<li><p>--cov-report html
테스트를 완료한 뒤 결과보고서를 html 형식으로 저장하겠다는 옵션이다. 이 html 보고서는 htmlcov 폴더에 저장되어 있다.</p>
</li>
</ol>
<hr>
<h3 id="test-실행">Test 실행</h3>
<p>가상환경을 activate하고 pytest -sv를 입력해주자.
여기서 -sv 옵션은 테스트 진행 중 더 상세한 정보를 표기해준다.</p>
<h4 id="terminal">Terminal</h4>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/07df06a0-f4b6-489b-94b7-00f4c2943d48/image.png" alt=""></p>
<ul>
<li>터미널에서 테스트 결과를 띄워주는 모습이다. 커버리지를 최대한 높여보고자 테스트 코드를 작성했기 때문에 꽤나 퍼센테이지가 높은 것을 볼 수 있다 ㅎㅎ.</li>
</ul>
<h4 id="htmlcov">HtmlCov</h4>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/33d3520c-c803-4492-bb45-d74e1f64b382/image.png" alt=""></p>
<p>htmlcov 폴더의 index.html로 들어가보면 이런 화면이 나온다. 확실히 터미널보다 한 눈에 들어오는 것을 볼 수 있다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/2b4b617f-92cb-4634-816b-f861ec1f814e/image.png" alt=""></p>
<ul>
<li>그리고 터미널과 다르게 Html로 커버리지 레포트를 받으면 좋은 점이 첫 index 화면에서 스크립트 이름을 클릭하면 어떤 코드가 커버되었고 어떤 코드가 안 되었는지 시각적으로 쉽게 확인할 수 있다.</li>
</ul>
<hr>
<p>여기까지 해서 캡스톤 디자인 프로젝트 테스트 코드까지 모두 작성해보았다. 다음 포스트에서 이번 프로젝트를 회고해보면서 시리즈를 마치도록 하겠다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[방명록 API]]></title>
            <link>https://velog.io/@taegong_s/%EB%B0%A9%EB%AA%85%EB%A1%9D-API</link>
            <guid>https://velog.io/@taegong_s/%EB%B0%A9%EB%AA%85%EB%A1%9D-API</guid>
            <pubDate>Mon, 18 Dec 2023 00:16:34 GMT</pubDate>
            <description><![CDATA[<h1 id="방명록">방명록</h1>
<h2 id="database-table-생성">Database Table 생성</h2>
<pre><code>APP/
ㄴ models.py --&gt; 작업할 공간</code></pre><h3 id="modelspy">models.py</h3>
<pre><code class="language-python">class GuestBook(BaseMin, Base):
    __tablename__ = &quot;guestbook&quot;

    message = Column(String(255), nullable=False)
    message_owner = Column(String(20), nullable=False)
    owner_id = Column(Integer, ForeignKey(&quot;user.id&quot;))

    book_owner = relationship(&quot;User&quot;, back_populates=&quot;guestbooks&quot;)</code></pre>
<ol>
<li>message: 방명록에 남길 메세지</li>
<li>message_owner: 방명록에 남길 이름</li>
<li>owner_id: (외래키) 추가한 회원의 식별 ID</li>
</ol>
<hr>
<h2 id="기능">기능</h2>
<h3 id="controllerpy">controller.py</h3>
<pre><code class="language-python">from fastapi import Depends, APIRouter, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm.session import Session
from typing import List

from app.database import get_db
from .schema import GuestBookAdd, GuestBookReturn, GuestBookUpdate
from .service import addGuestBook, findAllGuestBook, updateGuestBook

router = APIRouter()
security = HTTPBearer()

# 방명록 작성
@router.post(&quot;&quot;, response_model=GuestBookReturn, status_code=status.HTTP_201_CREATED)
async def guestbook_add(
    data: GuestBookAdd,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await addGuestBook(data, cred, db)

# 모든 방명록 조회
@router.get(&quot;&quot;, response_model=List[GuestBookReturn], status_code=status.HTTP_200_OK)
async def guestbook(db: Session = Depends(get_db)):
    return await findAllGuestBook(db)

# 방명록 수정
@router.put(&quot;/{id}&quot;, response_model=GuestBookReturn, status_code=status.HTTP_202_ACCEPTED)
async def guestbook_update(
    data: GuestBookUpdate,
    id: int,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await updateGuestBook(data, id, cred, db)</code></pre>
<h4 id="1-방명록-작성">1. 방명록 작성</h4>
<pre><code class="language-python">@router.post(&quot;&quot;, response_model=GuestBookReturn, status_code=status.HTTP_201_CREATED)
async def guestbook_add(
    data: GuestBookAdd,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await addGuestBook(data, cred, db)
</code></pre>
<ol>
<li>GuestBookAdd 형태에 맞추어 Body 정보를 받는다.</li>
<li>로그인 할 때 받았던 토큰을 cred 인자로 받는다.</li>
<li><code>addGuestBook</code>에서 받아온 정보를 GuestBookReturn 형태에 맞추어 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 201 상태 코드를 반환한다.</li>
</ol>
<h4 id="2-모든-방명록-조회">2. 모든 방명록 조회</h4>
<pre><code class="language-python">@router.get(&quot;&quot;, response_model=List[GuestBookReturn], status_code=status.HTTP_200_OK)
async def guestbook(db: Session = Depends(get_db)):
    return await findAllGuestBook(db)</code></pre>
<ol>
<li><code>findAllGuestBook</code>에서 받아온 정보를 GuestBookReturn으로 이루어진 List 형태에 맞춰 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 200 상태 코드를 반환한다.</li>
</ol>
<h4 id="3-방명록-수정">3. 방명록 수정</h4>
<pre><code class="language-python">@router.put(&quot;/{id}&quot;, response_model=GuestBookReturn, status_code=status.HTTP_202_ACCEPTED)
async def guestbook_update(
    data: GuestBookUpdate,
    id: int,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await updateGuestBook(data, id, cred, db)</code></pre>
<ol>
<li>GuestBookUpdate 형태에 맞추어 Body 정보를 받는다.</li>
<li>수정할 GuestBook 객체의 ID를 Path 파라미터로 받는다.</li>
<li>로그인 할 때 받았던 토큰을 cred 인자로 받는다.</li>
<li><code>updateGuestBook</code>에서 받아온 정보를 GuestBookReturn 형태에 맞추어 반환한다. </li>
<li>문제없이 작동했다면 반환할 때 202 상태 코드를 반환한다.</li>
</ol>
<hr>
<h3 id="servicepy">service.py</h3>
<pre><code class="language-python">from ..user import utils
from .utils import add_guestbook, find_all_guestbook, update_guestbook


async def addGuestBook(data, cred, db):
    decoded_dict = await utils.verify_user(cred)
    return await add_guestbook(data, decoded_dict.get(&quot;id&quot;), db)


async def findAllGuestBook(db):
    return await find_all_guestbook(db)


async def updateGuestBook(data, id, cred, db):
    decoded_dict = await utils.verify_user(cred)
    return await update_guestbook(data.message, id, decoded_dict.get(&quot;id&quot;), db)</code></pre>
<h4 id="1-방명록-생성">1. 방명록 생성</h4>
<pre><code class="language-python">async def addGuestBook(data, cred, db):
    decoded_dict = await utils.verify_user(cred)
    return await add_guestbook(data, decoded_dict.get(&quot;id&quot;), db)</code></pre>
<ol>
<li><code>verify_user</code> 함수에 받아온 토큰 값을 넣어 해독된 딕셔너리를 받아온다.</li>
<li><code>add_guestbook</code>으로 메세지와 작성자를 DB에 저장한 뒤 반환한다.</li>
</ol>
<h4 id="2-모든-방명록-조회-1">2. 모든 방명록 조회</h4>
<pre><code class="language-python">async def findAllGuestBook(db):
    return await find_all_guestbook(db)</code></pre>
<ol>
<li><code>find_all_guestbook</code>에서 받은 정보를 반환한다.</li>
</ol>
<h4 id="3-방명록-수정-1">3. 방명록 수정</h4>
<pre><code class="language-python">async def updateGuestBook(data, id, cred, db):
    decoded_dict = await utils.verify_user(cred)
    return await update_guestbook(data.message, id, decoded_dict.get(&quot;id&quot;), db)</code></pre>
<ol>
<li><code>verify_user</code> 함수에 받아온 토큰 값을 넣어 해독된 딕셔너리를 받아온다.</li>
<li><code>update_guestbook</code>에서 방명록을 수정한 뒤 정보를 반환한다.</li>
</ol>
<hr>
<h3 id="utilspy">utils.py</h3>
<pre><code class="language-python">from fastapi import HTTPException, status
from sqlalchemy import desc

from app.models import GuestBook


async def add_guestbook(data, id, db):
    try:
        row = GuestBook(message=data.message, owner_id=id, message_owner=data.message_owner)
        db.add(row)
        db.commit()

        return row
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f&quot;{e} occured while adding guestbook&quot;,
        )


async def find_all_guestbook(db):
    return db.query(GuestBook).order_by(desc(GuestBook.created_at)).all()


async def update_guestbook(message, id, owner_id, db):
    row = db.query(GuestBook).filter_by(id=id).first()
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=&quot;Guestbook Not Found&quot;,
        )
    if row.owner_id == owner_id:
        row.message = message
        db.commit()
    else:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=&quot;It&#39;s not owner&quot;)

    return row</code></pre>
<h4 id="1-방명록-생성-1">1. 방명록 생성</h4>
<pre><code class="language-python">async def add_guestbook(data, id, db):
    try:
        row = GuestBook(message=data.message, owner_id=id, message_owner=data.message_owner)
        db.add(row)
        db.commit()

        return row
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f&quot;{e} occured while adding guestbook&quot;,
        )</code></pre>
<ol>
<li>넘어온 데이터로 models.GuestBook 객체를 생성한다.</li>
<li>DB에 추가한다.</li>
<li>Commit</li>
</ol>
<h4 id="2-모든-방명록-조회-2">2. 모든 방명록 조회</h4>
<pre><code class="language-python">async def find_all_guestbook(db):
    return db.query(GuestBook).order_by(desc(GuestBook.created_at)).all()</code></pre>
<blockquote>
<p>sqlalchemy에서 쿼리문을 작성할 때 .order_by, .options, .offset, .limit 등으로 여러 옵션을 줘서 작성할 수 있다. desc() 옵션을 추가할 시 내부 인자를 기준으로 내림차순 정렬해서 반환해준다.</p>
</blockquote>
<ol>
<li>GuestBook 테이블의 모든 row를 내림차순으로 정렬해서 반환한다.</li>
</ol>
<h4 id="3-방명록-수정-2">3. 방명록 수정</h4>
<pre><code class="language-python">async def update_guestbook(message, id, owner_id, db):
    row = db.query(GuestBook).filter_by(id=id).first()
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=&quot;Guestbook Not Found&quot;,
        )
    if row.owner_id == owner_id:
        row.message = message
        db.commit()
    else:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=&quot;It&#39;s not owner&quot;)</code></pre>
<ol>
<li>Path 파라미터로 들어온 id로 TsItem 객체를 검색한다.</li>
<li>row가 None이라면, 즉시 400 상태 코드를 반환한다.</li>
<li>models.TsItem 객체의 owner_id(회원 식별 ID)와 토큰 해독 후 나온 ID가 일치하지 않는다면 즉시 403을 반환한다.</li>
<li>일치한다면 방명록 내용을 수정하고 Commit</li>
</ol>
<hr>
<h3 id="schemapy">schema.py</h3>
<pre><code class="language-python">from pydantic import BaseModel


class GuestBookAdd(BaseModel):
    message: str
    message_owner: str


class GuestBookReturn(GuestBookAdd):
    id: int
    owner_id: int


class GuestBookUpdate(BaseModel):
    message: str</code></pre>
<hr>
<h2 id="생각해볼-점">생각해볼 점</h2>
<blockquote>
<p>지난 번과 마찬가지로 정의된 relationship을 가지고 회원 별 방명록 확인도 구현해보는 것도 재밌을 것이다.</p>
</blockquote>
<p><strong>[참고]</strong></p>
<p>필자는 어차피 방명록인데 모두가 봐야하지 않나?는 생각에 전체 방명록만 불러오게 구현했다. 테이블 작성할 때 relationship이 적혀있는건 오늘 벨로그 글을 쓰게 되면서 알게 된 사실이다..</p>
<hr>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/d6f94dcd-b9f4-4a28-a827-d8c57aeecae4/image.png" alt=""></p>
<p>이번에도 Swagger로 보면 이렇게 잘 생성되어 있을 것이다!</p>
<p>이 때까지 글을 읽어준 독자님들께 감사하다고 말씀을 드리고 싶다. 이번 게시글까지 해서 현재 작성되어있는 API는 모두 업로드했다. 두 편 정도로 나누어 Test 코드 작성과 회고를 끝으로 이 시리즈를 마무리해보고자 한다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[번역된 문장 관리 API]]></title>
            <link>https://velog.io/@taegong_s/%EB%B2%88%EC%97%AD%EB%90%9C-%EB%AC%B8%EC%9E%A5-%EA%B4%80%EB%A6%AC-API</link>
            <guid>https://velog.io/@taegong_s/%EB%B2%88%EC%97%AD%EB%90%9C-%EB%AC%B8%EC%9E%A5-%EA%B4%80%EB%A6%AC-API</guid>
            <pubDate>Sun, 17 Dec 2023 23:04:21 GMT</pubDate>
            <description><![CDATA[<h1 id="번역된-문장-관리">번역된 문장 관리</h1>
<h2 id="database-table-생성">Database Table 생성</h2>
<pre><code>APP/
ㄴ models.py --&gt; 작업할 공간</code></pre><h3 id="modelspy">models.py</h3>
<pre><code class="language-python"># 이전 상태에서 추가로 import 해줘야 할 라이브러리
from sqlalchemy.sql.schema import ForeignKey


class TsItem(BaseMin, Base):
    __tablename__ = &quot;tsitem&quot;

    dialect = Column(String(255), nullable=False)
    standard = Column(String(255), nullable=False)
    english = Column(String(255), nullable=False)
    chinese = Column(String(255), nullable=False)
    japanese = Column(String(255), nullable=False)
    owner_id = Column(Integer, ForeignKey(&quot;user.id&quot;))

    owner = relationship(&quot;User&quot;, back_populates=&quot;items&quot;)</code></pre>
<ol>
<li>dialect: 사투리</li>
<li>standard: 표준어</li>
<li>english: 영어</li>
<li>chinese: 중국어</li>
<li>japanse: 일본어</li>
<li>owner_id: (외래키) 추가한 회원의 식별 ID</li>
</ol>
<hr>
<h2 id="기능">기능</h2>
<h3 id="controllerpy">controller.py</h3>
<pre><code class="language-python">from fastapi import Depends, APIRouter, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm.session import Session
from typing import List

from app.database import get_db
from .schema import TsItemAdd, TsItem, TsItemDelete
from .service import addTsItem, findTsItems, deleteTsItem

router = APIRouter()
security = HTTPBearer()


# TsItem 테이블의 모든 row 로드
@router.get(&quot;&quot;, response_model=List[TsItem])
async def get_item_list(db: Session = Depends(get_db)):
    return await findTsItems(db)


# TsItem 테이블에 Item 추가
@router.post(&quot;&quot;, response_model=TsItem, status_code=status.HTTP_201_CREATED)
async def add_item(
    data: TsItemAdd,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await addTsItem(data, cred, db)


# TsItem 테이블에서 Item 제거
@router.delete(&quot;/{id}&quot;, response_model=TsItemDelete, status_code=status.HTTP_202_ACCEPTED)
async def delete_item(
    id: int,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await deleteTsItem(id, cred, db)</code></pre>
<h4 id="1-tsitem-조회">1. TsItem 조회</h4>
<pre><code class="language-python">@router.get(&quot;&quot;, response_model=List[TsItem])
async def get_item_list(db: Session = Depends(get_db)):
    return await findTsItems(db)</code></pre>
<ol>
<li><code>findtsItems</code> 에서 받아온 정보를 TsItem으로 이루어진 List 형태에 맞춰 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 200 상태 코드를 반환한다.</li>
</ol>
<h4 id="2-tsitem-생성">2. TsItem 생성</h4>
<pre><code class="language-python">@router.post(&quot;&quot;, response_model=TsItem, status_code=status.HTTP_201_CREATED)
async def add_item(
    data: TsItemAdd,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await addTsItem(data, cred, db)</code></pre>
<ol>
<li>TsItemAdd 형태에 맞추어 Body 정보를 받는다.</li>
<li>로그인 할 때 받았던 토큰을 cred 인자로 받는다.</li>
<li><code>addTsItem</code>에서 받아온 정보를 TsItem 형식에 맞게 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 201 상태 코드를 반환한다.</li>
</ol>
<h4 id="3-tsitem-삭제">3. TsItem 삭제</h4>
<pre><code class="language-python">@router.delete(&quot;/{id}&quot;, response_model=TsItemDelete, status_code=status.HTTP_202_ACCEPTED)
async def delete_item(
    id: int,
    cred: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
):
    return await deleteTsItem(id, cred, db)</code></pre>
<ol>
<li>Path 파라미터로 id 값을 받는다.</li>
<li>로그인 할 때 받았던 토큰을 cred 인자로 받는다.</li>
<li><code>deleteTsItem</code>에서 받아온 정보를 TsItem 형식에 맞게 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 202 상태 코드를 반환한다.</li>
</ol>
<hr>
<h3 id="servicepy">service.py</h3>
<pre><code class="language-python">from ..user import utils
from .utils import add_tsitem, find_tsitems, delete_tsitem


async def addTsItem(data, cred, db):
    decoded_dict = await utils.verify_user(cred)
    row = await add_tsitem(data, decoded_dict.get(&quot;id&quot;), db)

    return row


async def findTsItems(db):
    return await find_tsitems(db)


async def deleteTsItem(id, cred, db):
    decoded_dict = await utils.verify_user(cred)
    return await delete_tsitem(id, decoded_dict.get(&quot;id&quot;), db)</code></pre>
<h4 id="1-tsitem-추가">1. TsItem 추가</h4>
<pre><code class="language-python">async def addTsItem(data, cred, db):
    decoded_dict = await utils.verify_user(cred)
    row = await add_tsitem(data, decoded_dict.get(&quot;id&quot;), db)

    return row</code></pre>
<blockquote>
<p>user/utils.py 의 <code>verify_user</code>에서 반환되는 딕셔너리 객체에는 JWT 토큰을 인코딩할 때 사용되었던 정보들이 들어있다. ex) email, id</p>
</blockquote>
<ol>
<li><code>verify_user</code> 함수에 받아온 토큰 값을 넣어 해독된 딕셔너리를 받아온다.</li>
<li><code>add_tsitem</code>으로 사투리,표준어,...,일본어를 DB에 저장한다.</li>
<li>반환된 정보를 반환한다</li>
</ol>
<h4 id="2-tsitem들-반환">2. TsItem들 반환</h4>
<pre><code class="language-python">async def findTsItems(db):
    return await find_tsitems(db)</code></pre>
<ol>
<li><code>find_tsitems</code> 함수에서 반환된 정보를 반환한다.</li>
</ol>
<h4 id="3-tsitem-삭제-1">3. TsItem 삭제</h4>
<pre><code class="language-python">async def deleteTsItem(id, cred, db):
    decoded_dict = await utils.verify_user(cred)
    return await delete_tsitem(id, decoded_dict.get(&quot;id&quot;), db)</code></pre>
<ol>
<li><code>verify_user</code> 함수에 받아온 토큰 값을 넣어 해독된 딕셔너리를 받아온다.</li>
<li><code>delete_tsitem</code> 함수로 TsItem 테이블에서 row를 삭제한 뒤 정보를 반환한다.</li>
</ol>
<hr>
<h3 id="utilspy">utils.py</h3>
<pre><code class="language-python">from fastapi import HTTPException, status

from app.models import TsItem


async def add_tsitem(data, id, db):
    try:
        row = TsItem(**data.dict(), owner_id=id)
        db.add(row)
        db.commit()

        return row
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f&quot;{e} occured while adding tsitem&quot;,
        )


async def find_tsitems(db):
    row = db.query(TsItem).all()

    return row


async def delete_tsitem(id, user_id, db):
    row = db.query(TsItem).filter_by(id=id).first()
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=&quot;No Item Found&quot;,
        )
    if row.owner_id == user_id:
        db.delete(row)
        db.commit()
    else:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=&quot;It&#39;s not owner&quot;)

    return row</code></pre>
<h4 id="1-tsitem-추가-1">1. TsItem 추가</h4>
<pre><code class="language-python">async def add_tsitem(data, id, db):
    try:
        row = TsItem(**data.dict(), owner_id=id)
        db.add(row)
        db.commit()

        return row
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f&quot;{e} occured while adding tsitem&quot;,
        )</code></pre>
<ol>
<li>넘어온 딕셔너리 객체를 풀어서 models.TsItem 객체를 생성한다. 여기서 owner_id는 외래키이다.</li>
<li>DB의 TsItem 테이블에 추가한다.</li>
<li>Commit</li>
</ol>
<h4 id="2-tsitem들-반환-1">2. TsItem들 반환</h4>
<pre><code class="language-python">async def find_tsitems(db):
    row = db.query(TsItem).all()

    return row</code></pre>
<blockquote>
<p>all() 메소드로 query를 사용하면 리스트 형태로 반환된다.</p>
</blockquote>
<ol>
<li>TsItem 테이블의 모든 row를 반환한다.</li>
</ol>
<h4 id="3-tsitem-삭제-2">3. TsItem 삭제</h4>
<pre><code class="language-python">async def delete_tsitem(id, user_id, db):
    row = db.query(TsItem).filter_by(id=id).first()
    if row is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=&quot;No Item Found&quot;,
        )
    if row.owner_id == user_id:
        db.delete(row)
        db.commit()
    else:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=&quot;It&#39;s not owner&quot;)

    return row</code></pre>
<blockquote>
<p>sqlalchemy에서 row를 찾지못하면 None 값을 반환한다.</p>
</blockquote>
<ol>
<li>Path 파라미터로 들어온 id로 TsItem 객체를 검색한다.</li>
<li>row가 None이라면, 즉시 400 상태 코드를 반환한다.</li>
<li>models.TsItem 객체의 owner_id(회원 식별 ID)와 토큰 해독 후 나온 ID가 일치하지 않는다면 403 상태 코드를 반환한다.</li>
<li>일치한다면 객체를 테이블에서 삭제하고 Commit</li>
</ol>
<hr>
<h3 id="schemapy">schema.py</h3>
<pre><code class="language-python">from pydantic import BaseModel


class TsItemAdd(BaseModel):
    dialect: str
    standard: str
    english: str
    chinese: str
    japanese: str


class TsItem(TsItemAdd):
    id: int
    owner_id: int

    class Config:
        from_attributes = True


class TsItemDelete(BaseModel):
    id: int
</code></pre>
<hr>
<h2 id="생각해볼-점">생각해볼 점</h2>
<blockquote>
<p>생성한 Table에는 owner 라는 relationship으로 회원과 TsItem이 일대다 관계를 맺고 있지만, 위에 작성된 엔드포인트 중에서 relationship을 사용해서 query문을 작성한 적이 없다. 이는 모든 TsItem을 불러오는 엔드포인트만 존재하기 때문인데, 이 글을 읽고 있는 독자(있나?)가 있다면 이 부분을 생각해보면서 회원 별로 생성한 TsItem을 불러오는 엔드포인트를 작성해보면 좋을 것 같다.</p>
</blockquote>
<p><strong>[참고]</strong></p>
<p>필자가 모든 TsItem을 불러오는 엔드포인트로 작성한 이유는, 캡스톤디자인 경연대회 당일날에 다른 사람들은 어떤 문장을 번역해봤을지 늦게 온 사람들도 확인해볼 수 있도록 하기 위해서였다.</p>
<hr>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/67651747-0bcf-4cd6-ba0a-596b18c068ee/image.png" alt=""></p>
<p>위의 내용들을 잘 따라왔다면 사진처럼 Swagger에서 확인이 될 것이다.
이번에도 필자는 리팩토링 이 후 사진이라 V2로 태깅이 되어있다.</p>
<p>다음 게시글에서는 방명록 관련해서 작성하겠다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[사투리 변환 API]]></title>
            <link>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-8</link>
            <guid>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-8</guid>
            <pubDate>Sun, 17 Dec 2023 20:16:01 GMT</pubDate>
            <description><![CDATA[<blockquote>
<p>지난 포스트에서는 회원가입 및 로그인 부분을 진행했다.
이번에는 사투리를 번역하는 엔드포인트를 작성해보자.</p>
</blockquote>
<hr>
<h1 id="기능-개발에-앞서--">기능 개발에 앞서 -</h1>
<p>몇 가지 변경점이 있다.</p>
<h3 id="1-userutilspy-에-함수-추가">1. user/utils.py 에 함수 추가</h3>
<pre><code class="language-python">async def verify_user(cred):
    token = cred.credentials
    try:
        jwt_dict = jwt.decode(token, settings.SECRET_KEY, settings.ALGORITHM)
        if jwt_dict:
            return jwt_dict
    except ExpiredSignatureError:
        raise HTTPException(401, &quot;Expired&quot;)</code></pre>
<ol>
<li>토큰 정보를 추출한다.</li>
<li>JWT 토큰을 서버의 SECRET_KEY와 ALGORITHM으로 해독한다.</li>
<li>제대로 해독되었다면 해독한 딕셔너리 결과값을 반환한다.</li>
<li>만료된 토큰이라면 401 상태코드를 즉시 반환한다.</li>
</ol>
<h3 id="2-aitestpy-변경">2. AI/test.py 변경</h3>
<pre><code class="language-python">from transformers import pipeline
import requests

from app.AI import config
from app.config import settings

nlg_pipeline = pipeline(
    &quot;text2text-generation&quot;, model=config.model_path, tokenizer=config.model_name
)


def generate_text(text, num_return_sequences=1, max_length=60):
    target_style_name = &quot;표준어&quot;
    text = f&quot;{target_style_name} 말투로 변환:{text}&quot;
    out = nlg_pipeline(text, num_return_sequences=num_return_sequences, max_length=max_length)
    return [x[&quot;generated_text&quot;] for x in out]

# Papago API로 번역하는 함수
def translate_with_papago(text, source_lang=&quot;ko&quot;, target_lang=&quot;en&quot;):
    url = &quot;https://openapi.naver.com/v1/papago/n2mt&quot;

    headers = {
        &quot;Content-Type&quot;: &quot;application/x-www-form-urlencoded; charset=UTF-8&quot;,
        &quot;X-Naver-Client-Id&quot;: settings.CLIENT_ID,
        &quot;X-Naver-Client-Secret&quot;: settings.CLIENT_SECRET,
    }

    data = {&quot;source&quot;: source_lang, &quot;target&quot;: target_lang, &quot;text&quot;: text}

    response = requests.post(url, headers=headers, data=data)
    translation_result = response.json()
    translated_text = translation_result[&quot;message&quot;][&quot;result&quot;][&quot;translatedText&quot;]

    return translated_text</code></pre>
<blockquote>
<ol>
<li>변경 전 코드에서는 deep-translator 라이브러리를 사용했었지만, 번역 품질이 매우 좋지 않았다.
따라서 PAPAGO API를 사용해서 표준어로 변환한 문장을 각각 영어,일본어,중국어로 다시 번역하는 방법으로 변경했다.</li>
<li>test.py를 직접 실행했을 때 수행되는 코드 또한 삭제되어있는데, 테스트 코드 작성 후 테스트 해볼 때 포함 안 되는게 거슬려서 삭제했다. 삭제하지 않아도 무방하다. </li>
</ol>
</blockquote>
<p>papago api 사용법은 <a href="https://developers.naver.com/docs/papago/README.md">https://developers.naver.com/docs/papago/README.md</a> 을 참고하자.</p>
<blockquote>
<p>또한 .env 파일에 CLIENT_ID와 CLIENT_SECRET도 추가해줘야 한다!</p>
</blockquote>
<h4 id="성능-차이">성능 차이</h4>
<ol>
<li>Papago</li>
</ol>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/ff3b05a9-b6f6-4d21-a2ec-80f50102dfcb/image.png" alt=""></p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/1770cb45-6c60-4092-b140-28651af48e45/image.png" alt=""></p>
<p>시간을 측정할 때는 영어로 번역하는 부분만 시간을 측정했다.</p>
<ol start="2">
<li>deep-translator</li>
</ol>
<p>같은 문장 (너 오늘 많이 먹었네)를 스크립트로 돌려보겠다.</p>
<pre><code class="language-python">from deep_translator import GoogleTranslator
from time import time

start = time()
target_text = &quot;너 오늘 많이 먹었네.&quot;

print(f&quot;번역 전 : {target_text}&quot;)
print(f&quot;번역 후 :{GoogleTranslator(source=&#39;ko&#39;, traget=&#39;en&#39;).translate(target_text)}&quot;)
print(f&quot;Deep-Translator 경과시간 : {time() - start}&quot;)
</code></pre>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/b039861b-ba90-4802-b987-29b63216548d/image.png" alt=""></p>
<p>deep-translator를 사용하는 것보다 papago를 사용하는게 시간 측면에서 유리했다. 안 그래도 NLP모델을 CPU 환경에서 사용중인데 최대한 부담을 줄여주는게 좋아보였다.</p>
<hr>
<h1 id="번역">번역</h1>
<pre><code>APP/
ㄴ API/
  ㄴ translate/ --&gt; 작업할 공간
    ㄴ service.py 
    ㄴ controller.py
    ㄴ schema.py</code></pre><h3 id="v1__init__py"><code>v1/__init__.py</code></h3>
<pre><code class="language-python">from fastapi import APIRouter

from .user import controller as user
from .translate import controller as translate

router = APIRouter()

router.include_router(user.router, prefix=&quot;/users&quot;, tags=[&quot;User&quot;])
router.include_router(translate.router, prefix=&quot;/AI&quot;, tags=[&quot;AI&quot;])</code></pre>
<p>먼저 <code>__init__.py</code>에 우리가 작성할 router를 포함시켜주자.</p>
<hr>
<h3 id="controllerpy">controller.py</h3>
<pre><code class="language-python">from fastapi import Depends, APIRouter
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from .schema import Translated, ToTranslate
from .service import itemTranslate

router = APIRouter()
security = HTTPBearer()

# 사투리 -&gt; 표준어 및 다국어로 변환 후 반환
@router.post(&quot;&quot;, response_model=Translated)
async def translate_item(
    data: ToTranslate,
    cred: HTTPAuthorizationCredentials = Depends(security),
):
    return await itemTranslate(data, cred)</code></pre>
<ol>
<li>ToTranslate 형태에 맞게 Body를 받는다.</li>
<li>로그인 할 때 받았던 토큰을 cred 인자로 받는다.</li>
<li>itemTranslate에서 반환된 정보를 Translated 형식에 맞게 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 200 상태 코드를 반환한다.</li>
</ol>
<hr>
<h3 id="servicepy">service.py</h3>
<pre><code class="language-python">from app.AI import test
from ..user import utils


async def itemTranslate(data, cred):
    if await utils.verify_user(cred):
        translated_text = test.generate_text(data.dialect)[0]  # Translation
        english_text = test.translate_with_papago(translated_text)
        chinese_text = test.translate_with_papago(translated_text, target_lang=&quot;zh-CN&quot;)
        japanese_text = test.translate_with_papago(translated_text, target_lang=&quot;ja&quot;)

        return {
            &quot;dialect&quot;: data.dialect,
            &quot;standard&quot;: translated_text,
            &quot;english&quot;: english_text,
            &quot;chinese&quot;: chinese_text,
            &quot;japanese&quot;: japanese_text,
        }</code></pre>
<ol>
<li>JWT 토큰을 검증한다.</li>
<li>translate_with_papago 함수로 표준어를 영어, 중국어, 일본어로 번역한다.</li>
<li>딕셔너리 형태로 사투리, 표준어, 영어, 중국어, 일본어를 반환한다.</li>
</ol>
<hr>
<h3 id="schemapy">schema.py</h3>
<pre><code class="language-python">
from pydantic import BaseModel


class ToTranslate(BaseModel):
    dialect: str


class Translated(ToTranslate):
    standard: str
    english: str
    chinese: str
    japanese: str
</code></pre>
<hr>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/881bf05a-7196-4e25-97cf-092a4d389cf6/image.png" alt=""></p>
<p>위의 내용들을 잘 따라왔다면 Swagger에서 확인이 될 것이다.
이번에도 필자는 리팩토링 이후 사진이라 V2로 태깅이 되어있다.</p>
<p>유의할 점은 Swagger에서 이 엔드포인트를 사용하려고 할 때 옆의 자물쇠 버튼을 클릭해서 Login하고 헤더에 들어있는 토큰 값을 넣어줘야하니 잘 기억해놓도록 하자.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[회원가입 및 로그인 API]]></title>
            <link>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-7</link>
            <guid>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-7</guid>
            <pubDate>Sun, 17 Dec 2023 01:29:50 GMT</pubDate>
            <description><![CDATA[<h1 id="기능-개발에-앞서--">기능 개발에 앞서 -</h1>
<h3 id="폴더-구조">폴더 구조</h3>
<blockquote>
<p>DOMAIN
ㄴ service.py --&gt; 비즈니스 로직이 작성되는 스크립트
ㄴ controller.py --&gt; 엔드포인트들이 작성되는 스크립트
ㄴ utils.py --&gt; 비즈니스 로직에서 사용되는 함수들이 작성되는 스크립트
ㄴ schema.py --&gt; 입출력 형태를 검증하는 class들이 작성되는 스크립트</p>
</blockquote>
<p>위의 형태로 각 도메인들을 채워나갈 것이다.</p>
<hr>
<h3 id="fastapi-간단-설명">FastAPI 간단 설명</h3>
<pre><code class="language-python">from fastapi import FastAPI
from pydantic import BaseModel

class ItemsReturn(BaseModel):
    start: int
    limit: int

class AddItem(BaseModel):
    name: str
    price: int

# FastAPI 앱 설정
app = FastAPI()

# app에 &#39;get&#39; 메소드로 http://{host}:{port}/hello에 접근하겠다.
@app.get(&quot;/hello&quot;)
async def print_hello():
    return {&quot;message&quot;: &quot;Hello, World!&quot;}

# app에 &#39;get&#39; 메소드로 http://{host}:{port}/{id}에 접근하겠다.
@app.get(&quot;/{id}&quot;)
async def print_id(id: int):
    return {&quot;id&quot;: id}

# app에 &#39;post&#39; 메소드로 http://{host}:{port}/items에 접근하겠다.
# 반환되는 상태 코드를 201로 하겠다.
@app.post(&quot;/items&quot;,status_code=201)
# 받아오는 Body 형태를 AddItem에 맞추겠다.
async def add_item(item: AddItem):
    return {&quot;name&quot;: item.name, &quot;price&quot;: item.price}

# app에 &#39;get&#39; 메소드로
# http://{host}:{port}/items/?start={start}&amp;limit={limit}에 접근하겠다.
# 반환 형태를 ItemsReturn에 맞추겠다.
# 반환되는 상태 코드를 200으로 하겠다.
@app.get(&quot;/items/&quot;, response_model=ItemsReturn, status_code=200)
async def print_item(start: int, limit: int):
    return {&quot;start&quot;: start, &quot;limit&quot;: limit}</code></pre>
<p>이를 main.py로 저장하고</p>
<p>콘솔창에서 아래 구문을 실행시켜보자.</p>
<pre><code>uvicorn main:app --host 0.0.0.0</code></pre><p>정상적으로 작성했다면, FastAPI 서버가 실행되었을 것이다.
<a href="http://localhost:8000/docs">http://localhost:8000/docs</a> 로 들어가면 자동으로 Swagger UI가 생성되어있는 것을 볼 수 있다.</p>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/dcf92481-ec6c-4d23-8fc2-0bd30cb6446f/image.png" alt=""></p>
<hr>
<h3 id="우리-프로젝트에서는">우리 프로젝트에서는?</h3>
<pre><code>API
ㄴ __init__.py
ㄴ v1
    ㄴ __init__.py</code></pre><p>이 <code>__init__.py</code>에다가 APIRouter로 router를 만들어 main에서 선언한 app에 추가하는 방식으로 엔드포인트를 선언하여 유지보수를 더 쉽게 할 수 있다.</p>
<h4 id="api__init__py">api/<code>__init__.py</code></h4>
<pre><code class="language-python">from fastapi import APIRouter

from . import v1

router = APIRouter()
router.include_router(v1.router, prefix=&quot;/v1&quot;)</code></pre>
<h4 id="v1__init__py">v1/<code>__init__.py</code></h4>
<pre><code class="language-python">from fastapi import APIRouter

from .user import controller as user

router = APIRouter()

router.include_router(user.router, prefix=&quot;/users&quot;, tags=[&quot;User&quot;])</code></pre>
<blockquote>
<p>prefix --&gt; router에 포함되어 있는 엔드포인트 앞에 붙여줄 전치사
tags --&gt; Swagger에서 tag 아래로 정렬해준다</p>
</blockquote>
<hr>
<h1 id="회원가입-및-로그인">회원가입 및 로그인</h1>
<h3 id="왜-회원가입이">왜 회원가입이..?</h3>
<blockquote>
<p>경상도 사투리를 번역해서 보여주는 웹사이트에 회원가입 및 로그인 기능이 반드시! 필요하냐라고 물어보면 자신있게 예쓰라고 대답하긴 힘들지만, 추가하고자 하는 기능 중에 하나(<del>아직 구현하지 않았다</del>)가 사용자의 피드백을 통해서 추가 학습까지 가능한 파이프라인을 구축하는 것이기 때문에 익명으로 받아오는 정보를 사용하기보다는 회원가입으로 추적 가능한 데이터를 사용하는게 더 나을 것 같아서 구현해보았다.</p>
</blockquote>
<hr>
<h2 id="database-table-생성">Database Table 생성</h2>
<pre><code>APP/
ㄴ models.py --&gt; 작업할 공간</code></pre><h3 id="modelspy">models.py</h3>
<pre><code class="language-python"># 이전 상태에서 추가로 import 해줘야 할 라이브러리들.
from sqlalchemy import String, Boolean
from sqlalchemy.orm import relationship

class User(BaseMin, Base):
    __tablename__ = &quot;user&quot;

    email = Column(String(30), nullable=False, unique=True)
    password = Column(String(255), nullable=False)
    is_provider = Column(Boolean, default=True)
    items = relationship(&quot;TsItem&quot;, back_populates=&quot;owner&quot;)
    guestbooks = relationship(&quot;GuestBook&quot;, back_populates=&quot;book_owner&quot;)

    def as_dict(self):
        return {column.name: getattr(self, column.name) for column in self.__table__.columns}</code></pre>
<p>User 테이블은 BaseMin도 상속받았기 때문에, id, created_at, updated_at은 자동으로 생성된다. </p>
<blockquote>
<p>nullable=False --&gt; 반드시 존재해야 하는 값
unique=True --&gt; 테이블에서 하나만 존재해 하는 값
default=True --&gt; 별도로 지정해주지 않았을 때 기본으로 저장되는 값</p>
</blockquote>
<ol>
<li>email: 로그인 및 회원가입 시 식별하기 위한 이메일</li>
<li>password: 해쉬화되어 DB에 저장된다</li>
<li>is_provider: 위에서 언급한 파이프라인 구축을 위한 피드백 제공 여부</li>
<li>items: 후에 작성할 TsItem에 대해서 일대다 관계를 형성한다.</li>
<li>guestbooks: 후에 작성할 guestbooks에 대해서 일대다 관계를 형성한다.</li>
<li>as_dict -&gt; models.User 객체를 딕셔너리 형태로 반환하는 함수</li>
</ol>
<hr>
<h2 id="기능">기능</h2>
<pre><code>APP/
ㄴ API/
  ㄴ USER/ --&gt; 작업할 공간
    ㄴ service.py 
    ㄴ controller.py
    ㄴ utils.py
    ㄴ schema.py</code></pre><h3 id="controllerpy">controller.py</h3>
<pre><code class="language-python">from fastapi import APIRouter, status, Depends
from sqlalchemy.orm.session import Session

from app.database import get_db
from .schema import UserAdd, UserAddReturn, DuplicatedEmail
from .service import userAdd, userLogin, emailDuplicated

router = APIRouter()

# 회원가입
@router.post(&quot;/register&quot;, response_model=UserAddReturn, status_code=status.HTTP_201_CREATED)
async def add_user(data: UserAdd, db: Session = Depends(get_db)):
    return await userAdd(data, db)

# 로그인
@router.post(&quot;/login&quot;)
async def issue_token(data: UserAdd, db: Session = Depends(get_db)):
    return await userLogin(data, db)

# 이메일 중복 여부 확인
@router.get(&quot;/duplicated&quot;, status_code=status.HTTP_200_OK)
async def is_duplicated(data: DuplicatedEmail = Depends(), db: Session = Depends(get_db)):
    return await emailDuplicated(data, db)
</code></pre>
<blockquote>
<p>Depends()</p>
<p>엔드포인트에서 매개변수로 Depends로 함수를 전달해주면,
FastAPI가 자동으로 함수를 실행시켜 엔드포인트에 전달해준다.</p>
</blockquote>
<p>database.py의 get_db를 살펴보도록 하자.</p>
<pre><code class="language-python">def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()</code></pre>
<p>database.py의 윗줄에서 생성한 SessionLocal의 인스턴스를 생성해서 넘겨준다. 우리는 이 세션을 통해서 MySQL과 통신할 것이다!</p>
<h4 id="1-회원가입">1. 회원가입</h4>
<pre><code class="language-python">### 회원가입 ###
@router.post(&quot;/register&quot;, response_model=UserAddReturn, status_code=status.HTTP_201_CREATED)
async def add_user(data: UserAdd, db: Session = Depends(get_db):
    return await userAdd(data, db)</code></pre>
<ol>
<li>UserAdd 형태에 맞게 Body를 받는다.</li>
<li>userAdd 함수에서 반환된 값을 UserAddReturn 형태에 맞추어 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 201 상태 코드를 반환한다.</li>
</ol>
<h4 id="2-로그인">2. 로그인</h4>
<pre><code class="language-python">@router.post(&quot;/login&quot;)
async def issue_token(data: UserAdd, db: Session = Depends(get_db)):
    return await userLogin(data, db)</code></pre>
<ol>
<li>UserAdd 형태에 맞게 Body를 받는다.</li>
<li>userLogin에서 반환된 값을 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 200 상태 코드를 반환한다.</li>
</ol>
<h4 id="3-닉네임-중복-확인">3. 닉네임 중복 확인</h4>
<pre><code class="language-python">@router.get(&quot;/duplicated&quot;, status_code=status.HTTP_200_OK)
async def is_duplicated(data: DuplicatedEmail = Depends(), db: Session = Depends(get_db)):
    return await emailDuplicated(data, db)</code></pre>
<ol>
<li>DuplicatedEmail 형태에 맞게 Query 파라미터를 받는다.
( Pydantic Class를 Depends로 받으면 Query 파라미터로 받을 수 있다.)</li>
<li>emailDuplicated에서 반환된 값을 반환한다.</li>
<li>문제없이 작동했다면 반환할 때 200 상태 코드를 반환한다.</li>
</ol>
<hr>
<h3 id="servicepy">service.py</h3>
<pre><code>pip install bcrypt</code></pre><pre><code class="language-python">import bcrypt
from fastapi.responses import JSONResponse
from .utils import (
    add_user,
    find_user_by_email,
    create_access_token,
    is_password_correct,
    is_duplicated,
)


async def userAdd(data, db):
    salt_value = bcrypt.gensalt()
    pw = bcrypt.hashpw(data.password.encode(), salt_value)

    row = await add_user(data.email, pw, db)

    return row.as_dict()


async def userLogin(data, db):
    user = await find_user_by_email(data.email, db)
    if await is_password_correct(data.password, user.password):
        token, user_id = await create_access_token(user)
        return JSONResponse(content={&quot;user_id&quot;: user_id}, headers={&quot;access_token&quot;: token})


async def emailDuplicated(data, db):
    return {&quot;duplicated&quot;: await is_duplicated(data.email, db)}

</code></pre>
<h4 id="1-회원가입-1">1. 회원가입</h4>
<pre><code class="language-python">async def userAdd(data, db):
    salt_value = bcrypt.gensalt()
    pw = bcrypt.hashpw(data.password.encode(), salt_value)

    row = await add_user(data.email, pw, db)

    return row.as_dict()</code></pre>
<ol>
<li>bcrypt 라이브러리를 활용해 솔트값을 생성한다.</li>
<li>body에 담겨온 password를 솔트값으로 해쉬화한다.</li>
<li>add_user 함수로 email, 해쉬화된 비밀번호를 DB에 저장한다.</li>
<li>반환된 models.User 객체를 딕셔너리로 바꾸어 반환한다.</li>
</ol>
<h4 id="2-로그인-1">2. 로그인</h4>
<pre><code class="language-python">async def userLogin(data, db):
    user = await find_user_by_email(data.email, db)
    if await is_password_correct(data.password, user.password):
        token, user_id = await create_access_token(user)
        return JSONResponse(content={&quot;user_id&quot;: user_id}, headers={&quot;access_token&quot;: token})
</code></pre>
<ol>
<li><code>find_user_by_email</code> 함수에서 body에 담겨온 email로 models.User 객체를 반환한다.</li>
<li><code>is_password_correct</code> 함수로 비밀번호가 일치하는지 확인한다.</li>
<li><code>create_access_token</code> 함수로 JWT 토큰과 user_id를 반환한다.</li>
<li>토큰은 Header에 user_id는 콘텐트에 담아 반환한다.</li>
</ol>
<blockquote>
<p>JWT 토큰에 관련된 설명은 <a href="https://velog.io/@chuu1019/%EC%95%8C%EA%B3%A0-%EC%93%B0%EC%9E%90-JWTJson-Web-Token">https://velog.io/@chuu1019/%EC%95%8C%EA%B3%A0-%EC%93%B0%EC%9E%90-JWTJson-Web-Token</a> 이곳에 잘 되어 있으니 확인해보길 바란다.</p>
</blockquote>
<h4 id="3-닉네임-중복-확인-1">3. 닉네임 중복 확인</h4>
<pre><code class="language-python">async def emailDuplicated(data, db):
    return {&quot;duplicated&quot;: await is_duplicated(data.email, db)}</code></pre>
<ol>
<li><code>is_duplicated</code> 함수에서 body에 담겨온 email로 중복 여부를 판단해 boolean값을 딕셔너리에 담아 반환한다.</li>
</ol>
<hr>
<h3 id="utilspy">utils.py</h3>
<pre><code class="language-python">import bcrypt

from datetime import datetime, timedelta
from fastapi import HTTPException, status
from jose import jwt

from app.config import settings
from app.models import User
from .schema import UserPayload


async def add_user(email, pw, db):
    try:
        row = User(**{&quot;email&quot;: email, &quot;password&quot;: pw})
        db.add(row)
        db.commit()

        return row
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f&quot;{e} occured while registering&quot;,
        )


async def find_user_by_email(email, db):
    row = db.query(User).filter_by(email=email).first()
    if row:
        return row
    else:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=&quot;No Email Found&quot;,
        )


async def is_password_correct(data, user):
    if bcrypt.checkpw(data.encode(), user.encode()):
        return True
    else:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=&quot;Incorrect Password&quot;,
        )


async def create_access_token(user):
    user_schema = user.as_dict()
    expire = datetime.utcnow() + timedelta(days=1)
    user_info = UserPayload(**user_schema, exp=expire)

    return (
        jwt.encode(user_info.dict(), settings.SECRET_KEY, algorithm=settings.ALGORITHM),
        user_schema[&quot;id&quot;],
    )


async def is_duplicated(email, db):
    if db.query(User).filter_by(email=email).first():
        return True
    else:
        return False
</code></pre>
<h4 id="1-db에-유저-추가">1. DB에 유저 추가</h4>
<pre><code class="language-python">async def add_user(email, pw, db):
    try:
        row = User(**{&quot;email&quot;: email, &quot;password&quot;: pw})
        db.add(row)
        db.commit()

        return row
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f&quot;{e} occured while registering&quot;,
        )</code></pre>
<ol>
<li>email과 해쉬화된 비밀번호로 models.User 객체를 생성한다.</li>
<li>DB의 User 테이블에 추가한다.</li>
<li>Commit</li>
</ol>
<pre><code class="language-python">async def find_user_by_email(email, db):
    row = db.query(User).filter_by(email=email).first()
    if row:
        return row
    else:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=&quot;No Email Found&quot;,
        )</code></pre>
<ol>
<li>email로 User 테이블에서 models.User 객체를 검색한다.</li>
<li>존재한다면 (row is not None) 객체 반환</li>
<li>존재하지 않다면 403 상태 코드를 즉시 반환한다.</li>
</ol>
<h4 id="2-비밀번호-확인">2. 비밀번호 확인</h4>
<pre><code class="language-python">async def is_password_correct(data, user):
    if bcrypt.checkpw(data.encode(), user.encode()):
        return True
    else:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=&quot;Incorrect Password&quot;,
        )</code></pre>
<ol>
<li>Body의 비밀번호와 models.User 객체의 비밀번호가 일치하는지 확인한다.</li>
<li>일치한다면 True 반환</li>
<li>일치하지 않다면 403 상태 코드를 즉시 반환한다.</li>
</ol>
<h4 id="3-jwt-토큰-생성">3. JWT 토큰 생성</h4>
<pre><code class="language-python">async def create_access_token(user):
    user_schema = user.as_dict()
    expire = datetime.utcnow() + timedelta(days=1)
    user_info = UserPayload(**user_schema, exp=expire)

    return (
        jwt.encode(user_info.dict(), settings.SECRET_KEY, algorithm=settings.ALGORITHM),
        user_schema[&quot;id&quot;],
    )</code></pre>
<ol>
<li>models.User 객체를 딕셔너리화 한다.</li>
<li>토큰의 만료기간을 정의한다.</li>
<li>위 두 개를 합쳐 토큰화 할 정보를 생성한다.</li>
<li>.env 파일에 정의된 SECRET_KEY와 ALGORITHM으로 토큰을 생성한 뒤 user_id와 함께 반환한다.</li>
</ol>
<blockquote>
<p>지난번 .env 파일에는 SECRET_KEY와 ALGORITHM이 정의되어 있지 않으므로 추가해줘야 할 것이다.</p>
</blockquote>
<h4 id="4-닉네임-중복-확인">4. 닉네임 중복 확인</h4>
<pre><code class="language-python">async def is_duplicated(email, db):
    if db.query(User).filter_by(email=email).first():
        return True
    else:
        return False</code></pre>
<ol>
<li>Query 파라미터로 받아온 email을 가진 models.User 객체가 존재한다면 True 반환</li>
<li>그렇지 않다면 False 반환</li>
</ol>
<hr>
<h3 id="schemapy">schema.py</h3>
<pre><code class="language-python">from datetime import datetime
from pydantic import BaseModel


class DuplicatedEmail(BaseModel):
    email: str


class UserAdd(DuplicatedEmail):
    password: str


class UserAddReturn(DuplicatedEmail):
    id: int


class UserPayload(UserAddReturn):
    exp: datetime
</code></pre>
<hr>
<p><img src="https://velog.velcdn.com/images/taegong_s/post/2113839c-2735-45b2-ab09-ccfd242f537f/image.png" alt=""></p>
<p>위의 내용들을 잘 따라왔다면 사진처럼 Swagger에서 확인이 될 것이다.
필자는 리팩토링을 거치고 난 뒤의 모습이라 UserV2 태그의 v2로 보이지만, 게시글들을 잘 따라왔다면 User 태그의 /v1 prefix로 보일 것이다.
테스트 코드는 API들을 작성한 뒤 마지막에 한 번에 쭉쭉 업로드하겠다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[Pre-Commit 설정]]></title>
            <link>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-6</link>
            <guid>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-6</guid>
            <pubDate>Sat, 16 Dec 2023 23:18:58 GMT</pubDate>
            <description><![CDATA[<p>뒤에 내용들을 블로그에 게시하기 전에, 너무 코드들이 깔끔하지 못하고 더러워보여서 미약하게나마 리팩토링을 진행한다고 조금 글이 늦어졌다.</p>
<h1 id="pre-commit">Pre-Commit</h1>
<h3 id="pre-commit이란">Pre-Commit이란?</h3>
<p>깃허브와 같은 코드 저장소에 커밋을 수행하기 전에 포맷팅이나 린팅이 잘 되어 있는지 확인해주는 도구라고 볼 수 있다. 리팩토링을 진행하면서 코드 가독성을 높이기 위해 도입해보았다.</p>
<pre><code>pip install pre-commit
pre-commit install</code></pre><p>먼저 pre-commit을 설치해준 뒤</p>
<pre><code>.vscode/
 ㄴ settings.json
pyproject.toml
.flake8
.pre-commit-config.yaml</code></pre><p>최상위 깃에 위 파일들을 생성한다. </p>
<h4 id="settingsjson">settings.json</h4>
<pre><code># settings.json
{
&quot;python.linting.flake8Enabled&quot;: true,
  &quot;python.linting.flake8Args&quot;: [
      &quot;--max-line-length=100&quot;
  ],
  &quot;python.formatting.provider&quot;: &quot;black&quot;,
  &quot;python.formatting.blackArgs&quot;: [&quot;--line-length&quot;, &quot;100&quot;]
}</code></pre><h4 id="flake8">.flake8</h4>
<pre><code># .flake8
[flake8]
exclude =
    .git,
    .gitignore,
    *.pot,
    *.py[co],
    __pycache__,
    venv,
    .env

ignore =
    E121,
    E126,
    E127,
    E128,
    E203,
    E225,
    E226,
    E231,
    E241,
    E251,
    E261,
    E265,
    E302,
    E303,
    E305,
    E402,
    E501,
    E741,
    W291,
    W292,
    W293,
    W391,
    W503,
    W504,
    F403,
    B007,
    B950,

max-line-length = 100</code></pre><h4 id="pyprojecttoml">.pyproject.toml</h4>
<pre><code># .pyproject.toml

[tool.black]
line-length = 100
target-version = [&#39;py311&#39;]
exclude = &#39;&#39;&#39;
  \.git
  | \.hg
  | \.mypy_cache
  | \.tox
  | \.venv
  | _build
  | buck-out
  | build
  | dist
&#39;&#39;&#39;

[tool.flake8]
max-line-length = 100</code></pre><h4 id="pre-commit-configyaml">.pre-commit-config.yaml</h4>
<pre><code># .pre-commit-config.yaml

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: check-yaml
      - id: check-json

  - repo: https://github.com/psf/black
    rev: 23.9.1
    hooks:
      - id: black
        name: black
        description: &quot;Black: The uncompromising Python code formatter&quot;
        entry: black
        language: python
        minimum_pre_commit_version: 2.9.2
        require_serial: true
        types_or: [python, pyi]

  - repo: https://github.com/pycqa/flake8
    rev: 6.1.0
    hooks:
      - id: flake8</code></pre><p>이렇게 설정 파일들을 각각 작성해주고 난 뒤 vscode에서 저장하면 black 포맷터가 자동으로 파일을 포맷팅해준다.</p>
<p>이 후 pre-commit을 설치한 가상환경을 activate하고 git commit 했을 때 pre-commit이 제대로 작동하는 것을 확인할 수 있다.</p>
<p><strong>[예시]</strong>
<img src="https://velog.velcdn.com/images/taegong_s/post/37192a90-f32f-4934-ba93-0abf88c4d8cb/image.png" alt=""></p>
]]></description>
        </item>
        <item>
            <title><![CDATA[FastAPI 서버 환경 세팅]]></title>
            <link>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-5</link>
            <guid>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-5</guid>
            <pubDate>Fri, 08 Dec 2023 22:37:25 GMT</pubDate>
            <description><![CDATA[<h1 id="backend">BACKEND</h1>
<p>지난 번 게시글까지 해서 프로젝트에 사용될 AI모델을 학습하였다.
이제부터는 AI모델을 파이썬 백엔드에 붙여서 간단하게 API화 해보도록 하자.</p>
<h2 id="fastapi">FastAPI</h2>
<p>이번에 내가 선택한 웹프레임워크는 FastAPI이다!
Django를 사용해서 이번 프로젝트를 진행하기에는 너무 가벼운 서비스였고,
Flask보다 FastAPI가 차후에 더 널리 쓰일 웹프레임워크라 생각해서 이를 선택하기로 했다.</p>
<blockquote>
<p><del>사실 백엔드로 진행해보는 첫 번째 프로젝트이다..</del></p>
</blockquote>
<h3 id="예상-폴더-구조">예상 폴더 구조</h3>
<pre><code>MAIN
ㄴ APP/
  ㄴ AI/
  ㄴ API/
    ㄴ V1/
      ㄴ UTILS/
        ㄴ utils.py
      apis...
  ㄴ config.py
  ㄴ database.py
  ㄴ main.py
  ㄴ schemas.py
ㄴ SAVED_MODEL/
  ㄴ model_weights..
ㄴ dataset/
  ㄴ data.tsv
ㄴ .env
ㄴ server.py</code></pre><h3 id="프로젝트-생성">프로젝트 생성</h3>
<p>FastAPI를 사용하기 위해서 두 라이브러리를 설치해주자
uvicorn은 ASGI 서버를 구동하기 위한 라이브러리며, 비동기를 지원하는 FastAPI를 실행하기 위해 설치해준다.</p>
<pre><code>pip install fastapi uvicorn sqlalchemy</code></pre><h4 id="1-serverpy">1. server.py</h4>
<pre><code class="language-python"># server.py
# 서버 구동 시 실행해줄 파이썬 스크립트

import uvicorn

if __name__ == &quot;__main__&quot;:
    # 서버 구동 이후 디버깅을 위해 reload=True 옵션을 추가한다    
    uvicorn.run(&quot;app.main:app&quot;, host=&quot;0.0.0.0&quot;, port=8000, reload=True)
</code></pre>
<h4 id="2-appmainpy">2. app/main.py</h4>
<pre><code class="language-python"># app/main.py

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app import api

app = FastAPI()

# 프론트를 다른 PC에서 구동할 예정이어서 CORS 에러 방지를 위해
# 미들웨어를 추가해줬다.
app.add_middleware(
    CORSMiddleware,
    allow_origins={&quot;*&quot;},
    allow_credentials=True,
    allow_methods={&quot;OPTIONS&quot;, &quot;GET&quot;, &quot;POST&quot;},
    allow_headers={&quot;*&quot;},
)

# 서버 최초 구동 시에 데이터베이스와 서버를 연결하는 작업
@app.on_event(&quot;startup&quot;)
def on_startup():
  from app import models
  from app.database import engine

  models.Base.metadata.create_all(bind=engine)

# 라우터를 포함하는 작업으로 엔드포인트들을 연결해준다
app.include_router(api.router)</code></pre>
<h4 id="수정">[수정]</h4>
<p>FastAPI에서 on_event 사용을 지양하고 있습니다.
<a href="https://velog.io/@taegong_s/series/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-%EA%B2%8C%EC%9E%84">https://velog.io/@taegong_s/series/%EC%8B%AD%EC%9E%90%EB%A7%90%ED%92%80%EC%9D%B4-%EA%B2%8C%EC%9E%84</a> 이 시리즈에서 서버 세팅 글에 lifespan으로 같은 일을 하는 부분을 포스팅 해놨으니 참고하시길 바랍니다. 제가 폴더 구조 설계부터 많은 발전이 있었는데 최근 프로젝트 게시글을 보시는게 좋을 듯하여 따로 수정글을 남깁니다.</p>
<h4 id="3-env-환경변수">3. .env 환경변수</h4>
<p>.env 파일을 최상위 폴더에 생성하고 gitingnore에 추가해서 따로 관리하자</p>
<pre><code>DB_USERNAME={USERNAME}
DB_HOST={HOST}
DB_PASSWORD={PASSWORD}
DB_NAME={NAME}
DB_PORT={PORT}</code></pre><h4 id="4-appconfigpy">4. app/config.py</h4>
<pre><code class="language-python"># app/config.py
import os
from dotenv import load_dotenv
from functools import lru_cache

load_dotenv()

class Settings():
  DB_USERNAME = os.environ.get(&quot;DB_USERNAME&quot;)
  DB_HOST = os.environ.get(&quot;DB_HOST&quot;)
  DB_PASSWORD = os.environ.get(&quot;DB_PASSWORD&quot;)
  DB_NAME = os.environ.get(&quot;DB_NAME&quot;)
  DB_PORT = int(os.environ.get(&quot;DB_PORT&quot;))

@lru_cache
def get_settings():
    return Settings()

settings = get_settings()</code></pre>
<p>lru_cache 데코레이터를 사용하면, 이 후에 get_settings 함수의 결과를 캐싱해서 빠르게 반환할 수 있도록 도와준다</p>
<h4 id="5-appdatabasepy">5. app/database.py</h4>
<p>SQLAlchemy ORM을 사용해서 파이썬 코드를 통해 MySQL에 접근할  수 있도록 세팅하는 스크립트다.</p>
<pre><code class="language-python"># app/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.config import settings

# MySQL과 연결할 Engine 생성
engine = create_engine(
  &quot;mysql+pymysql://{username}:{password}@{host}:{port}/{name}&quot;.format(
    username=settings.DB_USERNAME,
    password=settings.DB_PASSWORD,
    host=settings.DB_HOST,
    port=settings.DB_PORT,
    name=settings.DB_NAME,
  )
)

# SQLAlchemy가 데이터베이스와 상호작용할 때 사용하는 Session생성
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
)

# 이 후 데이터베이스의 테이블을 정의할 때 이 Base를 상속받는다.
Base = declarative_base()

# 엔드포인트에서 Session 객체를 생성할 때 종속성 주입으로 이 함수의 리턴 값을 전달한다.
def get_db():
  db = SessionLocal()
  try:
    yield db
  finally:
    db.close()
</code></pre>
<h4 id="6-appmodelspy">6. app/models.py</h4>
<p>MySQL의 테이블을 생성하는 코드를 작성하는 스크립트다.</p>
<pre><code class="language-python"># app/models.py
from sqlalchemy import Column, Integer, DateTime, func
from app.database import Base

# 테이블의 기본이 되는 인덱스, 생성시각, 변경시각을 사전 정의해두자
class BaseMin:
  id = Column(Integer, primary_key=True, index=True)
  created_at = Column(DateTime, nullable=False, default=func.utc_timestamp())
  updated_at = Column(DateTime, nullable=False, default=func.utc_timestamp(),
                      onupdate=func.utc_timestamp())

### 프로젝트를 진행하면서 필요한 테이블들을 추가하자</code></pre>
<h4 id="7-appschemaspy">7. app/schemas.py</h4>
<p>Pydantic 라이브러리를 이용하면 Response_model이나 엔드포인트에서 필요한 인자의 타입을 검증할 수 있다. 이에 필요한 class들을 정의하는 스크립트다.</p>
<pre><code class="language-python"># app/schemas.py
from pydantic import BaseModel

# 프로젝트에서 사용하지는 않지만 예시로 하나 작성해두었다.
### 프로젝트를 진행하면서 필요한 클래스들을 추가하자
class Example(BaseModel):
    id: int
    name: str</code></pre>
<p>이 후 개발을 위한 환경설정을 해주었다. 앞으로 필요한 기능을 하나씩 추가해보면서 프로젝트를 완성해보자!</p>
<h1 id="수정-1">[수정]</h1>
<p>리팩토링을 거치면서 위에서 작성한 폴더 구조도 다르고, 주요 코드들도 바뀌었다.
이후 게시글들과 <strong><em><a href="https://github.com/fnzksxl/capston-design">https://github.com/fnzksxl/capston-design</a></em></strong> 의 v2 API들을 참고해주길 바란다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[모델 학습 및 테스트]]></title>
            <link>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-4</link>
            <guid>https://velog.io/@taegong_s/NLP-%EB%AA%A8%EB%8D%B8%EC%9D%84-%ED%99%9C%EC%9A%A9%ED%95%9C-%EC%BA%A1%EC%8A%A4%ED%86%A4-%EB%94%94%EC%9E%90%EC%9D%B8-4</guid>
            <pubDate>Wed, 06 Dec 2023 03:06:03 GMT</pubDate>
            <description><![CDATA[<h1 id="학습">학습</h1>
<h3 id="모델-파라미터-설정">모델 파라미터 설정</h3>
<pre><code class="language-python"># config.py
from transformers import AutoModelForSeq2SeqLM,AutoTokenizer

# 모델 학습 후에 가중치 저장할 폴더
model_path=&#39;./saved_model&#39;
# 모델 초기 가중치 로드할 곳
model_name = &quot;gogamza/kobart-base-v2&quot;
# 데이터셋 파일
data_root=&#39;./dataset&#39;

# 내가 학습한 모델 가중치의 유무에 따라 분기 처리
if os.path.exists(f&#39;{model_path}/pytorch_model.bin&#39;):
  print(&quot;Use Customized Model&quot;)
  model = AutoModelForSeq2SeqLM.from_pretrained(model_path)
else:
  print(&quot;Use Pretrained Model&quot;)
  model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Training Arguments
args={
  num_train_epochs=3
  per_device_train_batch_size=32
  per_device_eval_batch_size=32

  # 저장 폴더에 가중치 덮어쓰기 하는 옵션
  overwrite_output_dir=True

  # 몇 번의 학습 스텝이 지난 후에 평가를 수행할지 결정
  # 하나의 배치를 처리했을 때가 하나의 스텝이라 볼 수 있음
  eval_steps=3000 

  # 몇 번의 학습 스텝이 지난 후에 체크포인트를 진행할 지 결정
  # 중간에 학습이 끊기더라도 다시 진행하기 용이함
  save_steps=3000

  # LR(Learning Rate) 스케쥴러에서 사용할 스텝 수
  # Warmup을 통해 LR을 조절해 파라미터 업데이트의 폭을 조절함
  warmup_steps=300

  # 평가 수행 주기를 Epoch와 Steps중 Steps로 선택
  evaluation_strategy=&quot;steps&quot;

  # 메모리 관리를 위해 예측값에 대한 손실만 계산하게 하는 옵션
  prediction_loss_only=True

  # 학습 중 저장되는 체크포인트의 최대 개수
  save_total_limit=3
}</code></pre>
<h3 id="모델-학습-코드">모델 학습 코드</h3>
<pre><code class="language-python"># train.py
from transformers import Seq2SeqTrainingArguments,Seq2SeqTrainer,\
                         DataCollatorForSeq2Seq
import dataset
import config

train_dataset,test_dataset = dataset.make_dataset(
    config.data_root, config.tokenizer
)

# 각 배치를 모델 학습에 알맞은 형태로 바꿔주는 역할
data_collator = DataCollatorForSeq2Seq(
    tokenizer=config.tokenizer, model=config.model
)

training_args = Seq2SeqTrainingArguments(
    **config.args
    output_dir=config.model_path,
    )

trainer = Seq2SeqTrainer(
    model=config.model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

# 모델 학습 진행
try:
  trainer.train()
except Exception as e:
  print(f&quot;Failed to train model caused by {e}&quot;)

try:
  trainer.save_model(config.model_path)
  print(&quot;Model saved successfully.&quot;)
except Exception as e:
  print(f&quot;Failed to save model caused by {e}&quot;)</code></pre>
<p>예전에 작성 했던 코드들을 다시 되돌아보며 velog에 작성하고 있는데,
왜 이랬지? 싶은 부분들은 수정해서 업로드하고 있지만 기능적으로 빠진 부분들은 당장 추가하기에 어렵다.</p>
<p>또한 모델 학습 중 Evaluation 되는 수치들에 대한 정보가 없어 이를 참고용으로 올리지 못하는 것도 아쉽다.</p>
<p><del>다음에 AI로 학습할 일이 있다면 학습 결과는 꼭 따로 보관해야겠다..</del></p>
<h3 id="현재-폴더-구조">(현재 폴더 구조)</h3>
<pre><code>AI Folder
ㄴ config.py
ㄴ train.py
ㄴ saved_model/
   ㄴ (saved_model_weights_files)
ㄴ dataset/
   ㄴ data.tsv</code></pre><blockquote>
<p><strong>위 코드까지 문제없이 돌아갔다면, saved_model 폴더에 가중치가 잘 저장되었을 것이다!</strong></p>
</blockquote>
<h3 id="테스트">테스트</h3>
<p>표준어로 바뀐 사투리를 영어로도 바꾸기 위해 라이브러리를 하나 설치해주자!</p>
<pre><code>pip install deep-translater</code></pre><pre><code class="language-python"># test.py
from transformers import pipeline
from deep_translator import GoogleTranslator

nlg_pipeline=pipeline(&#39;text2text-generation&#39;,model=config.model_path,tokenizer=config.model_name)
</code></pre>
<pre><code class="language-python">def generate_text(pipe, text, num_return_sequences, max_length):
  target_style_name = &quot;표준어&quot;
  text = f&quot;{target_style_name} 말투로 변환:{text}&quot;
  out = pipe(text, num_return_sequences=num_return_sequences, max_length=max_length)
  return [x[&#39;generated_text&#39;] for x in out]</code></pre>
<p>num_return_sequences의 값에 따라서 반환되는 텍스트의 개수가 바뀐다
만약 3으로 지정했다면 길이가 3인 리스트에 담겨서 값이 반환될 것임!</p>
<pre><code class="language-python">def get_koen_text(target_text_ko):
  return GoogleTranslator(source=&#39;ko&#39;,target=&#39;en&#39;).translate(target_text_ko)</code></pre>
<pre><code class="language-python">print(&quot;Write &#39;q&#39; to exit&quot;)
while True:
  src_text=input(&quot;Dialect to translate : &quot;)
  if src_text == &#39;q&#39;:
    break
  target_text_ko=generate_text(nlg_pipeline,src_text,num_return_sequences=1,max_length=64)[0]
  target_text_en = get_koen_text(target_text_ko)
  print(f&quot;Translated Standard : {target_text_ko}&quot;)
  print(f&quot;Translated Dialect : {target_text_en}&quot;)</code></pre>
<p>여기까지 잘 따라와졌다면, 여러분은 경상도 사투리를 표준어로 번역해서 영어로 보여줄 수 있는 프로그램을 작성한 것이다! 아직 시도해보지는 않았지만 AI HUB에 있는 다른 방언 데이터셋을 이용해서 비슷한 방법으로 학습한다면 다른 사투리도 충분히 표준어로 바꿀 수 있다고 생각한다.</p>
]]></description>
        </item>
    </channel>
</rss>