I have a Spark DataFrame that contains multiple columns of free text. Separately, I have a dictionary of regular expressions where each key (a country name) maps to a compiled regex.
For instance:
import re
from pyspark.sql import Row

df = spark.sparkContext.parallelize([
        Row(primary_loc='USA',
            description='PyCon happens annually in the United States, with satellite events in India, Brazil and Tokyo'),
        Row(primary_loc='Canada',
            description='The annual hockey championship has some events occurring in the US')
     ]).toDF()

keywords = {'united states': re.compile(r'\b(usa|us|united states|texas|washington|new york)\b', re.I),
            'india': re.compile(r'\b(india|bangalore|mumbai|delhi)\b', re.I),
            'canada': re.compile(r'\b(canada|winnipeg|toronto|ontario|vancouver)\b', re.I),
            'japan': re.compile(r'\b(japan|tokyo|kyoto)\b', re.I)}
I want to extract countries from the DataFrame: for each row, every regex is matched against a set of columns (primary_loc and description in this case), and one output row is produced per matched country. So for the example above, I'd get an output somewhat like
primary_loc   | description | country
--------------------------------------------
USA           | PyCon...    | united states
USA           | PyCon...    | india
USA           | PyCon...    | brazil
USA           | PyCon...    | japan
Canada        | The ann...  | canada
Canada        | The ann...  | united states
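To pin down what I mean: a country should appear once per input row if its regex matches any of the chosen columns, regardless of how many columns (or how many times) it matches. Here's a minimal plain-Python sketch of that rule, using a trimmed-down copy of the example data and keywords above (nothing new beyond the example):

import re

# Trimmed-down copy of the keywords dict above, just for illustration.
keywords_sample = {'united states': re.compile(r'\b(usa|us|united states)\b', re.I),
                   'japan': re.compile(r'\b(japan|tokyo|kyoto)\b', re.I)}

# First example row from the DataFrame above.
row = {'primary_loc': 'USA',
       'description': 'PyCon happens annually in the United States, with satellite events in India, Brazil and Tokyo'}

# A country is emitted once if its regex matches any of the chosen columns.
matches = [country for country, rx in keywords_sample.items()
           if any(rx.search(row[col]) for col in ('primary_loc', 'description'))]
print(matches)  # ['united states', 'japan']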
To get an idea of the scale of the problem, I have around 12-15k regexes and a dataframe with around 90 million rows.
I've tried a plain-Python approach, applied with an RDD flatMap, that looks somewhat like:
def get_countries(row):
    rd = row.asDict()
    rows_out = []

    # Emit one output row per country whose regex matches either column.
    for country, rx in keywords.items():
        if rx.search(rd['primary_loc']) or rx.search(rd['description']):
            rows_out.append(Row(**{'country': country, **rd}))
    return rows_out

newDF = df.rdd.flatMap(get_countries).toDF()
but this is excruciatingly slow, even when operating on a subset of 10k or so rows.
If it matters, I'm using PySpark via Databricks on Azure.