Anatomy Kafka Consumer#2
2021/02/23 - [Message Queue/Kafka] - Anatomy Kafka Consumer#1
지난 글에 이어 kafka의 consumer를 개발할 때 필요한 지식 및 전략들을 알아보자. 지난 글 1편을 읽어보고 이글을 읽는 것이 학습에 더 큰 도움이 될 것이다. 이 글은 다음과 같은 순서로 구성되어 있다.
- kafka consumer가 broker에게 보내는 읽기 요청
- offset commit 전략
- rebalancing이 발생 했을 때의 handling
kafka consumer가 broker에게 보내는 읽기 요청
kafka broker는 client(producer, consumer)와 partition replica 및 controller로부터 partition leader에게 전송되는 요청을 처리하는 일을 한다. kafka는 TCP로 전송되는 이진 프로토콜(binary protocol)을 갖고 있다. 이 프로토콜은 요청의 형식을 규정하고 있으며, 또한 요청이 성공적으로 처리되거나 처리 중 에러가 발생했을 때 broker가 응답하나는 방법도 나타낸다. producer나 consumer application을 개발할 때는 kafka가 규정하고 있는 binary protocol을 직접 작성하지 않는다. client library가 low level(protocol 변환등)을 처리해주기 때문이다. 우리는 client library를 이용하여 객체를 메시지화해서 보내고, 신뢰성있는 데이터 전달을 위한 알고리즘을 작성하면 된다. 하지만 kafka consumer가 broker에게 실제로 어떤 요청을 하고 어떤 데이터를 응답 받는지 알아두면 kafka consumer의 동작을 이해하는데 큰 도움이 된다.
kafka broker는 특정 client로부터 받은 요청을 항상 수신된 순서로 처리한다. kafka가 메시지 큐처럼 동작하게 되어 저장되는 메시지의 순서가 보장된다. 모든 요청은 다음 내용을 포함하는 표준 헤더를 갖는다.
- 요청 타입 ID: 어떤 요청인지를 나타내며 16비트 정수 형식의 고유 번호다. 예를 들어, 카프카에 메시지를 쓰는 요청은 Produce라고 하며 이것의 ID값은 0이고, 메시지를 읽는 요청은 Fetch라고 하며 이것의 ID값은 1이다.
- 요청 버전: 이 요청의 protocol API 버전을 나타내는 16비트 정숫값이다. 따라서 서로 다른 protocol 버전을 사용하는 kafka client를 broker가 처리하고 그에 맞춰 응답할 수 있다.
- cID(coreelation ID): 사용자가 지정한 32비트 정숫값이며, 이것을 각 요청의 고유 식별 번호로 사용하면 문제를 해결하는 데 도움이 될 수 있다. 이 값은 응답과 에러 로그에도 표시된다.
- client ID: 사용자가 지정한 문자열 형식의 값이며 null이 될 수 있다. 이것은 요청을 전송한 client application을 식별하는 데 사용될 수 있다.
표준 헤더의 요청 타입에 따라 서로 다른 구조의 payload를 전송한다. 예를들어, kafka에 메시지를 쓰는 Produce요청의 경우 topic 이름과 partition ID 및 데이터등이 포함된다. 자세한 내용은 kafka.apache.org/protocol.html를 참고하자.
kafka consumer client는 읽기를 원하는 메시지의 topic과 partition 및 offset을 읽기 요청을 통해 broker에게 전달한다. 이에 더해 client는 각 partition마다 broker가 반환할 수 있는 데이터 크기를 제한할 수 있다. client는 broker가 전송한 응답을 저장하는 메모리를 할당해야 하므로 데이터 크기 제한이 중요하다. 크기를 제한하지 않으면 client의 메모리 부족을 초래할 만큼 큰 응답을 broker가 전송할 수 있다.
kafka의 읽기 및 쓰기 요청은 모두 partition의 leader가 처리하므로 partition leader에게 요청들이 전송되어야한다. 따라서 읽기 요청이 올바르게 전달되도록 client는 필수적인 meta data를 요청한다. (partition leader가 down되어 leader를 선출하는 중이라면 kafka leader not available이라는 응답을 받을 수 있다. 이때는 재시도를 하면 대부분 문제가 해결된다.) 주의 할점은 kafka consumer가 partition leader에 존재하는 모든 메시지를 읽을 수 있는 것은 아니다라는 점이다. 대부분의 client는 모든 동기화 replica(ISR: in-sync replica)에 쓴 메시지들만 읽을 수 있다.(단, follower replica들 역시 consumer지만 예외다. 그렇지 않으면 복제를 할 수 없기 때문이다.) partition leader는 어떤 메시지들이 어느 replica에 복제되었는지 안다. 그리고 모든 동기화 replica들이 메시지를 쓸 때까지는 consumer에게 전송하지 않는다. kafka는 이러한 메커니즘을 이용하여 신뢰성있는 메시지 전달을 제공한다.
Offset Commit 전략
kafka consumer를 개발할 때 offset commit을 위하여 다양한 전략을 취할 수 있다. 처리한 메시지의 offset에대한 commit을 제대로 수행하지 않는다면 이후 consumer rebalancing이 발생했을 때 메시지를 중복 처리하거나 메시지를 누락 시킬 가능성이 있기 때문이다. offset은 kafka 0.9 이전 버전 까지는 zookeeper에 저장되었다가 0.9버전 부터는 kafka에 저장되는 형태로 바뀌었다. 하지만 kafka broker의 버전이 0.9보다 높다고 하더라도 kafka client API의 버전에 따라 offset이 zookeeper에 저장될 수도 있다. old 버전의 kafka client도 new kafka broker와 통신이 가능하고 new 버전의 통신 프로토콜을 구현하지 않았다면 zookeeper에 저장될 것이다. commit 전략은 다음과 같이 구성할 수 있다.
- Auto Commit
- 동기적인 Manual Commit
- 비동기적인 Manual Commit
Auto Commit
kafka consumer에서 offset을 commit하는 데 가장 쉬운 방법은 enable.auto.commit을 활성화하여 Auto Commit을 이용하는 것이다. Auto Commit의 간격은 auto.commit.interval.ms를 통해 설정이 가능하며 default 값은 5초이다. Auto Commit을 사용할 때는 몇 가지 주의 할 점이 있다. 첫 번째로 consumer에서 메시지를 commit을 하기 위해서는 poll() 메서드를 호출해야한다. 즉, Auto Commit은 consumer에서 poll() 메서드를 호출 했을 때 auto.commit.interval.ms에 설정된 값을 기준으로 현재 commit을 할 시간이 되었는지 확인하여 commit할 시간이 되었다면 offset을 commit한다. 어떠한 이유로 poll() 메서드를 호출 한 이후 polling loop에서 코드가 block되어 다음 poll()메서드를 호출하지 못한다면 auto.commit.interval.ms에 설정된 시간이 지나도 offset이 commit되지 않는다. 두 번째로 어떠한 이유로 consumer가 rebalancing되었을 때 auto.commit.interval.ms에 설정된 값에 따라 메시지를 중복 처리하게 될 수 있다. 예를 들어, Auto Commit이 5초마다 수행되도록 설정하고, 가장 최근의 마지막 commit을 한 후에 3초 동안 추가로 메시지들을 읽고 처리하다가 rebalancing이 시작되었다고 가정해보자. rebalancing이 끝난 후 모든 consumer들은 마지막으로 commit된 offset부터 메시지들을 읽기를 시작할 것이다. 이 경우 그 offset은 3초 이전의 것이므로, 3초 동안에 읽었던 모든 메시지는 중복으로 처리될 것이다. Auto Commit이 자주 실행되도록 auto.commit.interval.ms 값을 작게 조정할 수 있지만 완벽하게 해결할 수 있지는 않다.
위의 그림을 보면 0, 3, 6, 9 메시지들이 중복으로 처리되게 되는 것이다.
동기적인 Manual Commit
enable.auto.commit=false로 설정하면 application에서 요구할 때만 offset이 commit된다. 이때 가장 간단하면서도 신뢰도가 높은 것이 commitSync() 메서드를 호출하는 것이다. 이 메서드는 poll() 메서드에서 반환된 마지막 offset을 commit한다. 그리고 offset이 성공적으로 commit되면 실행이 끝지만, 어떠한 이유로 commit에 실패하면 예외를 발생시킨다. commitSync() 메서드는 poll()에서 반환된 가장 최근의 offset을 commit한다는 것에 유의해야한다. 따라서 poll()에서 반환된 모든 메시지들이 처리가 다 된 후에 commitSync() 메서드를 호출해야한다. 그렇지 않으면 메시지를 누락시킬 가능성이 생긴다. commitSync() 예제 코드는 다음과 같다.
while (flag.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3)); //3초동안 읽을 데이터를 계속 fetch하다가 없으면 empty collection을 return
for (ConsumerRecord<String, String> record : records) {
Map<String, String> value = objectMapper.readValue(record.value(), new TypeReference<>() {});
log.info("message: {}", value);
Optional.ofNullable(value.get("stop"))
.ifPresent(isStop -> {
if (Boolean.parseBoolean(isStop)) {
flag.set(false);
}
});
}
consumer.commitSync(); // poll() 메서드에서 반환된 메시지들 중 마지막 메시지의 offset을 commit한다.
}
비동기적인 Manual Commit
broker가 commit 요청에 응답할 때까지 application이 block된다는 것이 동기적인 Manual Commit의 단점이다. 이로 인해 consumer의 처리량이 감소하게 되고 producer에서 메시지 발행이 빨라 진다면 Lag가 발생하게 된다. 이를 해결하기 위해 kafka consumer client api는 commitAsync() 메서드를 통해 비동기적인 Manual Commit을 제공한다.
while (flag.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3)); //3초동안 읽을 데이터를 계속 fetch하다가 없으면 empty collection을 return
for (ConsumerRecord<String, String> record : records) {
Map<String, String> value = objectMapper.readValue(record.value(), new TypeReference<>() {});
log.info("message: {}", value);
Optional.ofNullable(value.get("stop"))
.ifPresent(isStop -> {
if (Boolean.parseBoolean(isStop)) {
flag.set(false);
}
});
}
consumer.commitAsync(); // 마지막 offset을 비동기적으로 commit하고 다음 코드를 진행시킨다.
}
commit이 성공하거나 재시도 불가능한 에러가 발생할 때까지 commitSync() 메서드는 commit을 재시도하지만, commitAsync() 메서드는 재시도하지 않는다는 것이 단점이다. commitAsync() 메서드에서 broker의 응답을 받는 사이에 이후의 다른 commit이 먼저 성공할 수 있기 때문이다. 예를 들어, offset 2000을 commit하는 요청을 전송했는데 일시적인 통신 문제가 생겨 broker가 그 요청을 받지 못해 응답 할 수 없다고 가정해보자. 그리고 그 동안에 또 다른 배치를 처리하고 offset 3000을 성공적으로 commit하였다. 이때 만일 commitAsync() 메서드에서 이전에 실패했던 commit을 재시도한다면 offset 2000의 commit이 성공할 수 있을 것이다. 그러나 이것은 offset 3000이 이미 처리되고 commit된 이후다. 따라서 rebalancing이 발생된다면 더 많은 메시지의 중복 처리가 생길 것이다. 따라서 offset commit의 순서를 지키는 것은 중요하다. offset commit의 순서가 바뀌는 경우는 다음과 같다.
- commitAsync() 메서드를 호출 할 때 callback 메서드를 정의하여 전달하며 callback 메서드에서 commit이 실패하였을 때 재시도를 수행할 경우
- 처리량을 높이기위해 multi-thread를 통해 메시지를 처리하고 offset을 commit 할 경우
multi-thread를 통해 메시지를 처리하는 전략에 대해서도 긴 글이 될 수 있으므로 이 부분은 다른 글을 통해서 소개해보도록 하고 이 글에서는 callback 메서드에서 재시도를 할 경우에 대해서만 다루어 보겠다.
while (flag.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3)); //3초동안 읽을 데이터를 계속 fetch하다가 없으면 empty collection을 return
for (ConsumerRecord<String, String> record : records) {
Map<String, String> value = objectMapper.readValue(record.value(), new TypeReference<>() {});
log.info("message: {}", value);
Optional.ofNullable(value.get("stop"))
.ifPresent(isStop -> {
if (Boolean.parseBoolean(isStop)) {
flag.set(false);
}
});
}
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("commit fail!! msg: {}", exception.getMessage(), exception);
// 여기서 재시도 로직을 넣게되면 문제가 발생할 수 있다.
}
});
}
위의 코드와 같이 commitAsync() 메서드에는 callback 메서드를 전달 할 수 있다. 이 callback 메서드에 commit실패시 재시도 로직을 넣는다면 위에서 말한 문제가 발생할 수 있다. (callback 메서드는 broker에서 응답을 받았을 때 실행된다.) callback 메서드에서 재시도 로직을 넣어야한다면 다음과 같은 방법으로 offset commit요청 순서를 지킬 수 있다.
- 순차적으로 증가하는 일련번호를 정의한다. (ex. AtomicInteger)
- commit할 때마다 일련번호를 증가시키고 그것을 callback 메서드에 전달한다.
- 재시도 commit을 전송하기 전 callback이 갖고 있던 일련번호와 외부의 일련번호를 비교한다.
- 두 일련번호가 같다면 더 새로운 commit이 없었으므로 재시도를 해도 안전하다. 그러나 외부의 일련번호가 더 크다면 재시도를 하지말아야한다.
위와 같이 CAS연산과 비슷한 방식을 이용하면 commitAsync() 메서드를 이용할 때도 재시도 전략을 가져갈 수 있다.
Rebalancing이 발생 했을 때의 handling
Rebalancing이란 지난 글에서 소개했듯이 consumer들의 topic의 partition 소유권을 재정비하는 메커니즘이다. rebalancing이 발생하는 이유는 지난 글에서 다루었으므로 이 글에서는 rebalancing이 발생했을 때 어떻게 handling해야할지와 그 방법을 소개해보려고한다.
consumer는 종료되기 전이나 partition rebalancing이 시작되기 전에 사용하던 자원들을 정리하고 마지막으로 처리한 메시지의 offset을 commit해야한다. kafka consumer API에는 consumer에 partition이 추가할당 되거나 제거될 때 코드가 실행 될 수 있도록 ConsumerRebalanceListener 인터페이스를 제공한다. subscribe() 메서드를 호출 할 때 ConsumerRebalanceListener를 구현한 객체를 인자로 전달하면 partition이 추가로 할당되거나 제거될 때 우리가 원하는 동작을 실행시킬 수 있다.
ConsumerRebalanceListener에는 다음과 같은 메서드들이 선언되어 있다.
- public void onPartitionRevoked(Collection<TopicPartition> partitions): rebalancing이 시작되기 전에, 그리고 consumer가 메시지 소비를 중단한 후 호출된다. 처리된 마지막 메시지의 offset을 commit하고 사용하던 자원을 정리해야 하는 곳이 바로 이 메서드이다.
- public void onPartitionAssigned(Collection<TopicPartition> partitions): partition이 broker에게 재할당된 후, 그리고 consumer가 partition을 새로 할당받아 메시지 소비를 시작하기 전에 호출된다.
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
log.info("onPartitionsRevoked: {}", partitions);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("onPartitionsAssigned: {}", partitions);
}
};
consumer.subscribe(topics, rebalanceListener);
여기 까지 내용을 살펴봤다면 kafka의 consumer를 개발할 때 고민해야할 부분들을 어느정도 알고 있다고 해도 무방할 것이다. 물론 topic의 partition들의 복제를 kafka broker에서 어떻게 다루는지 partition leader선출과 leader가 down되었을 때 어떤 메커니즘으로 복구되는지도 알아야한다. 또한 consumer의 처리량을 높이기 위해 메시지를 병렬로 처리할 때 주의 할 점에 대해서도 학습 할 필요가 있다. 해당 내용들은 다른 글을 통해 소개해보도록 하고 kafka의 consumer를 개발할 때 기본적으로 고려해야할 부분들은 여기서 마치도록 하겠다.
참고 자료: