[Java] The Complete Guide to the Java Stream API - Part 5: Advanced Topics and Practical Use

shaprimanDev 2025. 1. 18. 14:21

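1. Implementing a Custom Stream

1.1 Understanding Spliterator

Spliterator is an interface introduced in Java 8 that traverses the elements of a source and can also partition them. That ability to split is what lets a parallel stream divide its work.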

import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

public class CustomSpliterator<T> implements Spliterator<T> {
    private final List<T> list;
    private int current = 0;

    public CustomSpliterator(List<T> list) {
        this.list = list;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (current < list.size()) {
            action.accept(list.get(current++));
            return true;
        }
        return false;
    }

    @Override
    public Spliterator<T> trySplit() {
        int currentSize = list.size() - current;
        if (currentSize <= 1) {
            return null;
        }

        int splitPoint = current + currentSize / 2;
        List<T> splitList = list.subList(current, splitPoint);
        current = splitPoint;
        return new CustomSpliterator<>(splitList);
    }

    @Override
    public long estimateSize() {
        return list.size() - current;
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }
}

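// Usage example (TimeSeriesData is the application's own element type;
// passing true to StreamSupport.stream requests a parallel stream)
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
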
public class TimeSeriesStream {
    public static Stream<TimeSeriesData> createStream(List<TimeSeriesData> data) {
        return StreamSupport.stream(
            new CustomSpliterator<>(data), true);
    }
}
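
To see what trySplit does in practice, here is a minimal sketch. It uses the Spliterator that the JDK's ArrayList already provides rather than the CustomSpliterator above, and the class name SpliteratorSplitDemo and the drain helper are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

final class SpliteratorSplitDemo {

    /** Drains a spliterator into a list using tryAdvance, one element at a time. */
    static <T> List<T> drain(Spliterator<T> spliterator) {
        List<T> out = new ArrayList<>();
        while (spliterator.tryAdvance(out::add)) {
            // tryAdvance returns false once no elements remain
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));

        // After trySplit, the returned spliterator covers a prefix of the elements
        // and the original one covers whatever is left.
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit();

        System.out.println("prefix size = " + prefix.estimateSize());
        System.out.println("rest size   = " + rest.estimateSize());
        System.out.println("prefix = " + drain(prefix));
        System.out.println("rest   = " + drain(rest));
    }
}
```

The two halves never overlap and together cover every element, which is the contract a parallel stream relies on when it hands each half to a different thread.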

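1.2 Implementing a Custom Collector

A custom Collector for aggregation requirements that the built-in collectors do not cover, in this case a weighted average.

import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class WeightedAverageCollector implements 
        Collector<Transaction, WeightedAverageCollector.Accumulator, Double> {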

    static class Accumulator {
        private double sum = 0;
        private double weightSum = 0;

        void add(double value, double weight) {
            sum += value * weight;
            weightSum += weight;
        }
    }

    @Override
    public Supplier<Accumulator> supplier() {
        return Accumulator::new;
    }

    @Override
    public BiConsumer<Accumulator, Transaction> accumulator() {
        return (acc, transaction) -> 
            acc.add(transaction.getAmount().doubleValue(), 
                   calculateWeight(transaction));
    }

    @Override
    public BinaryOperator<Accumulator> combiner() {
        return (acc1, acc2) -> {
            Accumulator combined = new Accumulator();
            combined.sum = acc1.sum + acc2.sum;
            combined.weightSum = acc1.weightSum + acc2.weightSum;
            return combined;
        };
    }

    @Override
    public Function<Accumulator, Double> finisher() {
        // An empty stream leaves both sums at 0, so this yields NaN (0.0 / 0.0)
        return acc -> acc.sum / acc.weightSum;
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

    private double calculateWeight(Transaction transaction) {
        // Weight calculation logic
        return transaction.getImportance().getValue();
    }
}
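
For a one-off aggregation, the same four pieces (supplier, accumulator, combiner, finisher) can be passed to Collector.of instead of writing a full class. The following is a minimal sketch under assumptions: Sample is a hypothetical value/weight pair standing in for Transaction, and returning 0.0 for an empty input is a choice made for this example only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

final class WeightedAverageDemo {

    /** Hypothetical value/weight pair standing in for Transaction. */
    static final class Sample {
        final double value;
        final double weight;

        Sample(double value, double weight) {
            this.value = value;
            this.weight = weight;
        }
    }

    /** The double[] accumulator holds {weighted sum, sum of weights}. */
    static Collector<Sample, double[], Double> weightedAverage() {
        return Collector.of(
            () -> new double[2],
            (acc, s) -> { acc[0] += s.value * s.weight; acc[1] += s.weight; },
            (left, right) -> { left[0] += right[0]; left[1] += right[1]; return left; },
            // Assumption for this sketch: an empty input maps to 0.0 instead of NaN
            acc -> acc[1] == 0 ? 0.0 : acc[0] / acc[1]);
    }

    public static void main(String[] args) {
        List<Sample> samples = Arrays.asList(new Sample(10, 1), new Sample(20, 3));
        // (10 * 1 + 20 * 3) / (1 + 3) = 17.5
        System.out.println(samples.stream().collect(weightedAverage()));
    }
}
```

A dedicated class, as in the version above, is still the better fit when the collector is reused in many places or needs its own helper methods.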

2. Advanced Stream Usage Patterns

2.1 Implementing Dynamic Filtering

This pattern builds a filter at runtime by combining whichever conditions have been set.

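import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class DynamicFilter {
    // static: a criteria object does not need an enclosing DynamicFilter instance
    public static class FilterCriteria {
        private LocalDate startDate;
        private LocalDate endDate;
        private BigDecimal minAmount;
        private Set<String> categories;
        // getters/setters omitted
    }

    public List<Transaction> filterTransactions(
            List<Transaction> transactions, 
            FilterCriteria criteria) {

        // Each criterion that is not set falls back to a predicate that accepts everything
        Predicate<Transaction> startPredicate = criteria.getStartDate() != null ?
            t -> !t.getDate().isBefore(criteria.getStartDate()) : t -> true;

        // Assumption: endDate is an inclusive upper bound
        Predicate<Transaction> endPredicate = criteria.getEndDate() != null ?
            t -> !t.getDate().isAfter(criteria.getEndDate()) : t -> true;

        Predicate<Transaction> amountPredicate = criteria.getMinAmount() != null ?
            t -> t.getAmount().compareTo(criteria.getMinAmount()) >= 0 : t -> true;

        Predicate<Transaction> categoryPredicate = 
            criteria.getCategories() != null && !criteria.getCategories().isEmpty() ?
            t -> criteria.getCategories().contains(t.getCategory()) : t -> true;

        return transactions.stream()
            .filter(startPredicate)
            .filter(endPredicate)
            .filter(amountPredicate)
            .filter(categoryPredicate)
            .collect(Collectors.toList());
    }
}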
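
When the number of conditions grows, one possible variation is to collect only the active predicates in a list and fold them into a single predicate with Predicate.and. This is a minimal sketch; PredicateComposition, allOf, and the min/max criteria are hypothetical names used for illustration, with Integer values standing in for Transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class PredicateComposition {

    /** Combines all predicates with AND; an empty list accepts everything. */
    static <T> Predicate<T> allOf(List<Predicate<T>> predicates) {
        return predicates.stream().reduce(t -> true, Predicate::and);
    }

    public static void main(String[] args) {
        // Hypothetical optional criteria; null means "not set"
        Integer min = 10;
        Integer max = null;

        // Only the criteria that are set contribute a predicate
        List<Predicate<Integer>> active = new ArrayList<>();
        if (min != null) active.add(n -> n >= min);
        if (max != null) active.add(n -> n <= max);

        List<Integer> result = Arrays.asList(5, 10, 15, 20).stream()
            .filter(allOf(active))
            .collect(Collectors.toList());
        System.out.println(result); // [10, 15, 20]
    }
}
```

Compared with one `t -> true` placeholder per field, this keeps the pipeline at a single filter call no matter how many criteria exist.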

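2.2 Recursive Stream Processing

This pattern handles tree-shaped or otherwise hierarchical data.

import java.util.List;
import java.util.stream.Stream;

public class TreeProcessor {
    // static: a node does not need an enclosing TreeProcessor instance
    public static class TreeNode {
        private String value;
        private List<TreeNode> children;
        // getters/setters omitted
    }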

    public Stream<TreeNode> flattenTree(TreeNode root) {
        return Stream.concat(
            Stream.of(root),
            root.getChildren().stream()
                .flatMap(this::flattenTree)
        );
    }

    // Process only down to a given depth
    public Stream<TreeNode> flattenTreeWithDepth(TreeNode root, int maxDepth) {
        if (maxDepth == 0) {
            return Stream.of(root);
        }

        return Stream.concat(
            Stream.of(root),
            root.getChildren().stream()
                .flatMap(child -> flattenTreeWithDepth(child, maxDepth - 1))
        );
    }
}
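
The recursive flattening above visits nodes in pre-order: a parent first, then each child's whole subtree from left to right. Here is a self-contained sketch that makes the order visible; TreeFlattenDemo and its small Node class are hypothetical stand-ins for TreeProcessor and TreeNode.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TreeFlattenDemo {

    /** Hypothetical immutable node; a leaf simply has an empty child list. */
    static final class Node {
        final String value;
        final List<Node> children;

        Node(String value, Node... children) {
            this.value = value;
            this.children = Arrays.asList(children);
        }
    }

    /** Emits the root, then recursively the flattened subtree of each child. */
    static Stream<Node> flatten(Node root) {
        return Stream.concat(
            Stream.of(root),
            root.children.stream().flatMap(TreeFlattenDemo::flatten));
    }

    static List<String> values(Node root) {
        return flatten(root).map(n -> n.value).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        //     a
        //    / \
        //   b   c
        //   |
        //   d
        Node tree = new Node("a", new Node("b", new Node("d")), new Node("c"));
        System.out.println(values(tree)); // [a, b, d, c]
    }
}
```

Because every level adds nested stream and method calls, very deep trees can exhaust the call stack; for those, an explicit stack or queue is usually the safer choice.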

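3. Real-World Scenarios

3.1 Bulk Data Processing System

An example that splits a large data set into fixed-size batches and processes the batches in parallel. Note that groupingBy collects every record into memory before any batch runs, so the whole input has to fit in the heap.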

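import java.util.Collection;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BigDataProcessor {
    private static final int BATCH_SIZE = 1000;

    // Returns the processed records so that the result is not discarded
    public List<DataRecord> processBigData(Stream<DataRecord> dataStream) {
        AtomicInteger counter = new AtomicInteger();

        return dataStream
            .sequential() // the counter-based batch key relies on encounter order
            .collect(Collectors.groupingBy(
                record -> counter.getAndIncrement() / BATCH_SIZE,
                TreeMap::new,          // keeps the batches in index order
                Collectors.toList()
            ))
            .values()
            .stream()
            .parallel()
            .map(this::processBatch)
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    }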

    private List<DataRecord> processBatch(List<DataRecord> batch) {
        return batch.stream()
            .map(this::enrichData)
            .filter(this::validateData)
            .map(this::transformData)
            .collect(Collectors.toList());
    }
}
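
If the input is too large to group in memory, one alternative is to pull elements through the stream's iterator and hand off each batch as soon as it is full. This is a minimal sequential sketch, not the approach used above; BatchingSketch and forEachBatch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class BatchingSketch {

    /**
     * Feeds the stream to the handler in lists of at most batchSize elements
     * and returns the number of batches. Only one batch is buffered at a time.
     */
    static <T> int forEachBatch(Stream<T> stream, int batchSize, Consumer<List<T>> handler) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Iterator<T> it = stream.iterator();
        List<T> batch = new ArrayList<>(batchSize);
        int batches = 0;
        while (it.hasNext()) {
            batch.add(it.next());
            if (batch.size() == batchSize) {
                handler.accept(batch);
                batch = new ArrayList<>(batchSize); // start a fresh buffer
                batches++;
            }
        }
        if (!batch.isEmpty()) { // final, possibly shorter batch
            handler.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        // Prints [1, 2, 3], then [4, 5, 6], then [7]
        forEachBatch(IntStream.rangeClosed(1, 7).boxed(), 3, System.out::println);
    }
}
```

The trade-off is that batches are produced one after another on the calling thread; any parallelism would have to come from what the handler does with each batch.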

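3.2 Real-Time Analysis System

An example that analyzes data as it arrives by keeping a sliding window of the most recent transactions. The isAnomalous and raiseAlert helpers are not shown here.

import java.util.DoubleSummaryStatistics;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

public class RealTimeAnalyzer {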
    private final Queue<Transaction> recentTransactions = 
        new ConcurrentLinkedQueue<>();
    private final int windowSize = 1000;

    public void processTransaction(Transaction transaction) {
        recentTransactions.add(transaction);

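        // Keep the window at its fixed size
        // (ConcurrentLinkedQueue.size() traverses the queue, so each check is O(n))
        while (recentTransactions.size() > windowSize) {
            recentTransactions.poll();
        }

        // Compute real-time statistics
        DoubleSummaryStatistics stats = recentTransactions.stream()
            .mapToDouble(t -> t.getAmount().doubleValue())
            .summaryStatistics();

        // Detect anomalous transactions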
        double average = stats.getAverage();
        double stdDev = calculateStandardDeviation(
            recentTransactions.stream(), average);

        if (isAnomalous(transaction, average, stdDev)) {
            raiseAlert(transaction);
        }
    }

    private double calculateStandardDeviation(
            Stream<Transaction> stream, double mean) {
        return Math.sqrt(stream
            .mapToDouble(t -> {
                double diff = t.getAmount().doubleValue() - mean;
                return diff * diff;
            })
            .average()
            .orElse(0.0));
    }
}
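
The analyzer above re-scans the whole window twice for every incoming transaction. One possible alternative is to maintain a running sum and a running sum of squares, so that the mean and the population standard deviation are available in constant time. This is a single-threaded sketch under assumptions: SlidingWindowStats, isOutlier, and the threshold parameter are hypothetical, and the subtraction-based variance can lose precision when values are very large.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SlidingWindowStats {
    private final Deque<Double> window = new ArrayDeque<>();
    private final int capacity;
    private double sum;
    private double sumOfSquares;

    SlidingWindowStats(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Adds a value and evicts the oldest one once the window is over capacity. */
    void add(double value) {
        window.addLast(value);
        sum += value;
        sumOfSquares += value * value;
        if (window.size() > capacity) {
            double evicted = window.removeFirst();
            sum -= evicted;
            sumOfSquares -= evicted * evicted;
        }
    }

    double mean() {
        return window.isEmpty() ? 0.0 : sum / window.size();
    }

    /** Population standard deviation: sqrt(E[x^2] - mean^2). */
    double stdDev() {
        if (window.isEmpty()) {
            return 0.0;
        }
        double mean = mean();
        double variance = sumOfSquares / window.size() - mean * mean;
        return Math.sqrt(Math.max(variance, 0.0)); // clamp tiny negative rounding errors
    }

    /** Flags values more than threshold standard deviations away from the mean. */
    boolean isOutlier(double value, double threshold) {
        double sd = stdDev();
        return sd > 0 && Math.abs(value - mean()) > threshold * sd;
    }

    public static void main(String[] args) {
        SlidingWindowStats stats = new SlidingWindowStats(8);
        for (double v : new double[] {2, 4, 4, 4, 5, 5, 7, 9}) {
            stats.add(v);
        }
        System.out.println("mean = " + stats.mean() + ", stdDev = " + stats.stdDev());
        System.out.println("is 20 an outlier? " + stats.isOutlier(20, 3.0));
    }
}
```

This class is not thread-safe, so a concurrent caller would need external synchronization. Checking a new value with isOutlier before calling add also keeps the value from influencing the statistics it is compared against.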

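4. Performance Optimization and Monitoring

4.1 Implementing a Performance Monitoring System

An example that monitors the performance of stream operations by timing them.

// Logger and LoggerFactory are assumed to come from SLF4J
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamPerformanceMonitor {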
    private static final Logger logger = 
        LoggerFactory.getLogger(StreamPerformanceMonitor.class);

    public <T, R> R measureStreamPerformance(
            Stream<T> stream, 
            Function<Stream<T>, R> streamOperation,
            String operationName) {

        long startTime = System.nanoTime();
        R result = streamOperation.apply(stream);
        long endTime = System.nanoTime();

        long duration = (endTime - startTime) / 1_000_000; // convert nanoseconds to milliseconds

        logger.info("Stream operation '{}' took {} ms", 
            operationName, duration);

        return result;
    }

    // Usage example
    public void example() {
        List<Transaction> transactions = getTransactions();

        Map<String, Double> result = measureStreamPerformance(
            transactions.stream(),
            stream -> stream
                .filter(t -> t.getAmount().compareTo(BigDecimal.ZERO) > 0)
                .collect(Collectors.groupingBy(
                    Transaction::getCategory,
                    Collectors.averagingDouble(t -> 
                        t.getAmount().doubleValue())
                )),
            "Category averaging"
        );
    }
}
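
One thing to watch when timing streams this way: intermediate operations are lazy, so the measured function has to end in a terminal operation such as collect. If it only returns another Stream, almost no work runs inside the timed section. The following minimal sketch shows the effect with a counter; LazyTimingDemo and elementsProcessed are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LazyTimingDemo {

    /** Returns how many elements the map stage has actually processed. */
    static int elementsProcessed(boolean withTerminalOperation) {
        AtomicInteger processed = new AtomicInteger();

        // Building the pipeline does not run the lambda yet
        Stream<Integer> pipeline = Arrays.asList(1, 2, 3, 4).stream()
            .map(n -> {
                processed.incrementAndGet();
                return n * 2;
            });

        if (withTerminalOperation) {
            pipeline.collect(Collectors.toList()); // the terminal operation triggers the work
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("without terminal operation: " + elementsProcessed(false)); // 0
        System.out.println("with terminal operation:    " + elementsProcessed(true));  // 4
    }
}
```

A single System.nanoTime measurement also includes JIT warm-up and garbage collection noise, so for serious comparisons a benchmark harness such as JMH is generally a better fit than a one-shot timer.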

This concludes the advanced study of the Stream API. With these advanced features, you can handle more complex business requirements effectively.
