I have three processes: host, worker1, and worker2. I want worker1 and worker2 to communicate with each other directly over PUB/SUB sockets (with host interjecting intermittently), so I have the following setup:
# host
socket = ctx.socket(zmq.PUB)
socket.bind('ipc:///tmp/comms')
# worker1
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')
# worker2
socket = ctx.socket(zmq.SUB)
socket.connect('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
This setup doesn't work: worker1 sends without error, but worker2 never receives the message. However, if I change the setup to this:
# host
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
# worker1
socket = ctx.socket(zmq.PUB)
socket.connect('ipc:///tmp/comms')
socket.send(b'worker1')
# worker2
socket = ctx.socket(zmq.SUB)
socket.bind('ipc:///tmp/comms')
socket.setsockopt(zmq.SUBSCRIBE, b'worker1')
socket.recv()
It works just fine. However, if I also bind in host, it stops working again.
Why is this? If I later add a workerN that also needs to subscribe to worker1, how can every subscriber bind to the same address? What exactly are the bind/connect semantics here? Isn't host, being the long-lived process, doing the right thing by binding, and if so, why does worker2 fail to receive when it connects?
MWE: https://gist.github.com/ooblahman/f8f9724b9995b9646ebdb79d26afd68a
import time
from multiprocessing import Process

import zmq

addr = 'ipc:///tmp/test_zmq'


def worker1():
    # Publisher: connects to the shared address and sends once per second.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.connect(addr)
    while True:
        sock.send(b'worker1')
        print('worker1 sent')
        time.sleep(1)


def worker2():
    # Subscriber: should print each time a message from worker1 arrives.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.connect(addr)  # Change this to bind
    sock.setsockopt(zmq.SUBSCRIBE, b'worker1')
    while True:
        sock.recv()
        print('worker2 heard')


def main():
    # Host: the long-lived process that owns the address in this variant.
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind(addr)  # Change this to connect (or remove entirely)
    p1 = Process(target=worker1)
    p2 = Process(target=worker2)
    p1.start()
    p2.start()
    p1.join()  # worker1 loops forever, so this blocks until interrupted


if __name__ == '__main__':
    main()
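
To rule out process start-up timing, the two topologies can also be compared inside a single process. The following is only a minimal sketch under some assumptions that are not part of the MWE above: the helper name `worker1_heard`, the temporary ipc path, the 2-second timeout, and the choice to keep all three sockets in one process are hypothetical. The publisher resends in a loop, because a PUB socket silently drops anything sent before the subscription has arrived.

```python
import os
import shutil
import tempfile
import time

import zmq


def worker1_heard(sub_binds: bool, timeout_s: float = 2.0) -> bool:
    """Return True if the SUB socket receives b'worker1' within timeout_s.

    sub_binds=True  -> worker2 (SUB) binds; host and worker1 (PUB) connect.
    sub_binds=False -> host (PUB) binds; worker1 (PUB) and worker2 (SUB) connect.
    """
    ctx = zmq.Context()
    tmpdir = tempfile.mkdtemp()  # hypothetical scratch location for the ipc endpoint
    addr = 'ipc://' + os.path.join(tmpdir, 'comms')

    host = ctx.socket(zmq.PUB)
    w1 = ctx.socket(zmq.PUB)
    w2 = ctx.socket(zmq.SUB)
    w2.setsockopt(zmq.SUBSCRIBE, b'worker1')
    try:
        if sub_binds:
            w2.bind(addr)
            host.connect(addr)
        else:
            host.bind(addr)
            w2.connect(addr)
        w1.connect(addr)

        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            w1.send(b'worker1')  # PUB never blocks; unsubscribed messages are dropped
            if w2.poll(100):     # wait up to 100 ms for something to read
                return w2.recv() == b'worker1'
        return False
    finally:
        for s in (host, w1, w2):
            s.close(linger=0)
        ctx.term()
        shutil.rmtree(tmpdir, ignore_errors=True)


if __name__ == '__main__':
    print('SUB binds, PUBs connect:', worker1_heard(sub_binds=True))
    print('host PUB binds, others connect:', worker1_heard(sub_binds=False))
```

With the SUB socket bound, worker1's PUB connects straight to it and the message arrives. With host's PUB bound, worker1's PUB and worker2's SUB are each connected only to host, never to each other, and host does not forward anything, which matches the behaviour I see in the MWE.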