1. 概述

Stream API 是 Java 8 引入的一套函数式编程风格的数据处理框架,用于对集合、数组等数据源执行声明式批量操作(过滤、映射、排序、聚合等)。

核心特点

  • 声明式:指定“做什么”,而非“怎么做”

  • 链式调用:多个操作通过 . 连接成流水线

  • 惰性求值:中间操作只记录操作,遇到终端操作才真正执行

  • 并行能力.parallelStream() 一行代码实现多核处理

  • 不可变数据:流不存储数据,不改变原数据源


2. Stream vs 传统循环对比

场景

传统 for 循环

Stream API

过滤 + 转换 + 收集

5~6 行代码

1 行链式调用

并行处理

手动管理线程极易出错

.parallelStream() 自动完成

代码可读性

命令式(逐步操作)

声明式(表达意图)

调试

易打断点,观察到中间变量

借助 peek() 或拆开调试

性能(小数据量)

略快(无额外开销)

差异可忽略

性能(大数据量+并行)

难发挥多核优势

自动并行,性能提升明显

建议:默认使用 Stream,除非有明确的性能测量证明循环更快,或逻辑本身不适合函数式风格(如 breakreturn、受检异常)。


3. Stream 组成要素

text

数据源 (集合/数组/文件/生成器)
   ↓
0个或多个 中间操作 (filter, map, sorted, ...)
   ↓
终端操作 (collect, forEach, reduce, ...)
   ↓
结果 / 副作用

4. 创建 Stream

java

// 1. 从集合创建
List<String> list = Arrays.asList("a", "b");
Stream<String> stream = list.stream();          // 串行流
Stream<String> parallelStream = list.parallelStream(); // 并行流

// 2. 从数组创建
String[] array = {"a", "b"};
Stream<String> arrayStream = Arrays.stream(array);
Stream<String> streamOf = Stream.of("a", "b");

// 3. 无限流
Stream<Integer> iterate = Stream.iterate(0, n -> n + 2);      // 0,2,4,...
Stream<Double> generate = Stream.generate(Math::random);      // 随机数

// 4. 文件行
try (Stream<String> lines = Files.lines(Paths.get("file.txt"))) {
    // 操作
}

// 5. 拼接流
Stream<String> concat = Stream.concat(stream1, stream2);

5. 中间操作(Intermediate Operations)

特点:返回一个新的 Stream,惰性执行,可链式调用。

5.1 筛选与切片

方法

说明

示例

filter(Predicate<T>)

保留满足条件的元素

.filter(s -> s.length() > 2)

distinct()

去重(基于 equals()

.distinct()

limit(long maxSize)

截取前 n 个元素

.limit(10)

skip(long n)

跳过前 n 个元素

.skip(5)

takeWhile(Predicate) (Java 9+)

从开头取满足条件的元素,遇到不满足即停止

.takeWhile(s -> s.startsWith("A"))

dropWhile(Predicate) (Java 9+)

丢弃开头满足条件的元素

.dropWhile(s -> s.length() < 3)

5.2 映射

方法

说明

示例

map(Function<T,R>)

元素一对一转换

.map(String::toUpperCase)

flatMap(Function<T, Stream<R>>)

元素一对多扁平化

.flatMap(line -> Arrays.stream(line.split(" ")))

mapToInt/ToLong/ToDouble

转换为基本类型流

.mapToInt(String::length)

boxed()

将基本类型流装箱为包装类型

IntStream.range(1,5).boxed()

5.3 排序

方法

说明

示例

sorted()

自然排序(元素需实现 Comparable

.sorted()

sorted(Comparator<T>)

自定义排序

.sorted(Comparator.comparingInt(String::length))

5.4 调试

方法

说明

示例

peek(Consumer<T>)

查看流中元素,不影响流(常用于日志)

.peek(System.out::println)


6. 终端操作(Terminal Operations)

特点:触发流水线执行,产生结果或副作用,流使用后关闭不可再重复使用。

6.1 聚合操作

方法

返回值

说明

示例

count()

long

元素个数

.count()

min(Comparator<T>)

Optional<T>

最小值

.min(String::compareTo)

max(Comparator<T>)

Optional<T>

最大值

.max(String::compareTo)

sum()

数值

仅限 IntStream

IntStream.of(1,2,3).sum()

average()

OptionalDouble

平均值

.average()

6.2 收集

方法

说明

示例

collect(Collector<T,A,R>)

将元素收集到容器

.collect(Collectors.toList())

toList() (Java 16+)

简化的收集为 List

.toList() (不可变)

toArray()

收集为数组

.toArray(String[]::new)

6.3 条件检查

方法

返回值

说明

示例

anyMatch(Predicate)

boolean

任一元素满足

.anyMatch(s -> s.isEmpty())

allMatch(Predicate)

boolean

全部元素满足

.allMatch(s -> s.length() > 0)

noneMatch(Predicate)

boolean

没有元素满足

.noneMatch(s -> s == null)

findFirst()

Optional<T>

返回第一个元素

.findFirst()

findAny()

Optional<T>

返回任意一个元素(并行流友好)

.findAny()

6.4 归约

方法

说明

示例

reduce(T identity, BinaryOperator<T>)

累加计算

.reduce(0, Integer::sum)

reduce(BinaryOperator<T>)

返回 Optional

.reduce((a,b) -> a + "," + b)

6.5 遍历

方法

说明

示例

forEach(Consumer)

遍历执行操作(不保证顺序,并行流中无序)

.forEach(System.out::println)

forEachOrdered(Consumer)

按流顺序遍历(并行流中保证有序)

.forEachOrdered(System.out::println)


7. 收集器 Collectors 详解

Collectors 类提供了丰富的静态工厂方法,用于实现常见的收集操作。

7.1 转换成集合

java

// 转为 List
List<String> list = stream.collect(Collectors.toList());

// 转为 Set
Set<String> set = stream.collect(Collectors.toSet());

// 转为特定集合类型(如 ArrayList)
List<String> arrayList = stream.collect(Collectors.toCollection(ArrayList::new));

// 转为不可变集合(Java 10+)
List<String> unmodifiable = stream.collect(Collectors.toUnmodifiableList());

7.2 字符串连接

java

String joined = stream.collect(Collectors.joining(", ", "[", "]"));
// 例如: [a, b, c]

7.3 分组与分区

java

// 按属性分组
Map<Integer, List<Person>> byAge = people.stream()
        .collect(Collectors.groupingBy(Person::getAge));

// 分组后统计每组的数量
Map<String, Long> countByName = people.stream()
        .collect(Collectors.groupingBy(Person::getName, Collectors.counting()));

// 分区(true/false 两组)
Map<Boolean, List<Integer>> evenOdd = numbers.stream()
        .collect(Collectors.partitioningBy(n -> n % 2 == 0));

7.4 聚合收集

java

// 求和
int sum = people.stream().collect(Collectors.summingInt(Person::getAge));

// 平均值
Double avg = people.stream().collect(Collectors.averagingInt(Person::getAge));

// 汇总统计(count, sum, min, max, avg)
IntSummaryStatistics stats = people.stream()
        .collect(Collectors.summarizingInt(Person::getAge));

7.5 映射先收集

java

// 先将属性映射再收集
List<String> names = people.stream()
        .collect(Collectors.mapping(Person::getName, Collectors.toList()));
// 等价于 .map(Person::getName).collect(...)

8. 基本类型流(IntStream, LongStream, DoubleStream)

避免装箱 / 拆箱开销,并提供专用方法。

8.1 创建

java

IntStream ints = IntStream.of(1, 2, 3);
IntStream range = IntStream.range(1, 10);      // [1,10)
IntStream rangeClosed = IntStream.rangeClosed(1, 10); // [1,10]

8.2 常用方法

java

int sum = intStream.sum();
OptionalDouble avg = intStream.average();
IntSummaryStatistics stats = intStream.summaryStatistics();
int[] array = intStream.toArray();

8.3 与对象流互相转换

java

// 对象流 → 基本类型流
IntStream lengthStream = strings.mapToInt(String::length);

// 基本类型流 → 对象流(装箱)
Stream<Integer> boxed = intStream.boxed();

9. 并行流(Parallel Stream)

9.1 开启并行

java

// 方式1:从集合直接创建
list.parallelStream()

// 方式2:现有串行流转并行
stream.parallel()

// 转为串行
parallelStream.sequential()

9.2 注意事项

要点

说明

线程安全

确保在并行流中使用的 CollectorFunctionPredicate 是线程安全的。不要修改外部共享可变状态。

性能提升条件

数据量大(通常 > 1万元素)、每个元素处理耗时、多核 CPU。

有序性

forEach 不保证顺序,forEachOrdered 保证顺序但降低并行性能。

公共 ForkJoinPool

默认使用 ForkJoinPool.commonPool(),可通过系统参数调整大小。

自定义池

避免嵌套并行流,建议在自定义 ForkJoinPool 中提交任务。

java

// 在自定义线程池中执行并行流
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> 
    list.parallelStream().forEach(...)
).get();

10. 高级操作与技巧

10.1 reduce 自定义归约

java

// 字符串拼接
Optional<String> combined = words.stream().reduce((a,b) -> a + "," + b);

// 带初始值的归约
String result = words.stream().reduce("", (a,b) -> a + b);

10.2 flatMap 展平嵌套结构

java

// List<List<Integer>> -> List<Integer>
List<Integer> flat = listOfLists.stream()
        .flatMap(List::stream)
        .collect(Collectors.toList());

// 处理文件行单词
List<String> words = Files.lines(Paths.get("file.txt"))
        .flatMap(line -> Arrays.stream(line.split("\\W+")))
        .collect(Collectors.toList());

10.3 collect 自定义收集器

java

// 简单示例:将流收集到 TreeSet
TreeSet<String> treeSet = stream.collect(
        TreeSet::new,          // 提供者
        TreeSet::add,          // 累加器
        TreeSet::addAll        // 组合器(并行流使用)
);

10.4 调试技巧:peek + 日志

java

List<String> result = names.stream()
        .filter(n -> n.length() > 2)
        .peek(n -> System.out.println("After filter: " + n))
        .map(String::toUpperCase)
        .peek(n -> System.out.println("After map: " + n))
        .collect(Collectors.toList());

10.5 短路操作优化

limit(), anyMatch(), findFirst() 等短路操作可以提前终止流处理,提高性能。


11. 性能考量

场景

建议

小数据量(< 1000)

传统循环或串行流差别不大,优先可读性。

大数据量 + 多核 CPU + 复杂操作(多次 mapping, sorting)

使用并行流并充分测试。

频繁的 distinct / sorted 后的 limit

考虑 limit 前置减少处理量。

装箱操作(int → Integer)

使用 IntStream 避免装箱。

无限流

必须使用 limit() 截断,否则内存爆炸。


12. 常见陷阱与注意点

  1. 流只能消费一次
    多次调用终端操作会抛出 IllegalStateException

  2. 惰性求值警告
    没有终端操作的流水线不会执行任何中间操作。

  3. 并行流中避免非线程安全的容器
    例如 ArrayList 在并行 collect 时可能异常,但 Collectors.toList() 是线程安全的。

  4. forEachpeek 副作用
    应优先使用 collect 等无副作用操作,若需副作用确保在非并行流中使用,或控制好并发。

  5. sorteddistinct 有状态操作开销大
    它们需要存储所有元素(在内存中排序/去重),对于超大数据流可能内存爆炸。

  6. null 威胁
    流中一般不允许元素为 null(某些操作如 max 可能抛出 NullPointerException)。可使用 filter(Objects::nonNull) 提前过滤。

  7. 不要在 filter 中修改数据源
    可能会导致不确定结果。


13. 最佳实践总结

  • 优先使用方法引用System.out::println 优于 x -> System.out.println(x)

  • 坚持声明式风格:让代码表达意图

  • 按逻辑顺序排列操作filtersortedmapcollect

  • 小心无限流:务必搭配 limit()

  • 利用 Collectors 简化收集逻辑,避免手动 reduce

  • 并行流谨慎使用,非瓶颈点不要过度优化

  • 适当使用 peek 调试,但在生产代码中移除或转为日志


14. 完整示例:综合运用

java

public class StreamDemo {
    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
            new Person("Alice", 30),
            new Person("Bob", 20),
            new Person("Charlie", 25),
            new Person("Alice", 35)
        );

        // 需求:找出年龄 >= 25 的人,按姓名分组,每组取出年龄最大的名字,结果排序后收集
        Map<String, Integer> result = people.stream()
                .filter(p -> p.getAge() >= 25)
                .collect(Collectors.groupingBy(
                    Person::getName,
                    Collectors.collectingAndThen(
                        Collectors.maxBy(Comparator.comparingInt(Person::getAge)),
                        opt -> opt.get().getAge()
                    )
                ));
        // 结果: {Alice=35, Charlie=25}
        System.out.println(result);
    }
}

15. 学习资源

Stream API 是现代 Java 开发的核心技能之一,掌握它能让你的代码更简洁、更健壮、更易于并行化。本文档可作为日常开发的速查手册。