코딩하는 오징어

Anatomy Kafka Consumer#1 본문

Message Queue/Kafka

Anatomy Kafka Consumer#1

코딩하는 오징어 2021. 2. 23. 01:26
반응형

Kafka는 Message Queue 시스템으로 현재 수 많은 곳에서 데이터를 처리하기 위해 사용되고 있다. kafka는 간단하게 다음과 같은 Architecture로 구성되어있다.

 

kafka를 잘 활용하려면 topic의 구성 요소인 paritition들에 메시지가 어떻게 저장되는지, partition들의 replica가 broker들 사이에서 어떻게 관리되는지, 가용성과 메시지의 신뢰성있는 전달을 위해 producer application에서 다뤄야하는 option값들은 어떤 것들이 있는지 등 살펴보아야 할 것 들이 많다. 이러한 내용들은 차차 다뤄 보기로 하고, 이 글에서는 kafka client application중 하나인 consumer가 어떻게 동작하는지 자세히 살펴 볼 것이다. 이 글은 kafka에서 사용되는 기본적인 용어들을 알고 있다는 전제하에 작성하였다. 먼저, 일반적인 Message Queue(이하 MQ) 시스템에서 consumer는 producer가 발행한 메시지를 소비하는 application이다. kafka에서의 consumer도 같은 역할을 하지만, 타 MQ시스템과 달리 kafka의 consumer는 메시지를 push 방식이 아닌 polling 방식으로 소비한다. 이와 같은 메커니즘 덕분에 topic에 쌓이는 메시지가 필요한 곳, 어디든 consumer를 개발하여 타 시스템과 독립적으로 메시지를 소비하여 비즈니스 로직을 처리할 수 있다. 이렇게 동작 될 수 있도록 필요한 값이 바로 group.id이다. kafka는 consumer group단위로 partition의 메시지 위치인 offset을 관리한다. offset을 관리한다는 말은 consumer가 메시지를 어디까지 소비하였는지 관리한다는 뜻이다. consumer group에 join되는 consumer의 수에는 제한이 없지만 하나의 partition은 하나의 consumer만이 메시지를 소비할 수 있다. 즉, 두 개 이상의 consumer가 하나의 partition에 있는 메시지를 소비하지 못한다. (하나의 consumer가 하나의 partition만 할당 받는 것은 아니다. 하나의 consumer는 여러개의 partition을 할당 받아 메시지를 소비할 수 있다.) 그럼 다음 순서로 kafka의 consumer가 어떻게 동작하는지 살펴보자.

 

  • partition과 consumer
  • 메시지 순서
  • consumer 동작에 영향을 주는 parameter
  • polling loop

partition과 consumer

case 1
case 2
case 3

위의 그림을 살펴보자. case 1은 하나의 topic에 4개의 partition이 존재한다. 그리고 두 개의 consumer가 join된 그룹에서 해당 topic을 구독하고 있다. partition들을 모두 소비해야하므로 하나의 consumer가 partition을 두개 씩 할당 받아 메시지를 소비하게된다. 이 때 특정 consumer가 특정 partition만을 소비할 수 있도록 할 수 있지만 일반적인 rebalancing 알고리즘은 round robin으로 균형을 맞춘다. (rebalancing이 발생할 경우 모든 consumer가 메시지 소비를 중단하므로 rebalancing이 자주 일어나게 해서는 안된다.)case 1의 상황에서 메시지 소비 속도가 메시지 발행 속도보다 뒤쳐진다면 case 2처럼 consumer를 scale out 하여 처리량을 높일 수 있다. consumer를 무한정 scale out한다고해서 처리량이 높아지는 것은 아니다. case 3 처럼 consumer를 늘려도 partition은 하나의 consumer에서만 소비되므로 4개의 consumer는 대기 상태로 있게된다. 4개의 consumer도 활성화 시키려면 partition수를 8개로 늘려주면 된다. 하지만 partition수는 늘리기만 가능하고, 줄일 수 없기 때문에 partition 수는 신중하게 조절해야한다. 

메시지 순서

partition

하나의 topic에 세 개의 partition이 존재한다고 가정해보자. 메시지를 발행할 때 key값을 별도로 주지 않는다면 메시지는 round robin 알고리즘으로 partition에 분배된다. consumer는 partition에서 메시지를 읽을 때 commit되지 않은 offset부터 순서대로 읽는다. 즉 메시지가 처리될 때 각 partition에 존재하는 메시지들에 대한 읽는 순서가 보장된다. 예를 들어 메시지 0과 1은 읽는 순서가 보장되지 않지만(partition이 다르므로) 메시지 0과 3은 메시지 읽는 순서가 보장되어 처리된다. 여기서 읽는 순서라고 강조한 것은 처리 순서를 보장하지는 않기 때문이다. 처리 순서는 전적으로 consumer application에 달려있다. partition의 메시지들을 읽어 병렬로 처리하게 된다면 읽는 순서는 보장될지라도 처리 순서는 바뀔 수 있기 때문이다.

consumer 동작에 영향을 주는 parameter

  • auto.offset.reset: commit된 offset이 없는 partition을 consumer가 읽기 시작 할 때, 또는 commit된 offset이 있지만 유효하지 않을 때, consumer가 어떤 record(메시지)를 읽게 할 것인지 제어하는 paramter이다. 필자는 메시지 누락(유실)을 처리하는 것 보다 메시지 중복을 처리하는 게 더 수월하다고 판단하여 earliest 옵션을 주어 개발하였다. 메시지 중복처리는 모든 메시지에 대해 idempotent하게 처리하거나 처리된 메시지는 다시 processing되지 않게 방어 로직을 추가하는 방법등이 있다.
    • latest: default 값으로 가장 최근의 record들을 읽기 시작한다.
    • earliest: 해당 partition의 맨 앞 부터 모든 record를 읽기 시작한다.
  • enable.auto.commit: default값은 true이며, consumer의 offset commit을 자동으로 할 것 인지 결정한다. 이 값이 true일 때는 auto.commit.interval.ms를 설정하여 자동으로 offset을 commit하는 시간 간격을 제어 할 수 있다. 해당 동작은 consumer에서 poll()을 호출했을 때 이전 commit을 한 후 auto.commit.interval.ms만큼 시간이 흘렀는지 체크한 후, 항상 이전 호출에서 반환된 마지막 offset을 commit한다. 단, poll()이 호출 되지 않는다면 auto.commit.interval.ms가 지나도 commit되지 않는다.
  • session.timeout.ms: default 값은 10초이며 consumer가 heartbeat를 전송하지 않고 살아있을 수 있는 시간이다. 만일, consumer가 GroupCoordinator에게 heartbeat를 전송하지 않으면서 session.timeout.ms로 설정된 시간이 경과되면, 해당 consumer는 종료된 것으로 간주되고 GroupCoordinator는 해당 consumer가 담당하고 있는 partition들을 재분배 하기위해 rebalancing을 일으킨다. consumer의 poll() 메서드에서 GroupCoordinator에게 heartbeat를 전송하는 시간 간격을 제어하는 것이 heartbeat.interval.ms이다. 그러므로 두 paramter를 같이 고려하여 설정해야한다. 즉, heartbeat.interval.mssession.timeout.ms의 값보다 작아야한다. 대개 heartbeat.interval.mssession.timeout.ms값의 1/3로 설정한다.
  • max.poll.interval.ms: consumer가 heartbeat를 정상적으로 보내 broker에게 health check를 하더라도 실제로 poll() 메서드를 호출하지 않는다면 해당 partition을 점유하면서 메시지는 처리하지 않는 상황이 발생한다. 이를 막기위해 max.poll.interval.ms를 설정한다. 해당 시간이 지나도록 poll()메서드를 호출하지 않는다면 consumer가 메시지를 처리할 수 없는 상황이라고 판단하여 consumer group에서 제외 후 rebalancing이 발생한다. 하나의 메시지가 처리되는 시간을 고려하여 해당 값을 설정하여야한다. 메시지 처리가 오래 걸리는 비즈니스 로직이라면 해당 값을 크게 잡아주어야 불필요한 rebalancing이 발생하지 않기 때문이다.
  • max.poll.records: consumer가 poll호출을 할 때마다 가져오는 record의 최대 갯수이다. partition에 메시지가 충분하다면 해당 parameter 설정 값 만큼 메시지를 읽어오지만 메시지 발행 속도가 느려 메시지가 부족하다면 해당 값 보다 적을 수 있다.
  • fetch.min.bytes: record들을 가져올 때 broker로부터 받기 원하는 데이터의 최소량(byte)을 설정한다. 만일 broker가 consumer로부터 record 요청을 받았지만 읽을 record들의 양이 fetch.min.bytes에 지정된 것보다 작다면, broker는 더 많은 메시지가 모일 때까지 기다렸다가 consumer에게 전송한다.
  • fetch.max.wait.ms: 기본적으로 kafka는 500ms동안 fetch.min.bytes 만큼 record들의 양이 찰때까지 기다린다. 500ms가 지났는데도 데이터가 모이지 않는다면 더 이상 기다리지 않고 consumer에게 메시지를 보낸다. fetch.min.bytes를 1MB, fetch.max.wait.ms를 1000ms로 설정하였다면 kafka는 consumer의 데이터 읽기 요청을 받은 후 데이터 양이 1MB가 되거나 1000ms가 지난 후에 데이터를 전송할 것이다. producer의 linger.ms paramter와 비슷하다.

이외에도 key.deserializer, value.deserializer, bootstrap.servers등의 기본 paramter들이 있지만 해당 paramter들은 어렵지 않으므로 kafka document를 확인해보자.

Polling Loop

polling loop는 consumer의 poll() 메서드를 호출하는 loop를 말한다. 해당 loop에서 poll() 메서드 호출을 통해 받아온 record들을 처리한다. 해당 부분은 예제 코드를 보는 것이 몇 번의 설명보다 낫다.

public static void main(String... args) throws JsonProcessingException, InterruptedException {
    ObjectMapper objectMapper = new JacksonConfig().objectMapper();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps());
    List<String> topics = new ArrayList<>();
    topics.add("play.kafka");
    
    consumer.subscribe(topics);

    // 해당 loop를 polling loop라고 한다. 이 loop는 메시지를 읽어서 로그를 찍은 후 commit하는 loop이다.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3)); //3초동안 읽을 데이터를 계속 fetch하다가 없으면 empty collection을 return

        for (ConsumerRecord<String, String> record : records) {
            Map<String, String> value = objectMapper.readValue(record.value(), new TypeReference<>() {});
            log.info("message: {}", value);
        }

        consumer.commitSync();
    }

    consumer.close();
}

private static Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "vanilla.consumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);

    return props;
}

위의 설명 이외에도 kafka의 consumer에 대해 알아두어야할 중요한 부분들이 많다. 글이 너무 길면 지루해질 수 있기 때문에 kafka consumer에대한 기본적인 내용은 여기서 마치겠다. consumer에서 offset을 manual하게 commit하는 방법과 commit전략, consumer에서 multi thread를 다루는 방법, 불필요한 rebalancing이 발생하지 않도록 하기위한 전략, consumer가 broker에 전달하는 읽기 요청, 실제 예제 코드 등 다양한 이야기는 2편에서 더 자세히 다루도록 하겠다.

2021/02/28 - [Message Queue/Kafka] - Anatomy Kafka Consumer#2

 

Anatomy Kafka Consumer#2

2021/02/23 - [Message Queue/Kafka] - Anatomy Kafka Consumer#1 지난 글에 이어 kafka의 consumer를 개발할 때 필요한 지식 및 전략들을 알아보자. 지난 글 1편을 읽어보고 이글을 읽는 것이 학습에 더 큰 도움..

effectivesquid.tistory.com

참고 서적:

  • Apache Kafka 핵심 가이드
  • 카프카, 데이터 플랫폼의 최강자
반응형

'Message Queue > Kafka' 카테고리의 다른 글

Anatomy Kafka Consumer#2  (0) 2021.02.28
Comments