I am trying to write a unit testcase using mock and pytest-asyncio. I have a normal function launching an asyncio coroutine using asyncio.run. [Using python3.7]
import asyncio
async def sample_async(arg2):
     # do something
     proc = await asyncio.create_subprocess_shell(arg2)
     # some more asyncio calls
     rc = proc.returncode
     return rc
def launcher(arg1, arg2):
    if arg1 == "no_asyncio":
        # do something
        print("No asyncio")
        return 0
    else:
        return_code = asyncio.run(sample_async(arg2))
        # do something
        return return_code
I was able to write unittest for the asyncio function sample_async but not for launcher. Here is what I have tried:
class AsyncMock(MagicMock):
    async def __call__(self, *args, **kwargs):
        return super(AsyncMock, self).__call__(*args, **kwargs)
@patch("asyncio.create_subprocess_shell", new_callable=AsyncMock)
def test_launcher(async_shell):
    arg1 = "async"
    arg2 = "/bin/bash ls"
    sample.launcher(arg1, arg2)
    async_shell.assert_called_once()
When I try to run the pytest, I keep getting RuntimeError: There is no current event loop in thread 'MainThread'. I can't use @pytest.mark.asyncio for this test as the function launcher is not an asyncio coroutine. What am I doing wrong here?
 
    