Guide to Java Parallel Collectors Library

1. Introduction

Parallel-collectors is a small library providing a set of Java Stream API collectors that enable parallel processing – while at the same time circumventing the main deficiencies of standard Parallel Streams.

2. Maven Dependencies

If we want to start using the library, we need to add a single entry in Maven’s pom.xml file:

<dependency>
    <groupId>com.pivovarit</groupId>
    <artifactId>parallel-collectors</artifactId>
    <version>1.1.0</version>
</dependency>

Or a single line in Gradle’s build file:

compile 'com.pivovarit:parallel-collectors:1.1.0'

The newest version can be found on Maven Central.

3. Parallel Streams Caveats

Parallel Streams were one of Java 8's highlights, but they turned out to be suitable almost exclusively for heavy CPU processing.

The reason for this was the fact that Parallel Streams were internally backed by a JVM-wide shared ForkJoinPool, which provided limited parallelism and was used by all Parallel Streams running on a single JVM instance.

For example, imagine we have a list of ids that we want to use to fetch a list of users, and that this operation is expensive.
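The snippets below reference a fetchById helper that isn't shown. A minimal sketch could look like the following – the one-second sleep simulating an expensive remote call, and the "user-&lt;id&gt;" return format, are assumptions inferred from the comments and outputs in the snippets:

```java
import java.util.concurrent.TimeUnit;

public class UserService {

    // Hypothetical blocking lookup: sleeps to simulate an expensive
    // remote call and returns a "user-<id>" string, matching the
    // outputs shown in the snippets below.
    static String fetchById(int id) {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "user-" + id;
    }

    public static void main(String[] args) {
        System.out.println(fetchById(1)); // user-1
    }
}
```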

We could use Parallel Streams for that:

List<Integer> ids = Arrays.asList(1, 2, 3);
List<String> results = ids.parallelStream()
  .map(i -> fetchById(i)) // each operation takes one second
  .collect(Collectors.toList());

System.out.println(results); // [user-1, user-2, user-3]

And indeed, we can see that there’s a noticeable speedup. But it becomes problematic if we start running multiple parallel blocking operations… in parallel. This might quickly saturate the pool and result in potentially huge latencies. That’s why it’s important to build bulkheads by creating separate thread pools – to prevent unrelated tasks from influencing each other’s execution.

In order to provide a custom ForkJoinPool instance, we could leverage the well-known trick of running the pipeline inside a task submitted to that pool, but this approach relied on an undocumented hack and was faulty until JDK10. We can read more in the issue itself – [JDK8190974].
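The trick in question looks roughly like the sketch below: the whole pipeline is submitted as a task to a custom ForkJoinPool, which makes the stream's parallel operations run on that pool's workers instead of the common pool. To stay self-contained, this sketch maps elements directly rather than calling the fetchById helper:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class CustomPoolTrick {

    static List<String> runOnCustomPool() {
        ForkJoinPool customPool = new ForkJoinPool(4);
        try {
            // Submitting the whole pipeline as a task to our pool makes the
            // stream's parallel operations run on that pool's worker threads -
            // an undocumented side effect of the Fork/Join framework, not a
            // documented contract.
            return customPool.submit(() ->
              Arrays.asList(1, 2, 3).parallelStream()
                .map(i -> "user-" + i)
                .collect(Collectors.toList()))
              .get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            customPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool()); // [user-1, user-2, user-3]
    }
}
```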

4. Parallel Collectors in Action

Parallel Collectors, as the name suggests, are just standard Stream API Collectors that allow performing additional operations in parallel at the collect() phase.

The ParallelCollectors class (which mirrors the Collectors class) is a facade providing access to the whole functionality of the library.

If we wanted to redo the above example, we could simply write:

ExecutorService executor = Executors.newFixedThreadPool(10);

List<Integer> ids = Arrays.asList(1, 2, 3);

CompletableFuture<List<String>> results = ids.stream()
  .collect(ParallelCollectors.parallelToList(i -> fetchById(i), executor, 4));

System.out.println(results.join()); // [user-1, user-2, user-3]

The result is the same; however, we were able to provide our custom thread pool, specify a custom parallelism level, and receive the result wrapped in a CompletableFuture instance without blocking the current thread.

Standard Parallel Streams, on the other hand, can't provide any of those features.
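Conceptually – and purely as an illustrative sketch, not the library's actual implementation – parallelToList's behavior can be approximated with plain CompletableFutures on a caller-provided executor. Note that this naive version has no parallelism cap, which the real collector does provide:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelToListSketch {

    // Rough approximation: submit one async task per element to the given
    // executor, then combine the individual futures into a single future
    // holding the list of results in the original order.
    static <T, R> CompletableFuture<List<R>> parallelToList(
      List<T> input, Function<T, R> mapper, ExecutorService executor) {
        List<CompletableFuture<R>> futures = input.stream()
          .map(t -> CompletableFuture.supplyAsync(() -> mapper.apply(t), executor))
          .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
          .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            List<String> results =
              parallelToList(Arrays.asList(1, 2, 3), i -> "user-" + i, executor)
                .join();
            System.out.println(results); // [user-1, user-2, user-3]
        } finally {
            executor.shutdown();
        }
    }
}
```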

4.1. ParallelCollectors.parallelToList/ToSet()

As intuitive as it gets, if we want to process a Stream in parallel and collect results into a List or Set, we can simply use ParallelCollectors.parallelToList or parallelToSet:

List<Integer> ids = Arrays.asList(1, 2, 3);

List<String> results = ids.stream()
  .collect(parallelToList(i -> fetchById(i), executor, 4))
  .join();

4.2. ParallelCollectors.parallelToMap()

If we want to collect Stream elements into a Map instance, just like with Stream API, we need to provide two mappers:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), executor, 4))
  .join(); // {1=user-1, 2=user-2, 3=user-3}

We can also provide a custom Map instance Supplier:

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, executor, 4))
  .join();

And a custom conflict resolution strategy:

List<Integer> ids = Arrays.asList(1, 2, 3);

Map<Integer, String> results = ids.stream()
  .collect(parallelToMap(i -> i, i -> fetchById(i), TreeMap::new, (s1, s2) -> s1, executor, 4))
  .join();
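The merge function follows the same contract as the one accepted by the standard Collectors.toMap: when two elements map to the same key, it decides which value wins. A plain Stream API example of that contract (with an artificial key collision on i % 2, so it runs without the library):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class MergeFunctionDemo {

    static Map<Integer, String> firstWins() {
        // Keys collide on (i % 2); (s1, s2) -> s1 keeps the first value
        // encountered, mirroring the conflict-resolution strategy above.
        return Arrays.asList(1, 2, 3, 4).stream()
          .collect(Collectors.toMap(
            i -> i % 2,
            i -> "user-" + i,
            (s1, s2) -> s1,
            TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(firstWins()); // {0=user-2, 1=user-1}
    }
}
```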

4.3. ParallelCollectors.parallelToCollection()

Similarly to the above, we can pass our custom Collection Supplier if we want to obtain results packaged in our custom container:

List<String> results = ids.stream()
  .collect(parallelToCollection(i -> fetchById(i), LinkedList::new, executor, 4))
  .join();

4.4. ParallelCollectors.parallelToStream()

If the above isn’t enough, we can actually obtain a Stream instance and continue custom processing there:

Map<Integer, List<String>> results = ids.stream()
  .collect(parallelToStream(i -> fetchById(i), executor, 4))
  .thenApply(stream -> stream.collect(Collectors.groupingBy(i -> i.length())))
  .join();

4.5. ParallelCollectors.parallel()

This one allows us to stream results in completion order:

ids.stream()
  .collect(parallel(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-3
// user-2

In this case, we can expect the collector to return different results on each run since we introduced a random processing delay.
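The completion-order behavior itself can be reproduced with just the JDK, for example via an ExecutorCompletionService. The sketch below is an analogue, not the library's mechanism, and uses fixed delays (rather than random ones) chosen to force a stable ordering for demonstration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletionOrderSketch {

    // Returns results in the order the tasks finish, not submission order.
    static List<String> runInCompletionOrder() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService =
          new ExecutorCompletionService<>(executor);
        int[][] tasks = { {1, 200}, {2, 500}, {3, 50} }; // {id, delayMillis}
        try {
            for (int[] task : tasks) {
                completionService.submit(() -> {
                    TimeUnit.MILLISECONDS.sleep(task[1]);
                    return "user-" + task[0];
                });
            }
            List<String> results = new ArrayList<>();
            for (int i = 0; i < tasks.length; i++) {
                // take() hands back the next future to complete
                results.add(completionService.take().get());
            }
            return results;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runInCompletionOrder()); // [user-3, user-1, user-2]
    }
}
```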

4.6. ParallelCollectors.parallelOrdered()

This facility allows streaming results just like the above, but maintains the original order:

ids.stream()
  .collect(parallelOrdered(i -> fetchByIdWithRandomDelay(i), executor, 4))
  .forEach(System.out::println);

// user-1
// user-2
// user-3

In this case, the collector will always maintain the original order but might be slower than the unordered variant above.

5. Limitations

At the time of writing, parallel-collectors doesn't work with infinite streams even if short-circuiting operations are used – it's a design limitation imposed by Stream API internals. Simply put, Streams treat collectors as non-short-circuiting operations, so the stream needs to process all upstream elements before it can be terminated.

The other limitation is that short-circuiting operations don't interrupt the remaining tasks once the stream short-circuits.

6. Conclusion

We saw how the parallel-collectors library enables parallel processing using custom Java Stream API Collectors and CompletableFutures, which lets us specify custom thread pools and parallelism levels and process results in a non-blocking style.

As always, code snippets are available over on GitHub.

For further reading, see the parallel-collectors library on GitHub, the author’s blog, and the author’s Twitter account.
