Spring Redis Pipeline 배치 최적화

Redis Pipeline이란?

Redis는 클라이언트-서버 모델로 동작하며, 각 명령마다 요청 → 응답 왕복(RTT)이 발생한다. 100개의 명령을 개별 실행하면 100번의 RTT가 필요하다. Pipeline은 여러 명령을 한 번에 묶어 보내고 응답을 한꺼번에 받아, 네트워크 왕복을 1회로 줄인다. 명령 수가 많을수록 성능 차이가 극적이다.

Spring Data Redis의 RedisTemplate은 Pipeline을 네이티브로 지원한다. 이 글에서는 기본 사용법부터 실전 패턴, 주의사항까지 심화 정리한다.

기본 Pipeline 사용법

@Service
@RequiredArgsConstructor
public class UserCacheService {

    private final RedisTemplate<String, String> redisTemplate;

    // ❌ 개별 명령: 1000번의 RTT
    public void setUsersIndividually(List<User> users) {
        for (User user : users) {
            redisTemplate.opsForValue().set(
                "user:" + user.getId(), 
                user.getName(), 
                Duration.ofHours(1)
            );
        }
    }

    // ✅ Pipeline: 1번의 RTT
    public void setUsersPipelined(List<User> users) {
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (User user : users) {
                connection.stringCommands().set(
                    ("user:" + user.getId()).getBytes(),
                    user.getName().getBytes(),
                    Expiration.seconds(3600),
                    SetOption.upsert()
                );
            }
            return null;  // 반환값은 무시됨 — 결과는 List로 반환
        });
    }
}

executePipelined()의 콜백 내부에서 실행한 모든 명령은 버퍼에 쌓였다가 한 번에 전송된다. 콜백의 반환값은 무시되며, 각 명령의 결과는 List<Object>로 반환된다.

Pipeline 결과 처리

// Pipeline 결과를 활용하는 패턴
public Map<String, String> getUsersBatch(List<Long> userIds) {
    // Pipeline으로 여러 GET 실행
    List<Object> results = redisTemplate.executePipelined(
        (RedisCallback<Object>) connection -> {
            for (Long id : userIds) {
                connection.stringCommands().get(
                    ("user:" + id).getBytes()
                );
            }
            return null;
        }
    );

    // 결과 매핑: 순서가 보장됨
    Map<String, String> userMap = new LinkedHashMap<>();
    for (int i = 0; i < userIds.size(); i++) {
        String value = (String) results.get(i);
        if (value != null) {
            userMap.put("user:" + userIds.get(i), value);
        }
    }
    return userMap;
}

핵심: Pipeline 결과는 명령 실행 순서와 동일한 순서로 반환된다. 이 순서 보장 덕분에 인덱스 기반으로 요청과 응답을 매핑할 수 있다.

SessionCallback: 고수준 API

RedisCallback은 저수준 바이트 배열을 직접 다뤄야 한다. SessionCallback을 사용하면 RedisTemplate의 고수준 API를 Pipeline 내에서 쓸 수 있다.

public void cacheProductDetails(List<Product> products) {
    redisTemplate.executePipelined(new SessionCallback<Object>() {
        @Override
        public <K, V> Object execute(RedisOperations<K, V> operations) {
            RedisOperations<String, String> ops = 
                (RedisOperations<String, String>) operations;

            for (Product product : products) {
                String key = "product:" + product.getId();

                // Hash로 상품 정보 저장
                ops.opsForHash().putAll(key, Map.of(
                    "name", product.getName(),
                    "price", String.valueOf(product.getPrice()),
                    "stock", String.valueOf(product.getStock())
                ));

                // TTL 설정
                ops.expire(key, Duration.ofMinutes(30));

                // Sorted Set에 인기 상품 점수 추가
                ops.opsForZSet().add(
                    "popular:products",
                    product.getId().toString(),
                    product.getViewCount()
                );
            }
            return null;
        }
    });
}

SessionCallback은 직렬화/역직렬화를 RedisTemplate의 Serializer 설정에 따라 자동 처리한다. 바이트 변환을 직접 할 필요가 없어 코드가 깔끔하다.

실전 패턴 1: 캐시 워밍

@Component
@RequiredArgsConstructor
public class CacheWarmer {

    private final RedisTemplate<String, Object> redisTemplate;
    private final ObjectMapper objectMapper;
    private final ProductRepository productRepository;

    // 서비스 시작 시 인기 상품 캐시 워밍
    @EventListener(ApplicationReadyEvent.class)
    public void warmProductCache() {
        List<Product> topProducts = productRepository
            .findTop1000ByOrderByViewCountDesc();

        // 1000개 상품을 Pipeline으로 한 번에 캐싱
        List<List<Product>> batches = partition(topProducts, 200);

        for (List<Product> batch : batches) {
            redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
                for (Product p : batch) {
                    try {
                        byte[] key = ("product:" + p.getId()).getBytes();
                        byte[] value = objectMapper.writeValueAsBytes(p);
                        conn.stringCommands().set(key, value, 
                            Expiration.seconds(1800), SetOption.upsert());
                    } catch (Exception e) {
                        log.warn("캐시 워밍 실패: product:{}", p.getId(), e);
                    }
                }
                return null;
            });
        }
        log.info("캐시 워밍 완료: {} products", topProducts.size());
    }

    private <T> List<List<T>> partition(List<T> list, int size) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }
}

Pipeline 하나에 너무 많은 명령을 담으면 Redis 서버 메모리 사용이 급증한다. 200~500개 단위로 배치 분할하는 것이 안전하다.

실전 패턴 2: 조회수 일괄 업데이트

@Service
@RequiredArgsConstructor
public class ViewCountService {

    private final RedisTemplate<String, String> redisTemplate;
    private final ArticleRepository articleRepository;

    // 개별 조회수는 Redis INCR로 즉시 반영
    public long incrementView(Long articleId) {
        return redisTemplate.opsForValue()
            .increment("views:" + articleId);
    }

    // 5분마다 Redis → DB 일괄 동기화 (Pipeline으로 조회)
    @Scheduled(fixedRate = 300_000)
    public void syncViewCountsToDb() {
        Set<String> keys = redisTemplate.keys("views:*");
        if (keys == null || keys.isEmpty()) return;

        List<String> keyList = new ArrayList<>(keys);

        // Pipeline으로 모든 조회수 한 번에 가져오기
        List<Object> counts = redisTemplate.executePipelined(
            (RedisCallback<Object>) connection -> {
                for (String key : keyList) {
                    connection.stringCommands().get(key.getBytes());
                }
                return null;
            }
        );

        // DB 일괄 업데이트
        List<ArticleViewUpdate> updates = new ArrayList<>();
        for (int i = 0; i < keyList.size(); i++) {
            String key = keyList.get(i);
            Long count = Long.parseLong((String) counts.get(i));
            Long articleId = Long.parseLong(key.replace("views:", ""));
            updates.add(new ArticleViewUpdate(articleId, count));
        }
        articleRepository.batchUpdateViewCounts(updates);

        // 동기화 후 Redis 카운터 초기화 (Pipeline)
        redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
            for (String key : keyList) {
                connection.keyCommands().del(key.getBytes());
            }
            return null;
        });

        log.info("조회수 동기화: {} articles", updates.size());
    }
}

이 패턴은 Write-Behind 캐시 전략의 일종이다. Redis에서 빠르게 카운팅하고, 주기적으로 DB에 반영한다. 캐시 전략에 대한 자세한 비교는 Redis 캐시 전략: Cache-Aside 글을 참고하자.

실전 패턴 3: 랭킹 보드 갱신

@Service
@RequiredArgsConstructor
public class LeaderboardService {

    private final RedisTemplate<String, String> redisTemplate;

    // 여러 유저 점수를 한 번에 갱신
    public void updateScores(Map<String, Double> userScores) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public Object execute(RedisOperations operations) {
                for (var entry : userScores.entrySet()) {
                    operations.opsForZSet().add(
                        "leaderboard:daily", 
                        entry.getKey(), 
                        entry.getValue()
                    );
                }
                // 상위 100명만 유지
                operations.opsForZSet().removeRange(
                    "leaderboard:daily", 0, -101
                );
                return null;
            }
        });
    }

    // 다중 랭킹 보드 동시 조회
    public Map<String, Set<ZSetOperations.TypedTuple<String>>> getMultipleBoards() {
        List<Object> results = redisTemplate.executePipelined(
            new SessionCallback<Object>() {
                @Override
                public Object execute(RedisOperations operations) {
                    operations.opsForZSet().reverseRangeWithScores(
                        "leaderboard:daily", 0, 9);
                    operations.opsForZSet().reverseRangeWithScores(
                        "leaderboard:weekly", 0, 9);
                    operations.opsForZSet().reverseRangeWithScores(
                        "leaderboard:monthly", 0, 9);
                    return null;
                }
            }
        );

        return Map.of(
            "daily", (Set<ZSetOperations.TypedTuple<String>>) results.get(0),
            "weekly", (Set<ZSetOperations.TypedTuple<String>>) results.get(1),
            "monthly", (Set<ZSetOperations.TypedTuple<String>>) results.get(2)
        );
    }
}

Pipeline vs Transaction vs Lua Script

기능 Pipeline MULTI/EXEC Lua Script
원자성 ❌ 없음 ✅ 보장 ✅ 보장
네트워크 RTT 1회 1회 1회
중간 결과 참조 ❌ 불가 ❌ 불가 ✅ 가능
롤백 ❌ (DISCARD만)
사용 시점 대량 읽기/쓰기 원자적 다중 명령 조건부 로직 필요 시
// Pipeline + Transaction 조합
List<Object> results = redisTemplate.executePipelined(
    new SessionCallback<Object>() {
        @Override
        public Object execute(RedisOperations operations) {
            operations.multi();  // 트랜잭션 시작
            
            operations.opsForValue().set("key1", "val1");
            operations.opsForValue().set("key2", "val2");
            operations.opsForValue().increment("counter");
            
            operations.exec();   // 트랜잭션 커밋
            return null;
        }
    }
);

Pipeline은 원자성을 보장하지 않는다. 중간에 다른 클라이언트의 명령이 끼어들 수 있다. 원자성이 필요하면 MULTI/EXEC 트랜잭션을 Pipeline 안에 넣거나, Lua Script를 사용한다.

성능 벤치마크

작업 개별 실행 Pipeline 향상
SET 100개 ~50ms ~3ms 16x
SET 1,000개 ~500ms ~8ms 62x
GET 1,000개 ~480ms ~6ms 80x
ZADD 1,000개 ~520ms ~10ms 52x

네트워크 RTT가 클수록(클라우드 환경, 리전 간 통신) Pipeline의 성능 향상 폭이 커진다. 같은 호스트의 Unix Socket이면 차이가 줄어들지만, 여전히 시스템 콜 횟수 절감 효과가 있다.

주의사항과 운영 팁

// 1. 배치 크기 제한: 한 Pipeline에 10,000개 이상 넣지 않기
// → Redis 서버 출력 버퍼가 비대해져 OOM 위험
private static final int PIPELINE_BATCH_SIZE = 500;

public void bulkSet(Map<String, String> data) {
    List<Map.Entry<String, String>> entries = new ArrayList<>(data.entrySet());
    
    for (int i = 0; i < entries.size(); i += PIPELINE_BATCH_SIZE) {
        List<Map.Entry<String, String>> batch = entries.subList(
            i, Math.min(i + PIPELINE_BATCH_SIZE, entries.size())
        );
        
        redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
            for (var entry : batch) {
                conn.stringCommands().set(
                    entry.getKey().getBytes(),
                    entry.getValue().getBytes()
                );
            }
            return null;
        });
    }
}

// 2. Pipeline 내부에서 결과 참조 불가
// ❌ 잘못된 사용: Pipeline 안에서 이전 결과를 읽으려 함
redisTemplate.executePipelined((RedisCallback<Object>) conn -> {
    conn.stringCommands().set("key".getBytes(), "val".getBytes());
    byte[] result = conn.stringCommands().get("key".getBytes()); // null!
    return null;
});
// Pipeline은 응답을 버퍼링하므로 콜백 내에서 결과를 읽을 수 없다

// 3. Redis Cluster에서의 Pipeline
// Redis Cluster는 키별로 슬롯이 다르면 Pipeline이 분할됨
// Spring Data Redis가 자동으로 슬롯별 그룹핑 처리
// → 같은 슬롯의 키는 Hash Tag 사용: {user}:1001, {user}:1002

Redis Cluster 환경에서의 슬롯 관리는 Redis Cluster 샤딩·페일오버 운영 글에서 자세히 다루고 있다.

마무리

Redis Pipeline은 가장 간단하면서 효과가 큰 최적화 기법이다. 네트워크 RTT를 N회에서 1회로 줄여, 대량 연산 시 수십 배의 성능 향상을 얻는다. Spring Data Redis의 executePipelined()로 쉽게 적용할 수 있으며, 캐시 워밍·조회수 동기화·랭킹 보드 갱신 같은 배치 작업에 필수다. 단, 원자성이 필요하면 Transaction이나 Lua Script와 조합하고, 배치 크기는 500개 이내로 제한하는 것이 안전한 운영의 핵심이다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux