OverView

이번시간에는 카프카 command line tools의 사용법에 대한 몇가지 정리와 카프카를 조금 더 쉽게 이해하기 위한 포스팅을 작성해봤다. 예제는 docker-compose를 사용해서 카프카 클러스터를 구성하고 consumer-group, partitions, replicas 에 대해서 간단하게 알아보도록 하겠다.

시작하기전에

준비물이 몇가지 필요하다.

  • docker
  • docker-compose
  • kafka command line tools

예제는 docker 컨테이너로 카프카3대와 주키퍼 서버1대를 올릴 것이다. docker-compose를 사용한다.

그리고 kafka에서 제공하는 command line tools를 사용해야한다. 파일들은 아래를 참고해서 다운 받을 수 있다

apache kafka와 confluent kafka가 있는데 confluent kafka가 조금더 많은 기능을 내장하고 있다. 자세한내용은 https://www.quora.com/What-is-the-difference-between-Apache-Kafka-and-Confluent-Kafka 에서 찾아 볼 수 있다.

간략하게 confleunt kafka와 apache kafka는 동일한 소프트웨어이고 confluent kafka는 오픈소스인 kafka community 버전 이외에 commercial버전도 제공하고 있다.

참고로 confluent 라는 회사는 linkedin에서 최초에 kafka를 만든 사람들이 따로 나와서 설립한 회사이다.

나는 이 예제에선 apache kafka를 사용한다.

docker-compose로 클러스터 구성하기

docker-compose를 직접 작성하지 않아도 누군가 만들어 놓은 compose 파일들이 있다. 나는 https://github.com/simplesteph/kafka-stack-docker-compose에 있는 zk-single-kafka-multiple.yml 을 사용한다.

카프카 서버 3대와 주키퍼 서버 1대의 구성으로 클러스터를 구성한다.

zk-single-kafka-multiple.yml

version: '2.1'

services:
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
        ZOO_MY_ID: 1
        ZOO_PORT: 2181
        ZOO_SERVERS: server.1=zoo1:2888:3888
    volumes:
      - ./zk-single-kafka-multiple/zoo1/data:/data
      - ./zk-single-kafka-multiple/zoo1/datalog:/datalog

  kafka1:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zoo1

  kafka2:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19093,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zoo1


  kafka3:
    image: confluentinc/cp-kafka:5.5.0
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka3:19094,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
    volumes:
      - ./zk-single-kafka-multiple/kafka3/data:/var/lib/kafka/data
    depends_on:
      - zoo1

아래 명령어로 컨테이너를 구성해보자

export DOCKER_HOST_IP=127.0.0.1
docker-compose -f zk-single-kafka-multiple.yml up

docker-compose로 컨테이너들이 잘 구성되었다면 docker ps -a로 컨테이너들이 잘 동작중인지 확인해보자.

kafka command line tools는 {kafkahome}/bin 디렉토리에 위치하고 있다.

컨테이너들을 확인하고 문제가 없다면 kafka home 디렉토리에서 다음 명령어를 입력해보자.

./bin/zookeeper-shell.sh localhost:2181 ls /brokers/ids

#결과
[1, 2, 3]

우리가 브로커 컨테이너를 생성할때 KAFKA_BROKER_ID를 각각 브로커 서버마다 1, 2, 3 으로 줬기 때문에 결과값도 [1, 2, 3] 으로 나온다.

현재까지의 구성을 그림으로 표현하자면 아래와 같다.

주석 2020-06-10 082816

카프카 토픽 사용하기

토픽 만들기

가장먼저 해볼일은 카프카에 토픽을 생성하는 것이다. 아래 명령어로 토픽을 생성해보자

./bin/kafka-topics.sh --create \
    --replication-factor 3 \
    --partitions 3 \
    --topic my-test-topic\
    --zookeeper  localhost:2181
  • --create:토픽을 생성하는 옵션이다.
  • --topic: 토픽의 이름을 지정하는 옵션이다.
  • --zookeeper: zookeeper server를 지정하는 옵션이다.
  • --replication-factor: replicas의 개수를 지정하는 옵션이다. 이 옵션은 클러스터 내부 broker의 개수보다 높으면 안된다. 우리는 초기에 3개의 브로커를 생성한 클러스터를 구성했으므로 3개가 맥시멈이다.
  • --partitions: partition의 개수를 지정하는 옵션이다.

토픽 확인하기

./bin/zookeeper-shell.sh localhost:2181 ls /brokers/topics

#결과
[__confluent.support.metrics, __consumer_offsets, my-test-topic]

__이 붙은 토픽들은 카프카가 사용하는 토픽이라고 생각하면 된다. 우리가 위에서 생성한 ‘my-test-topic’이 잘 생성된 것을 확인할 수 있다.

Replication-factor 이해하기

카프카는 메시지를 중계함과 동시에 서버가 고장 났을때 수신한 메시지를 잃지 않기 위해서 복제구조를 갖추고 있다. 파티션은 단일 또는 여러개의 레플리카로 구성되고 여러 레플리카 중 최소 한개는 Leader, 나머지는 Follower가 된다.

./bin/kafka-topics.sh --create \
    --replication-factor 3 \
    --partitions 3 \
    --topic my-test-topic\
    --zookeeper  localhost:2181

위에서 생성한 이 토픽은 아래의 그림처럼 생성이된다.

주석 2020-06-10 104014

각각 브로커별로 Leader들이 분류되어있는 모습을 확인할 수 있다.

명령어로도 직접 확인할 수 있다.

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-test-topic

주석 2020-06-10 104002

ISR (In-Sync Replica)

사진 맨 끝에 보면 ISR이 있는데 ISR은 Leader 레플리카의 복제 상태를 유지하고 있는 레플리카로 Leader를 갖고 있는 서버가 고장났을때 후보가 될 수 있는 브로커를 나타낸다.

만약 도커 컨테이너에서 브로커 서버 2, 3을 내린다면 아래와 같은 그림이 된다.

주석 2020-06-10 104103

명령어로도 직접 확인해 보자

./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-test-topic

20200610_112155

브로커서버 1에있던 Follower 들이 partition 1과 2의 ISR이였으므로 브로커서버 1에 있는 파티션이 전부 Leader가 되었다.

만약 다시 브로커 서버 2와 3을 올린다고 하더라도 Leader는 변경되지 않는다.

카프카의 kafka-console-consumer, kafka-console-producer 사용해보기

카프카는 간편하게 테스트해볼 수 있게 producer와 consumer를 console 형태로 제공한다. 쉘을 두개 이상 준비해서 진행해야한다.

kafka-console-consumer

먼저 consumer console을 실행시켜보자

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic my-test-topic

--bootstrap-server 에 카프카 서버들의 주소를 입력하고 –topic 옵션으로 우리가 방금 생성한 토픽의 이름을 넣어준다.

kafka-console-producer

아래의 명령어로 producer를 실행시켜 보자.

./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic my-test-topic

마찬가지로 --broker-list –topic 옵션을 알맞게 넣어준다.

producer가 켜진 쉘에서 텍스트를 입력하면 consumer쪽 쉘에 producer가 보낸 메시지들이 출력되는 것을 확인할 수 있다.

카프카의 consumer group과 partitions이해하기

카프카는 consumer group과 partitions라는 개념을 통해서 분산처리 모델을 구현했다. 간단하게 설명하면 같은 consumer group을 가진 컨슈머 그룹이 같은 토픽을 구독하고있다면 메시지를 하나의 컨슈머에게만 소비하게 한다. 먼저 컨슈머 그룹 없이 my-test-topic을 구독하고 있을때를 보자.

Consumer Group이 없을때

왼쪽은 producer이고 오른쪽의 쉘3개는 consumer들이다. 현재는 consumer-group이 지정되지 않았기때문에 producer쪽에서 메시지를 입력했을때 아래와 같이 구독하고 있는 모든 컨슈머에 메시지가 출력되는 것을 확인할 수 있다.

2020-06-10-10-53-50

Consumer Group이 있을때

컨슈머들을 모두 종료하고 아래 명령어로 컨슈머 그룹을 my-group으로 할당시켜보자.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic my-test-topic --consumer-property group.id=my-group

--consumer-property group.id=my-group 을 통해서 my-group이라는 컨슈머그룹을 할당했다. 이제 producer 쪽에서 메시지를 여러개 보내보면 메시지가 Consumer Group이 없을때와는 다르게 하나의 컨슈머에게만 메시지를 전달하는 것을 확인할 수 있다.

2020-06-10-10-55-56

이 partition와 consumer의 관계는 다음과같다.

  • partition은 무조건 한개의 consumer를 갖는다.
  • consumer는 0개 이상의 partition을 갖는다.

아래 그림을 보면 조금 더 이해가 쉬울 것이다.

그림1

kafka-consumer-groups.sh 사용하기

마지막으로 kafka-consumer-groups.sh 을 사용해서 현재 파티션에 대한 상세정보들을 확인할 수 있다. 아래명령어를 입력해보자.

./bin/kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092,localhost:9093,localhost:9094

현재는 모든 파티션에 컨슈머들이 연결되어있기때문에 아래와 같은 그림이 나온다.

20200610_113852

이전에 실행했던 consumer 쉘을 두개 종료해서 한개만 실행시킨다면 아래와 같은 그림이 된다.

20200610_113951

이전과 비교했을때 consumer-id가 하나로 통일된 것을 확인할 수 있다.

마지막으로 모든 consumer 쉘을 모두 종료시킨 뒤 producer에 어느정도의 메시지를 보내고 한번 더 확인하면 아래와 같이 LAG 목록에 축적된 메시지의 개수가 나온다.

20200610_114122

카프카는 메시지를 디스크에 영속시키는 특징이 있다. 따라서 현재 컨슈머가 없더라도 offset 이라는 정보를 통해서 이 메시지를 소비했는지 아닌지를 판별한 뒤에 소비하지 않은 메시지라면 컨슈머를 통해서 메시지를 소비하게 한다.

다시 console-consumer쪽을 올리면 소비하지 못했던 메시지들이 console-consumer쪽으로 출력될 것이다.

마무리

카프카를 공부하면서 사용했던 스크립트들을 간략하게 정리해볼겸 간단하게 정리해봤다. 이 글도 누군가에게 도움이 됐으면 좋겠다.


포스팅은 여기까지 하겠습니다. 퍼가실때는 출처를 반드시 남겨주세요!