| ID | date | balance | 
|---|---|---|
| 01 | 31/01/2021 | 100 | 
| 01 | 28/02/2021 | 200 | 
| 01 | 31/05/2021 | 500 | 
| 01 | 30/06/2021 | 600 | 
Expected output: since the March, April, and May rows are missing, the balance for each missing month should be carried forward from the previous month's balance (a small standalone sketch of this month-gap logic follows the table).
| ID | date | balance | 
|---|---|---|
| 01 | 31/01/2021 | 100 | 
| 01 | 28/02/2021 | 200 | 
| 01 | 31/03/2021 | 200 | 
| 01 | 30/04/2021 | 200 | 
| 01 | 31/05/2021 | 500 | 
| 01 | 30/06/2021 | 600 | 
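To make the gap-filling rule concrete, here is a minimal standalone sketch using plain java.time (nothing Beam-specific; MonthGapDemo and printMissingMonths are illustrative names I made up, not an existing API):

import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;

class MonthGapDemo {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    // Prints one month-end date per missing month between two balance dates,
    // e.g. between 28/02/2021 and 31/05/2021 it prints 31/03/2021 and 30/04/2021.
    static void printMissingMonths(String fromDate, String toDate) {
        YearMonth from = YearMonth.from(LocalDate.parse(fromDate, FMT));
        YearMonth to = YearMonth.from(LocalDate.parse(toDate, FMT));
        for (YearMonth m = from.plusMonths(1); m.isBefore(to); m = m.plusMonths(1)) {
            System.out.println(m.atEndOfMonth().format(FMT));
        }
    }

    public static void main(String[] args) {
        printMissingMonths("28/02/2021", "31/05/2021");
    }
}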
Java Code:
I read this file into a PCollection<Row>:
PCollection<Row> row = /* logic to read the CSV */;
Updated Code:
CSV File:
| ID | date | balance | 
|---|---|---|
| 01 | 31/01/2021 | 100 | 
| 01 | 28/02/2021 | 200 | 
| 01 | 31/05/2021 | 500 | 
| 01 | 30/06/2021 | 600 | 
Schema File:
{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  {
    "name" : "ID",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ]
  }, {
    "name" : "balance",
    "type" : [ "double", "null" ]
  } ]
}
public static void main(String[] args) throws IOException {
        
        final File schemaFile = new File("src/main/resources/schema_ie.avsc");
        File csvFile = new File("src/main/resources/CustomerRequestIE.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);
        Pipeline pipeline = Pipeline.create();
        // Converting the Avro schema to a Beam schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
        final PCollectionTuple tuples = pipeline
                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
                // Reading the files that match the pattern
                .apply("2", FileIO.readMatches())
                // Validating each record against the schema, converting it to a Row,
                // and splitting the output into valid and invalid tagged collections
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));
        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
        ToKV toKV = new ToKV();
        toKV.setColumnName1("ID");
        toKV.setColumnName2("date");
        PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
        PCollection<KV<String,Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String,Row>create());

//      groupedKVRows.apply(ParDo.of(new ForwardFillFn()));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }

class ToKV extends DoFn<Row, KV<String, Row>> {
    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;
    String columnName2 = null;
    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        // Key each row by concatenating the two configured columns (here ID + date).
        context.output(KV.of(row.getValue(columnName1).toString() + row.getValue(columnName2).toString(), row));
    }
    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }
    public void setColumnName2(String columnName2) {
        this.columnName2 = columnName2;
    }
}
class ForwardFillFn extends DoFn<KV<String, Iterable<Row>>, KV<String, Iterable<Row>>> {
    @StateId("model")
    private final StateSpec<ValueState<Model>> modelSpec =
            StateSpecs.value(Model.coder());

    @StateId("previousPrediction")
    private final StateSpec<ValueState<Prediction>> previousPredictionSpec =
            StateSpecs.value(Prediction.coder());

    @ProcessElement
    public void processElement(
            ProcessContext context,
            @StateId("index") ValueState<Integer> index) {
    }
}
I have written this based on the examples from https://beam.apache.org/blog/stateful-processing/. I am stuck on the implementation of the ForwardFillFn class: I am unable to resolve the Model and Prediction classes, and I am also lost on how to implement processElement.
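In case it helps to frame an answer: since GroupByKey already delivers all rows for a key together, one possible direction is a plain (non-stateful) DoFn that sorts the grouped rows by month and emits a carried-forward copy of the previous row for every missing month, instead of the stateful ForwardFillFn approach. The following is only a sketch under two assumptions: ToKV is changed to key by ID alone (so every month of a customer lands in one group) and one customer's rows fit in memory; FillMissingMonthsFn is an illustrative name, not an existing Beam API.

import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

// Sketch only: assumes the grouping key is the ID (not ID + date) so that all
// rows of one customer arrive in a single Iterable, and that this fits in memory.
class FillMissingMonthsFn extends DoFn<KV<String, Iterable<Row>>, Row> {
    private static final long serialVersionUID = 1L;
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    private final org.apache.beam.sdk.schemas.Schema schema;

    FillMissingMonthsFn(org.apache.beam.sdk.schemas.Schema schema) {
        this.schema = schema;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Collect and sort one customer's rows by month.
        List<Row> sorted = new ArrayList<>();
        context.element().getValue().forEach(sorted::add);
        sorted.sort(Comparator.comparing(
                (Row r) -> YearMonth.from(LocalDate.parse(r.getString("date"), FMT))));

        Row previous = null;
        for (Row current : sorted) {
            if (previous != null) {
                YearMonth prevMonth = YearMonth.from(LocalDate.parse(previous.getString("date"), FMT));
                YearMonth currMonth = YearMonth.from(LocalDate.parse(current.getString("date"), FMT));
                // Emit one synthetic row per missing month, carrying the previous balance forward.
                for (YearMonth m = prevMonth.plusMonths(1); m.isBefore(currMonth); m = m.plusMonths(1)) {
                    context.output(Row.withSchema(schema)
                            .addValues(previous.getString("ID"),
                                       m.atEndOfMonth().format(FMT),
                                       previous.getDouble("balance"))
                            .build());
                }
            }
            context.output(current);
            previous = current;
        }
    }
}

Under those assumptions, the commented-out line above would become something like groupedKVRows.apply(ParDo.of(new FillMissingMonthsFn(beamSchema))).setCoder(RowCoder.of(beamSchema));.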