I am implementing an ExecutorService on my own to learn about the Java internals. While the ExecutorService works when called upon in a single thread, I am not sure as to how implement it such that multiple threads can submit tasks to the same executor service concurrently. How do I implement this functionality into my existing code ?
The code I have written is as follows.
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
public class MyExecutorService implements ExecutorService {
    private final MyWorkerThread[] workerThreads;
    private final LinkedList<MyTask> taskQueue;
    private final LinkedList<Object> completed;
    private boolean terminated;
    private boolean hasBeenShutDown;
    private final Object shutDownLock;
    public MyExecutorService(int numThreads) {
        workerThreads = new MyWorkerThread[numThreads];
        completed = new LinkedList<>();
        taskQueue = new LinkedList<>();
        shutDownLock = new Object();
        for (int i = 0; i < numThreads; i++) {
            workerThreads[i] = new MyWorkerThread();
            workerThreads[i].start();
        }
        terminated = false;
        hasBeenShutDown = false;
    }
    @Override
    public void shutdown() {
        hasBeenShutDown = true;
        for (int i = 0; i < workerThreads.length; i++) {
            submitToStop();
        }
        for (MyWorkerThread worker : workerThreads) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        synchronized (shutDownLock) {
            shutDownLock.notifyAll();
        }
    }
    @Override
    public List<Runnable> shutdownNow() {
        hasBeenShutDown = true;
        for (MyWorkerThread workerThread : workerThreads) {
            workerThread.interrupt();
        }
        LinkedList<Runnable> pendingTasks = new LinkedList<>();
        for (MyTask task : taskQueue) {
            pendingTasks.add(task.getFutureTask());
        }
        return pendingTasks;
    }
    @Override
    public boolean isTerminated() {
        return terminated;
    }
    @Override
    public boolean isShutdown() {
        return hasBeenShutDown;
    }
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        System.out.println("Awaiting Termination");
        synchronized (shutDownLock) {
            try {
                shutDownLock.wait(unit.convert(timeout, unit));
                terminated = true;
            } catch (InterruptedException e) {
                // ignore
            }
        }
        return hasBeenShutDown;
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        FutureTask<T> futureTask = new FutureTask<>(task);
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
        return futureTask;
    }
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        FutureTask<T> futureTask = new FutureTask<>(task, result);
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
        return futureTask;
    }
    @Override
    public Future<?> submit(Runnable task) {
        FutureTask<?> futureTask = new FutureTask<>(task, "RESULT");
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
        return futureTask;
    }
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
        List<Future<T>> futures = new ArrayList<>();
        for (Callable<T> task : tasks) {
            futures.add(submit(task));
        }
        return futures;
    }
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
        FutureTask<List<Future<T>>> futureWorkerTask = new FutureTask<>(() -> invokeAll(tasks));
        try {
            return futureWorkerTask.get(timeout, unit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return new ArrayList<>();
        }
    }
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        invokeAll(tasks);
        synchronized (completed) {
            while (completed.isEmpty()) {
//                System.out.println("Waiting");
                completed.wait();
            }
        }
        return (T) completed.removeFirst();
    }
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) {
        FutureTask<T> futureWorkerTask = new FutureTask<>(() -> invokeAny(tasks));
        try {
            return (T) futureWorkerTask.get(timeout, unit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return null;
        }
    }
    @Override
    public void execute(Runnable task) {
        FutureTask<String> futureTask = new FutureTask<>(task, "RESULT");
        synchronized (taskQueue) {
            taskQueue.add(new MyTask(false, futureTask));
            taskQueue.notifyAll();
        }
    }
    private void submitToStop() {
        synchronized (taskQueue) {
//            System.out.println("added to queue");
            taskQueue.add(new MyTask(true, null));
            taskQueue.notify();
        }
    }
    private class MyWorkerThread extends Thread {
        public void run() {
            while (true) {
                MyTask myTask;
                try {
                    synchronized (taskQueue) {
                        while (taskQueue.isEmpty())
                            taskQueue.wait();
                        myTask = taskQueue.removeFirst();
                    }
                    if (myTask.toStop)
                        break;
                    FutureTask futureTask = myTask.getFutureTask();
                    futureTask.run();
//                    System.out.println("Running");
                    synchronized (completed) {
                        completed.add(futureTask.get());
                        completed.notifyAll();
                    }
                } catch (Exception e) {
                    break;
                }
            }
        }
    }
    private static class MyTask {
        private FutureTask task;
        private final boolean toStop;
        MyTask(boolean toStop, FutureTask task) {
            this.task = task;
            this.toStop = toStop;
        }
        public FutureTask getFutureTask() {
            return task;
        }
    }
}
 
    