I have a PySpark DataFrame as shown below:
+---+-------+--------+
|age|balance|duration|
+---+-------+--------+
|  2|   2143|     261|
| 44|     29|     151|
| 33|      2|      76|
| 50|   1506|      92|
| 33|      1|     198|
| 35|    231|     139|
| 28|    447|     217|
|  2|      2|     380|
| 58|    121|      50|
| 43|    693|      55|
| 41|    270|     222|
| 50|    390|     137|
| 53|      6|     517|
| 58|     71|      71|
| 57|    162|     174|
| 40|    229|     353|
| 45|     13|      98|
| 57|     52|      38|
|  3|      0|     219|
|  4|      0|      54|
+---+-------+--------+
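For reproducibility, the frame can be built like this (assuming an existing SparkSession named spark):

data = [(2, 2143, 261), (44, 29, 151), (33, 2, 76), (50, 1506, 92),
        (33, 1, 198), (35, 231, 139), (28, 447, 217), (2, 2, 380),
        (58, 121, 50), (43, 693, 55), (41, 270, 222), (50, 390, 137),
        (53, 6, 517), (58, 71, 71), (57, 162, 174), (40, 229, 353),
        (45, 13, 98), (57, 52, 38), (3, 0, 219), (4, 0, 54)]
DF = spark.createDataFrame(data, ['age', 'balance', 'duration'])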
and my expected output should look like this:
+---+-------+--------+-------+-----------+------------+
|age|balance|duration|age_out|balance_out|duration_out|
+---+-------+--------+-------+-----------+------------+
|  2|   2143|     261|      1|          1|           0|
| 44|     29|     151|      0|          0|           0|
| 33|      2|      76|      0|          0|           0|
| 50|   1506|      92|      0|          1|           0|
| 33|      1|     198|      0|          0|           0|
| 35|    231|     139|      0|          0|           0|
| 28|    447|     217|      0|          0|           0|
|  2|      2|     380|      1|          0|           0|
| 58|    121|      50|      0|          0|           0|
| 43|    693|      55|      0|          0|           0|
| 41|    270|     222|      0|          0|           0|
| 50|    390|     137|      0|          0|           0|
| 53|      6|     517|      0|          0|           1|
| 58|     71|      71|      0|          0|           0|
| 57|    162|     174|      0|          0|           0|
| 40|    229|     353|      0|          0|           0|
| 45|     13|      98|      0|          0|           0|
| 57|     52|      38|      0|          0|           0|
|  3|      0|     219|      1|          0|           0|
|  4|      0|      54|      0|          0|           0|
+---+-------+--------+-------+-----------+------------+
My objective is to identify the outlier records in the data set using the interquartile range (IQR) method, as described in the Python code below. If a value is an outlier, it should be flagged as 1, otherwise 0.
I can do this in plain Python with NumPy using the following code:
import numpy as np

def outliers_iqr(ys):
    ys = np.asarray(ys)  # accept plain lists as well as arrays
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))  # default flag: 0
    pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]  # outlier positions
    ser[pos] = 1  # flag outliers as 1
    return ser
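For example, applied to the age values from the table above, it flags the two 2s and the 3, but not the 4 (which sits just inside the lower bound of 3.25):

ages = [2, 44, 33, 50, 33, 35, 28, 2, 58, 43, 41, 50, 53, 58, 57, 40, 45, 57, 3, 4]
print(outliers_iqr(ages))
# [1. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0.]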
But I want to do the same thing in PySpark. Can someone help me with this?
My PySpark code (which does not work):
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos = np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos] = 1
    return float(ser)  # fails: ser is an array, not a scalar

outliers_iqr_udf = udf(outliers_iqr, FloatType())

# fails: collect() returns a list of Rows on the driver, not a column expression
DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()
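After some digging, I think something along these lines might work, using DataFrame.approxQuantile (with relativeError=0.0 for exact quantiles) to compute the bounds once per column on the driver, and when/otherwise to build the flags instead of a UDF. This is just a sketch and the helper name with_outlier_flags is mine; note also that approxQuantile returns actual data values rather than interpolating like np.percentile, so the bounds, and hence the flags near the cut-off, can differ slightly from the NumPy version:

from pyspark.sql import functions as F

def with_outlier_flags(df, cols, relative_error=0.0):
    # For each column, compute Q1/Q3 once on the driver, then flag rows
    for c in cols:
        q1, q3 = df.approxQuantile(c, [0.25, 0.75], relative_error)
        iqr = q3 - q1
        lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
        df = df.withColumn(
            c + '_out',
            F.when((F.col(c) < lower) | (F.col(c) > upper), 1).otherwise(0),
        )
    return df

with_outlier_flags(DF, ['age', 'balance', 'duration']).show()

Is this the idiomatic way to do it, or is there a better approach?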