I have a DAG which can insert data of JSON file in BQ table but I have a JSON file and many fields are started with illegal characters. So I have created BQ table with legal column names but I need to map BQ column names with JSON file fields and store it in a JSON file, to upload it into BQ table but it's not working.
My JSON file :-
{"ID":"4238382","Title":"El clon Cap\u00edtulo 3","Description":"Said y Ali llegan a un acuerdo. Leonidas sale con Yvete y Diogo. Edvaldo no quiere hacerse los ex\u00e1menes. Jade se reh\u00fasa a usar velo. Lucas se disculpa con Ali. Albieri dice que Ali fue duro con Jade, Ali lo acusa de querer experimentar con humanos.","Program":"El Clon","Season":"1","Episode":"3","Source":"GLOBO TV INTERNACIONAL","Category":"Drama","Syndicator":"CSv2","[CSv2] external_id":"ELCL100002002","[CSv2] pub_win_US_begin":"1661842800","[CSv2] pub_win_US_end":"1754625600","[CSv2] language":"es","[CSv2] title":"El clon Cap\u00edtulo 3","[CSv2] descriptive_title":"Acuerdo de matrimonio","[CSv2] description":"Said y Ali llegan a un acuerdo. Leonidas sale con Yvete y Diogo. Edvaldo no quiere hacerse los ex\u00e1menes. Jade se reh\u00fasa a usar velo. Lucas se disculpa con Ali. Albieri dice que Ali fue duro con Jade, Ali lo acusa de querer experimentar con humanos.","[CSv2] supplier":"GLOBO TV INTERNACIONAL","[CSv2] categories":"Drama","[CSv2] rating":"TV-14","[CSv2] subratings":"D","[CSv2] program_type":"NOVELA","[CSv2] entity":"","[CSv2] exception_countries":"US ,\tUM ,PR ,\tMX ,\tAR ,\tCL ,\tCO ,\tPE ,\tEC ,\tCR ,\tSV ,\tHN ,\tBO ,\tPA ,\tDO ,\tNI ,\tPY ,\tVE ,\tUY ,\tGT","[CSv2] episode_type":"","TMS ID":null,"external_id":"ELCL100002002","Content Type":"Entertainment","Release Year":"2001","sports_event_ID":""}
My BQ table :-
Created a python plugin file to map the fields of JSON file with BQ columns and store it in BQ :-
import json
import csv
import logging
import os
import bson.json_util as json_util
from google.cloud import storage
from pydantic import BaseModel, Field, validator
class EventsModel(BaseModel):
    ID: int = None
    Title: str = None
    Description: str = None
    Program: str = None
    Season: int = None
    Episode: int = None
    Source: str = None
    Category: str = None
    Syndicator: str = None
    CSv2_external_id: str = Field(alias="[CSv2] external_id", default=None)
    CSv2_pub_win_US_begin: int = Field(alias="[CSv2] pub_win_US_begin", default=None)
    CSv2_pub_win_US_end: int    = Field(alias="[CSv2] pub_win_US_end", default=None)
    CSv2_language: str = Field(alias="[CSv2] language", default=None)
    CSv2_title: str = Field(alias="[CSv2] title", default=None)
    CSv2_descriptive_title: str = Field(alias="[CSv2] descriptive_title", default=None)
    CSv2_description: str = Field(alias="[CSv2] description", default=None)
    CSv2_supplier: str = Field(alias="[CSv2] supplier", default=None)
    CSv2_categories: str = Field(alias="[CSv2] categories", default=None)
    CSv2_rating: str = Field(alias="[CSv2] rating", default=None)
    CSv2_subratings: str = Field(alias="[CSv2] subratings", default=None)
    CSv2_program_type: str = Field(alias="[CSv2] program_type", default=None)
    CSv2_entity: str = Field(alias="[CSv2] entity", default=None)
    CSv2_exception_countries: str = Field(alias="[CSv2] exception_countries", default=None)
    CSv2_episode_type: str = Field(alias="[CSv2] episode_type", default=None)
    TMS_ID: str = Field(alias="TMS ID", default= None)
    external_id: str = None
    Content_Type: str = None
    Release_Year: int = None
    sports_event_ID: str = None
    @validator(
        "TMS_ID",
        pre=True,
        always=True,
    )
    def is_date(cls, v):
        try:
            if type(v) == str:
                v = None if v.lower() =="null" else v
            else:
                raise ValueError
        except ValueError:
            v = "null"
        return v
def map_keys(bucket_name, file_path, list_of_files): #pass the folder as an argument
            
    logging.info(f"bucket_name: {bucket_name}")
    logging.info(f"file_path: {file_path}")
    storage_client = storage.Client()
    
    path = f'''{bucket_name}'''
    
    logging.info(f"list_of_files from the DAG: {list_of_files}")
    blobs  = storage_client.list_blobs(
    bucket_or_name=path
    )
    file = ""
    logging.info(f"blob {blobs}")
    for blob in blobs:
        if not blob.name.endswith("/"):
            file = blob.name
            bucket = storage_client.get_bucket(bucket_name)
        
        #TODO: iterate the files into the path and parse using the new model
        
        logging.info(f"file: {file}")
        
        with open(file, "w") as j_file:
                    j_file.write(json_util.dumps(file))
                    j_file.write("\n")
        # mapper del modelo
        new_model = EventsModel.parse_obj(j_file)
        new_model = new_model.dict()
        with open(new_model, "w") as file_transformed:
            file_transformed.write(json.dumps(new_model))
            file_transformed.write("\n")
        
        blob = bucket.blob(f"test_file")
        blob.upload_from_filename(file_transformed)
Added it in DAG :-
map_json_keys_with_BQ_columns = PythonOperator(
        task_id="map_json_keys_with_BQ_columns",
        retries=0,
        python_callable=map_keys,
        op_kwargs={
            "bucket_name": mcp_bucket,
            "file_path": mcp_source_folder,
            "list_of_files": source_files
        },
        dag=dag,
    )
    mcp_ingestion_to_bq = GCSToBigQueryOperator(
        task_id="mcp_ingestion_to_bq",
        retries=0,
        dag=dag,
        bucket=mcp_bucket,
        source_objects=f"{mcp_source_folder}*.json",
        source_format="NEWLINE_DELIMITED_JSON",
        #skip_leading_rows=16,
        destination_project_dataset_table=destination_bq_table,
        write_disposition="WRITE_TRUNCATE",
        create_disposition="CREATE_NEVER",
        
        autodetect=True
    )



 
    