라이브러리
// Gradle (Kotlin DSL) dependency on Spring for Apache Kafka.
// NOTE(review): no version is given here — presumably supplied by dependency management (e.g. the Spring Boot BOM); confirm.
implementation("org.springframework.kafka:spring-kafka")
Kafka Consumer
설정
/**
 * Declares the beans needed to consume String/String records from Kafka.
 */
@Configuration
@EnableKafka
class KafkaConsumerConfig {

    /**
     * Connection settings used to reach the topic: broker address, consumer group id,
     * and String deserializers for both the record key and the record value.
     */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        val consumerProps: Map<String, Any> = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "consumerGroupId",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )
        return DefaultKafkaConsumerFactory(consumerProps)
    }

    /**
     * Listener container factory that listens for changes on the topic,
     * backed by [consumerFactory].
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        containerFactory.consumerFactory = consumerFactory()
        return containerFactory
    }
}
토픽 리스너 등록
/**
 * Listens on `order-topic` and subtracts the ordered quantity from the stock
 * of the catalog entry identified by the message's `productId`.
 */
@Service
@Transactional
class KafkaConsumer(
    private val catalogRepository: CatalogRepository,
) {
    // Logger is named after this class so log output is attributed to the consumer.
    private val log: Logger = LoggerFactory.getLogger(KafkaConsumer::class.java)

    // One mapper reused for every message instead of constructing a new one per call.
    private val objectMapper = ObjectMapper()

    /**
     * Parses [kafkaMessage] as a JSON object and applies the stock decrement.
     *
     * The message must contain a `productId` entry and an integer `qty` entry.
     *
     * @param kafkaMessage raw JSON payload received from the topic
     * @throws IllegalArgumentException if `productId` is missing or `qty` is missing / not an integer
     */
    @KafkaListener(topics = ["order-topic"])
    fun processMessage(kafkaMessage: String) {
        log.info("카프카 메세지: [${kafkaMessage}]")
        val payload: Map<String, Any> = objectMapper.readValue(
            kafkaMessage, object : TypeReference<Map<String, Any>>() {})

        // Fail fast on a missing key instead of looking up the literal string "null".
        val productId = requireNotNull(payload["productId"]) {
            "order message has no 'productId' field"
        }.toString()

        // Safe cast: a missing or non-integer quantity yields a descriptive error
        // rather than an opaque NullPointerException / ClassCastException.
        val qty = payload["qty"] as? Int
            ?: throw IllegalArgumentException("order message has no integer 'qty' field")

        val entity = catalogRepository.findByProductIdOrThrow(productId)
        // NOTE(review): no explicit save call — presumably persisted by the surrounding
        // transaction's change tracking; confirm against the repository/entity setup.
        entity.stock -= qty
    }
}
Kafka Producer
설정
/**
 * Declares the beans needed to publish String/String records to Kafka.
 */
@Configuration
@EnableKafka
class KafkaProducerConfig {

    /**
     * Producer factory configured with the broker address and String serializers
     * for both the record key and the record value.
     */
    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val producerProps: Map<String, Any> = mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "127.0.0.1:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        )
        return DefaultKafkaProducerFactory(producerProps)
    }

    /** Template used by services to send messages, built on top of [producerFactory]. */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val factory = producerFactory()
        return KafkaTemplate(factory)
    }
}
토픽 생성 및 사용
/**
 * Serializes order data to JSON and publishes it to a Kafka topic.
 */
@Service
@Transactional
class KafkaProducer(
    private val kafkaTemplate: KafkaTemplate<String, String>,
) {
    // Logger is named after this class so log output is attributed to the producer.
    private val log: Logger = LoggerFactory.getLogger(KafkaProducer::class.java)

    // One mapper reused for every send instead of constructing a new one per call.
    private val objectMapper = ObjectMapper()

    /**
     * Writes [orderDto] as a JSON string and sends it to [topicName].
     *
     * The value returned by the template's `send` call is discarded, so a failed
     * publish is not detected here.
     *
     * @param topicName name of the destination topic
     * @param orderDto order data to publish
     */
    fun send(topicName: String, orderDto: OrderDto) {
        val jsonInString = objectMapper.writeValueAsString(orderDto)
        kafkaTemplate.send(topicName, jsonInString)
        log.info("Kafka Producer send data from th order microservice: [${orderDto}]")
    }
}
/**
 * Creates orders and announces each new order on the `order-topic` Kafka topic.
 */
@Service
@Transactional
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaProducer: KafkaProducer,
) {
    /**
     * Persists a new order built from [requestOrder] for [userId], publishes it,
     * and returns the DTO of the saved order.
     *
     * NOTE(review): the publish happens inside this transactional method — presumably
     * before the database transaction commits; confirm that a later rollback is acceptable.
     *
     * @param requestOrder incoming order request
     * @param userId id of the user placing the order
     * @return DTO built from the saved order entity
     */
    fun createOrder(requestOrder: RequestOrder, userId: String): OrderDto {
        val savedOrder = orderRepository.save(OrderEntity.from(requestOrder, userId))
        return OrderDto.from(savedOrder).also { dto ->
            kafkaProducer.send("order-topic", dto)
        }
    }
}
728x90
'JVM > SpringCloud' 카테고리의 다른 글
Resilience4J - CircuitBreaker (2) | 2024.01.02 |
---|---|
Kafka를 활용한 데이터 베이스 비동기 통신 (0) | 2024.01.01 |
마이크로서비스간 동기 통신 (0) | 2023.12.30 |
Spring Cloud Config Server의 암호화 (1) | 2023.12.30 |
Spring Cloud Bus (0) | 2023.12.28 |