7
\$\begingroup\$

I'm in the process of writing a Python script for automating a data ingestion pipeline using Amazon Web Services' Kinesis Stream, Firehose, and Lambda. This is my first stab at writing Python, but I do have some experience with JavaScript and Node.js.

Scripts are organized as follows:

  • build_pipeline.py - this is where all the logic lives
  • aws_pipeline.py - this is where my corresponding classes live that are called within build_pipeline.py
  • config.json - this is a JSON file that has some configs for the stream name, etc.

Pipeline process:

  1. Run script like so: python build_pipeline.py config.json
  2. Script checks to make sure config file exists and then reads the file so we can access the JSON properties.
  3. Set local variables based on JSON properties
  4. Initiate classes for s3 (for storing config), stream, and Firehose
  5. Upload file to s3 folder
  6. Check if the stream exists, if not, create and add tags
  7. Check if the Firehose exists, if not, create
  8. Get stream ARN and pass into lambda class and initialize
  9. Check if lambda event source exists, if not, create event source

aws_pipeline.py

import boto3
import os


class Stream(object):
    """Thin wrapper around the boto3 Kinesis client for one named stream."""

    CLIENT = boto3.client('kinesis')
    SHARD_COUNT = 2  # default shard count for newly created streams

    def __init__(self, stream_name):
        self.stream_name = stream_name

    def list(self):
        """Return the names of all Kinesis streams in the account/region."""
        try:
            print('GET: Kinesis Stream list')
            stream_list = self.CLIENT.list_streams()['StreamNames']
            print('DONE: Kinesis Stream List returned')
            return stream_list
        except Exception as ex:
            print(ex)
            raise  # bare raise keeps the original traceback intact

    def create(self):
        """Create the stream with the class-level default shard count."""
        try:
            print('CREATE: Stream')
            self.CLIENT.create_stream(
                StreamName=self.stream_name,
                ShardCount=self.SHARD_COUNT
            )
            print('DONE: Kinesis Stream created')
        except Exception as ex:
            print(ex)
            raise

    def add_tags(self):
        """Attach the standard tag set (see build_tags) to the stream."""
        try:
            print('ADD: Tags to Stream')
            self.CLIENT.add_tags_to_stream(
                StreamName=self.stream_name,
                Tags=self.build_tags()
            )
            print('DONE: Kinesis Stream tags added')
        except Exception as ex:
            print(ex)
            raise

    def build_tags(self):
        """Return the dict of tags applied to every stream we create."""
        return {
            'BUSINESS_REGION': 'NORTHAMERICA',
            'BUSINESS_UNIT': 'DATASERVICES',
            'CLIENT': 'NONE',
            'ENVIRONMENT': 'POC',
            'NAME': self.stream_name,
            'PLATFORM': 'ATLAS'
        }

    def get_arn(self):
        """Return the stream's ARN (needed for the Lambda event source mapping)."""
        try:
            print('GET: Stream ARN')
            arn = self.CLIENT.describe_stream(
                StreamName=self.stream_name
            )['StreamDescription']['StreamARN']
            print('RETURN: Stream ARN')
            return arn
        except Exception as ex:
            print(ex)
            raise


class Firehose(object):
    """Thin wrapper around the boto3 Firehose client for one delivery stream."""

    CLIENT = boto3.client('firehose')

    def __init__(self, firehose_name, bucket_name, prefix_name):
        self.firehose_name = firehose_name
        self.bucket_name = bucket_name
        self.prefix_name = prefix_name

    def list(self):
        """Return the names of all Firehose delivery streams."""
        try:
            print('GET: Kinesis Firehose list')
            stream_list = self.CLIENT.list_delivery_streams()['DeliveryStreamNames']
            print('RETURN: Kinesis Firehose list ')
            return stream_list
        except Exception as ex:
            print(ex)
            raise

    def create(self):
        """Create the delivery stream using the S3 destination settings."""
        try:
            print('CREATE: Firehose')
            # BUG FIX: the Firehose client has no create_stream method -- the
            # correct API call is create_delivery_stream -- and config is a
            # method, so it must be *called* to produce the settings dict.
            self.CLIENT.create_delivery_stream(
                DeliveryStreamName=self.firehose_name,
                S3DestinationConfiguration=self.config()
            )
            print('RETURN: Kinesis Stream Created ')
        except Exception as ex:
            print(ex)
            raise

    def config(self):
        """Return the S3 destination configuration for this delivery stream."""
        return {
            'RoleARN': 'arn:aws:iam::123456789:role/example_role',
            'BucketARN': 'arn:aws:s3:::' + self.bucket_name,
            'Prefix': self.prefix_name,
            'BufferingHints': {
                'SizeInMBs': 128,
                'IntervalInSeconds': 900
            },
            'CompressionFormat': 'Snappy',
            'EncryptionConfiguration': {
                'NoEncryptionConfig': 'NoEncryption'
            },
            'CloudWatchLoggingOptions': {
                'Enabled': True,
                'LogGroupName': '/aws/kinesisfirehose/' + self.firehose_name,
                'LogStreamName': 'S3Delivery'
            }
        }


class S3(object):
    """Uploads the pipeline config file to a fixed S3 bucket/prefix."""

    RESOURCE = boto3.resource('s3')
    CONFIG_FILE_BUCKET = 'avrotest'
    CONFIG_FILE_PREFIX = 'lambda-configs/'

    def __init__(self, config_file):
        self.file = config_file

    def upload_file_to_config_folder(self):
        """Upload the config file to the lambda-configs/ folder in S3."""
        try:
            print('TRY: Upload file to S3')
            self.RESOURCE.meta.client.upload_file(
                os.path.realpath(self.file),
                self.CONFIG_FILE_BUCKET,
                self.CONFIG_FILE_PREFIX + self.file
            )
        except Exception as ex:
            print(ex)
            raise
        print('DONE: File uploaded to S3')


class Lambda(object):
    """Manages the Kinesis-stream event source mapping for one Lambda function."""

    CLIENT = boto3.client('lambda')

    def __init__(self, stream_arn, function_name):
        self.stream_arn = stream_arn
        self.function_name = function_name

    def event_source_list(self):
        """Return the event source mappings for this stream/function pair."""
        try:
            print('GET: Lambda Event Source mappings')
            event_source_list = self.CLIENT.list_event_source_mappings(
                EventSourceArn=self.stream_arn,
                FunctionName=self.function_name
            )['EventSourceMappings']
            print('RETURN: Lambda Event Source mappings')
            return event_source_list
        except Exception as ex:
            print(ex)
            raise

    def create_event_source(self):
        """Map the stream onto the Lambda function, reading from LATEST."""
        try:
            print('CREATE: Event source')
            self.CLIENT.create_event_source_mapping(
                EventSourceArn=self.stream_arn,
                FunctionName=self.function_name,
                Enabled=True,
                BatchSize=100,
                StartingPosition='LATEST'
            )
            print('DONE: Lambda Event Source created')
        except Exception as ex:
            print(ex)
            raise

build_pipeline.py

import sys import json import aws_pipeline LAMBDA_FUNC_EXCEPTION = 'EXTERNALCONFIG' def get_config_file(): ''' Gets the config file from the argument ''' try: return sys.argv[1] except IndexError: print "Error: config file is missing - please add." sys.exit() def read_config_file(config_file): ''' Reads and returns the JSON object ''' with open(config_file) as config: data = json.load(config) return data def main(): ''' the main thang ''' # get, read config file and make JSON accessible config_file = get_config_file() data = read_config_file(config_file) # set various values stream_name = data['stream'] firehose_name = data['firehose'] bucket_name = data['bucket'] prefix_name = data['prefix'] s_three = aws_pipeline.S3(config_file) stream = aws_pipeline.Stream(stream_name) firehose = aws_pipeline.Firehose(firehose_name, bucket_name, prefix_name) # upload config file to s3 s_three.upload_file_to_config_folder() # check if stream exists, if not create if stream_name not in stream.list(): stream.create() stream.add_tags() else: print 'STATUS: Stream found - do nothing' # check if firehose exists, if not create if firehose_name not in firehose.list(): firehose.create() else: print 'STATUS: Firehose found - do nothing' stream_arn = stream.get_arn() lambda_exception_handler = aws_pipeline.Lambda(stream_arn, LAMBDA_FUNC_EXCEPTION) # check if lambda event mapping exits for exception handler function, if not create if not lambda_exception_handler.event_source_list(): lambda_exception_handler.create_event_source() else: print 'STATUS: Lambda event source found - do nothing' main() 

config.json

{ "bucket": "firstpipeline", "prefix": "EventSource1/", "firehose": "a-great-firehose", "stream": "a-great-stream" } 

Everything works, but I don't have any other Python devs to conduct a code review - aside from pylint to lint code. I'm looking for general feedback on OOP design, exception handling, and anything else.

\$\endgroup\$
0

    1 Answer 1

    6
    +25
    \$\begingroup\$

    Naming

    I don't think your names are very descriptive. You call your class Stream, for example, and then have to explain in the docstring that it is a Kinesis Stream. Just call the class KinesisStream and then remove the docstring (well, ideally give it a better docstring). This can be repeated for all of those classes. Lambda is especially confusing, given that lambda functions already exist in Python. I don't know enough about the usecase and the system to suggest a better name, but you should come up with one.

    Exception handling and logging

    In my mind there are two types of exception handling - application exceptions, and function exceptions (afaik this is all my own terminology - there might be an official name but I don't know it). Function exceptions include the specific exceptions you plan for and can handle (or know to pass up appropriately) inside of the function. They are targeted (meaning they encapsulate the fewest lines possible) and specific. They might look something like this:

    def to_int(thing): try: return int(thing) except TypeError as e: do_something() return -1 except ValueError as e: do_something_else() return -1 

    Here you know what specific exception you're looking for, and you can handle everything inside of your function.

    This also includes things like passing a (hopefully specific) exception up to something that is expected to handle it.

    Application exceptions are the catchall, log-em and pray type exceptions where you want to know that something bad happened, but you weren't able to handle it inside the code. This is what you do, everywhere. There are a few problems here. The first is that you're going to end up printing the exception multiple times if any of your functions end up calling one another. The second is that you're going to end up adding some unnecessary stuff to your traceback. The third is that it makes the code much harder to read - normally seeing exception handling gives you an idea of what to expect from some code, but seeing except Exception as ex doesn't tell me anything - just that something could go wrong. The fourth is that your logging doesn't really add much value because you just print to stdout.

    I'd just remove all of that error handling and move it up a level. If you know about specific exceptions that you want some specific logging or behavior for, those are great. Otherwise it's just noise.

    WRT logging - printing to stdout barely qualifies as logging. I'd recommend either using the logging module, a third party module, or roll your own. If you so desire you can configure your logger to just print to stdout, but you shouldn't default to that. Additionally, the messages you're logging are pointless - they just say "I called this function" and "this function is done". You know that just by looking at the code. Remove all of them (or if you keep them, then set them to the lowest priority level of your logger).

    Docstrings

    All of your docstrings are pretty much worthless. Use an existing style guide (I like numpydoc, although Sphinx's RST format is probably slightly more common) to help guide what you should write, and then rewrite them to give useful information. If the name of the function gives you everything you need to know about it, which may sometimes be true, then pat yourself on the back - you've written a self-describing function. In these cases, however, you probably still want to have a class-level docstring explaining overall purpose and workflow with this class.

    CLI and CLA

    Instead of rolling your own sys.argv parser (which is admittedly easy in this case) you're better off using argparse or a third party library. They'll be easier to use if you add more complexity in the future, and more immediately understandable to fellow Python developers. I haven't made those changes in my code below, but they should be straightforward.

    Configuration

    Right now you're able to handle your config file with two simple functions and some local variables. That's fine, but if this expands in complexity you'll probably want a dedicated class to handle interfacing with the config file.

    Overall nitpickyness

    You sometimes make local variables just to have them. This can be nice for readability, but it is often unnecessary. You also have comments like set various values that very clearly add no information - dump those as well.

    In main you check if the stream/firehose exist, then create them there. I'd rather see logic like that in the constructor - your main should be as dumb as possible.

    Lastly, you should always use an if __name__ == '__main__' block to hold your main function, just in case.

    Here is your rewritten code for aws_pipeline - I didn't make all of the changes mentioned above, but it should be a good start.

    import boto3
    import os


    class KinesisStream(object):
        """Wrapper around the boto3 Kinesis client for a single named stream."""

        CLIENT = boto3.client('kinesis')
        SHARD_COUNT = 2  # default shard count for new streams

        def __init__(self, stream_name):
            self.stream_name = stream_name

        def list(self):
            """Return the names of all Kinesis streams in the account/region."""
            return self.CLIENT.list_streams()['StreamNames']

        def create(self):
            """Create the stream with the default shard count."""
            self.CLIENT.create_stream(
                StreamName=self.stream_name,
                ShardCount=self.SHARD_COUNT
            )

        def add_tags(self):
            """Attach the standard tag set to the stream."""
            self.CLIENT.add_tags_to_stream(
                StreamName=self.stream_name,
                Tags=self.build_tags()
            )

        def build_tags(self):
            """Return the dict of tags applied to every stream we create."""
            return {
                'BUSINESS_REGION': 'NORTHAMERICA',
                'BUSINESS_UNIT': 'DATASERVICES',
                'CLIENT': 'NONE',
                'ENVIRONMENT': 'POC',
                'NAME': self.stream_name,
                'PLATFORM': 'ATLAS'
            }

        def get_arn(self):
            """Return the stream ARN (used for the Lambda event source mapping)."""
            return self.CLIENT.describe_stream(
                StreamName=self.stream_name
            )['StreamDescription']['StreamARN']


    class KinesisFirehose(object):
        """Wrapper around the boto3 Firehose client for one delivery stream."""

        CLIENT = boto3.client('firehose')

        def __init__(self, firehose_name, bucket_name, prefix_name):
            self.firehose_name = firehose_name
            self.bucket_name = bucket_name
            self.prefix_name = prefix_name

        def list(self):
            """Return the names of all Firehose delivery streams."""
            return self.CLIENT.list_delivery_streams()['DeliveryStreamNames']

        def create(self):
            """Create the delivery stream using the S3 destination settings."""
            # BUG FIX: the Firehose client method is create_delivery_stream
            # (not create_stream), and config is a method that must be called.
            return self.CLIENT.create_delivery_stream(
                DeliveryStreamName=self.firehose_name,
                S3DestinationConfiguration=self.config()
            )

        def config(self):
            """Return the S3 destination configuration for this delivery stream."""
            return {
                'RoleARN': 'arn:aws:iam::123456789:role/example_role',
                'BucketARN': 'arn:aws:s3:::' + self.bucket_name,
                'Prefix': self.prefix_name,
                'BufferingHints': {
                    'SizeInMBs': 128,
                    'IntervalInSeconds': 900
                },
                'CompressionFormat': 'Snappy',
                'EncryptionConfiguration': {
                    'NoEncryptionConfig': 'NoEncryption'
                },
                'CloudWatchLoggingOptions': {
                    'Enabled': True,
                    'LogGroupName': '/aws/kinesisfirehose/' + self.firehose_name,
                    'LogStreamName': 'S3Delivery'
                }
            }


    class S3(object):
        """Uploads the pipeline config file to a fixed S3 bucket/prefix."""

        RESOURCE = boto3.resource('s3')
        CONFIG_FILE_BUCKET = 'avrotest'
        CONFIG_FILE_PREFIX = 'lambda-configs/'

        def __init__(self, config_file):
            self.file = config_file

        def upload_file_to_config_folder(self):
            """Upload the config file to the lambda-configs/ folder in S3."""
            self.RESOURCE.meta.client.upload_file(
                os.path.realpath(self.file),
                self.CONFIG_FILE_BUCKET,
                self.CONFIG_FILE_PREFIX + self.file
            )


    class Lambda(object):
        """Manages the Kinesis-stream event source mapping for one function."""

        CLIENT = boto3.client('lambda')

        def __init__(self, stream_arn, function_name):
            self.stream_arn = stream_arn
            self.function_name = function_name

        def event_source_list(self):
            """Return the event source mappings for this stream/function pair."""
            return self.CLIENT.list_event_source_mappings(
                EventSourceArn=self.stream_arn,
                FunctionName=self.function_name
            )['EventSourceMappings']

        def create_event_source(self):
            """Map the stream onto the Lambda function, reading from LATEST."""
            self.CLIENT.create_event_source_mapping(
                EventSourceArn=self.stream_arn,
                FunctionName=self.function_name,
                Enabled=True,
                BatchSize=100,
                StartingPosition='LATEST'
            )
    \$\endgroup\$

      Start asking to get answers

      Find the answer to your question by asking.

      Ask question

      Explore related questions

      See similar questions with these tags.