📕 목차
1. Introduction
2. Reader
3. Page offset
4. Writer
5. Improved Performance
6. Additional Improvement
1. Introduction
📌 Goal
[Spring Boot] 정기 푸시 알림(Push Notification) 전송 배치(Batch) 프로세스
💡 문제가 되는 부분이 많고, Batch에 대해 미숙한 이해를 기반으로 작성한 글이므로 참고로만 읽어주세요.📕 목차1. Introduction2. Domain3. Infra4. Batch5. Discussion Topics1. Introduction 📌 Usecase사용자는
jaeseo0519.tistory.com
이전에 Spring Boot Batch 애플리케이션을 빠르게 공부해서 구현하는 데 성공하긴 했지만, 제대로 알고 사용한 게 아니다보니 찝찝하기 그지 없었다.
@JobScope와 @StepScope의 존재는 뒤늦게야 알게 되었고, JobParameters 때문에 매우 중요하다는데 그건 또 뭔데 🤗...?
여튼 이런 지식의 공백? 난 용납할 수 없다.
포트폴리오 빨리 업데이트 해야 하는데, 그딴 거 모르겠고 난 내가 모르는 걸 방치하는 걸 용서할 수 없다.
개념 정리는 다음 포스팅에서 할 예정.
spring-boot-starter-batch 3.3.0 버전 기준으로 작성했습니다.
도중에 테스트를 잘못 하는 바람에 처음부터 다시 다 하는 바람에, 미처 수정하지 않은 잘못된 결과 분석이 포함되어 있을 수 있습니다 ㅠㅠ
GitHub - psychology50/spring-batch-performance: 📌 Spring Boot Batch의 Performance 측정과 성능 개선
📌 Spring Boot Batch의 Performance 측정과 성능 개선. Contribute to psychology50/spring-batch-performance development by creating an account on GitHub.
github.com
📌 Why improve batch
원래는 Reader와 Writer 각각에 대해서 수행 시간을 측정하고, 개선하는 걸 보이려고 했는데
기존 방식을 실행해보니 데이터가 100,000개만 넘어도 끝날 생각을 안 한다. ㅎㅎ
아니, 애초에 Reader에서 더 이상 진전이 없었다.
급하게 구현을 하긴 했다지만 이 정도까지 심각할 줄은 몰랐는데.
여튼 이런 상황이다보니, 어디서부터 손대야 할 지 고민만 하다가 얼마 전에 나름 단위 테스트 코드 작성해본다고 난리 치면서, 재미삼아 page size를 조금씩 바꿔봤는데 배치 성능이 천차만별로 달라지는 것도 알았다.
(데이터의 크기에 따라 page size를 100, 300, 500, 800, 1000, 3000 이런 식으로 늘려봤는데, 수행 시간 차이가 너무 벌어짐)
확실한 건 실행했을 때, Reader가 느린 성능 원인 중 가장 큰 비중을 차지하고 있었다.
그래서 보다 구체적으로 분석하고, 내 상황에 맞게 Reader와 Writer를 개선하여 성능 향상을 한 과정을 기록하고자 했다.
📌 Chunk
다음 Step으로 넘어가기 전에, Batch를 처음 도입하면서 날 혼란스럽게 했던 Chunk의 개념을 짚어보자.
내용 겁나 쉬운데, 컴퓨터 메모리 구조 공부할 때 나왔던 Chunk가 튀어나오는가 싶어서 바짝 쫄았었다.
Batch에서 Chunk란 읽어 온 데이터들을 하나로 처리하는 트랜잭션 단위를 의미한다.
예를 들어, 1억건의 데이터에 대해 배치 작업을 수행할 때, 모든 데이터를 메모리에 적재하고 작업을 처리할 수는 없다.
따라서 이 데이터들을 Chunk라는 단위로 쪼개서 트랜잭션 작업을 수행해야 하는데, 실패할 경우에도 해당 Chunk만큼만 rollback되고, 이전의 commit 내역들은 반영이 된다.
위 그림에서 보다시피 Reader와 Processor에선 데이터를 1건씩 다루고, Writer에서 Chunk 단위로 처리한다.
2. Reader
📌 ItemReader
Spring Batch에서는 ItemReader 인터페이스를 제공해주는데, 문서에는 이렇게 적혀있다.
- 데이터 제공을 위한 전략 인터페이스
- ItemReader의 구현체는 stateful하고, 각 배치에서 여러번 호출될 수 있으며, 각 호출의 read()는 다른 값을 반환하다가 불러올 데이터가 소진되면 null을 반환한다.
- 구현체는 thread-safe할 필요 없으며, client가 주의해야 한다.
- read 호출자는 첫 번째 호출이 롤백되어도, 그 다음 성공적인 호출에 동일한 데이터를 받을 수 있어야 한다.
- read 메서드를 구현할 때, input data set의 마지막에는 반드시 null을 반환해야 한다.
물론 ItemReader 인터페이스를 직접 개발자가 구현할 일은 잘 없다.
이유를 알기 전에 ItemSream도 알아야 하기 때문에 잠시 패스
🤔 ListItemReader
![](https://blog.kakaocdn.net/dn/YtB3W/btsIG8nC0ek/D1Zdx8fQnR8A1k5fotj8w0/img.png)
/**
* An {@link ItemReader} that pulls data from a list. Useful for testing.
*
* <p>
* This reader is <b>not</b> thread-safe.
* </p>
*
* @author Dave Syer
* @author jojoldu
* @author Mahmoud Ben Hassine
*
*/
public class ListItemReader<T> implements ItemReader<T> {
private final List<T> list;
public ListItemReader(List<T> list) {
// If it is a proxy we assume it knows how to deal with its own state.
// (It's probably transaction aware.)
if (AopUtils.isAopProxy(list)) {
this.list = list;
}
else {
this.list = new LinkedList<>(list);
}
}
@Nullable
@Override
public T read() {
if (!list.isEmpty()) {
return list.remove(0);
}
return null;
}
}
ItemReader 인터페이스만을 구현한 매우 단순하기 그지없는 ListItemReader라는 클래스도 존재한다.
이는 읽어온 리스트 데이터를 모두 메모리에 올려두고 read() 메서드를 사용해 하나씩 읽어온다.
하지만 주석에도 나와있듯이, 실제 사용의 목적이라기 보단 테스팅 용도로 쓰이며 non-thread safe 하다는 점.
애초에 대용량 데이터를 처리하기 위한 Batch 시스템에서 모든 데이터를 메모리에 적재해둔다는 점부터 실용성을 위한 클래스가 아님을 알 수 있다.
만약에 사용할 거면, 데이터를 쪼개어 List로 가져온 다음, ListItemReader에 조금씩 올리는 방식을 채택해야 한다. (굳이?)
📌 ItemStream
/**
* <p>
* Marker interface defining a contract for periodically storing state and restoring from
* that state should an error occur.
* </p>
*
* @author Dave Syer
* @author Lucas Ward
* @author Mahmoud Ben Hassine
*
*/
public interface ItemStream {
/**
* Open the stream for the provided {@link ExecutionContext}.
* @param executionContext current step's
* {@link org.springframework.batch.item.ExecutionContext}. Will be the
* executionContext from the last run of the step on a restart.
* @throws IllegalArgumentException if context is null
*/
default void open(ExecutionContext executionContext) throws ItemStreamException {
}
/**
* Indicates that the execution context provided during open is about to be saved. If
* any state is remaining, but has not been put in the context, it should be added
* here.
* @param executionContext to be updated
* @throws IllegalArgumentException if executionContext is null.
*/
default void update(ExecutionContext executionContext) throws ItemStreamException {
}
/**
* If any resources are needed for the stream to operate they need to be destroyed
* here. Once this method has been called all other methods (except open) may throw an
* exception.
*/
default void close() throws ItemStreamException {
}
}
- ItemStream은 ItemReader의 상태를 저장하고, 오류가 발생하면 해당 상태에서 복원하기 위한 Marker Interface
- open(), close()로 stream을 열고 닫으며, update()로 batch 처리 상태를 업데이트 하면 된다.
개발자는 ItemReader와 ItemStream 인터페이스를 직접 구현하여 원하는 형태의 ItemReader를 만들 수도 있지만, Spring Batch에서 제공해주는 구현체들을 사용해도 무방하다.
🤔 언제 직접 구현해서 사용해야 할까?
Batch 5.0 이전까지는 JdbcItemReader를 많이 사용했는데, 해당 구현체의 문제는 jpa 영속성 컨텍스트의 지원을 받을 수 없다는 점이다.
즉, QueryDsl, Jooq 같은 조회 프레임워크를 사용하면 HibernateItemReader를 이용해 Reader 구현체를 직접 구현해야 했다.
그런데 5.0 이상부터는 HibernateItemReader가 deprecated 되었다.
거의 대부분 프레임워크 환경을 Batch가 사용가능하도록 만들어두었고, 애초에 Batch 사용하면서 Jpa의 영속화 기능을 사용할 일이 딱히 없기 때문인 듯하다.
하지만 만약 Reader 혹은 Write의 대상이 비표준 데이터 소스, 혹은 복잡한 데이터 처리 로직 등을 포함한다면, 직접 구현해야 할 필요가 있을 수도 있다. (아니면 사내 특수 라이브러리를 사용한 성능 최적화라던가,,,)
📌 Cursor vs Paging
Spring Batch은 JDBC 기능을 확장하여 Cursor와 Paging 방법을 제공하고 있다.
(Spring의 JdbcTemplate는 분할 처리를 지원하지 않기 때문에, 개발자가 직접 limit, offset 작업을 처리해야 하기 때문)
- Cursor
- JDBC ResultSet의 기본 기능
- ResultSet이 open 될 때마다 DB의 data 반환
- Database와 연결된 후 데이터를 Streaming하고, Cursor를 한 칸씩 옮기면서 지속적으로 데이터 전달
- Batch 작업이 끝날 때까지 Connection이 끊어져서는 안 되기 때문에, socketTimeOut을 충분히 큰 값으로 설정해야 한다.
- JdbcCursorItemReader, StoredProcedureItemReader, JpaCursorItemReader
- Paging
- Page 단위(page size)로 데이터를 한번에 조회
- 한 페이지를 읽을 땜다 Connection을 맺고 끊기 때문에 아무리 많은 데이터라도 타임아웃의 우려가 없다.
- JdbcPagingItemReader, JpaPagingItemReader, RepositoryItemReader
일반적으로 Cursor가 Paging보다 훨씬 빠르다.
Paging의 offset 문제를 어떻게 해결했다고 하더라도, DB와 한 번 Connection을 맺고 모든 작업을 처리해버리는 Cursor보다 빠를 수는 없다. (이론상..? 실제 테스트 해본 건 아니라 단언하기에 좀 무리가 있습니다.)
하지만 Cursor는 배치 작업 도중 socket 연결이 끊기면 안 되므로, 충분히 큰 값을 정해주어야 하는데 그만큼 DB Connection pool을 차지하는 의미가 된다.
너무 과하게 길면 다른 애플리케이션의 작업에 영향을 줄 수도 있고, 너무 적으면 배치 작업이 실패하는데 이를 적절하게 결정하기엔 내가 너무 미숙했다.
따라서 우리 프로젝트에선 속도를 조금 희생하고, 안전한 Paging 전략을 사용하고 있다.
물론, 해당 포스팅은 성능 테스트 목적이므로 둘 다 해볼 참이다.
✒️ Page size vs Chunk size
잘못된 내용들이 많이 퍼져있던데, page size와 chunk size가 의미하는 것은 다르다.
chunk size는 한 번에 처리할 작업량(트랜잭션)을 의미하고, page size는 한 번에 읽어올 Item의 양을 의미한다.
그 말은 즉슨, chunk size가 100이고 page size가 10이라면, 조회 10번 당 1번의 트랜잭션이 실행되어 chunk가 처리됨을 이야기 한다.
그런데 둘을 왜 같은 의미로 오해하는 사람이 많냐면, chunk size와 page size를 다르게 했을 때 문제가 발생할 여지가 많이 때문이다.
chunk size가 page size보다 너무 크면, 트랜잭션 한 번을 처리하기 위해 5번의 조회가 발생하여 성능 이슈가 발생할 수 있다.
반면, page size가 chunk size보다 크면, JPA를 사용할 때 영속성 컨텍스트가 깨질 수도 있다.
따라서 이유가 없다면 chunk size와 page size를 일치시키는 것을 권장한다.
📌 Cursor 기반
1️⃣ JdbcCursorItemReader
뭐가 엄청 많긴 한데, JdbcCursorItemReader빈을 등록할 땐 Builder를 사용하면 된다.
@Bean
@StepScope
public JdbcCursorItemReader<DeviceTokenOwner> jdbcCursorItemReader(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<DeviceTokenOwner>()
.name("jdbcCursorItemReader")
.dataSource(dataSource)
.sql("SELECT u.id, u.name, dt.token FROM device_token dt " +
"LEFT JOIN user u ON dt.user_id = u.id " +
"WHERE dt.activated = true AND u.account_book_notify = true " +
"ORDER BY u.id ASC")
// .rowMapper(new BeanPropertyRowMapper<>(DeviceTokenOwner.class))
// .beanRowMapper(DeviceTokenOwner.class)
.rowMapper((rs, rowNum) -> new DeviceTokenOwner(
rs.getLong("id"),
rs.getString("name"),
rs.getString("token")
))
.fetchSize(100)
.build();
}
- Spring Boot application 파일에서 datasource 관련 설정을 해줬다면, 알아서 적절하게 빈이 생성되어 있으므로 신경쓰지 않아도 된다.
- sql은 실행할 쿼리를 삽입하면 된다.
- rowMapper를 사용하면, 결과를 담을 Entity 혹은 Dto를 binding 할 수 있다.
- BeanPropertyRowMapper 방식을 사용할 거라면, .beanRowMapper()를 사용하면 끝난다. (생성자로 rowMapper()에 넣는 것과 다를 게 없다.)
- 하지만 Bean 주입 방식은 Dto의 불변식을 깨트리므로, Lambda를 사용해서 직접 값을 매핑해주면 생성자를 활용할 수 있다.
- fetchSize로 한 번에 가져올 데이터를 정의한다.
여기서 fetchSize가 정말 중요한데, 너무 작으면 성능이 저하되고 너무 크면 메모리에 부하가 걸려서 애플리케이션이 다운될 수도 있다.
아래는 테스트를 직접 수행해본 결과
@ParameterizedTest
@ValueSource(ints = {100, 1000, 10000, 100000})
@DisplayName("JdbcCursorItemReader 테스트")
void testJdbcCursorItemReader(int dataSize) throws Exception {
insertData(dataSize);
JdbcCursorItemReader<DeviceTokenOwner> itemReader = reader.jdbcCursorItemReader(dataSource);
itemReader.afterPropertiesSet();
itemReader.open(new ExecutionContext());
testItemReader(itemReader, "JdbcCursorItemReader");
}
![](https://blog.kakaocdn.net/dn/bBLV9R/btsIJzFuENE/KOXW7kLIh6N15bXQTkkPVK/img.png)
뭔가 내가 예상했던 대로 테스트 결과가 안 나왔다.
실제로는 fetch가 커질 수록 한 번에 불러오는 데이터가 많아서 실행속도가 빨라져야 하지만, 아무래도 auto increment 값을 초기화 해주지 않아서 상대적으로 불공정한 테스트가 수행됐을 수도 있을 것 같다.
(더 정확한 결과값을 얻고 싶다면, 테스트를 더 많이 해보고 평균치를 구해야만 한다.)
그래도 가장 큰 데이터 셋을 기준으론 fetch가 클 수록 빠른 성능을 보여주고 있음을 알 수 있다.
그렇다고 해서 무리하게 큰 값을 넣으면, 메모리가 감당하기 힘드므로 적절한 값을 선택해주어야 한다.
💡 Jdbc 기반 테스트 시 주의사항
Job 통합 테스트가 아니라서 그런지는 모르겠지만 .open()을 직접 해주지 않으면, "Reader Must be open before it can be read" 문구가 뜨면서 실패한다.
그리고 테스트 케이스에 @Transactional 어노테이션을 선언하면, 테스트 중 생성된 모든 데이터베이스 변경사항을 자동으로 롤백시킬 수 있는 편의성을 얻을 수 있다.
하지만 Jdbc 방식의 ItemReader를 사용할 땐, 해당 어노테이션을 사용하면 안 된다.
@Transactional에 의해 독립성을 지킬 수 있는 이유는 Tx 격리 수준(isolation Level)을 READ COMMITTED로 강제하기 때문인데, 이는 변경 사항이 commit 되기 전까지는 다른 Tx에서는 변경 사항을 읽을 수 없도록 한다.
문제는 JdbcItemReader는 별도의 DB 연결을 시도하여 데이터를 읽으려 하기 때문에 분명히 더미 데이터를 삽입했는데, 읽어온 데이터는 0개인 기이한 상황을 경험할 수 있다.
그래서 선언적 Tx를 제거하거나, 테스트 시 JdbcItemReader가 같은 Tx를 공유하도록 정의해야 한다.
2️⃣ StoredProcedureItemReader
StoredProcedureItemReader는 DB의 프로시저를 호출해서 데이터를 읽어오는 방식이다.
복잡한 로직을 DB 내부에서 처리하도록 캡슐화를 할 수 있지만, 역으로 따지면 DB에 강하게 종속된다.
해당 방법의 성능이 어느정도인지는 모르겠지만, 외부 Actor에 강하게 종속되는 것은 되도록 피하고 싶으므로 선택하지 않았다.
3️⃣ JpaCursorItemReader
❌ JpaCursorItemReader는 이유가 없다면 사용하지 말 것!
JpaCursorItemReader는 대용량 처리에 전혀 적합하지 않다.
왜냐하면, 이 ItemReader는 Cursor를 DB가 아닌 메모리에서 움직이기 때문이다.
그 말은 즉슨, DB의 모든 데이터를 메모리에 적재해두고 Cursor를 움직이면서 데이터를 읽어오는데
데이터가 1억개면 이걸 죄다 메모리에 올려두고 읽고 있는 셈이다.
내 서비스는 비록 데이터가 소규모긴 하지만, 안전성이 심각하게 훼손되는 해당 방식은 기각하였다.
📌 Paging 기반
1️⃣ JdbcPagingItemReader
@Bean
@StepScope
public JdbcPagingItemReader<DeviceTokenOwner> jdbcPagingItemReader(DataSource dataSource) {
SqlPagingQueryProviderFactoryBean factoryBean = new SqlPagingQueryProviderFactoryBean();
factoryBean.setDataSource(dataSource);
factoryBean.setSelectClause("SELECT u.id, u.name, dt.token");
factoryBean.setFromClause("FROM device_token dt LEFT JOIN user u ON dt.user_id = u.id");
factoryBean.setWhereClause("WHERE dt.activated = true AND u.account_book_notify = true");
factoryBean.setSortKey("u.id");
try {
return new JdbcPagingItemReaderBuilder<DeviceTokenOwner>()
.name("jdbcPagingItemReader")
.dataSource(dataSource)
.fetchSize(100)
.rowMapper((rs, rowNum) -> new DeviceTokenOwner(
rs.getLong("id"),
rs.getString("name"),
rs.getString("token")
))
.queryProvider(factoryBean.getObject())
.pageSize(100)
.build();
} catch (Exception e) {
log.error("Error creating jdbcPagingItemReader", e);
return null;
}
}
Jdbc 방식의 PagingItemReader는 Cursor와 달리 query를 생성하기 위해 SqlPagingQueryProviderFactoryBean을 사용한다.
사용법은 워낙 명시적이라서 그닥 어렵진 않은데, 서브 쿼리 같은 게 튀어나오면 또 얘기가 달라질 지도.
query 생성 방식 외에는 JdbcCursorItemReader와 거의 유사하다.
단, 이번엔 page size가 있는데 이 값이 성능에 많은 영향을 미친다. (Jdbc 방식 뿐만 아니라, 모든 Paging 기법의 ItemReader가 동일하다.)
@ParameterizedTest
@ValueSource(ints = {100, 1000, 10000, 100000})
@DisplayName("JdbcPagingItemReader 테스트")
void testJdbcPagingItemReader(int dataSize) throws Exception {
insertData(dataSize);
JdbcPagingItemReader<DeviceTokenOwner> itemReader = reader.jdbcPagingItemReader(dataSource);
itemReader.afterPropertiesSet();
itemReader.open(new ExecutionContext());
testItemReader(itemReader, "JdbcPagingItemReader");
}
![](https://blog.kakaocdn.net/dn/cUq5hN/btsIKDtZlkn/GxLIitGebpgqulM5Qwfiz0/img.png)
테스트를 할 때, 편의를 위해 page size와 fetch size를 동일하게 맞추고 진행했다.
page size가 100, 300, 500, 800일 때는 데이터 1,000,000개일 때 처리 속도가 너무 느려서 skip했다.
결과가...어째 내가 생각한 거랑 많이 달라서 당황스러운데 아무래도 fetch size가 문제가 아니었을까
2️⃣ JpaPagingItemReader
@Bean
@StepScope
public JpaPagingItemReader<DeviceTokenOwner> jpaPagingItemReader(EntityManagerFactory em) {
return new JpaPagingItemReaderBuilder<DeviceTokenOwner>()
.name("jpaPagingItemReader")
.entityManagerFactory(em)
.queryString("SELECT new com.test.batch.dto.DeviceTokenOwner(u.id, u.name, dt.token) " +
"FROM DeviceToken dt LEFT JOIN User u " +
"ON dt.user.id = u.id WHERE dt.activated = true AND u.notifySetting.accountBookNotify = true " +
"ORDER BY u.id ASC")
.pageSize(100)
.build();
}
밑에서 다시 이야기 할 거지만 쿼리를 넣을 때 queryString() 말고 다른 방법도 존재한다. (나중에 알았다.)
JPA 위에서 동작하긴 하지만, 실제 쿼리는 JPQL을 사용해서 동작한다.
메모리 사용량을 줄이고 싶다면, page를 읽은 후에 flushed and cleared 동작을 수행하라고 주의를 하고 있다.
(내부에 정의된 코드를 읽어보니 transaction 걸어두면 기본으로 수행한다.)
@ParameterizedTest
@ValueSource(ints = {100, 1000, 10000, 100000})
@DisplayName("JpaPagingItemReader 테스트")
void testJpaPagingItemReader(int dataSize) {
insertData(dataSize);
JpaPagingItemReader<DeviceTokenOwner> itemReader = reader.jpaPagingItemReader(entityManagerFactory);
itemReader.open(new ExecutionContext());
testItemReader(itemReader, "JpaPagingItemReader");
}
![](https://blog.kakaocdn.net/dn/dRRJAZ/btsIKffOkNn/cKhta0YA0jkxmG6bhHjfT0/img.png)
Jpa 위에서 동작하니까 open을 굳이 안 해도 되지 않을까 싶지만, query는 JDBC로 처리하기 때문에 직접 open 해주어야 한다.
page size에 대한 성능 차이도 극심한 것을 확인할 수 있다.Hibernate 기반으로 동작하기 때문인지, JdbcPaging보다 성능이 현저하게 저하된다.
3️⃣ RepositoryItemReader
RepositoryItemReader는 개발자가 더 이상 QueryDsl과 같은 프레임워크를 사용할 때, Hibernate 구현체를 직접 구현하지 않아도 되게끔 만든 클래스다.
@Bean
@StepScope
public RepositoryItemReader<DeviceTokenOwner> execute() {
return new RepositoryItemReaderBuilder<DeviceTokenOwner>()
.name("execute")
.repository(deviceTokenRepository)
.methodName("findActivatedDeviceTokenOwners")
.pageSize(300)
.sorts(new HashMap<>() {{
put("id", Sort.Direction.ASC);
}})
.build();
}
사용법은 위와 같이 사용할 수 있는데, deviceTokenRepository에 정의한 메서드 이름을 문자열로 넣어주면, 알아서 해당 Repository 구현체에서 메서드를 찾아내 쿼리를 생성한다.
결론부터 말하자면, RepositoryItemReader가 모든 Paging 기법의 ItemReader 중 최악의 성능을 보였다.
내가 처음 만든 Batch가 매우 오랜 수행 시간을 요구했던 common case 중 하나가 이 때문이었는데, JdbcRepositoryItemReader로 바꾸자마자 Reader 부분의 성능이 100,000배 빨라졌다. ㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋㅋ
물론 RepositoryItemReader 입장에서도 억울할만한 것이 하나 있는데, page size를 처음에 100밖에 주지 않은 탓도 있다.
주석에도 적혀있다시피, 해당 구현체는 Repository 구현체에 성능이 좌우되며, 큰 page size를 넣으면 개선할 여지가 있다고 이야기하고 있다.
하지만 다른 PagingReader의 경우, page size가 10배 정도 차이난다고 해서 극적인 성능 향상을 보여주진 않았는데 RepositoryItemReader는 그 정도가 심하다.
@ParameterizedTest
@ValueSource(ints = {100, 1000, 10000, 100000})
@DisplayName("RepositoryItemReader 테스트")
void testRepositoryItemReader(int dataSize) {
insertData(dataSize);
RepositoryItemReader<DeviceTokenOwner> itemReader = reader.execute();
testItemReader(itemReader, "RepositoryItemReader");
}
![](https://blog.kakaocdn.net/dn/UMckh/btsIIyUyS4x/6Zd2vx5lGOQOVc8EXnb2vk/img.png)
데이터가 1,000,000개일 때도 테스트하고 싶었는데, 너무 오래 걸려서 못 했다.
page size가 100일 땐 3시간이 지나도 안 끝나는 어메이징한 상황
차이가 너무 심해서 데이터가 1,000인 경우와 10,000일 때는 보이지도 않는다.
이 차이는 resultset의 크기가 클 수록 극명하게 나타난다.
비록 page size를 늘리면 많이 개선되긴 하지만, JdbcPagingItemReader와 비교했을 때 여전히 1,000배나 느린 퍼포먼스를 보여준다.
🤔 RepositoryItemReader는 왜 이렇게 느린걸까?
여기서부턴 내 추론이라 정확하진 않지만, 일단 가장 큰 이유는 Hibernate 기반의 Jpa Repository를 사용하기 때문이 아닐까 싶다.
우선 내부 구현은 다음과 같이 이루어져 있었다.
• doOpen(): 빈 메서드. Repository 구현체를 사용할 것이므로 사용하지 않음.
• doRead(): doPageRead()를 호출해 Page 정보를 반환받고, 빈 페이지가 나올 때까지 이동
• doPageRead(): Pageable 객체를 생성한 후, 메서드를 리플렉션으로 찾아 실행하고 결과를 Slicing
Hibernate를 사용하므로 Entity 영속성 관리, 캐싱, 지연 로딩 등의 부가 작업이 실행되어야 하며, 그 와중에 Query Dsl을 사용해 동적 쿼리를 생성하는데 많은 시간을 소요할 것이라 본다.
또한 method name을 문자열로 받은 후, 리플렉션으로 Repository의 메서드를 탐색하는 것도 있다고 생각하는데 리플렉션은 매우 편리한 기능이지만 느리다.
이러한 요인 때문에 가장 성능이 떨어지게 되진 않았을까 생각한다.
📌 개선 방향
우선 CursorItemReader 방식은 사용하지 않았다. JpaCursorItemReader는 논할 가치가 없고, JdbcCursorItemReader는 안전성 문제와 더불어 Native SQL을 사용해야 한다는 것이 꺼름칙했다.
(모든 쿼리를 문자열로 구현한다는 것은 상당한 휴먼 에러를 야기하고, 유지 보수를 어렵게 만든다.)
따라서 QueryDsl을 사용하면서, Paging 방식으로 ItemReader를 사용할 방법이 없을까 좀 더 고민해보았다.
사실 데이터가 백만 개도 채 안 될 거라, RepositoryItemReader의 page size를 1,000으로 늘리고 실행해도 현재 서비스에선 전혀 문제없이 동작한다.
다만 JpaPagingItemReader가 Native SQL만 지원해주는 게 뭔가 이상하다 싶어서 좀 더 들여다보았다.
아니나 다를까, queryProvider를 사용하면 동적으로 쿼리를 만들 수 있다고 한다.
/**
* <p>
* Interface defining the functionality to be provided for generating queries for use with
* JPA {@link ItemReader}s or other custom built artifacts.
* </p>
*
* @author Anatoly Polinsky
* @author Dave Syer
* @author Mahmoud Ben Hassine
* @since 2.1
*
*/
public interface JpaQueryProvider {
/**
* <p>
* Create the query object.
* </p>
* @return created query
*/
Query createQuery();
/**
* Provide an {@link EntityManager} for the query to be built.
* @param entityManager to be used by the {@link JpaQueryProvider}.
*/
void setEntityManager(EntityManager entityManager);
}
JpaQueryProvider는 단순한 인터페이스였고, 구현체는 총 2개가 있었다.
당연히 NativeQueryProvider는 아니겠지만, 그렇다고 NamedQueryProvider를 사용할 것도 아니다.
만약 이 방식을 사용하고 싶다면 AbstractJpaQueryProvider를 구현한 커스텀 Provider를 만들어주는 게 가장 좋다고 생각한다.
public class JpaQueryDslProvider<E> extends AbstractJpaQueryProvider {
private final JPAQuery<E> query;
public JpaQueryDslProvider(JPAQuery<E> query) {
this.query = query;
}
@Override
public Query createQuery() {
return query.createQuery();
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.state(query != null, "Querydsl query must be set");
}
}
@Bean
@StepScope
public JpaPagingItemReader<DeviceTokenOwner> jpaPagingItemReader(EntityManagerFactory em) {
return new JpaPagingItemReaderBuilder<DeviceTokenOwner>()
.name("jpaPagingItemReader")
.entityManagerFactory(em)
.queryProvider(new JpaQueryDslProvider<>(
queryFactory
.select(
Projections.constructor(
DeviceTokenOwner.class,
user.id,
user.name,
deviceToken.token
)
)
.from(deviceToken)
.leftJoin(user).on(deviceToken.user.id.eq(user.id))
.where(deviceToken.activated.isTrue().and(user.notifySetting.accountBookNotify.isTrue()))
.orderBy(user.id.asc())
))
.pageSize(100)
.build();
사실 테스트 해보지 않아서 동작하지 않을 수도 있다. ㅎㅎ..;
JpaPagingItemReader는 내부적으로 JDBC를 사용하므로 성능도 빠르고, 동적 쿼리를 사용할 수도 있기 때문에 여기까지만 해도 충분한 성능 개선을 이룰 수 있을 것이다.
하지만 Paging 기법에는 고질적인 문제가 한 가지 있는데, 이는 Batch에서 제공해주는 PagingItemReader로는 개선할 수가 없다는 점이다.
3. Page offset
📌 MySQL Limit Offset
MySQL의 Limit Offset은 Page 단위로 데이터를 읽기에 아주 용이하지만, 태생적 성능 한계가 존재한다.
다음 페이지를 읽기 위해서는 앞서 데이터를 얼마나 건너뛸지에 대해 offset을 결정해야 하는데, 이 값이 커질 수록 성능이 저하된다.
offset이란 해당 값부터 읽겠다는 의미긴 하지만, jump가 아닌 1번부터 모두 읽고난 후에 offset부터 limit까지 데이터를 읽어오겠다는 연산을 수행한다.
그 말은 즉, data 개수가 무한히 많고, offset 1,000,000, limit 100이라면, 최종적으로 읽어야 하는 데이터의 수는 1,000,100개가 된다. (앞의 1,000,000개의 데이터는 그냥 버린다.)
📌 PageableExecutionUtils는 답이 될 수 있는가?
지난 번에는 이러한 문제를 조금이나마 해소해놓을까 싶어 PageableExecutionUtils를 적용했었는데, 완전히 잘못 생각하고 있던 것이 있었다.
해당 Util의 최적화 대상은 offset으로 발생하는 문제가 아니고, Pagination방식을 사용할 때, count 쿼리를 호출하는 문제를 최적화하기 위함이다. (그땐 너무 정신이 없어서 아무 생각이 없었던 걸지도)
재밌는 사실은 PagingItemReader를 만들 때, count 쿼리가 나갈 일이 없다는 점이다.
이유는 아래에서 분석한 JpaPagingItemReader의 내부 동작 방식을 읽어보면 이해할 수 있다.
어쨌든 offset 문제를 해결하기 위해선 다른 접근법이 필요하다.
📌 ZeroOffsetItemReader
💡 offset은 그대로 두고, pk 조건문을 추가하자!
Offset 문제를 해결하는데 가장 잘 알려진 방법이다.
기존에는 offset 만큼 건너뛸 줄 알았는데, 실제로는 그렇지 않았기 때문에 발생한 문제였다.
그렇다면 offset을 사용하지 않고 건너뛰게 만들면 되는 것이다.
일반적으로 pk를 사용한다.
모든 데이터를 pk 순으로 정렬해서 첫 번째 페이지를 읽은 후, 페이지의 가장 마지막 페이지의 pk까진 더 이상 읽을 필요가 없으므로 다음 page를 읽을 때는 다음과 같은 쿼리를 작성하면 된다.
SELECT *
FROM 테이블명
WHERE id > 이전_페이지_마지막_pk
OFFSET 0
LIMIT ?;
이 때 offset은 반드시 0으로 고정해야 한다.
WHERE 절에 의해 pk만큼 건너뛰었는데, offset이 0이 아니면 엉뚱한 데이터를 조회하게 되기 때문이다.
📌 번외. JpaPagingItemReader 파헤쳐 보기
아무래도 ZeroOffsetItemReader 방식의 ItemReader를 사용하려면 직접 구현할 필요가 있을 듯 한데, 그렇다면 이미 구현된 Reader가 어떤 식으로 동작하는 지 이해할 필요가 있었다.
그리고 그 희생양은 JpaPagingItemReader로 정했다. ㅎㅎ
처음엔 AbstractItemCountingItemStreamItemReader의 open()으로 DB 연결을 하면서 시작한다.
여기서 doOpen()을 시도하는데, 가장 마지막에 재정의된 곳이 JpaPagingItemReader다.
그냥 EntityManager 할당해주는 게 다라서 딱히 볼 건 없었다.
호출이 끝나면 다시 아래로 진행한다.
여기서 itemCount와 maxItemCount가 대체 뭔가 한참을 뒤졌는데, 내가 생각한 게 맞다면 두 변수가 갖는 역할은 각각 이러하다.
- maxItemCount: 처리할 최대 아이템 수
- itemCount: 현재까지 처리된 아이템 수
여기서 item이 대체 뭔지 진짜 감이 안 잡혀서 이해가 힘들었다.
처음에는 지금까지 처리한 record 수를 의미하는 건지, reader의 단위인 item을 말하는 건지 고민했었는데 디버깅하고 이해할 수 있었다.
open이 끝나면 이번엔 AbstractPagingItemReader의 doRead() 메서드가 실행된다.
처음 조회하는 것이므로 멤버 변수인 results는 당연히 null이고, page size는 내가 정해준 100인 상태다.
results가 없으므로 doReadPage()를 호출하는데, 해당 메서드는 JpaPagingItemReader로 넘어간다.
내부에선 tx 여부에 따라 분기 처리하는 로직과 Query를 생성하여 데이터 셋을 가져오는 로직을 수행한다.
doReadPage가 끝나면, page를 +1 해주는데 여기까진 디버깅 툴을 안 써도 이해할 수 있었다.
문제는 이 다음인데..
next에 current값을 복사하고, current는 1 증가시킨다.
그 다음 results.size()와 next를 비교하여, next가 작다면 results.get(next)를 반환...? 어디로 반환하는데??
혹시나 doRead가 있으니 read 메서드도 있나 싶어서 찾아보니 AbstractItemCountingItemStreamItemReader에 있었다...ㅎㅎ
현재까지 읽기 작업이 1번 실행됐으므로 currentItemCount는 1이 되는 것을 확인할 수 있다.
여기서 doRead의 결과로 result set의 next번 째 인덱스 데이터를 가져오는 이유가 뭔지 여전히 감이 오질 않아서, 이 뒤로 무지성 디버깅을 하다가 진짜 별 거 아닌 내용이었음을 깨달았다.
처음 doReadPage()가 실행된 후엔 results 변수에 size만큼 가져온 데이터들이 쌓여있을 텐데, 이걸 그냥 계속 가져오는 거였다..
무슨 소리냐면, Pagination 기법을 사용하여 데이터를 조회하니 데이터도 복수개를 받을 것이고, 그 결과인 컬렉션을 그대로 반환하는 줄 알았는데 그게 아니었던 것.
size가 100이고, result set을 100개 가져왔다면, current가 100이 될 때까지 results에 담긴 데이터를 하나씩 꺼내간다.
current가 100이면 DB에서 가져온 데이터를 모두 옮겼다는 의미가 되므로, 다시 doReadPage()를 호출한다.
currentItemCount는 처리한 record 개수가 맞았구나...😊
사실 이것만 보면 실제로 구현할 수 있는 걸까 싶지만, 대부분은 AbstractItemCountingItemStreamItemReader와 AbstractPagingItemReader가 이미 처리하고 있다.
나는 위 과정에 필요한 데이터 셋을 돌려주되, zero offset 방식으로 동작하는 ItemReader를 만들어주기만 하면 되는 것이다!
그리고 정말 재밌는 걸 알 수 있는데, Pagination에선 원래 전체 dataset을 파악하기 위한 count 쿼리를 필수로 요구한다.
그러나 Batch 에서 제공해주는 ItemReader들은 사실상 Slice처럼 동작한다.
📌 QueryDslZeroOffsetItemReader
JpaPagingItemReader의 doReadPage() 메서드에는 다음과 같이 offset을 정한다.
Query query = createQuery().setFirstResult(getPage() * getPageSize()).setMaxResults(getPageSize());
이건 뭐 내가 어떻게 수정할 수도 없다.
하지만 다른 말로 이야기 하면, 해당 쿼리가 생성되는 doReadPage() 메서드만 재정의한다면?
내가 원하는 결과를 얻을 수 있다는 이야기가 된다.
Spring Batch와 Querydsl | 우아한형제들 기술블로그
{{item.name}} Spring Batch와 QuerydslItemReader 안녕하세요 우아한형제들 정산시스템팀 이동욱입니다. 올해는 무슨 글을 기술 블로그에 쓸까 고민하다가, 1월초까지 생각했던 것은 팀에 관련된 주제였습
techblog.woowahan.com
혼자서 어느 정도 구현을 했었는데, 몇 가지 이해 안 가는 기능을 찾다가 이미 만들어진 코드를 봐버렸다.
심지어 내가 고려한 것보다 훨씬 더 많은 고민이 담겨있다. ㅜㅜ
일단 혼자 나름 고심하면서 투자한 시간이 아까워서 작성한 부분까지만이라도 남겨둘까 했지만,
마지막 pk 정보를 상태로 관리하던 내 코드 상에 결함을 찾아내는 게 어려워서 실제로 사용하긴 어렵다고 판단해서 지워버렸다.
다만, 위 코드의 문제점은 반드시 QEntity가 존재해야 한다는 점이었다.
나는 현재 응답을 Dto에 매핑하고 있기 때문에 QuerydslNoOffsetNumberOptions에 `user.id`를 field로 던져주면 필드 탐색에 실패한다.
문제가 발생하는 지점은 getFieldValue()인데, 다음과 같이 정의되어 있다.
protected Object getFiledValue(T item) {
try {
Field field = item.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(item);
} catch (NoSuchFieldException | IllegalAccessException e) {
logger.error("Not Found or Not Access Field= " + fieldName, e);
throw new IllegalArgumentException("Not Found or Not Access Field");
}
}
Field 정보를 reflection으로 가져오는데, 사실 실패 지점은 여기가 맞지만 근본적인 이유는 아니다.
public QuerydslNoOffsetOptions(@Nonnull Path field,
@Nonnull Expression expression) {
String[] qField = field.toString().split("\\.");
this.fieldName = qField[qField.length - 1];
this.expression = expression;
if (logger.isDebugEnabled()) {
logger.debug("fieldName= " + fieldName);
}
}
진짜 실패 이유는 생성자 쪽에 있다.
fieldName를 유효한 QClass로 부터 추출하여 정의하는데, Path 정보로 `user.id`를 넘기면 fieldName은 `id`가 된다.
getFieldValue()는 마지막으로 가져온 pk를 currentId로 할당하기 위해 호출하는데, DeviceTokenOwner DTO에는 id라는 필드가 없으므로 "Not Found" 조건에 걸리게 되는 것이다.
실제로 fieldName을 "userId"로 강제하면, 테스트가 정상적으로 동작함을 확인할 수 있다.
물론 QEntity를 생성하지 않을 Dto에 대한 Option을 별도로 만들어주는 것도 방법이 될 수 있지만, 보다 간단하게 해결하는 방법이 있다.
QuerydslNoOffsetNumberOptions가 갖는 field는 정렬 조건을 위함이고, QuerydslNoOffsetOptions의 fieldName은 마지막 pk를 타겟팅하기 위함이다.
지금 발생하는 이슈는 정렬 조건으로 갖는 이름과 타겟팅할 필드명이 동일함을 전제로 하기 때문에 발생하는 문제다.
(물론 이게 일반적이고 내가 이상하게 쓰는 게 맞다. ㅎㅎ)
public QuerydslNoOffsetNumberOptions(@Nonnull NumberPath<N> field,
@Nonnull Expression expression,
String idName) {
super(idName, expression);
this.field = field;
}
그렇다면 생성자를 하나 더 정의해서, QuerydslNoOffsetOptions가 사용할 field name과 QuerydslNoOffsetNumberOptions가 사용할 field를 다르게 넘겨주면 된다.
짜잔. 성공적으로 동작함을 확인할 수 있다.
실제로 내가 사용할 때는 public 생성자를 제공해주지 않고 정적 패토리 메서드를 사용하고 있다.
📌 성능 평가
offset이 바뀌지 않는 사진을 찍어놨었는데, 포스팅 불러오기 했더니 이 아래로 작성한 내용이 모두 날아갔다. ㅎㅎㅎㅎㅎ
여튼 QuerydslNoOffsetItemReader는 비록 Jdbc보다는 느리지만,
쿼리가 문자열에 종속되는 문제를 피하면서 높은 안정성을 보장할 수 있음을 보여준다.
📌 Query Optimization
가능하다면 Join을 완전히 없애는 방향으로 개선해보고 싶었지만, 그렇게 하려면 테이블을 추가하거나 비정규화를 하는 방법밖에 없었다.
관리할 테이블의 증가는 부수적인 side effect를 유발할 가능성만 커질 듯 해서 join 자체를 없애지는 못 했다.
하지만 굳이 LEFT JOIN이어야 할까?
성능적으로 INNER JOIN은 언제나 LEFT JOIN보다 빠르기 때문에 같은 결과를 반환한다면 INNER JOIN을 사용하는 것이 좋다.
user와 device_token 사이의 관계를 생각해보자.
현재는 user 정보가 없어도 device_token 정보가 있다면, user 정보들은 null로 join 된다.
하지만 푸시 알림을 보내기 위해선 user와 device_token이 모두 유효해야 한다.
user 데이터는 있는데 유효한 device_token만 있거나, (이런 경우는 있을 수 없지만)device_token은 있는데 user 데이터가 없는 경우는 일절 고려하지 않아도 된다.
따라서 쿼리를 INNER JOIN으로 바꾸어도 문제가 없음을 알 수 있기 때문에 join 방식을 수정했다.
4. Writer
📌 Query Optimization
SELECT u.id, u.name
FROM user u
WHERE u.id IN (1, 2, 3, 4, 5, 6, 7, 8, 9)
AND NOT EXISTS (
SELECT n.receiver
FROM notification n
WHERE n.receiver = u.id
AND n.created_at >= CURDATE() AND n.created_at < CURDATE() + INTERVAL 1 day
AND n.type = 0 AND n.announcement = 1
);
INSERT 문에 사용할 SELECT로 위와 같은 쿼리를 사용했다.
하지만 가독성 측면에서 별로 마음에 들지 않았다.
-> Nested loop antijoin (cost=20.52 rows=9) (actual time=0.136..0.136 rows=0 loops=1)
-> Filter: (u.id in (1,2,3,4,5,6,7,8,9)) (cost=10.66 rows=9) (actual time=0.059..0.068 rows=9 loops=1)
-> Index range scan on u using PRIMARY (cost=10.66 rows=9) (actual time=0.058..0.065 rows=9 loops=1)
-> Filter: ((n.created_at >= <cache>(curdate())) and (n.created_at < <cache>((curdate() + interval 1 day))) and (n.`type` = 0) and (n.announcement = 1)) (cost=1.01 rows=1) (actual time=0.007..0.007 rows=1 loops=9)
-> Index lookup on n using FKhivcjl7a0kx0owwuy2api09i7 (receiver=u.id) (cost=1.01 rows=1) (actual time=0.006..0.006 rows=1 loops=9)
실제 실행 전략을 분석해보니 결과는 이러했다.
- 전체 실행 시간: 0.136ms
- Nested loop antijoin 사용
- user 테이블에서 9개 행을 읽음 (0.068ms 소요)
- notification 테이블에 대해 9번의 루프를 돌며 각각 1행씩 읽음 (각 루프당 0.007ms 소요)
- 최종적으로 0개 행 반환
이런 개떡같은 Query를 던져줘도, 찰떡같이 최적화를 해주는 MySQL..
user.id는 PK를 사용하고 있으므로 index range scan을 하고 있고, n.id도 type이 ref이므로 제법 빠른 탐색 속도를 보여준다.
하지만 가독성 문제를 고려해보면 아래 쿼리를 고려해볼 수도 있다.
SELECT NOW(), NOW(), u.id, u.name
FROM user u
LEFT JOIN notification n ON u.id = n.receiver
AND n.created_at BETWEEN CURDATE() AND CURDATE() + INTERVAL 1 DAY - INTERVAL 1 SECOND
AND n.type = 0
AND n.announcement = 1
WHERE u.id IN (1, 2, 3, 4, 5, 6, 7, 8, 9)
AND n.id IS NULL;
-> Filter: (n.id is null) (cost=20.51 rows=1) (actual time=0.125..0.125 rows=0 loops=1)
-> Nested loop antijoin (cost=20.51 rows=1) (actual time=0.124..0.124 rows=0 loops=1)
-> Filter: (u.id in (1,2,3,4,5,6,7,8,9)) (cost=10.66 rows=9) (actual time=0.054..0.068 rows=9 loops=1)
-> Index range scan on u using PRIMARY (cost=10.66 rows=9) (actual time=0.053..0.065 rows=9 loops=1)
-> Filter: ((n.created_at between <cache>(curdate()) and <cache>(((curdate() + interval 1 day) - interval 1 second))) and (n.`type` = 0) and (n.announcement = 1)) (cost=1.00 rows=0) (actual time=0.006..0.006 rows=1 loops=9)
-> Index lookup on n using FKhivcjl7a0kx0owwuy2api09i7 (receiver=u.id) (cost=1.00 rows=1) (actual time=0.005..0.005 rows=1 loops=9)
- 전체 실행 시간: 0.125ms
- Nested loop antijoin 사용 후 추가 필터링
- user 테이블에서 9개 행을 읽음 (0.068ms 소요)
- notification 테이블에 대해 9번의 루프를 돌며 각각 1행씩 읽음 (각 루프당 0.006ms 소요)
- 최종적으로 0개의 행 반환
쿼리도 보다 명확해지고, 실행 속도도 아주 미세하게 빨라졌다!
MySQL의 최적화 덕에 실행 계획은 비슷하지만, 실제 동작은 조금 차이가 발생하는 것을 확인할 수 있었다.
하지만 실제 IN절에는 최대 1,000개의 값이 삽입되는데, 양을 더 늘려도 비슷할까?
-> Nested loop antijoin (cost=107.14 rows=47) (actual time=0.191..0.191 rows=0 loops=1)
-> Filter: (u.id in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,50,51,52,53,54,100)) (cost=55.60 rows=47) (actual time=0.025..0.071 rows=47 loops=1)
-> Index range scan on u using PRIMARY (cost=55.60 rows=47) (actual time=0.023..0.063 rows=47 loops=1)
-> Filter: ((n.created_at >= <cache>(curdate())) and (n.created_at < <cache>((curdate() + interval 1 day))) and (n.`type` = 0) and (n.announcement = 1)) (cost=1.00 rows=1) (actual time=0.002..0.002 rows=1 loops=47)
-> Index lookup on n using FKhivcjl7a0kx0owwuy2api09i7 (receiver=u.id) (cost=1.00 rows=1) (actual time=0.002..0.002 rows=1 loops=47)
-> Filter: (n.id is null) (cost=107.14 rows=5) (actual time=0.248..0.248 rows=0 loops=1)
-> Nested loop antijoin (cost=107.14 rows=5) (actual time=0.247..0.247 rows=0 loops=1)
-> Filter: (u.id in (1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,50,51,52,53,54,100)) (cost=55.60 rows=47) (actual time=0.027..0.096 rows=47 loops=1)
-> Index range scan on u using PRIMARY (cost=55.60 rows=47) (actual time=0.026..0.085 rows=47 loops=1)
-> Filter: ((n.created_at between <cache>(curdate()) and <cache>(((curdate() + interval 1 day) - interval 1 second))) and (n.`type` = 0) and (n.announcement = 1)) (cost=1.00 rows=0) (actual time=0.003..0.003 rows=1 loops=47)
-> Index lookup on n using FKhivcjl7a0kx0owwuy2api09i7 (receiver=u.id) (cost=1.00 rows=1) (actual time=0.003..0.003 rows=1 loops=47)
IN 절에 들어갈 값을 47개로 늘리자 첫 번째 쿼리가 미세하게 더 빨라졌다.
정확한 이유는 잘 모르겠으나, 두 번째 쿼리의 `n.id is null`이 미세한 오버헤드를 발생시키기 때문인 듯하다.
가독성만 따지면 두 번째 쿼리를 사용해야 하겠지만, IN 절의 값이 더 많아지면 성능 차이가 더 심해질 수도 있으므로 쿼리를 수정하지는 않았다.
📌 Batch Insert
대용량 데이터를 삽입함에 있어 JPA 환경은 유용한 전략이 될 수 없다.
이전 포스트에서도 언급했 듯, Hibernate는 IDENTITY 전략의 PK에 대해 Batch insert를 수행할 수 없으며
대용량 데이터 삽입 과정에 영속성 컨텍스트나 dirty checking같은 기능이 필요할까?
이러한 관점에서 푸시 알림 데이터를 삽입할 때는 JdbcTemplate를 사용했었다.
@Slf4j
@Repository
@RequiredArgsConstructor
public class NotificationCustomRepositoryImpl implements NotificationCustomRepository {
private final JdbcTemplate jdbcTemplate;
private final int BATCH_SIZE = 1000;
@Override
public void saveDailySpendingAnnounceInBulk(List<Long> userIds, Announcement announcement) {
int batchCount = 0;
List<Long> subItems = new ArrayList<>();
for (int i = 0; i < userIds.size(); ++i) {
subItems.add(userIds.get(i));
if ((i + 1) % BATCH_SIZE == 0) {
batchCount = batchInsert(batchCount, subItems, NoticeType.ANNOUNCEMENT, announcement);
}
}
if (!subItems.isEmpty()) {
batchInsert(batchCount, subItems, NoticeType.ANNOUNCEMENT, announcement);
}
log.info("Notification saved. announcement: {}, count: {}", announcement, userIds.size());
}
private int batchInsert(int batchCount, List<Long> userIds, NoticeType noticeType, Announcement announcement) {
String sql = "INSERT INTO notification(id, read_at, type, announcement, created_at, updated_at, receiver, receiver_name) " +
"SELECT NULL, NULL, ?, ?, NOW(), NOW(), u.id, u.name " +
"FROM user u " +
"WHERE u.id IN (?) " +
"AND NOT EXISTS ( " +
" SELECT n.receiver " +
" FROM notification n " +
" WHERE n.receiver = u.id " +
" AND n.created_at >= CURDATE() " +
" AND n.created_at < CURDATE() + INTERVAL 1 DAY " +
" AND n.type = ? " +
" AND n.announcement = ? " +
")";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, noticeType.getCode());
ps.setString(2, announcement.getCode());
ps.setLong(3, userIds.get(i));
ps.setString(4, noticeType.getCode());
ps.setString(5, announcement.getCode());
}
@Override
public int getBatchSize() {
return userIds.size();
}
});
userIds.clear();
return ++batchCount;
}
}
Batch Size는 Chunk Size와 동일하게 1,000으로 수정했고, 코드는 정상적으로 동작했다.
여기까지만 해도 Writer는 더 이상 성능을 개선할 여지가 없다고 생각했는데, 치명적인 오류를 범하고 있었다.
📌 rewriteBatchedStatements=true
JdbcTemplate의 Batch Insert 구현시, rewriteBatchedStatements 옵션을 true로 설정하여 성능 문제 해결
Spring Batch Application과 JPA, JDBC를 함께 사용하여 Batch 작업을 처리해야 하는 상황이 있었습니다. 이때 MySQL Connector/J (JDBC Reference) - Configuration Properties for Connector/J 에서 제공하는 기능들을 몰랐기에,
hyos-dev-log.tistory.com
나도 로그 캡쳐해둔 거 있었는데, 포스팅이 도중에 다 날아가버려서 참고했던 블로그 링크로 대체했다.
JDBC 연결을 할 때 url 쿼리 파라미터로 rewriteBatchedStatements=true를 추가해주지 않으면,
jpa repository의 saveAll()과 똑같이 동작한다는 충격적인 사실을 알게 되었다.
이걸 어떻게 알았냐 하면, 테스트를 위해 데이터셋을 삽입하는데 너무 느려서 로그를 출력해보니, 데이터를 한 개씩 삽입하고 있었다.
위 옵션을 추가해야 INSERT의 VALUES가 묶여서 하나의 쿼리로 실행된다.
난 지금까지 그것도 모르고 원효대사 해골 배치 삽입 연산을 하고 있던 거였다..🤦♂️
5. Improved Performance
📌 Reader
- 성능 개선 비율 = (기존 실행 시간 - 개선된 실행 시간) / 기존 실행 시간 * 100%
- 기존 실행 시간: 253,170ms
- 개선된 실행 시간: 13,725ms
- 성능 개선 비율 ≈ 1,744%
(이거 맞나..? 내가 알던 성능 계산식이 잘못 된 줄 알았는데 맞는데...ㅎ)
기존 실행 시간을 측정할 때, 엄밀히 따지면 page size를 100으로 잡아야 하지만
그렇게 되면 속도를 10,000,000배 개선시킨 포스팅이 되어버린다.
너무 사기꾼 같아서 chunk size와 page size 그리고 batch size는 모두 1,000으로 맞춰 주었다.
이렇게 해도 데이터 100,000개에 대해 성능이 94.59%나 빨라졌음을 알 수 있다.
📌 Job
기존의 Job과 개선한 Job의 성능 비교를 위해, 우선 기존 Job을 실행시켰는데 12시간이 지나도 안 끝나길래 중단시켜버렸다.
그래서 12h를 milliseconds로 변환한 43,200,000ms가 소요된다고 적어놨지만, 실제로는 더 많이 걸릴 것으로 본다.
개선된 Job은 고작 12min만에 1,000,000개의 데이터에 대한 작업을 수행했다.
사용자는 모두 푸시 알림을 허용해놓은 데이터만을 사용했기 때문에, 유효한 device token을 가진 모든 user에게 notification 데이터가 삽입되었다는 것 또한 확인할 수 있었다.
원래는 then 절에 작성했어야 했지만, 잊어먹었다 ㅎ
📌 결론
원래 이렇게까지 분석할 생각은 없었는데, 하다보니 너무 재밌어서 내용이 자꾸 추가되었다.
성능 개선도 개선인데, QueryDsl로 쿼리를 생성하기 위한 ItemReader 코드도 얻을 수 있고 너무 좋았다. ㅎㅎ
몇 가지 아쉬운 점이 아직 남아 있긴 하다.
- 더 쿼리를 최적화할 수 있지는 않았을까?
- 현재 책정해둔 Chunk Size가 프리티어 EC2에서도 버틸 수 있을까?
- 운영환경에선 Batch를 실행하기 위해, @Scheduled가 아닌 shell 환경에서 cron을 사용한다던데 job parameter에 대해 더 깊은 이해가 필요할 듯
그래도 Batch에 대해 하나도 모르던 상태에서 이만큼 공부하고, 성능까지 개선시켜볼 수 있어서 일단 만족!
5. Additional Improvement
📌 데이터 1억 개 테스트
윈도우 서버 2016에서 MySQL Innodb_buffer_pool_size 설정하기 - SQL 서버 성능 향상
0. MySQL에서 Innodb_buffer_pool_size란? Innodb_buffer_pool_size는 초기에 8M로 설정되어 있습니...
blog.naver.com
그냥 갑자기 궁금해졌다. 내 Item Reader..과연 데이터 1억개가 있다면 어떨까?
정확하게는 user 데이터 1억 개와 device_token 3천 8백만 개였다.
호기심을 충족시키기 위해서 위 블로그를 참고해 MySQL의 pool size를 늘려서 batch로 더미 데이터를 삽입했다.
(device token도 1억 개로 맞추고 싶었는데, 진짜 데이터 삽입 너무 오래 걸려서 나중에 하기로 했다. ㅎㅎ..)
그런데 웬걸, 성능 평가는 커녕 쿼리가 하나 나가고 동작이 완전히 멈춰버렸다.
문제의 원인은 QuerydslNoOffsetItemReader에서 currentId과 lastId를 확보하기 위해, 가장 처음 min, max 함수를 호출하는 것에 있었다.
단일 테이블에서 pk를 사용해 min, max 함수를 사용한다면 문제가 없다.
다만 나는 코드 설계자의 의도와 다르게 JOIN을 하고 있기 때문에 모든 데이터를 합치고 있으며, 더 심각한 점은 Join 후엔 더 이상 user.id가 PK 인덱스를 타지 않게 되므로 성능이 급격하게 저하된 것이다.
📌 join을 없앨까?
이젠 정말 Join을 놓아주어야 할 때가 된 걸까..?
사실 아이디어가 아예 없던 건 아니었다.
그저 다른 기능도 개발해야 하는 시점에 불필요한 경우의 수까지 고려한다는 게 오버 엔지니어링이라 판단했기 때문에 패스했을 뿐. (애초에 device_token 백만 개는 쌓일 일이 있을까 🥲)
1️⃣ Reader 나누고, 유효한 사용자 정보 캐싱
한 번에 데이터를 조회하려 하지 않고, Device Token을 chunk size만큼 조회한 후, user_id를 추출한다.
user_id 목록을 사용해 푸시 알림을 허용한 사용자 정보를 조회하여 캐싱한 후, Device Token 정보와 캐싱된 사용자 정보를 결합하여 처리하는 방식이다.
혹은 반대로 사용자 테이블을 먼저 읽어서 푸시 알림을 허용한 사용자의 pk를 캐싱해도 괜찮을 것 같다.
User 데이터가 1억 개라고 가정하면 유효한 user id만 캐싱했을 때, 8byte * 100,000,000 = 8MB가 나온다.
문제는 이걸 어디에 캐싱해둘 것이냐인데, Batch에서 상태를 관리한다는 접근법은 별로라고 생각한다.
그렇다면 Redis나 Cuava Cache에 저장해두어야 할 텐데, ttl은 어떻게 잡을 것이며 부하가 심하진 않을까?
만약 Batch 작업 도중 Redis가 down된다면, 실패 복구는 무슨 수로 할 것인가?
2️⃣ 파티셔닝 & 병렬 처리
이 방법을 전에 이미 구상하고, 결론에 써놨었는데 포스팅이 날아간 후에 다시 안 적어놨었다. 😩
Device Token 테이블을 user_id 해시값을 기준으로 파티션을 분리한다면, 애플리케이션 단에서 병렬 처리도 가능하다.
구간도 좁힐 수 있으면서, 파티션이 나뉘어져 있으므로 동시성 문제도 크게 신경쓰지 않아도 된다고 생각한다.
문제는 이건 진짜 공부 목적으로 할 거 아니면, 서비스에 반영하기엔 너무 오버 엔지니어링인데다가
시간을 투자해서 개발했다고 하더라도 추후 다른 기능에 영향을 미칠 수 있고, 팀원들이 이해할 수 있을지가...
3️⃣ 데이터 비정규화
번거롭지만 가장 간단한 방법.
Device Token 테이블에 사용자 푸시 알림 허용 여부 정보까지 저장해버리면 된다.
대신 사용자가 푸시 알림 수신 여부를 업데이트 할 때마다, N개의 device token 정보도 함께 수정해야 한다.
📌 min, max를 탐색하는 쿼리를 수정할까?
애초에 이 문제가 발생한 문제가 뭘까?
...물론, 1차 원인은 Join을 걸어서 대규모 데이터 처리를 하려고 한 내 지능 이슈긴 한데....
우선, min/max는 나중에 Pagination Query에서 offset 대신 범위를 지정하기 위함이다.
SELECT u.id, u.name, d.token
FROM device_token d
INNER JOIN user u ON d.user_id = u.id AND u.deleted_at IS NULL
WHERE d.activated = true AND u.account_book_notify=true AND u.id > currentId AND u.id <= lastId
ORDER BY u.id
LIMIT 0, chunkSize;
그런데 뭔가 이상하지 않나?
user와 device token은 일대다 관계이므로, Join을 하면 user id는 더 이상 유일값이 아니게 된다.
그럼 범위 지정은 user id가 아니라 device token id로 하는 게 맞다. (근데 왜 지금까지 정상 동작한 거지..?)
더 나아가서 device token id로 min/max를 정할 거라면 굳이 join이 필요하지 않다.
그런데 현재 방식은 SELECT를 위해 주입한 쿼리를 from 절만 수정해서 min, max를 구하는 쿼리로 사용하므로 문제가 발생한다.
public class QuerydslNoOffsetNumberOptions<T, N extends Number & Comparable<?>> extends QuerydslNoOffsetOptions<T> {
...
private JPAQuery<T> idSelectQuery;
...
/**
* currentId와 lastId를 초기화하기 위한 JPAQuery를 설정한다.
* <p>
* idSelectQuery가 null이면 내부적으로 currentId와 lastId를 초기화한다.
*/
@Override
public void setIdSelectQuery(JPAQuery<T> idSelectQuery) {
this.idSelectQuery = idSelectQuery;
}
...
@Override
public void initKeys(JPAQuery<T> query, int page) {
if (page == 0) {
query = (idSelectQuery != null) ? idSelectQuery.clone() : query.clone();
initFirstId(query);
initLastId(query);
if (logger.isDebugEnabled()) {
logger.debug("First Key= " + currentId + ", Last Key= " + lastId);
}
}
}
}
그래서 그냥 바꿔치기 스니펫을 추가해줬다.
물론 나도 이게 깔끔한 방법이라고는 생각 안 하지만, 일단 문제 해결에 보다 초점을 두고 코드를 이쁘게 만드는 건 나중에 고려하자.
/**
* 디바이스 토큰과 유저 아이디를 담은 DTO
*/
public record DeviceTokenOwner(
Long userId,
Long deviceTokenId, // 추가
String name,
String deviceToken
) {
}
current id를 업데이트하기 위한 device token id 정보도 추가해주자.
@Bean
@StepScope
public QuerydslNoOffsetPagingItemReader<DeviceTokenOwner> querydslNoOffsetPagingItemReader() {
QuerydslNoOffsetOptions<DeviceTokenOwner> options = new QuerydslNoOffsetNumberOptions<>(deviceToken.id, Expression.ASC, "deviceTokenId");
options.setIdSelectQuery(queryFactory.select(createConstructorExpression()).from(deviceToken));
return new QuerydslNoOffsetPagingItemReader<>(emf, 1000, options, queryFactory -> queryFactory
.select(createConstructorExpression())
.from(deviceToken)
.innerJoin(user).on(deviceToken.user.id.eq(user.id))
.where(deviceToken.activated.isTrue().and(user.notifySetting.accountBookNotify.isTrue()))
);
}
private ConstructorExpression<DeviceTokenOwner> createConstructorExpression() {
return Projections.constructor(
DeviceTokenOwner.class,
user.id,
deviceToken.id,
user.name,
deviceToken.token
);
}
그리고 ItemReader 쪽에서 QuerydslNoOffsetOptions를 생성한 후, id min/max 탐색을 위한 query를 추가 지정해주고 실행하면 min/max 체크가 훨씬 빠르게 끝나는 것을 확인할 수 있다.
📌 성능 평가
-> Limit: 1000 row(s) (cost=12856797.61 rows=1000) (actual time=0.244..167.040 rows=1000 loops=1)
-> Nested loop inner join (cost=12856797.61 rows=474067) (actual time=0.243..166.889 rows=1000 loops=1)
-> Filter: ((d.activated = true) and (d.id > 76258) and (d.id <= 38150113) and (d.user_id is not null)) (cost=3823747.36 rows=9481344) (actual time=0.076..2.493 rows=1000 loops=1)
-> Index range scan on d using PRIMARY (cost=3823747.36 rows=18962689) (actual time=0.072..1.810 rows=1000 loops=1)
-> Filter: ((u.account_book_notify = true) and (u.deleted_at is null)) (cost=0.85 rows=0) (actual time=0.164..0.164 rows=1 loops=1000)
-> Single-row index lookup on u using PRIMARY (id=d.user_id) (cost=0.85 rows=1) (actual time=0.163..0.163 rows=1 loops=1000)
SELECT u.id, u.name, d.id, d.token
FROM device_token d
INNER JOIN user u ON d.user_id = u.id AND u.deleted_at IS NULL
WHERE d.activated = true AND u.account_book_notify=true AND d.id > 76258 AND d.id <= 38150113
ORDER BY d.id
LIMIT 0, 1000;
- device_token 테이블을 먼저 스캔하고, 각 행에 대해 user 테이블을 조회하고 있다.
- device_token 테이블 스캔
- Primary 인덱스로 range scan
- 예상 rows: 18,962,689, 실제 rows: 1,000 (LIMIT 조건)
- 실행 시간: 약 1.810ms
- user 테이블 조회
- 각 device_token 행에 대해 primary 인덱스로 single-row lookup
- 1,000번의 루프를 돌며 각각 약 0.163ms 소요
- 전체 쿼리 성능
- 총 실행 시간: 약 167.040ms
- 반환된 행 수: 1,000
생각했던 것 이상의 퍼포먼스를 보여주고 있다.
u.id가 아니라 d.id를 사용하기 때문에 Index도 기가막히게 타고 있고,
join 또한 user pk를 사용하여 eq_ref type(const 타입을 제외하고 가장 빠른 조인 타입)을 사용하기 때문에 매우 훌륭한 join 전략을 사용하고 있다.
그럼 배치도 빠르게 수행되냐고 한다면 그렇지 않다.
정확히는 더 느려졌다. ㅎㅎ
그도 그럴것이 이전 쿼리는 device token이 아니라 사실상 user 테이블을 타겟으로 움직였었다. 애초에 잘못된 쿼리였던 것이다.
예를 들어, 처음에 currentId가 0이고, lastId까지의 사용자를 select하고, pk가 1~1,000인 사용자를 가져왔다고 치자.
사용자들이 가지고 있는 device token를 조회하는데 1,000까지만 가져온다.
문제는 여기서 데이터가 누락되었을 수도 있다.
current_id는 1,000으로 업데이트가 될 텐데, 그럼 더 이상 pk 1,000 이하의 user이 device_token은 선택될 일이 없음을 의미한다.
따라서 기존 방식은 user는 모두 한 번씩 선택되었으므로 상관없지만, device_token이 누락되는 심각한 문제가 존재했다.
다시 데이터 1,000,000개 넣고 테스트 해보니, 30min이 걸린다. 🥲
현재 device token은 3천 8백만 개가 저장되어 있고, chunk size는 1,000이므로 싱글 스레드로 이 작업을 처리하려면 38,000번 반복해야 한다.
reader만 놓고 봤을 때 횟수당 167.040ms, 즉 총 수행 시간은 6,346,000ms = 105min이 된다. (1h 45min)
여기에 writer와 db connection을 연결하고 해제하는 작업 + 애플리케이션 수행 시간 등을 고려하면, 1h 50min 정도 소요될 것이라 예상한다.
이 이상 퍼포먼스를 끌어올리려면 두 가지 선택지가 남았다.
- Reader의 수행 시간을 10ms까지 끌어올리거나, Batch size 크기를 늘린다.
- DB 파티셔닝을 하고, 멀티 스레드로 돌린다.
아무리 생각해도 최적의 방법은 전자의 방식을 택하는 것이다.
왜냐하면, 현재 쿼리에서 join만 제거하면 아래의 성능을 얻을 수 있다.
-> Limit: 1000 row(s) (cost=3823762.27 rows=1000) (actual time=0.080..0.756 rows=1000 loops=1)
-> Filter: ((d.activated = true) and (d.id > 76258) and (d.id <= 38150113)) (cost=3823762.27 rows=9481344) (actual time=0.079..0.665 rows=1000 loops=1)
-> Index range scan on d using PRIMARY (cost=3823762.27 rows=18962689) (actual time=0.076..0.514 rows=1000 loops=1)
무려 0.756ms 컷 ㅋㅋㅋㅋㅋㅋ
원래 join으로 해결했어야 할 문제를 어딘가에서는 처리를 해야 한다지만,
user table 또한 단일 테이블 탐색으로 빠르게 돌려서 푸시 알림을 허용한 사용자 정보를 캐싱해두고
pk 존재 여부만 체크하는 작업이 join보다는 압도적으로 빠를 것.
물론 난 이미 초기 목적을 달성한 상태고, 이 이상은 여태 말했듯 오버 엔지니어링이라 작업은 안 했지만
개인적으로 흥미가 생겨서 꼭 개선해보고 싶다.
추가로 느낀 점. 데이터 1억 개란 정말 엄청나게 큰 수구나..