프로젝트/APM Web Server 개발

[트랜잭션, EM]네이티브 쿼리 임시 테이블 미적용 문제 (24.06.17)

블랑v 2024. 9. 26. 03:58

최근 프로젝트에서 JPA를 사용하여 네이티브 쿼리를 실행하는 도중 임시 테이블에 데이터가 삽입되지 않는 문제를 겪었다. 원인을 트러블슈팅한 결과를 여기에 정리한다.

문제 상황

JPA 네이티브 쿼리를 사용하여 임시 테이블에 데이터를 삽입하고 이를 기반으로 조회하는 로직을 구현했다.

하지만, 데이터가 임시 테이블에 삽입되지 않는 문제가 발생했다. 로그를 확인한 결과, 데이터가 삽입되는 과정에서는 문제가 없었지만 조회할 때는 데이터가 없는 것으로 나타났다.

@Service
@RequiredArgsConstructor
public class SpansServiceImpl implements SpansService {
    private final ResourceService resourceServiceImpl;
    private final JPAQueryFactory jpaQueryFactory;
    private static final QSpan qSpan = QSpan.span;
    private static final QTrace qTrace = QTrace.trace;
    private static final QTempTraceIds tempTraceIds = QTempTraceIds.tempTraceIds;
    private final TraceUtil traceUtil;

    @PersistenceContext
    private EntityManager entityManager;

		//..//
   
    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public List<SpanDto> getSpansUseTraceIdSet(List<UUID> inputTraceIdSet, Long minTime, Long maxTime) {
        try {
            Timestamp createdAtMin = new Timestamp(minTime / 1_000_000);
            Timestamp createdAtMax = new Timestamp(maxTime / 1_000_000);

            // 임시 테이블 생성
            entityManager.createNativeQuery("CREATE TEMPORARY TABLE temp_trace_ids (trace_id UUID)").executeUpdate();
            traceUtil.batchInsertTraceIds(inputTraceIdSet); // 임시 테이블에 데이터 배치 삽입

            BigInteger tempTableRowCount = (BigInteger) entityManager.createNativeQuery("SELECT COUNT(*) FROM temp_trace_ids").getSingleResult();
            System.out.println("임시 테이블의 데이터 개수: " + tempTableRowCount);

            // 네이티브 쿼리 실행
            String nativeQuery = "SELECT s.id, s.scope_name, cast(s.trace_id as varchar), s.span_id, s.parent_span_id, s.flag, s.name, " +
                    "s.kind, s.start_time_unix_nano, s.end_time_unix_nano, s.status_is_error, s.attributes, s.has_event, s.created_at " +
                    "FROM tbl_apm_trace_span s " +
                    "JOIN temp_trace_ids t ON s.trace_id = t.trace_id " +
                    "WHERE s.created_at BETWEEN :createdAtMin AND :createdAtMax";

            List<Span> results = entityManager.createNativeQuery(nativeQuery, "SpanMapping")
                    .setParameter("createdAtMin", createdAtMin)
                    .setParameter("createdAtMax", createdAtMax)
                    .getResultList();

            System.out.println("검증 로직 : =============");
            System.out.println("결과 크기 : " + results.size());

            // 결과를 SpanDto 리스트로 변환하여 반환
            return results.stream()
                    .map(SpanEach -> new SpanDto(SpanEach))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw e;
        } finally {
            // 모든 작업이 끝난 후 임시 테이블 삭제
            entityManager.createNativeQuery("DROP TABLE temp_trace_ids").executeUpdate();
        }
    }
}

/**
 * Trace 레이어 비즈니스 로직 연산 시 공통적으로 사용하는 유틸 클래스
 */
@Component
@RequiredArgsConstructor
public class TraceUtil {

    private final DataSource dataSource;

    /**
     * 임시 테이블 사용하여 새롭게 생성 후 데이터 삽입
     * @param traceIds
     * @throws SQLException
     */
    public void batchInsertTraceIds(List<UUID> traceIds) throws SQLException {
        String createTableSQL = "CREATE TEMPORARY TABLE IF NOT EXISTS temp_trace_ids (trace_id UUID)";
        String insertSQL = "INSERT INTO temp_trace_ids (trace_id) VALUES (?)";
        try (Connection connection = dataSource.getConnection();
             PreparedStatement createTableStmt = connection.prepareStatement(createTableSQL);
             PreparedStatement insertStmt = connection.prepareStatement(insertSQL)) {

            connection.setAutoCommit(false);
            createTableStmt.executeUpdate(); // 임시 테이블 생성

            // 데이터 삽입
            for (UUID traceId : traceIds) {
                System.out.println("데이터 삽입 : " + traceId); //분명 루프가 돌며 데이터가 출력되고 있음
                insertStmt.setObject(1, traceId);
                insertStmt.addBatch();
            }

            insertStmt.executeBatch();
            connection.commit();

            // 삽입 후 데이터 개수 확인
            String countSQL = "SELECT COUNT(*) FROM temp_trace_ids";
            try (PreparedStatement countStmt = connection.prepareStatement(countSQL)) {
                ResultSet rs = countStmt.executeQuery();
                if (rs.next()) {
                    int rowCount = rs.getInt(1);
                    System.out.println("임시 테이블의 데이터 개수: " + rowCount);
                }
            }
        } catch (Exception e) {
            ErrorLogger.log(this.getClass().getName(), "batchInsertTraceIds");
            throw e;
        }
    }

}

문제 원인의 고찰

내 생각에 같은 트랜잭션 범위 안에 있으나, 'Connection'과 'EntityManager'의 차이가 아닌가 생각이 들었다.

  1. 현재 임시 테이블의 생성과 데이터 삽입은 'batchInsertTraceIds'의 'Connection' 객체에 의해 이루어지며, 이는 try-catch 블럭으로 묶여 있다. 즉, 이 메서드가 닫히는 순간 commit으로 인해 '세션'이 분리되는 것이 아닌가? 또한, Temporary Table은 Session 기준으로 테이블을 생성하며 종료한다.
  2. 핵심 로직인 'getSpansUseTraceIdSet'는 1과 달리, EntityManager의 네이티브 쿼리를 통해 조회를 실시한다. 즉, 1의 Connection과 다른 세션의 범위를 조회하는 것이 아닌가 싶은 것이다.

더 요약하자면 다음과 같다 :

  1. getSpansUseTraceIdSet의 EM을 통해 Temp table 생성
  2. batchInsertTraceIds의 Connection을 통해 데이터 삽입 후 검증(임시 데이터 크기가 검출되었음 .. 로그로 확인)
  3. 다시 getSpansUseTraceIdSet의 EM을 통해 table 조회 시 확인되지 않음(개수 0으로 나옴)
  • temp 테이블 자체는 조회가 되는데, 2의 삽입이 조회되지 않는 것일까? (테이블이 없다는 에러는 발생하지 않음)
  • 그러면 위의 내 요약에서 '2'번이 생략되었다는 것이 아닌가? EM에서는 1->3으로 넘어간 것이다.
  • EM.flush나, em.clear를 통해 db의 정보를 갱신받는다면 문제를 해결할 수 있을까?

해결 방안

EntityManager를 사용하여 임시 테이블 생성, 데이터 삽입 및 조회 작업을 모두 동일한 세션 내에서 수행하는 것이 중요하다. Connection을 사용하더라도 동일한 세션 내에서 작업을 수행하도록 하여야 한다.

일반적인 JPA나 QueryDSL 구문이었다면 EntityManager의 flush와 clear를 사용하여 영속성 컨텍스트를 강제로 동기화하고, 트랜잭션 범위를 유지할 수 있다. 하지만 Connection과 EntityManager가 동일한 세션을 사용한다는 보장이 없고, 네이티브 쿼리이기에 이 방법은 모호하다.

  • 동기화 문제: flush는 영속성 컨텍스트의 변경 사항을 데이터베이스에 동기화하지만, 이는 JPA 엔티티에 대한 변경 사항에만 적용된다. 네이티브 쿼리로 임시 테이블에 데이터를 삽입한 경우, flush는 이러한 변경 사항을 동기화하지 않는다.
  • 세션 분리 문제: flush와 clear는 영속성 컨텍스트의 동기화 및 초기화 작업을 수행하지만, 네이티브 쿼리로 작업한 Connection 객체와 EM이 다른 세션을 사용하는 경우, flush와 clear는 이러한 세션 분리를 해결하지 못한다.

Connection을 통해 동일한 세션 유지

EntityManager에서 Connection을 가져오는 것은 동일한 세션을 유지하는 데 효과적일 수 있다. EntityManager가 사용하는 실제 데이터베이스 커넥션을 가져와 이 커넥션을 통해 작업을 수행하면, EM과 동일한 세션 내에서 작업이 이루어지게 된다. 이렇게 하면 임시 테이블을 생성하고 데이터를 삽입한 후, 동일한 세션 내에서 조회할 수 있다.

이를 위해, 로직의 분리가 아닌, TimeUtil에서 기본적으로 Connection 내부 동일 세션을 유지하며 임시 테이블을 통한 조회 로직을 구현하도록 변경하였다.

    /**
     * Temp 임시 테이블 사용하여 내부 JOIN 사용 후 조회
     * @param inputTraceIdSet
     * @param minTime
     * @param maxTime
     * @return
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED)
    public List<SpanDto> getSpansUseTraceIdSet(List<UUID> inputTraceIdSet, Long minTime, Long maxTime) {
        List<SpanDto> result = new ArrayList<>();
        try {
            Timestamp createdAtMin = new Timestamp(minTime / 1_000_000);
            Timestamp createdAtMax = new Timestamp(maxTime / 1_000_000);
            //List<UUID>를 가지는 임시 테이블 생성
            result = traceUtil.batchInsertTraceIds(inputTraceIdSet, createdAtMin, createdAtMax, 0); // 임시 테이블에 데이터 배치 삽입

            for (SpanDto spanDto : result) {
                System.out.println("검증 : " + spanDto.getName());
            }

        } catch (SQLException | IOException e) {
            ErrorLogger.log(this.getClass().getName(), "getSpansUseTraceIdSet", e);
        }
        return result;
    }

/**
 * Trace 레이어 비즈니스 로직 연산 시 공통적으로 사용하는 유틸 클래스
 */
@Component
@RequiredArgsConstructor
public class TraceUtil {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final DataSource dataSource;

    /**
     * Set을 Batch 크기에 맞는 List로 잘라 할당하기
     * In 연산에서 사용
     */
    public <T> List<Set<T>> sliceSet(List<T> originalSet) {
        List<Set<T>> slicedSets = new ArrayList<>();
        Iterator<T> iterator = originalSet.iterator();

        while (iterator.hasNext()) {
            Set<T> batch = new HashSet<>();
            for (int i = 0; i < BATCH_SIZE && iterator.hasNext(); i++) {
                batch.add(iterator.next());
            }
            slicedSets.add(batch);
        }
        return slicedSets;
    }

    /**
     * 임시 테이블 사용하여 새롭게 생성 후 데이터 삽입
     * Connection 및 기본 JDBC 구현체 사용 (EM의 경우 성능 이슈 발생)
     * @param traceIds
     * @return
     * @throws SQLException
     */
    public <T> List<T> batchInsertTraceIds(List<UUID> traceIds, Timestamp createdAtMin, Timestamp createdAtMax, int type) throws SQLException, IOException {
        String createTableSQL = "CREATE TEMPORARY TABLE IF NOT EXISTS temp_trace_ids (trace_id UUID)";
        String insertSQL = "INSERT INTO temp_trace_ids (trace_id) VALUES (?)"; //임시 테이블
        List<T> result = new ArrayList<>(); //결과값
        try (Connection connection = dataSource.getConnection();
             PreparedStatement createTableStmt = connection.prepareStatement(createTableSQL);
             PreparedStatement insertStmt = connection.prepareStatement(insertSQL);
             PreparedStatement queryStmt = connection.prepareStatement(joinTempTableQuery[type])) {

            connection.setAutoCommit(false);
            createTableStmt.executeUpdate(); // 임시 테이블 생성

            // 데이터 삽입 후 Commit 시행
            for (UUID traceId : traceIds) {
                insertStmt.setObject(1, traceId);
                insertStmt.addBatch();
            }
            insertStmt.executeBatch();

            // 네이티브 쿼리로 조인 실행
            queryStmt.setTimestamp(1, createdAtMin);
            queryStmt.setTimestamp(2, createdAtMax);
            try (ResultSet rs = queryStmt.executeQuery()) {
                while (rs.next()) {
                    switch (type) {
                        case 0 : {
                            result.add((T) spanDtoMapper(rs));
                            break;
                        }
                        case 1 : {
                            result.add((T) traceDtoMapper(rs));
                            break;
                        }
                        default: {
                            ErrorLogger.log(this.getClass().getName(), "Native Query 시행 시 Type Input Param 유효하지 않음");
                        }
                    }
                }
            }
        } catch (Exception e) {
            ErrorLogger.log(this.getClass().getName(), "batchInsertTraceIds");
            throw e;
        }
        return result;
    }

    /**
     * 모듈화 위한 네이티브 쿼리 모음
     */
    private final String[] joinTempTableQuery = {
            // 0 : SpanDto
            "SELECT s.id, s.scope_name, cast(s.trace_id as varchar), s.span_id, s.parent_span_id, s.flag, s.name, " +
                    "s.kind, s.start_time_unix_nano, s.end_time_unix_nano, s.status_is_error, s.attributes, s.has_event, s.created_at " +
                    "FROM tbl_apm_trace_span s " +
                    "JOIN temp_trace_ids t ON s.trace_id = t.trace_id " +
                    "WHERE s.created_at BETWEEN ? AND ?",

            // 1 : TraceDto
            "SELECT tr.id, tr.agent_id, tr.span_trace_id, tr.span_start_time_unix_nano, tr.span_end_time_unix_nano," +
                    "tr.is_err, tr.resource_id, tr.created_at " +
                    "FROM tbl_apm_trace tr " +
                    "JOIN temp_trace_ids te ON tr.span_trace_id = te.trace_id " +
                    "WHERE tr.created_at BETWEEN :createdAtMin AND :createdAtMax"

            // for more ..
    };

    /**
     * ResultSet SpanDto Mapper
     */
    private SpanDto spanDtoMapper(ResultSet rs) throws SQLException, IOException {
        return new SpanDto(
                rs.getLong("id"),
                rs.getString("scope_name"),
                UUID.fromString(rs.getString("trace_id")),
                rs.getBytes("span_id"),
                rs.getBytes("parent_span_id"),
                rs.getInt("flag"),
                rs.getString("name"),
                rs.getInt("kind"),
                rs.getLong("start_time_unix_nano"),
                rs.getLong("end_time_unix_nano"),
                rs.getBoolean("status_is_error"),
                objectMapper.readTree(rs.getString("attributes")),
                rs.getBoolean("has_event")
                // rs.getTimestamp("created_at")
        );
    }

    /**
     * ResultMap TraceDto Mapper
     */
    private TraceDto traceDtoMapper(ResultSet rs) throws SQLException {
        return new TraceDto(
                rs.getLong("id"),
                UUID.fromString(rs.getString("span_trace_id")),
                rs.getLong("span_start_time_unix_nano"),
                rs.getLong("span_end_time_unix_nano"),
                rs.getBoolean("status_is_error"),
                rs.getLong("agent_id"),
                rs.getLong("resource_id")

        );
    }

}