JVM/SpringCloud

Kafka를 활용한 데이터 베이스 비동기 통신

kyoulho 2024. 1. 1. 20:07

이벤트 중심 아키텍처


 

JDBC 또는 JPA를 사용하여 DB에 직접 접속하는 경우, 애플리케이션과 DB 간의 동기 상태로 인해 서비스 지연 및 다른 서비스에 대한 영향이 발생할 수 있다. 또한, 트래픽이 증가하거나 DB Locking이 발생할 경우 성능 문제가 발생할 수 있다. 이러한 문제를 해결하기 위해 Kafka Sink Connector를 사용하면 비동기 방식의 데이터 처리, 데이터 전달의 효율적인 관리, 안정적인 클러스터 구성, 서비스 간의 느슨한 결합, 이벤트 중심 아키텍처 등 다양한 이점을 얻을 수 있습니다. 특히 대용량 및 다중 서비스 환경에서는 MQ와 같은 서비스의 도입이 유용할 수 있다.

 

 

Dto

data class KafkaOrderDto(
    val schema: Schema,
    val payload: Payload,
) : Serializable

data class Schema(
    val type: String,
    val fields: List<Field>,
    val optional: Boolean,
    val name: String,
)

data class Field(
    val type: String,
    val optional: Boolean,
    val field: String,
)

data class Payload(
    val orderId: String,
    val userId: String,
    val productId: String,
    val qty: Int,
    val totalPrice: Int,
    val unitPrice: Int,
)

 

토픽 생성 및 사용

@Service
@Transactional
class OrderProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    private val log: Logger = LoggerFactory.getLogger(OrderProducer::class.java)
    private val fields: List<Field> = listOf(
        Field("string", true, "order_id"),
        Field("string", true, "user_id"),
        Field("string", true, "product_id"),
        Field("int32", true, "qty"),
        Field("int32", true, "total_price"),
        Field("int32", true, "unit_price")
    )
    private val schema: Schema = Schema(type = "struct", fields, optional = false, name = "orders")

    fun send(kafkaTopic: String, orderDto: OrderDto) {
        val kafkaOrderDto = KafkaOrderDto(
            schema, Payload(
                orderId = orderDto.orderId,
                userId = orderDto.userId,
                productId = orderDto.productId,
                qty = orderDto.qty,
                unitPrice = orderDto.unitPrice,
                totalPrice = orderDto.totalPrice
            )
        )
        val jsonInString = ObjectMapper().writeValueAsString(kafkaOrderDto)
        kafkaTemplate.send(kafkaTopic, jsonInString)
        log.info("Kafka Producer send data from th order microservice: [${kafkaOrderDto}]")
    }
}

@Service
@Transactional
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaProducer: KafkaProducer,
    private val orderProducer: OrderProducer,
) {

    fun createOrder(requestOrder: RequestOrder, userId: String): OrderDto {
        val orderDto = OrderDto.from(requestOrder, userId)
        orderProducer.send("order", orderDto)
        kafkaProducer.send("example-order-topic", orderDto)
        return orderDto
    }
}

 

Kafka Sink Connector 추가

curl -X POST -d @- http://{주소}:{포트}/connectors --header "content-Type:application/json"

RequestBody
{
    "name": "order-sink-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://{데이터베이스 주소}:{포트}/{스키마}",
        "connection.user": "root",
        "connection.password": "root",
        "auto.create": "true",
        "auto.evolve": "true",
        "delete.enaled": "false",
        "tasks.max": "1",
        "topics": "orders"
    }
}

 

 

 Domain 처리를 위한 Business Logic을 구현하는 것 이외에, 다른 부수적인 작업들은(Schema를 위한 자바 클래스 생성에 대한 부분) 상당히 귀찮은 부분이다.  아파치의 아브로(AVRO) 프래임워크를 사용하면 자료형과 프로토콜 정의 등을 위해 JSON을 작성하고 해당 데이터를 직렬화하여 Kafka에서 필요로 하는 스키마에 맞게 메시지를 직렬화/역직렬화가 가능하다. Confluent의 avro-kafka-connector 또는 아래와 같은 플러그인등을 사용할 수도 있다.

https://avro.apache.org/

https://github.com/davidmc24/gradle-avro-plugin

https://www.confluent.io/hub/confluentinc/kafka-connect-avro-converter

 

728x90

'JVM > SpringCloud' 카테고리의 다른 글

Zipkin  (2) 2024.01.03
Resilience4J - CircuitBreaker  (2) 2024.01.02
Kafka를 활용한 마이크로 서비스간 비동기 통신  (0) 2024.01.01
마이크로서비스간 동기 통신  (0) 2023.12.30
Spring Cloud Config Server의 암호화  (1) 2023.12.30