You'd need to convert your stream to a Spliterator and wrap it in a custom spliterator that partially reduces consecutive elements according to your logic (in your example, it would count equal elements until a different element appears). Then you'd turn the custom spliterator back into a new stream.
Bear in mind that this can't be 100% lazy: to create each TokenBag element for the new stream, the spliterator has to eagerly consume elements from the backing stream until it sees a token that differs from the current one (or the stream ends).
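To make the laziness caveat concrete, here is a minimal sketch that uses a plain Iterator rather than a stream. The class and method names (LookaheadDemo, CountingIterator, firstRunLength) are made up for illustration, and tokens are assumed to be non-null. It only shows that detecting the end of a run requires reading the first element of the next run.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical demo class; the names are illustrative only.
class LookaheadDemo {

    // Wraps an iterator and counts how many elements have been pulled from it.
    static class CountingIterator implements Iterator<String> {
        private final Iterator<String> delegate;
        int pulled = 0;

        CountingIterator(Iterator<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            pulled++;
            return delegate.next();
        }
    }

    // Returns the length of the first run of equal tokens (0 for an empty source).
    // Assumes non-null tokens.
    static int firstRunLength(Iterator<String> it) {
        if (!it.hasNext()) {
            return 0;
        }
        String first = it.next();
        int count = 1;
        // Keep reading until a different token shows up or the source ends.
        while (it.hasNext()) {
            if (!first.equals(it.next())) {
                break; // this token belongs to the next run, but it is already consumed
            }
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        CountingIterator it = new CountingIterator(
                Arrays.asList("a", "a", "a", "b", "b", "a", "a").iterator());
        int run = firstRunLength(it);
        // The run has 3 elements, but 4 had to be read to detect its end.
        System.out.println("run=" + run + ", pulled=" + it.pulled);
    }
}
```

Any lazy solution has to remember that one extra element somewhere so that it can start the next run with it.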
Here's the code for the custom spliterator:

    import java.util.Spliterator;
    import java.util.Spliterators;
    import java.util.function.Consumer;

    public class CountingSpliterator
            extends Spliterators.AbstractSpliterator<TokenBag>
            implements Consumer<String> {

        private final Spliterator<String> source;
        private String currentToken;
        private String previousToken;
        private int tokenCount = 0;
        private boolean tokenHasChanged;

        public CountingSpliterator(Spliterator<String> source) {
            // Fewer elements come out than go in, so the source size is only an
            // upper bound: drop SIZED/SUBSIZED. SORTED is dropped too, because
            // no comparator is provided for TokenBag.
            super(source.estimateSize(),
                    source.characteristics()
                            & ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.SORTED));
            this.source = source;
        }

        @Override
        public boolean tryAdvance(Consumer<? super TokenBag> action) {
            // Pull tokens from the source; each one is handled by accept(...)
            while (source.tryAdvance(this)) {
                if (tokenHasChanged) {
                    action.accept(new TokenBag(previousToken, tokenCount));
                    tokenCount = 1; // the new token has already been consumed
                    return true;
                }
            }
            // Source exhausted: emit the last run, if there is one
            if (tokenCount > 0) {
                action.accept(new TokenBag(currentToken, tokenCount));
                tokenCount = 0;
                return true;
            }
            return false;
        }

        @Override
        public void accept(String newToken) {
            if (currentToken != null) {
                previousToken = currentToken;
            }
            currentToken = newToken;
            if (previousToken != null && !previousToken.equals(currentToken)) {
                tokenHasChanged = true;
            } else {
                tokenCount++;
                tokenHasChanged = false;
            }
        }
    }

So this spliterator extends Spliterators.AbstractSpliterator and also implements Consumer, which lets it pass itself to source.tryAdvance(...). The code is quite complex, but the idea is that it adapts one or more tokens from the source spliterator into a single instance of TokenBag.
For every token accepted from the source spliterator, the count is incremented until the token changes. At that point, a TokenBag instance is created with the previous token and its count and is immediately pushed to the Consumer<? super TokenBag> action parameter. The counter is then reset to 1 rather than 0, because the token that triggered the change has already been consumed and is the first element of the next run. The accept method detects token changes and handles the very first token; the final run, which no token change closes, is emitted by tryAdvance once the source is exhausted.
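For comparison, the same counting logic can be written as a plain eager loop over a List. This is only a sketch for checking the grouping rules, not a replacement for the spliterator: it is not lazy, the names EagerRunCounter and countRuns are made up, it assumes non-null tokens, and it uses Map.Entry instead of TokenBag so that it compiles on its own.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for comparison only: eager, List-based, non-null tokens.
class EagerRunCounter {

    static List<Map.Entry<String, Integer>> countRuns(List<String> tokens) {
        List<Map.Entry<String, Integer>> runs = new ArrayList<>();
        String current = null;
        int count = 0;
        for (String token : tokens) {
            if (count > 0 && !token.equals(current)) {
                // Token changed: emit the finished run and restart at 1,
                // because `token` itself is the first element of the next run.
                runs.add(new SimpleEntry<>(current, count));
                count = 1;
            } else {
                count++;
            }
            current = token;
        }
        if (count > 0) {
            // Flush the last run, which no token change will ever close.
            runs.add(new SimpleEntry<>(current, count));
        }
        return runs;
    }

    public static void main(String[] args) {
        System.out.println(countRuns(Arrays.asList("a", "a", "a", "b", "b", "a", "a")));
        // prints [a=3, b=2, a=2]
    }
}
```

The two branches mirror the spliterator: the reset to 1 corresponds to the token-change case, and the flush after the loop corresponds to the check made once the source is exhausted.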
Here's how you should use this spliterator:

    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    Stream<String> src = Stream.of("a", "a", "a", "b", "b", "a", "a");

    Stream<TokenBag> stream = StreamSupport.stream(
            new CountingSpliterator(src.spliterator()),
            false); // false means sequential, we don't want parallel!

    stream.forEach(System.out::println);

If you override toString() in TokenBag, the output is:

    TokenBag{token='a', count=3}
    TokenBag{token='b', count=2}
    TokenBag{token='a', count=2}

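The TokenBag class itself comes from your question and isn't shown here. A minimal sketch that would produce the output above could look like this, assuming it only holds a token and a count. The field names, the getters and the exact toString() format are assumptions inferred from that output.

```java
// Assumed shape of TokenBag; the real class is the one from the question.
class TokenBag {

    private final String token;
    private final int count;

    public TokenBag(String token, int count) {
        this.token = token;
        this.count = count;
    }

    public String getToken() {
        return token;
    }

    public int getCount() {
        return count;
    }

    @Override
    public String toString() {
        // Format chosen to match the sample output, e.g. TokenBag{token='a', count=3}
        return "TokenBag{token='" + token + "', count=" + count + '}';
    }
}
```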
A note on parallelism: I don't know how to parallelize this partial-reduce task; I don't even know whether it's possible at all. Even if it were, I doubt it would produce any measurable improvement.