JVM/SpringCloud

Kafka를 활용한 마이크로서비스 간 비동기 통신

kyoulho 2024. 1. 1. 20:01

라이브러리

// Gradle (Kotlin DSL) dependency on the spring-kafka artifact.
// No version is given here — presumably supplied by the Spring Boot dependency management; verify in the build.
implementation("org.springframework.kafka:spring-kafka")

 

Kafka Consumer


설정

@Configuration
@EnableKafka
class KafkaConsumerConfig {

    /**
     * Consumer factory carrying the settings used to connect to topics:
     * the broker address, the consumer group id, and String deserializers
     * for both the record key and the record value.
     */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> =
        DefaultKafkaConsumerFactory(
            mapOf<String, Any>(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
                ConsumerConfig.GROUP_ID_CONFIG to "consumerGroupId",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            )
        )

    /**
     * Listener container factory wired to [consumerFactory]; it backs the
     * listeners that react to changes published on a topic.
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()
        return containerFactory
    }
}

 

 

토픽 리스너 등록

/**
 * Listens on the "order-topic" topic and decreases the stock of the ordered
 * catalog entry by the ordered quantity.
 *
 * The message is expected to be a JSON object containing at least
 * `productId` and an integer `qty`.
 */
@Service
@Transactional
class KafkaConsumer(
    private val catalogRepository: CatalogRepository,
) {
    // Bound to this class so log output is attributed to the right component.
    private val log: Logger = LoggerFactory.getLogger(KafkaConsumer::class.java)

    // Created once and reused for every message instead of being rebuilt per call.
    private val objectMapper = ObjectMapper()

    /**
     * Parses one order message and subtracts its quantity from the matching catalog entry.
     *
     * @param kafkaMessage raw JSON payload received from the topic
     * @throws IllegalArgumentException if `productId` is missing or `qty` is missing / not an integer
     */
    @KafkaListener(topics = ["order-topic"])
    fun processMessage(kafkaMessage: String) {
        log.info("카프카 메세지: [${kafkaMessage}]")

        val payload: Map<String, Any> = objectMapper.readValue(
            kafkaMessage, object : TypeReference<Map<String, Any>>() {})

        // Fail with a descriptive error instead of looking up the literal string "null".
        val productId = requireNotNull(payload["productId"]) {
            "order message has no productId: $kafkaMessage"
        }.toString()

        // Safe cast: a missing or non-integer qty yields a clear error rather than an NPE/ClassCastException.
        val qty = payload["qty"] as? Int
            ?: throw IllegalArgumentException("order message has no integer qty: $kafkaMessage")

        val entity = catalogRepository.findByProductIdOrThrow(productId)
        // No explicit save call here; presumably the change is persisted when the
        // surrounding @Transactional transaction commits — confirm against the repository/entity setup.
        entity.stock -= qty
    }
}

 

 

Kafka Producer


설정

@Configuration
@EnableKafka
class KafkaProducerConfig {

    /**
     * Producer factory configured with the broker address and String
     * serializers for both the record key and the record value.
     */
    @Bean
    fun producerFactory(): ProducerFactory<String, String> =
        DefaultKafkaProducerFactory(
            mapOf<String, Any>(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            )
        )

    /** Template used to send String messages, built on top of [producerFactory]. */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val stringProducerFactory = producerFactory()
        return KafkaTemplate(stringProducerFactory)
    }
}

 

토픽 생성 및 사용

/**
 * Serializes an [OrderDto] to JSON and sends it to a Kafka topic through
 * the injected [KafkaTemplate].
 */
@Service
@Transactional
class KafkaProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    // Bound to this class so log output is attributed to the right component.
    private val log: Logger = LoggerFactory.getLogger(KafkaProducer::class.java)

    // Created once and reused for every send instead of being rebuilt per call.
    private val objectMapper = ObjectMapper()

    /**
     * Sends [orderDto] as a JSON string to [topicName].
     *
     * @param topicName name of the destination topic
     * @param orderDto order data to serialize and publish
     */
    fun send(topicName: String, orderDto: OrderDto) {
        val jsonInString = objectMapper.writeValueAsString(orderDto)
        // NOTE(review): the value returned by send() is discarded, so a failed
        // delivery is not observed here — confirm whether a completion callback is needed.
        kafkaTemplate.send(topicName, jsonInString)

        log.info("Kafka Producer send data from th order microservice: [${orderDto}]")
    }
}

/**
 * Creates orders: stores the order through [OrderRepository] and publishes
 * the resulting [OrderDto] to the "order-topic" topic.
 */
@Service
@Transactional
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaProducer: KafkaProducer,
) {

    /**
     * Saves a new order built from [requestOrder] and [userId], publishes it,
     * and returns it as a DTO.
     *
     * NOTE(review): the message is handed to the producer inside this method,
     * i.e. before the method (and presumably its transaction) completes —
     * confirm that publishing ahead of the commit is acceptable.
     */
    fun createOrder(requestOrder: RequestOrder, userId: String): OrderDto {
        val savedOrder = orderRepository.save(OrderEntity.from(requestOrder, userId))
        return OrderDto.from(savedOrder).also { createdOrder ->
            kafkaProducer.send("order-topic", createdOrder)
        }
    }
}
728x90

'JVM > SpringCloud' 카테고리의 다른 글

Resilience4J - CircuitBreaker  (2) 2024.01.02
Kafka를 활용한 데이터 베이스 비동기 통신  (0) 2024.01.01
마이크로서비스간 동기 통신  (0) 2023.12.30
Spring Cloud Config Server의 암호화  (1) 2023.12.30
Spring Cloud Bus  (0) 2023.12.28