반응형
1. 복잡한 데이터 처리 패턴
1.1 다중 조건 필터링
실무에서는 단순한 필터링이 아닌, 여러 조건을 조합해야 하는 경우가 많습니다. 이러한 경우 Stream API를 효과적으로 활용할 수 있습니다.
public class AdvancedFilterExample {
public class SalesFilter {
private LocalDateTime startDate;
private LocalDateTime endDate;
private Set<String> categories;
private BigDecimal minAmount;
private Set<String> excludedCustomers;
// 생성자, getter, setter 생략
}
public List<Sale> filterSales(List<Sale> sales, SalesFilter filter) {
return sales.stream()
// 날짜 범위 필터
.filter(sale -> isWithinDateRange(sale, filter))
// 카테고리 필터
.filter(sale -> filter.getCategories().isEmpty() ||
filter.getCategories().contains(sale.getCategory()))
// 최소 금액 필터
.filter(sale -> sale.getAmount()
.compareTo(filter.getMinAmount()) >= 0)
// 제외 고객 필터
.filter(sale -> !filter.getExcludedCustomers()
.contains(sale.getCustomerId()))
.collect(Collectors.toList());
}
private boolean isWithinDateRange(Sale sale, SalesFilter filter) {
return !sale.getDateTime().isBefore(filter.getStartDate()) &&
!sale.getDateTime().isAfter(filter.getEndDate());
}
}
위 예제의 주요 포인트:
- 각 필터 조건을 별도의 filter() 메소드로 체이닝
- 복잡한 조건은 별도 메소드로 분리하여 가독성 향상
- 필터 조건을 객체로 캡슐화하여 재사용성 증가
1.2 중첩된 그룹화 처리
복잡한 비즈니스 요구사항을 처리하기 위한 다중 레벨 그룹화 예제입니다.
public class AdvancedGroupingExample {
public class SalesAnalysis {
// 지역별, 카테고리별, 월별 매출 분석
public Map<String, Map<String, Map<YearMonth, SalesStats>>>
analyzeRegionalCategorySales(List<Sale> sales) {
return sales.stream()
.collect(Collectors.groupingBy(
Sale::getRegion, // 1차: 지역별
Collectors.groupingBy(
Sale::getCategory, // 2차: 카테고리별
Collectors.groupingBy(
sale -> YearMonth.from(sale.getDateTime()), // 3차: 월별
Collectors.collectingAndThen(
Collectors.toList(),
this::calculateSalesStats
)
)
)
));
}
private SalesStats calculateSalesStats(List<Sale> sales) {
// 기본 통계 계산
DoubleSummaryStatistics stats = sales.stream()
.mapToDouble(sale -> sale.getAmount().doubleValue())
.summaryStatistics();
// 시간대별 분석
Map<Integer, Long> hourlyDistribution = sales.stream()
.collect(Collectors.groupingBy(
sale -> sale.getDateTime().getHour(),
Collectors.counting()
));
// 결제 수단별 분석
Map<PaymentMethod, BigDecimal> paymentMethodTotal = sales.stream()
.collect(Collectors.groupingBy(
Sale::getPaymentMethod,
Collectors.reducing(
BigDecimal.ZERO,
Sale::getAmount,
BigDecimal::add
)
));
return new SalesStats(
BigDecimal.valueOf(stats.getSum()),
BigDecimal.valueOf(stats.getAverage()),
sales.size(),
hourlyDistribution,
paymentMethodTotal
);
}
}
}
1.3 커스텀 Collector 구현
특별한 집계 요구사항을 처리하기 위한 커스텀 Collector 예제입니다.
public class MovingAverageCollector<T> implements Collector<T,
List<T>, List<Double>> {
private final int windowSize;
private final Function<T, Number> valueExtractor;
public MovingAverageCollector(int windowSize,
Function<T, Number> valueExtractor) {
this.windowSize = windowSize;
this.valueExtractor = valueExtractor;
}
@Override
public Supplier<List<T>> supplier() {
return ArrayList::new;
}
@Override
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> {
list1.addAll(list2);
return list1;
};
}
@Override
public Function<List<T>, List<Double>> finisher() {
return list -> {
List<Double> result = new ArrayList<>();
if (list.size() < windowSize) {
return result;
}
for (int i = 0; i <= list.size() - windowSize; i++) {
double average = list.subList(i, i + windowSize).stream()
.mapToDouble(item -> valueExtractor.apply(item).doubleValue())
.average()
.orElse(0.0);
result.add(average);
}
return result;
};
}
@Override
public Set<Characteristics> characteristics() {
return Collections.emptySet();
}
}
// 사용 예제
public class TimeSeriesAnalyzer {
public List<Double> calculateMovingAverage(List<Sale> sales,
int windowSize) {
return sales.stream()
.sorted(Comparator.comparing(Sale::getDateTime))
.collect(new MovingAverageCollector<>(
windowSize,
sale -> sale.getAmount()
));
}
}
2. 성능 최적화 기법
2.1 병렬 스트림 활용
병렬 스트림을 효과적으로 사용하는 방법과 주의사항을 살펴봅니다.
public class ParallelStreamExample {
public class PerformanceAnalyzer {
public Map<String, Object> analyzeLargeDataSet(
List<Transaction> transactions) {
int processorCount = Runtime.getRuntime().availableProcessors();
// 데이터를 청크로 나누어 처리
int chunkSize = transactions.size() / processorCount;
return transactions.parallelStream()
.collect(Collectors.groupingBy(
Transaction::getType,
Collectors.collectingAndThen(
Collectors.toList(),
chunk -> {
Map<String, Object> results = new HashMap<>();
// CPU 집약적 연산 수행
results.put("total", calculateTotal(chunk));
results.put("risk", assessRisk(chunk));
results.put("patterns", findPatterns(chunk));
return results;
}
)
));
}
// CPU 집약적 연산 예시
private BigDecimal calculateTotal(List<Transaction> chunk) {
return chunk.parallelStream()
.map(Transaction::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
}
}
}
2.2 성능 최적화 전략
2.2.1 메모리 효율성 개선
public class MemoryOptimization {
// 대용량 데이터 처리를 위한 청크 처리
public void processLargeFile(String filename) {
try (Stream<String> lines = Files.lines(Paths.get(filename))) {
lines
.filter(line -> !line.isEmpty())
.map(this::parseLine)
.collect(Collectors.groupingBy(
Record::getType,
Collectors.mapping(
Record::getValue,
Collectors.reducing(BigDecimal.ZERO, BigDecimal::add)
)
));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// 스트림 조기 종료를 통한 최적화
public Optional<Transaction> findFirstLargeTransaction(
List<Transaction> transactions, BigDecimal threshold) {
return transactions.stream()
.filter(t -> t.getAmount().compareTo(threshold) > 0)
.findFirst(); // 조건을 만족하는 첫 번째 요소를 찾으면 즉시 종료
}
}
2.3 디버깅과 로깅
스트림 연산의 중간 결과를 확인하고 디버깅하는 방법입니다.
public class StreamDebugging {
public List<ProcessedData> processDataWithLogging(
List<RawData> dataList) {
return dataList.stream()
.filter(data -> {
boolean result = isValidData(data);
log.debug("Filtering data {}: {}", data.getId(), result);
return result;
})
.map(data -> {
ProcessedData result = transformData(data);
log.debug("Transformed data {}: {}",
data.getId(), result);
return result;
})
.peek(data -> {
if (log.isTraceEnabled()) {
log.trace("Processing data: {}", data);
}
})
.collect(Collectors.toList());
}
}
3. 실전 응용 사례
3.1 실시간 데이터 처리
실시간으로 들어오는 데이터를 스트림으로 처리하는 예제입니다.
public class RealTimeProcessor {
public class TransactionMonitor {
private Queue<Transaction> recentTransactions =
new ConcurrentLinkedQueue<>();
public void processNewTransaction(Transaction transaction) {
recentTransactions.add(transaction);
// 최근 거래 분석
List<AlertType> alerts = Stream.of(
checkAmount(transaction),
checkFrequency(transaction),
checkPattern(transaction)
)
.flatMap(Optional::stream)
.collect(Collectors.toList());
if (!alerts.isEmpty()) {
sendAlerts(alerts);
}
}
private Optional<AlertType> checkFrequency(
Transaction transaction) {
long recentCount = recentTransactions.stream()
.filter(t -> t.getCustomerId()
.equals(transaction.getCustomerId()))
.filter(t -> t.getDateTime()
.isAfter(LocalDateTime.now().minusHours(1)))
.count();
return recentCount > 10 ?
Optional.of(AlertType.HIGH_FREQUENCY) :
Optional.empty();
}
}
}
이러한 고급 활용 사례들은 Stream API의 강력한 기능을 실제 비즈니스 요구사항에 적용하는 방법을 보여줍니다. 특히 성능과 메모리 효율성을 고려한 구현은 실제 프로덕션 환경에서 매우 중요합니다.
반응형