[Spring Boot] 정기 푸시 알림(Push Notification) 전송 배치(Batch) 프로세스
💡 문제가 되는 부분이 많고, Batch에 대해 미숙한 이해를 기반으로 작성한 글이므로 참고로만 읽어주세요.
📕 목차
1. Introduction
2. Domain
3. Infra
4. Batch
5. Discussion Topics
1. Introduction
📌 Usecase
- 사용자는 한 명당 여러 개의 디바이스 토큰을 가지고 있을 수 있다. (서로 다른 기기로 로그인 했을 시)
- 시스템은 푸시 알림을 허용한 사용자의 모든 활성화된 디바이스 토큰으로 정기(매일 20시) 푸시 알림을 전송할 수 있어야 한다.
- 사용자는 100명으로 가정한다.
- 알림 전송 실패에 대한 예외는 처리하지 않으며, 상태 또한 관리하지 않는다. 단, 알림 데이터는 DB에 저장되어 있어야 한다.
- 사용자에게 디바이스 토큰이 여러 개 있더라도 알림 데이터의 중복은 존재해선 안 된다.
- 동시성 문제는 고려하지 않는다. (구현을 최우선으로 하고 추후 개선)
📌 Architecture
유즈 케이스를 정의한 후 대략적으로 그려본 설계.
멀티 모듈 구조를 따르고 있으며, Batch 모듈은 common, infra, domain 모듈을 의존한다.
Batch 애플리케이션을 실행하면 바로 job이 실행되고 종료되게 한 다음, Linux에서 cron을 동작시킬까 고민도 해보았으나 문제가 귀찮아질 거 같아서 일단 Spring Boot 자체적으로 스케줄링을 하도록 만들기로 했다.
아직 브랜치 반영이 안 되어 있는 관계로 PR 링크로 대체
+ `24.10.08
이거 요새 조회수가 높아서, 급하게 추가해두는 내용이지만 해당 포스팅에서 최종 구현한 Batch는 최악의 성능을 보입니다.
만약 Batch 코드를 프로젝트에 반영이라도 할 계획이라면, 요것도 꼭 읽어보세요.
2. Domain
📌 Entity
DeviceToken 테이블은 토큰 정보와 활성화 여부를 확인하는 필드를 갖는다.
Notification은 알림의 종류, 그리고 알림이 공지 타입이라면 공지 종류가 무엇인지 확인하는 필드를 갖는다.
사용자의 읽음 여부를 확인하기 위해서 read_at 필드 또한 추가해주었다.
📌 푸시 알림을 허용한 사용자의 활성화된 디바이스 토큰 리스트 조회
SELECT 절을 수행하기 위해선 User와 DeviceToken을 Join 해주어야 한다.
또한 Batch는 Chunk 처리 방식을 지향하므로 Pagination을 적용하여 응답으로 보내주어야 한다.
✒️ Cursor vs Paging
Spring Batch는 Cursor, Paging 2개 타입의 Reader를 제공한다.
커넥션 시간과 메모리, non-thread safe, single thread라는 제약 조건 하에서 수행되어야 하는 Cursor를 사용하기엔 Batch를 처음 사용해보는 상황에서 쓰기엔 매우 부적합하다고 느꼈다.
결정적으로 우리 서버는 작고 귀여운 프리티어 EC2라 메모리 부담을 늘릴 수는 없다..!
여튼 위 로직을 처리하기 위해 구상한 쿼리는 다음과 같다.
SELECT d.token, u.id, u.name
FROM device_token d
LEFT JOIN user u ON d.user_id = u.id
WHERE d.activated = true AND u.account_book_notify = true
ORDER BY u.id ASC
LIMIT ${pageable.pageSize} OFFSET ${pageable.offset}
;
원래는 user_id를 기준으로 GROUP BY 절로 묶은 후, 결과를 transform 해주었었는데 Page 생성하다가 에러가 발생한다.
조회한 레코드는 N개인데, 사용자 한 명에게 여러 device_token이 존재하는 경우 응답 시 원소 개수가 달라서 실패하는 듯 하여 이는 api 호출자 측에서 처리하도록 만들었다.
모든 데이터를 가져오는 건 아무래도 비효율적이라 생각했고, 추후 bulk 연산을 할 생각이므로 영속화도 필요 없어서 DTO를 정의해서 데이터를 가져왔다.
/**
* 디바이스 토큰과 유저 아이디를 담은 DTO
*/
public record DeviceTokenOwner(
Long userId,
String name,
String deviceToken
) {
}
@Repository
@RequiredArgsConstructor
public class DeviceTokenCustomRepositoryImpl implements DeviceTokenCustomRepository {
private final JPAQueryFactory queryFactory;
private final QUser user = QUser.user;
private final QDeviceToken deviceToken = QDeviceToken.deviceToken;
@Override
public Page<DeviceTokenOwner> findActivatedDeviceTokenOwners(Pageable pageable) {
List<DeviceTokenOwner> content = queryFactory
.select(
Projections.constructor(
DeviceTokenOwner.class,
user.id,
user.name,
deviceToken.token
)
)
.from(deviceToken)
.leftJoin(user).on(deviceToken.user.id.eq(user.id))
.where(deviceToken.activated.isTrue().and(user.notifySetting.accountBookNotify.isTrue()))
.offset(pageable.getOffset())
.limit(pageable.getPageSize())
.orderBy(user.id.asc())
.fetch();
JPAQuery<Long> count = queryFactory
.select(deviceToken.id.count())
.from(deviceToken)
.leftJoin(user).on(deviceToken.user.id.eq(user.id))
.where(deviceToken.activated.isTrue().and(user.notifySetting.accountBookNotify.isTrue()));
return PageableExecutionUtils.getPage(content, pageable, () -> count.fetch().size());
}
}
count 쿼리 최적화를 위해서 PageableExecutionUtils를 사용했지만, 이를 최적화하기 위한 다른 기법도 존재한다.
그런데 100만 건도 안 될 데이터를 위해 최적화를 지금 고려하는 게 굳이..싶어서 보류했다.
PageableExecutionUtils를 사용하면 page size를 기반으로 count 쿼리를 지연시켜 호출한다.
만약 호출할 필요가 없는 상황이면 호출하지 않는다.
📝 테스트
@Test
@Transactional
@DisplayName("비활성화된 디바이스 토큰을 제외하고, 알림을 허용한 사용자의 활성화된 디바이스 토큰을 조회한다.")
public void selectActivatedDeviceTokenThatNotifyTrueUser() {
// given
User user = userRepository.save(createUser("jayang"));
List<DeviceToken> deviceTokens = List.of(
DeviceToken.of("deviceToken1", user),
DeviceToken.of("deviceToken2", user),
DeviceToken.of("deviceToken3", user)
);
deviceTokens.get(1).deactivate();
deviceTokenRepository.saveAll(deviceTokens);
Pageable pageable = Pageable.ofSize(100);
// when
Page<DeviceTokenOwner> owners = deviceTokenRepository.findActivatedDeviceTokenOwners(pageable);
// then
assertEquals("조회 결과 원소 개수는 2여야 합니다.", owners.getTotalElements(), 2L);
for (DeviceTokenOwner owner : owners) {
assertNotEquals("deviceToken2는 비활성화 토큰입니다.", "deviceToken2", owner.deviceToken());
}
}
@Test
@Transactional
@DisplayName("알림을 허용하지 않은 사용자의 활성화된 디바이스 토큰을 조회하지 않는다.")
public void notSelectNotifyFalseUser() {
// given
User activeUser = userRepository.save(createUser("jayang"));
User deactiveUser = userRepository.save(createUser("mock"));
List<DeviceToken> deviceTokens = List.of(
DeviceToken.of("deviceToken1", activeUser),
DeviceToken.of("deviceToken2", deactiveUser));
deviceTokens.get(1).deactivate();
deviceTokenRepository.saveAll(deviceTokens);
Pageable pageable = Pageable.ofSize(100);
// when
Page<DeviceTokenOwner> owners = deviceTokenRepository.findActivatedDeviceTokenOwners(pageable);
// then
assertEquals("조회 결과는 하나여야 합니다.", 1L, owners.getTotalElements());
assertEquals("알림을 허용하지 않은 사용자의 디바이스 토큰은 조회되지 않아야 합니다.", "jayang", owners.getContent().get(0).name());
}
@Test
@Transactional
@DisplayName("사용자 별로 디바이스 토큰 리스트를 받을 수 있다.")
public void selectDeviceTokenListByUserId() {
// given
User user1 = userRepository.save(createUser("jayang"));
User user2 = userRepository.save(createUser("mock"));
List<DeviceToken> deviceTokens = List.of(
DeviceToken.of("deviceToken1", user1),
DeviceToken.of("deviceToken2", user1),
DeviceToken.of("deviceToken3", user1),
DeviceToken.of("deviceToken4", user2),
DeviceToken.of("deviceToken5", user2)
);
deviceTokenRepository.saveAll(deviceTokens);
Pageable pageable = Pageable.ofSize(100);
// when
Page<DeviceTokenOwner> owners = deviceTokenRepository.findActivatedDeviceTokenOwners(pageable);
Map<String, List<String>> deviceTokenMap = new HashMap<>();
for (DeviceTokenOwner owner : owners) {
deviceTokenMap.computeIfAbsent(owner.name(), k -> new ArrayList<>()).add(owner.deviceToken());
}
// then
assertEquals("전체 결과 개수는 5개여야 합니다.", 5L, owners.getTotalElements());
assertEquals("jayang의 디바이스 토큰 개수는 3개여야 합니다.", 3, deviceTokenMap.get("jayang").size());
assertEquals("mock의 디바이스 토큰 개수는 2개여야 합니다.", 2, deviceTokenMap.get("mock").size());
}
📌 공지 Notification 삽입
이 경우 굉장히 까다로운 조건이 하나 있었는데, 이전의 DeviceToken 정보를 Page로 불러왔기 때문에 발생한다.
예를 들어, 사용자의 pk가 10번이고 해당 사용자에게 등록된 활성화 디바이스 토큰 pk가 {1, 50, 101}이라고 가정하자.
데이터를 읽을 때 size를 100으로 잡았다면, 1, 51번 디바이스 토큰에 알림을 전송하기 위해 Notification을 등록할 것이다.
그리고 다음 Chunk에서 101번 데이터를 읽어오므로, 10번 사용자에게 알림 데이터를 다시 저장한다.
bulkInsert로 데이터를 삽입해주기 전에, 조건을 탐색해야 할 필요가 있다.
INSERT INTO notification(id, type, read_at, created_at, updated_at, receiver, announcement)
SELECT NULL, '0', NULL, NOW(), NOW(), u.id, '1'
FROM user u
WHERE u.id IN (?)
AND NOT EXISTS (
SELECT n.receiver
FROM notification n
WHERE n.receiver = u.id
AND n.created_at >= CURDATE()
AND n.created_at < CURDATE() + INTERVAL 1 DAY
AND n.type = '0'
AND n.announcement = 1
);
데이터를 삽입하려는 사용자 리스트를 인자로 받아오고, 이미 당일에 같은 타입의 공지를 받은 경우엔 제외해주었다.
IN 절은 그나마 pk를 사용했기 때문에 index를 타서 괜찮은데, notification에서 정보를 조회할 때 Full Scan을 해버린다는 문제가...ㅠ
위 쿼리를 구현할 때는 QueryDsl을 사용할 수 없다.
JPA는 insert into from 절을 지원하지 않기 때문에, Hibernate 구현체를 사용하거나, JPQL Template, 혹은 Native SQL을 사용해야 한다.
✒️ JPA가 Batch insert를 지원하지 않는 이유
정확힌 JPA 때문이 아니라, MySQL을 RDBMS로 사용하면서 PK 전략을 IDENTITY로 사용했기 때문이다.
@GeneratedValue(strategy = GenerationType.IDENTITY)를 사용하면, auto_increment로 증가한 pk 값을 JpaRepository.save(T) 메서드 내부 동작에서 자동으로 저장하도록 처리한다.
문제는 여기서 Hibernate가 JDBC 수준의 Batch Insert를 비활성화 한다.
왜냐하면, 새로 할당할 key 값을 알 수 없는(DB가 새로운 기본 키 값을 생성하고 반환한 pk값을 JPA가 각 Entity ID를 추적하고 관리해야 함) IDENTITY 전략에서 Hibernate가 채택한 "Transactional Write Behind" flush 방식과 충돌하기 때문이다.
✒️ Transactional Write Behind
Hibernate가 성능 최적화를 채택한 방식으로써 주요 특징은 지연 쓰기(Deferred Writing)와 변경 사항 일괄 처리(Batch Processing)이 있다.
문제는 IDENTITY 전략은 각 Entity가 삽입될 때 DB가 pk를 생성하는데, Entity가 DB 삽입될 때마다 Hibernate가 해당 ID 값을 즉시 알아내야 한다는 점이다.
이는 Transaction이 끝날 때까지 변경 사항을 지연시키는 Hibernate 정책과 충돌한다. (INSERT가 그 즉시 발생해야 하므로)
따라서 IDENTITY 전략에서 여러 개의 INSERT 명령을 하나의 Batch로 묶어 실행하는 것이 어렵다.
우선 빠르게 구현을 해야하는 실정이었기 때문에, 소스가 가장 많은 JPQL Template를 활용했다.
@Slf4j
@Repository
@RequiredArgsConstructor
public class NotificationCustomRepositoryImpl implements NotificationCustomRepository {
private final JdbcTemplate jdbcTemplate;
private int batchSize = 500;
@Override
public void saveDailySpendingAnnounceInBulk(List<Long> userIds, LocalDateTime publishedAt, Announcement announcement) {
int batchCount = 0;
List<Long> subItems = new ArrayList<>();
for (int i = 0; i < userIds.size(); ++i) {
subItems.add(userIds.get(i));
if ((i + 1) % batchSize == 0) {
batchCount = batchInsert(batchCount, subItems, publishedAt, announcement);
}
}
if (!subItems.isEmpty()) {
batchInsert(batchCount, subItems, publishedAt, announcement);
}
log.info("Notification saved. announcement: {}, count: {}", announcement, userIds.size());
}
private int batchInsert(int batchCount, List<Long> userIds, LocalDateTime publishedAt, Announcement announcement) {
String sql = "INSERT INTO notification(id, type, read_at, created_at, updated_at, receiver, announcement) " +
"SELECT NULL, '0', NULL, NOW(), NOW(), u.id, ? " +
"FROM user u " +
"WHERE u.id IN (?) " +
"AND NOT EXISTS ( " +
" SELECT n.receiver " +
" FROM notification n " +
" WHERE n.receiver = u.id " +
" AND n.created_at >= CURDATE() " +
" AND n.created_at < CURDATE() + INTERVAL 1 DAY " +
" AND n.type = '0' " +
" AND n.announcement = ? " +
");";
jdbcTemplate.batchUpdate(sql, new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
ps.setString(1, announcement.getCode());
ps.setLong(2, userIds.get(i));
ps.setString(3, announcement.getCode());
}
@Override
public int getBatchSize() {
return userIds.size();
}
});
userIds.clear();
return ++batchCount;
}
}
네이티브 종속적인 게 너무 싫지만, QueryDsl로 insert 구현하면서 삽질을 너무 많이 해버려서 시간이 촉박했다. :(
batchSize는 500으로 잡았는데, 어차피 읽을 때 size를 100으로 잡아둬서 현재는 크게 의미가 없다.
batchSize가 500에 도달하면 도중에 subItems의 내용을 모두 Insert 해버리고 비운 후, 다시 채우기를 반복한다.
근데 JPQL Template 처음 써봐서 제대로 쓴 건지 모르겠다.
추후 성능 측정해보고 다시 확인해볼 예정.
📝 테스트
@Test
@Transactional
@DisplayName("여러 사용자에게 일일 소비 알림을 저장할 수 있다.")
public void saveDailySpendingAnnounceInBulk() {
// given
User user1 = userRepository.save(createUser("jayang"));
User user2 = userRepository.save(createUser("mock"));
User user3 = userRepository.save(createUser("test"));
// when
notificationRepository.saveDailySpendingAnnounceInBulk(
List.of(user1.getId(), user2.getId(), user3.getId()),
LocalDateTime.now(),
Announcement.DAILY_SPENDING
);
// then
notificationRepository.findAll().forEach(notification -> {
log.info("notification: {}", notification);
assertEquals("알림 타입이 일일 소비 알림이어야 한다.", Announcement.DAILY_SPENDING, notification.getAnnouncement());
});
}
@Test
@Transactional
@DisplayName("이미 당일에 알림을 받은 사용자에게 데이터가 중복 저장되지 않아야 한다.")
public void notSaveDuplicateNotification() {
// given
User user1 = userRepository.save(createUser("jayang"));
User user2 = userRepository.save(createUser("mock"));
Notification notification = new Notification.Builder(NoticeType.ANNOUNCEMENT, Announcement.DAILY_SPENDING)
.receiver(user1)
.build();
notificationRepository.save(notification);
// when
notificationRepository.saveDailySpendingAnnounceInBulk(
List.of(user1.getId(), user2.getId()),
LocalDateTime.now(),
Announcement.DAILY_SPENDING
);
// then
List<Notification> notifications = notificationRepository.findAll();
assertEquals("알림이 중복 저장되지 않아야 한다.", 2, notifications.size());
}
📌 NotifyType & AnnounceType
NoticeType을 어떻게 정의할 지 굉장히 고민이 많았는데, 위 아이디어를 참고했다.
알림 메시지는 기본적으로 틀이 거의 동일하다.
"'누가' '무엇'에 '행동'을 했습니다."와 같은 구조를 취하게 된다.
하지만, 현재 내가 개발 중인 MVP 상에는 공지 알림밖에 존재하지 않는다.
따라서 포맷은 그대로 빌려오긴 했지만, 상수는 고작해봐야 ANNOUNCEMENT 하나밖에 없다.
@Getter
public enum NoticeType implements LegacyCommonType {
ANNOUNCEMENT("0", "%s", "%s"); // 공지 사항은 별도 제목을 설정하여 사용한다.
private final String code;
private final String title;
private final String contentFormat;
private final String navigablePlaceholders = "{%s_%d}";
private final String plainTextPlaceholders = "%s";
NoticeType(String code, String title, String contentFormat) {
this.code = code;
this.title = title;
this.contentFormat = contentFormat;
}
}
하지만 여기서 고민이 하나 더 생겼는데, 알림 title과 content를 모두 DB에 저장하는 건 너무 비효율적이라 여겼다.
그렇다면 나중에 사용자가 알림을 다시 조회했을 때 어떻게 메시지를 다시 포맷팅 해줄 지가 관건이었다.
예를 들어, 지금은 "매일 지출 등록 요구 알림"이지만, Announcement 타입이라는 데이터만으로는 "매월 목표 금액 설정 알림"이라는 다른 공지 타입과 구분할 수 없었다.
@Getter
public enum Announcement implements LegacyCommonType {
NOT_ANNOUNCE("0", "", ""),
// 정기 지출 알림
DAILY_SPENDING("1", "%s님, 3분 카레보다 빨리 끝나요!", "많은 친구들이 소비 기록에 참여하고 있어요👀"),
MONTHLY_TARGET_AMOUNT("2", "6월의 첫 시작! 두구두구..🥁", "%s님의 이번 달 목표 소비 금액은?");
private final String code;
private final String title;
private final String content;
Announcement(String code, String title, String content) {
this.code = code;
this.title = title;
this.content = content;
}
public String createFormattedTitle(String name) {
validateName(name);
return String.format(title, name);
}
public String createFormattedContent(String name) {
validateName(name);
return String.format(content, name);
}
private void validateName(String name) {
if (!StringUtils.hasText(name)) {
throw new IllegalArgumentException("name must not be empty");
}
}
}
그래서 그냥 Announcement 타입인 경우, 어떤 공지 알림인지를 식별할 수 있는 상수를 별도로 정의해주었다.
3. Infra
📌 NotifcationEvent
/**
* FCM 푸시 알림에 필요한 정보를 담은 Event 클래스
* <p>
* 제목, 내용, 디바이스 토큰 리스트, 푸시 알림 이미지를 필드로 갖는다.
*/
public record NotificationEvent(
String title,
String content,
List<String> deviceTokens,
String imageUrl
) {
public NotificationEvent {
if (!StringUtils.hasText(title)) {
throw new IllegalArgumentException("제목은 반드시 null 혹은 공백이 아니어야 합니다.");
}
if (!StringUtils.hasText(content)) {
throw new IllegalArgumentException("내용은 반드시 null 혹은 공백이 아니어야 합니다.");
}
if (deviceTokens == null) {
throw new IllegalArgumentException("디바이스 토큰은 반드시 null이 아니어야 합니다.");
}
}
public static NotificationEvent of(String title, String content, List<String> deviceTokens, String imageUrl) {
return new NotificationEvent(title, content, deviceTokens, imageUrl);
}
public int deviceTokensSize() {
return deviceTokens.size();
}
/**
* 단일 메시지를 전송하기 위한 Message.Builder를 생성한다.
*/
public Message.Builder buildSingleMessage() {
return Message.builder().setNotification(toNotification()).setToken(deviceTokens.get(0));
}
/**
* 다중 메시지를 전송하기 위한 MulticastMessage.Builder를 생성한다.
*/
public MulticastMessage.Builder buildMulticastMessage() {
return MulticastMessage.builder().setNotification(toNotification()).addAllTokens(deviceTokens);
}
private Notification toNotification() {
return Notification.builder()
.setTitle(title)
.setBody(content)
.setImage(imageUrl)
.build();
}
}
우선 이벤트 핸들링을 위한 Event 클래스를 정의해주었다.
title, content, image 그리고 전송할 deviceToken 리스트를 포함한다.
현재는 공지 알림밖에 없기에 별도의 딥링크를 위한 데이터를 포함하지 않고 있는데, 아직은 나한테 필요 없는 고려사항이기 때문에 배제시켰다.
deviceToken이 한 개냐, 여러 개냐에 따라서 build 메서드를 나누었다.
하나로 합치려고 했는데, 리턴 타입이 달라서 무리가 있었다.
📌 NotificationEventHandler
/**
* FCM 푸시 알림을 처리하는 핸들러
*/
@Slf4j
@RequiredArgsConstructor
public class FcmNotificationEventHandler implements NotificationEventHandler {
private final FcmManager fcmManager;
@Async
@Override
@TransactionalEventListener
public void handleEvent(NotificationEvent event) {
log.debug("handleEvent: {}", event);
ApiFuture<?> response = fcmManager.sendMessage(event);
if (response == null) {
return;
}
response.addListener(() -> {
try {
log.info("Successfully sent message: " + response.get());
} catch (Exception e) {
log.error("Failed to send message: " + e.getMessage());
}
}, Executors.newCachedThreadPool()); // FIXME: 알림이 매우 많은 경우 out of memory 발생 가능성 있음 (Thread pool size 제한 필요)
}
}
요청을 전달해줄 fcmManager에게서 응답을 받으면 ApiFuture을 리턴 타입으로 받게 된다.
이는 비동기식 요청으로 메시지를 전송하도록 Async 메서드를 호출했기 때문인데, 주의할 점은 response를 바로 get 하는 순간 비동기가 아니게 된다.
Future의 get() 메서드 스펙을 읽어보면, 블록킹 방식으로 동작하고 있음을 알 수 있다.
애초에 대부분의 비동기 메서드 중 get은 대부분 블록킹 방식으로 동작한다는 것은 자바 공식문서에서 경고하고 있는 내용이다.
즉, 비동기-블락킹 방식으로 동작하고 있으므로 사실상 비동기가 아니게 된다.
이를 해결하기 위해선 콜백 메서드를 사용해서 response를 별도의 thread에서 get하면 된다.
이 문제의 가장 큰 단점은 푸시 알림이 굉장히 많이 발생했을 때, thread가 어마어마한 양으로 증가할 수 있다는 기술 부채를 끌어안게 된다는 점이다.
🤔 더 개선할 수 있는 방법
FCM 5.4.0 버전에서 이미 Fcm SDK에서 사용하는 Thread pool을 지정할 수 있는 API를 릴리즈 버전으로 내놓았다.
경로는 com.google.firebase에 위치한 ThreadManager에 있다.
ThreadManager는 추상 클래스고, 별다른 사용자 정의 구현체를 만들지 않으면 DefaultThreadManager를 사용하게 된다.
만약 이를 커스텀 해주고 싶으면 ThreadManager를 상속한 CustomThreadManager를 사용하면 된다.
@Slf4j
@ManagedResource
public class CustomThreadManager extends ThreadManager {
@Value("${fcm.thread.core-pool-size:#{T(java.lang.Runtime).getRuntime().availableProcessors() * 2}}")
private int corePoolSize;
@Value("${fcm.thread.max-pool-size:#{T(java.lang.Runtime).getRuntime().availableProcessors() * 4}}")
private int maxPoolSize;
@Value("${fcm.thread.keep-alive-time:60}")
private long keepAliveTime;
@Value("${fcm.thread.queue-capacity:1000}")
private int queueCapacity;
private ThreadPoolExecutor executor;
@Override
protected ExecutorService getExecutor(FirebaseApp firebaseApp) {
if (executor == null) {
executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
getThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
log.debug("Executing task in thread: {}", t.getName());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (t != null) {
log.error("Exception in executor", t);
}
}
};
}
return executor;
}
@Override
protected void releaseExecutor(FirebaseApp firebaseApp, ExecutorService executorService) {
if (executorService instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executorService;
tpe.shutdown();
try {
if (!tpe.awaitTermination(60, TimeUnit.SECONDS)) {
tpe.shutdownNow();
if (!tpe.awaitTermination(60, TimeUnit.SECONDS)) {
log.error("Thread pool did not terminate");
}
}
} catch (InterruptedException ie) {
tpe.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
@Override
protected ThreadFactory getThreadFactory() {
return new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "FCM-Worker-" + threadNumber.getAndIncrement());
thread.setDaemon(true);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
};
}
@ManagedAttribute
public int getActiveCount() {
return executor != null ? executor.getActiveCount() : 0;
}
@ManagedAttribute
public long getCompletedTaskCount() {
return executor != null ? executor.getCompletedTaskCount() : 0;
}
@ManagedAttribute
public int getPoolSize() {
return executor != null ? executor.getPoolSize() : 0;
}
@ManagedAttribute
public int getQueueSize() {
return executor != null ? executor.getQueue().size() : 0;
}
}
위 코드는 나도 적용해보지 않은 거라서 절대 성공적으로 동작함을 보장하지 않는다.
그냥 나라면 이렇게 쓰지 않을까 싶어서 작성해보았는데, 실패할 가능성이 매우 높다.
fcm thread가 얼마나 생성될 지 알 수 없기 때문에 system core 수 기반으로 thread pool을 dynamic하게 결정한 후,
LinkedBlockingQueue를 사용해서 대기 중인 작업 수를 제한했다.
또한 CallerRunsPolicy를 사용해 Thread pool이 포화 상태일 때 호출 스레드에서 작업을 실행하고, 안전한 종료를 보장하기 위해 정상 종료를 시도하고, 필요한 경우에만 강제 종료를 수행하도록 한다.
근데 쓰다보니 생각보다 Thread 제어하는 거 재밌네..나중에 따로 포스팅해서 더 딥하게 파봐야겠다.
현재는 테스트 방법이 생각나지 않아서 여기까지만.
📌 FcmConfig
Firebase Cloud Messaging에 서비스를 등록하는 건 했다 치고,
admin key를 json 형태로 받아온 후 resource에 저장해준다.
참고로 이 키는 노출되어선 안 되므로 반드시 .gitignore에 등록해주어야 한다.
@Slf4j
@Profile({"local", "dev", "prod"})
public class FcmConfig implements PennywayInfraConfig {
private final ClassPathResource firebaseResource;
private final String projectId;
public FcmConfig(@Value("${app.firebase.config.file}") String firebaseConfigPath,
@Value("${app.firebase.project.id}") String projectId) {
this.firebaseResource = new ClassPathResource(firebaseConfigPath);
this.projectId = projectId;
}
@PostConstruct
public void init() throws IOException {
FirebaseOptions option = FirebaseOptions.builder()
.setCredentials(GoogleCredentials.fromStream(firebaseResource.getInputStream()))
.build();
if (FirebaseApp.getApps().isEmpty()) {
FirebaseApp.initializeApp(option);
log.info("FirebaseApp is initialized");
}
}
@Bean
FirebaseMessaging firebaseMessaging() {
return FirebaseMessaging.getInstance(FirebaseApp.getInstance());
}
@Bean
FcmManager fcmManager(FirebaseMessaging firebaseMessaging) {
return new FcmManager(firebaseMessaging);
}
@Bean
NotificationEventHandler notificationEventHandler(FcmManager fcmManager) {
return new FcmNotificationEventHandler(fcmManager);
}
}
FcmManager와 NotificationEventHandler까지 빈으로 직접 등록해주는 이유는 이전 포스팅에 작성해두었다.
📌 FcmManager
@Slf4j
@RequiredArgsConstructor
public class FcmManager {
private final FirebaseMessaging firebaseMessaging;
/**
* {@link NotificationEvent}를 받아서 메시지를 전송한다.
* <p>
* 디바이스 토큰이 1개인 경우에는 단일 메시지를, 2개 이상인 경우에는 다중 메시지를 전송한다.
* 만약 디바이스 토큰이 존재하지 않는 경우에는 메시지 전송을 하지 않는다.
* </p>
*/
public ApiFuture<?> sendMessage(NotificationEvent event) {
if (event.deviceTokensSize() == 0) {
log.info("메시지 전송을 위한 디바이스 토큰이 존재하지 않습니다.");
return null;
}
if (event.deviceTokensSize() == 1) {
return sendSingleMessage(event);
} else {
return sendMulticastMessage(event);
}
}
private ApiFuture<String> sendSingleMessage(NotificationEvent event) {
log.info("단일 메시지 전송 : {}", event);
Message message = event.buildSingleMessage().setApnsConfig(getApnsConfig(event)).build();
return firebaseMessaging.sendAsync(message);
}
private ApiFuture<BatchResponse> sendMulticastMessage(NotificationEvent event) {
log.info("다중 메시지 전송 : {}", event);
MulticastMessage messages = event.buildMulticastMessage().setApnsConfig(getApnsConfig(event)).build();
return firebaseMessaging.sendEachForMulticastAsync(messages);
}
private ApnsConfig getApnsConfig(NotificationEvent event) {
ApsAlert alert = ApsAlert.builder()
.setTitle(event.title())
.setBody(event.content())
.setLaunchImage(event.imageUrl())
.build();
Aps aps = Aps.builder()
.setAlert(alert)
.setSound("default")
.build();
return ApnsConfig.builder()
.setAps(aps)
.build();
}
}
Apns는 사용자에게 뜨는 푸시 알림에 보여질 정보를 나타낸다.
참고로 나는 iOS만 지원하기 때문에 Apns 설정만 해주었고, 웹이나 안드로이드는 별도 설정이 더 필요하다.
재밌는 건 여기서 푸시 알림 사운드도 지정이 가능하다는 점?
이걸 직접 요청을 보낸다고 하면, 대략 아래처럼 Body에 담긴다.
푸시 알림을 눌렀을 때, 딥 링크가 연결되어야 한다면 추가 data를 payload에 담아 전송해주어야 한다.
참고로 나는 deviceToken 개수에 따라서 single 요청과 multi 요청을 구분했는데, single 요청도 multicastAsync로 보내면 되지 않을까라는 의문도 들었었다.
솔직히 아직도 답을 모르겠다..!
Firebase 공식 문서랑 Stackoverflow, Reddit 다 뒤져봐도 해당 토픽에 대한 이야기를 찾을 수 없었다.
아무래도 코드를 뜯어봐야 할 것 같으니, 이것도 다른 포스팅에서 다루는 걸로...
4. Batch
📌 Architecture
Batch 처음 써봐서 제법 애먹었는데, 사용법이 어렵진 않았다.
그저 올바르게 쓰고 있다는 확신이 들지 않을 뿐..
패키지 구조를 어떻게 잡아야 할 지도 잘 모르겠어서, 그냥 정직하게 전부 분리해버렸다.
그래서 내 프로젝트에선 다음과 같은 패키지 구조를 갖는다.
📌 Job
@Configuration
@RequiredArgsConstructor
public class DailySpendingNotifyJobConfig {
private final JobRepository jobRepository;
private final SendSpendingNotifyStepConfig sendSpendingNotifyStepConfig;
@Bean
public Job dailyNotificationJob(PlatformTransactionManager transactionManager) {
return new JobBuilder("dailyNotificationJob", jobRepository)
.start(sendSpendingNotifyStepConfig.sendSpendingNotifyStep(transactionManager))
.on("FAILED")
.stopAndRestart(sendSpendingNotifyStepConfig.sendSpendingNotifyStep(transactionManager))
.on("*")
.end()
.end()
.build();
}
}
- 매일 실행 시켜줄 Job을 빈으로 등록시켜주었다.
- step의 실행 결과가 FAILED인 경우, 실행을 중단 후 다시 시도하는 실패 복구 단계를 포함시켰다.
- 그 외("*")에는 종료
Job 하나에는 여러 개의 Step이 존재할 수 있다.
만약 재시도 정책을 Step을 다시 실행하는 것이 아닌, 다른 Step 레벨에서 오류를 처리하도록 만들 수도 있을 것이다.
📌 Step
@Configuration
@RequiredArgsConstructor
public class SendSpendingNotifyStepConfig {
private final JobRepository jobRepository;
private final ActiveDeviceTokenReader reader;
private final NotificationProcessor processor;
private final NotificationWriter writer;
@Bean
public Step sendSpendingNotifyStep(PlatformTransactionManager transactionManager) {
return new StepBuilder("sendSpendingNotifyStep", jobRepository)
.<DeviceTokenOwner, DeviceTokenOwner>chunk(100, transactionManager)
.reader(reader.execute())
.processor(processor)
.writer(writer)
.build();
}
}
- Chunk size를 100으로 잡았다. (딱히 기준은 없었고, 어느 정도로 잡는 게 성능이 좋을 지 알 수 없어서 경험적으로 바꾸기로 했다.)
- chunk의 input과 output 제네릭 타입은 모두 DeviceTokenOwner
- 만약, processor에서 Input을 가공하여, 다른 타입의 Output을 내놓는 경우 수정이 필요하다.
- reader, processor, writer를 모두 step에 포함시켜준다.
📌 Reader
@Slf4j
@Component
@RequiredArgsConstructor
public class ActiveDeviceTokenReader {
private final DeviceTokenRepository deviceTokenRepository;
@Bean
public RepositoryItemReader<DeviceTokenOwner> execute() {
return new RepositoryItemReaderBuilder<DeviceTokenOwner>()
.name("execute")
.repository(deviceTokenRepository)
.methodName("findActivatedDeviceTokenOwners")
.pageSize(100)
.sorts(new HashMap<>() {{
put("id", Sort.Direction.ASC);
}})
.build();
}
}
- 위에서 정의했던 푸시 알림을 받기로 한 사용자의 활성화 토큰 리스트를 조회하는 메서드를 호출한다.
- Page 방식으로 처리하기로 했으므로, 인자로 들어갈 pageable 값을 지정해주었다.
- 다시 보니..sort 기준을 repository 메서드 내에서 내 멋대로 설정했던 거 같은데, 이러면 여기서 써준 의미가...ㅜ 수정해야겠다.
📌 Processor
@Slf4j
@Component
public class NotificationProcessor implements ItemProcessor<DeviceTokenOwner, DeviceTokenOwner> {
@Override
public DeviceTokenOwner process(@NonNull DeviceTokenOwner deviceTokenOwner) throws Exception {
log.info("NotificationProcessor 실행");
return deviceTokenOwner;
}
}
- 가공, 필터링을 처리하기 위한 단계로써, 현재는 로깅 외에 아무것도 처리하지 않는다.
이게..원래는 사용했었는데, 비지니스 로직을 이 단계에서 처리할 방법이 없어서 writer로 다시 옮겨버렸다.
없어도 무방.
📌 Writer
@Slf4j
@Component
@RequiredArgsConstructor
public class NotificationWriter implements ItemWriter<DeviceTokenOwner> {
private final NotificationRepository notificationRepository;
private final ApplicationEventPublisher publisher;
@Override
@Transactional
public void write(@NonNull Chunk<? extends DeviceTokenOwner> owners) throws Exception {
log.info("Writer 실행: {}", owners.size());
LocalDateTime publishedAt = LocalDateTime.now();
Map<Long, DailySpendingNotification> notificationMap = new HashMap<>();
for (DeviceTokenOwner owner : owners) {
notificationMap.computeIfAbsent(owner.userId(), k -> DailySpendingNotification.from(owner)).addDeviceToken(owner.deviceToken());
}
List<Long> userIds = new ArrayList<>(notificationMap.keySet());
notificationRepository.saveDailySpendingAnnounceInBulk(userIds, publishedAt, Announcement.DAILY_SPENDING);
for (DailySpendingNotification notification : notificationMap.values()) {
publisher.publishEvent(NotificationEvent.of(notification.title(), notification.content(), notification.deviceTokensForList(), ""));
}
}
}
- notificationRepository에 정의해주었던 batch insert 메서드를 사용하기 위한 인자를 추출한다.
- Transaction이 종료되면 event가 실행되도록 전부 publish 해준다.
- 사용자의 이름이 메시지 타이틀에 반영되어야 하기 때문에 한 번에 보낼 수가 없다..!
🟡 DailySpendingNotification DTO
@Builder
public record DailySpendingNotification(
Long userId,
String title,
String content,
Set<String> deviceTokens
) {
public DailySpendingNotification {
Objects.requireNonNull(userId, "userId must not be null");
Objects.requireNonNull(title, "title must not be null");
Objects.requireNonNull(content, "content must not be null");
Objects.requireNonNull(deviceTokens, "deviceTokens must not be null");
}
/**
* {@link DeviceTokenOwner}를 DailySpendingNotification DTO로 변환하는 정적 팩토리 메서드
* <p>
* DeviceToken은 List로 변환되어 멤버 변수로 관리하게 된다.
*/
public static DailySpendingNotification from(DeviceTokenOwner owner) {
Announcement announcement = Announcement.DAILY_SPENDING;
Set<String> deviceTokens = new HashSet<>();
deviceTokens.add(owner.deviceToken());
return DailySpendingNotification.builder()
.userId(owner.userId())
.title(announcement.createFormattedTitle(owner.name()))
.content(announcement.getContent())
.deviceTokens(deviceTokens)
.build();
}
public void addDeviceToken(String deviceToken) {
deviceTokens.add(deviceToken);
}
/**
* DeviceToken을 List로 변환하여 View를 반환한다.
*/
public List<String> deviceTokensForList() {
return List.copyOf(deviceTokens);
}
}
그냥 사용자 별로 deviceTokens를 묶어주기 위한 DTO.
Pagination 때문에 결과를 묶어서 반환해주지 않기 때문에, 이렇게 별도의 가공을 해주었다.
📌 Scheduler
@Slf4j
@Component
@RequiredArgsConstructor
public class SpendingNotifyScheduler {
private final JobLauncher jobLauncher;
private final Job dailyNotificationJob;
@Scheduled(cron = "0 0 20 * * ?")
public void runDailyNotificationJob() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("time", System.currentTimeMillis())
.toJobParameters();
try {
jobLauncher.run(dailyNotificationJob, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
log.error("Failed to run dailyNotificationJob", e);
}
}
}
그리고 최종적으로 매일 20시에 실행이 되도록 cron을 설정해주면 끝!!!!!
📌 Test
애플리케이션 자체적으로 정상적으로 동작함을 무식하게 실행시켜보고 통과함을 확인한 후,
실제 iOS로 메시지가 전달되는 것까지 확인했다. ㅎㅎ
5. Discussion Topics
📌 Notification Table record
현재 Notification 테이블의 정보만으로는 iOS 측에서 딥 링크를 사용할 정보를 저장할 수가 없다.
추후 공지 알림 뿐 아닌, 사용자간의 행동으로 발생한 이벤트 알림의 경우(누군가를 팔로우하거나, 실시간 메시지 전송)가 발생했을 때, 테이블을 수정해야 할 필요가 있다.
📌 Device Token model field
우리는 iOS 앱만이 Client로 존재하기 때문에 DeviceToken의 기기 OS를 식별하지 않는다.
하지만 만약 여러 플랫폼을 지원하게 되면, FCM 설정을 다르게 처리해주어야 할 필요가 있으므로 이 또한 데이터에 추가로 저장해야 한다.
📌 Batch
제일 스트레스인 건, 구현 속도를 높인다고 Batch에 대한 이해를 거의 버리다시피 진행했다는 점.
그냥 공식 문서 속독하면서 작동하는 코드에 집중하다보니, 어느 부분에서 비효율적이고 critical한 지 알 수가 없다.
이건 별도로 프로젝트를 파서 공부를 하거나, 현재 BE 개발 속도가 월등히 빠르므로 성능 테스트를 해볼 필요가 존재한다.
또한 Batch 테스트 케이스 작성 요령을 몰라서 하나도 못 써봤다..주말에 짬내서 시도해봐야 할 듯 하다.
📌 Test
이전 프로젝트에서도 느꼈던 고충인데, FCM 테스트를 어떻게 해야할 지 감이 안 온다.
iOS에서 메시지가 전송되는 것을 직접 확인해주기 전까진 정상 동작을 보장할 수가 없는데, 이게 너무 스트레스 받는다.
통합 테스트에서 고려할 사안은 아니다만..정말 이 방법밖에 없는 건지 좀 더 찾아 봐야겠다.
📌 Query Cost
현재 Batch 작업을 위한 Query 성능이 객관적인 평가가 이루어지고 있지 않다.
다만 Full Scan을 하는 건 누가봐도 문제가 있어 보이긴 하나, 그렇다고 index를 생성해주기엔 삽입이 너무 빈번해서 오히려 성능을 저하시킬 우려가 있다.
더 나은 방법이 떠오르질 않는데, 차라리 Join을 없애고 Process에서 필터링해주는 게 더 나을 지 성능 평가가 필요하다.