Thursday, June 17, 2021

Spark dataframe random sampling based on frequency of occurrence in the dataframe

I have a Spark job with an input dataframe that has a column queryId. This queryId is not unique within the dataframe: for example, the dataframe has roughly 3M rows but only about 450k distinct query ids.

I am trying to implement sampling logic that adds a new column, sampledQueryId, holding a randomly sampled query id for each row, drawn from the set of query ids across the whole dataframe. The one restriction is that the sampled query id must not equal the row's own query id.

The overall goal is for the sampling to follow each query id's frequency of occurrence in the incoming dataframe, i.e. given two query ids q1 and q2 with a 10:1 occurrence ratio (q1:q2), q1 should appear roughly 10 times more often in the sampled column.
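For the frequency part, my understanding is that drawing uniformly from the full, non-deduplicated list of query ids is already frequency-weighted, since each id appears in the list once per occurrence. An equivalent formulation that keeps only the ~450k distinct ids on the driver instead of all ~3M would aggregate counts and sample from the cumulative distribution - a rough sketch, assuming spark.implicits._ is in scope:

import java.util.concurrent.ThreadLocalRandom

// Aggregate to (queryId, count) pairs: ~450k entries instead of ~3M ids.
val counts = data.groupBy($"queryId").count().as[(Long, Long)].collect()
// Running totals turn the counts into a discrete cumulative distribution.
val cumulative = counts.map(_._2).scanLeft(0L)(_ + _).tail
val total = cumulative.last

def sampleWeighted(): Long = {
  val r = ThreadLocalRandom.current().nextLong(total)
  // Pick the first bucket whose cumulative count exceeds r
  // (a binary search would be faster than this linear scan).
  counts(cumulative.indexWhere(_ > r))._1
}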

I tried to implement this by collecting the query ids into a list and indexing into that list at random, but I suspect, based on empirical evidence, that the logic doesn't work as expected: for example, I see one specific query id sampled 200 times while query ids of similar frequency never get sampled at all.

Any suggestions on whether the Spark code below is expected to work as intended?

import org.apache.spark.sql.functions.udf
import spark.implicits._ // for $"..." and the Long encoder

// Driver-side RNG; note that it gets captured in the UDF closure below.
val random = new scala.util.Random
// Collect ids with repetition, so a uniform draw is frequency-weighted.
val queryIds = data.select($"queryId").as[Long].collect()
val sampleQueryId = udf((queryId: Long) => {
  val sampledId = queryIds(random.nextInt(queryIds.length))
  // Return Option so the column is a nullable Long; a bare `else null`
  // would make the lambda's return type Any, which udf cannot encode.
  if (sampledId != queryId) Some(sampledId) else None
})
val dataWithSampledIds = data.withColumn("sampledQueryId", sampleQueryId($"queryId"))
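One hypothesis I have about the skew: random is created once on the driver and captured in the UDF closure, so every task deserializes an identical copy of it, seed included, and therefore draws the exact same sequence of indices into queryIds. The same few positions would then be hit once per task while the rest of the array is never touched - which would explain one id showing up about as many times as there are tasks while ids of similar frequency never appear. The mechanism is easy to reproduce outside Spark with a plain serialization round-trip:

import java.io._

// Simulate what happens when Spark ships the closure to two tasks.
def roundTrip(r: scala.util.Random): scala.util.Random = {
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(r)
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[scala.util.Random]
}

val original = new scala.util.Random
val task1Rng = roundTrip(original) // task 1's deserialized copy
val task2Rng = roundTrip(original) // task 2's deserialized copy
// Both copies carry the same seed state, so they draw identical values.
println(task1Rng.nextInt(100) == task2Rng.nextInt(100)) // prints true

If that is the cause, creating the generator per task instead - e.g. calling java.util.concurrent.ThreadLocalRandom.current() inside the UDF body - should give independent draws.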


