I'm encountering an issue with two methods: the first calls the second in a loop, and the second creates a Future, as follows:
public class WorkersCoordinator {
    private static Logger LOGGER = 
        LoggerFactory.getLogger(WorkersCoordinator.class);
    private final Timeout timeout;
    private final ActorSystem system;
    private final List<Class<? extends BaseWorker>> workers;
    private final Map<Class, ActorRef> refMap;
    private final WorkResultPackageQueue pkgQueue;
    private final ActorFactory actorFactory;
    @Autowired
    public WorkersCoordinator(final ApplicationConfiguration config,
                             final ActorSystem system,
                             final ActorFactory actorFactory, 
                             final WorkerFactory workerFactory,
                             final WorkResultPackageQueue pkgQueue) {
        timeout = new Timeout(config.getTimeoutInMilliseconds(), 
                              TimeUnit.MILLISECONDS);
        this.system = system;
        this.actorFactory = actorFactory;
        this.pkgQueue = pkgQueue;
        refMap = Maps.newHashMap();
        workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
    }
    public void delegateWorkers() {
        for (Class<? extends BaseWorker> worker : workers) {
            if (refMap.containsKey(worker)) continue;
            sendWork(worker);
        }
    }
    private void sendWork(Class<? extends BaseWorker> worker) {
        // GetDataActor extends AbstractActor
        ActorRef actorRef = actorFactory.create(GetDataActor.class);
        Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
        responseRef.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable failure) throws Throwable {
                LOGGER.error("Worker {} encountered a problem - cancelling.", 
                             worker.getSimpleName());
                if (refMap.containsKey(worker)) {
                    refMap.remove(worker);
                }
            }
        }, system.dispatcher());
        responseRef.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object msg) throws Throwable {
                if (msg instanceof WorkResultPackage) {
                    final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                    LOGGER.info(
                        "Received AssetDataPackage from {}, containing {} items",
                        reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                        reportPackage.getPkg().getData().size());
                    pkgQueue.enqueue(reportPackage);
                } else {
                    LOGGER.error(
                        "Expected to receive WorkResultPackage Object but received: {}",
                        msg.getClass());
                    throw new UnknownObjectReceived(msg);
                }
            }
        }, system.dispatcher());
        refMap.put(worker, actorRef);
    }
}
The issue is that the closure I believe responseRef.onFailure creates doesn't behave as I would expect. If I call this with 3 workers, one of which I configure to fail, a failure is handled, but the log does not consistently name the same worker as having failed, even though it should always be the one I marked to fail. I'm new to this tech stack (Java, Scala Futures, and Akka) and to the established code base where I found this pattern, so I don't know whether I'm overlooking something or misunderstanding closures in Java/Scala Futures. The sticking point is that I need to report which worker failed and remove it from refMap so that it is no longer considered in progress. Even stranger, all workers appear to complete and to be removed from refMap even while the wrong worker is reported as failed.
Update: After having no luck getting an answer as to why the closure wasn't working, I did some investigation and found another post answering whether Java 8 even supports closures:
Short answer: I believe it does. However, that post spoke of final or effectively final variables, so I updated my code as follows. Hopefully this helps folks who understand closures explain why they don't work the way I'm used to (in C# and JavaScript). I'm only posting the updated sendWork(...) to highlight what I tried, to no avail.
private void sendWork(Class<? extends BaseWorker> worker) {
    // GetDataActor extends AbstractActor
    ActorRef actorRef = actorFactory.create(GetDataActor.class);
    Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);
    // 'worker' is effectively final, so the lambda captures this invocation's value
    Consumer<Throwable> onFailureClosure = (ex) -> {
        LOGGER.error("Worker {} encountered a problem - cancelling.", 
                     worker.getSimpleName());
        if (refMap.containsKey(worker)) {
            refMap.remove(worker);
        }
    };
    responseRef.onFailure(new OnFailure() {
        @Override
        public void onFailure(Throwable failure) throws Throwable {
            onFailureClosure.accept(failure);
        }
    }, system.dispatcher());
    responseRef.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object msg) throws Throwable {
            if (msg instanceof WorkResultPackage) {
                final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                LOGGER.info(
                    "Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());
                pkgQueue.enqueue(reportPackage);
            } else {
                LOGGER.error(
                    "Expected to receive WorkResultPackage Object but received: {}",
                    msg.getClass());
                throw new UnknownObjectReceived(msg);
            }
        }
    }, system.dispatcher());
    refMap.put(worker, actorRef);
}
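For comparison, here is a minimal sketch of the capture behavior I would expect, stripped of Akka entirely. It is only an illustration under assumptions: CompletableFuture stands in for the Scala Future, and ClosureCaptureSketch, the String worker names, and the shouldFail flag are all hypothetical and not part of the code base above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class ClosureCaptureSketch {
    // Names reported by failure callbacks; thread-safe because the
    // callbacks may run on pool threads rather than the calling thread.
    static final List<String> reportedFailures = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for sendWork(...). 'worker' is effectively
    // final, so each callback captures the value from its own invocation.
    static CompletableFuture<Void> sendWork(String worker, boolean shouldFail) {
        CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> {
            if (shouldFail) {
                throw new IllegalStateException("simulated failure");
            }
            return "ok";
        });
        return response.<Void>handle((result, failure) -> {
            if (failure != null) {
                reportedFailures.add(worker);
            }
            return null;
        });
    }

    // Sends work for three workers, only one of which is set to fail,
    // waits for all callbacks, and returns the names that were reported.
    static List<String> run() {
        reportedFailures.clear();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String worker : Arrays.asList("WorkerA", "WorkerB", "WorkerC")) {
            pending.add(sendWork(worker, worker.equals("WorkerB")));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(reportedFailures);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [WorkerB]
    }
}
```

In this isolated sketch the reported name is always the worker that was set to fail, which is the behavior I expected from sendWork(...) above.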
 
    