I am implementing a python module that takes a tuple with three lists (x,y,val) and subsamples them according to a given ratio. Am I doing it the right way? 
- Do I write to the disk asynchronously?
- Could I have many producers and consumers such that all of them generate and write data to the same output file?
- When I compare this code to a naive implementation of a single thread they perform similary with respect to their runtime.
import bisect
import numpy as np
import gzip
import asyncio
class SignalPDF:
    def __init__(self, inputSignal):
        self.x         = inputSignal[0][:]
        self.y         = inputSignal[1][:]
        self.vals      = inputSignal[2][:]
        self.valCumsum = np.cumsum(self.vals)
        self.totalSum  = np.sum(self.vals)
        self.N         = len(self.vals)
class SignalSampler:
    def __init__(self, inputSignal, ratio=1.0):
        self.signalPDF = SignalPDF(inputSignal)
        self.Q         = asyncio.Queue()
        self.ratio     = float(ratio)
        self.N         = int(self.signalPDF.N/self.ratio)
        self.sampledN  = 0
    async def randRead(self):
        while self.sampledN < self.N:
            i = np.random.randint(self.signalPDF.totalSum, size=1, dtype=np.uint64)[0]
            self.sampledN += 1 
            cell = bisect.bisect(self.signalPDF.valCumsum, i)
            yield (self.signalPDF.x[cell], self.signalPDF.y[cell], int(self.signalPDF.vals[cell]))
    async def readShortFormattedLine(self):
        async for read in self.randRead():
            x = read[0]; y = read[1]; val = read[2]; 
            yield '{0} {1} {2}'.format(x,y,val)
    async def populateQueue(self):
        async for i in self.readShortFormattedLine():
            await self.Q.put(i)
        await self.Q.put(None)
    async def hanldeGzip(self, filePath):
        with gzip.open(filePath, 'wt') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                f.write('{0}\n'.format(item))
            f.flush()
    async def hanldeFile(self, filePath):
        with open(filePath, 'w+') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                f.write('{0}\n'.format(item))
            f.flush()
def main(gzip, outputFile):
    x=[]; y=[];val=[]
    for i in range(100):
        for j in range(100):
            x.append(i)
            y.append(j)
            val.append(np.random.randint(0,250))
    loop      = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    mixer = SignalSampler(inputSignal=[x,y,val], ratio=2.0)
    futures = []
    if gzip:
        futures = [mixer.hanldeGzip(outputFile), mixer.populateQueue()]
    else:
        futures = [mixer.hanldeFile(outputFile), mixer.populateQueue()] 
    tasks   = asyncio.wait(futures, loop=loop)
    results = loop.run_until_complete(tasks)
    loop.close()
main(gzip=False, outputFile='/tmp/a.txt')
main(gzip=True, outputFile='/tmp/a.txt.gz')
 
    