---
title: Process change feed in Azure Blob Storage
titleSuffix: Azure Storage
description: Learn how to process change feed transaction logs in a .NET client application using the Blobs Change Feed client library.
author: normesta
ms.author: normesta
ms.date: 06/06/2024
ms.topic: article
ms.service: azure-blob-storage
ms.reviewer: sadodd
ms.devlang: csharp
ms.custom: devx-track-csharp, devx-track-dotnet
---
Change feed provides transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. This article shows you how to read change feed records by using the blob change feed processor library.
To learn more about the change feed, see Change feed in Azure Blob Storage.
This section walks you through preparing a project to work with the Blobs Change Feed client library for .NET.
From your project directory, install the package for the Azure Storage Blobs Change Feed client library for .NET using the `dotnet add package` command. In this example, we add the `--prerelease` flag to the command to install the latest preview version.
```dotnetcli
dotnet add package Azure.Storage.Blobs.ChangeFeed --prerelease
```
The code examples in this article also use the Azure Blob Storage and Azure Identity packages.
```dotnetcli
dotnet add package Azure.Identity
dotnet add package Azure.Storage.Blobs
```
Add the following `using` directives to your code file:

```csharp
using Azure.Identity;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;
```
To connect the application to Blob Storage, create an instance of the `BlobServiceClient` class. The following example shows how to create a client object using `DefaultAzureCredential` for authorization. To learn more, see Authorize access and connect to Blob Storage. To work with the change feed, your identity needs the Azure RBAC built-in role Storage Blob Data Reader or higher.
```csharp
// TODO: Replace <storage-account-name> with the name of your storage account
string accountName = "<storage-account-name>";

BlobServiceClient client = new(
    new Uri($"https://{accountName}.blob.core.windows.net"),
    new DefaultAzureCredential());
```
The client object is passed as a parameter to some of the methods shown in this article.
> [!NOTE]
> The change feed is an immutable and read-only entity in your storage account. Any number of applications can read and process the change feed simultaneously and independently at their own convenience. Records aren't removed from the change feed when an application reads them. The read or iteration state of each consuming reader is independent and maintained by your application only.
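As a sketch of this independence, the following example shows two consumers of the same change feed, each persisting its own cursor. The class, method, and cursor file names are hypothetical, not part of the SDK; only `GetChangeFeedClient` and `GetChangesAsync` come from the library.

```csharp
using System.IO;
using System.Threading.Tasks;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.ChangeFeed;

// Sketch: an independent change feed consumer. Records aren't removed when
// read, so any number of these can run side by side, each tracking its own
// position in its own cursor file. File names here are illustrative.
public class IndependentConsumer
{
    private readonly string _cursorFile;

    public IndependentConsumer(string cursorFile) => _cursorFile = cursorFile;

    public async Task ProcessNewEventsAsync(BlobServiceClient client)
    {
        // Resume from this consumer's own saved position, if one exists
        string cursor = File.Exists(_cursorFile)
            ? File.ReadAllText(_cursorFile)
            : null;

        BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

        await foreach (Page<BlobChangeFeedEvent> page in
            changeFeedClient.GetChangesAsync(continuationToken: cursor).AsPages())
        {
            // Process page.Values here, then save this consumer's position
            File.WriteAllText(_cursorFile, page.ContinuationToken);
        }
    }
}
```

For example, `new IndependentConsumer("consumerA-cursor.txt")` and `new IndependentConsumer("consumerB-cursor.txt")` can read the same change feed without affecting each other's progress.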
The following code example iterates through all records in the change feed, adds them to a list, and then returns the list of change feed events:
```csharp
public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(BlobServiceClient client)
{
    // Create a new BlobChangeFeedClient
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = [];

    // Get all the events in the change feed
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}
```
The following code example prints some values from the list of change feed events:
```csharp
public void ShowEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

        Console.WriteLine("Subject: " + subject + "\n" +
            "Event Type: " + eventType + "\n" +
            "Operation: " + operationName.ToString());
    }
}
```
You can choose to save your read position in the change feed and resume iterating through the records at a later time. You save the read position by getting the change feed cursor. The cursor is a string, and your application can persist that string in any way that fits its design, for example, in a file or database.
This example iterates through all records in the change feed, adds them to a list, and saves the cursor. The list and the cursor are returned to the caller.
```csharp
public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync(
    BlobServiceClient client,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuationToken: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    // Update the change feed cursor. The cursor is not required to get each page of events,
    // it's intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;

    return (cursor, changeFeedEvents);
}
```
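As one way to persist the cursor between runs, you could write it to a local file and read it back before calling the method above. This is a sketch under assumed names: the `CursorStore` class, its members, and the file path are illustrative, not part of the SDK.

```csharp
using System.IO;

// Sketch: persist the change feed cursor to a local file between runs.
// The class name, member names, and file path are illustrative.
public static class CursorStore
{
    private const string FilePath = "changefeed-cursor.txt";

    // Save the cursor string returned by the change feed client
    public static void Save(string cursor) =>
        File.WriteAllText(FilePath, cursor);

    // Returns null if no cursor was saved yet; passing null as the
    // continuation token starts reading from the beginning of the feed
    public static string Load() =>
        File.Exists(FilePath) ? File.ReadAllText(FilePath) : null;
}
```

On startup, pass `CursorStore.Load()` as the `cursor` argument; after processing, call `CursorStore.Save(cursor)` with the returned value.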
You can choose to process change feed records as they're committed to the change feed. See Specifications. Change events are published to the change feed within 60 seconds on average, so we recommend that you keep this period in mind when you specify your poll interval.
This example periodically polls for changes. If change records exist, this code processes those records and saves the change feed cursor. That way, if the process is stopped and later restarted, the application can use the cursor to resume processing records where it last left off. This example saves the cursor to a local file for demonstration purposes, but your application can store it in any form that makes the most sense for your scenario.
```csharp
public async Task ChangeFeedStreamAsync(
    BlobServiceClient client,
    int waitTimeMs,
    string cursor)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
            .GetChangesAsync(continuationToken: cursor)
            .AsPages()
            .GetAsyncEnumerator();

        while (true)
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    BlobOperationName operationName = changeFeedEvent.EventData.BlobOperationName;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Operation: " + operationName.ToString());
                }

                // Update the cursor so the next poll resumes after this page,
                // and save it with the helper method
                cursor = enumerator.Current.ContinuationToken;
                SaveCursor(cursor);
            }
            else
            {
                break;
            }
        }

        await Task.Delay(waitTimeMs);
    }
}

void SaveCursor(string cursor)
{
    // Specify the path to the file where you want to save the cursor
    string filePath = "path/to/cursor.txt";

    // Write the cursor value to the file
    File.WriteAllText(filePath, cursor);
}
```
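To resume after a restart, you could load the previously saved cursor before starting the polling loop. This sketch assumes the same file path that `SaveCursor` writes to; the 60-second wait time reflects the average publishing period noted above.

```csharp
using System.IO;

// Sketch: read a previously saved cursor, if any, before starting the
// polling loop. Assumes the same file path used by SaveCursor.
string filePath = "path/to/cursor.txt";

// A null cursor starts reading from the beginning of the change feed
string cursor = File.Exists(filePath) ? File.ReadAllText(filePath) : null;

// Poll for new changes every 60 seconds, resuming from the saved position
await ChangeFeedStreamAsync(client, waitTimeMs: 60000, cursor: cursor);
```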
You can read records that fall within a specific time range. This example iterates through all records in the change feed that fall within a specific date and time range, adds them to a list, and returns the list:
```csharp
async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(BlobServiceClient client)
{
    // Get a new change feed client
    BlobChangeFeedClient changeFeedClient = client.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time. The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2024, 3, 1, 0, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2024, 6, 1, 0, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}
```
The start time that you provide is rounded down to the nearest hour, and the end time is rounded up to the nearest hour. It's possible that you'll see events that occurred before the start time or after the end time. It's also possible that some events that occur between the start and end time won't appear, because events can be recorded during the hour before the start time or during the hour after the end time.
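To illustrate the rounding behavior, the following sketch computes the hour boundaries that a start and end time map to. This mimics the rounding described above; it isn't the SDK's implementation.

```csharp
using System;

// Sketch: mimic the hour rounding applied to a change feed time range.
// This illustrates the behavior; it is not the SDK's code.
DateTimeOffset start = new DateTimeOffset(2024, 3, 1, 10, 45, 30, TimeSpan.Zero);
DateTimeOffset end = new DateTimeOffset(2024, 3, 1, 18, 5, 0, TimeSpan.Zero);

// The start time is rounded down to the nearest hour
DateTimeOffset roundedStart = new DateTimeOffset(
    start.Year, start.Month, start.Day, start.Hour, 0, 0, start.Offset);

// The end time is rounded up to the next hour, unless it's already on the hour
DateTimeOffset roundedEnd = (end.Minute == 0 && end.Second == 0)
    ? end
    : new DateTimeOffset(end.Year, end.Month, end.Day, end.Hour, 0, 0, end.Offset)
        .AddHours(1);

Console.WriteLine(roundedStart); // 10:00, rounded down from 10:45:30
Console.WriteLine(roundedEnd);   // 19:00, rounded up from 18:05:00
```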