🫠 포스팅 길이가 길어지면 임시 저장 데이터가 자꾸 날아가버려서, 점진적으로 내용 추가 중입니다.
✏️ 포스팅 길이가 너무 길어져 렉이 너무 심해진 관계로, User Status 관리, Redis Clustering 그리고 현재 포스팅에서 구현한 방식의 각종 문제점들에 대해서는 다른 포스팅에서 마저 작성하도록 하겠습니다.
수정 일자 | 내용 |
`24.09.15 | • System Design • Message pub/sub • Proxy Server Routing |
`24.09.19 | • Authenticate (작성중) • Authorize (작성중) • 디자인 패턴을 적용한 ChannelInterceptor 리팩토링 |
`24.09.29 | • Authenticate |
`24.10.01 | • Authorize Subscribe Interceptor 작성 • 디자인 패턴을 적용한 ExceptionHandler 리팩토링 • PreAuthorize를 사용한 Controller 자원 접근 권한 검사 |
`24.10.04 | • ID Generator 선택 • Chat store process (작성 중) |
`24.10.08 | • 트랜잭션 메시징 |
`24.10.10 | • SessionSubscribeEvent 핸들러 오류 사항 공지. (정상 동작 안 함. 사용하지 말 것) |
모든 코드는 위에서 확인 가능. docker compose 설정도 모두 업로드.
📕 목차
1. System Design
2. Message pub/sub
3. Proxy Server Routing
4. User Info Caching & Authorization
5. Local Chat ID Generator & Chat Store
6. Refactoring
1. System Design
📌 System Architecture
채팅 시스템 설계는 얼추 모두 구상해두었다.
물론 초기 설계라서 작업을 하면서 간간히 수정되는 부분이 발생할 수 있다.
사용하는 기술 스택은 mono repo, multi module 구조의 spring boot와 STOMP 서브 프로토콜을 갖는 WebSocket으로 구현할 예정이다.
Message Broker는 RabbitMQ(원래 안 쓰려고 했는데, 호기심이 발동해버려서 그만)를 사용한다.
채팅 시스템인만큼 클러스터를 통해 병렬 처리를 수행하여 방대한 양의 데이터를 처리할 수 있는 kafka를 사용하는 게 합리적일 수 있겠지만, MQ는 커녕 채팅 시스템도 처음 만들어보는 나에게 관리자 UI를 제공해주면서 학습 곡선이 낮은 RabbitMQ를 사용하는 게 훨씬 적절하다고 느꼈다.
애초에 Kafka를 사용해야 할 만큼 사용자가 확보된 상태도 아니라서 의미가 없다.
설계만 잘 해두면, MQ는 나중에 바꿔도 무방하다.
Proxy로는 프로젝트에서 사용하던 Ngnix를 그대로 사용할 예정이고, 키-값 저장소는 Redis를 사용한다.
chat 이력에 대해서는 Cassandra를 사용해보고 싶은 마음도 굴뚝같았지만, 프리티어 EC2 하나에 컨테이너 전부 올리는 건 무리가 있겠다 싶어 포기했다.
대신 roomId에 대해 파티셔닝을 위해 Redis를 Clustering하여 사용할 예정이다.
참고로 해당 포스팅을 참고하려는 사람이 있다면, 채팅 서버는 Spring이 아니라 node.js + socket.io 조합을 추천한다.
spring + webSocket은 뭔가....양복입고 마라톤하는 느낌이라고 해야 하나. 너무 과하다.
나는 채팅 시스템을 처음 구축해보는 거라, 익숙한 언어를 사용해서 일단 구현하고 나중에 migration하는 게 차라리 나을 거 같아서 spring을 사용하기로 결정했다.
📌 웹 소켓 연결
채팅 서버는 트래픽 양에 따라 N개 존재할 수 있다.
그 말은 즉슨, client 애플리케이션에 채팅 서버 URL을 입력하는 게 아니라, 우선 서버로부터 유효한 chat server url을 받는 것부터 시작해야 한다.
참고로 다이어그램에서 빠트렸는데, API 이전에 proxy 서버가 반드시 존재해야 한다.
- client가 stateless server에게 연결 가능한 websocket server url을 요청한다.
- stateless server는 사용자의 물리적 위치와 채팅 서버의 용량 등을 기반으로 최적의 채팅 서버 url을 client에게 반환한다.
- client가 채팅 서버에 대한 연결 요청을 server로 보낸다.
- SockJS의 경우 처음에 http 요청으로 채팅 서버 상태 정보를 조회하는데, 이건 stateless server로 전달한다.
- 채팅 서버와 client 간의 web socket 연결을 승인한다.
유효한 채팅 서버를 탐색하는 도구로 Apache Zookeeper를 사용할 수도 있겠지만, 내가 운영하는 서비스는 채팅 서버가 고작 하나밖에 없어서 사실상 불필요하다.
그럼에도 이렇게 구조를 설계한 이유는 확장성을 고려했기 때문인데, 나중에라도 채팅 서버를 증설해야 할 필요성을 느꼈을 때 Infrastructure 모듈만 손 보면 되기 때문이다.
🤔 Proxy Server는 왜 필수인가?
채팅 서버가 N개라 하더라도, 처음 API 서버로부터 url을 받은 후에 할당받은 채팅 서버에 연결하면 그만 아닌가?
라고 생각할 수 있겠지만 실은 그리 간단하지 않다.
채팅 서버가 모든 origin으로 부터의 웹 소켓 연결 요청을 허락해버리면, 보안에 상당한 위협이 된다.
이를 방지하려면 모든 채팅 서버가 웹 소켓 연결 이전에 JWT나 Session 등을 사용하여, 사용자의 인증 정보를 처리하는 로직을 추가해야 하는데 상당히 번거롭다.
처음 웹 소켓에 연결할 때만 인증에 성공하면, 그 뒤는 신용할 수 있다고 판단할 수 있기 때문에 채팅 서버가 해당 로직을 포함하는 것은 잘못된 설계라고 판단했다.
그리고 WebSocket은 `ws://`로 연결을 하지만, 이 또한 https처럼 TLS 암호화 처리를 해주어야 한다.
즉, `wss://` 요청을 처리하기 위해서라도 proxy server의 존재는 필수적이다.
이 외에도 많은 이유가 있지만, 대부분 채팅 시스템 구축할 때의 고유의 장점이라기 보단 proxy server가 필요한 이유와 동일하므로 패스~
📌 Message 송/수신
라이브러리를 사용하면 메시지를 송/수신하는 걸 구현하는 건 별로 어렵지 않다.
하지만 채팅 이력을 보관하고, 사용자 상태를 관리하면서, 사용자 상태에 따라 분기를 처리하는 건 복잡하다.
STOMP 서브 프로토콜을 사용하므로, pub/sub 구조의 요청과 응답을 주고 받아야 한다.
Message Broker는 RabbitMQ를 사용하기로 했는데, 클라이언트는 Server에서 어떤 MQ를 사용하는지 존재를 알 수 없도록 감추어야 한다. (물론 응답 헤더에 정보가 나오긴 하지만, MQ가 바뀐다고, client에 영향을 주어선 안 된다는 의미다.)
🤔 Session이 연결되어 있지 않은 사용자?
현재 설계에 포함되지 않았지만, 반드시 고려되어야 하는 사항이 있다.
바로 어떠한 채팅 서버에도 WebSocket이 연결되어 있지 않은 client에게도 푸시 알림을 보내야 한다는 점이다.
문제는 RabbitMQ의 특성상, Queue에 들어온 값을 매번 꺼내서 처리한다고 하면, 해당 메시지는 그대로 소비되어 버린다는 점이다. (리스크가 크다)
그리고 해당 기능을 채팅 서버에 그대로 넣을 수도 없는 것이, N개의 채팅 서버에서 모두 Queue의 값을 꺼내본다고 하면 중복 알림 전송 문제가 발생할 수도 있다는 점이다.
이 부분은 Node.js로 경량 애플리케이션을 하나 띄워서 처리하는 것이 나을 것 같아서 설계에 포함시키지 않은 상태다.
혹시나 하다가 더 나은 아이디어가 떠오를 지도 모르니 일단 보류.
📌 User Status
WebSocket에서 사용자의 상태는 online, offline이 고작이다.
애초에 연결 어떻게 할 지 약속하는 스펙이 다라서, 보다 구체적인 상태는 애플리케이션 레벨에서 처리해야 한다.
STOMP에서 제공해주는 command는 그저 online, offline 상태를 어떻게 표현할 지 클라이언트와 약속했을 뿐이다.
사용자 상태는 왜 online, offline 외에도 더 유지해야 할까?
일반적으로 socket이 연결되어 있다면, online 상태로 판단하여 연결된 socket에 message를 흘려주면 된다고 생각할 것이다.
하지만 사용자가 앱은 실행했지만, 구독 중인 채팅방 화면을 보고 있지 않다면?
socket이 아니라 push notification으로 받았어야 한다.
이렇게 세세한 설정을 해주려면, 사용자의 상태를 client로 부터 받아서 관리해주어야만 한다.
특히 박동 검사를 위한 ping/pong도 열심히 핸들링 해주어야 한다. ㅎㅎ
📌 구현에 들어가기 앞서..
설계가 있으니 바로 완성된 코드만 만들어낼 수 있으면 참 좋겠지만, 처음 접해보는 도메인이라 점진적인 구현과 수정으로 진행하고 있다.
따라서 위에서 적었던 코드가 갑자기 밑에서 역변하는 경우가 존재할 수도 있다.
2. Message pub/sub
📌 Why use RabbitMQ?
외부 브로커가 없는 상태를 상상해보자.
우리 서비스는 채팅방에 참여할 수 있는 인원수에 제한을 둘 예정이기 때문에, 사용자1이 전송한 메시지를 참가자 전원의 MQ에 복사하여 삽입하는 방식에 무리가 없다.
문제는 그 MQ를 어떻게 관리할 것인지가 관건이다.
spring boot stomp 의존성을 추가하면, 기본적으로 SimpleMessageBroker를 제공한다.
'얘가 뭐하는 놈인데?' 싶었는데, 쉽게 설명해서 각 채팅 서버가 자신의 MQ를 관리하는 방법이다.
그래서 이걸 in-memory 기반 message broker라고 하는데, 여기엔 치명적인 문제들이 존재한다.
- 사용자가 많아질 수록 서버의 메모리를 야금야금 잡아먹는다.
- 서버가 down되거나, 재시작하면 MQ 안의 데이터들이 유실된다.
- 다중화된 서버 환경에선 채팅방을 공유할 수 없게 된다.
- 사용자1이 채팅 서버1로 메시지를 전송한다. (roomId=1)
- 채팅 서버1에는 활성화된 사용자가 없으므로, 사용자2,3에게는 메시지를 전송하지 않고 종료한다.
- 해당 데이터는 사용자2,3가 채팅 서버1에 연결되지 않는 한 영원히 공유되지 않는다.
따라서 우리는 MQ 상태를 관리하는 외부 브로커를 둘 필요가 존재한다.
물론 애플리케이션을 하나 더 만들어서 직접 관리해줘도 되겠지만, 특별히 커스텀이라도 하고 싶은 게 아니라면 오버 엔지니어링이기 때문에 여기서 RabbitMQ를 사용하기로 결정했다.
우선 STOMP 프로토콜을 사용하여 통신하기 때문에 Message Broker 또한 STOMP 프로토콜을 지원해야 한다. (원래는 필요없는데 Spring 설정 상 필요함. 밑에서 설명)
그리고 우리들의 RabbitMQ는 다행히도 STOMP 프로토콜을 지원한다.
MQ를 외부로 분리시켜 중앙 집중화를 시키면, 이제 각 채팅 서버가 갖는 상태는 사용자의 상태밖에 없다.
채팅 서버가 down 되어도 MQ의 데이터가 증발할 일도 없고, 다중화를 하더라도 서로 다른 채팅 서버에 연결된 사용자에게도 메시지를 전달할 수 있게 된다.
📌 의존성 추가
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
/* Web Socket */
implementation 'org.springframework.boot:spring-boot-starter-websocket'
/* Reactor Netty */
implementation "org.springframework.boot:spring-boot-starter-reactor-netty" // RabbitMQ에서 브로커 연결 및 통신 설정을 위해 기본으로 사용
/* RabbitMQ */
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.amqp:spring-rabbit-stream' // 안 쓸 거라면 없어도 됨.
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
우선 위와 같은 의존성을 추가해주어야 한다.
버전도 관리해줘야 하는데, 귀찮아서 일단 버전 생략하고 전부 집어넣었다.
- spring-boot-starter-reactor-netty: rabbitmq stream은 기본적으로 연결할 때 reactor netty를 사용하는데, 의존성이 함께 주입되어져 있지 않기 때문에 같이 추가해주어야 한다. (spring-rabbit-stream 안 쓸 거면 필요없을 수도 있다.)
- spring-boot-starter-amqp: RabbitMQ와 AMQP를 사용하기 위해 주입. Message 큐잉, 라우팅과 같은 RabbitMQ 고급 기능을 사용하려면 필요하다.
- spring-rabbit-stream: RabbitMQ 스트립 기능을 사용할 때 필요하다. 대용량 데이터 스트리밍에 유용.
- 나중에 안 건데, 다른 사람들은 이 의존성을 추가하지 않았다. 그래서 나만 처음보는 에러들이 엄청나게 쏟아졌다.
- 비동기·논블로킹 방식의 TCP 연결을 하고 싶으면 사용하면 되는데, 없어도 된다.
- spring-boot-starter-websocket: WebSocket을 사용하기 위해 주입. STOMP 프로토콜이 내장되어 있다.
✒️ AMQP(Advanced Message Queuing Protocol)
이렇게 박스 하나에 몰아넣어서 끝낼 정도로 쉬운 개념이 아니다.
이해가 안 간다면, 구현을 하다가 막혔을 때 다시 보는 게 현명한 방법이지 않을까.
메시지 지향 미들웨어(MOM)을 위한 표준 응용 계층 프로토콜, 즉 메시지 통신을 위한 스펙을 정의하기 위해 사용한다. (RabbitMQ는 AMQP를 오픈 소스 메시지 브로커 중 하나)
손실이나 중복 없이 메시지를 전달해야 하는 경우, 복잡한 라우팅이 필요하고 다양한 애플리케이션과 플랫폼이 원활하게 정보를 교환해야 할 때 사용한다.
AMQP의 Queue의 구조는 다음과 같다.
1. Exchange: Publisher로부터 수신한 메시지를 Queue에 분배하는 라우터
2. Queue: 메시지를 memory 혹은 disk에 저장했다가 Consumer에게 전달하는 역할.
3. Binding: Excahnge에 전달된 메시지가 어떤 Queue에 저장되어 있어야 하는가에 대한 정의. (Routing Key를 사용함.)
말이 좀 어려운데, Routing Key로 Exchange와 Binding이 된 Queue는 해당 Exchange에 등록되었다고 말할 수 있다.
좀 더 쉽게...? 모든 메시지는 Queue로 직접 전달되지 않는다. 반드시 Exchange에서 먼저 받고, 바인딩 규칙에 따라 적절한 Queue로 전달된다.
(더 나가면, Default Exchange는 Topic, Dead Letter는 Fanout 타입을 사용한다.)
그리고 Binding에도 4가지 전략이 있다.
• Direct Exchange: 메시지 Routing Key를 Queue에 1:1 매칭
• Fanout Exchange: 모든 메시지를 모든 큐로 라우팅. (Exchange만 일치하면, Routing Key 무시)
• Topic Exchange: wildcard(*)를 사용하여, 메시지를 하나 또는 N개의 Routing Pattern이 일치하는 Queue에 전달
• Header Exchange: Routing Key에 key-value쌍의 헤더 속성을 추가하며, `x-match` argument로 header를 어떻게 해석하고 바인딩 할 지 결정
추후 다시 나올 내용을 미리 정리해두자면, Exchange 속성도 존재한다.
• Name: Exchage 이름
• Type: Binding 전략 (4가지 중 하나)
• Durability: 메시지 브로커가 시작될 때 데이터 유지 여부
∘ Durable: 메시지 브로커 서버가 재시작되어도 기존 Exchange 메시지가 저장되어 있음
∘ Transient: 메시지 브로커가 재시작되면 기존 Exchange 메시지가 모두 삭제
• Auto-delete: 마지막 Queue 연결이 해제되면, Exchange도 함께 삭제
🤔 외부 브로커가 대체 왜 STOMP를 지원해야 하는가?
생각해보면 이상한 일이다.
Client ↔ Server (STOMP), Server ↔ RabbitMQ (AMQP)로 통신할 거 같으면, 외부 브로커의 STOMP 지원 여부는 의미가 없어진다.
어차피 채팅 서버가 중간에서 번역을 해주는 셈이 아닌가?
위 생각은 합당하다고 생각한다. 아무리 생각해도 이론적으로 문제가 되지 않는다.
그러나 아래에서 다룰 StompBrokerRelay의 기능이 문제가 되는 것이 아닐까 싶다.
해당 기능은 서버가 메시지를 받기만 하고, 실제 메시지 라우팅은 외부 브로커가 처리하도록 넘겨버린다.
이렇게 하면 서버 구현도 쉽고, 중간에서 프로토콜을 변환하는 오버 헤드도 줄일 수 있기 때문에 성능 상으로도 이점을 얻을 수 있을 것이다.
어디까지나 추측이다.
솔직히 덕분에 꿀 빨았잖아. 🤪
📌 RabbitMQ Configuration
spring:
rabbitmq:
host: ${RABBITMQ_HOST:localhost} # RabbitMQ ip 주소
port: 5672 # RabbitMQ 연결 port
username: # RabbitMQ 웹 관리 콘솔 아이디
password: # RabbitMQ 웹 관리 콘솔 비밀번호
virtual-host: /
rabbitmq:
chat-queue:
name: "chat.queue" # 사용할 Queue 이름 지정
chat-exchange:
name: "chat.exchange" # 사용할 exchange 이름 (구독할 때 사용)
chat-routing:
key: "chat.room.*" # routing-key. chat.room.{room_id}를 구독
참고로 rabbitMQ를 설정할 때, 개방해야 하는 포트가 여러 개라 당황스러운데 각각 다음과 같다.
- 5672: Server가 RabbitMQ에 연결할 때 필요한 port
- 15672: RabbitMQ 관리자 UI에 접근하기 위한 port
- 61613: STOMP 통신을 위해 필요한 port
- 공식 문서에서 특별한 설정이 없다면, STOMP 어댑터는 61613 port를 listen 한다고 나와있다.
virtual host(vhost)는 RabbitMQ 브로커에서 여러 개의 메시지 도메인을 사용할 수 있게 해주는 논리적 그룹이라고 한다.
이것도 쓸 데 없이 말이 어려운데, AWS 사용할 때 IAM 계정을 생성하는 거랑 비슷한 느낌이다. (물론 다르다.)
각 vhost는 Queue, Exchange, Binding, 권한에 있어서 완전히 분리된 환경을 구성할 수 있다.
따라서 채팅 기능 이외에 다른 목적으로 RabbitMQ를 사용할 일이 있다면, vhost로 구분하여 사용할 수 있다.
@Configuration
@EnableRabbit
public class RabbitConfig {
private final String CHAT_QUEUE_NAME;
private final String CHAT_EXCHANGE_NAME;
private final String CHAT_ROUTING_KEY;
private final String RABBITMQ_HOST;
public RabbitConfig(
@Value("${rabbitmq.chat-queue.name}") String CHAT_QUEUE_NAME,
@Value("${rabbitmq.chat-exchange.name}") String CHAT_EXCHANGE_NAME,
@Value("${rabbitmq.chat-routing.key}") String CHAT_ROUTING_KEY,
@Value("${spring.rabbitmq.host}") String RABBITMQ_HOST
) {
this.CHAT_QUEUE_NAME = CHAT_QUEUE_NAME;
this.CHAT_EXCHANGE_NAME = CHAT_EXCHANGE_NAME;
this.CHAT_ROUTING_KEY = CHAT_ROUTING_KEY;
this.RABBITMQ_HOST = RABBITMQ_HOST;
}
// "chat.queue"라는 이름의 Queue 생성
@Bean
public Queue chatQueue() {
return new Queue(CHAT_QUEUE_NAME, true); // durable을 true로 제공
}
// 4가지 Binding 전략 중 TopicExchange 전략을 사용. "chat.exchange"를 이름으로 지정
@Bean
public TopicExchange chatExchange() {
return new TopicExchange(CHAT_EXCHANGE_NAME);
}
// Exchange와 Queue를 연결. "chat.queue"에 "chat.exchange" 규칙을 Binding
@Bean
public Binding chatBinding(Queue chatQueue, TopicExchange chatExchange) {
return BindingBuilder
.bind(chatQueue)
.to(chatExchange)
.with(CHAT_ROUTING_KEY);
}
// RabbitMQ와 메시지 담당할 클래스
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
@Bean
public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
return new RabbitMessagingTemplate(rabbitTemplate);
}
// RabbitMQ와 연결 설정. CachingConnectionFactory를 선택
@Bean
public ConnectionFactory createConnectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(RABBITMQ_HOST);
factory.setUsername(""); // RabbitMQ 관리자 아이디
factory.setPassword(""); // RabbitMQ 관리자 비밀번호
factory.setPort(5672); // RabbitMQ 연결할 port
factory.setVirtualHost("/"); // vhost 지정
return factory;
}
// Queue를 구독(Subscribe)하는 걸 어떻게 처리하느냐에 따라 필요함. 당장은 없어도 됨.
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter);
return factory;
}
// 없어도 잘 동작했었는데, 다른 이슈 처리한다고 명시적으로 선언함;
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
admin.setAutoStartup(true);
return admin;
}
// 메시지를 JSON으로 직렬/역직렬화
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
쪼개서 설명하는 게 더 복잡할 거 같아서 RabbitConfig는 주석으로 설명을 첨부했다.
해당 설정은 지금 내게 필요한 최소한의 설정만을 해두었을 뿐이므로, 각자의 프로젝트에 맞게 커스텀해야 한다.
참고로 지금은 TopicExchange로 지정했는데, 그냥 테스트 하기가 제일 편해서 썼다.
나중에 다른 전략을 사용해야 할 수도 있다.
📌 WebSocketMessageBroker
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebBrokerSocketConfig implements WebSocketMessageBrokerConfigurer {
private final String RABBITMQ_HOST;
public WebBrokerSocketConfig(
@Value("${spring.rabbitmq.host}") String rabbitmqHost
) {
this.RABBITMQ_HOST = rabbitmqHost;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
// .setAllowedOrigins("*") // 이거 넣으면 allowedOrigins가 true일 때, * 못 넣으니까 pattern 쓰라고 에러 발생함.
.setAllowedOriginPatterns("*") // 실제 환경에선 API 서버 도메인만 허용
.withSockJS(); // JS 라이브러리. 우린 iOS라서 안 씀. 테스트를 위해 허용
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
TcpClient tcpClient = TcpClient
.create()
.host(RABBITMQ_HOST)
.port(61613);
// .secure(SslProvider.defaultClientProvider());
ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
config.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
.setAutoStartup(true)
.setTcpClient(client) // RabbitMQ와 연결할 클라이언트 설정
.setRelayHost(RABBITMQ_HOST) // RabbitMQ 서버 주소
.setRelayPort(61613) // RabbitMQ 포트(5672), STOMP(61613)
.setSystemLogin("") // RabbitMQ 시스템 계정
.setSystemPasscode("") // RabbitMQ 시스템 비밀번호
.setClientLogin("") // RabbitMQ 클라이언트 계정
.setClientPasscode(""); // RabbitMQ 클라이언트 비밀번호
config.setPathMatcher(new AntPathMatcher(".")); // url을 chat/room/3 -> chat.room.3으로 참조하기 위한 설정
config.setApplicationDestinationPrefixes("/pub"); // 클라이언트에서 메시지 송신 시 프리픽스
}
}
여기 설정이 뭐가 엄청나게 많은데, 나름대로 이유가 있었다. (처절하게 피를 흘려가며 이슈 컨트롤 한 흔적 😇)
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
// .setAllowedOrigins("*") // 이거 넣으면 allowedOrigins가 true일 때, * 못 넣으니까 pattern 쓰라고 에러 발생함.
.setAllowedOriginPatterns("*") // 실제 환경에선 API 서버 도메인만 허용
.withSockJS(); // JS 라이브러리. 우린 iOS라서 안 씀. 테스트를 위해 허용
}
위 부분은 CORS 에러를 제어하기 위한 설정이다.
Spring으로 개발하면 API 부분은 Spring Security로 이미 설정을 해주어주었겠지만, 채팅 서버까지 Security를 적용하는 건 쓸 데 없이 애플리케이션을 무겁게 만들기 때문에 필요가 없다.
- addEndpoint(): 하위 규칙을 적용할 Endpoint를 지정한다.
- setAllowedOrigins(): 어떤 블로그에선 이걸 사용하던데, 사용자 헤더에 allowedOrigins가 true로 설정되어 있으면 사용할 수 없으니, patterns로 제어하라고 친절하게 에러 문구를 띄워준다.
- setAllowedOriginPatterns(): 현재는 테스트 편의성을 위해 모든 client를 허용한다. 나중엔 API 서버와 Proxy 서버만 허용할 예정
- withSockJs(): 주의해야 하는 부분. javascript에서 SockJs를 사용해서 웹 소켓 연결하는 경우에 필요하다. 모바일이나 Postman으로 연결할 때는 이걸 지워야 한다. 만약, 모바일과 앱 모두 지원할 거라면 규칙을 따로 만들고, proxy에서 제어해주어야 할 듯.
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
TcpClient tcpClient = TcpClient
.create()
.host(RABBITMQ_HOST)
.port(61613);
// .secure(SslProvider.defaultClientProvider());
ReactorNettyTcpClient<byte[]> client = new ReactorNettyTcpClient<>(tcpClient, new StompReactorNettyCodec());
config.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue")
.setAutoStartup(true)
.setTcpClient(client) // RabbitMQ와 연결할 클라이언트 설정
.setRelayHost(RABBITMQ_HOST) // RabbitMQ 서버 주소
.setRelayPort(61613) // RabbitMQ 포트(5672), STOMP(61613)
.setSystemLogin("") // RabbitMQ 시스템 계정
.setSystemPasscode("") // RabbitMQ 시스템 비밀번호
.setClientLogin("") // RabbitMQ 클라이언트 계정
.setClientPasscode(""); // RabbitMQ 클라이언트 비밀번호
config.setPathMatcher(new AntPathMatcher(".")); // url을 chat/room/3 -> chat.room.3으로 참조하기 위한 설정
config.setApplicationDestinationPrefixes("/pub"); // 클라이언트에서 메시지 송신 시 프리픽스
}
위 설정은 자꾸 Server와 RabbitMQ 연결에 실패해서 이것저것 검색하면서 추가한 결과물이다.
대체 왜 나만 연결이 안 되는 건가 싶었는데, 의존성에 rabbitmq-stream을 넣는 바람에 생겼던 것 같다.
Project Reactor의 Netty는 비동기·논블로킹 방식의 TCP 연결을 관리하여, 높은 동시성과 성능을 제공한다.
문제는 기본 TCP 클라이언트로는 연결이 안 되는 건지, 직접 `reactor.netty.tcp.TcpClient` 객체를 생성해서 ReactorNettyTcpClient를 생성한 후 Broker 연결 설정에 넣어줘야 한다.
지금 생각해보면 Config 파일로 빼버려도 되지 않을까 싶다.
그 다음으로는 enableStompBrokerRelay()가 있는데, 얘가 당췌 뭐 하는 건가 싶어서 아무 값이나 넣었다가 또 에러가 엄청 많이 나왔었다. ㅋㅋ
relay는 "중계" 또는 "전달"이라는 의미를 갖는데, Spring 내장 브로커 대신 외부 메시지 브로커(ex. RabbitMQ)를 사용하도록 지시하는 설정값이다.
여기서 전달해주는 인자값은 사용하려는 외부 브로커인 RabbitMQ의 규약에 정해진 값만을 사용 가능하다.
StompBrokerRelayMessageHandler : Received ERROR {message=[Unknown destination], content-type=[text/plain], version=[1.0,1.1,1.2], content-length=[143]} session=ivntdvzc text/plain payload='/sub/chat.room.1' is not a valid destination.
`/sub` 넣어보면 그게 뭐냐고 혼난다.
- `/queue`: point-to-point 메시징을 보낼 때 사용
- `/topic`: 발행/구독(pub/sub) 메시징에 사용
- `/exchange`: RabbitMQ의 exchange를 직접 지정할 때 사용
- `/amq/queue`: RabbitMQ의 특정 큐에 직접 메시지를 보낼 때 사
조금 더 찾아보니 RabbitMQ STOMP 문서에 적혀있었다.
머리가 나쁘면 몸이 고생하는 게 맞다니까..
마지막으로 MessageBrokerRegistory 설정에서 systemLogin과 clientLogin 설정이 다른데, 둘의 목적이 다르다.
우선 현재 설정에선 총 3개의 Actor가 존재한다는 걸 이해해야 한다.
- 클라이언트
- 스프링 애플리케이션
- 브로커
문서에 의하면 SystemLogin은 공유된 "시스템" 연결을 위한 것이며, 애플리케이션 내부에서 STOMP 브로커로 메시지를 전달하는 데 사용한다. 즉, 특정 클라이언트 세션과 연관되지 않은 메시지(ex. REST/HTTP 요청 처리)를 말한다.
ClientLogin은 연결된 클라이언트를 대신하여 STOMP 브로커에 연결을 생성할 때 사용하는 것이다.
지금은 테스트 목적이라 사용자 식별 정보를 알 수 없어서 둘 다 관리자 계정을 넣어버렸지만, 실제 환경에선 ClientLogin은 클라이언트 정보를 담아야 한다.
어떻게 담을 지는 나중에 고민해봐야지.
📌 Docker Compose
compose 파일이랑 Dockerfile-rabbitmq를 같은 경로에 위치시키고 다음과 같이 작성하면 된다.
services:
rabbitmq:
image: rabbitmq:3.13.7-management
container_name: rabbitmq
environment:
- 'RABBITMQ_DEFAULT_PASS=' # 관리자 비밀번호
- 'RABBITMQ_DEFAULT_USER=' # 관리자 아이디
- 'RABBITMQ_ERLANG_COOKIE=RabbitMQ-Cookies'
ports:
- "5672:5672" # RabbitMQ default port
- "15672:15672" # Web UI port
- "61613:61613" # STOMP port
command: >
bash -c "
rabbitmq-plugins enable --offline rabbitmq_stomp rabbitmq_web_stomp &&
docker-entrypoint.sh rabbitmq-server
"
command 블럭이 있는 이유는 rabbitmq의 stomp 설정이 default로 꺼져 있기 때문에 enable로 수정해주어야 한다.
원래라면 직접 컨테이너에 들어가서 명령어 입력해야 하는데, 귀찮아서 compose로 한 번에 처리해주었다.
여기까지 하고 정상적으로 애플리케이션이 실행되면 성공
📌 RabbitMQ Web UI
브라우저에서 15672 포트로 접근하면 RabbitMQ Web UI를 확인할 수 있다.
처음에 로그인하라고 나오면, compose 설정에서 입력한 id, pw를 입력하면 된다.
spring config에서 설정했던 것처럼 vhost가 `/`고, chat.exchange라는 이름의 topic 타입 Queue가 생성되어 있음을 확인할 수 있다.
이게 반드시 생성되어 있는 걸 확인해야 한다. 안 그러면 클라이언트가 존재하지 않는 Exchange를 구독하려 시도해서 실패한다.
📌 Message Send
클라이언트 테스트 코드를 작성하기 전에 메시지를 보낼 수 있도록 미리 설정해보자.
public record ChatMessage(
String roomId,
String content
) {}
@Service
public class ChatMessageProducer {
private final RabbitTemplate rabbitTemplate;
private final String CHAT_EXCHANGE_NAME;
public ChatMessageProducer(RabbitTemplate rabbitTemplate,
@Value("${rabbitmq.chat-exchange.name}") String CHAT_EXCHANGE_NAME) {
this.rabbitTemplate = rabbitTemplate;
this.CHAT_EXCHANGE_NAME = CHAT_EXCHANGE_NAME;
}
public void sendMessage(ChatMessage message) {
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "chat.room." + message.roomId(), message);
}
}
rabbitTemplate가 요구하는 매개변수를 올바르게 넣어주면 된다.
exchange는 설정 값을 불러오면 되고, routingKey는 메시지를 publish할 방을 구체적으로 명시해준다.
마지막에는 전달하려는 message object를 인자로 넣어주면 끝.
@Slf4j
@Controller
@RequiredArgsConstructor
public class ChatController {
private final ChatMessageProducer chatMessageProducer;
@MessageMapping("chat.message.{roomId}")
public void sendMessage(@DestinationVariable String roomId, ChatMessage message) {
log.info("sendMessage: roomId={}, message={}", roomId, message);
chatMessageProducer.sendMessage(message);
}
}
클라이언트의 STOMP 기반 요청을 읽어서 위의 컨트롤러로 라우팅해준다.
config에서 설정했듯이 publish할 때의 요청은 접두사로 `/pub`를 사용하기로 했으므로, 위 컨트롤러로 요청을 보내기 위해선 `/pub/chat.message.1`로 보내야 한다.
📌 Client WebSocket Connect & Subscribe
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat Test</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<div>
<input type="text" id="roomId" placeholder="Room ID">
<input type="text" id="message" placeholder="Type a message...">
<button onclick="sendMessage()">Send</button>
</div>
<div id="messages"></div>
<script>
var stompClient = null;
function connect() {
// transport의 세 가지 값은 연결 시도 방식을 의미하며, 이전 연결이 실패하면 이후 연결을 시도한다.
// - websoket: 실시간 양방향 통신
// - xhr-streaming: 긴 지속 시간의 HTTP 요청으로 연결
// - xhr-polling: 주기적으로 HTTP 요청을 보내 서버의 데이터 확인
var socket = new SockJS('http://localhost:8080/chat', null, {transports: ["websocket", "xhr-streaming", "xhr-polling"]});
console.log(socket)
stompClient = Stomp.over(socket);
stompClient.debug = function(str) {
console.log(str);
};
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
// 구독할 대상을 '/exchange'로 시작. '/{구독할 exchange}'지정. '/{구독할 queue}' 지정
stompClient.subscribe('/exchange/chat.exchange/chat.room.1', function(message) {
console.log('Received: ' + message);
showMessage(JSON.parse(message.body).content);
}, {'auto-delete':true, 'durable':false, 'exclusive':false});
// auto-delete: Consumer가 사라지면 자동 삭제되는 Queue
// durable: 서버와 연결이 끊어져도 메시지 보관 여부
// exclusive: 하나의 Consumer만 접근 가능 여부
});
}
function sendMessage() {
var roomId = document.getElementById("roomId").value;
var content = document.getElementById("message").value;
// spring에서 config.setApplicationDestinationPrefixes("/pub")로 지정한 접두사를 사용해야 한다.
stompClient.send("/pub/chat.message." + roomId, {}, JSON.stringify({
'roomId': roomId,
'content': content
}));
}
function showMessage(message) {
var messageElement = document.createElement('div');
messageElement.appendChild(document.createTextNode(message));
document.getElementById('messages').appendChild(messageElement);
}
// Connect when the page loads
window.onload = connect;
</script>
</body>
</html>
별로 어려운 코드가 아니라서 쉽게 이해할 수 있을 듯.
브라우저 두 개 열고, 채팅을 입력하면 정상 동작하는 것을 확인할 수 있다.
또한 RabbitMQ에도 사용자 별로 MQ가 생성되어 있음을 볼 수 있다.
📌 Inbound Interceptor
개인적으로 구독 요청을 할 때 `/exchange/chat.exchange`로 보내야 하는 게 마음에 들지 않았다.
위 경로는 RabbitMQ 스펙에 완전히 종속적인 상태고, 나중에 다른 MQ 브로커를 사용해야 할 때도 모든 클라이언트의 구독 경로를 수정해야 할 수도 있다.
stompClient.subscribe('/sub/chat.room.1', function(message) { ... }
클라이언트가 `/sub/`를 사용하게 만들려면 어떻게 해야할까?
구현 중인 WebSocketMessageBrokerConfigurer를 살펴보면 어디서 익숙한 이름의 메서드가 보인다.
InboundChannel? 대충 클라이언트한테서 들어오는 요청에 대한 무언가를 처리할 수 있다는 말이 아닐까?
그럼 해당 메서드가 갖는 ChannelRegistration은 뭘 하는 녀석일까?
public class ChannelRegistration {
...
public ChannelRegistration interceptors(ChannelInterceptor... interceptors) {
this.interceptors.addAll(Arrays.asList(interceptors));
return this;
}
...
}
아무래도 Channel에 대한 여러 설정들을 커스텀할 수 있도록 만든 거 같은데, 내가 원하던 키워드가 나와버렸다.
interceptor 너를 찾고 있었다구.
메서드 이름만 봐도 벌써 감이 온다.
이걸 사용하면 내가 원하는 기능을 구현할 수 있을 거라는 느낌이 왔다.
@Slf4j
@Component
public class StompInboundInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
log.info("Inbound message: {}", message);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
log.info("preSend: accessor={}", accessor);
if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
String sessionId = accessor.getSessionId();
log.info("preSend: destination={}, sessionId={}", destination, sessionId);
// 구독 전 필요한 처리 수행 (ex. 사용자 권한 확인, 구독 대상 변경, 로깅 등)
if (destination != null && destination.startsWith("/sub/")) { // client /sub/ 요청을 /exchange/chat.exchange/로 변경
// RabbitMQ exchange 직접 접근 방지
String convertedDestination = convertDestination(destination);
log.info("preSend: convertedDestination={}", convertedDestination);
accessor.setDestination(convertedDestination);
}
}
return message;
}
private String convertDestination(String originalDestination) {
return originalDestination.replace("/sub/", "/exchange/chat.exchange/");
}
}
테스트할 때 유용하게 쓰려고 로그를 많이 넣어버리긴 했는데, 여기서 중요한 건 convertDestination()이다.
이젠 사용자가 `/sub` 접두사로 구독 요청을 보내도, 내부적으로 해당 문자를 바꿔 RabbitMQ로 전달하므로 목적을 달성할 수 있을 것이다.
그리고 깔끔하게 성공.
💡 Interceptor 로직은 Refactoring 파트에서 코드를 개선했습니다. 참고 바랍니다.
3. Proxy Server Routing
📌 고민해보기
현재 구조는 처음 목표와는 다른 점이 있다.
클라이언트가 곧장 채팅 서버로 연결을 시도하고 있다는 점이다.
아파치 주키퍼까진 사용하진 않더라도, 추후 확장성을 고려했을 때 해당 방식은 너무 위험하다.
채팅 서버에서 모든 Origin을 허용하는 것도 그렇고, client가 고정된 url로 바로 요청하는 로직에서 채팅 서버 탐색 후 socket 연결을 시도하는 로직으로 수정하게 되면
앱을 사용하고 있던 모든 사용자들에게 강제 버전 업데이트를 요구해야만 한다. (앱 서비스가 클라이언트에게 강제로 버전 업데이트를 요구하는 건 좋지 않는 선택이다.)
그렇다면 형식만이라도 위 구조를 만족시키려면, 일단은 유효한 url을 반환시킬 API가 필요하다.
📌 Service Search Service
위 설계는 실제 프로젝트에 적용할 멀티 모듈 구조를 베이스로 작성했지만, 공부를 위한 현 프로젝트는 모노레포, 싱글모듈로 진행 중이다.
그래도 나중에 귀찮지 않기 위해 최대한 설계를 준수해가면서 작성해주었다.
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/ws")
@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET}, allowedHeaders = "*", allowCredentials = "true")
public class WebSocketProxyController {
private final WebSocketProxyHandler webSocketProxyHandler;
@GetMapping("")
public ResponseEntity<?> getWebSocketServerUrl(
HttpServletRequest request,
@RequestHeader HttpHeaders headers
) {
return ResponseEntity.ok(webSocketProxyHandler.getWebSocketServerUrl(request, headers));
}
}
public interface WebSocketProxyHandler {
WebSocket.Url getWebSocketServerUrl(HttpServletRequest request, HttpHeaders headers);
}
public class WebSocket {
public record Url(String url) {}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class DefaultWebSocketProxyHandler implements WebSocketProxyHandler {
@Override
public WebSocket.Url getWebSocketServerUrl(HttpServletRequest request, HttpHeaders headers) {
return new WebSocket.Url("http://localhost:8080/chat");
}
}
Security가 없어서 CrossOrigin까지 죄다 설정해줘야 하는 이 번거로움..너무 오랜만인 걸?
내용은 기초적인 코드라 설명 모두 스킵하고, url 마저 환경 변수로 분리하지 않고 레거시로 때려박아버렸다. ㅋ
그럼 이제 클라이언트 코드에서 url 조회를 먼저 수행하고, 그 다음 SockJS에 url을 넣어주면 되지 않을까?
function socket() {
connect().then(socket => {
console.log("Connected successfully");
}).catch(error => {
console.error("Connection error:", error);
});
}
async function connect() {
try {
const url = await getServerUrl();
console.log("Connecting to:", url);
const socket = new SockJS(url, null, {transports: ["websocket", "xhr-streaming", "xhr-polling"]});
console.log(socket)
stompClient = Stomp.over(socket);
stompClient.debug = function(str) {
console.log(str);
};
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/sub/chat.room.1', function(message) {
console.log('Received: ' + message);
showMessage(JSON.parse(message.body).content);
}, {'auto-delete':true, 'durable':false, 'exclusive':false});
// auto-delete: Consumer가 사라지면 자동 삭제되는 Queue
// durable: 서버와 연결이 끊어져도 메시지 보관 여부
// exclusive: 하나의 Consumer만 접근 가능 여부
});
} catch (error) {
console.error("Error connecting to server:", error);
}
}
async function getServerUrl() {
try {
const response = await fetch("http://localhost:8080/ws", {method: 'GET'});
console.log("Response : " + response)
const data = await response.json();
console.log("Data : " + data)
return data["url"];
} catch (error) {
console.error("Error fetching server URL:", error);
throw error;
}
}
getServerUrl()에서 chat server url인 "http://localhost:8000/chat"을 받아온 후, 채팅 서버로 곧장 요청을 보내고 있으므로 성공한다.
📌 Proxy Server
이제 본격적으로 Web Server를 전면에 둘 차례다.
Client는 8080포트가 아닌, nginx가 동작하는 8000 포트로 전송하도록 만들고 채팅 서버의 origin을 nginx로부터 접근하는 요청만 허락할 것이다.
services:
app:
build:
context: .
dockerfile: Dockerfile-server
container_name: spring-app
ports:
- "8080:8080"
env_file:
- .env
depends_on:
- rabbitmq
- redis
networks:
- app-network
nginx:
image: nginx:latest
container_name: nginx
ports:
- "8000:80"
volumes:
- ./proxy/nginx.conf:/etc/nginx/conf.d/default.conf
depends_on:
- app
networks:
- app-network
rabbitmq:
image: rabbitmq:3.13.7-management
container_name: rabbitmq
environment:
- 'RABBITMQ_DEFAULT_PASS='
- 'RABBITMQ_DEFAULT_USER='
- 'RABBITMQ_ERLANG_COOKIE=RabbitMQ-Cookies'
ports:
- "5672:5672" # RabbitMQ default port
- "15672:15672" # Web UI port
- "61613:61613" # STOMP port
command: >
bash -c "
rabbitmq-plugins enable --offline rabbitmq_stomp rabbitmq_web_stomp &&
docker-entrypoint.sh rabbitmq-server
"
networks:
- app-network
networks:
default:
driver: bridge
app-network:
driver: bridge
upstream app_servers {
server app:8080;
}
map $http_origin $allowed_origin {
default "";
"http://localhost:63342" $http_origin; # 현재는 클라이언트 직접 지정해서 사용. 실제 환경에선 API 서버 Security에 CORS 설정이 되어 있어서 불필요(아마도)
}
server {
listen 80;
listen [::]:80;
server_name localhost;
error_log /var/log/nginx/error.log debug;
# 백엔드에서 설정한 CORS 헤더 제거
proxy_hide_header 'Access-Control-Allow-Origin';
proxy_hide_header 'Access-Control-Allow-Credentials';
location /chat {
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' $allowed_origin always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, DELETE, PATCH, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization' always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
return 204;
}
proxy_pass http://app_servers;
proxy_http_version 1.1; # WebSocket 연결은 HTTP/1.1 이상으로 연결해야 함
add_header 'Access-Control-Allow-Origin' $allowed_origin always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
# WebSocket 연결을 위한 설정
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $http_host;
proxy_set_header Origin "http://127.0.0.1:8000"; # Origin 헤더 제거 안 하면, 계속 CORS 에러 발생
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
location /ws {
if ($request_method = 'OPTIONS') {
add_header 'Access-Control-Allow-Origin' $allowed_origin always;
add_header 'Access-Control-Allow-Methods' 'GET, POST, DELETE, PATCH, OPTIONS' always;
add_header 'Access-Control-Allow-Headers' 'Content-Type, Authorization' always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
return 204;
}
proxy_pass http://app_servers;
proxy_http_version 1.1;
add_header 'Access-Control-Allow-Origin' $allowed_origin always;
add_header 'Access-Control-Allow-Credentials' 'true' always;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
$http_origin을 설정한 이유는 저거 안 하면 nginx에서 자꾸 400에러를 내뱉으면서 클라이언트 요청을 거부하기 때문이었다.
실제 서버 환경에서 사용하기엔 부적절하므로 추후 수정이 필요한 부분이다. (저거 때문에 시간을 너무 많이 쏟아서 일단 패스)
# WebSocket 연결을 위한 설정
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $http_host;
proxy_set_header Origin "http://127.0.0.1:8000"; # Origin 헤더 제거 안 하면, 계속 CORS 에러 발생
제일 중요한 건 이부분이다.
Upgrade와 Connection은 WebSocket 규약이고, Host 헤더는 Http 1.1에서 필수로 포함되어야 하는 헤더에 속한다.
마지막으로 날 제일 고생시켰던 게 Origin 부분인데, 이 값을 제거하지 않으면 채팅 서버에 접근할 때 계속 CORS 에러가 발생하면서 소켓 연결에 실패한다. ^^
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
.setAllowedOriginPatterns("http://127.0.0.1:8000") // 실제 환경에선 API 서버 도메인만 허용
.withSockJS(); // JS 라이브러리. 우린 iOS라서 안 씀. 테스트를 위해 허용
}
여기까지 하면 위와 같이 와일드 카드를 제거해도 정상적으로 동작한다.
🤔 Origin을 강제로 바꾸는 게 좋은 방법일까?
이 방법은 proxy server가 사용자의 요청을 조작하는 거나 다름없어 보인다.
그리고 사실상 와일드 카드를 설정한 것과 다를 게 뭔지...
SockJS의 경우엔 처음 요청에 반드시 `/info` 경로로 HTTP 요청을 보낸다.
이 요청을 Proxy에서 조절하여 API로 강제로 향하게 만든 다음, JWT 인증을 수행하게 하는 방법도 고민은 해봤으나, 채팅 서버 url만 안다면 WebSocket을 사용해 다이렉트로 채팅 서버에 연결해도 무방한 거 아닌가?
채팅 서버에 인증 로직을 넣기 싫어서, 되도록 분리해보려고 했는데 어차피 자원 접근 검사 할 때 필요하기도 하고
초기 소켓 연결 과정에서 인증 과정을 수행하는 로직을 포함하는 것은 필수 불가결하지 싶다. ㅜㅜ
4. User Info Caching & Authorization
📌 JWT Authentication과 STOMP
💡 해당 파트는 인증 과정의 필요성부터 끊임없이 고민하고 있는 파트입니다. 진짜 참고로만 읽어주세요.
처음 설계할 때만 해도 Chat Server는 인증 로직이 전혀 불필요하다고 생각했다.
Spring Document에 나와있듯이, WebSocket Hand-shake나 SockJS HTTP 전송 요청의 경우 일반적으로 이미 인증 과정을 수행했기 때문이다. (Spring Security가 됐건, 뭐가 됐건 이미 HTTP 요청에 대한 인증 필터를 구현한 서비스의 경우)
따라서 Spring은 STOMP over Websocket 시나리오에서는 STOMP 프로토콜 수준의 인증 헤더를 무시하고, 사용자가 이미 HTTP 수준에서 인증되었다고 가정한다. (STOMP 프로토콜 frame에 login 헤더가 존재하긴 하나, 이건 STOMP over TCP를 위해 설계되었다.)
😅 내가 생각했던 인증 과정이 필요없다고 생각한 이유
사실 내가 생각했던 이유는 조금 다른 이유였다.
설계할 때만 해도, sevice search service에서 최적의 chat server를 탐색한 후 client의 연결까지 direct로 수행해줄 수 있을 것이라 생각했었다.
이렇게 되면, client의 web socket 연결을 위한 모든 요청은 API를 거쳐야 하므로, 연결만 된다면 그 이후로는 인가(Authorization)에 대해서만 고려하면 된다고 생각했기 때문이다.
하지만, 막상 구현해보니 말이 안 되는 접근법이었다.
클라이언트가 어떤 Stomp 라이브러리를 사용하냐에 따라 연결 과정이 천차만별인데, 이걸 모두 API에서 대응해준다는 것부터 어이가 없는 설계 방법이기도 했고..
SockJS는 요청 전에 `HTTP /info` 반드시 보내지만, SockJS 안 쓰고 직접 구현한 WebSocket으로 연결한다면 URL만 알면 누구나 인증 단계를 건너뛰고 채널에 접근할 수 있다.
문제는 문서에서 나와있듯, 사용자가 쿠기 기반 HTTP 세션을 통해 유지되는 보안 컨텍스트를 사용함을 전제로 둔다는 것이다.
우리 서비스는 Session을 관리하지 않는다.
Stateless를 지향하기 위해 JWT 방식으로 모든 인증/인가를 수행하고 있는데, 그럼 이 방식 또한 spring이 말한대로 "이미 인증되었다"치고 넘어갈 수 있을까?
그렇지 않다.
(열심히 쓰고 방법 구상 중이었는데, 바로 다음 페이지에 토큰 인증 적혀있었음 ㅋㅋㅋㅋㅋㅋㅋㅋㅋ)
문서에서 주의 깊게 봐야하는 부분을 정리해봤다.
- 서버 측 세션을 유지하지 않는 애플리케이션이나 인증을 위해 헤더를 사용하는 것이 일반적인 모바일 애플리케이션에선 쿠기 기반 세션이 항상 적합하지는 않다.
- RFC 6444에선 "WebSocket hand-shake 중에 서버가 클라이언트를 인증할 수 있는 특정 방법을 규정하지 않는다"고 명시되어 있다.
- 그러나 실제로는 브라우저 클라이언트는 표준 인증 헤더 또는 쿠키만 사용 가능하며, 사용자 지정 헤더를 제공할 수 없다.
- SockJS에서도 Authorization 헤더를 지정할 수 없음에 대해, 2019년까지 활발한 논의가 이어졌다.
- 사람들은 대안책으로 Query Parameter로 Access Token을 전달하는 대안을 내놓았지만, 토큰이 서버 로그의 URL과 기록될 수 있다는 고유한 단점이 존재한다.
- HTTP 프로토콜 수준에선 충분한 대안이 없을 테니, STOMP 메시징 프로토콜 수준에서 헤더로 인증하는 방법을 고려해봐라.
- STOMP Client를 사용해, Connect 시점에 인증 헤더를 전달하고, ChannelInterceptor를 사용해 Server 단에서 해결하는 게 나을 것이다.
📌 Socket Connect with Authorization Header
우선 Client 측 코드를 한 번 정리할 겸 갈아엎었다.
JS 지식만 있다면 별로 어려운 코드는 아니고, 로그인, 로그아웃 과정을 추가해주었다.
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Chat Test</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/sockjs-client/1.6.1/sockjs.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script>
</head>
<body>
<button id="logoutButton" style="display: none;">Logout</button>
<div id="loginSection">
<input type="number" id="userId" placeholder="User ID">
<button id="loginButton">Login</button>
<div id="loginError" style="color: red;"></div>
</div>
<div id="chatSection" style="display: none;">
<input type="text" id="roomId" placeholder="Room ID">
<input type="text" id="message" placeholder="Type a message...">
<button id="sendButton">Send</button>
<div id="messages"></div>
</div>
<script>
const API_URL = 'http://localhost:8000';
let stompClient = null;
// Token Management
const TokenManager = {
setAccessToken: (token) => sessionStorage.setItem('accessToken', token),
getAccessToken: () => sessionStorage.getItem('accessToken'),
clearTokens: () => {
sessionStorage.removeItem('accessToken');
document.cookie = 'refreshToken=; Max-Age=0; path=/; secure; samesite=none';
}
};
// API Request Wrapper
async function apiRequest(url, options = {}) {
const accessToken = TokenManager.getAccessToken();
if (accessToken) {
options.headers = {
...options.headers,
'Authorization': "Bearer " + accessToken
};
}
try {
const response = await fetch(url, options);
if (response.status === 401) {
const refreshed = await refreshToken();
if (refreshed) {
return apiRequest(url, options);
} else {
throw new Error('Authentication failed');
}
}
return response;
} catch (error) {
console.error('API request failed:', error);
throw error;
}
}
// Token Refresh
async function refreshToken() {
try {
const response = await fetch(`${API_URL}/api/auth/refresh`, {
method: 'GET',
credentials: 'include'
});
if (response.ok) {
console.log("[Refresh Token] : " + response.headers.get('Authorization'));
TokenManager.setAccessToken(response.headers.get('Authorization'));
return true;
}
return false;
} catch (error) {
console.error('Token refresh failed:', error);
return false;
}
}
// Login Function
async function login() {
const userId = document.getElementById('userId').value;
try {
const response = await fetch(`${API_URL}/api/auth/login/${userId}`, {
method: 'GET',
credentials: 'include'
});
// 디버깅용 AT, RT 수신 확인
for (var pair of response.headers.entries()) {
console.log(pair[0]+ ': '+ pair[1]);
}
if (response.ok) {
TokenManager.setAccessToken(response.headers.get('authorization'));
showChatSection();
connectWebSocket();
} else {
document.getElementById('loginError').textContent = 'Login failed';
}
} catch (error) {
console.error('Login error:', error);
document.getElementById('loginError').textContent = 'Login failed';
}
}
// Logout Function
async function logout() {
try {
await fetch(`${API_URL}/api/auth/logout`, {
method: 'GET',
headers: {
'Authorization': TokenManager.getAccessToken()
},
credentials: 'include'
});
TokenManager.clearTokens();
showLoginSection();
if (stompClient !== null) {
stompClient.disconnect();
}
} catch (error) {
console.error('Logout error:', error);
}
}
// WebSocket Connection
async function connectWebSocket() {
try {
const url = await getSocketServerUrl();
console.log("Connecting to:", url);
const socket = new SockJS(url, null, {transports: ["websocket"]});
console.log(socket);
stompClient = Stomp.over(socket);
stompClient.debug = function(str) {
console.log(str);
};
const headers = {
Authorization: TokenManager.getAccessToken()
};
stompClient.connect(headers, onConnected, onError);
} catch (error) {
console.error('WebSocket connection error:', error);
showLoginSection();
}
}
async function getSocketServerUrl() {
try {
// const response = await fetch(`${API_URL}/ws`, {method: 'GET'});
const response = await apiRequest(`${API_URL}/ws`, {method: 'GET'});
console.log("Response : " + response)
const data = await response.json();
console.log("Data : " + data)
return data["url"];
} catch (error) {
console.error("Error fetching server URL:", error);
throw error;
}
}
function onConnected(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/sub/chat.room.1', onMessageReceived, {'auto-delete':true, 'durable':false, 'exclusive':false});
}
function onError(error) {
console.error('WebSocket error:', error);
if (error.headers && error.headers.message === 'Unauthorized') {
refreshToken().then(success => {
if (success) {
connectWebSocket();
} else {
showLoginSection();
}
});
}
}
function onMessageReceived(message) {
console.log('Received: ' + message);
showMessage(JSON.parse(message.body).content);
}
function sendMessage() {
const roomId = document.getElementById('roomId').value;
const content = document.getElementById('message').value;
stompClient.send("/pub/chat.message." + roomId, {'Authorization': TokenManager.getAccessToken()}, JSON.stringify({
'roomId': roomId,
'content': content
}));
}
function showMessage(message) {
const messageElement = document.createElement('div');
messageElement.appendChild(document.createTextNode(message));
document.getElementById('messages').appendChild(messageElement);
}
function showLoginSection() {
document.getElementById('loginSection').style.display = 'block';
document.getElementById('chatSection').style.display = 'none';
document.getElementById('logoutButton').style.display = 'none';
}
function showChatSection() {
document.getElementById('loginSection').style.display = 'none';
document.getElementById('chatSection').style.display = 'block';
document.getElementById('logoutButton').style.display = 'block';
}
// Event Listeners
document.addEventListener('DOMContentLoaded', () => {
document.getElementById('loginButton').addEventListener('click', login);
document.getElementById('sendButton').addEventListener('click', sendMessage);
document.getElementById('logoutButton').addEventListener('click', logout);
});
</script>
</body>
</html>
이젠 무식하게 페이지 열자마자 WebSocket을 연결하려 시도하지 않고, 우선 userId에 해당하는 정보를 서버에서 조회한 뒤, 데이터가 존재하면 AT, RT 토큰을 받은 후 WebSocket 연결에 시도한다.
Server에선 다음과 같이 구현하긴 했는데, 그냥 굴러가기만 하도록 만들어 놓은 쓰레기 코드니까
본인이 이미 만든 코드가 있다면, 그걸 사용하시길 추천합니다..
@Slf4j
@RestController
@RequiredArgsConstructor
@RequestMapping(value = "/api/auth")
@CrossOrigin(originPatterns = "*", methods = {RequestMethod.GET, RequestMethod.POST}, allowedHeaders = "*", allowCredentials = "true", exposedHeaders = {HttpHeaders.AUTHORIZATION, HttpHeaders.COOKIE})
public class AuthController {
private final UserService userService;
private final AccessTokenProvider accessTokenProvider;
private final RefreshTokenProvider refreshTokenProvider;
@GetMapping("/login/{userId}")
public ResponseEntity<?> login(@PathVariable("userId") Long userId) {
User user = userService.readById(userId);
String accessToken = accessTokenProvider.generateToken(AccessTokenClaim.of(user.getId(), user.getRole().getType()));
String refreshToken = refreshTokenProvider.generateToken(RefreshTokenClaim.of(user.getId(), user.getRole().getType()));
log.info("accessToken: {}", accessToken);
log.info("refreshToken: {}", refreshToken);
ResponseCookie cookie = ResponseCookie.from("refreshToken", refreshToken)
.httpOnly(true)
.secure(true)
.path("/")
.maxAge(Duration.ofDays(7).toSeconds())
.sameSite("None")
.build();
return ResponseEntity.ok()
.header(HttpHeaders.AUTHORIZATION, accessToken)
.header(HttpHeaders.SET_COOKIE, cookie.toString())
.body(Map.of("userId", userId));
}
@GetMapping("/logout")
public ResponseEntity<?> logout(
@RequestHeader("Authorization") String accessToken,
@CookieValue("refreshToken") String refreshToken
) {
return ResponseEntity.ok()
.header(HttpHeaders.AUTHORIZATION, "")
.header(HttpHeaders.SET_COOKIE, "refreshToken=; Max-Age=0")
.body(Map.of("message", "로그아웃 되었습니다."));
}
@GetMapping("/refresh")
public ResponseEntity<?> refresh(
@CookieValue("refreshToken") String refreshToken
) {
JwtClaims claims = refreshTokenProvider.getJwtClaimsFromToken(refreshToken);
Long userId = JwtClaimsParserUtil.getClaimsValue(claims, "userId", Long::parseLong);
String role = JwtClaimsParserUtil.getClaimsValue(claims, "role", String.class);
String newAccessToken = accessTokenProvider.generateToken(AccessTokenClaim.of(userId, role));
String newRefreshToken = refreshTokenProvider.generateToken(RefreshTokenClaim.of(userId, role));
ResponseCookie cookie = ResponseCookie.from("refreshToken", newRefreshToken)
.httpOnly(true)
.secure(true)
.path("/")
.maxAge(Duration.ofDays(7).toSeconds())
.sameSite("None")
.build();
return ResponseEntity.ok()
.header(HttpHeaders.AUTHORIZATION, newAccessToken)
.header(HttpHeaders.SET_COOKIE, cookie.toString())
.body(Map.of("userId", userId));
}
}
중요한 건 이 부분이다.
// WebSocket Connection
async function connectWebSocket() {
try {
...
const headers = {
Authorization: TokenManager.getAccessToken()
};
stompClient.connect(headers, onConnected, onError); // STOMP header에 Authorization 추가
} catch (error) {
...
}
}
이러면 StompClient Connect 시에, Access Token을 query param이 아닌 header로 전달할 수 있다.
처음에 WebSocket Request Header를 보고 인증 헤더가 존재하지 않아 실패한 줄 알고 낙담했는데,
WebSocket 프로토콜 레벨에서는 Authorization과 같은 추가 헤더를 지원하지 않는다.
그래서 여기에 인증 헤더가 담겨 있을 리가 없다는 것.
StompClient에 의해 출력되는 로그를 보면 Connect 요청 시에, 올바르게 헤더 정보를 담고 있음을 확인할 수 있다.
위에서 편의 목적으로 StompInboundInterceptor를 구현해놓은 게 있었는데,
Server 또한 Connect 요청 시, 인증 헤더를 수신하고 있다.
이제 이걸로 각자 서비스에 맞게 인증 로직을 수행해주면 끝난다.
단, 주의해야 할 것이 Spring Security의 인증 기능을 사용하고 있다면, 반드시 Security보다 먼저 수행될 수 있도록 @Order를 지정해주라고 한다.
난 채팅 서버까지 무거운 Security를 주입해주고 싶지 않으므로, 사용하지 않을 예정.
AT이 만료되면, 채팅 서버가 처리할 게 아니라, 클라이언트한테 API 서버에서 다시 발급받아오라고 보내면 그만이다.
@Slf4j
@Component
@RequiredArgsConstructor
public class StompInboundInterceptor implements ChannelInterceptor {
private final AccessTokenProvider chatAccessTokenProvider;
@Override
public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
// Object authorization = accessor.getHeader("Authorization"); // 이거 왜 안 됨 ㅋㅋ
String authorization = accessor.getFirstNativeHeader("Authorization");
if (authorization != null && authorization.startsWith("Bearer ")) {
// ==== 각자 서비스 정책에 맞게 인증 로직 구현
String accessToken = authorization.substring(7);
JwtClaims claims = chatAccessTokenProvider.getJwtClaimsFromToken(accessToken);
// =====
// accessor.setUser(principal); // 아직 미설정
}
}
...
return message
}
}
(일단 동작하는 게 보고 싶어서 코드는 대충 썼습니다 ㅎㅎ)
accessor에서 getHeader()로 헤더 추출하니까 계속 null이 나와서 검색해보니, getNavigteHeader()를 사용하란다.
여기까지하면 Connect 단계에서 인증 로직 구현까지 성공
📌 Principal
공식 문서를 보면 StompHeaderAccessor에 Principle을 구현한 객체를 등록할 수 있다.
Stateless 서버 환경에 너무 적응되어 있어서, 당연히 요청할 때마다 setUser()를 해줘야 하는 거겠지 싶었는데
Connect 할 때 한 번만 등록하면, Session이 연결되어 있는 동안은 사용자 정보가 계속 유지되어 있다고 한다.
나는 Chat Server에 무거운 Security를 얹어놓고 싶지 않으니, 사용할 Principal을 구현한 객체를 직접 만들어 줄 필요가 있다.
public interface Principal {
public boolean equals(Object another);
public String toString();
public int hashCode();
public String getName();
public default boolean implies(Subject subject) {
if (subject == null)
return false;
return subject.getPrincipals().contains(this);
}
}
java의 기본 security 기능에 정의된 Principal을 구현한 임의의 객체를 생성해주면 된다.
이건 뭐 개인 서비스마다 다르기도 하고, 채팅 서비스 구현할 정도면 이 정도는 기초적인 내용이므로 코드는 스킵.
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String authorization = accessor.getFirstNativeHeader("Authorization");
if (authorization != null && authorization.startsWith("Bearer ")) {
String accessToken = authorization.substring(7);
JwtClaims claims = chatAccessTokenProvider.getJwtClaimsFromToken(accessToken);
Long userId = JwtClaimsParserUtil.getClaimsValue(claims, AccessTokenClaimKeys.USER_ID.getValue(), Long::parseLong);
User user = userService.readById(userId);
Principal principal = UserPrincipal.from(user);
accessor.setUser(principal); // 세션 유저 설정
}
}
아까 전에 생성한 Principal 객체를 만들어주고, setUser()로 등록해주면 끝난다.
@MessageMapping("chat.message.{roomId}")
public void sendMessage(@DestinationVariable String roomId, ChatMessage message, Principal principal) {
log.info("sendMessage: roomId={}, message={}", roomId, message);
log.info("principal: name={}", principal.getName());
chatMessageProducer.sendMessage(message);
}
Controller에서 Principal을 주입해주기만 하면, 세션에 연결된 사용자를 알아서 잘 가져온다!
📌 long-lived connection에 대응하기 위한 전략
Stateful 상태의 WebSocket을 사용하면, Stateless를 지향하는 JWT와의 간극이 필연적으로 발생한다.
예로 Session은 유효한데, AT는 만료된 경우가 존재할 수 있는 건 쉽게 떠올릴 수 있는 상황이다.
그렇다면 모든 요청마다 매번 AT를 다시 검증할 것인가?
그렇게 한다고 쳤을 때, AT가 만료될 때마다 WebSocket을 끊었다가 유효한 AT를 가져오면 다시 연결할 것인가?
서비스 성격에 따라 사용자가 앱 내 체류 시간이 길지 않다면 나쁘지 않을 수도 있다고 본다. (여기서 체류 시간이 길지 않다는 것은 서비스를 잘못 만들어서 이탈율이 높기 때문이 아니라, 정책 상 체류 시간이 짧아야 하는 서비스를 말함)
우리 서비스는 플랫폼이 주 기능인만큼, 위 방식으로 안주하기엔 무리가 있다.
그렇다면 어떤 방법들을 도입해볼 수 있을까?
1️⃣ Principal에 토큰 만료시간(expiresAt) 정보 추가
Long userId = JwtClaimsParserUtil.getClaimsValue(claims, AccessTokenClaimKeys.USER_ID.getValue(), Long::parseLong);
LocalDateTime expiresDate = chatAccessTokenProvider.getExpiryDate(accessToken);
Principal principal = UserPrincipal.from(user, expiresDate);
Principal에 AT 만료 시간에 대한 정보를 추가하는 전략을 가장 처음 떠올렸다.
Connet 할 때 어차피 사용자 정보 추출해야 하니, 토큰 만료 시간도 같이 받아서 principal에 저장해둔다면
이후 사용자의 요청에 대해선 expiresAt 필드만 검사하면 될 것이라 생각했다.
요청할 때마다 AT를 헤더로 전달해서 만료 여부를 검증하는 것이 더 확실할 수도 있겠지만, 현재 JWT를 암호화할 때 SHA-256을 사용하고 있고, 이를 복호화하는 건 느리다.
실시간 요청에 대해 매번 복호화를 하고 있는 건 아무래도 비효율적이지 않을까?
따라서 Connect 시에 한 번만 복호화를 수행하고, 이후 AT을 새로 발급 받아오면 다시 만료 시간을 업데이트 시키는 전략을 구상하게 되었다.
2️⃣ 토큰 폐기(revocation) 동기화
JWT의 문제점 중 하나는 서비스 운영 도중 사용자의 상태가 변경(ex. 강제 로그아웃, 사용자 권한 변경)되었을 때, WebSocket 연결에 즉시 반영되기 어렵다는 점이다.
즉, 권한이 취소된 사용자가 일정 시간 동안 HTTP 요청은 못 하지만, WebSocket 요청은 성공하는 아이러니한 상황이 발생하게 된다는 문제가 존재한다.
이를 위해서 몇 가지 생각해볼 수 있는 방법이 있긴 하다.
- 스케쥴링
- 서버에서 주기적으로 모든 활성 WebSocket 연결의 토큰 유효성을 재검증한다.
- 채팅 서버에 많은 사용자가 연결되어 있을 경우 상당한 부담이 된다.
- expiresAt과 더불어 마지막 검증 시간을 유지하여, 일정 시간이 지난 후의 첫 메시지 전송 시 서버에서 토큰을 재검증하는 방법으로 완화할 수 있다.
- 이벤트 기반 검증
- 토큰 폐기, 권한 변경과 같은 중요 이벤트 발생 시, 해당 사용자의 WebSocket 연결을 강제로 종료하거나 재검증을 수행하도록 구성한다.
- Stateless 서버가 모든 클라이언트의 WebSocket Session 정보를 알고 있어야 하며, 구현이 상당히 복잡해질 우려가 있다.
Redis를 사용하는 건 SPOF로 작용할 우려도 있고, 사실상 Redis의 사용 목적을 벗어나는 행위이므로 고려하지 않았다.
어떻게든 구현을 해보라 한다면 할 수야 있겠지만, 조금 더 고민을 해보자.
애초에 이게 필요한 일일까?
JWT는 본래 목적이 Stateless를 지양하기 위함이다.
만료 기간이 너무 긴 refresh token을 방어하는 목적으로 refresh token refresh 같은 전략을 사용할 수는 있지만, 이 마저도 탈취된 access token의 만료 시간까지는 어찌 할 수 없는 게 당연 (그것이 stateless니까..)
그렇다면 관리자가 서비스 운영 도중 임의의 사용자 권한을 변경한 것이 WebSocket에 반영되게 하기 위해, 사용자가 가지고 있는 AT을 관리하겠다고 덤비는 순간, 이미 그건 stateless가 아니지 않을까?
채팅 기능 만들겠다고 JWT를 Stateful로 관리하려는 시도 자체가 패러다임을 거스르는 행위라고 생각한다.
그리고 가장 중요한 건, 우리 서비스엔 현재 이런 문제가 발생할 일 자체가 없다. 고려할 필요조차 없는 사항이라는 것이다.
3️⃣ 토큰 갱신 시나리오
Connect 시점에 AT를 검증하므로, 이제 사용자 인증을 수행할 수는 있겠으나 문제는 Client에게 어떤 응답이 돌아가고 있을까?
애석하게도 Connect가 해제된 이유를 명확하게 알려주지 않고 있다.
그렇다면 HTTP 응답처럼 예외 상태를 구분하기 위한 status code 정보를 정의하고, 전역 예외 핸들러를 설정해야 하는 상황이 되었다. (아니면 매번 try-catch 쓰거나)
하지만 명확히 다른 protocol을 사용하는데 똑같이 401 에러를 반환할 수는 없지 않은가..?
이 내용을 작성하려다가, 너무 길어지기도 하고 이번 포스트 목적과는 다른 내용이 될 것 같아서, 아예 게시물을 분리해서 작성하기로 했다.
작성하고 느낀 건데, 분리하길 잘 했다. ㅋㅋㅋㅋㅋㅋㅋ
📌 Authenticate
구현하고자 하는 목표는 다음과 같다.
- Client의 요청에 대해 Controller에서 Principal의 expiresAt 필드를 확인한다.
- 만약, expiresAt이 현재 시간보다 이전이라면 WebSocket 연결을 닫지 않으면서, client에게 인증 실패 메시지를 전달해야 한다.
- 클라이언트가 인증 서버에서 다시 유효한 AT를 발급받아서 갱신할 수 있어야 한다.
이를 위해, 작업해야 할 항목은 이렇다.
- Connect 시, 현재 client가 보낸 AT의 expiresAt을 상태로 추가한다.
- 모든 Controller마다 client의 인증 정보 만료 상태를 확인하는 AOP를 작성한다.
- 인증 실패 예외에 대한 전역 예외 처리를 구현한다.
- 클라이언트가 다시 expiresAt을 갱신할 수 있는 end-point를 제공한다.
1️⃣ Principal expiresAt 필드 추가 및 등록
이건 딱히 설명할 게 없음..그냥 UserPrincipal에 expiresAt 필드 추가하고, Authenticate Handler에서 정보를 추가로 넣어주기만 하면 된다.
AT의 만료 시간인 30분이 잘 들어가있다.
2️⃣ Authentication AOP
😇 비록 node.js가 아닌, spring을 쓰는 시점에서 성능 최적화 목적은 버렸기 때문에 Reflection을 사용했지만, 성능이 중요한 분들은 이런 식으로 하지 마세요.
/**
* WebSocket Controller에 대한 인증 및 인가를 지정하는 어노테이션.
* 이 어노테이션이 붙은 메서드는 {@link PreAuthorizeAspect}에 의해 처리됩니다.
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface PreAuthorize {
/**
* 인증/인가를 위한 SpEL 표현식.
* 이 표현식은 {@link PreAuthorizeSpELParser}에 의해 평가됩니다.
* 평가를 위해서 메서드 파라미터로 반드시 {@link java.security.Principal}이 포함되어야 합니다.
*
* @return 평가할 SpEL 표현식
*/
String value();
}
@MessageMapping("chat.message.{roomId}")
@PreAuthorize("#isAuthenticated(#principal)")
public void sendMessage(@DestinationVariable String roomId, ChatMessage message, Principal principal) {
chatMessageProducer.sendMessage(message);
}
Spring Security가 제공해주는 PreAuthorize 같은 어노테이션을 만들어 보고 싶었다.
그런데 SpEL이 내 맘 같이 동작해주질 않기도 했고, Thread-safe와 SpEL context 비용을 낮추기 위해서 고민하다가 머리가 생각 이상으로 머리가 아팠다.
원래라면 @PreAuthorize("isAuthenticated()")만 쳐도, 인증 사용자인지 아닌지 검증하도록 만들려고 했으나, 이게 생각보다 엄청나게 복잡하다.
SpEL은 둘째치고, Principal을 Security가 관리하는 객체가 아니다보니, 내가 직접 WebSession에 접속한 client들의 Principal을 관리해주는 작업도 해야하는데 배보다 배꼽이 커지는 거 같아서 관뒀다.
@Slf4j
@Aspect
@Component
@RequiredArgsConstructor
public class PreAuthorizeAspect {
/**
* {@link PreAuthorize} 어노테이션이 붙은 메서드를 가로채고 인증/인가를 수행합니다.
*
* @param joinPoint 가로챈 메서드의 실행 지점
* @return 인증/인가가 성공하면 원래 메서드의 실행 결과, 실패하면 UnauthorizedResponse
* @throws Throwable 메서드 실행 중 발생한 예외
*/
@Around("@annotation(com.example.socket.chats.common.annotation.PreAuthorize)")
public Object execute(final ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
PreAuthorize preAuthorize = method.getAnnotation(PreAuthorize.class);
Principal principal = extractPrincipal(joinPoint.getArgs());
boolean isAuthorized = PreAuthorizeSpELParser.evaluate(preAuthorize.value(), principal, method, joinPoint.getArgs());
if (!isAuthorized) {
handleUnauthorized(principal, preAuthorize);
}
return joinPoint.proceed();
}
/**
* 메서드 인자에서 Principal 객체를 추출합니다.
*
* @param args 메서드 인자 배열
* @return 찾은 Principal 객체, 없으면 null
*/
private Principal extractPrincipal(Object[] args) {
return Stream.of(args)
.filter(arg -> arg instanceof Principal)
.map(arg -> (Principal) arg)
.findFirst()
.orElse(null);
}
/**
* 인증/인가 실패 시 처리합니다.
*
* @param principal
* @param preAuthorize
*/
private void handleUnauthorized(Principal principal, PreAuthorize preAuthorize) {
...
}
}
Reflection 재밌당...성능이 느린 걸 알지만 끊을 수 없어. ㅜㅜ
Controller 메서드 파라미터의 Principal 객체를 가져와서 Parser에 던져준다음 평가를 수행한다.
딱히 안 어려우니 설명은 패스.
이 다음은 날 미치게 만들 뻔한 SpELParser ^^
/**
* WebSocket 인증 및 인가를 위한 Spring Expression Language (SpEL) 파서.
* 이 클래스는 WebSocket 연결에서 사용되는 다양한 인증/인가 함수를 제공하고,
* SpEL 표현식을 평가하는 기능을 제공합니다.
*
* @author YANG JAESEO
* @since 2024.09.26
* @version 1.0.0
*/
public final class PreAuthorizeSpELParser {
private static final ExpressionParser parser = new SpelExpressionParser();
private static final StandardEvaluationContext context = new StandardEvaluationContext();
static {
initializeStaticContext();
}
private PreAuthorizeSpELParser() {
throw new IllegalStateException("Utility class");
}
private static void initializeStaticContext() {
for (SpELFunction function : SpELFunction.values()) {
try {
context.registerFunction(function.getName(),
PreAuthorizeSpELParser.class.getDeclaredMethod(function.getMethodName(), function.getParameterTypes()));
} catch (NoSuchMethodException e) {
throw new RuntimeException("Error registering SpEL function: " + function.getName(), e);
}
}
}
...
/**
* 모든 사용자에게 접근을 허용합니다.
*
* @return 언제나 true를 반환한다.
*/
public static boolean permitAll() {
return true;
}
/**
* 주어진 Principal이 익명 사용자인지 확인합니다.
*
* @param principal 확인할 Principal 객체
* @return 익명 사용자이면 true, 그렇지 않으면 false
*/
public static boolean isAnonymous(Principal principal) {
return principal == null;
}
/**
* 주어진 Principal이 인증된 사용자인지 확인합니다.
*
* @param principal 확인할 Principal 객체
* @return 인증된 사용자이고 토큰이 만료되지 않았으면 true, 그렇지 않으면 false
*/
public static boolean isAuthenticated(Principal principal) {
if (principal instanceof UserPrincipal userPrincipal) {
return userPrincipal.getExpiresAt().isAfter(LocalDateTime.now());
}
return false;
}
/**
* WebSocket 인증/인가에 사용되는 SpEL 함수들을 정의하는 열거형.
* 각 함수는 이름, 메서드 이름, 파라미터 타입을 가집니다.
*/
public enum SpELFunction {
PERMIT_ALL("permitAll", "permitAll"),
IS_ANONYMOUS("isAnonymous", "isAnonymous", Principal.class),
IS_AUTHENTICATED("isAuthenticated", "isAuthenticated", Principal.class);
private final String name;
private final String methodName;
private final Class<?>[] parameterTypes;
SpELFunction(String name, String methodName, Class<?>... parameterTypes) {
this.name = name;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
}
public String getName() {
return name;
}
public String getMethodName() {
return methodName;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
}
}
우선 가장 처음엔 StandardEvaluationContext를 static으로 생성했다.
왜냐하면, 문자열 매칭하는 애들은 기본적으로 유한 상태 머신(FA)을 가지고 있을 확률이 높다.
물론 다 까보진 않았지만, 그냥 대충 훑어봐도 pattern 캐싱까지 하는 걸로 봐서 evaluate할 때마다 캐싱하는 건 위험하다고 판단했다. (심지어 매우 빈번하게 호출되는 녀석이라 더더욱)
문제는 principal이다.
내가 직접 현재 사용자의 Principal을 가져올 방법이 없으므로, client가 값을 넘겨주면, 그 값을 context가 사용할 수 있도록 설정해주어야 한다.
그러나 StandardEvaluationContext를 static으로 만들어버리는 덕에, 자칫하다 경합 문제가 발생할 위기에 처했다.
(사용자 1이 context에 Principal을 등록했는데, 평가가 수행되기 전에 사용자 2가 principal을 덮어쓸 수 있다.)
/**
* 주어진 SpEL 표현식을 평가합니다.
*
* @param expression 평가할 SpEL 표현식
* @param principal 현재 사용자의 Principal 객체
* @param method 평가 중인 메서드
* @param args 메서드의 인자들
* @return 표현식 평가 결과 (true/false)
*/
public static synchronized boolean evaluate(String expression, Principal principal, Method method, Object[] args) {
populateContext(principal, method, args);
return Boolean.TRUE.equals(parser.parseExpression(expression).getValue(context, Boolean.class));
}
/**
* SpEL 평가를 위한 컨텍스트를 생성합니다.
*
* @param principal 현재 사용자의 Principal 객체
* @param method 평가 중인 메서드
* @param args 메서드의 인자들
* @return 생성된 StandardEvaluationContext
*/
private static void populateContext(Principal principal, Method method, Object[] args) {
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
context.setVariable(parameters[i].getName(), args[i]);
}
}
그래서 evaluate에 synchronized를 걸어버렸다.
popluateContext에만 걸면 되지 않을까라고 생각할 수 있겠지만, 그럼 여전히 평가 전에 principal 변수 값이 변경될 수 있다.
따라서 principal을 변수로 등록한 후, 평가가 완료될 때까지는 lock이 유지되어야 한다.
🤔 성능 문제가 있지 않을까?
당연히 synchronized를 걸어야겠다는 생각을 하기 이전에, 다른 시도를 해보려고도 해봤다.
아무래도 real-time 서비스에다, 거의 모든 요청마다 실행되어야 하는 로직에 synchronized가 걸려있다는 건 아무래도 꺼림칙하지 않은가.
public class PreAuthorizeSpELParserTest {
@Test
void testThreadSafety() throws Exception {
int threadCount = 1000;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger successCount = new AtomicInteger(0);
Method testMethod = TestController.class.getDeclaredMethod("testMethod", String.class);
GenericApplicationContext applicationContext = new GenericApplicationContext();
for (int i = 0; i < threadCount; i++) {
final int index = i;
executorService.submit(() -> {
try {
Principal principal = index % 2 == 0 ? null : UserFixture.AUTHENTICATED.getPrincipal();
String expression = index % 2 == 0 ? "#isAnonymous(#principal)" : "#isAuthenticated(#principal)";
Object[] args = new Object[]{"principal"};
boolean result = PreAuthorizeSpELParser.evaluate(expression, principal, testMethod, args, applicationContext);
if ((index % 2 == 0 && result) || (index % 2 != 0 && result)) {
successCount.incrementAndGet();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
latch.await();
executorService.shutdown();
assertEquals(threadCount, successCount.get(), "모든 평가가 완료되어야 합니다.");
}
private static class TestController {
public void testMethod(String arg) {
System.out.println("testMethod: " + arg);
}
}
private enum UserFixture {
AUTHENTICATED;
public Principal getPrincipal() {
return UserPrincipal.builder()
.userId(1L)
.name("jayang")
.expiresAt(LocalDateTime.now().plusMinutes(30))
.build();
}
}
}
그런데, 막상 테스트 해보니 캐싱의 힘이 어지간히 강력했는지, Thread 1,000개를 실행해도 고작 354ms.
..그럼 지금은 그냥 무시해도 될 정도의 지배 시간인데?
누누히 언급하지만, 이 이상의 성능을 요구한다면 spring을 쓰는 게 이상한 거라고 생각한다.
양복 입고 달리기가 빠르길 기대한다는 게 이상하잖아. 🏃♂️➡️
3️⃣ JwtException Exception Handler
/**
* 인증/인가 실패 시 처리합니다.
*
* @param principal
* @param preAuthorize
*/
private void handleUnauthorized(Principal principal, PreAuthorize preAuthorize) {
if (preAuthorize.value().contains(PreAuthorizeSpELParser.SpELFunction.IS_AUTHENTICATED.getName())) {
log.warn("인증 실패: {}", principal);
throw new PreAuthorizeErrorException(PreAuthorizeErrorCode.UNAUTHENTICATED);
} else if (preAuthorize.value().contains(PreAuthorizeSpELParser.SpELFunction.IS_ANONYMOUS.getName())) {
log.warn("익명 실패: {}", principal);
throw new PreAuthorizeErrorException(PreAuthorizeErrorCode.NOT_ANONYMOUS);
}
}
예외 처리는 정말 간단하다.
다만, isAuthenticate 실패인지, isAnonymous 실패인지 등을 구분해주려 한다면 제법 귀찮아진다. (permitAll은 실패할 일이 없으므로 패스)
이것도 디자인 패턴을 도입해서 해결해볼 수 있겠지만, 합리적으로 생각했을 때 이 이상 권한 메서드가 추가될 일이 없다. (isAnonymous도 사실 필요없음.)
그래서 그냥 위와 같이 처리하고 넘겼다.
String의 contains()의 성능이 살짝 마음에 걸렸는데, 매우 빠르길래 안심하고 사용했다.
예외를 전역 예외 처리에서 받아주기만 하면 끝난다.
4️⃣ Refresh expiresAt
@Slf4j
@Controller("chatAuthController")
@RequiredArgsConstructor
public class AuthController {
private final AuthService authService;
@MessageMapping("auth.refresh")
@PreAuthorize("#principal instanceof T(com.example.socket.chats.common.security.principal.UserPrincipal)")
public void refreshPrincipal(@Header("Authorization") String authorization, Principal principal, StompHeaderAccessor accessor) {
log.info("refreshPrincipal AccessToken: {}", authorization);
// token 앞의 "Bearer " 제거
if (authorization == null || !authorization.startsWith("Bearer ")) {
throw new JwtErrorException(JwtErrorCode.EMPTY_ACCESS_TOKEN);
}
String token = authorization.substring(7);
authService.refreshPrincipal(token, (UserPrincipal) principal, accessor);
}
}
이전엔 모든 예외를 Receipt 프레임으로 받으려다 대차게 실패했지만, 이번엔 refresh 메시지에 대해 성공/실패 여부를 client에서 요구하므로 reciept 프레임을 사용해도 괜찮지 않을까?
내가 구상한 flow는 이렇다.
- Client가 Chat Server로 Send 메시지를 보내, Principal의 expiresAt을 업데이트 한다. (receipt 헤더 포함)
- Chat Server와 이미 확인된 Principal이 존재함을 가정하므로, 인증 권한은 principal이 null이 아님을 검증한다.
- isAuthenticate를 해버리면, expiresAt이 만료된 상태일 수 있으므로 refresh에 실패한다.
- 단순히 Principal과 instanceof를 하거나, null이 아님을 검사해도 되지만, 이후 강제 형변환할 때 추가 검사하기 싫어서 위와 같이 했다.
- Chat Server는 client의 AT에서 만료 시간을 추출하여 Principal의 expiresAt을 업데이트한다.
- Server는 성공/실패 여부를 receipt 헤더를 기반으로, channel에 응답 메시지를 보낸다.
- Client는 message의 성공, 실패 여부에 따라 적절하게 핸들링한다.
Client 측에서 핸들링하는 코드는 적당히 넘길 생각이고, 중요한 것은 Receipt 프레임을 Client에서 받을 수 있냐는 것이 관건이었다.
결국 SockJS로는 한계가 찾아왔다고 판단해서 위 라이브러리로 Client 측 Stomp 라이브러리를 대체하였다.
(대체한 코드는 너무 길어져서, github에서 참고하시길 바랍니다.)
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat")
// .setAllowedOrigins("*") // 이거 넣으면 allowedOrigins가 true일 때, * 못 넣으니까 pattern 쓰라고 에러 발생함.
.setAllowedOriginPatterns("http://127.0.0.1:8000"); // 실제 환경에선 API 서버 도메인만 허용
// .withSockJS(); // JS 라이브러리. 우린 iOS라서 안 씀. 테스트를 위해 허용
registry.setErrorHandler(stompExceptionInterceptor);
}
SockJS를 사용하지 않으므로, withSockJS() 설정도 제거해주어야 한다.
@Slf4j
@Service
@RequiredArgsConstructor
public class AuthService {
private final AccessTokenProvider chatAccessTokenProvider;
private final ApplicationEventPublisher eventPublisher;
public void refreshPrincipal(String token, UserPrincipal principal, StompHeaderAccessor accessor) { // 실패했을 때, 연결 끊어버릴 건지??
// Receipt 프레임으로 응답 보낼 거라 RabbitMQ로 보낼 필요가 없음. event로 처리.
try {
LocalDateTime expiresAt = chatAccessTokenProvider.getExpiryDate(token);
principal.updateExpiresAt(expiresAt);
log.info("refresh success: {}", principal);
Message<ServerSideMessage> message = MessageBuilder.createMessage(ServerSideMessage.of("2000", "토큰 갱신 성공"), accessor.getMessageHeaders());
eventPublisher.publishEvent(RefreshEvent.of(message));
} catch (JwtErrorException e) {
log.info("refresh failed: {}", e.getErrorCode().getExplainError());
Message<ServerSideMessage> message = MessageBuilder.createMessage(ServerSideMessage.of(e.getErrorCode().getExplainError()), accessor.getMessageHeaders());
eventPublisher.publishEvent(RefreshEvent.of(message));
}
}
}
본격적으로 Principal을 업데이트 하기 위해서, 우선 AT의 만료 시간을 추출한 후 업데이트를 해주었다.
이 과정에서 에러가 발생할 수 있는데, 반드시 GlobalExceptionHandler로 넘어가지 않도록 handling을 해주어야 한다.
안 그러면, 성공은 receipt로 가면서, 실패는 error queue로 전달되어 client 측에 혼동을 주게 되기 때문이다.
Receipt 응답은 Client와 연결된 channel로 반환해줄 필요가 있다.
예외 처리 포스팅에서 simpMessageTemplate로는 불가능함을 충분히 만끽했기 때문에, 이걸 처리해줄 EventHandler를 구현하는 방향으로 진행했다.
public class RefreshEvent<T extends ServerSideMessage> extends ApplicationEvent {
private final Message<T> message;
public RefreshEvent(Message<T> message) {
super(message);
this.message = message;
}
public static <T extends ServerSideMessage> RefreshEvent<T> of(Message<T> message) {
return new RefreshEvent<>(message);
}
public Message<T> getMessage() {
return message;
}
}
우선 ApplicationEvent를 확장한 RefreshEvent 객체를 정의했다.
응답을 위해 필요한 receipt 헤더 또한, AuthService에서 Message를 생성할 때 header로 삽입해준 것을 전제로 한다.
굳이 코드가 동일할 이유는 없는데, ApplicationEvent를 확장하지 않고도 event handling을 할 수 있기 때문에 세부적인 구현은 신경쓰지 않아도 됨.
@Slf4j
@Component
@RequiredArgsConstructor
public class ReceiptEventHandler {
private final ObjectMapper objectMapper;
@Bean
@Async
@EventListener
public ApplicationListener<RefreshEvent> handleRefreshEvent(final AbstractSubscribableChannel clientOutboundChannel) {
return event -> {
log.info("handleRefreshEvent: {}", event);
Message<byte[]> message = event.getMessage();
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
byte[] payload = new byte[0];
try {
payload = objectMapper.writeValueAsBytes(message.getPayload());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
if (accessor.getReceipt() != null) {
accessor.setHeader("stompCommand", StompCommand.RECEIPT);
accessor.setReceiptId(accessor.getReceipt());
clientOutboundChannel.send(
MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()));
}
};
}
}
ReceiptEvent를 처리해줄 Handler를 정의하고, 응답을 돌려줄 channel 정보 파악을 위해 AbstractSubscribableChannel을 매개변수로 주입해주었다.
이렇게 하면, server to client할 메시지 정보와 client의 receipt 정보를 모두 추출하여 client와 연결된 channel로 메시지를 전송할 수 있다.
이제 테스트를 해보자.
빠른 테스트를 위해 AT 만료 시간을 30초로 수정했다.
여기서 기존 요청 재시도를 한다고 로그를 찍어놓긴 했는데, 실제로 하진 않는다.
"haha"를 치던 시점에서 인증 에러가 발생했는데, 이건 채팅방에 재전송하도록 처리하지 않은 상태.
서버 측 로그를 살펴보면, pub 시에 인증에 실패했고, isAuthroized가 false로 나와 예외가 던져졌음을 볼 수 있다.
이후, refresh 요청이 들어왔을 때 사용자의 AT로 expiresAt을 추출하여 업데이트 한 후 receipt를 반환하면, client는 다시 정상적으로 요청을 보낼 수 있게 된다.
📬 보내지지 않은 러브레터 문제
"hello"를 전송했으나 인증 문제로 인해 요청이 거부되었을 때, 이 메시지를 어떻게 처리할 지가 관건이 된다.
Server가 메시지를 보관하고 있다가, 사용자 인증이 완료되면 MQ의 메시지들을 다시 차례로 publish 해야 할까?
하지만 Server 입장에서 정말 유효하지 않은 사용자가 시도한 건지, 잠시 expires된 사용자인지 매번 구분하고 MQ에 메시지를 축적해두는 건 리스크가 있다고 생각한다. (할 수야 있겠다만 굳이?)
이건 Client 측에서 해결하는 게 보다 적절하다고 보는데, pub에 실패한 메시지를 보관해두었다가 refresh에 성공하면, 전송에 실패한 message를 다시 차례로 server로 흘려주면 될 일이라고 생각한다.
그걸 Client 에서 구현하는 일 또한 별로 어렵지 않겠지만, 지금도 내가 javascript 공부를 하는 건지 spring 공부를 하는 건지 헷갈릴 지경이므로 구현은 생략했다.
📌 Authorize
사용자가 자원에 접근할 수 있는 지를 검사하는 단계는 필수적으로 포함되어야 한다.
만약, 그렇지 않으면 인증된 사용자가 자신이 속하지 않은 채팅방에 메시지를 전송하는 것도 허용될 수 있기 때문이다.
따라서 여기선 다음 두 가지 항목을 구현해야 한다.
- 모든 Subscribe 요청에 대해, 사용자의 자원 접근 권한을 확인해야 하며, 유효하지 않은 구독 요청에 대해선 WebSocket 연결을 해제하지 않고 에러를 반환한다. (Interceptor에서 처리)
- 모든 Publish 요청에 대해, 사용자가 해당 자원에 메시지를 작성 가능한 지 확인해야 하며, 위와 동일하게 처리해야 한다. (Business 영역에서 처리)
1️⃣ Subscribe 자원 접근 권한 검사 Interceptor
⚠️ 아래에서 성공 이벤트를 처리하기 위해 EventHandler를 사용했는데, `24.10.10 시점에 확인 결과 정상 동작하지 않음을 발견했습니다! 주의해주세요. (자세한 설명은 아래 첨부)
처음 구상은 이러했다.
- Client가 subscribe를 할 때, 해당 자원에 구독 권한이 없을 수도 있다.
- 따라서 server는 subscribe에 대해 성공/실패 여부 응답을 돌려주는 것이 좋다고 판단을 했고, 이를 error queue에 담기 보단 receipt 프레임을 사용하고자 했다.
- 기존에 Exchange를 변환해주던 핸들러를 아예 채팅방 구독 인가 핸들러로 개조해버렸다.
- 만약 "/sub/"를 "/exchange/~" 어쩌구로 변환해주는 핸들러와 인가 핸들러를 따로 두면, 내가 구현한 방식에서 Interceptor 실행 순서를 보장할 수 없다는 문제가 존재했기 때문. (리팩토링 파트에서 Handler에 디자인 패턴을 적용해버렸더니 그만..)
- 채팅방 구독 경로만을 처리하는 Handler를 만들어버리니 다른 문제가 발생했는데, 추후 다른 exchange에 구독할 일이 생겼을 때 path에서, 어떤 논리로 권한 검사를 해야할 지 너무 복잡해졌다.
- 채팅 검사는 "chat.room.{chat_room_id}"니까, 가장 마지막 id만 파싱하면 된다.
- 그럼 다른 exchange가 생기면? 중첩된 경로로 구독이 발생하면??
- 위 문제를 해결하기 위해, Resource Path를 정규식으로 관리하여 적절한 인가 로직을 수행할 수 있도록 하는 ResourceAccessRegistry를 구현하게 됐다.
- 실패하면 예외가 발생해서 StompExceptionInterceptor에서 캐치한 후, 구독 실패 핸들러에서 적절히 Receipt 프레임을 반환한다. (성공했을 때도 비슷하나, 아래에 마저 적음.)
이 때, 성공했을 때와 실패했을 때에 대해 각각 eventHandler를 구현할 예정이었다.
왜 이런 접근법을 택했냐면,
/**
* Event raised when a new WebSocket client using a Simple Messaging Protocol
* (e.g. STOMP) sends a subscription request.
*
* @author Rossen Stoyanchev
* @since 4.0.3
*/
@SuppressWarnings("serial")
public class SessionSubscribeEvent extends AbstractSubProtocolEvent {
public SessionSubscribeEvent(Object source, Message<byte[]> message) {
super(source, message);
}
public SessionSubscribeEvent(Object source, Message<byte[]> message, @Nullable Principal user) {
super(source, message, user);
}
}
Spring STOMP에서 이미 subscribe가 성공했을 때, SessionSubscribeEvent가 발생하도록 만들어뒀기 때문이다.
그렇다면 실패했을 때와 성공했을 때, 각각의 EventHandler를 구현한다면 아주 손쉽게 만들 수 있지 않을까 싶었다.
실패한 케이스까지 쓰기엔 지금도 포스트가 너무 길어서 작성하는 와중에 계속 렉이 걸린다. ㅋㅋ
그래서 결론부터 이야기하자면, 구독에 실패했을 때 EventHandler가 정상적으로 동작하지 않는 이슈가 있었다.
그저 짐작일 뿐이지만, Subscribe 요청이 들어왔는데 Interceptor에서 예외를 던져버리는 바람에 ExecutorSubscribableChannel이 message 전송에 실패하는 로그가 발생한다.
문제는 eventHandler에서는 clientOutputChannel을 통해 Receipt 프레임을 돌려주려 했는데, 도중에 실패해서인지 메시지가 보내지지 않는 문제가 있었다.
그래서 그냥 "예외 핸들러에서 Receipt 프레임 만들어서 보내면 되는 거 아님?" 이러고 보내봤는데 잘만 된다.
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatExchangeAuthorizeHandler implements SubscribeCommandHandler {
private static final String REQUEST_EXCHANGE_PREFIX = "/sub/";
private static final String CONVERTED_EXCHANGE_PREFIX = "/exchange/chat.exchange/";
private static final String PRIVATE_EXCHANGE_PREFIX = "/user/";
private final ResourceAccessRegistry resourceAccessRegistry;
@Override
public boolean isSupport(StompCommand command) {
return StompCommand.SUBSCRIBE.equals(command);
}
@Override
public void handle(Message<?> message, StompHeaderAccessor accessor) {
String destination = accessor.getDestination();
if (destination != null && destination.startsWith(PRIVATE_EXCHANGE_PREFIX)) { // "/user/"로 시작하는 경우는 bypass
log.info("[Exchange 권한 검사] User {}에 대한 {} 권한 검사 통과", accessor.getUser().getName(), destination);
return;
}
// 자원 검사 (구독할 수 없는 데이터라면? connection을 해제할 것인지? 아니면, 구독만 안 되게 할 것인지?)
// 자원 검사를 위한 path에서 필요한 정보는 어떻게 추출할 것인지?
// 예를 들어, /sub/chat.exchange/chat.room.1 이라면, client가 1번 채팅방 접근 가능 여부를 판단
// 잘못 설계하면 추후 다른 path가 추가될 때마다 interceptor를 추가해야 하는데, 범용적으로 해결할 방법이 없을까???
if (resourceAccessRegistry.getChecker(destination).hasPermission(destination, accessor.getUser())) {
log.info("[Exchange 권한 검사] User {}에 대한 {} 권한 검사 통과", accessor.getUser().getName(), destination);
String convertedDestination = convertDestination(destination);
accessor.setDestination(convertedDestination);
} else { // 권한이 없으면 connection은 유지하고, client에게 에러 메시지를 전달
log.info("[Exchange 권한 검사] User {}에 대한 {} 권한 검사 실패", accessor.getUser().getName(), destination);
throw new InterceptorErrorException(InterceptorErrorCode.UNAUTHORIZED_TO_SUBSCRIBE);
}
}
private String convertDestination(String destination) {
if (destination == null || !destination.startsWith(REQUEST_EXCHANGE_PREFIX)) {
throw new InterceptorErrorException(InterceptorErrorCode.INVALID_DESTINATION); // 이것도 연결 끊지 말고, 구독 실패 에러 전달만
}
String convertedDestination = destination.replace(REQUEST_EXCHANGE_PREFIX, CONVERTED_EXCHANGE_PREFIX);
log.info("[Exchange 변환 핸들러] destination={}, convertedDestination={}", destination, convertedDestination);
return convertedDestination;
}
}
기존의 destination 접두사만 수정해주던 handler가 폭주해버렸다. 껄껄
여기서 개인적인 설계가 반영된 부분은 다음과 같다.
- SUBSCRIBE에 실패했을 때, WebSocket을 끊어야 하나 고민했지만 critical한 이슈가 아닌 단순 인가 이슈이므로 끊는 게 이상하다고 판단했다.
- "/user/"로 시작하는 구독 요청은 어차피 사용자 세션, 즉 client 고유의 exchange에 대한 구독이다. 따라서, 이걸 막을 이유는 없다고 판단했다. (그래서 이 경우는 무조건 성공한다)
- 접두사를 변환하고 인가 검사를 수행하려다, 그러면 결국 checker가 rabbitMQ의 프로토콜에 종속되는 것이 아닌가 싶어서 "/sub/" 접두사인 채로 검사를 수행하도록 수정했다.
참고로 여기서 InterceptorErrorCode는 기존의 다른 ErrorCode와는 달리 특별한 필드를 추가했다.
추후 예외 핸들러에서 다시 살펴보도록 하고, 일단은 ResourceAccessRegistry를 살펴보자.
/**
* 리소스 접근 권한 체커를 관리하는 레지스트리
* path에 대한 checker를 내부적으로 관리한다.
*/
public final class ResourceAccessRegistry {
private final Map<Pattern, ResourceAccessChecker> checkers = new HashMap<>();
public ResourceAccessRegistry() {
}
public void registerChecker(final String pathPattern, final ResourceAccessChecker checker) {
checkers.put(Pattern.compile(pathPattern), checker);
}
/**
* path에 대한 체커를 반환한다.
*
* @param path : 요청 경로
* @return ResourceAccessChecker : path에 대한 체커
* @throws IllegalArgumentException : 해당 경로에 대한 체커가 없는 경우
*/
public ResourceAccessChecker getChecker(final String path) {
return checkers.entrySet().stream()
.filter(entry -> entry.getKey().matcher(path).matches())
.map(Map.Entry::getValue)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("해당 경로에 대한 체커가 없습니다. path = " + path));
}
}
ResourceAccessRegistry는 단순히 subscribe path 패턴에 대해, 적절한 AccessCheckr를 매핑하는 상태만을 관리한다.
checkers Map의 key로 path 문자열을 담으려 하지 않는다는 것에 주의하자.
Pattern 생성 비용은 비싸다.
그러니 처음 등록할 때, 아예 Pattern 객체를 캐싱해두는 게 비용적으로 이득이다.
@Configuration
@RequiredArgsConstructor
public class ResourceAccessRegistryConfig {
private final ChatRoomAccessChecker chatRoomChecker;
@Bean
public ResourceAccessRegistry configureResourceAccess() {
ResourceAccessRegistry registry = new ResourceAccessRegistry();
registry.registerChecker("^/sub/chat\\.room\\.\\d+$", chatRoomChecker);
return registry;
}
}
그리고 Config 파일에서 path와 checker를 등록해주면 된다.
엥, 그럼 ChatRoomChecker는 뭔데요?
/**
* 리소스 접근 권한을 확인하는 인터페이스
*/
public interface ResourceAccessChecker {
/**
* 리소스에 대한 접근 권한을 확인한다.
*
* @param path : 요청 경로
* @param principal : 요청자
* @return 접근 권한 여부
*/
boolean hasPermission(String path, Principal principal);
}
우선 모든 ResourceAccessChecker는 위 메서드를 가지고 있어야 한다.
그 이외에 확장성을 고려하거나, 본인의 서비스에 맞게 파라미터를 늘리거나, 메서드를 추가하면 된다.
난 딱히 떠오르는 게 없어서, 가장 단순한 형태로 만들었긴 한데...나중에 @PreAuthorize에서 써먹으려면 메서드가 더 필요할 수도 있겠다.
@Slf4j
@Component("chatRoomAccessChecker")
@RequiredArgsConstructor
public class ChatRoomAccessChecker implements ResourceAccessChecker {
private final ChatRoomService chatRoomService;
@Override
public boolean hasPermission(String path, Principal principal) {
return isChatRoomAccess(getChatRoomId(path), principal);
}
/**
* path에서 chatRoomId를 추출한다.
*
* @param path : {@code /sub/chat.room.{roomId} 포맷}
* @return chatRoomId
*/
private Long getChatRoomId(String path) {
String[] split = path.split("\\.");
return Long.parseLong(split[split.length - 1]);
}
private boolean isChatRoomAccess(Long chatRoomId, Principal principal) {
return chatRoomService.isExists(chatRoomId, 1L); // Long.parseLong(principal.getName())가 안 되므로, 일단 1L 고정
}
}
실제 접근이 가능한지는 Domain 모듈의 Service 로직을 가져와 판단해야 한다.
이 때, ChatRoomService 내부에서 캐싱을 하게 하던, 처음부터 Redis에서 값을 가져오게 하던 적절히 처리하면 된다.
(내부 구현 볼 게 없는 게, 난 테이블 만들기 귀찮아서 isExists() 반환값을 true로 고정시켜버렸다.)
잠시 숨을 돌리고, 우리가 마저 구현해야 할 코드가 무엇인지 확인해보자.
예외가 발생한 경우를 처리하는 건 너무 복잡해보이니까, 우선 성공하는 케이스를 먼저 테스트 해보기 위해 EventHandler를 먼저 구현했었다.
@Slf4j
@Component
@RequiredArgsConstructor
public class ReceiptEventHandler {
private final ObjectMapper objectMapper;
...
@Bean
@Async
@EventListener
public CompletableFuture<ApplicationListener<SessionSubscribeEvent>> sessionSubscribeEventListener(final AbstractSubscribableChannel clientOutboundChannel) {
return CompletableFuture.completedFuture(event -> {
log.info("sessionSubscribeEventListener: {}", event);
Message<byte[]> message = event.getMessage();
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
sendReceiptMessage(clientOutboundChannel, accessor, new byte[0]);
});
}
private byte[] convertPayloadToBytes(Message<ServerSideMessage> message) {
byte[] payload = new byte[0];
try {
log.info("message.getPayload(): {}", message.getPayload());
payload = objectMapper.writeValueAsBytes(message.getPayload());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return payload;
}
private void sendReceiptMessage(AbstractSubscribableChannel clientOutboundChannel, StompHeaderAccessor accessor, byte[] payload) {
if (accessor != null && accessor.getReceipt() != null) {
accessor.setHeader("stompCommand", StompCommand.RECEIPT);
accessor.setReceiptId(accessor.getReceipt());
Message<byte[]> receiptMessage = MessageBuilder.createMessage(payload, accessor.getMessageHeaders());
log.info("receiptMessage: {}", receiptMessage);
clientOutboundChannel.send(receiptMessage);
}
}
}
위에서 언급했던 것처럼, 구독에 성공하면 SessionSubscribeEvent가 발생하므로 이걸 Listen하는 bean을 등록해주기만 하면 끝난다!
참고로 이 과정에서 자꾸 경고가 나길래 봤더니 @Async 했으면, 반환값을 4가지(Future 종류 3개랑 void 1개) 중 하나로 정하라고 하던데, 그냥 수정 눌렀더니 저렇게 됐다.
Future는 비동기 관련 java util인 건 알고 있었는데, 저건 또 처음봐서 당황스러움.. 나중에 또 공부해서 알아봐야겠다.
이제 대망의 구독 예외 핸들러.
여기서 진짜 골머리를 앓았었는데, 우선 첫 번째로 SubscribeExceptionHandler의 canHandle() 메서드의 구현이었다.
만약, AuthenticateExceptionHandler처럼 InterceptorErrorException이면 모두 받아주기로 해버리면,
완전히 다른 Frame 요청에서 에러가 발생해도 어미의 마음으로 품어준 후, handle() 메서드를 한 번씩 돌려보고 가게 된다.
그렇다고 canHandle()에 추가 매개변수를 넣는 방식으로 해결하려 하면, 나중에 또 다른 문제가 발생하게 될 우려가 크다고 생각하여 최대한 지양하고 싶은 방법이었다.
한참을 고민하고 고민하다 떠오른 생각이 Exception이 추가 정보를 제공해주면 되지 않을까란 아이디어가 떠올랐다.
@Slf4j
@Component
@RequiredArgsConstructor
public class SubscribeExceptionHandler implements StompExceptionHandler {
private final ObjectMapper objectMapper;
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@Override
public boolean canHandle(Throwable cause) {
if (cause instanceof InterceptorErrorException ex) {
return ex.getErrorCode().isSupportCommand(StompCommand.SUBSCRIBE);
}
return false;
}
...
}
예를 들면 이런 식으로 처리하는 방법이었다.
canHandle()의 파라미터로 StompCommand를 전달하는 방법과 위 방법 중 뭐가 더 나을까 고민을 해봤지만, 둘 다 나쁘지 않은 거 같아서 고민이 많이 됐었다.
그러나 매개변수는 적으면 적을 수록 좋다고 생각해서, 기존 아이디어를 채택했다.
public enum InterceptorErrorCode implements BaseErrorCode {
// 400
INVALID_DESTINATION(StatusCode.BAD_REQUEST, ReasonCode.INVALID_REQUEST, "유효하지 않은 목적지입니다", StompCommand.SEND, StompCommand.SUBSCRIBE, StompCommand.UNSUBSCRIBE),
// 403
UNAUTHORIZED_TO_SUBSCRIBE(StatusCode.FORBIDDEN, ReasonCode.ACCESS_TO_THE_REQUESTED_RESOURCE_IS_FORBIDDEN, "해당 주제에 대한 구독 권한이 없습니다", StompCommand.SUBSCRIBE, StompCommand.UNSUBSCRIBE),
;
private final StatusCode statusCode;
private final ReasonCode reasonCode;
private final String message;
private final StompCommand[] commands;
...
/**
* StompCommand가 ErrorCode에서 지원하는 명령어인지 확인하는 편의용 메서드
*
* @param command {@link StompCommand}
* @return 해당 ErrorCode에서 지원하는 명령어라면 true, 아니라면 false
*/
public boolean isSupportCommand(StompCommand command) {
for (StompCommand c : commands) {
if (c.equals(command)) {
return true;
}
}
return false;
}
}
다만 이 방식의 단점은 상수가 엄청나게 길어진다는 것..
매개변수로 넘기는 게 나았으려나 여전히 고민 중에 있다.
이제 handle() 메서드를 구현할 일만 남았다.
@Slf4j
@Component
@RequiredArgsConstructor
public class SubscribeExceptionHandler implements StompExceptionHandler {
...
@Override
public Message<byte[]> handle(Message<byte[]> clientMessage, Throwable cause) {
// header에 receipt 가 존재하는 지 확인
if (clientMessage == null) {
log.warn("receipt header가 존재하지 않습니다. clientMessage={}", clientMessage);
return null;
}
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class);
if (accessor == null || accessor.getReceipt() == null) {
log.warn("receipt header가 존재하지 않습니다. accessor={}", accessor);
return null;
}
log.info("receipt header가 존재합니다. receipt={}", accessor.getReceipt());
InterceptorErrorException ex = (InterceptorErrorException) cause;
ServerSideMessage payload = ServerSideMessage.of(ex.causedBy().getCode(), ex.getErrorCode().getExplainError());
// Message<ServerSideMessage> message = MessageBuilder.createMessage(payload, accessor.getMessageHeaders());
// eventPublisher.publishEvent(SubscribeEvent.of(message)); // 이렇게 처리 안 됨.
StompHeaderAccessor errorHeaderAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT); // 직접 RECIPT 메시지를 생성해서 반환
errorHeaderAccessor.setReceiptId(accessor.getReceipt());
errorHeaderAccessor.setLeaveMutable(true);
extractClientHeaderAccessor(clientMessage, errorHeaderAccessor);
errorHeaderAccessor.setImmutable();
return createMessage(errorHeaderAccessor, payload); // client 연결 해지 안 함.
}
private Message<byte[]> createMessage(StompHeaderAccessor errorHeaderAccessor, ServerSideMessage errorPayload) {
if (errorPayload == null) {
return MessageBuilder.createMessage(EMPTY_PAYLOAD, errorHeaderAccessor.getMessageHeaders());
}
try {
byte[] payload = objectMapper.writeValueAsBytes(errorPayload);
return MessageBuilder.createMessage(payload, errorHeaderAccessor.getMessageHeaders());
} catch (Exception e) {
log.error("[인증 예외] 에러 메시지 생성 중 오류가 발생했습니다.", e);
return MessageBuilder.createMessage(EMPTY_PAYLOAD, errorHeaderAccessor.getMessageHeaders());
}
}
}
코드가 엄청 더러운데, 당연히 리팩토링 구상은 모두 끝냈다.
하지만 날 것 그 자체인 코드를 먼저 보이는 게, 디자인 패턴을 적용한 코드보다 알아보기엔 편할 것 같아서 수정 전 코드를 먼저 올렸다.
clientMessage는 null일 수 있으며, client가 subscribe 요청 시 receipt 헤더를 누락했을 수도 있다.
이런 경우엔 null을 return해서 연결이 끊기지 않도록 유지했다.
처음 언급한 것처럼, 원래는 eventHandler로 message를 전달하려 했으나 정상적으로 동작하지 않는다.
(문제는 정확한 이유도 파악을 못 했다. ㅠ)
하지만 ExceptionHandler를 처음 구상했을 때만 해도, 원래부터 여기서 Receipt 프레임을 전송하려 했었기 때문에 기존처럼 처리하니 정상적으로 전달이 되는 것을 확인할 수 있었다.
두 가지 subscribe에 대한 요청 모두 성공했을 때, client는 이에 대한 receipt frame을 수신함을 확인할 수 있다.
이번엔 채팅방 구독에 실패한 경우도 테스트 해보았는데, 마찬가지로 실패 body가 전송됨을 확인할 수 있었다.
드디어 Interceptor 구독 응답 로직도 모두 처리했다!
🤔 Message를 구독자들에게 전달하기 위해 또 검증을 할 필요가 있을까?
친구가 내 설계를 잘 이해하지 못 했는지, 처음에 했던 질문이 "message를 꺼내서 전달하기 전에 권한을 확인은 어떻게 하는 거냐?"고 물어봤다.
처음에 이 부분을 고려했었는데, 잊고 있다가 질문을 받은 후에 다시 떠올렸다.
결론적으로 필요 없는 절차다.
이미 구독 요청에 성공했다면, 구독자는 모두 message를 받을 권한이 검증된 셈이기 때문이다.
다만, 사용자의 상태(status)에 따라 어떻게 핸들링할 지, 사용자가 구독을 해제했을 때 어떻게 핸들링할 지가 보다 중요해진다.
💡 ExceptionHandler에 대한 Refactoring 수행 후 다음 단계로 진행합니다. 참고해주세요.
⚠️ 에러 로그(`24.10.10)
성공했을 때도 실패했을 때와 동일한 메시지를 body에 담으려 했으나, 지속적으로 실패함을 확인했습니다.
확인 결과 실패했을 때, 의도적으로 StompAccessHeader의 내용을 조작하려 했기 때문인데,
이미 이벤트가 발생한 시점에선 immutable 상태로 정해져 있기 때문에 에러가 발생합니다.
해결 방법은 찾는 중...
2️⃣ Publish 자원 접근 권한 검사 AOP
@MessageMapping("chat.message.{roomId}")
@PreAuthorize("#isAuthenticated(#principal) and @chatRoomAccessChecker.hasPermission(#roomId, 1L)") // forbidden처리 된 access token으로 접근하면?
public void sendMessage(@DestinationVariable String roomId, ChatMessage message, Principal principal) {
log.info("sendMessage {}: ", message);
chatMessageService.sendMessage(message);
}
우리가 원하는 건, 위와 같이 미리 등록된 Bean을 사용하는 방법이다.
이건 정말 쉽게 해결할 수 있다.
public final class PreAuthorizeSpELParser {
...
private static void populateContext(Method method, Object[] args, ApplicationContext applicationContext) {
context.setBeanResolver(new BeanFactoryResolver(applicationContext)); // applicationContext 등록
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
context.setVariable(parameters[i].getName(), args[i]);
}
}
...
}
EvaluationContext에 Bean을 등록하기만 하면 된다. ㅋㅋ
아주 손쉽게 자원 접근 권한 검사를 끝마쳤다. 👍
5. Local Chat ID Generator & Chat Store
📌 GUID generate strategy
위 포스팅에서 살펴봤듯, 여러 전역 고유 ID 생성 전략 중에 우리의 현재 여건과 가장 적합한 것은 TSID라고 판단했다.
- 64-bit밖에 안 되는 아담한 사이즈에 정수 표현 가능 (DB Join 성능 향상)
- ID에 비지니스적인 의미가 포함되어 있지 않음
- 시간 순 정렬 가능
- 노드가 여러 개 혹은, 단일 노드 멀티 스레드 환경에 대해 무작위 난수 bit에 값을 조절하여 별도의 중복 검사기 없이 안전성 확보 가능
- 노드가 많지 않다면, node bit를 줄여서 timestamp 당 10,000개 이상의 중복 없는 난수 생성 가능
- ID 생성 시간이 가장 빠르진 않지만, UUID와 비교했을 때 훨씬 빠름.
거의 뭐 TSID 찬양하는 광신도처럼 보이지만, 여튼 장점을 열거해봤을 때 우리 서비스를 위한 최적의 방법!
📌 Design
이제 처음 설계한 대로 구현을 하기 위해, 어떻게 설계해야 할 지 고민해보자.
사용자가 chat 메시지를 send하면, server가 인증/인가 검사를 완수한 이후, ID를 생성해야 한다.
1️⃣ 메시지 일관성 문제
public void sendMessage(ChatMessage message) {
// 아이디 생성
// 키-쌍 저장소에 저장
// 전송
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "chat.room." + message.roomId(), message);
}
처음에는 위의 플로우대로 수행하면 되지 않을까 싶었다.
하지만 키-쌍 저장소에 저장은 했는데, send에 실패한다면?
Client는 뜬금없이 나중에서야 받지도 못 했던 👻 메시지가 채팅방에 튀어나오는 걸 목격할 수도 있다.
2️⃣ 단일 책임 문제
그렇다면 아래 구조는 어떤가?
public void sendMessage(ChatMessage message) {
// 아이디 생성
// 전송
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "chat.room." + message.roomId(), message);
}
@RabbitListener(queues = "${rabbitmq.chat-queue.name}")
public void receiveMessage(ChatMessage message) {
// 키-쌍 저장소에 저장
}
지금까진 단순히 메시지 소비만을 하던 RabbitLitener를 사용해서, queue에 새로운 메시지가 발행되면 꺼내서 키-쌍 저장소에 저장하는 것이다.
하지만 이 방법에도 의구심이 들었던 부분이, 원래 Listener 부분은 메시지를 꺼내서 client의 상태에 따른 푸시 알림 전송을 계획하고 있었기 때문이다.
@RabbitListener(queues = "${rabbitmq.chat-queue.name}")
public void receiveMessage(ChatMessage message) {
// 키-쌍 저장소에 저장
// 메시지의 roomId에 속한 모든 사용자 정보 조회
// 각 사용자의 상태에 따라 메시지 전달
}
이걸 한 곳에서 수행하는 게 과연 옳을까?
하나의 Listener가 너무 많은 책임을 지고 있고, 이를 비동기로 처리한다고 해도 가입자 수가 많은 채팅방이 존재한다면 너무 많은 성능 저하가 발생할 수 있다.
🤔 대체 메시지 저장과 발행은 어떤 순서로 진행해야 하는 거야?
여기서 구조를 더 분할시키는 방법까지 구상하다가, 너무 쓸 데 없이 문제를 어렵게 만드는 거 같았다.
그래서 다시 첫 번째 문제로 돌아가봤는데, "메시지 저장은 성공하고, 메시지 발행에 실패할 수 있다"는 것이 리젝 사유였다.
그렇다면 순서를 뒤집었을 때는 언제나 유효한가? 이게 또 그렇게 쉽게 풀리질 않는다.
convertAndSend()는 AmqpException이라는 검사 예외를 던지기 때문에, try-catch를 이용해서 순서를 역전시킬 수는 있다.
그러나 "메시지 발행은 성공하고, 메시지 저장에는 실패할 수 있다"는 이전보다 더 심각한 문제다.
결국 "메시지 저장과 메시지 발행 중 하나라도 실패하면 rollback 해야 한다"는 원자성 문제(All or Not)로 넘어가게 된다.
3️⃣ Listener을 Tx로 선언하다면?
@Transactional
public void sendMessage(ChatMessage message) {
// 아이디 생성
// 키-쌍 저장소에 저장
// 전송
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "chat.room." + message.roomId(), message);
}
어찌보면 위 방법이 적절하다고 생각될 수도 있다.
key-value 저장을 Tx에 속하도록 정의하고, rabbitTemplate가 전송에 성공하면 결과적으로 commit이 되기 때문이다.
반대로 message 저장에 실패하면, commit 또한 실패할 것이므로 원자성이 지켜진다.
하지만 message 전송에 성공했는데, commit에 실패한다면 마찬가지로 일관성이 깨지게 된다.
📌 How to guarantee transaction?
결론은 좀 허무하게 끝나버렸다.
분산 트랜잭션, SAGA, Outbox 패턴에 이벤트 소싱, Dead Letter Queue까지 여러 방면으로 해결책을 구상해보려 했으나,
프리티어 EC2라는 한계로 인해 그냥 무식하게 진행하기로 했다..^^
@Slf4j
@Service
public class ChatMessageService {
private final RabbitTemplate rabbitTemplate;
private final IdGenerator<Long> idGenerator;
private final ChatCacheService chatCacheService;
private final String CHAT_EXCHANGE_NAME;
public ChatMessageService(
IdGenerator<Long> tsidGenerator,
RabbitTemplate rabbitTemplate,
ChatCacheService chatCacheService,
@Value("${rabbitmq.chat-exchange.name}") String CHAT_EXCHANGE_NAME
) {
this.idGenerator = tsidGenerator;
this.rabbitTemplate = rabbitTemplate;
this.chatCacheService = chatCacheService;
this.CHAT_EXCHANGE_NAME = CHAT_EXCHANGE_NAME;
}
public void sendMessage(ChatMessage message, UserPrincipal principal) {
Long messageId = idGenerator.execute(); // 메시지 아이디 생성
ChatCache chat = ChatCache.builder()
.id(ChatId.of(messageId, Long.parseLong(message.roomId())))
.messageFrom(principal.getUserId())
.content(message.content())
.createdAt(LocalDateTime.now())
.build();
try {
saveToRedis(chat); // 키-쌍 저장소에 저장
publishToMessageBroker(message); // 메시지 브로커로 전송
} catch (IllegalArgumentException | OptimisticLockException e) {
log.error("메시지 저장 실패: {}", e.getMessage());
throw new ChatErrorException(ChatErrorCode.CHAT_SAVE_ERROR); // 500
} catch (AmqpException e) {
log.error("메시지 전송 실패: {}", e.getMessage());
throw new ChatErrorException(ChatErrorCode.CHAT_SEND_ERROR); // 500
}
}
private void saveToRedis(ChatCache message) throws IllegalArgumentException, OptimisticLockException {
chatCacheService.save(message);
}
@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 1000))
private void publishToMessageBroker(ChatMessage message) {
rabbitTemplate.convertAndSend(CHAT_EXCHANGE_NAME, "chat.room." + message.roomId(), message);
}
}
- 키-값 저장소에 저장
- 메시지 브로커에 메시지 전달 (실패하면 retry 3회)
- (2)가 실패하면 키-값 저장소에서 데이터 제거
최악의 경우인 2, 3번이 연달아 실패했을 때, 사용자는 새로고침 후에 못 보던 채팅을 확인하게 될 우려가 있다.
그러나 채팅할 때는 봤었던 메시지를 추후에 확인 불가능 하다는 최악의 경우보다는 차악의 경우라 판단하여, 현재로써는 이대로 진행하게 되었다.
6. Refactoring
📌 Channel Interceptor
내가 아무리 개념 익히는 것 자체에 집중해서 코드를 작성하고 있다지만, 이런 쓰레기 코드를 봐주는 것도 한계가 있다.
디자인 패턴 적당히 넣으면 충분히 수정할 수 있을 거 같이 생겨먹었으므로, 고민을 좀 해봤다.
어떤 Handler가 실행이 될 지는 결국 StompCommand 종류에 따라 갈린다.
하나의 Command에 대해 실행되어야 할 Handler는 0개일 수도, 1개일 수도, N개일 수도 있다.
그렇다면 Map<StompCommand, List<StompHandler>> 타입의 자료 구조를 하나 만들고, StompCommand에 대해 매칭되는 Handler를 실행하면 된다.
가장 먼저 떠올랐던 것은 Observer 패턴이지만, 여기서 도입하기엔 한계가 있다.
어떤 핸들러는 요청에 따라 응답을 되돌려주어야 하는 경우도 있기 때문이다.
Factory 패턴을 사용하면 어떨까.
/**
* STOMP 명령어 핸들러 인터페이스
*/
public interface StompCommandHandler {
/**
* 해당 핸들러가 지원하는 명령어인지 확인한다.
* @param command {@link StompCommand} 명령어
*/
boolean isSupport(StompCommand command);
void handle(Message<?> message, StompHeaderAccessor accessor);
}
모든 핸들러는 위 인터페이스를 구현하도록 만든다고 치자.
추후 Factory에서 Command 별로 Handler를 매핑해주기 위한 도우미 메서드 isSupport()와 실제 로직을 처리할 handle() 메서드를 제공해야 한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class StompCommandHandlerFactory {
private final Map<StompCommand, List<StompCommandHandler>> handlers = new EnumMap<>(StompCommand.class);
@Autowired
public StompCommandHandlerFactory(List<StompCommandHandler> allHandlers) {
allHandlers.forEach(this::registerHandler);
log.info("StompCommandHandlerFactory: handlers={}", handlers);
}
private void registerHandler(StompCommandHandler handler) {
Arrays.stream(StompCommand.values())
.filter(handler::supports)
.forEach(command -> {
handlers.computeIfAbsent(command, k -> new ArrayList<>()).add(handler);
log.info("Registered handler {} for command {}", handler.getClass().getSimpleName(), command);
});
}
public List<StompCommandHandler> getHandlers(StompCommand command) {
return handlers.getOrDefault(command, List.of());
}
}
2중 반복인 게 조금 마음에 안 들지만, StompCommand는 개수가 고정이므로 상수 취급한다면
Handler가 많아 봐야 얼마나 많아지겠나...초기화할 때 말고는 호출도 안 되므로 딱히 성능에 지장을 주진 않는다.
StompCommand를 기준으로 StompCommandHandler를 구분해주는 상태를 Factory가 가지고 있다가,
Interceptor에서는 다음과 같이 호출하면 되지 않을까?
@Slf4j
@Component
@RequiredArgsConstructor
public class StompInboundInterceptor implements ChannelInterceptor {
private final StompCommandHandlerFactory stompCommandHandlerFactory;
@Override
public Message<?> preSend(@NonNull Message<?> message, @NonNull MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && accessor.getCommand() != null) {
log.info("[StompInboundInterceptor] command={}", accessor.getCommand());
for (StompCommandHandler handler: stompCommandHandlerFactory.getHandlers(accessor.getCommand())) {
handler.handle(message, accessor);
}
}
return message;
}
}
얼핏보기에 큰 문제가 없어보이므로, CONNECT와 SUBSCRIBE 핸들러 두 개를 구현해보자.
public interface ConnectCommandHandler extends StompCommandHandler {
}
@Slf4j
@Component
@RequiredArgsConstructor
public class AuthenticateHandler implements ConnectCommandHandler {
private final AccessTokenProvider chatAccessTokenProvider;
private final UserService userService;
@Override
public boolean isSupport(StompCommand command) {
return StompCommand.CONNECT.equals(command);
}
@Override
public void handle(Message<?> message, StompHeaderAccessor accessor) {
String accessToken = extractAccessToken(accessor);
JwtClaims claims = chatAccessTokenProvider.getJwtClaimsFromToken(accessToken);
Long userId = JwtClaimsParserUtil.getClaimsValue(claims, AccessTokenClaimKeys.USER_ID.getValue(), Long::parseLong);
authenticateUser(accessor, userId);
}
private String extractAccessToken(StompHeaderAccessor accessor) {
String authorization = accessor.getFirstNativeHeader("Authorization");
if ((authorization == null || !authorization.startsWith("Bearer "))) {
log.warn("[인증 핸들러] 헤더에 Authorization이 없거나 Bearer 토큰이 아닙니다.");
throw new JwtErrorException(JwtErrorCode.EMPTY_ACCESS_TOKEN);
}
return authorization.substring(7);
}
private void authenticateUser(StompHeaderAccessor accessor, Long userId) {
User user = userService.readById(userId);
Principal principal = UserPrincipal.from(user);
log.info("[인증 핸들러] 사용자 인증 완료: {}", principal);
accessor.setUser(principal);
}
}
public interface SubscribeCommandHandler extends StompCommandHandler {
}
@Slf4j
@Component
public class ExchangeConvertHandler implements SubscribeCommandHandler {
private static final String REQUEST_EXCHANGE_PREFIX = "/sub/";
private static final String CONVERTED_EXCHANGE_PREFIX = "/exchange/chat.exchange/";
@Override
public boolean isSupport(StompCommand command) {
return StompCommand.SUBSCRIBE.equals(command);
}
@Override
public void handle(Message<?> message, StompHeaderAccessor accessor) {
String destination = accessor.getDestination();
if (destination != null && destination.startsWith(REQUEST_EXCHANGE_PREFIX)) {
String convertedDestination = convertDestination(destination);
log.info("[Exchange 변환 핸들러] destination={}, convertedDestination={}", destination, convertedDestination);
accessor.setDestination(convertedDestination);
}
}
private String convertDestination(String destination) {
return destination.replace(REQUEST_EXCHANGE_PREFIX, CONVERTED_EXCHANGE_PREFIX);
}
}
원래는 마커 인터페이스로 사용하려고, StompCommandHandler를 확장한 추가 인터페이스를 만들었는데...
필요가 없다 ㅎ.
ez하게 리팩토링 클리어.
📌 ExceptionHandler
현재 문제는 두 가지가 존재한다.
- 구현체의 코드들이 너무 더럽다. interface를 구현하긴 하는데, 그 외에 아무런 제약도 없으니 아주 미쳐날뛰기 딱 좋다.
- 중복 코드가 너무 많다.
그래서 어찌할까 하다가, 첫 번째 문제는 템플릿 메서드 패턴을 적용하여 해결할 수 있을 것 같았다.
이유는 AuthenticateExceptionHandler와 SubscribeExceptionHandler의 로직 구조가 매우 유사함을 볼 수 있었기 때문
물론 완전히 다른 방식의 처리 로직을 갖는 Handler가 나타날 수도 있기 때문에, StompExceptionHandler 인터페이스는 그대로 유지할 필요가 있다.
대신, 해당 인터페이스를 구현한 추상 클래스를 하나 더 만들고, 여기서 템플릿 메서드 패턴을 적용해보면 될 것이라 판단했다.
두 번째 문제는 고민이 많긴 한데, Util로 분리하면 해결할 수는 있다.
하지만 Util로 분리하는 건 항상 신중해야 한다.
이게 정말 중복인지, 우발적 중복인지 지금으로썬 분간이 안 가기 때문..
따라서 Util로 분리하는 로직은 정말 최소한의 로직으로 가져가는 것으로 결정했다.
우선, 현재 구현된 두 ExceptionHandler의 특징을 분석해서, 보편적인 예외 핸들러의 정의를 세워봐야 한다.
AuthenticateExceptionHandler의 handle 로직은 총 4부분으로 나눌 수 있다.
- StompHeaderAccessor 생성
- ServerSideMessage 생성
- ClientHeaderAccessor 정보를 (1)에서 생성한 accessor로 옮김.
- 전송 message 생성
SubscribeExceptionHandler는 구현에 급급해서 코드가 너무 지저분하고, 순서도 뒤죽박죽이지만 잘 나눠보면 흐름이 동일하다.
- clientMessage 유효성 검사
- ServerSideMessage 생성
- StompHeaderAccessor 생성
- ClientHeaderAccessor 정보를 (3)에서 생성한 accessor로 옴김
- 전송 message 생성
(1)이 추가되긴 했지만, 그 외는 이전과 동일하게 순서를 맞춰도 문제없이 동작한다.
그렇다면 (1)의 진행은 선택값으로 남겨두고, 나머지를 분리하면 되지 않겠는가.
/**
* STOMP 예외 처리를 위한 추상 기본 클래스.
* 이 클래스는 공통적인 예외 처리 로직을 제공하며, 구체적인 예외 처리 동작은 하위 클래스에서 구현합니다.
*/
@Slf4j
public abstract class AbstractStompExceptionHandler implements StompExceptionHandler {
protected final ObjectMapper objectMapper;
public AbstractStompExceptionHandler(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public Message<byte[]> handle(Message<byte[]> clientMessage, Throwable cause) {
if (isNullReturnRequired(clientMessage)) {
return null;
}
StompHeaderAccessor accessor = StompHeaderAccessor.create(getStompCommand());
accessor.setLeaveMutable(true);
extractClientHeaderAccessor(clientMessage, accessor);
ServerSideMessage payload = getServerSideMessage(cause);
if (payload != null) {
accessor.setMessage(payload.code());
}
accessor.setImmutable();
return StompMessageUtil.createMessage(accessor, payload, objectMapper);
}
/**
* 클라이언트 메시지의 유효성을 검사합니다.
* 기본 구현은 항상 false를 반환합니다. 필요한 경우 하위 클래스에서 재정의할 수 있습니다.
*
* @param clientMessage 클라이언트로부터 받은 원본 메시지
* @return null을 반환해야 한다면 true, 그렇지 않다면 false
*/
protected boolean isNullReturnRequired(Message<byte[]> clientMessage) {
return false;
}
/**
* 이 핸들러가 사용할 STOMP 명령을 반환합니다.
*
* @return STOMP 명령
*/
protected abstract StompCommand getStompCommand();
/**
* 주어진 예외를 기반으로 {@link ServerSideMessage}를 생성합니다.
*
* @param cause 발생한 예외
* @return 생성된 ServerSideMessage
*/
protected abstract ServerSideMessage getServerSideMessage(Throwable cause);
}
위에서 정의한 대로 handle() 메서드를 템플릿 메서드 패턴으로 재정의 하면 이렇게 표현할 수 있다.
우선, 미결정적인 값들은 다음과 같았다.
- null이 반환되어야 하는 예외 처리 조건 검사
- 메시지의 StompCommand 타입
- 생성해야 하는 Message payload
그리고 확정적으로 정할 수 있는 부분은 createMessage 부분이었다. (이건 서버와 클라이언트 간의 약속이므로)
문제는 extractClientHeaderAccessor() 메서드가 관건이었다.
/**
* STOMP 인터셉터에서 발생한 예외를 처리하기 위한 인터페이스
*/
public interface StompExceptionHandler {
...
/**
* client로부터 받은 메시지의 HeaderAccessor에서 필요한 정보를 추출하는 편의용 메서드
* 기본으로는 receiptId만을 추출하도록 구현되어 있으며, 필요한 정보가 있다면 해당 메서드를 구현하여 사용한다.
*
* @param clientMessage {@link Message}: client로부터 받은 메시지
* @param errorHeaderAccessor {@link StompHeaderAccessor}: client에게 보낼 메시지를 생성하기 위한 errorHeaderAccessor
*/
default void extractClientHeaderAccessor(@NonNull Message<byte[]> clientMessage, @NonNull StompHeaderAccessor errorHeaderAccessor) {
StompHeaderAccessor clientHeaderAccessor = MessageHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class);
if (clientHeaderAccessor != null) {
String receiptId = clientHeaderAccessor.getReceipt();
if (receiptId != null) {
errorHeaderAccessor.setReceiptId(receiptId);
}
}
}
}
원래 유연하게 사용하라고 interface에서 default method로 작성해뒀는데, 매개변수를 NonNull로 잡는 바람에 템플릿 메서드 패턴을 적용하는데 상당히 방해가 된다.
그렇다고 Util로 제외하자니, 어떤 경우엔 ClientHeader에서 특수한 값들을 더 추출하는 경우가 있을 수도 있다고 생각했다.
/**
* 클라이언트 메시지에서 필요한 헤더 정보를 추출하여 새로운 StompHeaderAccessor에 설정합니다.
* 기본적으로 receiptId를 추출합니다. 하위 클래스에서 필요에 따라 재정의할 수 있습니다.
*
* @param clientMessage 클라이언트로부터 받은 원본 메시지 (null일 수 있음)
* @param accessor 새로 생성된 StompHeaderAccessor
*/
default void extractClientHeaderAccessor(Message<?> clientMessage, StompHeaderAccessor accessor) {
if (clientMessage != null) {
StompHeaderAccessor clientHeaderAccessor = MessageHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class);
if (clientHeaderAccessor != null) {
String receiptId = clientHeaderAccessor.getReceipt();
if (receiptId != null) {
accessor.setReceiptId(receiptId);
}
}
}
}
그래서 그냥 위와 같이 만들어버렸다. (3중 중첩 역겹긴 한데...단순하니까 뭐...)
메서드를 abstract로 옮기는 것도 고민했었지만, interface를 구현한 Handler가 너무 불편해질 거 같아서 냅뒀다.
/**
* STOMP 메시지 처리를 위한 유틸리티 클래스.
* 이 클래스는 STOMP 헤더 액세서 생성 및 메시지 생성과 관련된 공통 기능을 제공합니다.
*/
@Slf4j
@UtilityClass
public class StompMessageUtil {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
/**
* StompHeaderAccessor와 페이로드를 사용하여 STOMP 메시지를 생성합니다.
*
* @param accessor {@link StompHeaderAccessor}
* @param payload 메시지 페이로드 (null일 수 있음)
* @param objectMapper Jackson ObjectMapper
* @return 생성된 STOMP 메시지
*/
public static Message<byte[]> createMessage(StompHeaderAccessor accessor, ServerSideMessage payload, ObjectMapper objectMapper) {
if (payload == null) {
return MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
}
try {
byte[] serializedPayload = objectMapper.writeValueAsBytes(payload);
return MessageBuilder.createMessage(serializedPayload, accessor.getMessageHeaders());
} catch (JsonProcessingException e) {
log.error("Error serializing payload", e);
return MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
}
}
}
Util은 Message를 생성하는 함수인 createMessage 메서드만을 갖도록 했다.
근데 이걸 Util로 분리하는 게 맞는지 판단이 잘 안 서네..
추상 클래스 메서드로 넣어도 무리가 없을 거 같긴한데, 워낙 자주 사용하는 패턴이라 생각해서 일단 분리해봤다. (일단 분리해본 시점에서 설계 원칙 어긴 건가 😇)
여기까지 하면, 기존 ExceptionHandler를 수정할 일만 남았는데 아주 단순해진다.
@Slf4j
@Component
public class AuthenticateExceptionHandler extends AbstractStompExceptionHandler {
public AuthenticateExceptionHandler(ObjectMapper objectMapper) {
super(objectMapper);
}
@Override
public boolean canHandle(Throwable cause) {
return cause instanceof JwtErrorException;
}
@Override
protected StompCommand getStompCommand() {
return StompCommand.ERROR;
}
@Override
protected ServerSideMessage getServerSideMessage(Throwable cause) {
JwtErrorException ex = (JwtErrorException) cause;
ex = JwtErrorCodeUtil.determineAuthErrorException(ex);
log.warn("[인증 예외] {}", ex.getErrorCode().getMessage());
return ServerSideMessage.of(ex.getErrorCode().getExplainError());
}
}
@Slf4j
@Component
public class SubscribeExceptionHandler extends AbstractStompExceptionHandler {
public SubscribeExceptionHandler(ObjectMapper objectMapper) {
super(objectMapper);
}
@Override
public boolean canHandle(Throwable cause) {
if (cause instanceof InterceptorErrorException ex) {
return ex.getErrorCode().isSupportCommand(StompCommand.SUBSCRIBE);
}
return false;
}
@Override
protected StompCommand getStompCommand() {
return StompCommand.RECEIPT;
}
@Override
protected ServerSideMessage getServerSideMessage(Throwable cause) {
InterceptorErrorException ex = (InterceptorErrorException) cause;
return ServerSideMessage.of(ex.causedBy().getCode(), ex.getErrorCode().getExplainError());
}
@Override
protected boolean isNullReturnRequired(Message<byte[]> clientMessage) {
if (clientMessage == null) {
log.warn("receipt header가 존재하지 않습니다. clientMessage={}", clientMessage);
return true;
}
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(clientMessage, StompHeaderAccessor.class);
if (accessor == null || accessor.getReceipt() == null) {
log.warn("receipt header가 존재하지 않습니다. accessor={}", accessor);
return true;
}
return false;
}
}
아, 코드 이쁘다 ㅎ.
이번 리팩토링은 제법 애먹어서 뿌듯하다.
여전히 잘 도착하는 것까지 확인할 수 있었다.