High-level approach
In this case, I don't think you actually need the dictionary zip_dict to lazily read these zipped files with Pandas. Based on this very similar SO question about reading (.gz) compressed *.csv files using Dask (also shown here), and since you have multiple files to load, there are at least two possible approaches available to you:
- use dask.delayed and pandas.read_csv
  - here, you can read each file into a pandas.DataFrame, but rather than actually performing the read into memory, you delay this operation, thereby creating a list of delayed objects (there are at least two ways to create this list, as shown below)
    - to create the list with a for loop, this looks like [delayed(pd.read_csv)(f) for f in file_list]
      - if you have 17 .csv.zip files, then this creates a list of 17 delayed objects
    - to create the list with map and functools.partial, this looks like list(map(functools.partial(delayed(pd.read_csv), compression="zip", usecols=["time", "cik"]), file_list))
      - if you have 17 .csv.zip files, then this also creates a list of 17 delayed objects, one per file
  - then you use dd.from_delayed to convert this list of delayed objects into a Dask DataFrame
    - with the looping approach this looks like dd.from_delayed(dfs)
    - with the map() and functools.partial approach, you would similarly use dd.from_delayed(dfs); the code below also chains .repartition(npartitions=len(file_list))
      - dd.from_delayed already creates one dask.dataframe partition per delayed object, so the .repartition() call simply makes it explicit that each of the 17 files sits in its own dedicated partition (and gives you one place to adjust the number of partitions later if needed)
- use dask.dataframe.read_csv(file_list) directly, which uses pandas.read_csv under the hood and therefore accepts many of the keyword arguments of pandas.read_csv
In both of these approaches
- it is a Dask best practice to specify the dtypes of the columns that will be read in (as recommended)
  - you can do this with a dictionary, which would look like {"time": int, "cik": int}, since you only need the columns time and cik and you know that each of them is expected to be of int (integer) dtype
- use the .read_csv() keyword arguments
  - usecols to specify the list of column names that are needed
  - compression to indicate that a .zip file is being read in (a sketch combining these keyword arguments in a single call is shown just after this list)
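To make these keyword arguments concrete, here is a minimal sketch of a single, eager pandas.read_csv call combining usecols, dtype and compression; the file name data_file_1.csv.zip is only a placeholder for one of your zipped files.
import pandas as pd

# Placeholder file name, used only to illustrate the keyword arguments
single_file = "data_file_1.csv.zip"

df_single = pd.read_csv(
    single_file,
    usecols=["time", "cik"],          # only read the two required columns
    dtype={"time": int, "cik": int},  # specify the expected column dtypes
    compression="zip",                # the file is a zip-compressed .csv
)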
Python Code
Below is the code to implement each of these approaches, with brief comments where needed.
Imports
from functools import partial
from itertools import repeat
from glob import glob
from collections import OrderedDict
import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.delayed import delayed
Generate dummy data files
Using this SO answer, generate multiple .csv files
def generate_multiple_csvs(data_dir, file_num=1):
    # Create a small dummy DataFrame that includes the columns of interest (time, cik)
    col_names = list("ABCDEFG") + ["time", "cik"]
    df = pd.DataFrame(np.random.randint(10, size=(10, 9)), columns=col_names)
    # Write it out as a zip-compressed .csv file
    filename = f"data_file_{file_num}.csv.zip"
    filepath = data_dir + "/" + filename
    df["filepath"] = filename
    df.to_csv(filepath, index=False, compression="zip")
    return df
# Specify path to the directory where `.csv.zip` files should be created
data_dir = "data/processed"
# Specify number of files to create
num_files_wanted = 8
Use itertools.repeat to create the first batch of dummy files (numbered 1 through 8)
_ = list(
map(
generate_multiple_csvs,
repeat(data_dir, num_files_wanted),
list(range(1, num_files_wanted+1)),
)
)
Use functools.partial to create a second batch of dummy files (numbered 9 through 17, for 17 files in total)
_ = list(
map(
partial(generate_multiple_csvs, data_dir),
list(range(9, 9+num_files_wanted+1)),
)
)
Get list of files by filetype
file_list = glob(data_dir + "/" + "*.zip")
Specify the dtypes for the columns to be read into the Dask DataFrame (recommended)
my_dtypes = OrderedDict([("time", int), ("cik", int)])
Approach 1 - Using dask.delayed with a for loop
# Lazily reading files into Pandas DataFrames by looping
dfs = [
delayed(pd.read_csv)(f, compression='zip', usecols=['time','cik'])
for f in file_list
]
# Combine into a single Dask DataFrame
ddf_from_delayed_loop = dd.from_delayed(dfs, meta=my_dtypes)
print(type(ddf_from_delayed_loop))
print(ddf_from_delayed_loop)
Output
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: from-delayed, 34 tasks
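Note that no data has actually been read from disk at this point; the files are only parsed when a computation is triggered. A quick illustrative usage (any of the following forces the lazy reads to run):
# Trigger the lazy reads: the .csv.zip files are only parsed at this point
print(ddf_from_delayed_loop.head())                  # reads just the first partition
print(len(ddf_from_delayed_loop))                    # reads all files to count rows
print(ddf_from_delayed_loop["cik"].sum().compute())  # aggregates across all files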
Approach 1 - Using dask.delayed with map
# Lazily reading files into Pandas DataFrames with Python's built-in map()
dfs = list(
map(
partial(
delayed(pd.read_csv),
compression="zip",
usecols=['time', 'cik'],
),
file_list,
)
)
# Combine into a single Dask DataFrame and explicitly set one partition per file
ddf_from_delayed_map = dd.from_delayed(dfs, meta=my_dtypes).repartition(
npartitions=len(file_list)
)
print(type(ddf_from_delayed_map))
print(ddf_from_delayed_map)
Output
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: from-delayed, 34 tasks
Approach 2 - Directly using dask.dataframe
# Lazily reading files into single Dask DataFrame
ddf_direct = dd.read_csv(
data_dir+"/*.csv.zip",
compression='zip',
dtype=my_dtypes,
    blocksize=None,  # compressed files cannot be split into blocks, so each file is read whole
usecols=['time','cik'],
)
print(type(ddf_direct))
print(ddf_direct)
Output
<class 'dask.dataframe.core.DataFrame'>
Dask DataFrame Structure:
                 time    cik
npartitions=17
                int64  int64
                  ...    ...
...               ...    ...
                  ...    ...
                  ...    ...
Dask Name: read-csv, 17 tasks
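As with the delayed-based approaches, the data is only read when a computation is triggered. A small sketch (assuming the dummy files generated above, with 10 rows each) to confirm that every zipped file ended up in its own partition:
# The number of partitions should equal the number of .csv.zip files
print(ddf_direct.npartitions)                    # 17

# Rows per partition: one entry per file (10 rows each for the dummy data)
print(ddf_direct.map_partitions(len).compute())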
Notes
- For all of the above approaches, the number of partitions should be chosen with Dask's partitioning recommendations in mind
- Use batching for the dask.delayed approach with a for loop in order to cut back on the overhead from a large number of calls to dask.delayed (see this SO question for a batching implementation); a minimal sketch of the idea is shown below
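To illustrate the batching idea, here is a minimal sketch that reuses file_list and my_dtypes from above (the read_batch helper and the batch size of 4 are assumptions, not part of the original code): several files are read inside a single delayed call, so the number of Dask tasks drops from one per file to one per batch.
def read_batch(files):
    # Eagerly read a small batch of files inside a single task
    return pd.concat(
        [pd.read_csv(f, compression="zip", usecols=["time", "cik"]) for f in files]
    )

batch_size = 4  # assumption: tune this to your file sizes and scheduler overhead
batches = [file_list[i:i + batch_size] for i in range(0, len(file_list), batch_size)]

# One delayed object (and hence one partition) per batch instead of per file
dfs_batched = [delayed(read_batch)(batch) for batch in batches]
ddf_batched = dd.from_delayed(dfs_batched, meta=my_dtypes)
print(ddf_batched.npartitions)  # ceil(17 / 4) = 5 partitions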