Spring R2DBC란?
R2DBC(Reactive Relational Database Connectivity)는 관계형 데이터베이스를 논블로킹 리액티브로 접근하는 드라이버 사양입니다. JDBC가 블로킹 I/O 기반인 반면, R2DBC는 Reactive Streams 기반으로 동작하여 Spring WebFlux와 완전한 비동기 파이프라인을 구성합니다.
WebFlux로 HTTP 레이어를 논블로킹으로 만들었지만, DB 접근에 JDBC를 사용하면 결국 블로킹 스레드가 필요합니다. R2DBC는 이 마지막 블로킹 지점을 제거합니다. 이 글에서는 Spring Data R2DBC의 설정, Repository 패턴, 트랜잭션, 복잡한 쿼리, 그리고 JDBC와의 성능 비교까지 심화 분석합니다.
의존성과 설정
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
// PostgreSQL R2DBC 드라이버
runtimeOnly("org.postgresql:r2dbc-postgresql")
// 커넥션 풀
implementation("io.r2dbc:r2dbc-pool")
}
// application.yml
spring:
r2dbc:
url: r2dbc:pool:postgresql://localhost:5432/mydb
username: ${DB_USER}
password: ${DB_PASS}
pool:
initial-size: 5
max-size: 20
max-idle-time: 30m
validation-query: SELECT 1
URL 형식: r2dbc:pool:postgresql://host:port/database — pool: 접두사가 커넥션 풀링을 활성화합니다. 프로덕션에서는 반드시 풀을 사용하세요.
엔티티와 Repository 기본
Spring Data R2DBC는 JPA와 달리 ORM이 아닙니다. 지연 로딩, 연관관계 매핑, DDL 자동 생성이 없습니다. 대신 가볍고 빠릅니다:
// 엔티티 정의
@Table("users")
public record User(
@Id Long id,
String email,
String name,
@Column("is_active") boolean active,
@Column("login_count") int loginCount,
@CreatedDate Instant createdAt,
@LastModifiedDate Instant updatedAt
) {
// 새 엔티티 생성용 (ID 없음)
public static User create(String email, String name) {
return new User(null, email, name, true, 0, null, null);
}
}
// Repository 인터페이스
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
Mono<User> findByEmail(String email);
Flux<User> findByActiveTrue();
Flux<User> findByNameContainingIgnoreCase(String name);
@Query("SELECT * FROM users WHERE login_count > :minLogins ORDER BY login_count DESC LIMIT :limit")
Flux<User> findActiveUsers(@Param("minLogins") int minLogins, @Param("limit") int limit);
@Modifying
@Query("UPDATE users SET login_count = login_count + 1 WHERE id = :id")
Mono<Integer> incrementLoginCount(@Param("id") Long id);
Mono<Long> countByActiveTrue();
}
서비스 레이어: 리액티브 체이닝
@Service
@RequiredArgsConstructor
public class UserService {
private final UserRepository userRepository;
public Mono<User> createUser(CreateUserDto dto) {
return userRepository.findByEmail(dto.email())
.flatMap(existing -> Mono.<User>error(
new DuplicateEmailException(dto.email())))
.switchIfEmpty(Mono.defer(() ->
userRepository.save(User.create(dto.email(), dto.name()))
));
}
public Mono<User> getUser(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new UserNotFoundException(id)));
}
public Flux<UserDto> searchUsers(String keyword, int limit) {
return userRepository.findByNameContainingIgnoreCase(keyword)
.take(limit)
.map(UserDto::from);
}
public Mono<User> login(Long userId) {
return userRepository.incrementLoginCount(userId)
.then(userRepository.findById(userId))
.switchIfEmpty(Mono.error(new UserNotFoundException(userId)));
}
}
컨트롤러: WebFlux 통합
@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
private final UserService userService;
@GetMapping("/{id}")
public Mono<UserDto> getUser(@PathVariable Long id) {
return userService.getUser(id).map(UserDto::from);
}
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<UserDto> createUser(@RequestBody @Valid CreateUserDto dto) {
return userService.createUser(dto).map(UserDto::from);
}
// SSE 스트리밍: 실시간 사용자 목록
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<UserDto> streamUsers() {
return userService.searchUsers("", 100)
.delayElements(Duration.ofMillis(100));
}
// 페이지네이션
@GetMapping
public Mono<Page<UserDto>> listUsers(
@RequestParam(defaultValue = "0") int page,
@RequestParam(defaultValue = "20") int size) {
Pageable pageable = PageRequest.of(page, size);
return userService.findAll(pageable)
.map(UserDto::from)
.collectList()
.zipWith(userService.count())
.map(tuple -> new PageImpl<>(tuple.getT1(), pageable, tuple.getT2()));
}
}
R2DBC 트랜잭션
리액티브 트랜잭션은 @Transactional과 TransactionalOperator 두 가지 방식을 지원합니다:
// 방식 1: @Transactional (선언적)
@Service
public class TransferService {
@Transactional
public Mono<TransferResult> transfer(Long fromId, Long toId, BigDecimal amount) {
return accountRepository.findById(fromId)
.flatMap(from -> {
if (from.balance().compareTo(amount) < 0) {
return Mono.error(new InsufficientBalanceException());
}
return accountRepository.debit(fromId, amount);
})
.then(accountRepository.credit(toId, amount))
.then(auditRepository.save(AuditLog.transfer(fromId, toId, amount)))
.map(audit -> new TransferResult(true, audit.id()));
}
}
// 방식 2: TransactionalOperator (프로그래밍 방식)
@Service
public class BatchService {
private final TransactionalOperator txOperator;
public Flux<Item> processBatch(List<ItemDto> items) {
return Flux.fromIterable(items)
.flatMap(dto ->
itemRepository.save(Item.from(dto))
.then(inventoryRepository.decrement(dto.productId(), dto.quantity()))
.thenReturn(dto)
.as(txOperator::transactional) // 아이템별 독립 트랜잭션
, 5); // 동시성 5
}
}
// R2DBC 트랜잭션 설정
@Configuration
@EnableTransactionManagement
public class R2dbcConfig extends AbstractR2dbcConfiguration {
@Bean
public ReactiveTransactionManager transactionManager(ConnectionFactory cf) {
return new R2dbcTransactionManager(cf);
}
@Bean
public TransactionalOperator transactionalOperator(ReactiveTransactionManager tm) {
return TransactionalOperator.create(tm);
}
}
DatabaseClient: 복잡한 쿼리
Repository로 표현하기 어려운 동적 쿼리는 DatabaseClient를 사용합니다:
@Repository
@RequiredArgsConstructor
public class UserCustomRepository {
private final DatabaseClient databaseClient;
public Flux<UserWithStats> findUsersWithOrderStats(UserFilter filter) {
var sql = new StringBuilder("""
SELECT u.id, u.name, u.email,
COUNT(o.id) as order_count,
COALESCE(SUM(o.total_amount), 0) as total_spent
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.is_active = true
""");
var spec = databaseClient.sql("");
Map<String, Object> bindings = new HashMap<>();
if (filter.role() != null) {
sql.append(" AND u.role = :role");
bindings.put("role", filter.role());
}
if (filter.minOrders() != null) {
sql.append(" HAVING COUNT(o.id) >= :minOrders");
bindings.put("minOrders", filter.minOrders());
}
sql.append(" GROUP BY u.id ORDER BY total_spent DESC LIMIT :limit");
bindings.put("limit", filter.limit());
var query = databaseClient.sql(sql.toString());
for (var entry : bindings.entrySet()) {
query = query.bind(entry.getKey(), entry.getValue());
}
return query.map((row, metadata) -> new UserWithStats(
row.get("id", Long.class),
row.get("name", String.class),
row.get("email", String.class),
row.get("order_count", Long.class),
row.get("total_spent", BigDecimal.class)
))
.all();
}
// 배치 INSERT
public Mono<Long> batchInsert(List<User> users) {
return databaseClient.sql("""
INSERT INTO users (email, name, is_active)
VALUES (:email, :name, :active)
""")
.filter(s -> s.returnGeneratedValues("id"))
.bind("email", users.stream().map(User::email).toList())
.bind("name", users.stream().map(User::name).toList())
.bind("active", users.stream().map(User::active).toList())
.fetch()
.rowsUpdated();
}
}
연관관계 처리 패턴
R2DBC는 JPA의 @OneToMany, @ManyToOne을 지원하지 않습니다. 수동으로 조합합니다:
@Service
public class OrderService {
// 주문 + 주문항목 조합 조회
public Mono<OrderDetailDto> getOrderDetail(Long orderId) {
return orderRepository.findById(orderId)
.zipWith(orderItemRepository.findByOrderId(orderId).collectList())
.map(tuple -> OrderDetailDto.of(tuple.getT1(), tuple.getT2()));
}
// 사용자 + 최근 주문 3건 조합
public Mono<UserProfileDto> getUserProfile(Long userId) {
var userMono = userRepository.findById(userId);
var ordersMono = orderRepository.findTop3ByUserIdOrderByCreatedAtDesc(userId)
.collectList();
var statsMono = orderRepository.getOrderStats(userId);
return Mono.zip(userMono, ordersMono, statsMono)
.map(tuple -> UserProfileDto.of(
tuple.getT1(), tuple.getT2(), tuple.getT3()
));
}
}
마이그레이션: Flyway + R2DBC
// build.gradle.kts — Flyway는 JDBC 드라이버 필요
dependencies {
implementation("org.flywaydb:flyway-core")
runtimeOnly("org.postgresql:postgresql") // Flyway용 JDBC
runtimeOnly("org.postgresql:r2dbc-postgresql") // R2DBC
}
// application.yml
spring:
flyway:
url: jdbc:postgresql://localhost:5432/mydb # JDBC URL (Flyway용)
user: ${DB_USER}
password: ${DB_PASS}
locations: classpath:db/migration
r2dbc:
url: r2dbc:pool:postgresql://localhost:5432/mydb # R2DBC URL (앱용)
Flyway는 시작 시 JDBC로 마이그레이션을 실행하고, 이후 앱은 R2DBC로 동작합니다.
관련 글: Spring Boot Resilience4j에서 리액티브 서킷브레이커를, Spring Transaction 전파 전략에서 블로킹 트랜잭션 패턴과 비교해보세요.
마무리
Spring R2DBC는 WebFlux 스택의 완전한 논블로킹 파이프라인을 완성하는 핵심 기술입니다. JPA의 편의 기능(지연 로딩, 연관관계)은 없지만, 가벼운 매핑과 높은 동시성 처리가 장점입니다. 높은 동시 접속이 필요한 실시간 서비스, 스트리밍 API에서 JDBC 대비 확실한 이점을 제공합니다. 다만 팀의 리액티브 프로그래밍 숙련도를 고려하여 도입을 결정하세요.