I have a scenario where I need to send 1M message to a server using a blocking API. The API does not accept batch request so I have to send 1M message one by one.
Instead of using one thread, I am thinking to use multiple threads to send them.
The caller has to wait for all 1M messages to be sent before proceeding.
My implementation is as follows:
public class MySender {
    private final MyPublisher myPublisher;
    private final ExecutorService threadPool;
    private final Map<String, List<CompletableFuture<Void>>> jobMap = Maps.newConcurrentMap();
    public MySender (final MyPublisher myPublisher,
                     ExecutorService threadPool) {
        this.myPublisher= myPublisher;
        this.threadPool = threadPool;
    }
    public void send(final MyData event) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> doSubmit(event), threadPool);
        List<CompletableFuture<Void>> futureList = jobMap.computeIfAbsent(event.getID(), entry -> new ArrayList<>());
        futureList.add(future);
    }
    public void notifySendComplete(final String id) {
        if(!jobMap.containsKey(id)) {
            return;
        }
        jobMap.get(id).forEach(CompletableFuture::join);
        jobMap.remove(id);
    }
    private void doSubmit(final MyData event) {
         try {
             ....
             myPublisher.send(event);
             ....
         } catch(Exception e) {
             // log error
         }
    }
}
The client class can simply use this class this way:
myInputList.forEach(input -> {
    MyData event = createData(input);
    mySender.send(event);
})
mySender.notifySendComplete();
I think this implementation will work, but the problem is obvious. It needs to hold 1M CompletableFuture in the map, which are not eligible for garbage collection.
Is it a big problem? If so, are there any better approaches?
Restriction:
- The Thread pool cannot be shut down
 - I can implement it using CountDownLatch but it is not allowed to use in my project.