While developing a previous project, I ran into a situation where a service ended up with a very large number of fields. In that project, when a complaint is filed against a letter, the following logic runs:
- Create the complaint
- If the letter has now accumulated 3 complaints:
  - Block the letter
  - Send a warning notification to the letter's author
  - Increment the author's warning count
The letter blocking, warning notification, and warning-count increment above all have to be handled through other services. As a result, the complaint service accumulates fields and ends up tightly coupled to those services:
public class ComplaintService {

    private final KeywordComplaintRepository keywordComplaintRepository;
    private final MapComplaintRepository mapComplaintRepository;
    private final KeywordReplyComplaintRepository keywordReplyComplaintRepository;
    private final MapReplyComplaintRepository mapReplyComplaintRepository;
    private final NotificationService notificationService;
    private final LetterService letterService;
    private final ReplyLetterService replyLetterService;
    private final MapLetterService mapLetterService;
    private final UserService userService;

    @Transactional
    public ComplaintResponseDTO complain(ComplaintType type, Long letterId, Long reporterId, String description) {
        Complaint newComplaint = Complaint.create(letterId, reporterId, description);
        Complaints existingComplaints = findExistingComplaints(type, letterId);
        existingComplaints.add(newComplaint);
        if (existingComplaints.needWarning()) {
            sendWarningToWriter(type, letterId);
        }
        return ComplaintResponseDTO.from(saveComplaint(newComplaint, type));
    }
}
While thinking about how to resolve this kind of tight coupling, I came across Event Driven Architecture.
📌 Event Driven Architecture
Event Driven Architecture mostly comes up in the context of MSA. One of MSA's core keywords is loose coupling, and Event Driven Architecture is a way to achieve it.
What is Event Driven Architecture?
Event Driven Architecture is a software architecture pattern in which components communicate based on events occurring within the system. When a particular event occurs, the components that subscribe to it react and perform their work.
Message queues in Event Driven Architecture
An event-driven architecture needs a message queue through which events are delivered. Representative options include:
RabbitMQ
- An open-source message broker that implements AMQP
- AMQP: an open standard application-layer protocol for message-oriented middleware
Kafka
- A pub/sub-model message queue
- Specialized for distributed environments
- Producers publish events to topics
- Consumers subscribe to topics and pull events from them to process
Redis Pub/Sub
- The pub/sub messaging feature provided by Redis
- A publisher publishes a message to a channel
- Subscribers of that channel receive the message and process it
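Whichever broker is used, the pub/sub model shares one core property: the publisher knows only a channel (topic) name, never who subscribes to it. A minimal in-process sketch of that idea (plain Java, no broker; all names here are illustrative, not any broker's API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-process pub/sub: publishers only know the channel name,
// never the subscribers — the same decoupling a real broker provides.
class SimpleBus {

    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, k -> new ArrayList<>()).add(handler);
    }

    void publish(String channel, String message) {
        // Deliver to every subscriber of this channel; the publisher
        // neither knows nor cares how many there are.
        subscribers.getOrDefault(channel, List.of())
                   .forEach(handler -> handler.accept(message));
    }
}
```

Adding a second subscriber to a channel requires no change to any publisher, which is exactly the property we want from the complaint and sign-up examples below.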
📌 Example: issuing a sign-up coupon on registration
As an example, let's take logic that issues a sign-up coupon when a user registers.
@Service
@RequiredArgsConstructor
public class UserService {

    private final UserRepository userRepository;
    private final CouponService couponService;

    @Transactional
    public UserCreateResponse createUser(UserCreateRequest userCreateRequest) {
        User user = User.from(userCreateRequest);
        User savedUser = userRepository.save(user);
        couponService.createCoupon(new CouponCreateRequest(savedUser.getId(), CouponType.SIGN_UP));
        return UserCreateResponse.from(savedUser);
    }
}

@Service
@RequiredArgsConstructor
public class CouponService {

    private final CouponRepository couponRepository;

    @Transactional
    public void createCoupon(CouponCreateRequest couponCreateRequest) {
        Coupon coupon = Coupon.from(couponCreateRequest);
        couponRepository.save(coupon);
    }
}
UserService knows about CouponService and calls it directly to request the sign-up coupon.
With just this logic the number of dependent services is still small, but as notifications and other secondary features are added, it keeps growing.
What if we instead published a sign-up event, and let CouponService subscribe to it and issue the coupon?
Let's walk through a lightweight version of this using Kafka.
Applying Kafka
Adding the dependency
- build.gradle
implementation 'org.springframework.kafka:spring-kafka'
Configuration
There are two ways to configure Kafka: registering the beans through application properties, or configuring them directly in code. This example registers the beans in code.
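For reference, the properties-based approach would declare the same settings in application.yml and let Spring Boot auto-configure the factories and KafkaTemplate. A sketch of the equivalent configuration (property names follow Spring Boot's `spring.kafka.*` keys; verify against your Boot version):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
```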
- ProducerConfig
@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, SignUpEvent> factory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, SignUpEvent> kafkaTemplate() {
        return new KafkaTemplate<>(factory());
    }
}
In this example a SignUpEvent object is sent as the record value, so JsonSerializer is configured as the value serializer.
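The SignUpEvent payload itself is never shown above; a minimal hand-written version might look like the following. This is an assumed shape, not the original code — the essentials are a userId field, a no-args constructor, and getters, which Jackson's JsonSerializer/JsonDeserializer rely on:

```java
// Assumed event payload. JsonSerializer writes it as JSON, and
// JsonDeserializer needs a no-args constructor plus getters to rebuild it.
// (Lombok's @Getter/@NoArgsConstructor/@AllArgsConstructor would shorten this.)
class SignUpEvent {

    private Long userId;

    SignUpEvent() {
        // required by Jackson when deserializing
    }

    SignUpEvent(Long userId) {
        this.userId = userId;
    }

    public Long getUserId() {
        return userId;
    }

    @Override
    public String toString() {
        return "SignUpEvent(userId=" + userId + ")";
    }
}
```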
- ConsumerConfig
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, SignUpEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Deserializer that maps incoming records to SignUpEvent objects;
        // the instance passed below takes precedence over the property above.
        JsonDeserializer<SignUpEvent> deserializer = new JsonDeserializer<>(SignUpEvent.class, false);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SignUpEvent> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SignUpEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Component
- Producer
@Component
@Slf4j
@RequiredArgsConstructor
public class SignUpEventProducer {

    private final KafkaTemplate<String, SignUpEvent> kafkaTemplate;

    public void send(String topic, SignUpEvent event) {
        log.info("sending event={} to topic={}", event, topic);
        kafkaTemplate.send(topic, event);
    }
}
The producer is registered as a component and injected into UserService.
@Service
@RequiredArgsConstructor
public class UserService {

    private static final String TOPIC = "sign-up";

    private final UserRepository userRepository;
    // private final CouponService couponService;
    private final SignUpEventProducer signUpEventProducer;

    @Transactional
    public UserCreateResponse createUser(UserCreateRequest userCreateRequest) {
        User user = User.from(userCreateRequest);
        User savedUser = userRepository.save(user);
        // couponService.createCoupon(new CouponCreateRequest(savedUser.getId(), CouponType.SIGN_UP));
        signUpEventProducer.send(TOPIC, new SignUpEvent(savedUser.getId()));
        return UserCreateResponse.from(savedUser);
    }
}
Now UserService no longer needs to know about CouponService; it only has to publish a SignUpEvent.
- Consumer
@Service
@RequiredArgsConstructor
public class CouponService {

    private final CouponRepository couponRepository;

    @Transactional
    @KafkaListener(topics = "sign-up", containerFactory = "kafkaListenerContainerFactory", groupId = "coupon")
    public void receive(ConsumerRecord<String, SignUpEvent> record) {
        SignUpEvent event = record.value();
        Coupon coupon = Coupon.from(new CouponCreateRequest(event.getUserId(), CouponType.SIGN_UP));
        couponRepository.save(coupon);
    }
}
With this in place, when UserService publishes a SignUpEvent, a sign-up coupon is created.
📌 How do the transactions actually behave?
After wiring things up through Kafka, I became curious about the transactions of the two pieces of logic above, so I checked the transaction logs.
Getting transaction for [com.example.event.user.service.UserService.createUser]
Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
sending event=SignUpEvent(userId=1) to topic=sign-up
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@14974343]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@14974343] beginTransaction()
Sending: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null)
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@14974343] send(ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null))
Sent: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null)
Completing transaction for [com.example.event.user.service.UserService.createUser]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@14974343] commitTransaction()
ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null), metadata: sign-up-0@8
receive sign-up event: SignUpEvent(userId=1)
Getting transaction for [com.example.event.coupon.service.CouponService.receive]
Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into coupon (coupon_type,user_id) values (?,?)
Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Completing transaction for [com.example.event.coupon.service.CouponService.receive]
The log shows the following sequence:
- The UserService.createUser transaction begins
- The user is saved to the DB
- The sign-up event is published
- The UserService.createUser transaction commits
- The CouponService.receive transaction begins
- The coupon is created
- The CouponService.receive transaction completes
But what happens when user registration fails?
User registration failure
Let's add a line that throws an exception in the user creation logic.
@Transactional
public UserCreateResponse createUser(UserCreateRequest userCreateRequest) {
    User user = User.from(userCreateRequest);
    User savedUser = userRepository.save(user);
    signUpEventProducer.send(TOPIC, new SignUpEvent(savedUser.getId()));
    // force a rollback after the event has been sent
    throw new RuntimeException();
    // return UserCreateResponse.from(savedUser);
}
Because user registration and coupon creation run independently, user creation failed but the coupon was still created.
Getting transaction for [com.example.event.user.service.UserService.createUser]
Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
sending event=SignUpEvent(userId=2) to topic=sign-up
Sending: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=2), timestamp=null)
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@15da14e2] send(ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=2), timestamp=null))
Sent: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=2), timestamp=null)
Completing transaction for [com.example.event.user.service.UserService.createUser] after exception: java.lang.RuntimeException
Sent ok: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=2), timestamp=null), metadata: sign-up-0@6
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@15da14e2] close(PT5S)
receive sign-up event: SignUpEvent(userId=2)
Getting transaction for [com.example.event.coupon.service.CouponService.receive]
Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into coupon (coupon_type,user_id) values (?,?)
Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Completing transaction for [com.example.event.coupon.service.CouponService.receive]
Looking at the log, the event is sent before the user-registration transaction ends, and the transaction then completes with the exception. Since the send statement had already executed in program order, user creation was rolled back but the coupon was still created.
This can become a real problem whenever a service acts on data that was never committed.
How do we keep the coupon from being created when user creation fails?
One way is to make the event take effect only after the commit.
To publish the event after the Spring transaction completes, we need to set up Kafka transactions.
@Bean
public ProducerFactory<String, SignUpEvent> factory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    DefaultKafkaProducerFactory<String, SignUpEvent> factory = new DefaultKafkaProducerFactory<>(props);
    // setting a transaction id prefix enables Kafka transactions for this producer
    factory.setTransactionIdPrefix("tx-");
    return factory;
}
We add setTransactionIdPrefix to the producer factory configured earlier. The producer now operates inside a Kafka transaction: kafkaTemplate.send still hands the event over, but the Kafka transaction is committed only after the Spring transaction completes.
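If the broker were configured through application.yml instead, the same switch is available as a Boot property (`spring.kafka.producer.transaction-id-prefix`; verify the key against your Boot version):

```yaml
spring:
  kafka:
    producer:
      transaction-id-prefix: tx-
```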
- Normal-path log before enabling Kafka transactions
2025-02-17T21:42:49.402+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.user.service.UserService.createUser]
2025-02-17T21:42:49.403+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
2025-02-17T21:42:49.423+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-17T21:42:49.423+09:00 INFO 61583 --- [event] [nio-8080-exec-2] c.e.event.kafka.SignUpEventProducer : sending event=SignUpEvent(userId=1) to topic=sign-up
2025-02-17T21:42:49.435+09:00 DEBUG 61583 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@433b8059]
2025-02-17T21:42:49.435+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-17T21:42:49.435+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@433b8059] send(ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null))
2025-02-17T21:42:49.443+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-17T21:42:49.443+09:00 TRACE 61583 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.user.service.UserService.createUser]
2025-02-17T21:42:49.450+09:00 TRACE 61583 --- [event] [vent-producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null), metadata: sign-up-0@26
2025-02-17T21:42:49.450+09:00 TRACE 61583 --- [event] [vent-producer-1] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@433b8059] close(PT5S)
2025-02-17T21:42:49.450+09:00 INFO 61583 --- [event] [ntainer#1-0-C-1] c.e.event.kafka.SignUpEventConsumer : receive sign-up event: SignUpEvent(userId=1)
2025-02-17T21:42:49.451+09:00 TRACE 61583 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.coupon.service.CouponService.receive]
2025-02-17T21:42:49.451+09:00 TRACE 61583 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into coupon (coupon_type,user_id) values (?,?)
2025-02-17T21:42:49.453+09:00 TRACE 61583 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-17T21:42:49.453+09:00 TRACE 61583 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.coupon.service.CouponService.receive]
- Normal-path log after enabling Kafka transactions
2025-02-17T21:44:16.530+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.user.service.UserService.createUser]
2025-02-17T21:44:16.531+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
2025-02-17T21:44:16.548+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-17T21:44:16.548+09:00 INFO 61616 --- [event] [nio-8080-exec-2] c.e.event.kafka.SignUpEventProducer : sending event=SignUpEvent(userId=1) to topic=sign-up
2025-02-17T21:44:16.669+09:00 DEBUG 61616 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@291089b5]
2025-02-17T21:44:16.669+09:00 DEBUG 61616 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@291089b5] beginTransaction()
2025-02-17T21:44:16.670+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-17T21:44:16.670+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@291089b5] send(ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null))
2025-02-17T21:44:16.685+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-17T21:44:16.686+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.user.service.UserService.createUser]
2025-02-17T21:44:16.690+09:00 DEBUG 61616 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@291089b5] commitTransaction()
2025-02-17T21:44:16.698+09:00 TRACE 61616 --- [event] [vent-producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null), metadata: sign-up-0@27
2025-02-17T21:44:16.699+09:00 INFO 61616 --- [event] [ntainer#1-0-C-1] c.e.event.kafka.SignUpEventConsumer : receive sign-up event: SignUpEvent(userId=1)
2025-02-17T21:44:16.700+09:00 TRACE 61616 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@291089b5] close(PT5S)
2025-02-17T21:44:16.700+09:00 TRACE 61616 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.coupon.service.CouponService.receive]
2025-02-17T21:44:16.701+09:00 TRACE 61616 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into coupon (coupon_type,user_id) values (?,?)
2025-02-17T21:44:16.704+09:00 TRACE 61616 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-17T21:44:16.704+09:00 TRACE 61616 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.coupon.service.CouponService.receive]
The two logs look similar, but after enabling Kafka transactions, beginTransaction() and commitTransaction() entries appear.
So what happens now if UserService throws an exception?
2025-02-17T21:47:48.692+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.user.service.UserService.createUser]
2025-02-17T21:47:48.693+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
2025-02-17T21:47:48.712+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-17T21:47:48.712+09:00 INFO 61681 --- [event] [nio-8080-exec-2] c.e.event.kafka.SignUpEventProducer : sending event=SignUpEvent(userId=1) to topic=sign-up
2025-02-17T21:47:48.831+09:00 DEBUG 61681 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@478f595]
2025-02-17T21:47:48.831+09:00 DEBUG 61681 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@478f595] beginTransaction()
2025-02-17T21:47:48.832+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-17T21:47:48.832+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@478f595] send(ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null))
2025-02-17T21:47:48.846+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-17T21:47:48.847+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.user.service.UserService.createUser] after exception: java.lang.RuntimeException
2025-02-17T21:47:48.849+09:00 DEBUG 61681 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@478f595] abortTransaction()
2025-02-17T21:47:48.850+09:00 ERROR 61681 --- [event] [vent-producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='SignUpEvent(userId=1)' to topic sign-up:
org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted
2025-02-17T21:47:48.851+09:00 DEBUG 61681 --- [event] [vent-producer-1] o.s.kafka.core.KafkaTemplate : Failed to send: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null)
org.apache.kafka.common.errors.TransactionAbortedException: Failing batch since transaction was aborted
2025-02-17T21:47:48.853+09:00 TRACE 61681 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@478f595] close(PT5S)
2025-02-17T21:47:48.857+09:00 ERROR 61681 --- [event] [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException] with root cause
The event was sent, but an abortTransaction() log follows. That is, the send itself happened, but because the Kafka transaction was aborted the record was never committed, so consumers never processed it.
Using TransactionalEventListener
The consumer never processed the event, but an even cleaner setup would be for the event to be sent only after the Spring transaction in the first place. @TransactionalEventListener lets us run kafkaTemplate.send only after the commit.
To do this, UserService now injects ApplicationEventPublisher instead of SignUpEventProducer.
@Service
@RequiredArgsConstructor
public class UserService {

    private final UserRepository userRepository;
    // private final SignUpEventProducer signUpEventProducer;
    private final ApplicationEventPublisher applicationEventPublisher;

    @Transactional
    public UserCreateResponse createUser(UserCreateRequest userCreateRequest) {
        User user = User.from(userCreateRequest);
        User savedUser = userRepository.save(user);
        applicationEventPublisher.publishEvent(new SignUpEvent(savedUser.getId()));
        // throw new RuntimeException();
        return UserCreateResponse.from(savedUser);
    }
}

@Component
@Slf4j
@RequiredArgsConstructor
public class SignUpEventProducer {

    private static final String TOPIC = "sign-up";

    private final KafkaTemplate<String, SignUpEvent> kafkaTemplate;

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void send(SignUpEvent event) {
        log.info("sending event={} to topic={}", event, TOPIC);
        kafkaTemplate.send(TOPIC, event);
    }
}
ApplicationEventPublisher publishes the SignUpEvent inside the transaction, and SignUpEventProducer then handles it via @TransactionalEventListener.
With phase = TransactionPhase.AFTER_COMMIT, the listener runs only after the @Transactional method commits, so the Kafka event is sent only once the Spring transaction has completed successfully.
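Conceptually, what AFTER_COMMIT gives us can be modeled as buffering: events published during the transaction are held back, delivered on commit, and dropped on rollback. A toy plain-Java sketch of that semantics (no Spring; all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of AFTER_COMMIT semantics: events published inside a
// "transaction" are buffered and delivered only if it commits.
class AfterCommitPublisher {

    private final List<String> pending = new ArrayList<>();
    private final Consumer<String> listener;

    AfterCommitPublisher(Consumer<String> listener) {
        this.listener = listener;
    }

    void publish(String event) {
        pending.add(event);          // buffered, not delivered yet
    }

    void commit() {
        pending.forEach(listener);   // delivered only after commit
        pending.clear();
    }

    void rollback() {
        pending.clear();             // dropped: the listener never sees them
    }
}
```

This mirrors why the exception case below sends nothing: the rollback path clears the buffer before any listener runs.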
- With the exception thrown
2025-02-18T00:30:33.526+09:00 TRACE 62693 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.user.service.UserService.createUser]
2025-02-18T00:30:33.527+09:00 TRACE 62693 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
2025-02-18T00:30:33.547+09:00 TRACE 62693 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-18T00:30:33.548+09:00 TRACE 62693 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.user.service.UserService.createUser] after exception: java.lang.RuntimeException
2025-02-18T00:30:33.551+09:00 ERROR 62693 --- [event] [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException] with root cause
As the log shows, the event was never published.
- Without the exception
2025-02-18T00:31:41.161+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.user.service.UserService.createUser]
2025-02-18T00:31:41.162+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into user (email,username) values (?,?)
2025-02-18T00:31:41.180+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-18T00:31:41.181+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.user.service.UserService.createUser]
2025-02-18T00:31:41.185+09:00 INFO 62718 --- [event] [nio-8080-exec-2] c.e.event.kafka.SignUpEventProducer : sending event=SignUpEvent(userId=1) to topic=sign-up
2025-02-18T00:31:41.307+09:00 DEBUG 62718 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b98f482]
2025-02-18T00:31:41.307+09:00 DEBUG 62718 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b98f482] beginTransaction()
2025-02-18T00:31:41.308+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sending: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-18T00:31:41.309+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.k.core.DefaultKafkaProducerFactory : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@b98f482] send(ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=SignUpEvent(userId=1), timestamp=null))
2025-02-18T00:31:41.322+09:00 TRACE 62718 --- [event] [nio-8080-exec-2] o.s.kafka.core.KafkaTemplate : Sent: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null)
2025-02-18T00:31:41.332+09:00 TRACE 62718 --- [event] [vent-producer-1] o.s.kafka.core.KafkaTemplate : Sent ok: ProducerRecord(topic=sign-up, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 101, 120, 97, 109, 112, 108, 101, 46, 101, 118, 101, 110, 116, 46, 99, 111, 117, 112, 111, 110, 46, 115, 101, 114, 118, 105, 99, 101, 46, 114, 101, 115, 112, 111, 110, 115, 101, 46, 83, 105, 103, 110, 85, 112, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=SignUpEvent(userId=1), timestamp=null), metadata: sign-up-0@37
2025-02-18T00:31:41.332+09:00 INFO 62718 --- [event] [ntainer#1-0-C-1] c.e.event.kafka.SignUpEventConsumer : receive sign-up event: SignUpEvent(userId=1)
2025-02-18T00:31:41.332+09:00 TRACE 62718 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [com.example.event.coupon.service.CouponService.receive]
2025-02-18T00:31:41.333+09:00 TRACE 62718 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Getting transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
Hibernate: insert into coupon (coupon_type,user_id) values (?,?)
2025-02-18T00:31:41.334+09:00 TRACE 62718 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [org.springframework.data.jpa.repository.support.SimpleJpaRepository.save]
2025-02-18T00:31:41.335+09:00 TRACE 62718 --- [event] [ntainer#0-0-C-1] o.s.t.i.TransactionInterceptor : Completing transaction for [com.example.event.coupon.service.CouponService.receive]
The event was published only after the createUser transaction finished.
https://velog.io/@mdy0102/MQ-비교-Kafka-RabbitMQ-Redis
https://medium.com/@tlsrid1119/spring-kafka를-이용한-producer-consumer-example-cea4002eaad6