From PyPI:
pip install mrjob
From source:
python setup.py install
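To confirm the install, import the package and print its version (this assumes mrjob exposes __version__, which released versions do):

python -c "import mrjob; print(mrjob.__version__)"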
See the Sample Config File section below for additional config details.
Sample mrjob code that processes log files on Amazon S3 based on the S3 logging format:
%%file mr_s3_log_parser.py

import time
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol, ReprProtocol
import re


class MrS3LogParser(MRJob):
    """Parses the logs from S3 based on the S3 logging format:
    http://docs.aws.amazon.com/AmazonS3/latest/dev/LogFormat.html

    Aggregates a user's daily requests by user agent and operation

    Outputs date_time, requester, user_agent, operation, count
    """

    LOGPATS = r'(\S+) (\S+) \[(.*?)\] (\S+) (\S+) ' \
              r'(\S+) (\S+) (\S+) ("([^"]+)"|-) ' \
              r'(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) ' \
              r'("([^"]+)"|-) ("([^"]+)"|-)'
    NUM_ENTRIES_PER_LINE = 17
    logpat = re.compile(LOGPATS)

    (S3_LOG_BUCKET_OWNER,
     S3_LOG_BUCKET,
     S3_LOG_DATE_TIME,
     S3_LOG_IP,
     S3_LOG_REQUESTER_ID,
     S3_LOG_REQUEST_ID,
     S3_LOG_OPERATION,
     S3_LOG_KEY,
     S3_LOG_HTTP_METHOD,
     S3_LOG_HTTP_STATUS,
     S3_LOG_S3_ERROR,
     S3_LOG_BYTES_SENT,
     S3_LOG_OBJECT_SIZE,
     S3_LOG_TOTAL_TIME,
     S3_LOG_TURN_AROUND_TIME,
     S3_LOG_REFERER,
     S3_LOG_USER_AGENT) = range(NUM_ENTRIES_PER_LINE)

    DELIMITER = '\t'

    # We use RawValueProtocol for input to be format agnostic
    # and avoid any type of parsing errors
    INPUT_PROTOCOL = RawValueProtocol

    # We use RawValueProtocol for output so we can output raw lines
    # instead of (k, v) pairs
    OUTPUT_PROTOCOL = RawValueProtocol

    # Encode the intermediate records using repr() instead of JSON, so the
    # record doesn't get Unicode-encoded
    INTERNAL_PROTOCOL = ReprProtocol

    def clean_date_time_zone(self, raw_date_time_zone):
        """Converts entry 22/Jul/2013:21:04:17 +0000 to the format
        'YYYY-MM-DD HH:MM:SS' which is more suitable for loading into
        a database such as Redshift or RDS

        Note: requires the chars "[ ]" to be stripped prior to input
        Returns the converted datetime and timezone
        or None for both values if failed

        TODO: Needs to combine timezone with date as one field
        """
        date_time = None
        time_zone_parsed = None

        # TODO: Probably cleaner to parse this with a regex
        date_parsed = raw_date_time_zone[:raw_date_time_zone.find(":")]
        time_parsed = raw_date_time_zone[raw_date_time_zone.find(":") + 1:
                                         raw_date_time_zone.find("+") - 1]
        time_zone_parsed = raw_date_time_zone[raw_date_time_zone.find("+"):]

        try:
            date_struct = time.strptime(date_parsed, "%d/%b/%Y")
            converted_date = time.strftime("%Y-%m-%d", date_struct)
            date_time = converted_date + " " + time_parsed

        # Throws a ValueError exception if the operation fails that is
        # caught by the calling function and is handled appropriately
        except ValueError as error:
            raise ValueError(error)
        else:
            return converted_date, date_time, time_zone_parsed

    def mapper(self, _, line):
        line = line.strip()
        match = self.logpat.search(line)

        date_time = None
        requester = None
        user_agent = None
        operation = None

        try:
            for n in range(self.NUM_ENTRIES_PER_LINE):
                group = match.group(1 + n)

                if n == self.S3_LOG_DATE_TIME:
                    date, date_time, time_zone_parsed = \
                        self.clean_date_time_zone(group)
                    # Leave the following line of code if
                    # you want to aggregate by date
                    date_time = date + " 00:00:00"
                elif n == self.S3_LOG_REQUESTER_ID:
                    requester = group
                elif n == self.S3_LOG_USER_AGENT:
                    user_agent = group
                elif n == self.S3_LOG_OPERATION:
                    operation = group
                else:
                    pass
        except Exception:
            yield (("Error while parsing line: %s", line), 1)
        else:
            yield ((date_time, requester, user_agent, operation), 1)

    def reducer(self, key, values):
        output = list(key)
        output = self.DELIMITER.join(output) + \
                 self.DELIMITER + \
                 str(sum(values))

        yield None, output

    def steps(self):
        return [
            self.mr(mapper=self.mapper,
                    reducer=self.reducer)
        ]


if __name__ == '__main__':
    MrS3LogParser.run()
Run an Amazon Elastic MapReduce (EMR) job on the given input (must be a flat file hierarchy), placing the results in the output (output directory must not exist):
!python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/
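When the results land in S3 there is usually no reason to also stream them back to the console; mrjob's --no-output flag suppresses that (bucket names here are placeholders):

!python mr_s3_log_parser.py -r emr s3://bucket-source/ --output-dir=s3://bucket-dest/ --no-output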
Run a MapReduce job locally on the specified input file, sending the results to the specified output file:
!python mr_s3_log_parser.py input_data.txt > output_data.txt
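The reducer writes plain tab-delimited rows (date_time, requester, user_agent, operation, count), so the local output can be inspected with standard tools. A minimal sketch, assuming the output_data.txt produced above:

import csv

# Each row: date_time, requester, user_agent, operation, count
with open('output_data.txt') as f:
    for row in csv.reader(f, delimiter='\t'):
        date_time, requester, user_agent, operation, count = row
        print('%s\t%s\t%s' % (date_time, operation, count))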
Accompanying unit test:
%%file test_mr_s3_log_parser.py

from StringIO import StringIO
import unittest2 as unittest
from mr_s3_log_parser import MrS3LogParser


class MrTestsUtil:

    def run_mr_sandbox(self, mr_job, stdin):
        # inline runs the job in the same process so small jobs tend to
        # run faster and stack traces are simpler
        # --no-conf prevents options from local mrjob.conf from polluting
        # the testing environment
        # "-" reads from standard in
        mr_job.sandbox(stdin=stdin)

        # make_runner ensures job cleanup is performed regardless of
        # success or failure
        with mr_job.make_runner() as runner:
            runner.run()
            for line in runner.stream_output():
                key, value = mr_job.parse_output_line(line)
                yield value


class TestMrS3LogParser(unittest.TestCase):

    mr_job = None
    mr_tests_util = None

    RAW_LOG_LINE_INVALID = \
        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
        '00000388225bcc00000 ' \
        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
        '00.111.222.33 '

    RAW_LOG_LINE_VALID = \
        '00000fe9688b6e57f75bd2b7f7c1610689e8f01000000' \
        '00000388225bcc00000 ' \
        's3-storage [22/Jul/2013:21:03:27 +0000] ' \
        '00.111.222.33 ' \
        'arn:aws:sts::000005646931:federated-user/user 00000AB825500000 ' \
        'REST.HEAD.OBJECT user/file.pdf ' \
        '"HEAD /user/file.pdf?versionId=00000XMHZJp6DjM9x500000' \
        '00000SDZk ' \
        'HTTP/1.1" 200 - - 4000272 18 - "-" ' \
        '"Boto/2.5.1 (darwin) USER-AGENT/1.0.14.0" ' \
        '00000XMHZJp6DjM9x5JVEAMo8MG00000'

    DATE_TIME_ZONE_INVALID = "AB/Jul/2013:21:04:17 +0000"
    DATE_TIME_ZONE_VALID = "22/Jul/2013:21:04:17 +0000"
    DATE_VALID = "2013-07-22"
    DATE_TIME_VALID = "2013-07-22 21:04:17"
    TIME_ZONE_VALID = "+0000"

    def __init__(self, *args, **kwargs):
        super(TestMrS3LogParser, self).__init__(*args, **kwargs)
        self.mr_job = MrS3LogParser(['-r', 'inline', '--no-conf', '-'])
        self.mr_tests_util = MrTestsUtil()

    def test_invalid_log_lines(self):
        stdin = StringIO(self.RAW_LOG_LINE_INVALID)

        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
            self.assertEqual(result.find("Error"), 0)

    def test_valid_log_lines(self):
        stdin = StringIO(self.RAW_LOG_LINE_VALID)

        for result in self.mr_tests_util.run_mr_sandbox(self.mr_job, stdin):
            self.assertEqual(result.find("Error"), -1)

    def test_clean_date_time_zone(self):
        date, date_time, time_zone_parsed = \
            self.mr_job.clean_date_time_zone(self.DATE_TIME_ZONE_VALID)
        self.assertEqual(date, self.DATE_VALID)
        self.assertEqual(date_time, self.DATE_TIME_VALID)
        self.assertEqual(time_zone_parsed, self.TIME_ZONE_VALID)

        # Use a lambda to delay the calling of clean_date_time_zone so that
        # assertRaises has enough time to handle it properly
        self.assertRaises(ValueError,
                          lambda: self.mr_job.clean_date_time_zone(
                              self.DATE_TIME_ZONE_INVALID))


if __name__ == '__main__':
    unittest.main()
Run the mrjob test:
!python test_mr_s3_log_parser.py -v
mrjob reads its configuration from a config file, by default ~/.mrjob.conf. Sample config file for the EMR runner:

runners:
  emr:
    aws_access_key_id: __ACCESS_KEY__
    aws_secret_access_key: __SECRET_ACCESS_KEY__
    aws_region: us-east-1
    ec2_key_pair: EMR
    ec2_key_pair_file: ~/.ssh/EMR.pem
    ssh_tunnel_to_job_tracker: true
    ec2_master_instance_type: m3.xlarge
    ec2_instance_type: m3.xlarge
    num_ec2_instances: 5
    s3_scratch_uri: s3://bucket/tmp/
    s3_log_uri: s3://bucket/tmp/logs/
    enable_emr_debugging: True
    bootstrap:
    - sudo apt-get install -y python-pip
    - sudo pip install --upgrade simplejson
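If the file is saved somewhere outside mrjob's default search path, one way to point the job at it is the --conf-path option (the ./mrjob.conf path and bucket names here are placeholders):

!python mr_s3_log_parser.py -r emr --conf-path=./mrjob.conf s3://bucket-source/ --output-dir=s3://bucket-dest/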