The simplest way to achieve this: the code below re-reads the dimension data folder on every micro-batch, but keep in mind that new dimension values (country names in my case) have to arrive as a new file in that folder.
package com.databroccoli.streaming.dimensionupdateinstreaming
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
object RefreshDimensionInStreaming {
  def main(args: Array[String]): Unit = {
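    // Quiet noisy third-party loggers so the console output stays readable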
    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()
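    // Schema for the streaming fact files (customer records)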
    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))
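    // Schema for the dimension files (countrycode -> countryname)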
    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))
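    // Stream the fact folder; every new CSV file arrives as part of a micro-batch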
    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")
    // Holds the current dimension snapshot; swapped out on every micro-batch
    var countryDf: Option[DataFrame] = None
    // Re-read the dimension folder and swap in a fresh, cached snapshot
    def updateDimensionDf(): Unit = {
      val dimDf2 = spark.read
        .schema(schemaUntyped2)
        .option("header", "true")
        .csv("src/main/resources/broadcasttest/dimension")
      // Release the previously cached snapshot, if any
      countryDf.foreach(_.unpersist())
      countryDf = Some(
        dimDf2
          .withColumnRenamed("id", "id_2")
          .withColumnRenamed("countrycode", "countrycode_2")
          .cache()) // cache so unpersist() actually frees the old copy on refresh
      countryDf.get.show()
    }
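    // For each micro-batch: refresh the dimension snapshot, then left-join it onto the batch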
    factDf1.writeStream
      .outputMode("append")
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        batchDF.show(10)
        // Refresh the dimension snapshot before joining this micro-batch
        updateDimensionDf()
        batchDF
          .join(
            broadcast(countryDf.get), // small table, so hint a broadcast join
            expr("countrycode_2 = countrycode"),
            "left_outer"
          )
          .show()
      }
      .start()
      .awaitTermination()
  }
}
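To see the refresh in action, drop a new file into the dimension folder while the query is running; the next micro-batch picks it up without a restart. The file names and rows below are made up for illustration; any CSV that matches schemaUntyped2 (with a header row, since header is set to "true") will do.

src/main/resources/broadcasttest/dimension/countries_1.csv (present from the start):

id,countrycode,countryname,timestamp_column_fin_2
1,IN,India,2021-01-01 00:00:00
2,US,United States,2021-01-01 00:00:00

src/main/resources/broadcasttest/dimension/countries_2.csv (dropped in later — note it is a new file, not an edit to countries_1.csv):

id,countrycode,countryname,timestamp_column_fin_2
3,DE,Germany,2021-01-02 00:00:00

Because updateDimensionDf() re-reads the whole folder with spark.read on every batch, the join in the next micro-batch already sees Germany.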