언어/JAVA

[Java] Java Stream API 완벽 가이드 - Part 3: 고급 활용과 성능 최적화

shaprimanDev 2025. 1. 18. 14:17
반응형

1. 복잡한 데이터 처리 패턴

1.1 다중 조건 필터링

실무에서는 단순한 필터링이 아닌, 여러 조건을 조합해야 하는 경우가 많습니다. 이러한 경우 Stream API를 효과적으로 활용할 수 있습니다.

public class AdvancedFilterExample {
    public class SalesFilter {
        private LocalDateTime startDate;
        private LocalDateTime endDate;
        private Set<String> categories;
        private BigDecimal minAmount;
        private Set<String> excludedCustomers;
        // 생성자, getter, setter 생략
    }

    public List<Sale> filterSales(List<Sale> sales, SalesFilter filter) {
        return sales.stream()
            // 날짜 범위 필터
            .filter(sale -> isWithinDateRange(sale, filter))
            // 카테고리 필터
            .filter(sale -> filter.getCategories().isEmpty() || 
                filter.getCategories().contains(sale.getCategory()))
            // 최소 금액 필터
            .filter(sale -> sale.getAmount()
                .compareTo(filter.getMinAmount()) >= 0)
            // 제외 고객 필터
            .filter(sale -> !filter.getExcludedCustomers()
                .contains(sale.getCustomerId()))
            .collect(Collectors.toList());
    }

    private boolean isWithinDateRange(Sale sale, SalesFilter filter) {
        return !sale.getDateTime().isBefore(filter.getStartDate()) &&
               !sale.getDateTime().isAfter(filter.getEndDate());
    }
}

위 예제의 주요 포인트:

  • 각 필터 조건을 별도의 filter() 메소드로 체이닝
  • 복잡한 조건은 별도 메소드로 분리하여 가독성 향상
  • 필터 조건을 객체로 캡슐화하여 재사용성 증가

1.2 중첩된 그룹화 처리

복잡한 비즈니스 요구사항을 처리하기 위한 다중 레벨 그룹화 예제입니다.

public class AdvancedGroupingExample {
    public class SalesAnalysis {
        // 지역별, 카테고리별, 월별 매출 분석
        public Map<String, Map<String, Map<YearMonth, SalesStats>>> 
                analyzeRegionalCategorySales(List<Sale> sales) {
            return sales.stream()
                .collect(Collectors.groupingBy(
                    Sale::getRegion,  // 1차: 지역별
                    Collectors.groupingBy(
                        Sale::getCategory,  // 2차: 카테고리별
                        Collectors.groupingBy(
                            sale -> YearMonth.from(sale.getDateTime()),  // 3차: 월별
                            Collectors.collectingAndThen(
                                Collectors.toList(),
                                this::calculateSalesStats
                            )
                        )
                    )
                ));
        }

        private SalesStats calculateSalesStats(List<Sale> sales) {
            // 기본 통계 계산
            DoubleSummaryStatistics stats = sales.stream()
                .mapToDouble(sale -> sale.getAmount().doubleValue())
                .summaryStatistics();

            // 시간대별 분석
            Map<Integer, Long> hourlyDistribution = sales.stream()
                .collect(Collectors.groupingBy(
                    sale -> sale.getDateTime().getHour(),
                    Collectors.counting()
                ));

            // 결제 수단별 분석
            Map<PaymentMethod, BigDecimal> paymentMethodTotal = sales.stream()
                .collect(Collectors.groupingBy(
                    Sale::getPaymentMethod,
                    Collectors.reducing(
                        BigDecimal.ZERO,
                        Sale::getAmount,
                        BigDecimal::add
                    )
                ));

            return new SalesStats(
                BigDecimal.valueOf(stats.getSum()),
                BigDecimal.valueOf(stats.getAverage()),
                sales.size(),
                hourlyDistribution,
                paymentMethodTotal
            );
        }
    }
}

1.3 커스텀 Collector 구현

특별한 집계 요구사항을 처리하기 위한 커스텀 Collector 예제입니다.

public class MovingAverageCollector<T> implements Collector<T, 
        List<T>, List<Double>> {
    private final int windowSize;
    private final Function<T, Number> valueExtractor;

    public MovingAverageCollector(int windowSize, 
            Function<T, Number> valueExtractor) {
        this.windowSize = windowSize;
        this.valueExtractor = valueExtractor;
    }

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (list1, list2) -> {
            list1.addAll(list2);
            return list1;
        };
    }

    @Override
    public Function<List<T>, List<Double>> finisher() {
        return list -> {
            List<Double> result = new ArrayList<>();
            if (list.size() < windowSize) {
                return result;
            }

            for (int i = 0; i <= list.size() - windowSize; i++) {
                double average = list.subList(i, i + windowSize).stream()
                    .mapToDouble(item -> valueExtractor.apply(item).doubleValue())
                    .average()
                    .orElse(0.0);
                result.add(average);
            }
            return result;
        };
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

// 사용 예제
public class TimeSeriesAnalyzer {
    public List<Double> calculateMovingAverage(List<Sale> sales, 
            int windowSize) {
        return sales.stream()
            .sorted(Comparator.comparing(Sale::getDateTime))
            .collect(new MovingAverageCollector<>(
                windowSize,
                sale -> sale.getAmount()
            ));
    }
}

2. 성능 최적화 기법

2.1 병렬 스트림 활용

병렬 스트림을 효과적으로 사용하는 방법과 주의사항을 살펴봅니다.

public class ParallelStreamExample {
    public class PerformanceAnalyzer {
        public Map<String, Object> analyzeLargeDataSet(
                List<Transaction> transactions) {
            int processorCount = Runtime.getRuntime().availableProcessors();

            // 데이터를 청크로 나누어 처리
            int chunkSize = transactions.size() / processorCount;

            return transactions.parallelStream()
                .collect(Collectors.groupingBy(
                    Transaction::getType,
                    Collectors.collectingAndThen(
                        Collectors.toList(),
                        chunk -> {
                            Map<String, Object> results = new HashMap<>();
                            // CPU 집약적 연산 수행
                            results.put("total", calculateTotal(chunk));
                            results.put("risk", assessRisk(chunk));
                            results.put("patterns", findPatterns(chunk));
                            return results;
                        }
                    )
                ));
        }

        // CPU 집약적 연산 예시
        private BigDecimal calculateTotal(List<Transaction> chunk) {
            return chunk.parallelStream()
                .map(Transaction::getAmount)
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        }
    }
}

2.2 성능 최적화 전략

2.2.1 메모리 효율성 개선

public class MemoryOptimization {
    // 대용량 데이터 처리를 위한 청크 처리
    public void processLargeFile(String filename) {
        try (Stream<String> lines = Files.lines(Paths.get(filename))) {
            lines
                .filter(line -> !line.isEmpty())
                .map(this::parseLine)
                .collect(Collectors.groupingBy(
                    Record::getType,
                    Collectors.mapping(
                        Record::getValue,
                        Collectors.reducing(BigDecimal.ZERO, BigDecimal::add)
                    )
                ));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // 스트림 조기 종료를 통한 최적화
    public Optional<Transaction> findFirstLargeTransaction(
            List<Transaction> transactions, BigDecimal threshold) {
        return transactions.stream()
            .filter(t -> t.getAmount().compareTo(threshold) > 0)
            .findFirst();  // 조건을 만족하는 첫 번째 요소를 찾으면 즉시 종료
    }
}

2.3 디버깅과 로깅

스트림 연산의 중간 결과를 확인하고 디버깅하는 방법입니다.

public class StreamDebugging {
    public List<ProcessedData> processDataWithLogging(
            List<RawData> dataList) {
        return dataList.stream()
            .filter(data -> {
                boolean result = isValidData(data);
                log.debug("Filtering data {}: {}", data.getId(), result);
                return result;
            })
            .map(data -> {
                ProcessedData result = transformData(data);
                log.debug("Transformed data {}: {}", 
                    data.getId(), result);
                return result;
            })
            .peek(data -> {
                if (log.isTraceEnabled()) {
                    log.trace("Processing data: {}", data);
                }
            })
            .collect(Collectors.toList());
    }
}

3. 실전 응용 사례

3.1 실시간 데이터 처리

실시간으로 들어오는 데이터를 스트림으로 처리하는 예제입니다.

public class RealTimeProcessor {
    public class TransactionMonitor {
        private Queue<Transaction> recentTransactions = 
            new ConcurrentLinkedQueue<>();

        public void processNewTransaction(Transaction transaction) {
            recentTransactions.add(transaction);

            // 최근 거래 분석
            List<AlertType> alerts = Stream.of(
                    checkAmount(transaction),
                    checkFrequency(transaction),
                    checkPattern(transaction)
                )
                .flatMap(Optional::stream)
                .collect(Collectors.toList());

            if (!alerts.isEmpty()) {
                sendAlerts(alerts);
            }
        }

        private Optional<AlertType> checkFrequency(
                Transaction transaction) {
            long recentCount = recentTransactions.stream()
                .filter(t -> t.getCustomerId()
                    .equals(transaction.getCustomerId()))
                .filter(t -> t.getDateTime()
                    .isAfter(LocalDateTime.now().minusHours(1)))
                .count();

            return recentCount > 10 ? 
                Optional.of(AlertType.HIGH_FREQUENCY) : 
                Optional.empty();
        }
    }
}

이러한 고급 활용 사례들은 Stream API의 강력한 기능을 실제 비즈니스 요구사항에 적용하는 방법을 보여줍니다. 특히 성능과 메모리 효율성을 고려한 구현은 실제 프로덕션 환경에서 매우 중요합니다.

반응형