I have a DataFrame in which I want to assign an id per window partition and per block of 5 rows. That is, the id should increase whenever the partition value changes or a partition has more than 5 rows, so that each id covers at most 5 rows of a single group.
Input:
id | group |
1 | A |
2 | A |
3 | A |
4 | A |
5 | A |
6 | A |
7 | A |
8 | A |
9 | B |
10 | B |
11 | C |
12 | C |
Expected output:
id | group | group_id
1 | A | 1
2 | A | 1
3 | A | 1
4 | A | 1
5 | A | 1
6 | A | 2
7 | A | 2
8 | A | 2
9 | B | 3
10 | B | 3
11 | C | 4
12 | C | 4
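To pin down the rule I mean, here is a minimal plain-Python sketch of the numbering, not a Spark solution. The function name `assign_group_ids` and the tuple layout are only illustrative, and it assumes the rows are already sorted by group and then by id.

```python
from itertools import groupby


def assign_group_ids(rows, group_size=5):
    """Assign a running group_id to (id, group) tuples.

    Assumes rows are sorted by group, then id, so that each group
    forms one contiguous run.
    """
    result = []
    group_id = 0
    for _, members in groupby(rows, key=lambda r: r[1]):
        for position, (row_id, group) in enumerate(members):
            # A new id starts at the first row of every group and
            # again after each full block of group_size rows.
            if position % group_size == 0:
                group_id += 1
            result.append((row_id, group, group_id))
    return result
```

The counter is never reset between groups, which is exactly the "carry over" behaviour I am missing in Spark.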
I tried a dense_rank approach followed by a udf. However, I cannot figure out how to carry over the previous rank value when the rank has to change because of the "for each 5 rows" constraint.
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
window = Window.partitionBy('group').orderBy('id')
df_rank = df.withColumn('group_rank', F.dense_rank().over(window))
group_size = 5
# ranks 1..5 map to bucket 1, ranks 6..10 to bucket 2, and so on
map_func = F.udf(lambda x: (x - 1) // group_size + 1, IntegerType())
df_rank_map = df_rank.withColumn('group_id', map_func(df_rank.group_rank))
Output:
id | group | group_rank | group_id
1 | A | 1 | 1
2 | A | 2 | 1
3 | A | 3 | 1
4 | A | 4 | 1
5 | A | 5 | 1
6 | A | 6 | 2
7 | A | 7 | 2
8 | A | 8 | 2
9 | B | 1 | 1
10 | B | 2 | 1
11 | C | 1 | 1
12 | C | 2 | 1
My code has 2 problems. The obvious one is that the group_id is nowhere near what I want: it restarts at 1 in every group, and I have yet to figure out the logic for carrying the previous rank over to the next group. The second problem is that, because of the udf, this logic is really slow. It would be really nice if there were a way to improve the performance as well.
I have been pulling my hair out trying to figure out a way to do this, to no avail. Any ideas or hints on how to achieve the expected output?
source https://stackoverflow.com/questions/73708756/pyspark-assigning-group-id-based-on-group-member-count