DevOps

Apache Kafka

kyoulho 2023. 12. 30. 19:11

Kafka


카프카는 분산 스트리밍 플랫폼으로, 대량의 데이터를 높은 처리량으로 실시간으로 처리할 수 있도록 설계된 오픈 소스 메시지 브로커이다. Jay Kreps, Neha Narkhede, 그리고 Jun Rao가 LinkedIn에서 개발한 후에 Apache 소프트웨어 재단에 기증되어 오픈 소스 프로젝트로 진화했다.

 카프카는 대용량의 데이터 스트림을 효율적으로 수집, 저장, 처리 및 전달하는 데 사용되며 이벤트 주도 아키텍처 구축에 적합하다. 주로 로그 및 이벤트 스트리밍, 데이터 파이프라인, 실시간 데이터 분석 등의 분야에서 활용된다.

  • 분산 아키텍처 및 확장성: Kafka는 분산 아키텍처로 설계되어 있어 수평 확장이 용이하며, 새로운 브로커를 추가하여 클러스터를 쉽게 확장할 수 있다.
  • 내구성 및 복제: 메시지는 여러 브로커에 복제되어 내구성을 보장하며, 디스크에 저장되어 장애 시에도 데이터 손실을 최소화한다.
  • 높은 처리량과 낮은 지연: Kafka는 대량의 메시지를 효율적으로 처리하며, 낮은 지연과 높은 처리량을 제공하여 실시간 데이터 처리를 지원한다.
  • 다양한 언어 및 클라이언트 지원: Java를 비롯한 다양한 언어로 작성된 클라이언트를 제공하며, 다양한 플랫폼에서 Kafka를 통합할 수 있다.
  • 유연한 메시지 모델: 메시지는 토픽으로 구조화되고, 토픽은 여러 파티션으로 나뉘어 병렬 처리와 확장성을 지원한다.
  • 스케일 아웃 및 관리: 브로커를 추가하거나 제거하여 클러스터를 확장하고, 토픽의 파티션 수를 동적으로 조절할 수 있다.
  • 풍부한 Eco-system: Apache Kafka 프로젝트 외에도 다양한 확장과 도구들이 존재하며, 스트리밍 처리를 위한 여러 플랫폼과의 통합이 가능하다.
  • 데이터 보존 및 리플레이: 데이터는 일정 기간 동안 보존되며, 필요한 경우에는 저장된 데이터를 리플레이할 수 있다.
  • 커뮤니티 지원 및 업데이트: 활발한 커뮤니티와 지속적인 업데이트로 신규 기능과 개선 사항이 빠르게 반영된다.

 

Kafka Broker

 실행된 카프카 애플리케이션 서버를 카프카 브로커라고 한다. 일반적으로 3대 이상의 브로커를 가진 클러스터를 구성하길 권장한다. 브로커끼리는 서로의 데이터를 공유하며 장애 발생 시 다른 브로커를 사용함으로써 안정성을 높인다.

 Apache Zookeeper는 애플리케이션 서버의 상태, 서버의 리더, 장애 체크, 장애 복구를 관리해주기 위해 코디네이터로 사용된다. 주키퍼는 카프카 클러스터의 구성 정보와 메타데이터(브로커의 아이디, 컨트롤러 정보, 파티션 정보)를 저장한다. 주키퍼를 통해 저장된 정보를 브로커들이 공유하고 클러스터 내의 상태를 동기화한다.

 클러스터 내에서 1대의 브로커는 컨트롤러 역할을 수행해야 한다. 이 브로커는 클러스터의 상태를 관리하고 파티션을 각 브로커에 할당한다. 컨트롤러는 주키퍼를 통해 메타데이터 정보를 관리하며, 클러스터 내에서의 브로커 교체 시에도 역할을 수행하여 안정적인 운영을 지원한다.

 

 

Kafka 실행

# 주키퍼 기동
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties

# 카프카 기동
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

# 토픽 생성
$KAFKA_HOME/bin/kafka-topics.sh --create --topic quickstart-events \
--bootstrap-server localhost:9092 --paritions 1

# 토픽 목록 확인
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# 토픽 상세 확인
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic quickstart-events \
--bootstrap-server localhost:9092

# 메시지 생산
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 \
--topic quickstart-events

# 메시지 소비
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic quickstart-events --from-beginning
부트스트랩 서버(Bootstrap Server)는 Kafka 클라이언트가 Kafka 클러스터와 통신하기 위해 사용하는 초기 연결 지점이다. Kafka 클라이언트는 부트스트랩 서버를 통해 Kafka 브로커와의 연결을 설정하고 메타데이터를 가져온다. 이후에는 클라이언트가 Kafka 클러스터의 메타데이터를 주기적으로 업데이트하여 최신 정보를 유지한다. 메타데이터에는 Kafka 클러스터의 토픽 및 파티션 정보 등이 포함되어 있다. 부트스트랩 서버의 주소는 클라이언트 설정 파일(예: consumer.properties, producer.properties)이나 Kafka 명령어 실행 시에 --bootstrap-server 옵션을 통해 지정된다.
 예를 들어, --bootstrap-server localhost:9092는 로컬에서 9092 포트에 실행 중인 Kafka 브로커를 부트스트랩 서버로 사용하겠다는 것을 나타낸다. 
파티션(Partition)은 토픽을 물리적으로 분할하는 메커니즘이다. 각 토픽은 여러 개의 파티션으로 나누어질 수 있다. 각 파티션은 독립적인 로그로 관리되며, 데이터는 특정 파티션에 저장된다. 여러 파티션을 사용하면 여러 브로커나 컨슈머가 동시에 작업을 수행할 수 있다. 각 파티션은 독립적으로 작동하므로, 각각의 파티션에서 동시에 메시지를 생산하거나 소비할 수 있다.
 파티션 내에서는 메시지의 순서가 보장된다. 하지만 다른 파티션 간에는 순서가 보장되지 않는다. 즉, 동일한 파티션에서 생성된 메시지는 생성 순서대로 소비되지만, 다른 파티션에서 생성된 메시지 간에는 상대적인 순서가 보장되지 않는다.
 파티션의 수는 토픽을 생성할 때 지정되며, 일반적으로 이 수는 Kafka 클러스터의 브로커 수보다 크게 설정된다. 각 파티션은 고유한 순서를 가지며, 이는 파티션 내에서 메시지의 순서를 보장하는 데 사용된다.

 

 

Kafka Connect


 

 

Kafka Connect는 Apache Kafka와 다양한 데이터 소스 또는 대상 간에 데이터를 신속하게 이동시키기 위한 분산 데이터 통합 프레임워크로서, 코드 없이 Configuration을 통해 데이터를 이동할 수 있다.

 Standalone mode와 Distributed mode를 지원하여 단일 노드나 여러 노드에서 실행할 수 있으며, RESTful API를 통해 간편한 관리 및 설정 변경이 가능하다.

 Stream 및 Batch 형태의 데이터 전송을 지원하여 실시간 및 대량 데이터 처리가 가능하며, 다양한 플러그인을 활용하여 커스텀 커넥터를 통해 File, S3, Hive, MySQL 등과 같은 다양한 데이터 소스 및 대상에 대한 통합이 있다.

 Kafka Connect Source는 외부 시스템에서 Kafka로 데이터를 가져오는 데 사용된다. 예를 들어, 데이터베이스, 로그 파일, MQTT 브로커 등과 같은 여러 소스에서 데이터를 읽어와 Kafka 토픽에 전송할 수 있다.

Kafka Connect Sink는 Kafka에서 다른 시스템으로 데이터를 보내는 데 사용된다. Kafka 토픽에서 데이터를 가져와 데이터베이스, 검색 엔진, 다른 메시징 시스템 등으로 전송할 수 있다.

 

Kafka Connect  설치

# 다운로드
curl -O https://packages.confluent.io/archive/6.1/confluent-6.1.0.tar.gz

# 압축 해제
taf -xvf confluent-6.1.0.tar.gz

# 실행
# 주키퍼, 카프카 실행 이후
$KAFKA_CONNECT_HOME/bin/connect-distributed ./etc/kafka/connect-distributed.properties

# 토픽 목록
$KAFKA_CONNECT_HOME/bin/kafka-topics --bootstrap-server localhost:9092 --list

 

JDBC Connect 설치

다운로드: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

vim $KAFKA_CONNECT_HOME/etc/kafka/connect-distributed.properties

# 프로퍼티 파일에서 설정
plugin.path=${jdbc-connector/lib 디렉토리}

# $KAFKA_CONNECT_HOME/share/java/kafka 디렉토리에 사용하는 데이터베이스 드라이버 파일을 복사
cp ~/.m2/repsotiroy/org/mariadb/jdb/mariadb-java-client/3.0.9/mariadb-java-client-3.0.9.jar \
 ~/msa-study/kafka-connect/confluent-6.1.0/share/java/kafka

 

 

Kafka Source Connect


등록

MySQL 데이터베이스에서 "users" 테이블의 변경 사항을 Kafka 토픽으로 캡처하는 JDBC 소스 커넥터를 생성하는 예제이다.

# Source Connect 등록
POST http://{주소}:{포트}/connectors

RequestBody
{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://localhost:3306/test",
        "connection.user": "root",
        "connection.password": "root",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}

# Source Connect 목록 확인
GET http://{주소}:{포트}/connectors | jq

# Source Connect 상세 확인
GET http://{주소}:{포트}/connectors/{커넥터 이름}/status | jq
  • name: 커넥터의 이름
  • connector.class: 사용되는 커넥터 클래스는 "io.confluent.connect.jdbc.JdbcSourceConnector". 이 커넥터는 JDBC 소스로 동작하며 데이터베이스에서 데이터를 읽어와서 Kafka 토픽으로 전송한다.
  • mode: 변경된 데이터를 감지하는 모드는 "incrementing"이다. 이 모드에서는 지정된 열("id" 열)의 값을 기반으로 변경된 데이터를 추적한다.
  • incrementing.column.name: 변경을 추적할 때 사용할 증가하는 열의 이름
  • table.whitelist: 커넥터가 처리할 테이블의 목록
  • topic.prefix: 생성된 Kafka 토픽의 접두사는 "my_topic_"이다. 예를 들어, "users" 테이블은 "my_topic_users"로 매핑된다
  • tasks.max: 동시에 실행되는 작업(태스크)의 최대 수는 1개

출력

# 데이터 입력 시
insert into users(user_id, pwd, name) values('user1', 'test123', 'kyoulho');

# 콘솔 출력
{
  "schema": {
    "type": "struct", 
    "fields": [
      {"type": "int32", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "user_id"},
      {"type": "string", "optional": true, "field": "pwd"},
      {"type": "string", "optional": true, "field": "name"}
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "user1",
    "pwd": "test123",
    "name": "kyoulho"
  }
}

 

 

Kafka Sink Connect


등록

Kafka 토픽 "my_topic_users"에서 메시지를 읽어와서 MySQL 데이터베이스 "test"에 해당하는 테이블에 자동으로 쓰는 JDBC Sink 커넥터를 생성하는 예제이다.

POST http://http://{주소}:{포트}/connectors

RequestBody
{
    "name": "my-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/test",
        "connection.user": "root",
        "connection.password": "root",
        "auto.create": "true",
        "delete.enaled": "false",
        "tasks.max": "1",
        "topics": "my_topic_users"
    }
}
  • auto.create: 테이블이 없는 경우 자동으로 테이블을 생성하는지 여부, "my_topic_users" 테이블이 생성된다.
  • delete.enabled: 레코드 삭제를 허용할지 여부

 

Topic 데이터 구조

{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "user_id"},
      {"type": "string", "optional": true,"field": "name"}
    ],
    "optional": false,
    "name": "users3"
  },
  "payload": {
    "id": 3,
    "user_id": "user1",
    "name": "kyoulho"
  }
}

'DevOps' 카테고리의 다른 글

[MW] Flyway  (0) 2024.08.31
RabbitMQ, Apache Kafka, AWS SQS 비교  (0) 2024.01.13