We created a notebook to do some revenue predictions for locations using MLflow and pyspark. (Yes, later we might use pandas.)
The code is something like below, and forgive me if the code is not completely correct.
In the code you can see that, for each location, we run 14 iterations, using the predicted revenue to fine-tune the predictions. This process works to our liking. When we run this process sequentially in a foreach loop, everything works fine.
What we want to do is use ThreadPoolExecutor to process the predictions for the locations in parallel, creating an experiment per location to track the process. The problem we run into is that predictions are sometimes saved to the experiments of other locations, and runs even end up nested inside runs of other locations. Does anyone know how to prevent this from happening?
import mlflow
from datetime import datetime
from pyspark.sql import DataFrame
from pyspark.ml.pipeline import PipelineModel
from concurrent.futures import ThreadPoolExecutor


class LocationPrediction:
    def __init__(self, location_name, pipeline_model):
        self.location_name = location_name
        self.pipeline_model = pipeline_model
        self.df_with_predictions: DataFrame = None
        self.iteration = 0
        self.get_data_from_lakehouse()

    def get_data_from_lakehouse(self):
        # `spark` is the SparkSession provided by the Fabric notebook runtime
        self.initial_data = (
            spark.read.format("delta")
            .table("table_name")
            .filter(f"location = '{self.location_name}'")
        )

    def predict(self):
        # Start a child iteration run; after the first pass, feed the
        # previous predictions back in to fine-tune
        with mlflow.start_run(run_name=f"Iteration_{self.iteration}", nested=True):
            data = (
                self.df_with_predictions
                if self.df_with_predictions is not None
                else self.initial_data
            )
            predictions = self.pipeline_model.transform(data)
            mlflow.log_metric("row_count", predictions.count())
            # ...
            # Do some transformations on the predictions DataFrame
            # ...
            self.df_with_predictions = predictions

    def write_to_lakehouse(self):
        self.df_with_predictions.write.format("delta").mode("append").saveAsTable("table_name")

    def do_iteration(self):
        # Use the new predictions to predict again, 14 times
        for i in range(14):
            self.predict()
            self.iteration += 1
        self.write_to_lakehouse()


def get_pipeline_model(location_name) -> PipelineModel:
    model_uri = f"models:/{location_name}/latest"
    model = mlflow.spark.load_model(model_uri)
    return model


def run_prediction_task(location_name):
    # Create or set the Fabric experiment and start the main run
    mlflow.set_experiment(location_name)
    run_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    mlflow.start_run(run_name=f"Prediction_{run_timestamp}")
    pipeline_model = get_pipeline_model(location_name)
    pipeline = LocationPrediction(location_name, pipeline_model)
    pipeline.do_iteration()
    mlflow.end_run()


if __name__ == "__main__":
    locations = ["location_1", "location_2", "location_3",
                 "location_4", "location_5", "location_6"]
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_prediction_task, location) for location in locations]
Thanks for reaching out to MS Fabric community support.
Your code is mostly on track, but a few key changes are needed to keep the MLflow experiments and runs isolated when using ThreadPoolExecutor:
Set the experiment properly: Calling mlflow.set_experiment(location_name) inside run_prediction_task is the right idea for one experiment per location, but set_experiment() updates a process-wide active experiment, not a per-thread one. If another thread calls it before your thread reaches mlflow.start_run(), your run lands in that other location's experiment. The safer pattern is to resolve the experiment ID once and pass it explicitly via mlflow.start_run(experiment_id=...), as in the sketch after this list.
Unique run names: Using run_name=f"Prediction_{run_timestamp}" makes the main run for each location easy to identify. Keep in mind, though, that run names are just labels; they do not isolate runs from one another.
Manage iteration runs: Avoid nested=True in mlflow.start_run() if the iteration runs can be fully independent. If you do need nested runs, be aware that the parent is taken from MLflow's active-run stack, which in some MLflow versions is shared across threads; that is what can make an iteration run end up nested under another location's run.
DataFrame management: Ensure that df_with_predictions is never modified by multiple threads at once. Each thread should work with its own LocationPrediction instance and its own data.
MLflow context in threads: In short, have each thread create and manage its own experiment and run context explicitly instead of relying on MLflow's shared global state.
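Here is a minimal sketch of that pattern. It assumes a recent MLflow version where mlflow.set_experiment() returns the Experiment object (on older versions you can resolve the ID with mlflow.get_experiment_by_name()); note that on very old MLflow versions where the fluent run stack is shared across threads, nested runs may still misbehave and upgrading is advisable:

import mlflow
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

def run_prediction_task(location_name):
    # Resolve the experiment once and keep its ID local to this thread.
    # set_experiment() still mutates process-wide state, but by passing
    # the ID explicitly below we no longer depend on that shared state.
    experiment = mlflow.set_experiment(location_name)
    run_timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

    # experiment_id pins the parent run to this location's experiment,
    # even if another thread calls set_experiment() in the meantime.
    with mlflow.start_run(
        experiment_id=experiment.experiment_id,
        run_name=f"Prediction_{run_timestamp}",
    ):
        for i in range(14):
            # Pass experiment_id on child runs too, so nested runs cannot
            # attach to another thread's active experiment.
            with mlflow.start_run(
                experiment_id=experiment.experiment_id,
                run_name=f"Iteration_{i}",
                nested=True,
            ):
                pass  # transform, log metrics, etc., as in your predict()

if __name__ == "__main__":
    locations = ["location_1", "location_2", "location_3"]
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(run_prediction_task, loc) for loc in locations]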
You can find more information on managing MLflow experiments and runs in the official MLflow documentation.
Thanks,
Prashanth Are
MS Fabric community support
Thanks @v-prasare. We are looking further into using NotebookUtils.notebook.RunMultiple(), as it gives us better insight into what the process was for each location in the notebook instance, by providing a link to the notebook snapshots afterwards.
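For anyone landing here later, this is roughly the shape we are experimenting with. Treat it as a sketch only: it assumes the DAG format described in the Fabric notebookutils documentation, and the child notebook name "LocationPredictionNotebook" and its location parameter are our own placeholders, not anything from the docs:

# Sketch: fan out one child-notebook run per location with runMultiple.
# Assumes a child notebook "LocationPredictionNotebook" (hypothetical) with a
# parameter cell defining `location`; Fabric then links a snapshot per activity.
locations = ["location_1", "location_2", "location_3",
             "location_4", "location_5", "location_6"]

dag = {
    "activities": [
        {
            "name": f"predict_{loc}",              # unique activity name
            "path": "LocationPredictionNotebook",  # child notebook to run
            "timeoutPerCellInSeconds": 600,
            "args": {"location": loc},             # passed to the parameter cell
        }
        for loc in locations
    ],
    "concurrency": 3,  # mirrors max_workers=3 from the ThreadPoolExecutor
}

notebookutils.notebook.runMultiple(dag)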
@nielsvdc, Thank you for the update!
It sounds like using NotebookUtils.notebook.RunMultiple() is a great choice, especially since it provides better insights and visibility into the process for each location in the notebook instance. The ability to access notebook snapshots afterwards will definitely help with tracking and debugging.
If this post helps, then please consider accepting it as the solution to help other members find it more quickly, and give kudos if it helped you resolve your query.
Thanks,
Prashanth Are
MS Fabric community support