I'm trying to integrate an aiohttp web server into a Crossbar+Autobahn system architecture.
In more detail: when the aiohttp server receives a certain API call, it has to publish a message to a Crossbar router.
I've seen this example in the official repos, but I have no clue how to integrate it into my application.
Ideally, I would like to be able to do this:
# class SampleTaskController(object):
async def handle_get_request(self, request: web.Request) -> web.Response:
    """Publish a fixed greeting through ``self.publisher`` and reply HTTP 200.

    The incoming ``request`` is not inspected; every call publishes the same
    payload on the same topic and returns ``web.HTTPOk()``.
    """
    topic = 'com.myapp.topic1'
    payload = 'Hello World!'
    self.publisher.publish(topic, payload)
    return web.HTTPOk()
where self is an instance of SampleTaskController(object), which defines all the route handlers of the web server.
def main(argv):
    """Load the task configuration, wire up the controller and run the server.

    Args:
        argv: Command-line argument list.
            NOTE(review): assumed to be the full ``sys.argv`` (program name at
            index 0) -- confirm against the caller. When a second element is
            present it is used as the configuration file path; otherwise
            ``./task_cfg.json`` is used.

    The function blocks inside ``web.run_app`` until the server stops.
    """
    cfg_path = "./task_cfg.json"
    if len(argv) > 1:
        # The guard tests for a second element, so that is the one to read;
        # index 0 would be the program name, not a configuration path.
        cfg_path = argv[1]
    logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)

    # A ZMQ-aware loop is installed before the application is created so the
    # web application is bound to it.
    loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(loop)
    app = web.Application(loop=loop)

    # Only parsing needs the file handle; close it before the blocking
    # run_app call instead of keeping it open for the server's lifetime.
    with open(cfg_path, 'r') as f:
        task_cfg = json.load(f)
    # Record where the configuration came from alongside the loaded values.
    task_cfg['__cfg_path'] = cfg_path

    controller = SampleTaskController(task_cfg)
    controller.restore()
    app['controller'] = controller
    controller.setup_routes(app)
    app.on_startup.append(controller.on_startup)
    app.on_cleanup.append(controller.on_cleanup)
    web.run_app(app,
                host=task_cfg['webserver_address'],
                port=task_cfg['webserver_port'])
Note that I'm using a zmq.asyncio.ZMQEventLoop because the server is also listening on a zmq socket, which is configured inside the controller.on_startup method.
Instead of using autobahn, I've also tried to publish the message to Crossbar using wampy, and it works, but the autobahn subscribers couldn't correctly parse the message.
# autobahn subscriber
class ClientSession(ApplicationSession):
    """Session that logs its identity on join and subscribes to two topics."""

    async def onJoin(self, details):
        """Log the join details, record identity fields and subscribe.

        Stores ``details.authid`` in ``self._ident`` and the literal
        ``'Python'`` in ``self._type``, then subscribes one logging handler
        to each of the ``landscape`` and ``nature`` topics, in that order.
        """
        self.log.info("Client session joined {details}", details=details)
        self.log.info("Connected:  {details}", details=details)

        self._ident = details.authid
        self._type = 'Python'
        self.log.info("Component ID is  {ident}", ident=self._ident)
        self.log.info("Component type is  {type}", type=self._type)

        def make_handler(topic):
            """Return an event handler whose log output names ``topic``."""

            # Parameter names are kept as-is: a publisher may deliver the
            # event payload as keyword arguments matching these names.
            def on_something(counter, id, type):
                print('----------------------------')
                self.log.info(
                    "'on_{something}' event, counter value: {message}",
                    something=topic,
                    message=counter,
                )
                self.log.info(
                    "from component {id} ({type})",
                    id=id,
                    type=type,
                )

            return on_something

        # SUBSCRIBE
        for topic in ('landscape', 'nature'):
            await self.subscribe(make_handler(topic), topic)
-
# wampy publisher
async def publish():
    """Publish one event on the ``nature`` topic through a wampy client.

    Builds a ``Crossbar`` router description from ``./crossbar.json``, logs
    its ``realm``, ``url`` and ``port`` at debug level, starts a ``Client``
    against it, publishes, logs the publish result and finally stops the
    client so the session is not left open.

    The payload is passed to ``client.publish`` as the keyword argument
    ``message``.
    NOTE(review): subscribers presumably receive it under that same keyword
    name -- verify against the subscriber's handler signature.
    """
    log = logging.getLogger()

    router = Crossbar(config_path='./crossbar.json')
    log.debug(router.realm)
    log.debug(router.url)
    log.debug(router.port)

    client = Client(router=router)
    client.start()
    try:
        result = client.publish(topic="nature", message=0)
        log.debug(result)
    finally:
        # Always close the session, even if publishing raises.
        client.stop()
With this configuration the subscriber receives the published message, but it gets an exception while parsing it.
TypeError: on_something() got an unexpected keyword argument 'message'