Here's my understanding of your requirement based on your question description and comment:
Loop through the collect-ed RDD row-wise, and whenever nd in the
current row is less than or equal to ndLimit, extract ts from the
previous row and reset ndLimit to the value of nd from that same
previous row.
If that's correct, I would suggest using foldLeft to assemble the list of timestamps, as shown below:
import org.apache.spark.sql.Row
import spark.implicits._  // required for toDF; already in scope in spark-shell
val s_n181n = Seq(
  (1, "a1", 101L, "b1", 1.0),  // nd 1.0 is the initial limit
  (2, "a2", 102L, "b2", 1.6),
  (3, "a3", 103L, "b3", 1.2),
  (4, "a4", 104L, "b4", 0.8),  // 0.8 <= 1.0, hence ts 103 is saved and nd 1.2 is the new limit
  (5, "a5", 105L, "b5", 1.5),
  (6, "a6", 106L, "b6", 1.3),
  (7, "a7", 107L, "b7", 1.1),  // 1.1 <= 1.2, hence ts 106 is saved and nd 1.3 is the new limit
  (8, "a8", 108L, "b8", 1.2)   // 1.2 <= 1.3, hence ts 107 is saved and nd 1.1 is the new limit
).toDF("c1", "c2", "ts", "c4", "nd")
val s_rows = s_n181n.rdd.collect
val s_list = s_rows.map(r => (r.getAs[Long](2), r.getAs[Double](4))).toList
// List[(Long, Double)] = List(
// (101,1.0), (102,1.6), (103,1.2), (104,0.8), (105,1.5), (106,1.3), (107,1.1), (108,1.2)
// )
val ndLimit = s_list.head._2 // 1.0
s_list.tail.foldLeft( (s_list.head._1, s_list.head._2, ndLimit, List.empty[Long]) ){
  (acc, x) =>
    if (x._2 <= acc._3)
      (x._1, x._2, acc._2, acc._1 :: acc._4)  // save previous ts; previous nd becomes the new limit
    else
      (x._1, x._2, acc._3, acc._4)            // carry the current limit forward unchanged
}._4.reverse
// res1: List[Long] = List(103, 106, 107)
Note that a tuple of (previous ts, previous nd, current ndLimit, list of timestamps) is used as the accumulator to carry items over from the previous row for the comparison logic in the current row.
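If it helps to verify the accumulator logic in isolation, here's a minimal pure-Scala sketch of the same foldLeft over the (ts, nd) pairs from the example above, with no Spark dependency (the names `pairs` and `saved` are just illustrative):

```scala
// (ts, nd) pairs, matching the collect-ed example data above
val pairs: List[(Long, Double)] = List(
  (101L, 1.0), (102L, 1.6), (103L, 1.2), (104L, 0.8),
  (105L, 1.5), (106L, 1.3), (107L, 1.1), (108L, 1.2)
)

// Accumulator: (previous ts, previous nd, current ndLimit, saved timestamps)
val saved: List[Long] =
  pairs.tail.foldLeft((pairs.head._1, pairs.head._2, pairs.head._2, List.empty[Long])) {
    case ((prevTs, prevNd, limit, acc), (ts, nd)) =>
      if (nd <= limit) (ts, nd, prevNd, prevTs :: acc)  // save previous ts; previous nd is the new limit
      else             (ts, nd, limit, acc)             // nothing triggered; keep the limit
  }._4.reverse

println(saved)  // List(103, 106, 107)
```

Destructuring the accumulator with a `case` pattern instead of `acc._1`/`acc._2` positional access makes the carry-over of previous-row values a bit easier to follow; the behavior is identical.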