Java 8: Creating a custom Collector for your Stream

In the previous post we looked at different implementations of the Collector interface.

As you saw, there is a wide range of collectors available out of the box.

However, sometimes you need something special that you simply have to build yourself.

So today we’re going to look at how we can create a custom collector that can be used to accumulate the elements of your stream.

To see how this works, let’s pick up the topic from the previous post.

Say we have a stream of articles, Stream<Article>, and each article has a method, getWordCount(), that returns the number of words in the article.

We now want to create a custom Collector that reduces the stream of articles to the total number of words of all the articles combined.
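
To have something concrete to work with, here is a minimal sketch of the setup. The Article class below is a hypothetical stand-in (only getWordCount() comes from the description above), and the titles and word counts are made up. The main method computes the total with mapToInt(), purely as a reference for what our custom collector should end up returning.

```java
import java.util.stream.Stream;

// Hypothetical stand-in for the Article type; only getWordCount() is taken from the text.
class Article {
    private final String title;
    private final int wordCount;

    Article(String title, int wordCount) {
        this.title = title;
        this.wordCount = wordCount;
    }

    String getTitle() { return title; }
    int getWordCount() { return wordCount; }
}

class ArticleStreamExample {
    // Builds a small sample stream; the data is invented for illustration.
    static Stream<Article> sampleArticles() {
        return Stream.of(
            new Article("Streams", 1200),
            new Article("Collectors", 800),
            new Article("Lambdas", 500));
    }

    public static void main(String[] args) {
        // Reference value: 1200 + 800 + 500
        int expected = sampleArticles().mapToInt(Article::getWordCount).sum();
        System.out.println(expected); // prints 2500
    }
}
```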

Using Collector.of() to create our custom collector

To be able to create a custom collector, you have to implement the Collector interface.

Now, instead of implementing the interface the traditional way, we’ll use the static method Collector.of() to create our custom Collector.

Not only is it more concise and readable, but also more convenient since you can omit parts of the implementation that are not required.

The thing is, Collector.of() only requires three parts: a supplier, an accumulator and a combiner.
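
For comparison, here is a rough sketch of what the traditional way could look like for our word count example. The Collector interface itself declares five methods (supplier(), accumulator(), combiner(), finisher() and characteristics()), and a hand-written class has to spell out all of them. The class name TotalWordCountCollector and the Article stand-in are assumptions made for this example; the individual parts are explained in the sections that follow.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical stand-in for the Article type from the text.
class Article {
    private final int wordCount;
    Article(int wordCount) { this.wordCount = wordCount; }
    int getWordCount() { return wordCount; }
}

// Hand-written collector: T = Article, A = int[] (container), R = Integer (result).
class TotalWordCountCollector implements Collector<Article, int[], Integer> {

    @Override
    public Supplier<int[]> supplier() {
        return () -> new int[1];
    }

    @Override
    public BiConsumer<int[], Article> accumulator() {
        return (result, article) -> result[0] += article.getWordCount();
    }

    @Override
    public BinaryOperator<int[]> combiner() {
        return (result1, result2) -> {
            result1[0] += result2[0];
            return result1;
        };
    }

    @Override
    public Function<int[], Integer> finisher() {
        return total -> total[0];
    }

    @Override
    public Set<Collector.Characteristics> characteristics() {
        return Collections.emptySet(); // no optimisations declared
    }
}

class TraditionalCollectorExample {
    public static void main(String[] args) {
        int total = Stream.of(new Article(300), new Article(200))
                          .collect(new TotalWordCountCollector());
        System.out.println(total); // prints 500
    }
}
```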

A supplier creating the result container

To implement a collector, we need to provide a supplier that creates a result container. This is where the accumulated value will be stored.

Let’s look at how the supplier for our custom collector could look.

() -> new int[1]

You may wonder why we use new int[1] and not 0 as the initial result.

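The reason is that a Collector performs a mutable reduction: the accumulator doesn't return a new value, so it has to update the result container in place. A plain int can't be updated that way, but the single element of an int array can.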
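
To make the role of the supplier a bit more tangible, the sketch below assigns the lambda to a java.util.function.Supplier<int[]>. The class and constant names are only for illustration. Each call to get() hands out a fresh container whose single element starts at 0, and that element can then be changed in place.

```java
import java.util.function.Supplier;

class SupplierExample {
    // The supplier from the text, with its type written out.
    static final Supplier<int[]> SUPPLIER = () -> new int[1];

    public static void main(String[] args) {
        int[] first = SUPPLIER.get();
        int[] second = SUPPLIER.get();

        first[0] += 42; // update the container in place

        System.out.println(first[0]);  // prints 42
        System.out.println(second[0]); // prints 0, it is a separate container
    }
}
```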

Accumulating the elements

Next, we need to create a function that defines how we’ll add an element to the result container.

In our example this means a function that extracts the word count and adds it to the result.

(result, article) -> result[0] += article.getWordCount();

This function is of type BiConsumer, which means that it takes two arguments (the result container and the next element) and doesn't return anything. It just updates the result container in place, in our case the first element of the int array.
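
As a small illustration, the accumulator can be typed as a java.util.function.BiConsumer<int[], Article> and applied by hand, which is roughly what the stream does for each element. The Article class here is a hypothetical stand-in and the word counts are made up.

```java
import java.util.function.BiConsumer;

// Hypothetical stand-in for the Article type from the text.
class Article {
    private final int wordCount;
    Article(int wordCount) { this.wordCount = wordCount; }
    int getWordCount() { return wordCount; }
}

class AccumulatorExample {
    // First argument: the result container. Second argument: the next stream element.
    static final BiConsumer<int[], Article> ACCUMULATOR =
        (result, article) -> result[0] += article.getWordCount();

    public static void main(String[] args) {
        int[] result = new int[1];

        ACCUMULATOR.accept(result, new Article(300));
        ACCUMULATOR.accept(result, new Article(200));

        System.out.println(result[0]); // prints 500
    }
}
```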

Combining two partial results

In a sequential reduction the supplier and accumulator above would be sufficient. But to be able to support a parallel implementation we need to provide a combiner.

The combiner is a function that defines how two result containers are merged into one.

Why?

Because in a parallel scenario our stream would be split into partitions where each partition would be accumulated in parallel. When finished, the results would be merged using our combiner function.

Let’s look at how it could be solved for our word count example.

(result1, result2) -> {
  result1[0] += result2[0];
  return result1;
}
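
To see the combiner in isolation, the sketch below types it as a java.util.function.BinaryOperator<int[]> and merges two containers by hand, as if two partitions had already been accumulated. The partial totals are invented for the example.

```java
import java.util.function.BinaryOperator;

class CombinerExample {
    // Folds the second container into the first and returns the first.
    static final BinaryOperator<int[]> COMBINER = (result1, result2) -> {
        result1[0] += result2[0];
        return result1;
    };

    public static void main(String[] args) {
        // Pretend these are the partial results of two partitions.
        int[] left = {500};
        int[] right = {700};

        int[] merged = COMBINER.apply(left, right);

        System.out.println(merged[0]);      // prints 1200
        System.out.println(merged == left); // prints true, the first container is reused
    }
}
```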

The minimal collector

Now that we have all the mandatory parts, we can add them together to create our collector.

Collector.of(  
  () -> new int[1],
  (result, article) -> result[0] += article.getWordCount(),
  (result1, result2) -> {
    result1[0] += result2[0];
    return result1;
  }
);
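
To see what this minimal version hands back, here is a small runnable sketch. The Article class is a hypothetical stand-in and the method name minimalCollector() is made up for the example. Note the type Collector<Article, int[], int[]>: the result type is the same as the container type.

```java
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical stand-in for the Article type from the text.
class Article {
    private final int wordCount;
    Article(int wordCount) { this.wordCount = wordCount; }
    int getWordCount() { return wordCount; }
}

class MinimalCollectorExample {
    // Three-argument form: supplier, accumulator, combiner.
    static Collector<Article, int[], int[]> minimalCollector() {
        return Collector.of(
            () -> new int[1],
            (result, article) -> result[0] += article.getWordCount(),
            (result1, result2) -> {
                result1[0] += result2[0];
                return result1;
            });
    }

    public static void main(String[] args) {
        // collect() returns the container itself, so we still have to unwrap it.
        int[] container = Stream.of(new Article(300), new Article(200))
                                .collect(minimalCollector());
        System.out.println(container[0]); // prints 500
    }
}
```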

There is one problem with this minimal solution though — it returns the result container, which in our case is of type int[].

What we really want is to return the count, not the result container.

A final transformation

Luckily this can easily be solved by adding a fourth function, the finisher, which maps the result container to whatever you want the collector to return.

In our case we simply get the first element from the int array.

total -> total[0]  

The final collector would then look like this:

Collector.of(  
  () -> new int[1],
  (result, article) -> result[0] += article.getWordCount(),
  (result1, result2) -> {
    result1[0] += result2[0];
    return result1;
  },
  total -> total[0] 
);

If we now assign our collector to a variable, we can easily use it to accumulate our stream of articles.

int wordCount = articleStream.collect(totalWordCountCollector);
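
Putting the pieces together, a complete program could look roughly like the sketch below. The Article class is a hypothetical stand-in, the word counts are made up, and the collector is wrapped in a small factory method only to keep the example tidy. It runs the same collector on a sequential and on a parallel stream; the parallel run is where the combiner comes into play.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

// Hypothetical stand-in for the Article type from the text.
class Article {
    private final int wordCount;
    Article(int wordCount) { this.wordCount = wordCount; }
    int getWordCount() { return wordCount; }
}

class WordCountExample {
    // Four-argument form: supplier, accumulator, combiner, finisher.
    static Collector<Article, int[], Integer> totalWordCountCollector() {
        return Collector.of(
            () -> new int[1],
            (result, article) -> result[0] += article.getWordCount(),
            (result1, result2) -> {
                result1[0] += result2[0];
                return result1;
            },
            total -> total[0]);
    }

    public static void main(String[] args) {
        List<Article> articles =
            Arrays.asList(new Article(300), new Article(200), new Article(100));

        Collector<Article, int[], Integer> totalWordCountCollector = totalWordCountCollector();

        int sequential = articles.stream().collect(totalWordCountCollector);
        int parallel = articles.parallelStream().collect(totalWordCountCollector);

        System.out.println(sequential); // prints 600
        System.out.println(parallel);   // prints 600
    }
}
```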

Optimisations

As a final treat, let’s quickly look at optimisations.

The thing is, you can indicate if your collector supports different kinds of optimisations.

This is done by adding characteristics.

Using Collector.of() you add the Characteristics as varargs at the end of your parameter list.

Collector.of(
  // supplier,
  // accumulator,
  // combiner,
  // finisher,
  Collector.Characteristics.CONCURRENT,
  Collector.Characteristics.IDENTITY_FINISH
  // ... any further characteristics
);

There are three characteristics that you can use:

  • CONCURRENT: indicates that a single result container can be updated by the accumulator from multiple threads at the same time.
  • IDENTITY_FINISH: indicates that the finisher is the identity function, so it can be skipped.
  • UNORDERED: indicates that the result doesn't depend on the encounter order of the elements.
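
As a sketch of how this could look for our word count collector: the total doesn't depend on the order of the articles, so UNORDERED seems like a reasonable fit. CONCURRENT would not be safe here, because updating a plain int[] from several threads isn't thread-safe, and IDENTITY_FINISH doesn't apply because our finisher turns the int[] into an Integer. The Article class and the method name are assumptions made for the example.

```java
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Stream;

// Hypothetical stand-in for the Article type from the text.
class Article {
    private final int wordCount;
    Article(int wordCount) { this.wordCount = wordCount; }
    int getWordCount() { return wordCount; }
}

class CharacteristicsExample {
    static Collector<Article, int[], Integer> unorderedWordCount() {
        return Collector.of(
            () -> new int[1],
            (result, article) -> result[0] += article.getWordCount(),
            (result1, result2) -> {
                result1[0] += result2[0];
                return result1;
            },
            total -> total[0],
            Collector.Characteristics.UNORDERED); // varargs at the end
    }

    public static void main(String[] args) {
        // The collector reports exactly the characteristics we passed in.
        Set<Collector.Characteristics> declared = unorderedWordCount().characteristics();
        System.out.println(declared); // prints [UNORDERED]

        int total = Stream.of(new Article(300), new Article(200))
                          .collect(unorderedWordCount());
        System.out.println(total); // prints 500
    }
}
```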

Further reading

For more information about the Collector interface, make sure to check out the documentation.
