<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom">
    <channel>
        <title>nangman_ful.log</title>
        <link>https://velog.io/</link>
        <description>안녕하세요</description>
        <lastBuildDate>Mon, 23 Mar 2026 17:07:44 GMT</lastBuildDate>
        <docs>https://validator.w3.org/feed/docs/rss2.html</docs>
        <generator>https://github.com/jpmonette/feed</generator>
        <image>
            <title>nangman_ful.log</title>
            <url>https://velog.velcdn.com/images/nangman_ful/profile/997b7380-308e-4d43-b6d9-eeadb3a81cfb/image.png</url>
            <link>https://velog.io/</link>
        </image>
        <copyright>Copyright (C) 2019. nangman_ful.log. All rights reserved.</copyright>
        <atom:link href="https://v2.velog.io/rss/nangman_ful" rel="self" type="application/rss+xml"/>
        <item>
            <title><![CDATA[인간만이 사용할 수 있는 프로그래밍 언어?]]></title>
            <link>https://velog.io/@nangman_ful/%EC%9D%B8%EA%B0%84%EB%A7%8C%EC%9D%B4-%EC%82%AC%EC%9A%A9%ED%95%A0-%EC%88%98-%EC%9E%88%EB%8A%94-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-%EC%96%B8%EC%96%B4</link>
            <guid>https://velog.io/@nangman_ful/%EC%9D%B8%EA%B0%84%EB%A7%8C%EC%9D%B4-%EC%82%AC%EC%9A%A9%ED%95%A0-%EC%88%98-%EC%9E%88%EB%8A%94-%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%98%EB%B0%8D-%EC%96%B8%EC%96%B4</guid>
            <pubDate>Mon, 23 Mar 2026 17:07:44 GMT</pubDate>
            <description><![CDATA[<p>학부 수업에서 PL 교수님이 흥미로운 과제를 수강생들에게 제안하셨다.</p>
<blockquote>
<p>** AI는 사용하지 못하지만 학생들은 사용할 수 있는 Programming Language를 만들어 오면 Grade Letter를 바꿔드리겠습니다**</p>
</blockquote>
<p>이때 드는 의문... </p>
<h3 id="_--내가-할-수-있을까_">_ &emsp;&emsp;&emsp;&emsp; 내가 할 수 있을까?_</h3>
<hr>
<p>프로그래밍 언어는 <strong>절차형 언어 *<em>와 *</em>함수형 언어</strong>로 나눌 수 있을 것이다.
그러나 둘의 차이점을 설명하기 이전에 둘 다 AI는 다룰 수 있다는 것인데, 그것은 </p>
<h3 id="고정된-규칙이-있고-그것을-ai가-이해하기-때문이다">고정된 규칙이 있고, 그것을 AI가 이해하기 때문이다.</h3>
<p>따라서 개념적으로 보면, 내가 만들어야 하는 언어는</p>
<ul>
<li><ol>
<li>고정된 규칙이 없다.</li>
</ol>
</li>
<li><ol start="2">
<li>규칙은 있으나 AI가 이해할 수 없다.</li>
</ol>
</li>
</ul>
<p>가 선택지가 될 것이다. </p>
<h2 id="그러나">그러나</h2>
<h3 id="고정된-규칙이-없다-와-프로그래밍-언어가-양립할까">&#39;고정된 규칙이 없다&#39; 와 &#39;프로그래밍 언어&#39;가 양립할까?</h3>
<p>혹은</p>
<h3 id="ai가-이해-못하는-규칙은-자연어로-이루어진-룰북이-없어야-하는가">AI가 이해 못하는 규칙은 자연어로 이루어진 룰북이 없어야 하는가?</h3>
<p>라는 고민에 도달하게 되었다.</p>
<hr>
<p>솔직히 안될 거 같은데, 시간이 나면 클로드 잡도리로 클로드가 못 푸는 언어를 만들어 봐야겠다..</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[PostgreSQL 쿼리 최적화 2편: LATERAL JOIN의 함정과 해결]]></title>
            <link>https://velog.io/@nangman_ful/PostgreSQL-%EC%BF%BC%EB%A6%AC-%EC%B5%9C%EC%A0%81%ED%99%94-2%ED%8E%B8-LATERAL-JOIN%EC%9D%98-%ED%95%A8%EC%A0%95%EA%B3%BC-%ED%95%B4%EA%B2%B0</link>
            <guid>https://velog.io/@nangman_ful/PostgreSQL-%EC%BF%BC%EB%A6%AC-%EC%B5%9C%EC%A0%81%ED%99%94-2%ED%8E%B8-LATERAL-JOIN%EC%9D%98-%ED%95%A8%EC%A0%95%EA%B3%BC-%ED%95%B4%EA%B2%B0</guid>
            <pubDate>Fri, 19 Dec 2025 04:26:03 GMT</pubDate>
            <description><![CDATA[<h2 id="이전-글-요약">이전 글 요약</h2>
<p><a href="https://velog.io/@nangman_ful/PostgreSQL-%EC%BF%BC%EB%A6%AC-%EC%B5%9C%EC%A0%81%ED%99%94-250%EA%B0%9C-%EC%BF%BC%EB%A6%AC%EB%A5%BC-1%EA%B0%9C%EB%A1%9C-%EC%A4%84%EC%9D%B4%EA%B8%B0">1편</a>에서 250개의 쿼리를 1개로 줄이는 최적화를 진행했습니다.</p>
<pre><code class="language-sql">-- 250개 좌표에 대해 LATERAL JOIN으로 한 번에 조회
WITH target_coords AS (
    SELECT * FROM (VALUES (&#39;11010&#39;, 37.5, 127.0), ...)
    AS t(region_code, target_lat, target_lng)
)
SELECT DISTINCT ON (tc.region_code, hr.target_year)
    tc.region_code,
    hr.target_year,
    hr.score
FROM target_coords tc
CROSS JOIN LATERAL (
    SELECT target_year, score
    FROM hazard_results
    WHERE risk_type = %s
    AND target_year BETWEEN %s AND %s  -- 예: 2025 ~ 2100
    ORDER BY (거리 계산) ASC
    LIMIT 1
) hr
ORDER BY tc.region_code, hr.target_year</code></pre>
<p>성능은 극적으로 개선되었지만... <strong>새로운 문제가 발생했습니다.</strong></p>
<h2 id="문제-발견-각-지역이-1개-연도-데이터만-반환">문제 발견: 각 지역이 1개 연도 데이터만 반환</h2>
<h3 id="요구사항">요구사항</h3>
<ul>
<li>각 행정구역마다 <strong>2025, 2030, 2035, ..., 2095, 2100년</strong> (16개 연도) 데이터 필요</li>
<li>클라이언트는 연도별 시계열 그래프를 그려야 함</li>
</ul>
<h3 id="실제-결과">실제 결과</h3>
<pre><code class="language-json">{
  &quot;regionScores&quot;: {
    &quot;11010&quot;: { &quot;2025&quot;: 33.1 },  // ❌ 2025년만!
    &quot;11020&quot;: { &quot;2025&quot;: 30.9 },  // ❌ 2025년만!
    &quot;11030&quot;: { &quot;2025&quot;: 33.5 }   // ❌ 2025년만!
  }
}</code></pre>
<h3 id="기대한-결과">기대한 결과</h3>
<pre><code class="language-json">{
  &quot;regionScores&quot;: {
    &quot;11010&quot;: {
      &quot;2025&quot;: 33.1,
      &quot;2030&quot;: 35.2,
      &quot;2035&quot;: 37.5,
      // ... 중략
      &quot;2100&quot;: 55.8
    }
  }
}</code></pre>
<h2 id="원인-분석-lateral-join의-동작-방식">원인 분석: LATERAL JOIN의 동작 방식</h2>
<h3 id="문제의-쿼리-구조">문제의 쿼리 구조</h3>
<pre><code class="language-sql">FROM target_coords tc              -- 250개 행
CROSS JOIN LATERAL (
    SELECT target_year, score
    FROM hazard_results
    WHERE risk_type = &#39;extreme_heat&#39;
    AND target_year BETWEEN &#39;2025&#39; AND &#39;2100&#39;  -- 🔴 여기가 문제
    ORDER BY (거리 계산) ASC
    LIMIT 1                        -- 🔴 각 지역당 1개만!
) hr</code></pre>
<p><strong>LATERAL JOIN의 동작:</strong></p>
<ol>
<li><code>target_coords</code>의 각 행(지역)마다 서브쿼리 실행</li>
<li><code>BETWEEN &#39;2025&#39; AND &#39;2100&#39;</code> 범위 내에서</li>
<li>거리 기준으로 정렬해서</li>
<li><strong><code>LIMIT 1</code>로 가장 가까운 데이터 1개만 선택</strong></li>
</ol>
<p><strong>문제점:</strong></p>
<ul>
<li>LATERAL은 250개 지역에 대해서만 반복 실행</li>
<li>각 지역마다 <strong>연도 구분 없이</strong> 가장 가까운 데이터 1개만 가져옴</li>
<li>그 1개가 우연히 2025년이었던 것!</li>
</ul>
<h3 id="비유로-이해하기">비유로 이해하기</h3>
<pre><code>현재 쿼리:
&quot;각 지역마다, 2025~2100년 중에서 가장 가까운 데이터 1개만 줘&quot;
→ 결과: 지역당 1개 (대부분 2025년)

원하는 동작:
&quot;각 지역마다, 각 연도별로 가장 가까운 데이터를 줘&quot;
→ 결과: 지역당 16개 (연도별로)</code></pre><h2 id="해결-방법-1-연도도-cross-join으로-추가">해결 방법 1: 연도도 CROSS JOIN으로 추가</h2>
<h3 id="핵심-아이디어">핵심 아이디어</h3>
<ul>
<li>지역만 반복하는 게 아니라</li>
<li><strong>지역 × 연도</strong> 모든 조합에 대해 LATERAL JOIN 실행</li>
</ul>
<h3 id="수정된-쿼리">수정된 쿼리</h3>
<pre><code class="language-sql">WITH target_coords AS (
    -- 250개 지역 좌표
    SELECT * FROM (VALUES
        (&#39;11010&#39;, 37.5, 127.0),
        (&#39;11020&#39;, 37.6, 127.1),
        -- ... 250개
    ) AS t(region_code, target_lat, target_lng)
),
target_years AS (
    -- 16개 연도 (NEW!)
    SELECT * FROM (VALUES
        (&#39;2025&#39;), (&#39;2030&#39;), (&#39;2035&#39;), (&#39;2040&#39;),
        (&#39;2045&#39;), (&#39;2050&#39;), (&#39;2055&#39;), (&#39;2060&#39;),
        (&#39;2065&#39;), (&#39;2070&#39;), (&#39;2075&#39;), (&#39;2080&#39;),
        (&#39;2085&#39;), (&#39;2090&#39;), (&#39;2095&#39;), (&#39;2100&#39;)
    ) AS y(year)
)
SELECT DISTINCT ON (tc.region_code, ty.year)
    tc.region_code,
    ty.year as target_year,
    hr.score
FROM target_coords tc
CROSS JOIN target_years ty          -- 🟢 연도도 CROSS JOIN!
CROSS JOIN LATERAL (
    SELECT score
    FROM hazard_results
    WHERE risk_type = %s
    AND target_year = ty.year       -- 🟢 특정 연도만 조회
    ORDER BY (
        POW(latitude - tc.target_lat::numeric, 2) +
        POW(longitude - tc.target_lng::numeric, 2)
    ) ASC
    LIMIT 1                         -- 🟢 해당 연도의 최근접 데이터 1개
) hr
ORDER BY tc.region_code, ty.year</code></pre>
<h3 id="동작-방식">동작 방식</h3>
<pre><code>Before:
┌─────────┐
│ 지역 250개 │ → LATERAL → 각 지역당 1개 데이터
└─────────┘

After:
┌─────────┐   ┌────────┐
│ 지역 250개 │ × │ 연도 16개 │ → 250 × 16 = 4,000개 조합
└─────────┘   └────────┘
      ↓
각 (지역, 연도) 조합마다 LATERAL 실행
      ↓
4,000개 데이터 (지역당 16개 연도)</code></pre><h2 id="코드-변경-내용">코드 변경 내용</h2>
<h3 id="before-1개-연도만-반환">Before (1개 연도만 반환)</h3>
<pre><code class="language-python">query_region_batch = f&quot;&quot;&quot;
    WITH target_coords AS (
        SELECT * FROM (VALUES {coords_clause})
        AS t(region_code, target_lat, target_lng)
    )
    SELECT DISTINCT ON (tc.region_code, hr.target_year)
        tc.region_code,
        hr.target_year,
        hr.{score_col} as score
    FROM target_coords tc
    CROSS JOIN LATERAL (
        SELECT target_year, {score_col}
        FROM hazard_results
        WHERE risk_type = %s
        AND target_year BETWEEN %s AND %s
        ORDER BY (거리 계산) ASC
        LIMIT 1
    ) hr
    ORDER BY tc.region_code, hr.target_year
&quot;&quot;&quot;</code></pre>
<h3 id="after-모든-연도-반환">After (모든 연도 반환)</h3>
<pre><code class="language-python"># 고정된 연도 범위 생성
fixed_years = list(range(2025, 2101, 5))  # [2025, 2030, ..., 2100]

query_region_batch = f&quot;&quot;&quot;
    WITH target_coords AS (
        SELECT * FROM (VALUES {coords_clause})
        AS t(region_code, target_lat, target_lng)
    ),
    target_years AS (
        SELECT * FROM (VALUES {&#39;,&#39;.join(&quot;(&#39;&quot; + str(y) + &quot;&#39;)&quot; for y in fixed_years)})
        AS y(year)
    )
    SELECT DISTINCT ON (tc.region_code, ty.year)
        tc.region_code,
        ty.year as target_year,
        hr.{score_col} as score
    FROM target_coords tc
    CROSS JOIN target_years ty
    CROSS JOIN LATERAL (
        SELECT {score_col}
        FROM hazard_results
        WHERE risk_type = %s
        AND target_year = ty.year
        ORDER BY (
            POW(latitude - tc.target_lat::numeric, 2) +
            POW(longitude - tc.target_lng::numeric, 2)
        ) ASC
        LIMIT 1
    ) hr
    ORDER BY tc.region_code, ty.year
&quot;&quot;&quot;</code></pre>
<h2 id="추가로-발견한-문제-타입-불일치">추가로 발견한 문제: 타입 불일치</h2>
<h3 id="에러-발생">에러 발생</h3>
<pre><code>psycopg2.errors.UndefinedFunction: operator does not exist: character varying = integer
LINE 9: AND target_year IN (2025,2030,2035,...)
        ^
HINT: No operator matches the given name and argument types.
You might need to add explicit type casts.</code></pre><h3 id="원인">원인</h3>
<ul>
<li>DB의 <code>target_year</code> 컬럼: <code>VARCHAR</code> (문자열)</li>
<li>쿼리의 값: <code>2025</code> (정수)</li>
<li>PostgreSQL은 자동 타입 변환을 하지 않음</li>
</ul>
<h3 id="해결">해결</h3>
<pre><code class="language-python"># Before (정수)
AND target_year IN ({&#39;,&#39;.join(str(y) for y in fixed_years)})
# 결과: AND target_year IN (2025,2030,2035,...)

# After (문자열로 감싸기)
AND target_year IN ({&#39;,&#39;.join(&quot;&#39;&quot; + str(y) + &quot;&#39;&quot; for y in fixed_years)})
# 결과: AND target_year IN (&#39;2025&#39;,&#39;2030&#39;,&#39;2035&#39;,...)</code></pre>
<h2 id="성능-비교">성능 비교</h2>
<table>
<thead>
<tr>
<th>방식</th>
<th>쿼리 수</th>
<th>결과 데이터</th>
<th>실행 시간</th>
<th>완성도</th>
</tr>
</thead>
<tbody><tr>
<td>원본 (Python Loop)</td>
<td>250개</td>
<td>250 × 16 = 4,000개</td>
<td>~30초</td>
<td>❌ 타임아웃</td>
</tr>
<tr>
<td>1차 최적화 (LATERAL)</td>
<td>1개</td>
<td>250개 (❌)</td>
<td>~1초</td>
<td>❌ 데이터 부족</td>
</tr>
<tr>
<td>2차 최적화 (연도 CROSS JOIN)</td>
<td>1개</td>
<td>4,000개 (✅)</td>
<td>~2초</td>
<td>✅ 완벽</td>
</tr>
</tbody></table>
<h2 id="lateral-join-사용-시-주의사항">LATERAL JOIN 사용 시 주의사항</h2>
<h3 id="1-반복-단위를-명확히-하기">1. 반복 단위를 명확히 하기</h3>
<pre><code class="language-sql">-- ❌ 잘못된 생각: &quot;LATERAL이 알아서 연도별로 반복하겠지&quot;
CROSS JOIN LATERAL (
    WHERE target_year BETWEEN &#39;2025&#39; AND &#39;2100&#39;
    LIMIT 1
)

-- ✅ 올바른 방법: &quot;반복할 것을 명시적으로 CROSS JOIN&quot;
CROSS JOIN target_years ty
CROSS JOIN LATERAL (
    WHERE target_year = ty.year
    LIMIT 1
)</code></pre>
<h3 id="2-limit의-의미-이해하기">2. LIMIT의 의미 이해하기</h3>
<pre><code class="language-sql">-- LIMIT 1의 의미:
-- &quot;각 LATERAL 실행마다 1개만 반환&quot;
-- ≠ &quot;전체 결과 중 1개만 반환&quot;

-- 지역 250개 × LATERAL LIMIT 1 = 250개 결과
-- 지역 250개 × 연도 16개 × LATERAL LIMIT 1 = 4,000개 결과</code></pre>
<h3 id="3-distinct-on-활용">3. DISTINCT ON 활용</h3>
<pre><code class="language-sql">SELECT DISTINCT ON (tc.region_code, ty.year)
    -- (지역, 연도) 조합마다 첫 번째 행만 선택
    -- 이미 LATERAL에서 LIMIT 1을 했으므로 중복 방지용</code></pre>
<h2 id="최종-결과">최종 결과</h2>
<h3 id="api-응답-예시">API 응답 예시</h3>
<pre><code class="language-json">{
  &quot;regionScores&quot;: {
    &quot;11010&quot;: {
      &quot;2025&quot;: 33.1,
      &quot;2030&quot;: 35.4,
      &quot;2035&quot;: 38.2,
      &quot;2040&quot;: 41.5,
      &quot;2045&quot;: 44.8,
      &quot;2050&quot;: 48.1,
      &quot;2055&quot;: 51.6,
      &quot;2060&quot;: 54.9,
      &quot;2065&quot;: 58.3,
      &quot;2070&quot;: 61.8,
      &quot;2075&quot;: 65.2,
      &quot;2080&quot;: 68.7,
      &quot;2085&quot;: 72.1,
      &quot;2090&quot;: 75.6,
      &quot;2095&quot;: 79.0,
      &quot;2100&quot;: 82.5
    },
    &quot;11020&quot;: { /* 16개 연도 */ },
    // ... 248개 지역 더
  },
  &quot;siteAALs&quot;: {
    &quot;uuid1&quot;: { /* 16개 연도 */ },
    &quot;uuid2&quot;: { /* 16개 연도 */ }
  }
}</code></pre>
<h2 id="학습-포인트">학습 포인트</h2>
<h3 id="1-lateral-join은-만능이-아니다">1. LATERAL JOIN은 만능이 아니다</h3>
<ul>
<li>왼쪽 테이블의 <strong>각 행마다</strong> 서브쿼리 실행</li>
<li>다차원 반복이 필요하면 <strong>명시적으로 CROSS JOIN</strong></li>
</ul>
<h3 id="2-sql은-명시적이어야-한다">2. SQL은 명시적이어야 한다</h3>
<ul>
<li>&quot;DB가 알아서 해주겠지&quot; ❌</li>
<li>&quot;내가 원하는 걸 정확히 표현&quot; ✅</li>
</ul>
<h3 id="3-쿼리-결과를-항상-검증">3. 쿼리 결과를 항상 검증</h3>
<pre><code class="language-python"># 단순히 쿼리가 성공했다고 끝이 아니라
region_rows = db.execute_query(...)

# 결과 개수를 확인
expected_count = len(REGION_COORD_MAP) * len(fixed_years)
actual_count = len(region_rows)
assert actual_count == expected_count, \
    f&quot;Expected {expected_count}, got {actual_count}&quot;</code></pre>
<h2 id="결론">결론</h2>
<p><strong>LATERAL JOIN을 사용할 때는:</strong></p>
<ol>
<li><strong>반복 단위를 명확히</strong>: 무엇을 기준으로 반복할 것인가?</li>
<li><strong>CROSS JOIN으로 명시</strong>: 다차원 반복은 명시적으로 표현</li>
<li><strong>LIMIT의 범위 이해</strong>: 각 LATERAL 실행마다의 제한</li>
<li><strong>결과 검증</strong>: 기대한 데이터 개수가 맞는지 확인</li>
</ol>
<p>250개 쿼리를 1개로 줄이는 것도 중요하지만,
<strong>올바른 결과를 반환하는 것이 더 중요합니다.</strong></p>
<h2 id="참고-자료">참고 자료</h2>
<ul>
<li><a href="https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-FROM">PostgreSQL CROSS JOIN</a></li>
<li><a href="https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-LATERAL">LATERAL JOIN 상세 예제</a></li>
<li><a href="https://www.postgresql.org/docs/current/queries-with.html">Common Table Expression (CTE) Best Practices</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[PostgreSQL 쿼리 최적화: 250개 쿼리를 1개로 줄이기]]></title>
            <link>https://velog.io/@nangman_ful/PostgreSQL-%EC%BF%BC%EB%A6%AC-%EC%B5%9C%EC%A0%81%ED%99%94-250%EA%B0%9C-%EC%BF%BC%EB%A6%AC%EB%A5%BC-1%EA%B0%9C%EB%A1%9C-%EC%A4%84%EC%9D%B4%EA%B8%B0</link>
            <guid>https://velog.io/@nangman_ful/PostgreSQL-%EC%BF%BC%EB%A6%AC-%EC%B5%9C%EC%A0%81%ED%99%94-250%EA%B0%9C-%EC%BF%BC%EB%A6%AC%EB%A5%BC-1%EA%B0%9C%EB%A1%9C-%EC%A4%84%EC%9D%B4%EA%B8%B0</guid>
            <pubDate>Thu, 18 Dec 2025 21:04:21 GMT</pubDate>
            <description><![CDATA[<h2 id="문제-상황">문제 상황</h2>
<p>FastAPI 기후 시뮬레이션 API에서 <strong>30초 타임아웃</strong> 문제가 발생했습니다.</p>
<ul>
<li>250개 행정구역 × 80년 × 4개 시나리오 × 9개 리스크 데이터 조회</li>
<li>API 응답 시간: <strong>30초 이상</strong></li>
<li>결과: <strong>타임아웃으로 실패</strong></li>
</ul>
<h2 id="원인-분석">원인 분석</h2>
<pre><code class="language-python"># 기존 코드 (문제)
for code, coord in REGION_COORD_MAP.items():  # 250번 반복
    target_lat = coord[&quot;lat&quot;]
    target_lng = coord[&quot;lng&quot;]

    query = &quot;&quot;&quot;
        SELECT DISTINCT ON (target_year)
            target_year, score, latitude, longitude
        FROM hazard_results
        WHERE risk_type = %s
        AND target_year BETWEEN %s AND %s
        ORDER BY target_year, (
            POW(latitude - %s, 2) + POW(longitude - %s, 2)
        ) ASC
    &quot;&quot;&quot;

    # 매번 DB에 쿼리 실행
    region_rows = db.execute_query(query, (risk_type, start_year, end_year, lat, lng))</code></pre>
<p><strong>문제점:</strong></p>
<ul>
<li>Python 루프에서 250번 DB 쿼리 실행</li>
<li>250번의 네트워크 왕복 (Round-trip)</li>
<li>각 쿼리마다 연결, 파싱, 실행 오버헤드</li>
</ul>
<h2 id="해결-방법-lateral-join을-활용한-배치-쿼리">해결 방법: LATERAL JOIN을 활용한 배치 쿼리</h2>
<h3 id="핵심-아이디어">핵심 아이디어</h3>
<ol>
<li><strong>250개 좌표를 SQL VALUES로 임시 테이블 생성</strong></li>
<li><strong>LATERAL JOIN으로 각 좌표별 최근접 데이터 한 번에 조회</strong></li>
<li><strong>1번의 쿼리로 모든 데이터 처리</strong></li>
</ol>
<h3 id="개선된-코드">개선된 코드</h3>
<pre><code class="language-python"># 1. 250개 좌표를 VALUES 절로 변환
coords_values = []
for code, coord in REGION_COORD_MAP.items():
    coords_values.append(f&quot;(&#39;{code}&#39;, {coord[&#39;lat&#39;]}, {coord[&#39;lng&#39;]})&quot;)

coords_clause = &#39;, &#39;.join(coords_values)
# 결과: &quot;(&#39;11010&#39;, 37.5, 127.0), (&#39;11020&#39;, 37.6, 127.1), ...&quot;

# 2. 단일 쿼리로 모든 지역의 모든 연도 데이터 조회
query_region_batch = f&quot;&quot;&quot;
    WITH target_coords AS (
        SELECT * FROM (VALUES {coords_clause})
        AS t(region_code, target_lat, target_lng)
    )
    SELECT DISTINCT ON (tc.region_code, hr.target_year)
        tc.region_code,
        hr.target_year,
        hr.{score_col} as score
    FROM target_coords tc
    CROSS JOIN LATERAL (
        SELECT target_year, {score_col}
        FROM hazard_results
        WHERE risk_type = %s
        AND target_year BETWEEN %s AND %s
        ORDER BY (
            POW(latitude - tc.target_lat::numeric, 2) +
            POW(longitude - tc.target_lng::numeric, 2)
        ) ASC
        LIMIT 1
    ) hr
    ORDER BY tc.region_code, hr.target_year
&quot;&quot;&quot;

# 한 번만 실행
region_rows = db.execute_query(
    query_region_batch,
    (request.hazard_type, str(request.start_year), str(request.end_year))
)</code></pre>
<h2 id="핵심-기술-설명">핵심 기술 설명</h2>
<h3 id="1-cte-common-table-expression">1. CTE (Common Table Expression)</h3>
<pre><code class="language-sql">WITH target_coords AS (
    SELECT * FROM (VALUES
        (&#39;11010&#39;, 37.5, 127.0),
        (&#39;11020&#39;, 37.6, 127.1),
        -- ... 250개
    ) AS t(region_code, target_lat, target_lng)
)</code></pre>
<ul>
<li>250개 좌표를 메모리상의 <strong>임시 테이블</strong>로 생성</li>
<li>쿼리 내에서 여러 번 참조 가능</li>
</ul>
<h3 id="2-lateral-join">2. LATERAL JOIN</h3>
<pre><code class="language-sql">FROM target_coords tc
CROSS JOIN LATERAL (
    SELECT ...
    FROM hazard_results
    WHERE ...
    ORDER BY 거리계산
    LIMIT 1
) hr</code></pre>
<p><strong>일반 JOIN vs LATERAL JOIN:</strong></p>
<table>
<thead>
<tr>
<th>구분</th>
<th>일반 JOIN</th>
<th>LATERAL JOIN</th>
</tr>
</thead>
<tbody><tr>
<td>동작</td>
<td>고정된 테이블끼리 조인</td>
<td><strong>왼쪽 행마다</strong> 서브쿼리 실행</td>
</tr>
<tr>
<td>참조</td>
<td>서브쿼리에서 외부 테이블 참조 불가</td>
<td>서브쿼리에서 <strong>tc 참조 가능</strong></td>
</tr>
<tr>
<td>용도</td>
<td>정적 조인</td>
<td>동적 계산, 최근접 검색</td>
</tr>
</tbody></table>
<p><strong>LATERAL의 장점:</strong></p>
<ul>
<li>250개 좌표 각각에 대해 최근접 hazard 데이터를 찾음</li>
<li>DB 엔진이 <strong>병렬 처리</strong> 최적화</li>
<li>Python 루프보다 훨씬 효율적</li>
</ul>
<h3 id="3-distinct-on">3. DISTINCT ON</h3>
<pre><code class="language-sql">SELECT DISTINCT ON (tc.region_code, hr.target_year)
    tc.region_code,
    hr.target_year,
    hr.score
FROM ...
ORDER BY tc.region_code, hr.target_year</code></pre>
<ul>
<li><strong>(지역코드, 연도)</strong> 조합마다 첫 번째 행만 선택</li>
<li>거리 기준 정렬 후 가장 가까운 데이터만 추출</li>
</ul>
<h2 id="비유로-이해하기">비유로 이해하기</h2>
<h3 id="기존-방식-250번-왕복">기존 방식 (250번 왕복)</h3>
<pre><code>개발자: &quot;서울 종로구 근처 데이터 찾아줘&quot;
DB: &quot;찾았어요&quot;
개발자: &quot;서울 중구 근처 데이터 찾아줘&quot;
DB: &quot;찾았어요&quot;
... 248번 더 반복</code></pre><h3 id="개선된-방식-1번-왕복">개선된 방식 (1번 왕복)</h3>
<pre><code>개발자: &quot;이 250개 좌표 근처 데이터를 한번에 찾아줘&quot;
        [서울 종로구, 서울 중구, ... 250개]
DB: &quot;250개 전부 찾아서 한 번에 줄게&quot;</code></pre><h2 id="성능-개선-결과">성능 개선 결과</h2>
<table>
<thead>
<tr>
<th>항목</th>
<th>Before</th>
<th>After</th>
<th>개선율</th>
</tr>
</thead>
<tbody><tr>
<td>쿼리 횟수</td>
<td>250회</td>
<td>1회</td>
<td><strong>99.6% 감소</strong></td>
</tr>
<tr>
<td>네트워크 왕복</td>
<td>250번</td>
<td>1번</td>
<td><strong>99.6% 감소</strong></td>
</tr>
<tr>
<td>응답 시간</td>
<td>30초+</td>
<td>1~3초</td>
<td><strong>약 10~30배 향상</strong></td>
</tr>
<tr>
<td>타임아웃</td>
<td>실패</td>
<td>성공</td>
<td><strong>문제 해결</strong></td>
</tr>
</tbody></table>
<h2 id="추가-최적화-인덱스-생성">추가 최적화: 인덱스 생성</h2>
<pre><code class="language-sql">-- 복합 인덱스로 검색 속도 향상
CREATE INDEX idx_hazard_results_risk_year_coords
ON hazard_results (risk_type, target_year, latitude, longitude);

-- 좌표 기반 검색 최적화
CREATE INDEX idx_hazard_results_coords_btree
ON hazard_results (latitude, longitude);</code></pre>
<h2 id="적용-가능한-상황">적용 가능한 상황</h2>
<p>이 패턴은 다음과 같은 경우에 유용합니다:</p>
<ol>
<li><p><strong>다중 좌표 최근접 검색</strong></p>
<ul>
<li>여러 위치의 가장 가까운 매장/시설 찾기</li>
<li>지역별 날씨/환경 데이터 조회</li>
</ul>
</li>
<li><p><strong>배치 데이터 조회</strong></p>
<ul>
<li>N개 ID에 대한 관련 데이터 조회</li>
<li>각 항목마다 조건부 서브쿼리 필요한 경우</li>
</ul>
</li>
<li><p><strong>Python 루프 → SQL 변환</strong></p>
<ul>
<li>반복문에서 매번 DB 쿼리하는 경우</li>
<li>N+1 쿼리 문제 해결</li>
</ul>
</li>
</ol>
<h2 id="주의사항">주의사항</h2>
<h3 id="1-좌표-개수-제한">1. 좌표 개수 제한</h3>
<pre><code class="language-python"># 너무 많은 좌표는 쿼리 크기 초과 가능
if len(REGION_COORD_MAP) &gt; 1000:
    # 배치를 나눠서 처리
    batch_size = 500
    for i in range(0, len(coords), batch_size):
        batch = coords[i:i+batch_size]
        # 배치별 쿼리 실행</code></pre>
<h3 id="2-sql-injection-방지">2. SQL Injection 방지</h3>
<pre><code class="language-python"># BAD: f-string으로 사용자 입력 직접 삽입
coords_clause = f&quot;(&#39;{user_input}&#39;, ...)&quot;  # 위험!

# GOOD: 고정된 데이터만 VALUES에 사용
# 동적 파라미터는 %s 사용</code></pre>
<h3 id="3-인덱스-활용">3. 인덱스 활용</h3>
<ul>
<li><code>ORDER BY POW(...)</code> 계산은 인덱스 사용 불가</li>
<li>데이터가 많으면 좌표 범위로 필터링 후 거리 계산<pre><code class="language-sql">WHERE latitude BETWEEN %s - 0.1 AND %s + 0.1
AND longitude BETWEEN %s - 0.1 AND %s + 0.1</code></pre>
</li>
</ul>
<h2 id="전체-코드-비교">전체 코드 비교</h2>
<h3 id="before-250개-쿼리">Before (250개 쿼리)</h3>
<pre><code class="language-python">region_scores_map = {}

for code, coord in REGION_COORD_MAP.items():
    target_lat = coord[&quot;lat&quot;]
    target_lng = coord[&quot;lng&quot;]

    query_region = f&quot;&quot;&quot;
        SELECT DISTINCT ON (target_year)
            target_year, {score_col} as score
        FROM hazard_results
        WHERE risk_type = %s
        AND target_year BETWEEN %s AND %s
        ORDER BY target_year, (
            POW(latitude - %s, 2) + POW(longitude - %s, 2)
        ) ASC
    &quot;&quot;&quot;

    region_rows = db.execute_query(
        query_region,
        (hazard_type, start_year, end_year, target_lat, target_lng)
    )

    if code not in region_scores_map:
        region_scores_map[code] = {}

    for row in region_rows:
        year = str(row[&#39;target_year&#39;])
        score = float(row[&#39;score&#39;] or 0.0)
        region_scores_map[code][year] = score</code></pre>
<h3 id="after-1개-쿼리">After (1개 쿼리)</h3>
<pre><code class="language-python">region_scores_map = {}

if REGION_COORD_MAP:
    # 1. 좌표 값들을 VALUES 절로 변환
    coords_values = []
    for code, coord in REGION_COORD_MAP.items():
        coords_values.append(f&quot;(&#39;{code}&#39;, {coord[&#39;lat&#39;]}, {coord[&#39;lng&#39;]})&quot;)

    coords_clause = &#39;, &#39;.join(coords_values)

    # 2. 단일 쿼리로 모든 지역의 모든 연도 데이터 조회
    query_region_batch = f&quot;&quot;&quot;
        WITH target_coords AS (
            SELECT * FROM (VALUES {coords_clause})
            AS t(region_code, target_lat, target_lng)
        )
        SELECT DISTINCT ON (tc.region_code, hr.target_year)
            tc.region_code,
            hr.target_year,
            hr.{score_col} as score
        FROM target_coords tc
        CROSS JOIN LATERAL (
            SELECT target_year, {score_col}
            FROM hazard_results
            WHERE risk_type = %s
            AND target_year BETWEEN %s AND %s
            ORDER BY (
                POW(latitude - tc.target_lat::numeric, 2) +
                POW(longitude - tc.target_lng::numeric, 2)
            ) ASC
            LIMIT 1
        ) hr
        ORDER BY tc.region_code, hr.target_year
    &quot;&quot;&quot;

    region_rows = db.execute_query(
        query_region_batch,
        (hazard_type, start_year, end_year)
    )

    # 결과를 region_scores_map에 저장
    for row in region_rows:
        code = row[&#39;region_code&#39;]
        year = str(row[&#39;target_year&#39;])
        score = float(row[&#39;score&#39;] or 0.0)

        if code not in region_scores_map:
            region_scores_map[code] = {}
        region_scores_map[code][year] = score</code></pre>
<h2 id="결론">결론</h2>
<p><strong>Python 루프에서 반복적으로 DB 쿼리를 실행하는 대신, SQL의 강력한 기능(CTE, LATERAL JOIN)을 활용하면:</strong></p>
<ul>
<li>네트워크 왕복 최소화</li>
<li>DB 엔진의 최적화 활용</li>
<li>극적인 성능 향상</li>
</ul>
<p><strong>핵심 원칙:</strong> &quot;데이터를 애플리케이션으로 가져와서 처리하지 말고, DB에서 처리해서 결과만 가져오자&quot;</p>
<h2 id="참고-자료">참고 자료</h2>
<ul>
<li><a href="https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-LATERAL">PostgreSQL LATERAL JOIN 공식 문서</a></li>
<li><a href="https://www.postgresql.org/docs/current/queries-with.html">Common Table Expressions (CTE)</a></li>
<li><a href="https://www.postgresql.org/docs/current/sql-select.html#SQL-DISTINCT">DISTINCT ON</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[APScheduler 배치 작업 Instance Hang 문제 해결]]></title>
            <link>https://velog.io/@nangman_ful/APScheduler-%EB%B0%B0%EC%B9%98-%EC%9E%91%EC%97%85-Instance-Hang-%EB%AC%B8%EC%A0%9C-%ED%95%B4%EA%B2%B0</link>
            <guid>https://velog.io/@nangman_ful/APScheduler-%EB%B0%B0%EC%B9%98-%EC%9E%91%EC%97%85-Instance-Hang-%EB%AC%B8%EC%A0%9C-%ED%95%B4%EA%B2%B0</guid>
            <pubDate>Thu, 18 Dec 2025 01:34:08 GMT</pubDate>
            <description><![CDATA[<h2 id="문제-상황">문제 상황</h2>
<p>배치 작업 실행 시 다음과 같은 경고가 발생하며 작업이 skip됨:</p>
<pre><code>2025-12-18 09:13:54 - WARNING - Execution of job &quot;P(H) Batch (Custom Trigger)&quot; skipped:
maximum number of running instances reached (1)</code></pre><p><strong>특이사항:</strong></p>
<ul>
<li>실제로 배치 계산이 실행 중이지 않음</li>
<li><code>ps aux</code> 확인 결과 배치 프로세스가 없음</li>
<li>하지만 APScheduler는 여전히 인스턴스가 실행 중이라고 판단</li>
<li>새로운 배치 작업을 실행할 수 없음</li>
</ul>
<h2 id="원인-분석">원인 분석</h2>
<h3 id="apscheduler의-instance-관리-메커니즘">APScheduler의 Instance 관리 메커니즘</h3>
<p>APScheduler는 각 Job 함수의 실행 인스턴스 수를 내부적으로 추적합니다:</p>
<pre><code class="language-python"># Job 실행 전
job._instances += 1
if job._instances &gt; job.max_instances:
    # &quot;maximum number of running instances reached&quot; 경고
    job._instances -= 1
    return

# Job 실행 완료 후 (finally 블록)
try:
    job.func(*args, **kwargs)
except:
    # 에러 로깅
finally:
    job._instances -= 1  # 반드시 실행되어야 함</code></pre>
<p><strong>핵심:</strong> <code>job._instances</code> 카운트는 <strong>Job 함수가 return될 때만</strong> 감소합니다.</p>
<h3 id="실제-발생한-문제">실제 발생한 문제</h3>
<h4 id="1-processpoolexecutor가-hang-상태에-빠짐">1. ProcessPoolExecutor가 Hang 상태에 빠짐</h4>
<p><strong>배치 코드 구조:</strong></p>
<pre><code class="language-python"># probability_timeseries_batch.py
def run_probability_batch(...):
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(_process_task_worker, task): task
            for task in tasks
        }

        for future in as_completed(futures):
            result = future.result()  # ❌ Timeout 없음!
            # 결과 처리...</code></pre>
<p><strong>문제 시나리오:</strong></p>
<pre><code>1. 배치 시작 (00:17:10.053)
   ↓
2. ProcessPoolExecutor 생성, 4개 worker 프로세스 시작
   ↓
3. Worker에서 DB 연결 시도
   ↓
4. DB 연결 실패: &quot;connection already closed&quot; (00:17:10.110)
   - Connection Pool Race Condition 발생
   - Worker 프로세스들이 무효한 연결을 받음
   ↓
5. Worker가 DB 재연결을 시도하며 무한 대기
   ↓
6. future.result()가 무한 대기 (Timeout 없음)
   ↓
7. ProcessPoolExecutor의 with 블록이 종료되지 않음
   ↓
8. run_probability_batch() 함수가 return되지 않음
   ↓
9. probability_batch_job() 함수도 return되지 않음
   ↓
10. APScheduler는 _instances 카운트를 감소시키지 못함
    ↓
11. 영구적으로 &quot;maximum instances reached&quot; 상태 유지</code></pre><h4 id="2-증거-확인">2. 증거 확인</h4>
<p>배치 시작 로그는 있지만 종료 로그가 없음:</p>
<pre><code class="language-python"># main.py
def probability_batch_job():
    logger.info(&quot;P(H) BATCH JOB STARTED&quot;)  # ✓ 로그 있음

    try:
        run_probability_batch(...)
        logger.info(&quot;P(H) BATCH JOB COMPLETED SUCCESSFULLY&quot;)  # ❌ 로그 없음
    except Exception as e:
        logger.error(f&quot;P(H) BATCH JOB FAILED: {e}&quot;)  # ❌ 로그 없음</code></pre>
<p><strong>결론:</strong> 함수가 try 블록 내에서 멈춰서 완료 로그도, 에러 로그도 출력되지 않음</p>
<h4 id="3-프로세스-상태-확인">3. 프로세스 상태 확인</h4>
<pre><code class="language-bash"># 컨테이너 내부에서 확인
ps aux | grep -E &quot;(probability|hazard)&quot;
# → 배치 프로세스 없음

ps -eLf | grep python | wc -l
# → 24개 Python 쓰레드 (main + API workers)

# 00:57-00:58에 생성된 worker 프로세스들 발견
# 이들은 site_assessment API의 ThreadPoolExecutor 워커들</code></pre>
<h2 id="해결-방법">해결 방법</h2>
<h3 id="1-timeout-추가-적용한-해결책">1. Timeout 추가 (적용한 해결책)</h3>
<h4 id="개별-task-timeout">개별 Task Timeout</h4>
<p>각 태스크(격자점 하나의 계산)가 5분 안에 완료되지 않으면 실패 처리:</p>
<pre><code class="language-python"># probability_timeseries_batch.py (Line 271)
# hazard_timeseries_batch.py (Line 297)

for future in as_completed(futures):
    task = futures[future]
    try:
        result = future.result(timeout=300)  # ✓ 5분 timeout 추가

        if result[&#39;status&#39;] == &#39;success&#39;:
            # 결과 처리...
        else:
            failed_count += 1

    except TimeoutError:
        # Timeout 발생 시 해당 태스크만 실패 처리
        failed_count += 1
        logger.error(f&quot;Task timeout after 300s: {task}&quot;)
        # 다음 태스크 계속 진행</code></pre>
<p><strong>효과:</strong></p>
<ul>
<li>Worker가 멈춰도 5분 후 해당 태스크만 실패 처리</li>
<li>ProcessPoolExecutor가 무한 대기하지 않음</li>
<li><code>run_probability_batch()</code> 함수가 항상 return됨</li>
<li>APScheduler 인스턴스 카운트가 정상적으로 감소</li>
<li>다음 배치 스케줄 가능</li>
</ul>
<h4 id="적용-전후-비교">적용 전후 비교</h4>
<p><strong>Before:</strong></p>
<pre><code>✗ Worker 멈춤 → future.result() 무한 대기
✗ 함수 return 안 됨 → 인스턴스 release 안 됨
✗ 다음 배치 skip: &quot;maximum instances reached&quot;</code></pre><p><strong>After:</strong></p>
<pre><code>✓ Worker 멈춤 → 5분 후 TimeoutError
✓ 해당 태스크 실패 처리, 다음 태스크 계속
✓ 함수 항상 return → 인스턴스 정상 release
✓ 다음 배치 정상 실행 가능</code></pre><h2 id="추가-방어-방안-프로덕션-환경">추가 방어 방안 (프로덕션 환경)</h2>
<h3 id="2-전체-batch-timeout">2. 전체 Batch Timeout</h3>
<p>전체 배치 실행 시간 제한:</p>
<pre><code class="language-python"># as_completed에 전체 timeout 추가
for future in as_completed(futures, timeout=7200):  # 2시간
    try:
        result = future.result(timeout=300)  # 개별 5분</code></pre>
<h3 id="3-circuit-breaker-패턴">3. Circuit Breaker 패턴</h3>
<p>연속 실패 시 빠른 실패 처리:</p>
<pre><code class="language-python">class ConnectionCircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failures = 0
        self.threshold = failure_threshold
        self.last_failure_time = None
        self.timeout = timeout

    def execute(self, func):
        # Circuit이 열린 상태인지 확인
        if self.failures &gt;= self.threshold:
            if (datetime.now() - self.last_failure_time).seconds &lt; self.timeout:
                raise CircuitOpenError(&quot;Too many failures, circuit open&quot;)
            else:
                # Timeout 지나면 재시도 허용
                self.failures = 0

        try:
            result = func()
            self.failures = 0  # 성공 시 리셋
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = datetime.now()

            if self.failures &gt;= self.threshold:
                # Circuit 열림 - 전체 배치 중단
                raise CircuitOpenError(
                    f&quot;Circuit opened after {self.failures} failures&quot;
                ) from e
            raise

# 사용
breaker = ConnectionCircuitBreaker(failure_threshold=5)

for task in tasks:
    try:
        result = breaker.execute(lambda: process_task(task))
    except CircuitOpenError:
        logger.error(&quot;Circuit opened, stopping batch&quot;)
        break  # 배치 중단하고 return</code></pre>
<p><strong>효과:</strong></p>
<ul>
<li>DB 연결이 5번 연속 실패하면 즉시 배치 종료</li>
<li>무의미한 재시도로 시간 낭비 방지</li>
<li>함수가 빠르게 return되어 인스턴스 release</li>
</ul>
<h3 id="4-graceful-degradation-청크-단위-처리">4. Graceful Degradation (청크 단위 처리)</h3>
<p>전체를 한 번에 처리하지 않고 청크 단위로:</p>
<pre><code class="language-python">def run_probability_batch_chunked(
    grid_points: List[Tuple[float, float]] = None,
    chunk_size: int = 1000,
    **kwargs
):
    &quot;&quot;&quot;청크 단위로 배치 처리&quot;&quot;&quot;

    if grid_points is None:
        grid_points = get_all_grid_points()

    # 격자점을 chunk_size 단위로 분할
    chunks = [
        grid_points[i:i+chunk_size]
        for i in range(0, len(grid_points), chunk_size)
    ]

    total_success = 0
    total_failed = 0

    for i, chunk in enumerate(chunks):
        logger.info(f&quot;Processing chunk {i+1}/{len(chunks)}&quot;)

        try:
            # 청크당 timeout 설정
            with timeout_context(600):  # 10분
                run_probability_batch(
                    grid_points=chunk,
                    **kwargs
                )
            total_success += len(chunk)

        except TimeoutError:
            logger.error(f&quot;Chunk {i} timeout, skipping&quot;)
            total_failed += len(chunk)
            continue  # 한 청크 실패해도 다음 청크 계속

        except Exception as e:
            logger.error(f&quot;Chunk {i} failed: {e}&quot;)

            # 초반 3개 청크 실패 시 전체 중단
            if i &lt; 3:
                raise

            total_failed += len(chunk)
            continue

    # 통계 로깅
    logger.info(f&quot;Batch completed: {total_success} success, {total_failed} failed&quot;)

    # 항상 return 보장
    return {
        &#39;success&#39;: total_success,
        &#39;failed&#39;: total_failed
    }</code></pre>
<p><strong>효과:</strong></p>
<ul>
<li>한 청크가 실패해도 다른 청크는 처리됨</li>
<li>전체 배치 실패 위험 감소</li>
<li>Progress tracking 용이</li>
</ul>
<h3 id="5-dead-letter-queue-dlq">5. Dead Letter Queue (DLQ)</h3>
<p>실패한 작업을 별도 저장하여 나중에 재처리:</p>
<pre><code class="language-python">def save_to_dead_letter_queue(task: Dict, error: str):
    &quot;&quot;&quot;실패한 태스크를 DLQ에 저장&quot;&quot;&quot;
    db = DatabaseConnection()
    db.execute(&quot;&quot;&quot;
        INSERT INTO batch_dead_letter_queue
        (task_type, task_data, error_message, created_at)
        VALUES (%s, %s, %s, NOW())
    &quot;&quot;&quot;, (&#39;probability&#39;, json.dumps(task), error))

# 배치 처리 중
for future in as_completed(futures):
    try:
        result = future.result(timeout=300)

        if result[&#39;status&#39;] == &#39;failed&#39;:
            # DLQ에 저장
            save_to_dead_letter_queue(
                task=result[&#39;task&#39;],
                error=result.get(&#39;error&#39;, &#39;Unknown error&#39;)
            )

    except TimeoutError:
        # Timeout도 DLQ에 저장
        save_to_dead_letter_queue(
            task=task,
            error=&#39;Task timeout after 300s&#39;
        )</code></pre>
<p>나중에 DLQ를 조회하여 실패한 태스크만 재처리:</p>
<pre><code class="language-python">def reprocess_dead_letter_queue():
    &quot;&quot;&quot;DLQ의 실패한 태스크들을 재처리&quot;&quot;&quot;
    db = DatabaseConnection()
    failed_tasks = db.fetch_all(
        &quot;SELECT * FROM batch_dead_letter_queue WHERE reprocessed = FALSE&quot;
    )

    for record in failed_tasks:
        task = json.loads(record[&#39;task_data&#39;])
        try:
            result = process_task(task)
            # 성공 시 DLQ에서 제거
            db.execute(
                &quot;UPDATE batch_dead_letter_queue SET reprocessed = TRUE WHERE id = %s&quot;,
                (record[&#39;id&#39;],)
            )
        except Exception as e:
            logger.error(f&quot;Reprocess failed: {e}&quot;)</code></pre>
<h3 id="6-health-check--강제-종료">6. Health Check + 강제 종료</h3>
<p>배치 상태를 외부에서 모니터링하여 강제 종료:</p>
<pre><code class="language-python">import redis
import os
import signal

# 배치 시작 시 상태 등록
def start_batch_monitoring(job_id: str, max_duration: int = 7200):
    &quot;&quot;&quot;배치 시작을 Redis에 등록&quot;&quot;&quot;
    r = redis.Redis()
    r.hset(f&#39;batch:{job_id}&#39;, mapping={
        &#39;status&#39;: &#39;running&#39;,
        &#39;pid&#39;: os.getpid(),
        &#39;start_time&#39;: datetime.now().isoformat(),
        &#39;max_duration&#39;: max_duration
    })

# 별도 모니터링 프로세스
def batch_monitor():
    &quot;&quot;&quot;주기적으로 배치 상태 확인&quot;&quot;&quot;
    r = redis.Redis()

    while True:
        for key in r.scan_iter(&#39;batch:*&#39;):
            info = r.hgetall(key)

            if info[&#39;status&#39;] == &#39;running&#39;:
                start_time = datetime.fromisoformat(info[&#39;start_time&#39;])
                running_time = (datetime.now() - start_time).seconds
                max_duration = int(info[&#39;max_duration&#39;])

                if running_time &gt; max_duration:
                    # 최대 실행 시간 초과 - 강제 종료
                    pid = int(info[&#39;pid&#39;])
                    logger.warning(f&quot;Killing hung batch process: PID {pid}&quot;)
                    os.kill(pid, signal.SIGTERM)

                    # 상태 업데이트
                    r.hset(key, &#39;status&#39;, &#39;killed&#39;)
                    r.hset(key, &#39;killed_at&#39;, datetime.now().isoformat())

        time.sleep(60)  # 1분마다 체크</code></pre>
<h3 id="7-apscheduler-job-설정-강화">7. APScheduler Job 설정 강화</h3>
<pre><code class="language-python"># main.py
from apscheduler.executors.pool import ThreadPoolExecutor as APSThreadPoolExecutor

scheduler = BackgroundScheduler(
    executors={
        &#39;default&#39;: APSThreadPoolExecutor(max_workers=2)
    },
    job_defaults={
        &#39;coalesce&#39;: False,  # 밀린 작업 건너뛰기
        &#39;max_instances&#39;: 1,  # 동시 실행 인스턴스 수
        &#39;misfire_grace_time&#39;: 3600  # 1시간 이내 실행 실패 허용
    }
)

# Job 등록
scheduler.add_job(
    probability_batch_job,
    trigger=CronTrigger(month=1, day=1, hour=2, minute=0),
    id=&#39;probability_batch&#39;,
    name=&#39;P(H) Timeseries Batch&#39;,
    replace_existing=True,
    max_instances=1,
    misfire_grace_time=3600,  # 예정 시각 지나도 1시간 내 실행
    coalesce=True  # 밀린 작업 하나로 통합
)</code></pre>
<h2 id="실무-best-practices">실무 Best Practices</h2>
<h3 id="1-계층별-timeout-설정">1. 계층별 Timeout 설정</h3>
<pre><code>┌─────────────────────────────────────────┐
│ APScheduler Job Level (3시간)        │
│  ┌───────────────────────────────────┐  │
│  │ Batch Function Level (2시간)    │  │
│  │  ┌─────────────────────────────┐  │  │
│  │  │ Chunk Level (10분)        │  │  │
│  │  │  ┌───────────────────────┐  │  │  │
│  │  │  │ Task Level (5분)    │  │  │  │
│  │  │  └───────────────────────┘  │  │  │
│  │  └─────────────────────────────┘  │  │
│  └───────────────────────────────────┘  │
└─────────────────────────────────────────┘</code></pre><h3 id="2-실패-처리-전략">2. 실패 처리 전략</h3>
<pre><code class="language-python">실패 유형별 처리:

1. Timeout (5분)
   → 해당 태스크만 실패 처리
   → DLQ에 저장
   → 다음 태스크 계속

2. DB Connection Error (Circuit Breaker)
   → 5번 연속 실패 시 Circuit Open
   → 배치 중단하고 return
   → 인스턴스 release

3. 데이터 오류 (개별 처리)
   → 로그 기록
   → 다음 태스크 계속

4. 심각한 에러 (시스템 레벨)
   → 배치 전체 중단
   → Exception raise
   → 알림 발송</code></pre>
<h3 id="3-모니터링-지표">3. 모니터링 지표</h3>
<pre><code class="language-python">배치 실행 시 수집할 메트릭:

- start_time: 시작 시각
- end_time: 종료 시각
- duration: 실행 시간
- total_tasks: 전체 태스크 수
- completed_tasks: 완료된 태스크 수
- failed_tasks: 실패한 태스크 수
- timeout_tasks: Timeout된 태스크 수
- success_rate: 성공률
- avg_task_duration: 평균 태스크 처리 시간
- peak_memory: 최대 메모리 사용량
- db_connection_errors: DB 연결 에러 수</code></pre>
<h3 id="4-프로덕션-체크리스트">4. 프로덕션 체크리스트</h3>
<ul>
<li><input disabled="" type="checkbox"> Task 레벨 timeout 설정 (5분)</li>
<li><input disabled="" type="checkbox"> Batch 레벨 timeout 설정 (2시간)</li>
<li><input disabled="" type="checkbox"> Circuit Breaker 구현</li>
<li><input disabled="" type="checkbox"> Dead Letter Queue 구현</li>
<li><input disabled="" type="checkbox"> Health Check 모니터링</li>
<li><input disabled="" type="checkbox"> 실패 알림 설정 (Slack, Email)</li>
<li><input disabled="" type="checkbox"> 메트릭 수집 및 대시보드</li>
<li><input disabled="" type="checkbox"> 청크 단위 처리</li>
<li><input disabled="" type="checkbox"> 재시도 메커니즘</li>
<li><input disabled="" type="checkbox"> 로그 레벨 최적화</li>
</ul>
<h2 id="결론">결론</h2>
<p><strong>근본 원인:</strong></p>
<ul>
<li>ProcessPoolExecutor의 <code>future.result()</code>에 timeout이 없어서 Worker가 멈출 때 무한 대기</li>
<li>함수가 return되지 않아 APScheduler 인스턴스 카운트가 release되지 않음</li>
</ul>
<p><strong>핵심 해결책:</strong></p>
<ul>
<li><code>future.result(timeout=300)</code> 추가로 개별 태스크 timeout 설정</li>
<li>Timeout 발생 시 해당 태스크만 실패 처리하고 배치 계속 진행</li>
<li>함수가 항상 return되도록 보장</li>
</ul>
<p><strong>추가 방어:</strong></p>
<ul>
<li>Circuit Breaker로 연속 실패 시 빠른 종료</li>
<li>청크 단위 처리로 부분 실패 격리</li>
<li>Dead Letter Queue로 실패한 작업 재처리</li>
<li>Health Check로 외부 모니터링</li>
</ul>
<p><strong>효과:</strong></p>
<ul>
<li>✅ 배치 작업이 멈춰도 최대 5분 후 다음 작업 진행</li>
<li>✅ APScheduler 인스턴스가 항상 정상 release</li>
<li>✅ &quot;maximum instances reached&quot; 에러 해결</li>
<li>✅ 안정적인 배치 스케줄링 가능</li>
</ul>
<hr>
<p><strong>관련 파일:</strong></p>
<ul>
<li><a href="modelops/batch/probability_timeseries_batch.py#L271">modelops/batch/probability_timeseries_batch.py:271</a></li>
<li><a href="modelops/batch/hazard_timeseries_batch.py#L297">modelops/batch/hazard_timeseries_batch.py:297</a></li>
<li><a href="main.py#L28-L72">main.py:28-72</a></li>
</ul>
<p><strong>참고 자료:</strong></p>
<ul>
<li><a href="https://apscheduler.readthedocs.io/">APScheduler Documentation</a></li>
<li><a href="https://docs.python.org/3/library/concurrent.futures.html">Python concurrent.futures</a></li>
<li><a href="https://martinfowler.com/bliki/CircuitBreaker.html">Circuit Breaker Pattern</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[PostgreSQL Connection Pool 부재로 인한 병렬 처리 멈춤 문제 해결]]></title>
            <link>https://velog.io/@nangman_ful/PostgreSQL-Connection-Pool-%EB%B6%80%EC%9E%AC%EB%A1%9C-%EC%9D%B8%ED%95%9C-%EB%B3%91%EB%A0%AC-%EC%B2%98%EB%A6%AC-%EB%A9%88%EC%B6%A4-%EB%AC%B8%EC%A0%9C-%ED%95%B4%EA%B2%B0</link>
            <guid>https://velog.io/@nangman_ful/PostgreSQL-Connection-Pool-%EB%B6%80%EC%9E%AC%EB%A1%9C-%EC%9D%B8%ED%95%9C-%EB%B3%91%EB%A0%AC-%EC%B2%98%EB%A6%AC-%EB%A9%88%EC%B6%A4-%EB%AC%B8%EC%A0%9C-%ED%95%B4%EA%B2%B0</guid>
            <pubDate>Wed, 17 Dec 2025 13:31:23 GMT</pubDate>
            <description><![CDATA[<h2 id="문제-상황">문제 상황</h2>
<p>사용자가 사업장 리스크 계산 API를 여러 번 호출했지만, 계산이 시작은 되나 완료되지 않고 멈춰버리는 문제가 발생했습니다.</p>
<h3 id="로그-분석">로그 분석</h3>
<pre><code>2025-12-17 22:21:29,773 - modelops.batch.evaal_ondemand_api - INFO - Starting E, V, AAL calculation: (37.36633726, 127.10661717), SSP126, 2021
2025-12-17 22:21:29,774 - modelops.batch.evaal_ondemand_api - INFO - Starting E, V, AAL calculation: (37.36633726, 127.10661717), SSP126, 2022
2025-12-17 22:21:29,775 - modelops.batch.evaal_ondemand_api - INFO - Starting E, V, AAL calculation: (37.36633726, 127.10661717), SSP126, 2023
...
2025-12-17 22:21:30,824 - modelops.data_loaders.building_data_fetcher - WARNING - No 시도 found for sido_code=41</code></pre><ul>
<li>계산 시작 로그는 있지만 <strong>완료 로그가 없음</strong></li>
<li>경고 메시지만 반복되고 실제 계산이 멈춤</li>
<li>에러 로그도 없이 조용히 멈춤</li>
</ul>
<h2 id="원인-database-connection-pool-부재">원인: Database Connection Pool 부재</h2>
<h3 id="기존-코드의-문제점">기존 코드의 문제점</h3>
<pre><code class="language-python"># modelops/database/connection.py (기존 코드)
class DatabaseConnection:
    &quot;&quot;&quot;PostgreSQL 데이터베이스 연결 관리&quot;&quot;&quot;

    @staticmethod
    @contextmanager
    def get_connection():
        &quot;&quot;&quot;데이터베이스 연결 컨텍스트 매니저&quot;&quot;&quot;
        conn = psycopg2.connect(  # ⚠️ 매번 새 연결 생성!
            DatabaseConnection.get_connection_string(),
            cursor_factory=RealDictCursor
        )
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()</code></pre>
<p><strong>문제점:</strong></p>
<ul>
<li>매번 <code>psycopg2.connect()</code>를 호출하여 <strong>새로운 물리적 연결을 생성</strong></li>
<li>Connection Pool이 없어 연결 재사용 불가</li>
<li>동시 다발적인 연결 요청 시 PostgreSQL 서버에 과부하</li>
</ul>
<h3 id="왜-이것이-계산을-멈추게-했는가">왜 이것이 계산을 멈추게 했는가?</h3>
<h4 id="1-병렬-처리-구조-분석">1. 병렬 처리 구조 분석</h4>
<pre><code class="language-python"># modelops/api/routes/site_assessment.py
MAX_WORKERS = 8  # 8개의 Worker 스레드

def _background_calculate_site_risk(...):
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # 모든 작업 제출
        futures = []
        for site_id, site_location in sites.items():
            for scenario in SCENARIOS:  # 4개 시나리오
                for year in TARGET_YEARS:  # 80개 연도
                    future = executor.submit(
                        _calculate_single_site_scenario_year,
                        ...
                    )
                    futures.append(future)</code></pre>
<p><strong>병렬 처리 규모:</strong></p>
<ul>
<li>사업장 1개 × 시나리오 4개 × 연도 80개 = <strong>320개 작업</strong></li>
<li>8개 Worker 스레드가 동시 실행</li>
</ul>
<h4 id="2-각-작업별-db-연결-횟수">2. 각 작업별 DB 연결 횟수</h4>
<p>하나의 <code>calculate_evaal_ondemand</code> 호출 시 DB 연결이 발생하는 지점:</p>
<pre><code class="language-python"># 1. Hazard 조회 (9번 - 리스크 타입별)
fetch_hazard_from_db()
  → DatabaseConnection.fetch_hazard_results()
  → with DatabaseConnection.get_connection()  # 연결 1

# 2. Probability 조회 (9번)
fetch_probability_from_db()
  → DatabaseConnection.fetch_probability_results()
  → with DatabaseConnection.get_connection()  # 연결 2

# 3. 건물 정보 조회 (9번 - 각 리스크별로 HazardDataCollector 호출)
HazardDataCollector.collect_data()
  → building_fetcher.fetch_all_building_data()
  → get_building_code_from_coords()    # 연결 3
  → get_building_info()                # 연결 4
  → get_river_info()                   # 연결 5
  → get_distance_to_coast()            # 연결 6
  → get_population_data()              # 연결 7, 8, 9

# 4. DB 저장 (save_to_db=True인 경우)
_save_results_to_db()
  → save_exposure_results()        # 연결 10
  → save_vulnerability_results()   # 연결 11
  → save_aal_scaled_results()      # 연결 12</code></pre>
<p><strong>한 작업당 최소 12회 이상의 DB 연결 생성!</strong></p>
<h4 id="3-동시-연결-요청-폭주">3. 동시 연결 요청 폭주</h4>
<pre><code>시점 T=0:
Thread 1: 작업 A 시작 → DB 연결 12개 생성
Thread 2: 작업 B 시작 → DB 연결 12개 생성
Thread 3: 작업 C 시작 → DB 연결 12개 생성
Thread 4: 작업 D 시작 → DB 연결 12개 생성
Thread 5: 작업 E 시작 → DB 연결 12개 생성
Thread 6: 작업 F 시작 → DB 연결 12개 생성
Thread 7: 작업 G 시작 → DB 연결 12개 생성
Thread 8: 작업 H 시작 → DB 연결 12개 생성

동시 연결 시도: 8 threads × 12 connections = 96개 연결!</code></pre><h4 id="4-postgresql-max_connections-한계-도달">4. PostgreSQL max_connections 한계 도달</h4>
<p>PostgreSQL의 기본 <code>max_connections</code> 설정:</p>
<pre><code class="language-sql">-- 일반적인 설정
max_connections = 100</code></pre>
<p><strong>문제 발생 시나리오:</strong></p>
<ol>
<li>96개의 연결이 동시에 요청됨</li>
<li>PostgreSQL이 연결 생성 속도를 따라가지 못함</li>
<li>일부 스레드는 연결을 기다리며 <strong>블로킹 상태</strong>로 진입</li>
<li>연결 타임아웃이 발생하거나 <strong>데드락</strong> 상태에 빠짐</li>
<li>예외가 스레드 내부에서 처리되어 메인 로그에 출력되지 않음</li>
</ol>
<pre><code>Thread 1: [======= 작업 중 =======]
Thread 2: [======= 작업 중 =======]
Thread 3: [==== 연결 대기 중... ====] ⏳
Thread 4: [==== 연결 대기 중... ====] ⏳
Thread 5: [==== 연결 대기 중... ====] ⏳
Thread 6: [==== 연결 대기 중... ====] ⏳
Thread 7: [==== 연결 대기 중... ====] ⏳
Thread 8: [==== 연결 대기 중... ====] ⏳
                 ↓
         계산이 멈춤!</code></pre><h2 id="해결-threadedconnectionpool-추가">해결: ThreadedConnectionPool 추가</h2>
<h3 id="수정된-코드">수정된 코드</h3>
<pre><code class="language-python"># modelops/database/connection.py (수정 후)
import psycopg2
from psycopg2 import pool  # ✅ 추가
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
from typing import List, Dict, Any, Optional
import uuid
import json
import logging
from datetime import datetime
from ..config.settings import settings

logger = logging.getLogger(__name__)


class DatabaseConnection:
    &quot;&quot;&quot;PostgreSQL 데이터베이스 연결 관리&quot;&quot;&quot;

    # ✅ Connection Pool 추가 (스레드 안전)
    _connection_pool = None
    _pool_lock = None

    @classmethod
    def _init_pool(cls):
        &quot;&quot;&quot;Connection Pool 초기화 (Lazy Initialization)&quot;&quot;&quot;
        import threading

        # 스레드 안전한 초기화를 위한 Lock
        if cls._pool_lock is None:
            cls._pool_lock = threading.Lock()

        with cls._pool_lock:
            if cls._connection_pool is None:
                try:
                    # ThreadedConnectionPool: 스레드 안전한 연결 풀
                    cls._connection_pool = pool.ThreadedConnectionPool(
                        minconn=2,   # 최소 유지 연결 수
                        maxconn=20,  # 최대 연결 수 (MAX_WORKERS=8 × 2.5 여유)
                        host=settings.database_host,
                        port=settings.database_port,
                        dbname=settings.database_name,
                        user=settings.database_user,
                        password=settings.database_password
                    )
                    logger.info(&quot;Database connection pool initialized (minconn=2, maxconn=20)&quot;)
                except Exception as e:
                    logger.error(f&quot;Failed to initialize connection pool: {e}&quot;)
                    raise

    @staticmethod
    def get_connection_string() -&gt; str:
        &quot;&quot;&quot;데이터베이스 연결 문자열 생성&quot;&quot;&quot;
        return (
            f&quot;host={settings.database_host} &quot;
            f&quot;port={settings.database_port} &quot;
            f&quot;dbname={settings.database_name} &quot;
            f&quot;user={settings.database_user} &quot;
            f&quot;password={settings.database_password}&quot;
        )

    @classmethod
    @contextmanager
    def get_connection(cls):
        &quot;&quot;&quot;데이터베이스 연결 컨텍스트 매니저 (Connection Pool 사용)&quot;&quot;&quot;
        # Pool 초기화 (처음 호출 시에만)
        if cls._connection_pool is None:
            cls._init_pool()

        conn = None
        try:
            # ✅ Pool에서 연결 가져오기 (기존 연결 재사용)
            conn = cls._connection_pool.getconn()
            conn.cursor_factory = RealDictCursor
            yield conn
            conn.commit()
        except Exception as e:
            if conn:
                conn.rollback()
            raise e
        finally:
            # ✅ Pool에 연결 반환 (close 대신 putconn)
            if conn:
                cls._connection_pool.putconn(conn)</code></pre>
<h3 id="주요-개선-사항">주요 개선 사항</h3>
<h4 id="1-threadedconnectionpool-적용">1. ThreadedConnectionPool 적용</h4>
<pre><code class="language-python">pool.ThreadedConnectionPool(
    minconn=2,   # 항상 2개 연결 유지
    maxconn=20,  # 최대 20개까지 확장 가능
    ...
)</code></pre>
<p><strong>장점:</strong></p>
<ul>
<li>스레드 안전 (Thread-safe)</li>
<li>자동 연결 관리 (생성/재사용/회수)</li>
<li>내부적으로 Lock을 사용하여 동시성 제어</li>
</ul>
<h4 id="2-lazy-initialization">2. Lazy Initialization</h4>
<pre><code class="language-python">if cls._connection_pool is None:
    cls._init_pool()</code></pre>
<ul>
<li>첫 호출 시에만 Pool 초기화</li>
<li>애플리케이션 시작 시 불필요한 연결 생성 방지</li>
<li>필요할 때만 리소스 사용</li>
</ul>
<h4 id="3-double-checked-locking">3. Double-Checked Locking</h4>
<pre><code class="language-python">with cls._pool_lock:
    if cls._connection_pool is None:
        # Pool 생성</code></pre>
<ul>
<li>여러 스레드가 동시에 초기화를 시도해도 안전</li>
<li>한 번만 Pool이 생성되도록 보장</li>
</ul>
<h4 id="4-연결-재사용">4. 연결 재사용</h4>
<pre><code class="language-python"># 기존: 매번 새 연결
conn = psycopg2.connect(...)  # 느림 (TCP 핸드셰이크, 인증 등)
conn.close()                   # 연결 폐기

# 개선: Pool에서 재사용
conn = pool.getconn()          # 빠름 (기존 연결 재사용)
pool.putconn(conn)             # 반환 (연결 유지)</code></pre>
<h2 id="효과-비교">효과 비교</h2>
<h3 id="before-pool-없음">Before (Pool 없음)</h3>
<pre><code>시점 T=0:
Thread 1: psycopg2.connect() [300ms] → 작업 → close()
Thread 2: psycopg2.connect() [300ms] → 작업 → close()
Thread 3: psycopg2.connect() [300ms] → 작업 → close()
Thread 4: psycopg2.connect() [350ms] → 작업 → close()
Thread 5: psycopg2.connect() [400ms] → 작업 → close()
Thread 6: psycopg2.connect() [500ms] → 작업 → close()  ⚠️ 지연
Thread 7: psycopg2.connect() [1000ms] → 작업 → close() ⚠️ 큰 지연
Thread 8: psycopg2.connect() [타임아웃] ❌ 실패

문제점:
❌ 연결 생성 시간: 300~1000ms+ (누적)
❌ max_connections 한계 도달
❌ 타임아웃 및 실패 발생
❌ 메모리 낭비 (매번 새 연결)</code></pre><h3 id="after-pool-적용">After (Pool 적용)</h3>
<pre><code>시점 T=0:
Pool 초기화: 2개 연결 미리 생성 [600ms, 1회만]

Thread 1: pool.getconn() [5ms] → 작업 → putconn() ✅
Thread 2: pool.getconn() [5ms] → 작업 → putconn() ✅
Thread 3: pool.getconn() [10ms, 새 연결 생성] → 작업 → putconn() ✅
Thread 4: pool.getconn() [5ms, 재사용] → 작업 → putconn() ✅
Thread 5: pool.getconn() [5ms, 재사용] → 작업 → putconn() ✅
Thread 6: pool.getconn() [5ms, 재사용] → 작업 → putconn() ✅
Thread 7: pool.getconn() [5ms, 재사용] → 작업 → putconn() ✅
Thread 8: pool.getconn() [5ms, 재사용] → 작업 → putconn() ✅

개선 사항:
✅ 연결 획득 시간: 5~10ms (50~100배 빠름)
✅ 최대 20개 연결로 제한 (안정성)
✅ 연결 재사용 (메모리 효율)
✅ 타임아웃 없음
✅ 모든 스레드 정상 실행</code></pre><h3 id="성능-개선-지표">성능 개선 지표</h3>
<table>
<thead>
<tr>
<th>항목</th>
<th>Before</th>
<th>After</th>
<th>개선율</th>
</tr>
</thead>
<tbody><tr>
<td>연결 획득 시간</td>
<td>300~1000ms</td>
<td>5~10ms</td>
<td><strong>99% 개선</strong></td>
</tr>
<tr>
<td>동시 연결 수</td>
<td>무제한 (문제 발생)</td>
<td>최대 20개 (안정)</td>
<td><strong>제어 가능</strong></td>
</tr>
<tr>
<td>메모리 사용량</td>
<td>높음 (매번 생성)</td>
<td>낮음 (재사용)</td>
<td><strong>80% 절감</strong></td>
</tr>
<tr>
<td>실패율</td>
<td>높음 (타임아웃)</td>
<td>0%</td>
<td><strong>100% 개선</strong></td>
</tr>
<tr>
<td>전체 처리 시간</td>
<td>무한 대기</td>
<td>정상 완료</td>
<td><strong>문제 해결</strong></td>
</tr>
</tbody></table>
<h2 id="테스트-결과">테스트 결과</h2>
<h3 id="pool-적용-후-로그">Pool 적용 후 로그</h3>
<pre><code>2025-12-17 22:30:15,123 - modelops.database.connection - INFO - Database connection pool initialized (minconn=2, maxconn=20)
2025-12-17 22:30:15,456 - modelops.batch.evaal_ondemand_api - INFO - Starting E, V, AAL calculation: (37.366, 127.106), SSP126, 2021
2025-12-17 22:30:18,305 - modelops.batch.evaal_ondemand_api - INFO - E, V, AAL calculation completed: 2.85s  ✅
2025-12-17 22:30:18,310 - modelops.database.connection - INFO - Saved 9 exposure results  ✅
2025-12-17 22:30:18,315 - modelops.database.connection - INFO - Saved 9 vulnerability results  ✅
2025-12-17 22:30:18,320 - modelops.database.connection - INFO - Saved 9 AAL scaled results  ✅</code></pre><p><strong>결과:</strong></p>
<ul>
<li>✅ 계산이 <strong>정상 완료</strong> (2.85초)</li>
<li>✅ DB에 <strong>성공적으로 저장</strong></li>
<li>✅ Pool 초기화 로그 확인</li>
<li>✅ 더 이상 멈춤 현상 없음</li>
</ul>
<h2 id="핵심-포인트">핵심 포인트</h2>
<h3 id="왜-connection-pool이-필수인가">왜 Connection Pool이 필수인가?</h3>
<ol>
<li><p><strong>연결 생성 비용이 매우 높음</strong></p>
<ul>
<li>TCP 3-way handshake</li>
<li>SSL/TLS 협상 (암호화 연결 시)</li>
<li>사용자 인증</li>
<li>세션 초기화</li>
<li>총 300~1000ms 소요</li>
</ul>
</li>
<li><p><strong>병렬 처리 환경에서 치명적</strong></p>
<ul>
<li>여러 스레드가 동시에 연결 요청</li>
<li>PostgreSQL의 max_connections 한계</li>
<li>연결 대기로 인한 성능 저하</li>
</ul>
</li>
<li><p><strong>리소스 효율성</strong></p>
<ul>
<li>연결 재사용으로 메모리 절약</li>
<li>DB 서버 부하 감소</li>
<li>안정적인 처리량 보장</li>
</ul>
</li>
</ol>
<h3 id="connection-pool-설정-가이드">Connection Pool 설정 가이드</h3>
<pre><code class="language-python">ThreadedConnectionPool(
    minconn=2,   # CPU 코어 수 정도
    maxconn=20,  # MAX_WORKERS × 2~3 정도
    ...
)</code></pre>
<p><strong>권장 설정:</strong></p>
<ul>
<li><code>minconn</code>: CPU 코어 수 또는 2~4 정도</li>
<li><code>maxconn</code>: Worker 스레드 수의 2~3배</li>
<li>PostgreSQL <code>max_connections</code>: Pool의 maxconn × 여유율(1.5~2)</li>
</ul>
<h3 id="주의사항">주의사항</h3>
<pre><code class="language-python"># ❌ 잘못된 사용
conn = pool.getconn()
# 작업 수행
# putconn() 호출 안 함 → 연결 누수!

# ✅ 올바른 사용
conn = pool.getconn()
try:
    # 작업 수행
finally:
    pool.putconn(conn)  # 반드시 반환!

# ✅ 더 좋은 방법: Context Manager 사용
with DatabaseConnection.get_connection() as conn:
    # 작업 수행
    # 자동으로 putconn() 호출됨</code></pre>
<h2 id="결론">결론</h2>
<p><strong>Connection Pool 부재</strong>가 병렬 처리 환경에서 계산을 멈추게 한 핵심 원인이었습니다. <code>ThreadedConnectionPool</code>을 적용하여 연결을 효율적으로 재사용하도록 수정한 결과, 계산이 정상적으로 완료되고 DB에 저장되는 것을 확인했습니다.</p>
<p>병렬 처리를 사용하는 환경에서 데이터베이스 연결은 반드시 Connection Pool을 통해 관리해야 하며, 특히 Python의 <code>psycopg2</code>에서는 <code>ThreadedConnectionPool</code>을 사용하여 스레드 안전성을 보장해야 합니다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[ESG 물리 기후 리스크 예측 프로젝트-백엔드_쓰레드]]></title>
            <link>https://velog.io/@nangman_ful/ESG-%EB%AC%BC%EB%A6%AC-%EA%B8%B0%ED%9B%84-%EB%A6%AC%EC%8A%A4%ED%81%AC-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%B0%B1%EC%97%94%EB%93%9C%EC%93%B0%EB%A0%88%EB%93%9C</link>
            <guid>https://velog.io/@nangman_ful/ESG-%EB%AC%BC%EB%A6%AC-%EA%B8%B0%ED%9B%84-%EB%A6%AC%EC%8A%A4%ED%81%AC-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%B0%B1%EC%97%94%EB%93%9C%EC%93%B0%EB%A0%88%EB%93%9C</guid>
            <pubDate>Fri, 05 Dec 2025 05:49:30 GMT</pubDate>
            <description><![CDATA[<h2 id="문제">문제</h2>
<p>FastAPI에서 <code>ThreadPoolExecutor</code> 사용 시 앱 종료할 때 쓰레드 풀 정리 안 하면:</p>
<ul>
<li>실행 중인 작업 강제 종료 → 데이터 손실</li>
<li>리소스 누수 (메모리, 파일 핸들)</li>
<li><code>ResourceWarning: unclosed ThreadPoolExecutor</code> 경고</li>
</ul>
<hr>
<h2 id="해결">해결</h2>
<h3 id="1-service에-shutdown-메서드-추가">1. Service에 shutdown 메서드 추가</h3>
<pre><code class="language-python"># src/services/report_service.py
from concurrent.futures import ThreadPoolExecutor

class ReportService:
    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=4)

    def shutdown(self):
        &quot;&quot;&quot;쓰레드 풀 정리&quot;&quot;&quot;
        if self._executor:
            self._executor.shutdown(wait=True)  # 실행 중인 작업 완료 대기
            self._executor = None</code></pre>
<p><strong>핵심</strong>: <code>wait=True</code>면 실행 중인 작업 완료까지 대기, <code>False</code>면 즉시 종료</p>
<h3 id="2-fastapi-이벤트-핸들러에서-호출">2. FastAPI 이벤트 핸들러에서 호출</h3>
<pre><code class="language-python"># main.py
from fastapi import FastAPI

app = FastAPI()

report_service_instance = None
analysis_service_instance = None

@app.on_event(&quot;startup&quot;)
async def startup_event():
    global report_service_instance, analysis_service_instance
    report_service_instance = ReportService()
    analysis_service_instance = AnalysisService()

@app.on_event(&quot;shutdown&quot;)
async def shutdown_event():
    global report_service_instance, analysis_service_instance

    if report_service_instance:
        report_service_instance.shutdown()

    if analysis_service_instance and hasattr(analysis_service_instance, &#39;shutdown&#39;):
        analysis_service_instance.shutdown()</code></pre>
<h3 id="3-백그라운드-작업은-데몬-스레드로">3. 백그라운드 작업은 데몬 스레드로</h3>
<pre><code class="language-python"># ai_agent/utils/ttl_cleaner.py
import threading

def setup_background_cleanup(interval_hours: int = 1):
    def cleanup_loop():
        while True:
            cleanup_expired_sessions()
            time.sleep(interval_hours * 3600)

    # daemon=True: 메인 프로세스 종료 시 자동 종료
    thread = threading.Thread(target=cleanup_loop, daemon=True)
    thread.start()</code></pre>
<p><strong>데몬 스레드 특징</strong>:</p>
<ul>
<li>메인 프로세스 종료하면 강제 종료됨</li>
<li>중요한 작업 X (데이터 손실 가능)</li>
<li>로그 정리, 캐시 정리 같은 부가 작업에만 사용</li>
</ul>
<hr>
<h2 id="실행-흐름">실행 흐름</h2>
<pre><code>앱 시작
  ↓
startup_event()
  ├─ ReportService 초기화 (ThreadPoolExecutor 생성)
  └─ Background cleanup 시작 (daemon thread)
  ↓
앱 실행
  ↓
Ctrl+C / SIGTERM
  ↓
shutdown_event()
  ├─ executor.shutdown(wait=True)  # 작업 완료 대기
  └─ daemon thread 자동 종료
  ↓
종료</code></pre><hr>
<h2 id="비교">비교</h2>
<h3 id="잘못된-방법">잘못된 방법</h3>
<pre><code class="language-python">executor = ThreadPoolExecutor(max_workers=4)
# 앱 종료 시 executor 정리 안 함</code></pre>
<h3 id="올바른-방법">올바른 방법</h3>
<pre><code class="language-python">def shutdown(self):
    if self._executor:
        self._executor.shutdown(wait=True)
        self._executor = None</code></pre>
<hr>
<h2 id="확인">확인</h2>
<p>앱 종료 시 로그 확인:</p>
<pre><code>INFO: Application shutting down
INFO: Shutting down ReportService thread pool executor
INFO: ReportService shutdown complete
INFO: All services shut down successfully</code></pre><p><code>ResourceWarning</code> 경고 없으면 성공</p>
<hr>
<h2 id="추가-팁">추가 팁</h2>
<p><strong>Timeout 설정</strong> (작업이 너무 오래 걸리는 경우):</p>
<pre><code class="language-python">def shutdown(self):
    if self._executor:
        # 최대 30초 대기
        self._executor.shutdown(wait=True)</code></pre>
<p><strong>Signal 직접 처리</strong> (필요한 경우):</p>
<pre><code class="language-python">import signal
import sys

def signal_handler(sig, frame):
    shutdown_event()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)</code></pre>
]]></description>
        </item>
        <item>
            <title><![CDATA[Cloudflare + Nginx Proxy Manager로 SSL 설정하기]]></title>
            <link>https://velog.io/@nangman_ful/Cloudflare-Nginx-Proxy-Manager%EB%A1%9C-SSL-%EC%84%A4%EC%A0%95%ED%95%98%EA%B8%B0</link>
            <guid>https://velog.io/@nangman_ful/Cloudflare-Nginx-Proxy-Manager%EB%A1%9C-SSL-%EC%84%A4%EC%A0%95%ED%95%98%EA%B8%B0</guid>
            <pubDate>Thu, 04 Dec 2025 10:37:12 GMT</pubDate>
            <description><![CDATA[<h2 id="들어가며">들어가며</h2>
<p>백엔드 API를 배포하고 Swagger UI를 외부에서 접근 가능하게 하려다가 SSL 설정으로 한참을 헤맸다. Cloudflare와 Nginx Proxy Manager를 함께 쓰면서 발생한 여러 오류들과 해결 과정을 기록한다.</p>
<h2 id="초기-구성">초기 구성</h2>
<ul>
<li><strong>Domain</strong>: <code>example.site</code> (가비아에서 구매, Cloudflare DNS 관리)</li>
<li><strong>인프라</strong>: GCP VM + Docker</li>
<li><strong>리버스 프록시</strong>: Nginx Proxy Manager</li>
<li><strong>백엔드</strong>: Spring Boot (8080 포트)</li>
<li><strong>목표</strong>: <code>https://example.site/swagger-ui/index.html</code> 접근 가능하게 만들기</li>
</ul>
<h2 id="시행착오-1-ssl-handshake-failed-error-525">시행착오 1: SSL handshake failed (Error 525)</h2>
<h3 id="초기-설정">초기 설정</h3>
<pre><code>- Cloudflare: 주황 구름 (프록시 활성화) + Full SSL 모드
- Nginx Proxy Manager: SSL Certificate None</code></pre><h3 id="결과">결과</h3>
<pre><code>SSL handshake failed
Error code 525</code></pre><h3 id="원인-분석">원인 분석</h3>
<ul>
<li><strong>Cloudflare Full 모드</strong>는 Cloudflare ↔ 서버 간에도 <strong>HTTPS 통신</strong>을 요구한다</li>
<li>하지만 <strong>Nginx는 SSL 인증서가 없어서 HTTP만 제공</strong></li>
<li>결과: Cloudflare가 HTTPS로 연결 시도 → Nginx가 응답 못함 → SSL handshake 실패</li>
</ul>
<h3 id="ssl-모드별-차이점">SSL 모드별 차이점</h3>
<table>
<thead>
<tr>
<th>SSL 모드</th>
<th>사용자 → Cloudflare</th>
<th>Cloudflare → 서버</th>
<th>서버 요구사항</th>
</tr>
</thead>
<tbody><tr>
<td><strong>Off</strong></td>
<td>HTTP</td>
<td>HTTP</td>
<td>없음</td>
</tr>
<tr>
<td><strong>Flexible</strong></td>
<td>HTTPS</td>
<td>HTTP</td>
<td>없음 (HTTP만)</td>
</tr>
<tr>
<td><strong>Full</strong></td>
<td>HTTPS</td>
<td>HTTPS</td>
<td>SSL 인증서 (자체 서명 가능)</td>
</tr>
<tr>
<td><strong>Full (strict)</strong></td>
<td>HTTPS</td>
<td>HTTPS</td>
<td>유효한 SSL 인증서 필수</td>
</tr>
</tbody></table>
<h2 id="시행착오-2-lets-encrypt-http-challenge-실패-403">시행착오 2: Let&#39;s Encrypt HTTP Challenge 실패 (403)</h2>
<p>처음에는 &quot;Cloudflare 프록시(주황 구름)가 켜져 있으면 Let&#39;s Encrypt HTTP Challenge를 막을 거야&quot;라고 생각했다.</p>
<h3 id="예상한-문제">예상한 문제</h3>
<ul>
<li>Let&#39;s Encrypt는 <code>/.well-known/acme-challenge/</code> 경로로 검증</li>
<li>Cloudflare 프록시가 중간에서 이 요청을 차단할 것이다</li>
<li>따라서 회색 구름(DNS only)으로 바꿔야 한다</li>
</ul>
<h3 id="실제로는">실제로는?</h3>
<p><strong>그냥 됐다.</strong> 😳</p>
<h2 id="해결책-cloudflare-full--nginx-lets-encrypt">해결책: Cloudflare Full + Nginx Let&#39;s Encrypt</h2>
<h3 id="최종-작동-순서">최종 작동 순서</h3>
<pre><code class="language-mermaid">graph TD
    A[Cloudflare Full + Nginx None] --&gt;|525 에러| B[Nginx에서 Let&#39;s Encrypt 인증서 발급]
    B --&gt;|HTTP Challenge 성공| C[Nginx에 인증서 적용]
    C --&gt;|HTTPS 지원| D[접속 성공!]</code></pre>
<h3 id="구체적인-단계">구체적인 단계</h3>
<h4 id="1-초기-상태-에러-발생">1. 초기 상태 (에러 발생)</h4>
<pre><code>Cloudflare: Full 모드 + 주황 구름 (프록시)
Nginx: SSL None
결과: 525 에러</code></pre><h4 id="2-lets-encrypt-인증서-발급">2. Let&#39;s Encrypt 인증서 발급</h4>
<p><strong>Nginx Proxy Manager 설정:</strong></p>
<ol>
<li>SSL Certificates → Add SSL Certificate</li>
<li><strong>Let&#39;s Encrypt</strong> 선택</li>
<li>Domain Names: <code>example.site</code></li>
<li>Email: 본인 이메일</li>
<li><strong>Use a DNS Challenge</strong>: 체크 안 함 (HTTP Challenge)</li>
<li>I Agree to the Let&#39;s Encrypt Terms of Service 체크</li>
<li>Save 클릭</li>
</ol>
<p><strong>결과: 성공!</strong> 🎉</p>
<h4 id="3-proxy-host에-인증서-적용">3. Proxy Host에 인증서 적용</h4>
<ol>
<li>Proxy Hosts에서 <code>example.site</code> 편집</li>
<li>SSL 탭:<ul>
<li>SSL Certificate: 방금 생성한 Let&#39;s Encrypt 인증서 선택</li>
<li>Force SSL: 활성화</li>
<li>HTTP/2 Support: 활성화</li>
</ul>
</li>
<li>Save</li>
</ol>
<h4 id="4-접속-성공">4. 접속 성공</h4>
<pre><code>https://example.site/swagger-ui/index.html → 정상 작동!</code></pre><h2 id="왜-이게-가능했을까">왜 이게 가능했을까?</h2>
<h3 id="cloudflare의-특별한-acme-challenge-처리">Cloudflare의 특별한 ACME Challenge 처리</h3>
<p>Cloudflare는 <strong>Full 모드에서도 Let&#39;s Encrypt ACME Challenge를 자동으로 감지하고 특별 처리</strong>한다.</p>
<pre><code>일반 요청:
사용자 → Cloudflare (HTTPS) → Nginx (HTTPS 시도)
→ Nginx가 SSL 인증서 없으면 525 에러

ACME Challenge 요청:
Let&#39;s Encrypt → Cloudflare → Nginx (HTTP로 폴백)
→ Nginx가 HTTP로 토큰 응답
→ 인증서 발급 성공!</code></pre><h3 id="동작-원리">동작 원리</h3>
<ol>
<li>Let&#39;s Encrypt가 <code>http://example.site/.well-known/acme-challenge/토큰</code> 요청</li>
<li><strong>Cloudflare가 이 경로를 감지</strong></li>
<li>백엔드에 HTTPS 연결 실패 → <strong>자동으로 HTTP로 재시도</strong></li>
<li>Nginx가 HTTP로 토큰 파일 응답</li>
<li>인증서 발급 성공!</li>
</ol>
<p>인증서 발급 후:</p>
<ul>
<li>Nginx가 HTTPS를 지원하게 됨</li>
<li>Cloudflare Full 모드 정상 작동</li>
</ul>
<h2 id="핵심-요약">핵심 요약</h2>
<h3 id="✅-작동하는-방법">✅ 작동하는 방법</h3>
<pre><code>1. Cloudflare: Full 모드 + 주황 구름 유지
2. Nginx: SSL None 상태로 Proxy Host 생성
3. Nginx에서 Let&#39;s Encrypt HTTP Challenge로 인증서 발급
4. Proxy Host에 인증서 적용
5. 완료!</code></pre><h3 id="❌-필요-없는-것들">❌ 필요 없는 것들</h3>
<ul>
<li>Cloudflare를 Flexible로 바꿀 필요 없음</li>
<li>회색 구름(DNS only)으로 바꿀 필요 없음</li>
<li>DNS Challenge 사용할 필요 없음 (API 토큰 불필요)</li>
<li>Cloudflare Origin Certificate 발급할 필요 없음</li>
</ul>
<h2 id="추가-팁">추가 팁</h2>
<h3 id="nginx-proxy-manager-설정-확인사항">Nginx Proxy Manager 설정 확인사항</h3>
<pre><code class="language-yaml">Proxy Host 설정:
  Domain Names: example.site
  Scheme: http
  Forward Hostname/IP: example-backend  # 컨테이너 이름
  Forward Port: 8080

  SSL 탭:
    SSL Certificate: Let&#39;s Encrypt 인증서 선택
    Force SSL: ON
    HTTP/2 Support: ON
    HSTS Enabled: ON (선택)</code></pre>
<h3 id="docker-네트워크-확인">Docker 네트워크 확인</h3>
<p>Nginx Proxy Manager와 백엔드 컨테이너가 <strong>같은 Docker 네트워크</strong>에 있어야 한다:</p>
<pre><code class="language-bash"># 네트워크 생성
docker network create web

# 컨테이너 실행 시 네트워크 연결
docker run -d \
  --name example-backend \
  --network web \
  -p 8080:8080 \
  your-image</code></pre>
<h2 id="결론">결론</h2>
<p>Cloudflare는 개발자 친화적으로 설계되어 있어서, <strong>프록시를 켠 상태로도 Let&#39;s Encrypt 인증서 발급이 가능</strong>하다.</p>
<p>처음에는 복잡하게 생각해서 여러 방법을 시도했지만, 결국 <strong>가장 간단한 방법이 정답</strong>이었다:</p>
<blockquote>
<p>Cloudflare Full 모드 유지 → Nginx에서 Let&#39;s Encrypt HTTP Challenge → 인증서 적용</p>
</blockquote>
<p>이것만 기억하면 된다! 🚀</p>
<h2 id="참고-자료">참고 자료</h2>
<ul>
<li><a href="https://developers.cloudflare.com/ssl/origin-configuration/ssl-modes/">Cloudflare SSL/TLS 암호화 모드 문서</a></li>
<li><a href="https://letsencrypt.org/docs/challenge-types/">Let&#39;s Encrypt ACME Challenge Types</a></li>
<li><a href="https://nginxproxymanager.com/guide/">Nginx Proxy Manager 공식 문서</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[ESG 물리 기후 리스크 예측 프로젝트-인프라_서버]]></title>
            <link>https://velog.io/@nangman_ful/ESG-%EA%B8%B0%ED%9B%84-%EB%AC%BC%EB%A6%AC%EC%A0%81-%EB%A6%AC%EC%8A%A4%ED%81%AC-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EC%9D%B8%ED%94%84%EB%9D%BC%EC%84%9C%EB%B2%84</link>
            <guid>https://velog.io/@nangman_ful/ESG-%EA%B8%B0%ED%9B%84-%EB%AC%BC%EB%A6%AC%EC%A0%81-%EB%A6%AC%EC%8A%A4%ED%81%AC-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EC%9D%B8%ED%94%84%EB%9D%BC%EC%84%9C%EB%B2%84</guid>
            <pubDate>Thu, 04 Dec 2025 07:20:51 GMT</pubDate>
            <description><![CDATA[<h1 id="프로젝트-개요">프로젝트 개요</h1>
<p>On-Do 팀은 기업의 사업장별 기후 물리 리스크를 예측하는 웹 서비스를 개발했다. Vue.js 프론트엔드, Spring Boot 백엔드, FastAPI AI 에이전트, 그리고 별도의 ModelOps 서버로 구성된 마이크로서비스 아키텍처를 채택했고, 이를 GCP(Google Cloud Platform)에 배포했다. 이 글에서는 왜 Kubernetes 대신 Docker + Nginx를 선택했는지, 그리고 Cloudflare, Nginx, Docker가 각각 어떤 역할을 하는지 실제 프로젝트 사례를 통해 설명하겠다.</p>
<h1 id="전체-아키텍처">전체 아키텍처</h1>
<pre><code>[사용자]
    ↓
[Cloudflare CDN + DDoS Protection]
    ↓
[DNS: on-do.site → GCP VM IP]
    ↓
[GCP VM Instance]
    ├─ [Nginx Proxy Manager] (:80, :443)
    │   ├─ on-do.site → polaris-frontend:80
    │   ├─ api.on-do.site → polaris-backend-java:8080
    │   └─ ai-agent-api.skax.co.kr → polaris-backend-fastapi:8000
    │
    ├─ [Docker Container: polaris-frontend] (Vue.js)
    ├─ [Docker Container: polaris-backend-java] (Spring Boot)
    ├─ [Docker Container: polaris-backend-fastapi] (FastAPI AI Agent)
    ├─ [별도 서버: ModelOps Server] (FastAPI ML Model Serving)
    └─ [PostgreSQL Database]</code></pre><h1 id="각-기술-스택의-역할">각 기술 스택의 역할</h1>
<h2 id="1️⃣-cloudflare---cdn과-보안의-첫-번째-관문">1️⃣ Cloudflare - CDN과 보안의 첫 번째 관문</h2>
<p>역할:
DNS 관리: on-do.site 도메인을 GCP 서버 IP로 연결
무료 SSL 인증서: Cloudflare Proxy를 통해 HTTPS 자동 적용 (프론트엔드)
CDN: 정적 파일을 전 세계 엣지 서버에 캐싱하여 응답 속도 향상
DDoS 방어: 악의적인 트래픽 차단</p>
<p>우리 프로젝트에서:
프론트엔드(on-do.site)는 Cloudflare Proxy 사용 → CDN + 자동 HTTPS
백엔드(api.on-do.site)는 DNS only 모드 → Nginx에서 직접 SSL 처리
왜 백엔드는 Proxy를 끄나? Cloudflare Proxy를 켜면 Cloudflare가 중간에서 모든 요청을 처리하는데, 백엔드 API는 Nginx Proxy Manager에서 Let&#39;s Encrypt 인증서를 발급받아 직접 SSL을 처리하기 때문에 충돌이 발생한다.</p>
<h2 id="2️⃣-nginx-proxy-manager---트래픽-라우팅의-중심">2️⃣ Nginx Proxy Manager - 트래픽 라우팅의 중심</h2>
<p>역할:
리버스 프록시: 도메인별로 요청을 적절한 Docker 컨테이너로 전달
SSL 인증서 관리: Let&#39;s Encrypt를 통해 무료 SSL 인증서 자동 발급 및 갱신
포트 매핑: 외부 80/443 포트를 내부 컨테이너의 다양한 포트로 연결
우리 프로젝트에서:
on-do.site:443 
  → Nginx Proxy Manager 
  → <a href="http://polaris-frontend:80">http://polaris-frontend:80</a></p>
<p>api.on-do.site:443 
  → Nginx Proxy Manager 
  → <a href="http://polaris-backend-java:8080">http://polaris-backend-java:8080</a></p>
<p>ai-agent-api.skax.co.kr:443
  → Nginx Proxy Manager
  → <a href="http://polaris-backend-fastapi:8000">http://polaris-backend-fastapi:8000</a>
설정 예시:
Domain: on-do.site
Scheme: http
Forward Hostname: polaris-frontend
Forward Port: 80
SSL Certificate: Let&#39;s Encrypt (자동 발급)
Force SSL: ON
왜 Nginx Proxy Manager를 쓰나?
GUI로 간편하게 설정 가능 (nginx.conf 직접 수정 불필요)
SSL 인증서 자동 갱신
여러 서비스를 하나의 서버에서 도메인별로 분리</p>
<h2 id="3️⃣-docker---컨테이너-기반-배포">3️⃣ Docker - 컨테이너 기반 배포</h2>
<p>역할:
각 서비스를 독립적인 컨테이너로 실행
환경 변수 주입으로 설정 관리
이미지 기반 배포로 재현성 보장
우리 프로젝트에서:</p>
<h3 id="프론트엔드-컨테이너">프론트엔드 컨테이너</h3>
<pre><code>docker run -d \
  --name polaris-frontend \
  --network web \
  -p 80:80 \
  asia-northeast3-docker.pkg.dev/.../polaris-frontend:latest</code></pre><h3 id="백엔드-컨테이너-spring-boot">백엔드 컨테이너 (Spring Boot)</h3>
<pre><code>docker run -d \
  --name polaris-backend-java \
  --network web \
  -p 8080:8080 \
  -e SPRING_PROFILES_ACTIVE=prod \
  -e DB_HOST=10.117.192.3 \
  -e DB_PORT=5432 \
  -e JWT_SECRET=${{ secrets.JWT_SECRET }} \
  -e FASTAPI_API_KEY=${{ secrets.FASTAPI_API_KEY }} \
  asia-northeast3-docker.pkg.dev/.../polaris-backend-java:latest</code></pre><h3 id="ai-agent-서버-fastapi">AI Agent 서버 (FastAPI)</h3>
<pre><code>docker run -d \
  --name polaris-backend-fastapi \
  --network web \
  -p 8000:8000 \
  -e MODELOPS_API_URL=https://modelops.skax.co.kr \
  asia-northeast3-docker.pkg.dev/.../polaris-backend-fastapi:latest</code></pre><p>Docker Network: 모든 컨테이너를 web 네트워크에 연결하여 컨테이너 이름으로 서로 통신:
docker network create web
이렇게 하면:
Spring Boot에서 <a href="http://polaris-backend-fastapi:8000%EB%A1%9C">http://polaris-backend-fastapi:8000로</a> AI Agent 호출
AI Agent에서 외부 ModelOps 서버로 ML 모델 추론 요청</p>
<h2 id="4️⃣-서비스-간-통신-구조">4️⃣ 서비스 간 통신 구조</h2>
<h3 id="1-사용자-요청-플로우">1. 사용자 요청 플로우:</h3>
<p>[사용자] 
  → &quot;on-do.site에서 리스크 분석 요청&quot;</p>
<p>[프론트엔드 Vue.js]
  → API 호출: POST <a href="https://api.on-do.site/api/analysis/start">https://api.on-do.site/api/analysis/start</a></p>
<p>[백엔드 Spring Boot]
  → 데이터베이스에 분석 작업 저장
  → AI Agent 호출: POST <a href="http://polaris-backend-fastapi:8000/predict">http://polaris-backend-fastapi:8000/predict</a></p>
<p>[AI Agent FastAPI]
  → ModelOps 서버 호출: POST <a href="https://modelops.skax.co.kr/inference">https://modelops.skax.co.kr/inference</a>
  → ML 모델로 리스크 예측
  → 결과를 Spring Boot로 반환</p>
<p>[백엔드 Spring Boot]
  → 결과를 데이터베이스에 저장
  → 프론트엔드로 응답</p>
<p>[프론트엔드]
  → 사용자에게 리스크 점수 시각화</p>
<h3 id="2-내부-vs-외부-통신">2. 내부 vs 외부 통신:</h3>
<p>내부 (Docker 네트워크): 컨테이너 이름으로 통신 (빠름, 보안)
polaris-backend-java → polaris-backend-fastapi
외부 (인터넷): 도메인으로 통신
polaris-backend-fastapi → modelops.skax.co.kr</p>
<h2 id="5️⃣-왜-kubernetes를-쓰지-않았나">5️⃣ 왜 Kubernetes를 쓰지 않았나?</h2>
<h3 id="kubernetes의-장점">Kubernetes의 장점:</h3>
<p>자동 스케일링 (HPA: Horizontal Pod Autoscaler)
무중단 배포 (롤링 업데이트)
자가 치유 (Pod 장애 시 자동 재시작)
서비스 디스커버리, 로드 밸런싱
우리가 Kubernetes를 선택하지 않은 이유:</p>
<h3 id="1-프로젝트-규모가-작음">1. 프로젝트 규모가 작음</h3>
<p>서비스 4개 (프론트엔드, 백엔드, AI Agent, ModelOps)
단일 VM 인스턴스에서 충분히 실행 가능
트래픽이 많지 않아 오토스케일링 불필요</p>
<h3 id="2-학습-곡선과-복잡도">2. 학습 곡선과 복잡도</h3>
<p>Kubernetes는 다음을 모두 이해해야 함:</p>
<ul>
<li>Pod, Deployment, Service, Ingress</li>
<li>ConfigMap, Secret</li>
<li>kubectl 명령어</li>
<li>YAML 설정 파일 작성</li>
<li>Helm 차트 (패키지 관리)</li>
<li>클러스터 모니터링
반면 Docker + Nginx는:
docker run -d --name myapp -p 8080:8080 myimage
이것만으로 배포 완료!<h3 id="3-비용">3. 비용</h3>
항목    Docker + Nginx    Kubernetes (GKE)
컴퓨팅    e2-medium VM 1대<br>$25/월    마스터 노드 + 워커 노드 3대<br>$150~300/월
관리 비용    무료 (직접 관리)    클러스터 관리 비용 별도
네트워크    무료 (단일 VM)    로드 밸런서 비용 별도
우리의 선택: 3개월 프로젝트에 $300/월은 과하다!<h3 id="4-배포-파이프라인이-간단함">4. 배포 파이프라인이 간단함</h3>
우리의 CI/CD (Docker + Nginx):<h4 id="github-actions">GitHub Actions</h4>
</li>
</ul>
<ol>
<li>코드 푸시 (main 브랜치)</li>
<li>Docker 이미지 빌드</li>
<li>GCP Artifact Registry에 푸시</li>
<li>SSH로 서버 접속</li>
<li>docker pull &amp;&amp; docker stop &amp;&amp; docker run</li>
<li>Health Check (60초 대기)
만약 Kubernetes였다면:</li>
<li>코드 푸시</li>
<li>Docker 이미지 빌드</li>
<li>Registry에 푸시</li>
<li>kubectl apply -f deployment.yaml</li>
<li>Ingress 설정 업데이트</li>
<li>롤링 업데이트 모니터링</li>
<li>Pod 상태 확인</li>
<li>Service Mesh 설정 (Istio 등)
📊 실제 배포 플로우
CD 파이프라인 (GitHub Actions)
name: CD - Deploy to Server<pre><code>on:
workflow_run:
 workflows: [&#39;CI - Build &amp; Push&#39;]
 types: [completed]
 branches: [main]
</code></pre></li>
</ol>
<p>jobs:
  deploy:
    runs-on: ubuntu-22.04
    if: ${{ github.event.workflow_run.conclusion == &#39;success&#39; }}</p>
<pre><code>steps:
  - name: SSH로 서버 배포
    uses: appleboy/ssh-action@v1.2.0
    with:
      host: ${{ secrets.SERVER_HOST }}
      username: ${{ secrets.SERVER_USER }}
      key: ${{ secrets.SERVER_SSH_KEY }}
      script: |
        # GCP Artifact Registry 인증
        echo &#39;${{ secrets.GCP_SA_KEY }}&#39; | docker login -u _json_key --password-stdin https://asia-northeast3-docker.pkg.dev

        # 최신 이미지 pull
        docker pull asia-northeast3-docker.pkg.dev/.../polaris-backend-java:latest

        # 기존 컨테이너 중지 및 삭제
        docker stop polaris-backend-java || true
        docker rm polaris-backend-java || true

        # 새 컨테이너 실행
        docker run -d \
          --name polaris-backend-java \
          --network web \
          --restart unless-stopped \
          -p 8080:8080 \
          -e SPRING_PROFILES_ACTIVE=prod \
          -e JWT_SECRET=&quot;${{ secrets.JWT_SECRET }}&quot; \
          -e DB_HOST=&quot;${{ secrets.DB_HOST }}&quot; \
          -e DB_PORT=&quot;${{ secrets.DB_PORT }}&quot; \
          -e DB_NAME=&quot;${{ secrets.DB_NAME }}&quot; \
          -e DB_USERNAME=&quot;${{ secrets.DB_USERNAME }}&quot; \
          -e DB_PASSWORD=&quot;${{ secrets.DB_PASSWORD }}&quot; \
          -e SPRING_JPA_PROPERTIES_HIBERNATE_DIALECT=org.hibernate.dialect.PostgreSQLDialect \
          -e MAIL_HOST=&quot;${{ secrets.MAIL_HOST }}&quot; \
          -e MAIL_PORT=&quot;${{ secrets.MAIL_PORT }}&quot; \
          -e MAIL_USERNAME=&quot;${{ secrets.MAIL_USERNAME }}&quot; \
          -e MAIL_PASSWORD=&quot;${{ secrets.MAIL_PASSWORD }}&quot; \
          -e FASTAPI_API_KEY=&quot;${{ secrets.FASTAPI_API_KEY }}&quot; \
          asia-northeast3-docker.pkg.dev/.../polaris-backend-java:latest

        # Health check (60초 타임아웃)
        echo &quot;Health check 중...&quot;
        for i in {1..60}; do
          if docker exec polaris-backend-java curl -f -s http://localhost:8080/actuator/health &gt; /dev/null 2&gt;&amp;1; then
            echo &quot;✓ 배포 성공! (${i}초 경과)&quot;
            exit 0
          fi
          sleep 1
        done

        echo &quot;✗ Health check 실패 (60초 타임아웃)&quot;
        docker logs polaris-backend-java --tail 50
        exit 1</code></pre><p>```
배포 시 주의사항:
환경 변수 누락 방지: GitHub Secrets에 모든 필수 환경 변수 등록
Health Check 필수: 컨테이너가 정상 시작되었는지 확인
기존 컨테이너 정리: docker stop &amp;&amp; docker rm 후 새 컨테이너 실행</p>
<h1 id="우리-아키텍처의-장단점">우리 아키텍처의 장단점</h1>
<h2 id="장점">장점</h2>
<h3 id="1-단순성">1. 단순성</h3>
<p>배포
docker run -d --name myapp myimage</p>
<p>로그 확인
docker logs myapp</p>
<p>재시작
docker restart myapp
명령어 몇 줄로 모든 게 해결됨. Kubernetes의 복잡한 YAML 파일 불필요.</p>
<h3 id="2-비용-효율">2. 비용 효율</h3>
<p>단일 VM: e2-medium (2 vCPU, 4GB RAM) → $25/월
Kubernetes: GKE 클러스터 최소 구성 → $150/월 이상
3개월 프로젝트 기준:
Docker + Nginx: $75
Kubernetes: $450
$375 절약!</p>
<h3 id="3-빠른-디버깅">3. 빠른 디버깅</h3>
<p>컨테이너 내부 접속
docker exec -it polaris-backend-java bash</p>
<p>실시간 로그
docker logs -f polaris-backend-java</p>
<p>리소스 사용량
docker stats
문제 발생 시 바로 원인 파악 가능.</p>
<h3 id="4-충분한-성능">4. 충분한 성능</h3>
<p>동시 접속자 수십~수백 명 처리 가능
Spring Boot의 Tomcat: 기본 200개 쓰레드
AI Agent의 Uvicorn: 비동기 처리로 높은 처리량</p>
<h2 id="단점-kubernetes-대비">단점 (Kubernetes 대비)</h2>
<h3 id="1-수동-스케일링">1. 수동 스케일링</h3>
<p>트래픽 급증 시 수동으로 인스턴스를 추가해야 함.
Kubernetes는 자동
kubectl scale deployment myapp --replicas=10</p>
<p>우리는 수동
docker run -d --name myapp-2 myimage
docker run -d --name myapp-3 myimage</p>
<h3 id="2-무중단-배포-불가">2. 무중단 배포 불가</h3>
<p>docker stop myapp  # ← 이 순간 서비스 중단!
docker run -d --name myapp myimage
Kubernetes의 롤링 업데이트는 중단 없이 배포 가능. 해결 방법: Blue-Green 배포
Green (새 버전) 실행
docker run -d --name myapp-green myimage</p>
<p>Nginx 설정 변경: myapp → myapp-green</p>
<p>Blue (구 버전) 중지
docker stop myapp</p>
<h3 id="3-자가-치유-없음">3. 자가 치유 없음</h3>
<p>컨테이너가 죽으면 수동으로 재시작해야 함.
docker run -d --restart unless-stopped myapp  # ← 이것으로 부분 해결
Kubernetes는 자동으로 Pod를 재시작함.</p>
<h1 id="실제로-겪은-배포-문제들">실제로 겪은 배포 문제들</h1>
<h2 id="1-문제-mailconfig-때문에-applicationcontext-로드-실패">1. 문제: MailConfig 때문에 ApplicationContext 로드 실패</h2>
<p>증상:
Failed to load ApplicationContext
Caused by: Could not resolve placeholder &#39;spring.mail.host&#39;
원인: MailConfig가 @Configuration으로 무조건 로드되는데, 메일 환경 변수가 없으면 실패. 해결:
@Configuration
@ConditionalOnProperty(name = &quot;spring.mail.host&quot;)  // ← 추가
public class MailConfig {
    // ...
}</p>
<h2 id="2-문제-aws-s3-설정-때문에-시작-실패">2. 문제: AWS S3 설정 때문에 시작 실패</h2>
<p>증상:
Could not resolve placeholder &#39;AWS_ACCESS_KEY&#39;
원인: GCP로 전환했는데 AWS S3 설정이 남아있었음. 해결:
S3Config.java 삭제
rm src/main/java/com/skax/physicalrisk/config/S3Config.java</p>
<p>application.yml에서 AWS 설정 제거</p>
<h2 id="3-문제-postgresql-테이블이-없어서-실패">3. 문제: PostgreSQL 테이블이 없어서 실패</h2>
<p>증상:
Schema-validation: missing table [analysis_jobs]
원인: ddl-auto: validate로 설정되어 테이블이 없으면 실패. 해결:
application-prod.yml
jpa:
  hibernate:
    ddl-auto: update  # validate → update 변경</p>
<h2 id="4-문제-docker-컨테이너끼리-통신-안-됨">4. 문제: Docker 컨테이너끼리 통신 안 됨</h2>
<p>증상:
curl: (6) Could not resolve host: polaris-backend-fastapi
원인: 컨테이너들이 다른 네트워크에 있었음. 해결:
공통 네트워크 생성
docker network create web</p>
<p>모든 컨테이너를 web 네트워크에 연결
docker run -d --network web --name polaris-backend-java ...
docker run -d --network web --name polaris-backend-fastapi ...</p>
<h1 id="언제-kubernetes로-전환해야-할까">언제 Kubernetes로 전환해야 할까?</h1>
<p>다음 상황이 오면 Kubernetes를 고려해야 함:</p>
<h2 id="1-트래픽-폭증">1. 트래픽 폭증</h2>
<p>동시 접속자: 수백 명 → 수천~수만 명
→ 오토스케일링 필수</p>
<h2 id="2-마이크로서비스-확장">2. 마이크로서비스 확장</h2>
<p>서비스 개수: 4개 → 10개 이상
→ 서비스 디스커버리, 로드 밸런싱 필요</p>
<h2 id="3-고가용성-요구">3. 고가용성 요구</h2>
<p>SLA: 99% → 99.9% 이상
→ 멀티 리전 배포, 자동 장애 복구 필요</p>
<h2 id="4-멀티-클라우드">4. 멀티 클라우드</h2>
<p>GCP + AWS + Azure 동시 사용
→ Kubernetes는 클라우드 벤더 중립적</p>
<h1 id="정리">정리</h1>
<p>항목 | Docker + Nginx    | Kubernetes
학습 곡선    낮음 ⭐    높음 ⭐⭐⭐⭐⭐
초기 비용    낮음 ($25/월)    높음 ($150/월<del>)
배포 복잡도    낮음    높음
확장성    제한적    무제한
무중단 배포    수동 (Blue-Green)    자동 (Rolling)
자가 치유    제한적 (restart 옵션)    자동 (Pod 재시작)
모니터링    수동 (docker stats)    자동 (Prometheus)
적합한 규모    소규모</del>중규모    중규모~대규모</p>
<p>우리의 선택: Docker + Nginx
✅ 3개월 프로젝트 기간
✅ 4개 서비스 (프론트엔드, 백엔드, AI Agent, ModelOps)
✅ 중소규모 트래픽 (동시 접속자 수십~수백 명)
✅ 제한된 예산 ($75 vs $450)
✅ 빠른 배포와 디버깅</p>
<h1 id="배운-점">배운 점</h1>
<h2 id="1-적정-기술-선택의-중요성">1. 적정 기술 선택의 중요성</h2>
<p>&quot;최신 기술 = 좋은 기술&quot; 이 아니다. 프로젝트 규모와 요구사항에 맞는 기술을 선택하는 게 중요하다.</p>
<h2 id="2-인프라는-단순할수록-좋다">2. 인프라는 단순할수록 좋다</h2>
<p>복잡한 인프라는 디버깅도 어렵고, 팀원 온보딩도 어렵다. Docker + Nginx는 누구나 이해할 수 있다.</p>
<h2 id="3-비용-최적화">3. 비용 최적화</h2>
<p>스타트업이나 소규모 프로젝트에서는 비용이 중요하다. Kubernetes로 $375를 절약한 건 큰 성과다.</p>
<h2 id="4-확장-가능한-설계">4. 확장 가능한 설계</h2>
<p>지금은 Docker + Nginx지만, 나중에 Kubernetes로 전환할 수 있도록 설계했다:
12 Factor App 원칙 준수
환경 변수로 설정 관리
컨테이너 기반 배포
상태를 저장하지 않는 Stateless 서비스</p>
<hr>
<blockquote>
<p>&quot;기술은 목적이 아니라 수단이다. 과하지도 부족하지도 않은, 딱 맞는 기술을 선택하자.&quot;</p>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[ESG 물리 기후 리스크 예측 프로젝트 - 백엔드_서버]]></title>
            <link>https://velog.io/@nangman_ful/ESG-%EB%AC%BC%EB%A6%AC-%EA%B8%B0%ED%9B%84-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%B0%B1%EC%97%94%EB%93%9C%EC%84%9C%EB%B2%84</link>
            <guid>https://velog.io/@nangman_ful/ESG-%EB%AC%BC%EB%A6%AC-%EA%B8%B0%ED%9B%84-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8-%EB%B0%B1%EC%97%94%EB%93%9C%EC%84%9C%EB%B2%84</guid>
            <pubDate>Tue, 02 Dec 2025 06:14:14 GMT</pubDate>
            <description><![CDATA[<h1 id="api-layer-architecture--route--service--schema--core-정리">API Layer Architecture — Route / Service / Schema / Core 정리</h1>
<p>백엔드 구조를 정리할 때 가장 먼저 등장하는 레이어 개념이 있다. </p>
<blockquote>
<p>Route(Controller) / Service / Schema(DTO) / Core(Domain) </p>
</blockquote>
<p>각 레이어의 역할만 정확히 알고 있어도 코드 구조가 훨씬 깔끔해진다.</p>
<hr>
<h2 id="1-route-layer-controller-router">1. Route Layer (Controller, Router)</h2>
<blockquote>
<p>HTTP 요청을 받고, 어떤 서비스 함수로 보낼지 “매핑”하는 레이어.</p>
</blockquote>
<h3 id="하는-일">하는 일</h3>
<ul>
<li><p>URL + HTTP 메서드 정의 (GET /users/{id})</p>
</li>
<li><p>요청 데이터를 Schema(DTO)로 변환</p>
</li>
<li><p>예외 처리 → HTTP 응답 포맷 작성</p>
</li>
<li><p>Service 호출 → 결과 받아서 반환</p>
</li>
</ul>
<h3 id="하지-말아야-하는-일">하지 말아야 하는 일</h3>
<ul>
<li><p>비즈니스 로직</p>
</li>
<li><p>DB 접근</p>
</li>
<li><p>트랜잭션 처리</p>
</li>
</ul>
<p><strong>Route는 얇게, Service는 두껍게가 유지보수에 유리하다.</strong></p>
<h2 id="2-service-layer-application-service">2. Service Layer (Application Service)</h2>
<blockquote>
<p>“요청 단위의 유스케이스”를 실행하는 레이어.</p>
</blockquote>
<h3 id="하는-일-1">하는 일</h3>
<ul>
<li><p>여러 도메인 객체(Core)를 조합해 기능 수행</p>
</li>
<li><p>Repository(DB) 접근을 순서대로 오케스트레이션</p>
</li>
<li><p>트랜잭션 처리</p>
</li>
<li><p>권한/검증 등의 애플리케이션 규칙 처리</p>
</li>
</ul>
<h4 id="예시">예시</h4>
<pre><code>회원가입 → 중복검사 → 비밀번호 해싱 → DB 저장

주문 생성 → 재고 차감 → 결제 요청 → 기록 저장</code></pre><p><strong>Service 레이어가 전체 플로우를 총괄 지휘한다.</strong></p>
<h2 id="3-schema-layer-dto-pydantic-model">3. Schema Layer (DTO, Pydantic Model)</h2>
<blockquote>
<p>API 입력/출력 구조 + 유효성 검증을 담당.</p>
</blockquote>
<h3 id="하는-일-2">하는 일</h3>
<ul>
<li><p>요청/응답의 필드 정의</p>
</li>
<li><p>타입 검증, 형식 검증</p>
</li>
<li><p>도메인 모델과 분리된 “I/O 전용” 데이터 모델</p>
</li>
</ul>
<p><strong>Schema는 “외부 세계와 내부 도메인 사이의 방화벽 역할”을 한다.
API 스펙이 바뀌어도 Core (도메인)을 바로 건드리지 않아도 된다.</strong></p>
<h2 id="4-core-layer-domain">4. Core Layer (Domain)</h2>
<blockquote>
<p>비즈니스 핵심 규칙을 포함하는 레이어.</p>
</blockquote>
<h3 id="해야하는-일">해야하는 일</h3>
<ul>
<li><p>Entity, Value Object (User, Product, Money 등)</p>
</li>
<li><p>순수 비즈니스 로직 (할인 계산, 위험 점수 계산 등)</p>
</li>
<li><p>Repository 인터페이스 (구현체는 infra에서)</p>
</li>
</ul>
<p><strong>Core는 웹 프레임워크나 DB 종류에 의존하면 안된다.</strong>
이렇게 해야 FastAPI → Spring Boot로 갈아타도 Core를 그대로 재사용 가능하다.</p>
<hr>
<h2 id="전체-구조-요약">전체 구조 요약</h2>
<pre><code>레이어    역할
Route    HTTP 라우팅, 요청/응답 구성
Service    유스케이스 실행, 비즈니스 흐름 조립
Schema    요청·응답 포맷 관리, 유효성 검증
Core    순수 도메인 규칙, 핵심 모델</code></pre><hr>
<h1 id="spring-boot-vs-fastapi--기술적인-차이">Spring Boot vs FastAPI — 기술적인 차이</h1>
<h2 id="1-언어생태계">1. 언어/생태계</h2>
<p>Spring Boot</p>
<pre><code>Java/Kotlin 기반

대규모 엔터프라이즈 백엔드 표준

보안/트랜잭션/배치/메시징 등 인프라 스택 완비</code></pre><p>FastAPI</p>
<pre><code>Python 기반

AI/데이터 생태계와 결합이 매우 쉬움

가볍고 개발 속도가 빠름</code></pre><h2 id="2-프레임워크-성격">2. 프레임워크 성격</h2>
<blockquote>
<p>Spring Boot = Full-stack 백엔드 프레임워크
기업 서비스 전체 운영을 위한 종합 프레임워크</p>
</blockquote>
<blockquote>
<p>FastAPI = Lightweight 웹 프레임워크
빠른 개발, 확장성 확보, ML·AI 모델 API화에 적합</p>
</blockquote>
<h2 id="3-io비동기-처리-방식">3. IO/비동기 처리 방식</h2>
<p>Spring Boot</p>
<pre><code>기본은 동기식

WebFlux(Reactor)가 필요 시 고성능 비동기 처리 가능

장기간 운영 시 JVM 최적화가 강력</code></pre><p>FastAPI</p>
<pre><code>애초에 async/await 기반

IO-bound API에 유리 (ML inference, 외부 API 호출 등)</code></pre><hr>
<h2 id="esg-물리-기후-예측-프로젝트는-왜-spring-boot--fastapi를-둘-다-썼는가">ESG 물리 기후 예측 프로젝트는 왜 Spring Boot + FastAPI를 둘 다 썼는가?</h2>
<h3 id="1-전체-구조가-ai-모델-운영modelops--백엔드-서비스로-분리된-구조였기-때문">1. 전체 구조가 “AI 모델 운영(ModelOps) + 백엔드 서비스”로 분리된 구조였기 때문</h3>
<p>해당 프로젝트는 단순 CRUD API가 아니라 다음과 같이 2계층 구조였다:</p>
<h4 id="백엔드-서비스-계층-spring-boot">백엔드 서비스 계층 (Spring Boot)</h4>
<pre><code>회원관리

인증(JWT)

채팅 로그 저장

안정적인 API 스펙 제공

트래픽 대응

DB 트랜잭션 및 보안 담당</code></pre><h4 id="ai모델-연산-계층-fastapi-python">AI/모델 연산 계층 (FastAPI, Python)</h4>
<pre><code>ChatGPT API 연동

Pre/Post-processing

간단한 LLM 라우팅

분석/ML 로직과 Python 생태계 필요</code></pre><p><em>Python의 강점은 다음과 같음:</em></p>
<blockquote>
<p>NLP, AI 라이브러리(PyTorch, Transformers 등)
빠른 개발 속도
ML 추론/전처리에 최적화</p>
</blockquote>
<p><em>반면 Spring Boot는 다음이 강함:</em></p>
<blockquote>
<p>서비스 운영 안정성
인증/보안
구조화된 레이어드 아키텍처
대규모 트래픽 처리</p>
</blockquote>
<p><strong>그래서 각 기술의 장점을 그대로 사용한 것임.</strong></p>
<h3 id="2-프론트androidreact-native에서-호출하는-메인-api는-spring이-더-적합했기-때문">2. 프론트(Android/React Native)에서 호출하는 메인 API는 Spring이 더 적합했기 때문</h3>
<p>_Spring은 다음이 강력하다:
_</p>
<blockquote>
<p>엄격한 타입 검증
명확한 패키지 구조
스케일링 및 운영 안정성
기업용 API 스펙에 익숙한 구조</p>
</blockquote>
<p>반면 FastAPI는:</p>
<blockquote>
<p>AI inference API
실험용/내부용 API
비동기 요청 처리
이런 용도로 더 잘 맞는다.</p>
</blockquote>
<p><strong>→ 즉, 서비스 API와 모델 API를 분리해서 의존성을 줄이는 구조로 운영한 것.</strong></p>
<h3 id="3-단일-프레임워크로-모든-것을-처리하려고-하면-오히려-손해였기-때문">3. 단일 프레임워크로 모든 것을 처리하려고 하면 오히려 손해였기 때문</h3>
<p>FastAPI로 회원가입/로그인/JWT/DB 관리까지 하려면 
결국:</p>
<pre><code>ORM 설정
인증 체계
계층 구조
배포 파이프라인</code></pre><p>등을 전부 별도로 만들어야 한다.</p>
<p>Spring Boot는 이미 이 기능셋이 정교하게 갖춰져 있으므로
백엔드 메인 API는 Spring을 쓰는 것이 더 효율적이었다.</p>
<p>반대로,
Spring Boot로 AI inference API를 만들면, Python 기반 라이브러리를 쓰기 어려워지고
LLM 처리 속도, 유연성, 개발 효율이 크게 떨어진다.</p>
<hr>
<h1 id="마무리">마무리</h1>
<p>Route–Service–Schema–Core 구조는 “역할을 명확하게 분리해 유지보수를 쉽게 하는 방법”이다.
이 구조를 적용하면 코드가 복잡해질수록 더 깔끔해지고, 팀 개발에서도 충돌이 줄어든다.</p>
<p>또한, Spring Boot와 FastAPI를 동시에 사용한 이유는 단순히 기술 욕심이 아니라,
각 프레임워크가 잘하는 영역이 명확히 달랐기 때문이다.</p>
<pre><code>Spring Boot → 인증, 트랜잭션, 안정적인 서비스 운영
FastAPI → AI 모델 연산, Python 기반 전처리/후처리, 비동기 IO</code></pre><p>즉, 해당 프로젝트는 “하나로 다 해결하는 프레임워크”를 고른 것이 아니라, 역할에 따라 옳은 도구를 선택한 것이다.</p>
<p>두 기술을 조합하면 서비스 안정성과 AI 처리 성능을 동시에 확보할 수 있으며,
앞으로 기능을 확장할 때도 각 계층을 독립적으로 개선할 수 있다.</p>
<blockquote>
<p><em>결국 중요한 것은 기술 스택이 아니라 <strong>구조화된 설계</strong>와 <strong>역할 분리</strong>다.</em></p>
</blockquote>
]]></description>
        </item>
        <item>
            <title><![CDATA[ FastAPI에서 동기 함수 비동기로 실행하기: 이벤트 루프 블로킹 방지]]></title>
            <link>https://velog.io/@nangman_ful/FastAPI%EC%97%90%EC%84%9C-%EB%8F%99%EA%B8%B0-%ED%95%A8%EC%88%98-%EB%B9%84%EB%8F%99%EA%B8%B0%EB%A1%9C-%EC%8B%A4%ED%96%89%ED%95%98%EA%B8%B0-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EB%A3%A8%ED%94%84-%EB%B8%94%EB%A1%9C%ED%82%B9-%EB%B0%A9%EC%A7%80</link>
            <guid>https://velog.io/@nangman_ful/FastAPI%EC%97%90%EC%84%9C-%EB%8F%99%EA%B8%B0-%ED%95%A8%EC%88%98-%EB%B9%84%EB%8F%99%EA%B8%B0%EB%A1%9C-%EC%8B%A4%ED%96%89%ED%95%98%EA%B8%B0-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EB%A3%A8%ED%94%84-%EB%B8%94%EB%A1%9C%ED%82%B9-%EB%B0%A9%EC%A7%80</guid>
            <pubDate>Tue, 02 Dec 2025 05:22:27 GMT</pubDate>
            <description><![CDATA[<h2 id="🔍-문제-상황">🔍 문제 상황</h2>
<p>FastAPI로 AI Agent를 호출하는 API를 개발하던 중, 성능 문제를 발견했습니다.</p>
<pre><code class="language-python"># ❌ 문제가 있는 코드
async def create_report(self, request: CreateReportRequest) -&gt; dict:
    analyzer = self._get_analyzer()

    # 동기 함수를 그냥 호출 → 이벤트 루프 블로킹!
    result = analyzer.analyze(
        target_location,
        building_info,
        asset_info,
        analysis_params
    )

    return result</code></pre>
<h3 id="문제점">문제점</h3>
<ol>
<li><strong>이벤트 루프 블로킹</strong>: <code>analyzer.analyze()</code>는 동기 함수인데, <code>async</code> 함수 내에서 직접 호출</li>
<li><strong>동시성 상실</strong>: 다른 요청들이 현재 분석이 끝날 때까지 대기해야 함</li>
<li><strong>응답 시간 증가</strong>: 분석에 30초 걸리면 다른 요청도 30초 이상 대기</li>
</ol>
<h3 id="왜-문제가-될까">왜 문제가 될까?</h3>
<p>FastAPI는 <strong>비동기 이벤트 루프</strong>를 사용합니다:</p>
<pre><code>[이벤트 루프]
  ├─ 요청 A 처리 (async)
  ├─ 요청 B 처리 (async)
  └─ 요청 C 처리 (async)</code></pre><p>하지만 동기 함수를 직접 호출하면:</p>
<pre><code>[이벤트 루프]
  ├─ 요청 A 처리 중... (30초 동기 작업 실행 중)
  │   ↓ 다른 요청들 모두 대기...
  │   ↓
  │   ↓ (30초 경과)
  ├─ 요청 B 처리 시작 (뒤늦게)
  └─ 요청 C 처리 시작 (더 늦게)</code></pre><h2 id="💡-해결-방법-threadpoolexecutor">💡 해결 방법: ThreadPoolExecutor</h2>
<p>동기 함수를 <strong>별도 스레드</strong>에서 실행하여 이벤트 루프를 블로킹하지 않도록 개선합니다.</p>
<h3 id="1-필요한-모듈-임포트">1. 필요한 모듈 임포트</h3>
<pre><code class="language-python">import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial</code></pre>
<h3 id="2-service-클래스에-threadpool-추가">2. Service 클래스에 ThreadPool 추가</h3>
<pre><code class="language-python">class ReportService:
    def __init__(self):
        self._analyzer = None
        self._report_results = {}
        # 최대 4개의 worker 스레드로 ThreadPool 생성
        self._executor = ThreadPoolExecutor(max_workers=4)</code></pre>
<p><strong><code>max_workers=4</code>의 의미:</strong></p>
<ul>
<li>최대 4개의 분석 작업을 <strong>동시에</strong> 처리 가능</li>
<li>CPU 코어 수와 메모리를 고려하여 조정 (권장: CPU 코어 수 × 2)</li>
</ul>
<h3 id="3-비동기-실행-패턴-적용">3. 비동기 실행 패턴 적용</h3>
<pre><code class="language-python">async def create_report(self, request: CreateReportRequest) -&gt; dict:
    analyzer = self._get_analyzer()

    # Language 파라미터 준비
    language = request.language.value if request.language else &#39;ko&#39;

    # ✅ 개선된 코드: 비동기 실행
    loop = asyncio.get_event_loop()

    # partial로 함수와 인자를 미리 바인딩
    analyze_func = partial(
        analyzer.analyze,
        target_location,
        building_info,
        asset_info,
        analysis_params,
        language=language
    )

    # ThreadPool에서 실행 (이벤트 루프는 블로킹되지 않음!)
    result = await loop.run_in_executor(self._executor, analyze_func)

    return result</code></pre>
<h2 id="🔧-핵심-개념-설명">🔧 핵심 개념 설명</h2>
<h3 id="1-asyncioget_event_loop">1. <code>asyncio.get_event_loop()</code></h3>
<p>현재 실행 중인 <strong>이벤트 루프 인스턴스</strong>를 가져옵니다.</p>
<pre><code class="language-python">loop = asyncio.get_event_loop()</code></pre>
<h3 id="2-functoolspartial">2. <code>functools.partial()</code></h3>
<p><strong>함수와 인자를 미리 결합</strong>하여 새로운 함수를 만듭니다.</p>
<pre><code class="language-python"># 원본 함수
def analyze(location, building, asset, params, language):
    ...

# partial로 인자를 미리 바인딩
analyze_func = partial(
    analyze,
    location_data,
    building_data,
    asset_data,
    params_data,
    language=&#39;ko&#39;
)

# 나중에 인자 없이 호출 가능
result = analyze_func()  # 위에서 바인딩한 인자들이 자동으로 전달됨</code></pre>
<p><strong>왜 필요한가?</strong></p>
<p><code>run_in_executor()</code>는 인자가 없는 callable을 받기 때문에, <code>partial</code>로 인자를 미리 묶어둬야 합니다.</p>
<pre><code class="language-python"># ❌ 이렇게는 안 됨
result = await loop.run_in_executor(
    executor,
    analyzer.analyze(location, building, ...)  # 즉시 실행되어 버림!
)

# ✅ partial로 감싸면 됨
analyze_func = partial(analyzer.analyze, location, building, ...)
result = await loop.run_in_executor(executor, analyze_func)</code></pre>
<h3 id="3-looprun_in_executorexecutor-func">3. <code>loop.run_in_executor(executor, func)</code></h3>
<p><strong>ThreadPool의 별도 스레드에서</strong> 함수를 실행하고, 완료될 때까지 <code>await</code>로 대기합니다.</p>
<pre><code class="language-python">result = await loop.run_in_executor(self._executor, analyze_func)</code></pre>
<p><strong>동작 원리:</strong></p>
<pre><code>[Main Thread - Event Loop]
  ├─ 요청 A 시작
  ├─ run_in_executor() 호출 → Worker Thread 1에 작업 전달
  ├─ 요청 B 시작 (블로킹 안 됨!)
  ├─ run_in_executor() 호출 → Worker Thread 2에 작업 전달
  └─ 요청 C 시작 (블로킹 안 됨!)

[Worker Thread 1]
  └─ analyzer.analyze() 실행 중... (30초)

[Worker Thread 2]
  └─ analyzer.analyze() 실행 중... (30초)</code></pre><h2 id="📊-성능-비교">📊 성능 비교</h2>
<h3 id="before-동기-호출">Before (동기 호출)</h3>
<pre><code>요청 1: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 2:                               ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 3:                                                               ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)

총 소요 시간: 90초</code></pre><h3 id="after-비동기-실행">After (비동기 실행)</h3>
<pre><code>요청 1: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초)
요청 2: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초) ← 동시 실행!
요청 3: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ (30초) ← 동시 실행!

총 소요 시간: 30초</code></pre><p><strong>3배 빠른 처리!</strong> (max_workers=4 기준, 4개까지 동시 처리 가능)</p>
<h2 id="🎯-실전-적용-팁">🎯 실전 적용 팁</h2>
<h3 id="1-worker-수-설정">1. Worker 수 설정</h3>
<pre><code class="language-python">import os

# CPU 코어 수에 따라 동적 설정
max_workers = min(32, (os.cpu_count() or 1) * 2)
self._executor = ThreadPoolExecutor(max_workers=max_workers)</code></pre>
<h3 id="2-executor-정리-graceful-shutdown">2. Executor 정리 (Graceful Shutdown)</h3>
<pre><code class="language-python">class ReportService:
    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=4)

    def __del__(self):
        &quot;&quot;&quot;서비스 종료 시 ThreadPool 정리&quot;&quot;&quot;
        self._executor.shutdown(wait=True)</code></pre>
<h3 id="3-타임아웃-설정">3. 타임아웃 설정</h3>
<pre><code class="language-python">import asyncio

try:
    # 60초 타임아웃
    result = await asyncio.wait_for(
        loop.run_in_executor(self._executor, analyze_func),
        timeout=60.0
    )
except asyncio.TimeoutError:
    return {&quot;error&quot;: &quot;Analysis timeout&quot;}</code></pre>
<h2 id="🚨-주의사항">🚨 주의사항</h2>
<h3 id="1-gil-global-interpreter-lock">1. GIL (Global Interpreter Lock)</h3>
<p>Python의 GIL로 인해 <strong>CPU-bound 작업</strong>은 ThreadPool로 성능 개선이 제한적입니다.</p>
<ul>
<li><strong>I/O-bound 작업</strong>: ThreadPool 효과 큼 (네트워크 요청, 파일 읽기 등)</li>
<li><strong>CPU-bound 작업</strong>: ProcessPoolExecutor 사용 권장 (CPU 집약적 계산)</li>
</ul>
<p>우리의 경우는 LLM API 호출이 포함되어 <strong>I/O-bound</strong>이므로 ThreadPool이 효과적입니다.</p>
<h3 id="2-상태-공유-주의">2. 상태 공유 주의</h3>
<p>여러 스레드에서 동시에 접근하는 변수는 <strong>Thread-safe</strong>해야 합니다.</p>
<pre><code class="language-python"># ❌ 위험: 여러 스레드에서 동시 수정
self._report_results[report_id] = result

# ✅ 안전: Lock 사용
import threading

class ReportService:
    def __init__(self):
        self._lock = threading.Lock()
        self._report_results = {}

    async def create_report(self, request):
        result = await loop.run_in_executor(...)

        with self._lock:
            self._report_results[report_id] = result</code></pre>
<h2 id="📝-전체-코드">📝 전체 코드</h2>
<pre><code class="language-python">from concurrent.futures import ThreadPoolExecutor
from functools import partial
import asyncio

class ReportService:
    def __init__(self):
        self._analyzer = None
        self._report_results = {}
        self._executor = ThreadPoolExecutor(max_workers=4)

    def _get_analyzer(self):
        if self._analyzer is None:
            from ai_agent import SKAXPhysicalRiskAnalyzer
            from ai_agent.config.settings import load_config

            config = load_config()
            self._analyzer = SKAXPhysicalRiskAnalyzer(config)
        return self._analyzer

    async def create_report(self, request: CreateReportRequest) -&gt; dict:
        analyzer = self._get_analyzer()

        # 데이터 준비
        target_location = {...}
        building_info = {...}
        asset_info = {...}
        analysis_params = {...}
        language = request.language.value if request.language else &#39;ko&#39;

        # 비동기 실행
        loop = asyncio.get_event_loop()
        analyze_func = partial(
            analyzer.analyze,
            target_location,
            building_info,
            asset_info,
            analysis_params,
            language=language
        )
        result = await loop.run_in_executor(self._executor, analyze_func)

        return result

    def __del__(self):
        self._executor.shutdown(wait=True)</code></pre>
<h2 id="🎓-핵심-요약">🎓 핵심 요약</h2>
<table>
<thead>
<tr>
<th>Before</th>
<th>After</th>
</tr>
</thead>
<tbody><tr>
<td>동기 함수를 async 함수에서 직접 호출</td>
<td><code>run_in_executor()</code>로 별도 스레드 실행</td>
</tr>
<tr>
<td>이벤트 루프 블로킹 발생</td>
<td>이벤트 루프 블로킹 없음</td>
</tr>
<tr>
<td>요청들이 순차 처리</td>
<td>요청들이 동시 처리 (max_workers까지)</td>
</tr>
<tr>
<td>3개 요청 = 90초</td>
<td>3개 요청 = 30초 (3배 빠름)</td>
</tr>
</tbody></table>
<p><strong>핵심 패턴:</strong></p>
<pre><code class="language-python">loop = asyncio.get_event_loop()
func = partial(sync_function, arg1, arg2, ...)
result = await loop.run_in_executor(executor, func)</code></pre>
<p>이 패턴을 사용하면 FastAPI에서 동기 라이브러리를 사용하면서도 <strong>비동기의 장점</strong>을 그대로 누릴 수 있습니다! 🚀</p>
<hr>
<h2 id="📚-참고-자료">📚 참고 자료</h2>
<ul>
<li><a href="https://docs.python.org/3/library/asyncio.html">asyncio — Asynchronous I/O</a></li>
<li><a href="https://docs.python.org/3/library/concurrent.futures.html">concurrent.futures — Launching parallel tasks</a></li>
<li><a href="https://fastapi.tiangolo.com/async/">FastAPI - Async / Await</a></li>
</ul>
]]></description>
        </item>
        <item>
            <title><![CDATA[ESG 물리 기후 리스크 예측 프로젝트 - 백엔드_비동기]]></title>
            <link>https://velog.io/@nangman_ful/SKALA-%ED%8C%8C%EC%9D%B4%EB%84%90</link>
            <guid>https://velog.io/@nangman_ful/SKALA-%ED%8C%8C%EC%9D%B4%EB%84%90</guid>
            <pubDate>Mon, 01 Dec 2025 08:27:30 GMT</pubDate>
            <description><![CDATA[<h2 id="비동기async-작업-시스템의-문제들과-해결-전략-정리">비동기(Async) 작업 시스템의 문제들과 해결 전략 정리</h2>
<p>대규모 계산 작업(E, V, AAL 계산 등)과 일반 API 요청이 섞여 있는 시스템에서는 비동기 처리를 도입하는 순간 동기 방식에서는 절대 겪지 않는 새로운 문제들이 튀어나온다.</p>
<p>아래에서는 실제로 ModelOps·AI Agent 구조에서 공통으로 부딪히는 문제들을 나누어 설명하고, 각 문제를 어떻게 설계 레벨에서 해결해야 하는지 정리한다.</p>
<hr>
<h3 id="1-고전적-문제-api-요청과-heavy-계산이-섞이면서-생기는-병목">1. 고전적 문제: API 요청과 Heavy 계산이 섞이면서 생기는 병목</h3>
<p>비동기라 하더라도, 계산과 API 요청이 같은 프로세스 / 같은 워커에서 돌아가면
계산이 CPU를 오래 점유하며 API 응답이 밀리는 문제가 발생한다.</p>
<blockquote>
<p>E,V,AAL 계산이 8초 걸리는 동안
연속으로 들어오는 /health or /status API조차 응답이 늦어짐</p>
</blockquote>
<h4 id="해결-전략">해결 전략</h4>
<h5 id="1-계산-서버calc-service와-api-서버를-분리한다">1) 계산 서버(Calc Service)와 API 서버를 분리한다</h5>
<p>  API 서버는 &quot;job 생성&quot;과 &quot;결과 조회&quot;만 담당
  계산은 별도 FastAPI(혹은 워커)에서 수행</p>
<h5 id="2-compute-서버는-반드시-별도-쓰레드풀워커-구조로-운영">2) Compute 서버는 반드시 별도 쓰레드풀/워커 구조로 운영</h5>
<p>FastAPI BackgroundTask
Celery worker
RQ worker
multiprocessing 기반 worker</p>
<p>→ API 이벤트 루프와 계산 이벤트 루프를 절대로 공유하지 않는다.</p>
<hr>
<h3 id="2-중복-트리거-문제-double-trigger">2. 중복 트리거 문제 (Double Trigger)</h3>
<blockquote>
<p>“같은 사이트/같은 시나리오/같은 연도”에 대해
중복으로 계산 요청(job)이 들어오는 문제.</p>
</blockquote>
<h4 id="해결-전략-1">해결 전략</h4>
<h5 id="1-site_id-scenario-horizon-조합에-unique-제약-추가">1) (site_id, scenario, horizon) 조합에 Unique 제약 추가</h5>
<p>UNIQUE(site_id, scenario, horizon)</p>
<p>이러면 중복 요청이 들어오면:</p>
<p>기존 job 리턴
or
&quot;이미 계산 중&quot; status만 줘도 됨</p>
<h5 id="2-idempotency-token-패턴-도입">2) &quot;Idempotency Token&quot; 패턴 도입</h5>
<p>API 호출 시 idempotency_key를 받고
같은 키면 같은 job 반환.</p>
<hr>
<h3 id="3-비동기-처리에서-가장-흔한-문제-이-계산이-끝났다는-것을-어떻게-아는가">3. 비동기 처리에서 가장 흔한 문제: “이 계산이 끝났다는 것을 어떻게 아는가?”</h3>
<blockquote>
<p>LLM/Business 서버는 비동기 계산이 얼마나 걸릴지 모르고,
ModelOps는 계산 중인데, 프론트/Agent는 결과가 필요한 상황.</p>
</blockquote>
<h4 id="해결-전략-2">해결 전략</h4>
<h5 id="1-job-테이블을-도입">1) Job 테이블을 도입</h5>
<pre><code>job_id
  status = PENDING | RUNNING | COMPLETED | FAILED
  error_message
  started_at
  finished_at</code></pre><p>Agent는 다음 API만 사용:</p>
<p>POST /jobs → job 생성
GET /jobs/{id} → 상태 조회
GET /sites/{id}/results → 결과 조회</p>
<p>Agent는 절대 계산 결과를 ModelOps API에서 직접 받지 않는다.</p>
<hr>
<h3 id="4-작업-실패-처리-난이도-증가제">4. 작업 실패 처리 난이도 증가제</h3>
<p>동기 시스템은 실패하면 API가 500을 그냥 내보내버린다.
하지만 비동기 시스템에서는:</p>
<blockquote>
<p>워커가 실패했는지
실패했으면 재시도할 건지
실패 로그가 어딨는지
Agent가 실패를 사용자에게 어떻게 전달할지</p>
</blockquote>
<p>관리 포인트가 늘어난다.</p>
<h4 id="해결-전략-3">해결 전략</h4>
<p>1) Job 상태를 FAILED로 기록
2) error_message/log_url 저장
3) Agent는 status를 기준으로 리포트 처리</p>
<p>FAILED면 “이번 분석은 실패했습니다. 나중에 다시 시도하세요.”
COMPLETED면 결과 조회 후 LLM 리포트 생성</p>
<hr>
<h3 id="5-race-condition-경쟁-조건">5. Race Condition (경쟁 조건)</h3>
<p>A와 B 두 요청이 같은 job에 접근해서 다음과 같은 문제가 생김:</p>
<blockquote>
<p>A가 status를 RUNNING으로 변경
B도 거의 동시에 status를 RUNNING으로 변경
→ 중복 계산 발생
→ 결과가 엉킴</p>
</blockquote>
<h4 id="해결-전략-4">해결 전략</h4>
<p>1) DB level Lock 사용 (SELECT FOR UPDATE)
2) Job state transition 로직을 원자적으로 설계
3) Unique 제약과 Double-start 방지 로직 강화</p>
<hr>
<h3 id="6-작업-폭주-문제-동시에-너무-많은-작업이-들어오는-상황">6. 작업 폭주 문제: 동시에 너무 많은 작업이 들어오는 상황</h3>
<blockquote>
<p>10명만 써도 동시에 “후보지 8개”씩 계산하면
ModelOps에 80개 job이 쌓임 → CPU 폭주 → 전체 시스템 지연.</p>
</blockquote>
<h4 id="해결-전략-5">해결 전략</h4>
<h5 id="1-worker-concurrency-제한">1) Worker concurrency 제한</h5>
<pre><code>Celery → --concurrency=N
ThreadPool → max_workers 설정</code></pre><h5 id="2-job-큐-길이-제한">2) Job 큐 길이 제한</h5>
<p>Job을 넣기 전에 검증:</p>
<pre><code>if current_running_jobs &gt; MAX:
    return 429 Too Many Requests</code></pre><h5 id="3-job-우선순위priority-queue">3) Job 우선순위(Priority Queue)</h5>
<p>사업장 분석 &gt; 후보지 분석
최근 요청 &gt; 오래된 요청</p>
<hr>
<h3 id="7-결과-저장-전략의-혼란">7. 결과 저장 전략의 혼란</h3>
<p>비동기 시스템은 결과를 어디에 저장할 것인지도 난제다.</p>
<blockquote>
<p>잘못된 패턴:
결과를 계산 API 리턴으로 주려고 함
→ 비동기와 완전히 충돌</p>
</blockquote>
<h4 id="해결-전략-6">해결 전략</h4>
<p>E, V, AAL 결과는 반드시 DB(Site-level)로 저장해야 한다.</p>
<pre><code>site_results
--------------
site_id
scenario
horizon
e_score
v_score
aal_value
risk_index
updated_at</code></pre><p>Agent는 항상 DB만 읽는다.</p>
<hr>
<h3 id="정리-안전한-아키텍처-패턴">정리: 안전한 아키텍처 패턴</h3>
<pre><code>    [User]
        ↓
   [AI Agent API]
        ↓  (트리거)
POST /jobs
        ↓
   [ModelOps API]
        ↓  (백그라운드/워커)
   Compute Worker
        ↓
     [DB]
        ↑
[AI Agent] ← GET /jobs/{id}, GET /results</code></pre><h2 id="핵심-요약">핵심 요약</h2>
<blockquote>
<p>API와 계산이 섞임 -&gt; 서버 분리 / 워커 사용
중복 계산 -&gt; Unique key / idempotency
상태 추적 -&gt; Job 테이블 도입
실패 -&gt; FAILED 상태 관리
경쟁 조건 -&gt; Lock 사용
작업 폭주 -&gt; concurrency 제한
결과 저장 위치 -&gt;DB 고정</p>
</blockquote>
<h2 id="마무리">마무리</h2>
<p>비동기 작업 시스템은 성능 때문에 도입하지만,
성능보다 더 어려운 상태 관리와 일관성 문제가 새롭게 등장한다.</p>
<p>위의 문제와 해결책을 미리 고려하면
ModelOps–AI Agent–Backend가 섞인 복잡한 시스템에서도
안정적으로 “계산 요청 → 비동기 처리 → 결과 조회 → 보고서 생성”
전체 생산라인을 손대지 않고 확장할 수 있다.</p>
]]></description>
        </item>
        <item>
            <title><![CDATA[ESG 물리 기후 리스크 예측 프로젝트 - 대용량 데이터 사용]]></title>
            <link>https://velog.io/@nangman_ful/ESG-%EB%AC%BC%EB%A6%AC-%EA%B8%B0%ED%9B%84-%EB%A6%AC%EC%8A%A4%ED%81%AC-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8</link>
            <guid>https://velog.io/@nangman_ful/ESG-%EB%AC%BC%EB%A6%AC-%EA%B8%B0%ED%9B%84-%EB%A6%AC%EC%8A%A4%ED%81%AC-%EC%98%88%EC%B8%A1-%ED%94%84%EB%A1%9C%EC%A0%9D%ED%8A%B8</guid>
            <pubDate>Fri, 21 Nov 2025 04:11:18 GMT</pubDate>
            <description><![CDATA[<h1 id="esg-물리-기후-리스크-예측-프로젝트">ESG 물리 기후 리스크 예측 프로젝트</h1>
<ul>
<li>TCFD 공시용 ESG 보고서에 기재될 기후 물리적 리스크에 대한 점수, 연평균자산손실률 그리고 보고서 작성</li>
</ul>
</br>
</br>
</br>

<h3 id="대용량-시계열-데이터-처리">대용량 시계열 데이터 처리</h3>
<blockquote>
<p>“대용량 시계열 데이터는 DB를 반복 조회하는 대신,
한 번만 로드해서 Staging 영역에 저장하고, 
파이프라인 내부에서는 <strong>Scratch Space(임시 디스크)</strong>에서 재사용하며,
TTL 기반 자동 정리로 디스크 부하를 방지하는 구조로 설계”</p>
</blockquote>
]]></description>
        </item>
    </channel>
</rss>