Hi, I need to ingest a ton of Power BI audit logs into Fabric. The eventual target is a Lakehouse table or tables. I have one log file per day (as csv), about 500 altogether. These files need extensive reshaping - things like pivoting attribute:value pairs into columns, parsing the text, etc. They're also reasonably large - about 40 or 50 million total lines, and maybe 4 or 5 million distinct audit log entries (see image below of a single entry).
I was going to do this using a Gen2 dataflow, but I can't figure out how to get a Gen2 dataflow to ingest a folder of files. It works perfectly with our on-prem network file shares, but in Fabric the Gen2 dataflow seems to want only one file at a time - there's no way to combine files under a Lakehouse Files section.
I'm a complete newbie on notebooks and really barely understand any Python (working to rectify that, going to take some time).
Here's a sample log file entry:
Things I've tried:
1. Using some of the sample code in the "learn fabric" paths to combine the files and publish to a Delta lakehouse table. This doesn't really work - if I set VORDER on, it screws up the order of lines in the audit log, making it impossible to associate lines with the proper audit log ID. If I turn VORDER off, querying the resulting table returns "Failed to read parquet file because the column segment for column '_c0' is too large".
2. I've tried using code like the following to append the 500 individual csv files into a single file. But it doesn't work - instead of one large file, it creates a folder containing a ton of separate files.
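Roughly this kind of pattern (a simplified sketch - my real paths and options are different, I'm just using a generic Files/audit_logs folder here):

# Simplified sketch - real paths/options differ; Files/audit_logs is a generic stand-in
df = spark.read.format("csv").option("header", "true").load("Files/audit_logs/*.csv")

# Intended to write one big csv, but Spark writes one file per partition,
# so "audit_combined" ends up as a folder full of part-*.csv files
df.write.mode("overwrite").option("header", "true").csv("Files/audit_combined")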
Any ideas or guidance would be greatly appreciated - thanks!
Scott
I read a lot of file data that is fed into OneLake via API calls to external systems. Even though my data sources are .json files, the code below will work for .csv files as well.
# Imports and includes to use different functions down the line
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import StructType, StructField, StringType, DateType
from pyspark.sql.functions import to_date, to_timestamp, sequence
from delta.tables import *

## Define a parameter cell in the notebook to be able to dynamically pass a file struct
_year = "9999"
_month = "5"
_day = "2"

## Generating the path to my files dynamically
## Instead of *.json you can use *.csv - this will pick up any file ending in .csv under that path
_root = "Files/XCM/TaskSignOff"
_path = "/" + _year + "/" + _month + "/" + _day + "/*.json"
_full = _root + _path

## If your csv files have a defined schema, I recommend declaring it in the notebook so you can control
## schema drift. I am calling out the expected elements/headers that will be in the files I am reading.
## By setting the schema, if the vendor adds a new field we do not necessarily bring it in right away.
_schema = StructType(
    [
        StructField("taskId", StringType()),
        StructField("signOffId", StringType()),
        StructField("comments", StringType()),
        StructField("requiredComments", StringType()),
        StructField("requiredDate", StringType()),
        StructField("requiredName", StringType()),
        StructField("signOffBy", StringType()),
        StructField("signOffComments", StringType()),
        StructField("signOffDate", StringType()),
        StructField("signOffRequired", StringType()),
        StructField("signOffName", StringType()),
        StructField("statusName", StringType()),
        StructField("updatedBy", StringType()),
        StructField("updatedOn", StringType())
    ]
)

## Actual DataFrame ingestion
## Change format from 'json' to 'csv'
## I also append the source path & file name to my column set
_df = spark.read.format('json').schema(_schema).load(_full).select("*", "_metadata.file_name", "_metadata.file_path")

## Display the results of the read (first 1000 rows)
display(_df)

## Write to Delta table
## Write initial task list to Delta
_df.write.mode("append").format("delta").save("Tables/<Table_Name>")
When you write to a file from Spark, each of the workers holds part of the data. Each worker writes its piece of the pie, which results in a bunch of files with GUIDs and other oddities for names. To write a single file, you would have to coalesce the DataFrame down to a single partition (or collect everything onto one worker) so that all of the data sits in one place before it is written out. It is a very memory intensive operation.
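If you really do need one csv file, a minimal sketch would look something like this (I'm assuming your daily files sit under a Files/audit_logs folder in the Lakehouse - adjust the paths to your own):

# Read every daily csv in the folder into one DataFrame
_df = spark.read.format("csv").option("header", "true").load("Files/audit_logs/*.csv")

# coalesce(1) pulls everything onto a single partition/worker before writing,
# so the output folder contains exactly one part-*.csv file (very memory intensive at 40-50M rows).
# Spark still writes a folder; you would rename the single part file afterwards if you need a plain csv.
_df.coalesce(1).write.mode("overwrite").option("header", "true").csv("Files/audit_combined")

For your volume, though, skipping the single-file step and writing straight to a Delta table like the code above does is usually the better route.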
Hope this helps out a little bit.
@Scott_Powell It sounds like you have quite the challenge on your hands, Scott!
For reading in the files, Spark's ability to read multiple csv files into a single DataFrame is definitely a good way to get started. However, I highly suggest adding a unique ID field to each row so you can preserve order later on. (The error you mentioned sounds more like a corrupted parquet structure at first glance.)
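A minimal sketch of that idea (I'm assuming the daily csvs sit under Files/audit_logs; monotonically_increasing_id() produces increasing but not consecutive values, which is normally enough to keep rows from the same file in their original relative order):

from pyspark.sql.functions import monotonically_increasing_id, col

# Read the whole folder of daily audit csvs into one DataFrame
# (header/schema options depend on what the export actually contains)
logs = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/audit_logs/*.csv")
    .select("*", "_metadata.file_name")                    # track which file each row came from
    .withColumn("row_id", monotonically_increasing_id())   # increasing (not consecutive) id to preserve read order
)

# Quick sanity check on the ordering
display(logs.orderBy(col("row_id")).limit(20))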
Since you mentioned extensive data reshaping is needed, we can work together on the transformations - whether it's parsing text fields, pivoting, or anything else. Feel free to connect and share some example rows and we can prototype the logic.
Finally, instead of outputting to CSV, I would suggest writing the input csvs out in Delta format - a Lakehouse architecture makes sense for your downstream workloads.
The main thought is to maintain data integrity - both order and values - as you move from the raw CSVs to a transformed Delta table.
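For the write side, building on the sketch above (the audit_log_raw table name is just a placeholder):

# Write the id-tagged DataFrame to a Lakehouse Delta table instead of csv
# ("audit_log_raw" is a placeholder - use whatever table name fits your Lakehouse)
# Use mode("append") instead if you load the files incrementally day by day
logs.write.mode("overwrite").format("delta").save("Tables/audit_log_raw")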