I have a scenario where 10K+ regular expressions are stored in a table along with various other columns, and this table needs to be joined against an incoming dataset. Initially I was using the Spark SQL rlike method as below, and it held up as long as the incoming record count stayed below 50K.
PS: The regular expression reference data is a broadcast dataset (referenced as regexDataset.value below).
dataset.join(regexDataset.value, expr("input_column rlike regular_exp_column"))
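For reference, the broadcast reference data is set up roughly like this (the table name here is illustrative, not the real one):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder().getOrCreate()

// Reference table: 10K+ rows of (unique_id, regular_exp_column, other columns).
// "regex_reference" is a placeholder name.
val regexRef: DataFrame = spark.read.table("regex_reference")

// Wrapped in a broadcast variable so it can be referenced as regexDataset.value
// in the join and in the UDF setup below.
val regexDataset: Broadcast[DataFrame] = spark.sparkContext.broadcast(regexRef)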
Then I wrote a custom UDF that does the matching with Scala's native regex search, as below.
- The val below collects the reference data as an Array of (id, Regex) tuples.
import scala.util.matching.Regex

// Pull the (id, pattern) pairs to the driver and pre-compile each pattern once.
val regexPreCalcArray: Array[(Int, Regex)] = {
  regexDataset.value
    .select("col_1", "regex_column")
    .collect
    .map(row => (row.get(0).asInstanceOf[Int], row.get(1).toString.r))
}
Implementation of the regex matching UDF:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.IntegerType

def findMatchingPatterns(regexDSArray: Array[(Int, Regex)]): UserDefinedFunction = {
  udf((input_column: String) => {
    for {
      // Guard against null input values.
      text <- Option(input_column)
      // Keep every reference entry whose regex matches the input text.
      matches = regexDSArray.filter(_._2.findFirstIn(text).isDefined)
      if matches.nonEmpty
    } yield matches.map(_._1).min // smallest matching ID wins; None (null) when nothing matches
  }, IntegerType)
}
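The matching core itself behaves as intended; here is a quick driver-side sanity check outside Spark, using made-up patterns and IDs purely for illustration:

import scala.util.matching.Regex

// Made-up reference entries to illustrate the "smallest matching ID" rule.
val sample: Array[(Int, Regex)] = Array(
  (7, ".*bar$".r),
  (3, "foo.*".r)
)

// "foobar" matches both patterns, so the minimum ID (3) is kept.
val hits = sample.filter(_._2.findFirstIn("foobar").isDefined)
assert(hits.map(_._1).min == 3)

// A non-matching string produces no hits, so the UDF would return null.
assert(sample.forall(_._2.findFirstIn("nothing here").isEmpty))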
The join is then done as below: when multiple regexes match, the UDF returns a single unique ID from the reference data (the minimum), which is joined back against the reference data on unique_id to retrieve the other columns needed for the result.
dataset
  .withColumn("min_unique_id", findMatchingPatterns(regexPreCalcArray)($"input_column"))
  .join(regexDataset.value, $"min_unique_id" === $"unique_id", "left")
But this too gets very slow, with skew in execution (one executor task runs for a very long time), once the record count grows above 1M. Spark suggests not using UDFs because they degrade performance. Are there other best practices I should apply here, or a better API for Scala regex matching than what I've written? Any suggestions for doing this efficiently would be very helpful.