토스 SLASH 23 중 ‘실시간 시세 데이터 안전하고 빠르게 처리하기’ 발표를 보고 정리한 내용입니다.
발표에서 사용되는 키워드를 중심으로 학습을 진행했습니다.
또한, 제 생각을 적어놓은 것들이 많아 틀린 부분에 대해서 말씀해 주시면 감사하겠습니다.
시세 플랫폼이 하는 일
시세 플랫폼은 거래소 데이터를 제일 먼저 수신하고 가공한 뒤, 내부 서비스들에게 제공한다. 거래소에 있는 데이터 전문을 디코딩하고 전달하며, 주식차트처럼 과거 데이터를 누적하거나 여러 정보를 합성하여 제공하기도 한다.
내부 서비스들은 이 가공된 데이터 중 각자 필요한 정보를 실시간 혹은 API를 통해 얻게 되고, 최종적으로 토스 앱에도 보이는 것이다.
시세 플랫폼의 최우선 목표는 낮은 지연시간과 빠른 장애복구이다.
시세 플랫폼 파트
시세 플랫폼은 총 3가지 파트가 존재하게 된다.
수신부
거래소가 제공하는 시세 데이터를 UDP 멀티캐스트 그룹에 접속해서 읽어오는 일을 한다.
처리부에게 데이터를 전송할 때 수신 시각을 헤더에 포함하여 처리부에서 총 처리 시간을 측정하는 데 사용한다.
UDP에서는 unicast, broadcast, multicast가 있다.
unicast는 단일 발신자, 수신자가 통신하는 형태
broadcast는 하나의 발신자가 가능한 모든 수신자와 통신
mutlicast는 하나의 발신자와 여러 특정 수신자와 통신, 즉, 해당 데이터를 받고 싶다고 등록한 수신자들에게만 데이터를 전송하는 통신 방법
근데 UDP를 사용하게 되면 무조건 데이터가 수신자에게 도달한다는 보장이 없다. KRX로부터 데이터를 받을 때, 다른 증권사는 데이터가 있지만 토스에서는 데이터를 받지 못할 경우, 이 또한 잘못된 정보지 않을까라는 생각이 든다. 그러면 이를 어떻게 해결할 수 있을까..?
처리부
비즈니스 로직이 모여있는 곳으로 처리 결과를 redis에 저장하거나 실시간 정보를 서비스들에게 바로 전달한다.
비즈니스 로직 중에는 blocking I/O가 있기 때문에 처리 시간에 가장 많은 영향을 주는 곳이다.
조회부
조회부는 서비스들에게 rest api를 제공한다.
장애 상황 발생 가정
만약 장 중에 장애가 일어날 경우를 가정해 본다.
처리부 장애
시세 데이터를 사용하는 모든 서비스에 영향을 주게 되어 결국 사용자에게까지 잘못된 정보를 제공하게 된다.
장애를 복구하더라도 api를 사용하는 서비스들은 여전히 장애를 겪게 된다. 왜냐하면 장애동안 데이터 유실 혹은 오염될 수 있기 때문이다.
이를 해결하기 위해 처리부와 redis를 두 개의 그룹으로 만든다.
SPOF였던 처리부를 하나 더 만듦으로써 해결하는 것 같다.
평상시에 A 처리부만 서비스에 할당하다가 장애 발생 시 B 처리부로 전환하는 것이다.
오염된 데이터를 장 중에 복구하는 게 아닌 문제가 없는 디비를 빨리 사용하는 것이 중요하고, 이로 인해서 배포에도 긍정적인 영향을 준다.
시세는 정말 실시간성과 데이터 정합성 두 요소 모두 챙겨야 하기 때문에 장 중에는 장애를 고치는 것보다 차라리 여러 개의 대안들을 가지고 있어서 바로바로 적용하는 게 나아보인다.
여기서 중복 데이터가 발생할 수 있기 때문에 ZooKeeper를 통해 리더를 선출하고, 각 그룹의 리더만 데이터를 처리하도록 했다.
하지만 처리부의 개수가 늘어날수록 수신부는 처리부의 개수만큼 반복해서 데이터를 보내야 하는데 이때 성능이 개수에 비례하면서 성능이 떨어지게 된다.
이 문제를 해결하기 위해 메시지 브로커를 사용하는 것이다.
메시지 브로커를 사용함으로써 수신부, 처리부를 디커플링 하고, 수신부는 데이터를 한 번만 전송해도 된다.
하지만 목표가 낮은 지연시간인데 메시지 브로커로 인해 지연시간이 늘어나게 된다.
바로 데이터를 보내는 게 아니라 메시지 브로커와 같은 중간 서버를 하나 더 사용하게 되면 통신이 늘어나기 때문에 기존보다 지연시간이 늘어나는 것 같다.
그래서 메시지 브로커 선택이 아주 중요하기 때문에 3가지 후보 중 하나를 골랐다.
메시지 브로커 선택
UDP 멀티캐스트
- 해당 설정을 위해 라우터 설정과 k8s 배포 설정이 필요해서 빠른 개발을 위해 선택하지 않음
카프카
- 높은 처리량과 뛰어난 안전성을 제공 및 사내에서 사용하기 때문에 초기에 사용
redis pub/sub
- 자체 테스트 결과 redis pub/sub이 카프카보다 지연시간이 낮아서 레디스를 선택함
결국 카프카는 15ms, 레디스는 3ms 지연이 발생하므로 레디스를 선택하게 됐다.
많은 메시지 브로커들은 메시지를 받으면 내부 큐에 저장하지만 레디스는 메시지를 받는 즉시 채널에 등록되어 있는 구독자들에게 보내버린다. 하지만 만약 구독자가 없다면 데이터를 버리게 된다.
레디스 pub/sub 안에는 pubsub_channel이라는 dictionary 타입의 변수가 있는데 이곳에 모든 채널과 구독자 정보를 보관한다.
구독자가 특정 채널을 구독하면, 그 채널에 해당하는 연결 리스트에 구독자 정보를 추가하고, 발행자가 메시지를 보내면, dictionary에서 채널을 찾고, 그 채널에 해당하는 연결리스트를 모두 순회하면서 차례대로 구독자들에게 메시지를 전송한다.
채널톡 기술블로그에서 pubsub_channel의 실제 타입을 Map<ChannelName, Set<Client>> 라 명시해 주었다.
즉, 특정 채널 이름을 기준으로 클라이언트들을 찾아서 Set을 순회하며 메시지를 보낸다고 정리할 수 있다.
다른 자료구조로는 pubsub_patterns라는 것이 존재한다. pubsub_patterns는 LinkedList<Pair<PatternStr, Client>>를 가진다고 한다.
정리하면 pubsub_channel은 고정된 채널 이름을 가진 채널들에게 메시지를 보내는 것이고, pubsub_patterns는 패턴과 일치하는 모든 채널들에게 메시지를 보내는 것이라 할 수 있다. ex) news.sports, news.weather가 있을 때 news.*를 구독하면 해당 표현식에 맞는 client 들에게 메시지를 보내게 된다.
이제 메시지를 전송할 때, 처리부는 Redis와 TCP 연결을 맺은 후, 소켓의 수신 버퍼로부터 데이터를 읽고 비즈니스를 처리한다.
여기서 가장 중요한 것은 아무리 레디스에서 처리부에게 데이터를 빠르게 송신한 다고 해도, 처리부의 읽기 속도가 느려지게 되면 TCP의 특징 중 하나인 흐름 제어로 인해 레이턴시가 생기게 된다.
TCP 흐름제어는 송신속도가 수신속도보다 빠를 경우 데이터 유실을 막기 위한 방법으로, 수신자는 소켓 수신 버퍼를 기준으로 한 번에 받을 수 있는 윈도우 사이즈를 송신자에게 전달해 주고, 송신자는 다음 차례에 윈도우 사이즈만큼 데이터를 보내 데이터 유실을 막는다.
이 방법을 생각해 냈다는 것에서 정말 감탄했다. 발표자 분께서는 지속적인 성능 테스트를 통해 레디스에서는 빠르게 메시지를 보내고 있는데도 불구하고, 성능이 나오지 않는 것을 겪으셨을 것 같다. 그 이유를 찾기 위해 TCP 흐름 제어까지 나오는 것을 보고, CS 공부가 정말 필요하구나라는 것을 느낄 수 있었다.
그러면 이를 어떻게 해결할 수 있을까?
처리부의 속도를 레디스에서 송신하는 속도에 가깝도록 빨라져야 한다.
현재 처리부에서 데이터 읽기와 비즈니스 처리를 하고 있는데 이를 다른 스레드로 분리한다.
즉, 데이터 읽기 스레드, blocking I/O를 수행하는 비즈니스 처리 스레드로 나눈다.
발표자 분은 Spring Data Redis가 제공하는 ReactiveRedisTemplate을 사용했고, Spring Data Redis는 Lettuce라는 Redis Client 라이브러리를 기본적으로 사용한다.
Lettuce는 네트워크 라이브러리인 Netty를 사용하고, Netty의 Channel은 Socket을 추상화한 레이어로서 커넥션이 맺어진 이후 EventLoop에 등록된다.
라인 기술블로그를 보면 Java NIO에서는 채널+버퍼, 셀렉터를 통해서 멀티플렉싱 기술을 선택해 사용한다고 한다. 발표에서도 설명하듯이 Channel은 Socket을 추상화한 것으로, 채널과 버퍼는 클라이언트 ↔ 서버에서 데이터를 주고받을 때 사용된다. 즉, TCP 소켓의 수신 버퍼로부터 채널을 통해 데이터를 읽어 들이게 된다. Netty는 Java NIO의 Selector와 Reactor 패턴을 기반으로 구현되어있다고 한다. Reactor는 무한 반복문을 실행해 이벤트가 발생할 때까지 대기하다가 이벤트가 발생하면 처리할 수 있는 핸들러에게 디스패치한다고 한다. 즉, Reactor 자체가 EventLoop이다.
여기서 EventLoop가 무한루프를 돌면서 수신버퍼의 데이터를 읽는 역할을 한다.
이벤트 루프는 동시성을 제공하기 위한 프로그래밍 모델이다. 특정 이벤트가 발생할 때까지 대기하다가 이벤트나 메시지가 발생하면 디스패치해 처리하는 방식으로 동작하게 된다.
라인 기술블로그에서는 Armeria로 설명해 주었지만, 작성자 분께서는 비동기 라이브러리나 프레임워크를 사용할 때도 포함된다고 작성해 주셨다. 로직 처리 중 blocking I/O를 하는 작업이 있다면 해당 작업 때문에 이후 API 요청들이 제대로 처리할 수 없게 되거나 응답 속도가 매우 느려지는 경우가 있다고 한다. 그래서 발표에서도 Blocking I/O를 하는 비즈니스 처리를 아예 다른 스레드로 분리하였다.
EventLoop은 OS에 따라 NIO, Epoll, KQueue 등 여러 방식을 지원 그중 NIO를 설명한다.
NioEventLoop은 실행되고, 버퍼에 데이터가 있는지 확인하고, 데이터를 읽고 변환하여 결과를 통보함. 그러므로 NioEventLoop가 비즈니스 로직을 처리하지 않는 것이 중요하다.
비즈니스 처리는 blocking I/O이기 때문에 성능을 높이기 위해서 멀티 스레딩을 사용해야 하지만, 이는 처리 순서가 역전이 될 수도 있다. 체결 전문이 짧은 시간 동안 여러 개 올경우, 처리 순서가 바뀌게 되면은 엉뚱한 가격이 보이게 된다.
예를 들어, xx가 1000, 1200, 1100인데 순서가 역전될 경우 1000, 1100, 1200이 되어서 잘못된 정보가 전달될 수 있다.
이를 방지하기 위해 멀티스레딩이 아닌 EventLoopGroup을 사용한다. EventLoopGroup은 큐를 사용하여 순서를 보장하고, 하나의 스레드만 사용하기 때문에 동기화가 필요 없다.
EventLoopGroup을 종목코드에 대한 hash function을 돌려서 구분한다. 종목코드를 알기 위해서는 JSON을 객체로 변환해야 한다.
이 변환 작업이 트래픽 양이 증가함에 따라 NioEventLoop의 CPU 자원을 많이 사용하여 지연의 원인이 된다는 것이다.
이렇게 성능이 매우 중요한 곳에서는 데이터 변환 작업 또한 지연의 원인이라고 할 만큼 많은 영향을 끼칠 수 있다는 것을 알 수 있었다.
근데 위에서 말했듯이 아마 객체 변환 자체의 로직이 아닌, TCP 소켓으로부터 읽어오는 시간 자체가 짧아야만 송신 버퍼로부터 읽어오는 값이 더더욱 많아지기 때문에 최대한 데이터 읽는 로직이 빨라야 한다. 객체 변환을 하게 되면 그만큼 시간이 더해지기 때문에 지연의 원인이 된다는 것 같다.
즉, ‘데이터를 읽기’하는 작업 이외에는 최대한 다른 작업을 하지 않도록 하는 것을 목표로 하는 것 같다. 꼭 필요한 작업만을 하는 것 같다.
그렇다면 이 문제를 어떻게 해결할까?
발표자는 Redis pub/sub에서 제공하는 channel을 사용했다.
수신부가 데이터를 보낼 때, 처리부의 EvnetLoop 개수만큼 채널을 나누어 보내고, 처리부는 이 채널명을 보고 해당하는 EventLoop을 찾는 것이다.
결국 채널의 번호와 EventLoop 번호를 일치시키는 것이다.
0번 채널이면 0번 EventLoop로, 3번 채널이면 3번 EventLoop로 말이다.
이를 통해 수신부에서 발송한 데이터의 순서를 처리부에서 그대로 유지할 수 있고, 무엇보다 NioEventLoop에서 더 이상 객체 변환을 하지 않아도 되기 때문에 성능도 이전보다 올라가게 된다.
EventLoop 구현 방법
- ThreadPoolTaskExecutor 사용하고 corePoolSize = maxPoolSize = 1로 설정함으로써 쉽게 만들 수 있음
- 큐가 꽉 찰 경우를 위한 정책은 DiscardOldestPolicy를 사용
- 가장 오래된 작업을 지운 후 새 작업을 넣는 circular q 와 유사
- 데이터를 유실할 수 있지만 실시간이 중요함
- queuecapacity는 정답이 없음
- 크기가 너무 작으면 피크시간에 데이터 유실하고, 너무 크면 오래된 데이터가 큐에 적재되어 실시간성을 떨어뜨림
EventLoopGroup
- List<ThreadPoolTaskExecutor> 사용
- 리스트 개수는 EventLoop 개수를 의미하고, 이 개수를 줄이려고 노력함
- 왜냐하면 EventLoop의 개수가 늘어날수록 context switching으로 인해 NioEventLoop의 성능에도 악영향을 주기 때문
- 하지만 무조건 개수를 줄이면, EventLoop에 backpressure가 발생해서 지연시간이 훨씬 늘어나게 됨
backpressure라는 것은 요청하는 속도가 응답하는 속도보다 매우 빠를 때, 응답 쪽에서 적당히 주라는 신호를 요청하는 곳에 보내는 것이다. 흐름제어 메커니즘이라고 생각하면 될 것 같다.
성능 테스트 중 목표로 하는 트래픽을 발생시키고, 각 EventLoop에 얼마나 많은 대기가 발생하는지 모니터링하면서 적정 개수를 찾아나갔다.
성능 개선을 할 때 어느 정도를 목표로 둘 것인지를 정하는 게 중요하다고 생각 들었다.
구체적인 목표를 바탕으로 시나리오를 작성하면 개선하는 과정이 시간도 줄일 수 있고, 지정한 목표가 있기 때문에 작업의 마무리도 구분할 수 있다.
EventLoop 개수 줄이기
Non-blocking I/O, local cache, 무거운 작업을 위한 별도 eventLoopGroup 사용을 통해 EventLoop의 개수를 줄이려고 노력했다.
레디스에 저장 시 non-blocking I/O 사용
비즈니스 처리를 하는 EventLoop은 한 번에 한 작업만 실행하는데, 작업의 마지막에는 redis에 데이터를 저장한다. 하지만 redis에 데이터가 저장되기까지 굳이 기다릴 필요가 없기 때문에 ReactiveRedisTemplate의 비동기 함수를 이용하여 EventLoop가 더 이상 Blocking 되지 않고 다음 작업을 처리할 수 있도록 했다.
데이더 조회 시 local cache 사용
비즈니스 로직 중 레디스에서 과거 데이터를 조회하는 경우가 많은데, 해당 데이터는 순서가 중요하기 때문에 비동기로 사용할 수 없다. 그래서 매번 레디스에서 데이터를 읽기보다는 로컬 캐시에서 데이터를 읽어 Blocking I/O를 많이 줄일 수 있었다.
이렇게 할 수 있었던 이유는 데이터의 성격 때문인 것 같다. 과거 데이터는 수정될 수 없기 때문에 로컬 캐시와 레디스 간 데이터 정합성을 맞춰줄 필요가 없기 때문에 바로 로컬 캐시에서 가져올 수 있는 것 같다.
무거운 작업을 위한 별도 EventLoopGroup 분리
배치 작업이나 MySQL 저장과 같은 상대적으로 무거운 작업은 기존의 EventLoop이 아닌 별도의 그룹에서 처리하도록 한다.
높은 트래픽 상황에서 처리량이 더 이상 올라가지 않는 상황
이때 netstat나 ss를 통해 Socket의 수신, 송신 버퍼를 눈으로 직접 확인한다.
Recv-Q와 Send-Q는 해당 Socket이 얼마나 처리 대기 상태로 남아있는지 의미하는데, 특히 Recv-Q가 0보다 지속적으로 크다면 다음과 같은 방법들을 시도해 볼 수 있다.
NioEventLoop 확인
- 데이터 변환, 객체 생성 혹은 로깅 등과 같은 사소한 처리도 높은 트래픽 상황에는 성능 저하의 원인이 된다. 앞에서 redis channel를 통해 데이터 디코딩을 분리한 것 같이 읽기 성능을 높이기 위해 작은 부분이라도 다른 thread에게 위임해야 한다.
- CPU Profiling을 통해 프로세스 내에 불필요한 thread를 찾아내어 제거하거나 개수를 줄이면 컨텍스트 스위칭이 줄어듦
- NioEventLoop 개수를 늘려 병렬적으로 처리함으로써 Socket 버퍼 읽기 속도를 올려줄 수 있음
REFERENCES
'컨퍼런스 감상문' 카테고리의 다른 글
[SLASH 23] '토스는 Gateway 이렇게 씁니다'를 보고 (2) | 2023.11.17 |
---|