I'm in the process of writing a Python script for automating a data ingestion pipeline using Amazon Web Services' Kinesis stream, Firehose and Lambda. This is my first stab at writing Python, but I do have some experience with JavaScript and Node.js.
Scripts are organized as follows:
- build_pipeline.py - this is where all the logic lives
- aws_pipeline.py - this is where my corresponding classes live that are called within build_pipeline.py
- config.json - this is a JSON file that has some configs for the stream name, etc.
Pipeline process:
- Run script like so:
python build_pipeline.py config.json
- Script checks to make sure config file exists and then reads the file so we can access the JSON properties.
- Set local variables based on JSON properties
- Initiate classes for s3 (for storing config), stream, and Firehose
- Upload file to s3 folder
- Check if the stream exists, if not, create and add tags
- Check if the Firehose exists, if not, create
- Get stream ARN and pass into lambda class and initialize
- Check if lambda event source exists, if not, create event source
aws_pipeline.py
'''
aws_pipeline.py - thin wrappers around the boto3 clients used by the
data-ingestion pipeline: Kinesis Stream, Kinesis Firehose, S3 config
upload, and Lambda event-source mapping.
'''
import boto3
import os


class Stream(object):
    ''' Wraps a Kinesis Stream: list, create, tag, and ARN lookup. '''

    CLIENT = boto3.client('kinesis')
    SHARD_COUNT = 2

    def __init__(self, stream_name):
        # Name of the Kinesis stream this instance manages.
        self.stream_name = stream_name

    def list(self):
        ''' Return the list of existing Kinesis stream names. '''
        try:
            print('GET: Kinesis Stream list')
            stream_list = self.CLIENT.list_streams()['StreamNames']
            print('DONE: Kinesis Stream List returned')
            return stream_list
        except Exception as ex:
            print(ex)
            # Bare raise preserves the original traceback (``raise ex``
            # would reset it in Python 2).
            raise

    def create(self):
        ''' Create the stream with the class-level shard count. '''
        try:
            print('CREATE: Stream')
            self.CLIENT.create_stream(
                StreamName=self.stream_name,
                ShardCount=self.SHARD_COUNT
            )
            print('DONE: Kinesis Stream created')
        except Exception as ex:
            print(ex)
            raise

    def add_tags(self):
        ''' Attach the standard tag set (see build_tags) to the stream. '''
        try:
            print('ADD: Tags to Stream')
            self.CLIENT.add_tags_to_stream(
                StreamName=self.stream_name,
                Tags=self.build_tags()
            )
            print('DONE: Kinesis Stream tags added')
        except Exception as ex:
            print(ex)
            raise

    def build_tags(self):
        ''' Return the dict of tags applied to every pipeline stream. '''
        return {
            'BUSINESS_REGION': 'NORTHAMERICA',
            'BUSINESS_UNIT': 'DATASERVICES',
            'CLIENT': 'NONE',
            'ENVIRONMENT': 'POC',
            'NAME': self.stream_name,
            'PLATFORM': 'ATLAS'
        }

    def get_arn(self):
        '''
        Return the stream's ARN via describe_stream.

        Needed for the Lambda event-source mapping.
        '''
        try:
            print('GET: Stream ARN')
            arn = self.CLIENT.describe_stream(
                StreamName=self.stream_name
            )['StreamDescription']['StreamARN']
            print('RETURN: Stream ARN')
            return arn
        except Exception as ex:
            print(ex)
            raise


class Firehose(object):
    ''' Wraps a Kinesis Firehose delivery stream targeting S3. '''

    CLIENT = boto3.client('firehose')

    def __init__(self, firehose_name, bucket_name, prefix_name):
        self.firehose_name = firehose_name
        self.bucket_name = bucket_name    # destination S3 bucket
        self.prefix_name = prefix_name    # key prefix inside the bucket

    def list(self):
        ''' Return the list of existing Firehose delivery stream names. '''
        try:
            print('GET: Kinesis Firehose list')
            stream_list = self.CLIENT.list_delivery_streams()['DeliveryStreamNames']
            print('RETURN: Kinesis Firehose list ')
            return stream_list
        except Exception as ex:
            print(ex)
            raise

    def create(self):
        ''' Create the delivery stream with the S3 destination config. '''
        try:
            print('CREATE: Firehose')
            # BUG FIX: the firehose client has no ``create_stream`` method;
            # the correct call is ``create_delivery_stream``.  Also,
            # ``self.config`` was passed as a bound method object instead
            # of being called -- it must be ``self.config()``.
            self.CLIENT.create_delivery_stream(
                DeliveryStreamName=self.firehose_name,
                S3DestinationConfiguration=self.config()
            )
            print('DONE: Kinesis Firehose created')
        except Exception as ex:
            print(ex)
            raise

    def config(self):
        ''' Build and return the S3 destination configuration dict. '''
        return {
            'RoleARN': 'arn:aws:iam::123456789:role/example_role',
            'BucketARN': 'arn:aws:s3:::' + self.bucket_name,
            'Prefix': self.prefix_name,
            'BufferingHints': {
                'SizeInMBs': 128,
                'IntervalInSeconds': 900
            },
            'CompressionFormat': 'Snappy',
            'EncryptionConfiguration': {
                'NoEncryptionConfig': 'NoEncryption'
            },
            'CloudWatchLoggingOptions': {
                'Enabled': True,
                'LogGroupName': '/aws/kinesisfirehose/' + self.firehose_name,
                'LogStreamName': 'S3Delivery'
            }
        }


class S3(object):
    ''' Uploads the pipeline config file to a fixed S3 config folder. '''

    RESOURCE = boto3.resource('s3')
    CONFIG_FILE_BUCKET = 'avrotest'
    CONFIG_FILE_PREFIX = 'lambda-configs/'

    def __init__(self, config_file):
        # Local path of the config file to upload.
        self.file = config_file

    def upload_file_to_config_folder(self):
        ''' Upload the config file to CONFIG_FILE_BUCKET under the prefix. '''
        try:
            print('TRY: Upload file to S3')
            self.RESOURCE.meta.client.upload_file(
                os.path.realpath(self.file),
                self.CONFIG_FILE_BUCKET,
                self.CONFIG_FILE_PREFIX + self.file
            )
        except Exception as ex:
            print(ex)
            raise
        print('DONE: File uploaded to S3')


class Lambda(object):
    ''' Manages the Kinesis -> Lambda event-source mapping. '''

    CLIENT = boto3.client('lambda')

    def __init__(self, stream_arn, function_name):
        self.stream_arn = stream_arn        # source Kinesis stream ARN
        self.function_name = function_name  # target Lambda function

    def event_source_list(self):
        ''' Return existing event-source mappings for stream+function. '''
        try:
            print('GET: Lambda Event Source mappings')
            event_source_list = self.CLIENT.list_event_source_mappings(
                EventSourceArn=self.stream_arn,
                FunctionName=self.function_name
            )['EventSourceMappings']
            print('RETURN: Lambda Event Source mappings')
            return event_source_list
        except Exception as ex:
            print(ex)
            raise

    def create_event_source(self):
        ''' Create the event-source mapping, starting from LATEST. '''
        try:
            print('CREATE: Event source')
            self.CLIENT.create_event_source_mapping(
                EventSourceArn=self.stream_arn,
                FunctionName=self.function_name,
                Enabled=True,
                BatchSize=100,
                StartingPosition='LATEST'
            )
            print('DONE: Lambda Event Source created')
        except Exception as ex:
            print(ex)
            raise
build_pipeline.py
'''
build_pipeline.py - entry point: reads a JSON config and provisions the
S3 config upload, Kinesis stream, Firehose, and Lambda event source.

Usage: python build_pipeline.py config.json
'''
import sys
import json

import aws_pipeline

LAMBDA_FUNC_EXCEPTION = 'EXTERNALCONFIG'


def get_config_file():
    ''' Return the config file path from the first CLI argument.

    Exits with a non-zero status if the argument is missing.
    '''
    try:
        return sys.argv[1]
    except IndexError:
        print("Error: config file is missing - please add.")
        # BUG FIX: sys.exit() with no argument exits with status 0
        # (success); a missing argument is a failure.
        sys.exit(1)


def read_config_file(config_file):
    ''' Read the JSON config file and return the parsed dict. '''
    with open(config_file) as config:
        return json.load(config)


def main():
    ''' Drive the whole pipeline build, step by step. '''
    # Get and read the config file, making the JSON accessible.
    config_file = get_config_file()
    data = read_config_file(config_file)

    # Pull the values the pipeline objects need.
    stream_name = data['stream']
    firehose_name = data['firehose']
    bucket_name = data['bucket']
    prefix_name = data['prefix']

    s_three = aws_pipeline.S3(config_file)
    stream = aws_pipeline.Stream(stream_name)
    firehose = aws_pipeline.Firehose(firehose_name, bucket_name, prefix_name)

    # Upload the config file to S3.
    s_three.upload_file_to_config_folder()

    # Create the stream (and tag it) only if it does not already exist.
    if stream_name not in stream.list():
        stream.create()
        stream.add_tags()
    else:
        print('STATUS: Stream found - do nothing')

    # Create the firehose only if it does not already exist.
    if firehose_name not in firehose.list():
        firehose.create()
    else:
        print('STATUS: Firehose found - do nothing')

    stream_arn = stream.get_arn()
    lambda_exception_handler = aws_pipeline.Lambda(
        stream_arn, LAMBDA_FUNC_EXCEPTION)

    # Create the event-source mapping for the exception-handler function
    # only if one does not already exist.
    if not lambda_exception_handler.event_source_list():
        lambda_exception_handler.create_event_source()
    else:
        print('STATUS: Lambda event source found - do nothing')


# BUG FIX: guard the entry point so importing this module (e.g. from a
# test or another script) does not run the whole pipeline as a side effect.
if __name__ == '__main__':
    main()
config.json
{ "bucket": "firstpipeline", "prefix": "EventSource1/", "firehose": "a-great-firehose", "stream": "a-great-stream" }
Everything works, but I don't have any other Python devs to conduct a code review - aside from pylint to lint code. I'm looking for general feedback on OOP design, exception handling, and anything else.