반응형
1. 커스텀 Stream 구현
1.1 Spliterator 이해하기
Spliterator는 Java 8에서 도입된 인터페이스로, 컬렉션이나 배열 같은 소스의 요소를 순회하고 병렬 처리를 위해 분할하는 기능을 제공합니다.
/**
 * A {@link Spliterator} over a {@link List} that walks the list by index.
 *
 * <p>The list is not copied: every call reads {@code list.size()} and
 * {@code list.get(int)} on the list passed to the constructor. Splitting hands
 * the first half of the remaining elements to a new spliterator backed by a
 * {@code subList} view, and this spliterator keeps the second half.
 *
 * @param <T> element type
 */
public class CustomSpliterator<T> implements Spliterator<T> {
    private final List<T> list;
    /** Index of the next element to hand out. */
    private int current = 0;

    /**
     * @param list backing list; must not be null
     * @throws NullPointerException if {@code list} is null
     */
    public CustomSpliterator(List<T> list) {
        if (list == null) {
            throw new NullPointerException("list");
        }
        this.list = list;
    }

    /**
     * Passes the next element, if any, to {@code action}.
     *
     * @return {@code true} if an element was consumed, {@code false} if none remain
     * @throws NullPointerException if {@code action} is null, even when no elements remain
     */
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        // Check up front so a null action fails on an exhausted spliterator too,
        // and so the cursor is not advanced past an element that was never consumed.
        if (action == null) {
            throw new NullPointerException("action");
        }
        if (current < list.size()) {
            action.accept(list.get(current++));
            return true;
        }
        return false;
    }

    /**
     * Splits off the first half of the remaining elements.
     *
     * @return a spliterator over the split-off prefix, or {@code null} when at
     *         most one element remains
     */
    @Override
    public Spliterator<T> trySplit() {
        int remaining = list.size() - current;
        if (remaining <= 1) {
            return null;
        }
        int splitPoint = current + remaining / 2;
        // The returned spliterator covers the prefix, which keeps encounter order.
        List<T> prefix = list.subList(current, splitPoint);
        current = splitPoint;
        return new CustomSpliterator<>(prefix);
    }

    /** @return the number of elements not yet handed out */
    @Override
    public long estimateSize() {
        return list.size() - current;
    }

    /** @return {@code ORDERED | SIZED | SUBSIZED} */
    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }
}
// 사용 예제
/**
 * Factory for streams over a list of {@code TimeSeriesData}.
 */
public class TimeSeriesStream {
    /**
     * Wraps {@code data} in a {@code CustomSpliterator} and exposes it as a
     * parallel stream.
     *
     * @param data the list to stream over
     * @return a parallel stream backed by a {@code CustomSpliterator} over {@code data}
     */
    public static Stream<TimeSeriesData> createStream(List<TimeSeriesData> data) {
        CustomSpliterator<TimeSeriesData> source = new CustomSpliterator<>(data);
        boolean parallel = true;
        return StreamSupport.stream(source, parallel);
    }
}
1.2 커스텀 Collector 구현
특별한 집계 요구사항을 처리하기 위한 커스텀 Collector입니다.
/**
 * Collector that reduces {@code Transaction}s to a weighted average of their
 * amounts: {@code sum(amount * weight) / sum(weight)}.
 *
 * <p>The weight of a transaction comes from {@link #calculateWeight}. When the
 * total weight is zero (including the empty-stream case) the result is
 * {@code 0.0} rather than {@code NaN}.
 */
public class WeightedAverageCollector implements
        Collector<Transaction, WeightedAverageCollector.Accumulator, Double> {

    /** Mutable running totals for one partial reduction. */
    static class Accumulator {
        private double sum = 0;
        private double weightSum = 0;

        /** Adds one observation with the given weight. */
        void add(double value, double weight) {
            sum += value * weight;
            weightSum += weight;
        }
    }

    @Override
    public Supplier<Accumulator> supplier() {
        return Accumulator::new;
    }

    @Override
    public BiConsumer<Accumulator, Transaction> accumulator() {
        return (acc, transaction) ->
                acc.add(transaction.getAmount().doubleValue(),
                        calculateWeight(transaction));
    }

    /** Combines two partial results into a fresh accumulator. */
    @Override
    public BinaryOperator<Accumulator> combiner() {
        return (acc1, acc2) -> {
            Accumulator combined = new Accumulator();
            combined.sum = acc1.sum + acc2.sum;
            combined.weightSum = acc1.weightSum + acc2.weightSum;
            return combined;
        };
    }

    /**
     * @return a function producing the weighted average, or {@code 0.0} when the
     *         accumulated weight is zero
     */
    @Override
    public Function<Accumulator, Double> finisher() {
        // Guard the division: 0.0 / 0.0 would otherwise yield NaN for an empty stream.
        return acc -> acc.weightSum == 0.0 ? 0.0 : acc.sum / acc.weightSum;
    }

    /** @return an empty set: no collector characteristics are declared */
    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }

    /** Weight of a transaction, read from {@code getImportance().getValue()}. */
    private double calculateWeight(Transaction transaction) {
        return transaction.getImportance().getValue();
    }
}
2. 고급 스트림 활용 패턴
2.1 동적 필터링 구현
여러 조건을 조합하여 동적으로 필터를 생성하는 패턴입니다.
/**
 * Builds a transaction filter at runtime from whichever criteria are set.
 */
public class DynamicFilter {

    /**
     * Optional filter conditions. A {@code null} field (or an empty category
     * set) means "do not filter on this attribute".
     */
    public class FilterCriteria {
        private LocalDate startDate;
        private LocalDate endDate;
        private BigDecimal minAmount;
        private Set<String> categories;

        public LocalDate getStartDate() {
            return startDate;
        }

        public void setStartDate(LocalDate startDate) {
            this.startDate = startDate;
        }

        public LocalDate getEndDate() {
            return endDate;
        }

        public void setEndDate(LocalDate endDate) {
            this.endDate = endDate;
        }

        public BigDecimal getMinAmount() {
            return minAmount;
        }

        public void setMinAmount(BigDecimal minAmount) {
            this.minAmount = minAmount;
        }

        public Set<String> getCategories() {
            return categories;
        }

        public void setCategories(Set<String> categories) {
            this.categories = categories;
        }
    }

    /**
     * Returns the transactions that satisfy every condition set on {@code criteria}.
     *
     * <p>Both date bounds are inclusive. The amount bound is inclusive
     * ({@code amount >= minAmount}). Categories match by set membership.
     *
     * @param transactions transactions to filter
     * @param criteria conditions to apply; unset conditions are skipped
     * @return a new list holding the matching transactions in encounter order
     */
    public List<Transaction> filterTransactions(
            List<Transaction> transactions,
            FilterCriteria criteria) {
        LocalDate startDate = criteria.getStartDate();
        LocalDate endDate = criteria.getEndDate();
        BigDecimal minAmount = criteria.getMinAmount();
        Set<String> categories = criteria.getCategories();

        Predicate<Transaction> filter = t -> true;
        if (startDate != null) {
            filter = filter.and(t -> !t.getDate().isBefore(startDate));
        }
        // The end date was declared on the criteria but previously never applied.
        if (endDate != null) {
            filter = filter.and(t -> !t.getDate().isAfter(endDate));
        }
        if (minAmount != null) {
            filter = filter.and(t -> t.getAmount().compareTo(minAmount) >= 0);
        }
        if (categories != null && !categories.isEmpty()) {
            filter = filter.and(t -> categories.contains(t.getCategory()));
        }

        return transactions.stream()
                .filter(filter)
                .collect(Collectors.toList());
    }
}
2.2 재귀적 스트림 처리
트리 구조나 계층 구조의 데이터를 처리하는 패턴입니다.
public class TreeProcessor {
public class TreeNode {
private String value;
private List<TreeNode> children;
// getter/setter 생략
}
public Stream<TreeNode> flattenTree(TreeNode root) {
return Stream.concat(
Stream.of(root),
root.getChildren().stream()
.flatMap(this::flattenTree)
);
}
// 특정 깊이까지만 처리
public Stream<TreeNode> flattenTreeWithDepth(TreeNode root, int maxDepth) {
if (maxDepth == 0) {
return Stream.of(root);
}
return Stream.concat(
Stream.of(root),
root.getChildren().stream()
.flatMap(child -> flattenTreeWithDepth(child, maxDepth - 1))
);
}
}
3. 실전 시나리오
3.1 대량 데이터 처리 시스템
대용량 데이터를 일정 크기의 배치로 나누어 병렬로 처리하는 예제입니다.
/**
 * Splits a stream of records into fixed-size batches and runs each batch
 * through the enrich / validate / transform steps.
 */
public class BigDataProcessor {
    private static final int BATCH_SIZE = 1000;

    /**
     * Groups the incoming records into batches of up to {@code BATCH_SIZE} by
     * arrival count, then processes the batches on a parallel stream.
     *
     * <p>The combined output is collected into a local list and is not returned.
     *
     * @param dataStream records to process; fully consumed by this call
     */
    public void processBigData(Stream<DataRecord> dataStream) {
        AtomicInteger sequence = new AtomicInteger();
        // Batch key = running record count divided by the batch size.
        Collection<List<DataRecord>> batches = dataStream
                .collect(Collectors.groupingBy(
                        record -> sequence.getAndIncrement() / BATCH_SIZE))
                .values();
        List<DataRecord> batchBuffer = batches.parallelStream()
                .map(batch -> processBatch(batch))
                .flatMap(processed -> processed.stream())
                .collect(Collectors.toList());
    }

    /**
     * Enriches every record of a batch, keeps those that pass validation, and
     * transforms the survivors.
     */
    private List<DataRecord> processBatch(List<DataRecord> batch) {
        return batch.stream()
                .map(item -> enrichData(item))
                .filter(item -> validateData(item))
                .map(item -> transformData(item))
                .collect(Collectors.toList());
    }
}
3.2 실시간 분석 시스템
실시간으로 들어오는 데이터를 분석하는 예제입니다.
/**
 * Keeps a sliding window of the most recent transactions and checks each new
 * transaction against the window's mean and standard deviation.
 */
public class RealTimeAnalyzer {
    private final Queue<Transaction> recentTransactions =
            new ConcurrentLinkedQueue<>();
    private final int windowSize = 1000;

    /**
     * Adds {@code transaction} to the window, trims the window to
     * {@code windowSize}, and raises an alert when {@code isAnomalous} reports
     * the transaction as anomalous for the current mean and standard deviation.
     *
     * @param transaction the newly arrived transaction
     */
    public void processTransaction(Transaction transaction) {
        recentTransactions.add(transaction);

        // Keep the window bounded by dropping the oldest entries.
        while (recentTransactions.size() > windowSize) {
            recentTransactions.poll();
        }

        // Take one snapshot so the mean and the deviation are computed from the
        // same elements even if other threads modify the queue in between.
        Transaction[] window = recentTransactions.toArray(new Transaction[0]);

        DoubleSummaryStatistics stats = Stream.of(window)
                .mapToDouble(t -> t.getAmount().doubleValue())
                .summaryStatistics();

        double average = stats.getAverage();
        double stdDev = calculateStandardDeviation(Stream.of(window), average);

        if (isAnomalous(transaction, average, stdDev)) {
            raiseAlert(transaction);
        }
    }

    /**
     * Population standard deviation of the transaction amounts around
     * {@code mean}; {@code 0.0} for an empty stream.
     */
    private double calculateStandardDeviation(
            Stream<Transaction> stream, double mean) {
        return Math.sqrt(stream
                .mapToDouble(t -> {
                    double diff = t.getAmount().doubleValue() - mean;
                    return diff * diff;
                })
                .average()
                .orElse(0.0));
    }
}
4. 성능 최적화와 모니터링
4.1 성능 모니터링 시스템 구현
스트림 연산의 성능을 모니터링하는 예제입니다.
/**
 * Times a stream operation and logs how long it took.
 */
public class StreamPerformanceMonitor {
    private static final Logger logger =
            LoggerFactory.getLogger(StreamPerformanceMonitor.class);

    /**
     * Applies {@code streamOperation} to {@code stream}, logs the elapsed wall
     * time in whole milliseconds at INFO level, and returns the operation's result.
     *
     * @param stream the stream handed to the operation
     * @param streamOperation the pipeline to run and time
     * @param operationName label used in the log message
     * @param <T> stream element type
     * @param <R> result type of the operation
     * @return whatever {@code streamOperation} returned
     */
    public <T, R> R measureStreamPerformance(
            Stream<T> stream,
            Function<Stream<T>, R> streamOperation,
            String operationName) {
        final long startedAt = System.nanoTime();
        final R outcome = streamOperation.apply(stream);
        final long finishedAt = System.nanoTime();

        // Nanoseconds to milliseconds; integer division drops the fraction.
        final long elapsedMillis = (finishedAt - startedAt) / 1_000_000;
        logger.info("Stream operation '{}' took {} ms",
                operationName, elapsedMillis);
        return outcome;
    }

    /**
     * Usage example: average the positive transaction amounts per category
     * while timing the pipeline.
     */
    public void example() {
        List<Transaction> transactions = getTransactions();

        Function<Stream<Transaction>, Map<String, Double>> averageByCategory =
                stream -> stream
                        .filter(t -> t.getAmount().compareTo(BigDecimal.ZERO) > 0)
                        .collect(Collectors.groupingBy(
                                Transaction::getCategory,
                                Collectors.averagingDouble(t ->
                                        t.getAmount().doubleValue())));

        Map<String, Double> result = measureStreamPerformance(
                transactions.stream(),
                averageByCategory,
                "Category averaging");
    }
}
이것으로 Stream API의 심화 학습을 마무리합니다. 이러한 고급 기능들을 활용하면 더 복잡한 비즈니스 요구사항도 효과적으로 처리할 수 있습니다.
반응형