Thanks to all the answerers who mentioned Collectors.groupingBy(). It was the key to setting up a stream on which I could use reduce(). I had wrongly believed that reduce on its own, without groupingBy, should be able to solve the problem.
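The difference is easiest to see on plain data. In this minimal sketch (the word list in the usage and the method names are made up for illustration), `reduce` on its own collapses the whole stream into a single value, whereas `groupingBy` with a `Collectors.reducing` downstream runs a separate reduction for each group:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupThenReduceDemo {
    // reduce() alone: the whole stream collapses into one value.
    static int totalLength(List<String> words) {
        return words.stream()
                .map(String::length)
                .reduce(0, Integer::sum);
    }

    // groupingBy() first, then one reduction per group (keyed by first letter).
    static Map<Character, Integer> totalLengthByInitial(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(
                        w -> w.charAt(0),                                         // group key
                        Collectors.reducing(0, String::length, Integer::sum)));  // per-group reduce
    }
}
```

For the words "apple", "avocado" and "banana", `totalLength` gives a single total, while `totalLengthByInitial` gives one total for 'a' and another for 'b'. In the domain code the classifier is the Client itself, and the per-group reduction merges reports and e-mail addresses instead of summing lengths.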
Thanks also for the suggestion to create a fluent API. I added IncomingFlatItem.getEmailAddress() and IncomingFlatItem.getReport() to fluently extract the domain objects from an IncomingFlatItem, plus a method that converts the whole flat item into a proper domain object with its email address and report already nested:
public Client getClient() {
    // One flat row becomes a Client holding exactly one email address and one report.
    Client client = new Client();
    client.setClientCode(clientCode);
    client.setClientName(clientName);
    client.setEmailAddresses(new ArrayList<>());
    client.getEmailAddresses().add(this.getEmailAddress());
    client.setReports(new ArrayList<>());
    client.getReports().add(this.getReport());
    return client;
}
I also created business-ID-based .equals() and .hashCode() methods on Client, EmailAddress and Report, as recommended by @SamuelPhilip.
Lastly, for the domain objects, I created .addReport(Report r) and .addEmail(EmailAddress e) on my Client class, which add the child object to the Client only if it is not already present. I switched the collection type from Set to List because the domain model standard is List, and using Sets would have meant lots of conversions to Lists.
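A minimal sketch of what such domain classes might look like. The real classes aren't shown, so the business keys are assumptions: `clientCode` for `Client`, and hypothetical `address` and `reportCode` fields for `EmailAddress` and `Report`. Getters and setters the example doesn't need are omitted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical child type; its business key is assumed to be the address string.
class EmailAddress {
    private final String address;
    EmailAddress(String address) { this.address = address; }
    @Override public boolean equals(Object o) {
        return o instanceof EmailAddress && Objects.equals(address, ((EmailAddress) o).address);
    }
    @Override public int hashCode() { return Objects.hashCode(address); }
}

// Hypothetical child type; its business key is assumed to be a report code.
class Report {
    private final String reportCode;
    Report(String reportCode) { this.reportCode = reportCode; }
    @Override public boolean equals(Object o) {
        return o instanceof Report && Objects.equals(reportCode, ((Report) o).reportCode);
    }
    @Override public int hashCode() { return Objects.hashCode(reportCode); }
}

class Client {
    private String clientCode;   // business ID used by equals()/hashCode()
    private String clientName;
    // Initialised here so addReport()/addEmail() work on a freshly constructed Client.
    private final List<EmailAddress> emailAddresses = new ArrayList<>();
    private final List<Report> reports = new ArrayList<>();

    void setClientCode(String clientCode) { this.clientCode = clientCode; }
    void setClientName(String clientName) { this.clientName = clientName; }
    List<EmailAddress> getEmailAddresses() { return emailAddresses; }
    List<Report> getReports() { return reports; }

    // Adds the report only if an equal one (same business key) is not already present.
    void addReport(Report r) {
        if (!reports.contains(r)) reports.add(r);
    }

    // Same de-duplication for e-mail addresses.
    void addEmail(EmailAddress e) {
        if (!emailAddresses.contains(e)) emailAddresses.add(e);
    }

    // Two Clients are "the same client" when their business IDs match.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Client)) return false;
        return Objects.equals(clientCode, ((Client) o).clientCode);
    }

    @Override public int hashCode() { return Objects.hashCode(clientCode); }
}
```

The linear `contains()` check keeps insertion order in a `List` at the cost of O(n) per add, which is usually acceptable for the handful of reports and addresses a single client has.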
So with that, the stream code and lambdas look succinct.
There are 3 steps:
- map IncomingFlatItems to Clients
- group the Clients into a map by client (relying heavily on Client.equals() and Client.hashCode())
- reduce each group to one Client
So this is the functional algorithm:
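List<Client> unflatten(List<IncomingFlatItem> flatItems) {
    return flatItems.parallelStream()
            // 1. map each flat row to a Client with one email address and one report
            .map(IncomingFlatItem::getClient)
            // 2. group equal Clients (equals()/hashCode() compare the business ID)
            .collect(Collectors.groupingByConcurrent(client -> client))
            .values().parallelStream()
            // 3. reduce each group to a single Client; no identity value is needed,
            //    so this does not depend on what new Client() initialises
            .map(group -> group.stream()
                    .reduce((client1, client2) -> {
                        client1.getReports().forEach(client2::addReport);
                        client1.getEmailAddresses().forEach(client2::addEmail);
                        return client2;
                    })
                    .get()) // safe: groupingBy never produces an empty group
            .collect(Collectors.toList());
}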
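The java.util.stream documentation describes `reduce()` in terms of combining values into new values and points to `collect()` for mutable reduction. The lambda above instead mutates `client2` and returns it, which works as long as each group is reduced sequentially, as it is here. A minimal sketch of a side-effect-free alternative follows; `ImmutableClient` is a simplified, hypothetical stand-in whose reports are plain strings (e-mail addresses would be handled the same way). Because `merge` builds a new object and is associative, a group can even be reduced with `parallelStream()` without any shared mutable lists:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical stand-in for Client: reports are plain strings.
final class ImmutableClient {
    final String clientCode;
    final List<String> reports;

    ImmutableClient(String clientCode, List<String> reports) {
        this.clientCode = clientCode;
        this.reports = Collections.unmodifiableList(new ArrayList<>(reports));
    }

    // Pure, associative merge: returns a new client and leaves both inputs untouched.
    static ImmutableClient merge(ImmutableClient a, ImmutableClient b) {
        List<String> combined = new ArrayList<>(a.reports);
        for (String r : b.reports) {
            if (!combined.contains(r)) {
                combined.add(r);   // keep the first occurrence, skip duplicates
            }
        }
        return new ImmutableClient(a.clientCode, combined);
    }
}

class PureReduceDemo {
    // Reduces one non-empty group (all entries share a clientCode) to one client.
    static ImmutableClient reduceGroup(List<ImmutableClient> group) {
        return group.parallelStream()
                .reduce(ImmutableClient::merge)
                .get();   // the Optional is empty only for an empty group
    }
}
```

Since the stream is ordered, the parallel reduction still combines elements in encounter order, so the merged report list comes out the same as with a sequential `stream()`.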
It took me a long time because I went off on a tangent before I really understood reduce. I found a solution that passed my tests with .stream() but failed completely with .parallelStream(), which is why I deliberately use .parallelStream() here. I also had to use CopyOnWriteArrayList, otherwise it would fail randomly with ConcurrentModificationExceptions.
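A possible alternative (not what I used above) is to let a single collector do both the grouping and the merging: `Collectors.toMap` with a merge function, keyed on the business ID. Even on a `parallelStream()`, a non-concurrent collector like this gives each thread its own intermediate map and combines the maps afterwards, so the merge function never touches a client that another thread is still modifying, and the sketch needs only plain `ArrayList`s. `FlatRow` and `SimpleClient` are simplified, hypothetical stand-ins for `IncomingFlatItem` and `Client`, with plain-string reports and e-mail addresses:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical flat row, standing in for IncomingFlatItem.
class FlatRow {
    final String clientCode, clientName, email, report;
    FlatRow(String clientCode, String clientName, String email, String report) {
        this.clientCode = clientCode;
        this.clientName = clientName;
        this.email = email;
        this.report = report;
    }
    // Mirrors getClient(): one client holding one email and one report.
    SimpleClient toClient() {
        SimpleClient c = new SimpleClient(clientCode, clientName);
        c.addEmail(email);
        c.addReport(report);
        return c;
    }
}

// Hypothetical client with de-duplicating add methods.
class SimpleClient {
    final String clientCode, clientName;
    final List<String> emails = new ArrayList<>();
    final List<String> reports = new ArrayList<>();
    SimpleClient(String clientCode, String clientName) {
        this.clientCode = clientCode;
        this.clientName = clientName;
    }
    void addEmail(String e) { if (!emails.contains(e)) emails.add(e); }
    void addReport(String r) { if (!reports.contains(r)) reports.add(r); }
    // Copies the other client's children into this one and returns this.
    SimpleClient mergeFrom(SimpleClient other) {
        other.emails.forEach(this::addEmail);
        other.reports.forEach(this::addReport);
        return this;
    }
}

class UnflattenWithToMap {
    static List<SimpleClient> unflatten(List<FlatRow> rows) {
        Map<String, SimpleClient> byCode = rows.parallelStream()
                .map(FlatRow::toClient)
                .collect(Collectors.toMap(
                        c -> c.clientCode,           // key: the business ID
                        c -> c,                      // value: the freshly built client
                        SimpleClient::mergeFrom));   // called when two rows share a key
        return new ArrayList<>(byCode.values());
    }
}
```

The order of the returned list is unspecified here, since it follows `HashMap` iteration order.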