Redis Pipeline이란?
Redis는 클라이언트-서버 모델로 동작하며, 각 명령마다 요청 → 응답 왕복(RTT)이 발생한다. 100개의 명령을 개별 실행하면 100번의 RTT가 필요하다. Pipeline은 여러 명령을 한 번에 묶어 보내고 응답을 한꺼번에 받아, 네트워크 왕복을 1회로 줄인다. 명령 수가 많을수록 성능 차이가 극적이다.
Spring Data Redis의 RedisTemplate은 Pipeline을 네이티브로 지원한다. 이 글에서는 기본 사용법부터 실전 패턴, 주의사항까지 심화 정리한다.
기본 Pipeline 사용법
@Service
@RequiredArgsConstructor
public class UserCacheService {
private final RedisTemplate<String, String> redisTemplate;
// ❌ 개별 명령: 1000번의 RTT
public void setUsersIndividually(List<User> users) {
for (User user : users) {
redisTemplate.opsForValue().set(
"user:" + user.getId(),
user.getName(),
Duration.ofHours(1)
);
}
}
// ✅ Pipeline: 1번의 RTT
public void setUsersPipelined(List<User> users) {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (User user : users) {
connection.stringCommands().set(
("user:" + user.getId()).getBytes(),
user.getName().getBytes(),
Expiration.seconds(3600),
SetOption.upsert()
);
}
return null; // 반환값은 무시됨 — 결과는 List로 반환
});
}
}
executePipelined()의 콜백 내부에서 실행한 모든 명령은 버퍼에 쌓였다가 한 번에 전송된다. 콜백의 반환값은 무시되며, 각 명령의 결과는 List<Object>로 반환된다.
Pipeline 결과 처리
// Pipeline 결과를 활용하는 패턴
public Map<String, String> getUsersBatch(List<Long> userIds) {
// Pipeline으로 여러 GET 실행
List<Object> results = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
for (Long id : userIds) {
connection.stringCommands().get(
("user:" + id).getBytes()
);
}
return null;
}
);
// 결과 매핑: 순서가 보장됨
Map<String, String> userMap = new LinkedHashMap<>();
for (int i = 0; i < userIds.size(); i++) {
String value = (String) results.get(i);
if (value != null) {
userMap.put("user:" + userIds.get(i), value);
}
}
return userMap;
}
핵심: Pipeline 결과는 명령 실행 순서와 동일한 순서로 반환된다. 이 순서 보장 덕분에 인덱스 기반으로 요청과 응답을 매핑할 수 있다.
SessionCallback: 고수준 API
RedisCallback은 저수준 바이트 배열을 직접 다뤄야 한다. SessionCallback을 사용하면 RedisTemplate의 고수준 API를 Pipeline 내에서 쓸 수 있다.
public void cacheProductDetails(List<Product> products) {
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) {
RedisOperations<String, String> ops =
(RedisOperations<String, String>) operations;
for (Product product : products) {
String key = "product:" + product.getId();
// Hash로 상품 정보 저장
ops.opsForHash().putAll(key, Map.of(
"name", product.getName(),
"price", String.valueOf(product.getPrice()),
"stock", String.valueOf(product.getStock())
));
// TTL 설정
ops.expire(key, Duration.ofMinutes(30));
// Sorted Set에 인기 상품 점수 추가
ops.opsForZSet().add(
"popular:products",
product.getId().toString(),
product.getViewCount()
);
}
return null;
}
});
}
SessionCallback은 직렬화/역직렬화를 RedisTemplate의 Serializer 설정에 따라 자동 처리한다. 바이트 변환을 직접 할 필요가 없어 코드가 깔끔하다.
실전 패턴 1: 캐시 워밍
@Component
@RequiredArgsConstructor
public class CacheWarmer {
private final RedisTemplate<String, Object> redisTemplate;
private final ObjectMapper objectMapper;
private final ProductRepository productRepository;
// 서비스 시작 시 인기 상품 캐시 워밍
@EventListener(ApplicationReadyEvent.class)
public void warmProductCache() {
List<Product> topProducts = productRepository
.findTop1000ByOrderByViewCountDesc();
// 1000개 상품을 Pipeline으로 한 번에 캐싱
List<List<Product>> batches = partition(topProducts, 200);
for (List<Product> batch : batches) {
redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
for (Product p : batch) {
try {
byte[] key = ("product:" + p.getId()).getBytes();
byte[] value = objectMapper.writeValueAsBytes(p);
conn.stringCommands().set(key, value,
Expiration.seconds(1800), SetOption.upsert());
} catch (Exception e) {
log.warn("캐시 워밍 실패: product:{}", p.getId(), e);
}
}
return null;
});
}
log.info("캐시 워밍 완료: {} products", topProducts.size());
}
private <T> List<List<T>> partition(List<T> list, int size) {
List<List<T>> partitions = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
partitions.add(list.subList(i, Math.min(i + size, list.size())));
}
return partitions;
}
}
Pipeline 하나에 너무 많은 명령을 담으면 Redis 서버 메모리 사용이 급증한다. 200~500개 단위로 배치 분할하는 것이 안전하다.
실전 패턴 2: 조회수 일괄 업데이트
@Service
@RequiredArgsConstructor
public class ViewCountService {
private final RedisTemplate<String, String> redisTemplate;
private final ArticleRepository articleRepository;
// 개별 조회수는 Redis INCR로 즉시 반영
public long incrementView(Long articleId) {
return redisTemplate.opsForValue()
.increment("views:" + articleId);
}
// 5분마다 Redis → DB 일괄 동기화 (Pipeline으로 조회)
@Scheduled(fixedRate = 300_000)
public void syncViewCountsToDb() {
Set<String> keys = redisTemplate.keys("views:*");
if (keys == null || keys.isEmpty()) return;
List<String> keyList = new ArrayList<>(keys);
// Pipeline으로 모든 조회수 한 번에 가져오기
List<Object> counts = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
for (String key : keyList) {
connection.stringCommands().get(key.getBytes());
}
return null;
}
);
// DB 일괄 업데이트
List<ArticleViewUpdate> updates = new ArrayList<>();
for (int i = 0; i < keyList.size(); i++) {
String key = keyList.get(i);
Long count = Long.parseLong((String) counts.get(i));
Long articleId = Long.parseLong(key.replace("views:", ""));
updates.add(new ArticleViewUpdate(articleId, count));
}
articleRepository.batchUpdateViewCounts(updates);
// 동기화 후 Redis 카운터 초기화 (Pipeline)
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (String key : keyList) {
connection.keyCommands().del(key.getBytes());
}
return null;
});
log.info("조회수 동기화: {} articles", updates.size());
}
}
이 패턴은 Write-Behind 캐시 전략의 일종이다. Redis에서 빠르게 카운팅하고, 주기적으로 DB에 반영한다. 캐시 전략에 대한 자세한 비교는 Redis 캐시 전략: Cache-Aside 글을 참고하자.
실전 패턴 3: 랭킹 보드 갱신
@Service
@RequiredArgsConstructor
public class LeaderboardService {
private final RedisTemplate<String, String> redisTemplate;
// 여러 유저 점수를 한 번에 갱신
public void updateScores(Map<String, Double> userScores) {
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) {
for (var entry : userScores.entrySet()) {
operations.opsForZSet().add(
"leaderboard:daily",
entry.getKey(),
entry.getValue()
);
}
// 상위 100명만 유지
operations.opsForZSet().removeRange(
"leaderboard:daily", 0, -101
);
return null;
}
});
}
// 다중 랭킹 보드 동시 조회
public Map<String, Set<ZSetOperations.TypedTuple<String>>> getMultipleBoards() {
List<Object> results = redisTemplate.executePipelined(
new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) {
operations.opsForZSet().reverseRangeWithScores(
"leaderboard:daily", 0, 9);
operations.opsForZSet().reverseRangeWithScores(
"leaderboard:weekly", 0, 9);
operations.opsForZSet().reverseRangeWithScores(
"leaderboard:monthly", 0, 9);
return null;
}
}
);
return Map.of(
"daily", (Set<ZSetOperations.TypedTuple<String>>) results.get(0),
"weekly", (Set<ZSetOperations.TypedTuple<String>>) results.get(1),
"monthly", (Set<ZSetOperations.TypedTuple<String>>) results.get(2)
);
}
}
Pipeline vs Transaction vs Lua Script
| 기능 | Pipeline | MULTI/EXEC | Lua Script |
|---|---|---|---|
| 원자성 | ❌ 없음 | ✅ 보장 | ✅ 보장 |
| 네트워크 RTT | 1회 | 1회 | 1회 |
| 중간 결과 참조 | ❌ 불가 | ❌ 불가 | ✅ 가능 |
| 롤백 | ❌ | ❌ (DISCARD만) | ❌ |
| 사용 시점 | 대량 읽기/쓰기 | 원자적 다중 명령 | 조건부 로직 필요 시 |
// Pipeline + Transaction 조합
List<Object> results = redisTemplate.executePipelined(
new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) {
operations.multi(); // 트랜잭션 시작
operations.opsForValue().set("key1", "val1");
operations.opsForValue().set("key2", "val2");
operations.opsForValue().increment("counter");
operations.exec(); // 트랜잭션 커밋
return null;
}
}
);
Pipeline은 원자성을 보장하지 않는다. 중간에 다른 클라이언트의 명령이 끼어들 수 있다. 원자성이 필요하면 MULTI/EXEC 트랜잭션을 Pipeline 안에 넣거나, Lua Script를 사용한다.
성능 벤치마크
| 작업 | 개별 실행 | Pipeline | 향상 |
|---|---|---|---|
| SET 100개 | ~50ms | ~3ms | 16x |
| SET 1,000개 | ~500ms | ~8ms | 62x |
| GET 1,000개 | ~480ms | ~6ms | 80x |
| ZADD 1,000개 | ~520ms | ~10ms | 52x |
네트워크 RTT가 클수록(클라우드 환경, 리전 간 통신) Pipeline의 성능 향상 폭이 커진다. 같은 호스트의 Unix Socket이면 차이가 줄어들지만, 여전히 시스템 콜 횟수 절감 효과가 있다.
주의사항과 운영 팁
// 1. 배치 크기 제한: 한 Pipeline에 10,000개 이상 넣지 않기
// → Redis 서버 출력 버퍼가 비대해져 OOM 위험
private static final int PIPELINE_BATCH_SIZE = 500;
public void bulkSet(Map<String, String> data) {
List<Map.Entry<String, String>> entries = new ArrayList<>(data.entrySet());
for (int i = 0; i < entries.size(); i += PIPELINE_BATCH_SIZE) {
List<Map.Entry<String, String>> batch = entries.subList(
i, Math.min(i + PIPELINE_BATCH_SIZE, entries.size())
);
redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
for (var entry : batch) {
conn.stringCommands().set(
entry.getKey().getBytes(),
entry.getValue().getBytes()
);
}
return null;
});
}
}
// 2. Pipeline 내부에서 결과 참조 불가
// ❌ 잘못된 사용: Pipeline 안에서 이전 결과를 읽으려 함
redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
conn.stringCommands().set("key".getBytes(), "val".getBytes());
byte[] result = conn.stringCommands().get("key".getBytes()); // null!
return null;
});
// Pipeline은 응답을 버퍼링하므로 콜백 내에서 결과를 읽을 수 없다
// 3. Redis Cluster에서의 Pipeline
// Redis Cluster는 키별로 슬롯이 다르면 Pipeline이 분할됨
// Spring Data Redis가 자동으로 슬롯별 그룹핑 처리
// → 같은 슬롯의 키는 Hash Tag 사용: {user}:1001, {user}:1002
Redis Cluster 환경에서의 슬롯 관리는 Redis Cluster 샤딩·페일오버 운영 글에서 자세히 다루고 있다.
마무리
Redis Pipeline은 가장 간단하면서 효과가 큰 최적화 기법이다. 네트워크 RTT를 N회에서 1회로 줄여, 대량 연산 시 수십 배의 성능 향상을 얻는다. Spring Data Redis의 executePipelined()로 쉽게 적용할 수 있으며, 캐시 워밍·조회수 동기화·랭킹 보드 갱신 같은 배치 작업에 필수다. 단, 원자성이 필요하면 Transaction이나 Lua Script와 조합하고, 배치 크기는 500개 이내로 제한하는 것이 안전한 운영의 핵심이다.