Netty의 스레드 모델
Netty는 비동기 네트워크 프레임워크 입니다. 이번 글에서는 이 비동기 프레임워크가 어떻게 동작하는지 살펴 보겠습니다.
Netty는 Channel에서 발생하는 이벤트들을 EventLoop가 처리하는 구조입니다. 여기서 EventLoop에 대해서 한번 살펴보고 글을 이어가겠습니다.
이벤트 루프
이벤트 루프란 이벤트를 실행하기 위한 무한루프 스레드를 말합니다.
위의 그림과 같이 객체에서 발생한 이벤트는 이벤트 큐에 입력되고 이벤트 루프는 이벤트 큐에 입력된 이벤트가 있을 때 해당 이벤트를 꺼내서 이벤트를 실행합니다. 이것이 이벤트 루프의 기본 개념입니다. 이벤트 루프는 지원하는 스레드 종류에 따라서 단일 스레드 이벤트 루프와 다중 스레드 이벤트 루프로 나뉘고, 이벤트 루프가 처리한 이벤트의 결과를 돌려주는 방식에 따라서 콜백 패턴과 퓨처 패턴으로 나뉩니다. Netty는 이 두 가지 패턴을 모두 지원합니다.
단일 스레드 이벤트 루프
단일 스레드 이벤트 루프는 이벤트 를 처리하는 스레드가 하나인 상태를 말합니다. 이벤트 루프의 구현이 단순하고 예측 가능한 동작을 보장하게 되죠. 하나의 스레드가 이벤트 큐에 입력된 이벤트를 처리하므로 이벤트가 발생한 순서를 보장하게됩니다. 하지만 멀티 코어 CPU를 효율적으로 사용하지 못하며 특정 이벤트를 처리하는데 blocking이 된다면 다음 이벤트는 이전 이벤트가 처리되기까지 기다려야합니다.
다중 스레드 이벤트 루프
다중 스레드 이벤트 루프는 이벤트를 처리하는 스레드가 여러개인 모델입니다. 단일 스레드 이벤트 루프에 비해서 프리엠워크의 구현이 복잡하지만, 이벤트 루프 스레드들이 이벤트 메서드를 병렬로 수행하므로 멀티 코어 CPU를 효율적으로 사용합니다. 단점으로는 여러 이벤트 루프 스레드가 이벤트 큐 하나에 접근하므로 여러 스레드가 자원 하나를 공유할 때 발생하는 스레드 경합이 발생합니다. 또한, 이벤트들이 병렬로 처리되므로 이벤트의 발생 순서와 실행 순서가 일치하지 않게됩니다.
다중 스레드 아키텍쳐는 자원의 효율적 사용이라는 장점이 있지만 컨텍스트 스위칭 비용과 스레드 경합이라는 단점도 존재합니다. 다중 스레드 어플리케이션에서 스레드들이 하나의 자원을 공유할 때 각 스레드는 공유 자원의 단일 액세스 권한을 획득하려고 스레드 경합을 벌입니다. 스레드 경합은 CPU를 소비하며 스레드가 많아질수록 스레드 경합에 더 많은 CPU자원을 사용하게 됩니다. 비즈니스 로직에 사용되어야 할 CPU자원이 불필요하게 낭비되는 것이죠.
Netty의 이벤트 루프 모델
Netty는 다중 스레드 이벤트 루프의 단점인 이벤트 발생 순서와 실행 순서의 불일치 문제를 다음과 같이 해결합니다.
- Netty의 이벤트는 Channel에서 발생합니다.
- 각각의 이벤트 루프 객체는 개인의 이벤트 큐를 가지고 있습니다.
- Netty의 Channel은 하나의 이벤트 루프에 등록됩니다.
- 하나의 이벤트 루프 스레드에는 여러 채널이 등록될 수 있습니다.
Netty의 각 Channel은 위의 그림과 설명에서 말했듯이 개별 이벤트 루프 스레드에 등록됩니다. 그러므로 채널에서 발생한 이벤트는 항상 동일한 이벤트 루프 스레드에서 처리하여 이벤트 발생 순서와 처리 순서가 일치됩니다. 다중 스레드 이벤트 모델에서 이벤트의 실행 순서가 일치하지 않는 근본적인 이유는 이벤트 루프들이 이벤트 큐를 공유하기 때문에 발생하는데 Netty는 이벤트 큐를 이벤트 루프 스레드의 내부에 둠으로써 실행 순서 불일치의 원인을 제거한 것입니다.
주의할 점
위의 이벤트 루프 모델을 잘 살펴보면 Netty를 이용하여 개발 할 때 주의해야할 점이 한 가지 있습니다. 바로 이벤트 루프 스레드가 blocking되면 안되는 것인데요. 이벤트 루프 스레드가 blocking되어 버리면 해당 이벤트 루프에 등록된 Channel들에서 발생한 이벤트들이 제때 처리되지못하고 요청들이 밀려버리는 상황이 발생합니다. Netty는 이벤트들을 처리하기 위해 ChannelPipeline에 여러 ChannelHandler를 등록하고 이 ChannelHandler들을 chaining하여 이벤트들이 처리됩니다. 자세한 내용은 밑의 글을 읽어 보시기바랍니다.
2020/05/17 - [Framework/Netty] - Netty의 기본 Component 및 Architecture
여러 ChannelHandler중 특정 ChannelHandler는 이벤트를 처리할 때 외부 서비스들과 네트워크 통신이 일어날수도 있고 DB에 write하는 작업이 있을수도 있습니다. 비즈니스 로직을 구현하다 보면 흔하게 발생할 수 있는 상황인데 Netty에서는 이런 blocking작업을 어떻게 처리해야할까요? Netty는 이벤트 루프가 blocking되지 않게 blocking구간이 있는 ChannelHandler를 다음과 같이 별도의 EventExecutor에서 실행될 수 있도록 지원합니다.
/**
* Inserts a {@link ChannelHandler} after an existing handler of this
* pipeline.
*
* @param group the {@link EventExecutorGroup} which will be used to execute the {@link ChannelHandler}
* methods
* @param baseName the name of the existing handler
* @param name the name of the handler to insert after
* @param handler the handler to insert after
*
* @throws NoSuchElementException
* if there's no such entry with the specified {@code baseName}
* @throws IllegalArgumentException
* if there's an entry with the same name already in the pipeline
* @throws NullPointerException
* if the specified baseName or handler is {@code null}
*/
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
위의 메서드로 등록된 ChannelHandler는 이벤트 루프의 스레드가 아닌 EventExecutorGroup의 스레드에서 실행됩니다. 다음 코드를 통해 정말 그렇게 동작하는지 확인해 봅시다.
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private final int order;
public EchoServerHandler(int order) {
this.order = order;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
String toUpper = in.toString(CharsetUtil.UTF_8).toUpperCase();
System.out.println(">>> order: " + this.order + " thread name: " + Thread.currentThread().getName() + ", id: " + Thread.currentThread().getId());
if (this.order == 1) {
ctx.fireChannelRead(Unpooled.copiedBuffer(toUpper.getBytes()));
} else {
ctx.writeAndFlush(Unpooled.copiedBuffer(toUpper.getBytes()))
.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(">>> Echo Server Error!!!!");
cause.printStackTrace();
ctx.close();
}
}
public class EchoServer {
private final int port;
public EchoServer(ServerConfig config) {
this.port = config.getPort();
}
public void start() throws Exception {
EchoServerHandler first = new EchoServerHandler(1);
EchoServerHandler second = new EchoServerHandler(2);
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(this.port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addFirst("first", first);
channel.pipeline().addAfter("first", "second", second);
// channel.pipeline().addAfter(new DefaultEventExecutorGroup(1), "first", "second", second);
}
});
ChannelFuture future = bootstrap.bind().sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
위의 코드는 EchoServerHandler의 order가 1이면 다음 channelHandler로 이벤트를 넘기고 그렇지 않으면 메시지를 channel에 writeAndFlush하는 로직입니다. 먼저 일반적으로 등록하듯이 ChannelHandler를 ChannelPipeline에 등록하고 각 ChannelHandler가 실행되는 스레드의 정보를 출력해봅니다. 결과는 다음과 같습니다.
두 개의 ChannelHandler가 같은 스레드에서 실행된것을 확인할 수 있습니다. 그럼 다음으로는 위의 코드에서 주석된 부분을 해제하고 channel.pipeline().addAfter("first", "second", second); 부분을 주석처리하여 결과를 확인해 보겠습니다.
이벤트 루프 스레드가 아닌 별도로 지정한 EventExecutorGroup의 스레드로 ChannelHandler가 실행된 것을 확인할 수 있습니다.
개인적으로 "어떤 분야든 꾸준한 사람이 살아남는다"라는 말을 좋아하는데요. 이 글을 보시는 분들이 그러한 분들이라 생각됩니다. 소중한 시간을 자신의 분야에서 좀 더 능숙한 사람이 되기위해 학습에 사용하시는 분들이니까요. 이상 이번 글을 마치도록 하겠습니다.
참고 서적: Netty In Action, 자바 네트워크 소녀 Netty