I have Python code that parses the XML feeds of footwear online stores. I want to make the code look more professional so that it can easily be reused by other programmers.
The project includes several script files, but I'm mainly interested in utils.py:
import re

from .setup_logger import *
from .metadata import reference_categories, ref_cat_list


def get_ref_text(offer_element, anchor):
    """Concatenate the lower-cased string values of the *anchor* keys of *offer_element*.

    Keys that are missing, or whose values are not strings, are skipped.
    """
    parts = (offer_element[key].lower() for key in anchor
             if key in offer_element and isinstance(offer_element[key], str))
    return ''.join(parts)


def asos_size(meta_size, offer_element):
    """Parse an ASOS size string (e.g. ``'US 10,5'``) into *offer_element*.

    Sets ``offer_element['sizetype']`` and ``offer_element['size']`` (as a
    string).  Returns False when no size system or numeric size is found.
    """
    meta_size = meta_size.replace(',', '.')
    sizetype = re.search(r'[a-zA-Zа-яА-Я]+', meta_size)
    # NOTE: the previous pattern r'\d+\.?\d+?' could never match a
    # single-digit size such as '5'; this one accepts '5', '10' and '10.5'.
    int_size = re.search(r'\d+(?:\.\d+)?', meta_size)
    fract_size = re.search(r' \d/\d', meta_size)
    if sizetype is None or int_size is None:
        return False
    offer_element['sizetype'] = sizetype.group(0).lower()
    size = float(int_size.group(0))
    if fract_size is not None:
        # fractional sizes like '9 1/2' are approximated as +0.5
        size += 0.5
    if size.is_integer():
        # keep integral sizes without a trailing '.0', as before
        size = int(size)
    offer_element['size'] = str(size)


def yoox_size(offer_element):
    """Normalise a YOOX size string into a list of EU size strings.

    Only the 'eu' size system is supported; for any other system the size
    is dropped and False is returned.  Thirds (⅓/⅔) are approximated as .5.
    """
    if offer_element['sizetype'] != 'eu':
        del offer_element['size']
        return False
    sizes = []
    for meta_size in offer_element['size'].split(','):
        number = re.search(r'(\d+)\.?(\d+)?', meta_size)
        if number is None:
            continue
        third = re.search(r'[⅓⅔]', meta_size)
        sizes.append(number.group(0) if third is None else number.group(0) + '.5')
    offer_element['size'] = sizes


def add_size(xml_element, offer_element, shop, unit_attr_name='unit'):
    """Extract size / sizetype from *xml_element* into *offer_element*.

    Per-shop quirks are handled explicitly.  Returns False when the element
    has no text or the text is clearly not a numeric size.
    """
    if not isinstance(xml_element.text, str):
        return False
    # ranges ('40-41'), composite values ('10/42') and purely alphabetic
    # sizes ('XL') are not supported
    if re.search(r'[/-]', xml_element.text) or xml_element.text.isalpha():
        return False
    # exceptions for size and size type switch
    if shop == 'nike':
        offer_element['sizetype'] = 'us'
        offer_element['size'] = xml_element.text
        return True
    if shop == 'asos':
        asos_size(xml_element.text, offer_element)
        return True
    if shop == 'adidas':
        unit = xml_element.attrib[unit_attr_name].lower()
        # first matching system wins, mirroring the old eur/ru/uk elif chain
        for system in ('eur', 'ru', 'uk'):
            if re.search(system, unit):
                offer_element['sizetype'] = system
                break
        offer_element['size'] = xml_element.text
        return True
    if unit_attr_name in xml_element.keys():
        offer_element['sizetype'] = xml_element.attrib[unit_attr_name].lower()
    offer_element['size'] = xml_element.text
    if shop == 'yoox':
        yoox_size(offer_element)
    if shop in ('vans', 'lamoda', 'aupontrouge', 'kupivip'):
        offer_element['size'] = offer_element['size'].replace(',', '.')
    return True


# exceptions for puma and slamdunk
def add_gender(shop, offer_element):
    """Infer ``offer_element['sex']`` from Russian keywords in the offer text.

    Only applies to shops whose feeds carry no explicit gender field
    (puma, slamdunk); for any other shop returns False.  Categories are
    checked in priority order (kid, women, men, unisex); defaults to 'men'.
    """
    if shop not in ('puma', 'slamdunk'):
        return False
    keyword_map = (
        ('kid', ('детские', 'детская', 'детский', 'детей', 'мальчиков', 'девочек')),
        ('women', ('женские', 'женская', 'женский', 'женщин')),
        ('men', ('мужские', 'мужская', 'мужской', 'мужчин')),
        ('unisex', ('унисекс', 'unisex')),
    )
    ref_text = get_ref_text(offer_element, ('description', 'category', 'title'))
    for sex, words in keyword_map:
        if any(word in ref_text for word in words):
            offer_element['sex'] = sex
            return True
    offer_element['sex'] = 'men'


# revert categories
def add_categories(categories_dict, offer_element):
    """Map the raw feed category onto the site's reference category.

    Translates the raw category via *categories_dict* and
    ``reference_categories``; the result may then be overridden by a more
    specific keyword from ``ref_cat_list`` found in the offer text.  When
    the raw category is unknown it is removed and False is returned.
    """
    category = offer_element.get('category')
    if category is None or category not in categories_dict:
        # pop() instead of del: 'category' may be missing entirely, which
        # previously raised KeyError here
        offer_element.pop('category', None)
        return False
    offer_element['category'] = reference_categories[categories_dict[category]]
    ref_text = get_ref_text(offer_element, ('description', 'category', 'title', 'model'))
    for ref in ref_cat_list:
        if ref in ref_text:
            offer_element['category'] = ref
            return True


# getting model out of description
def add_model(shop, offer_element):
    """Derive ``offer_element['model']`` from the offer title.

    For nike the model is whatever follows the vendor name in the title;
    otherwise (nike without a vendor match, adidas, puma, aizel) the model
    is the title stripped of Cyrillic characters.
    """
    def model_from_title(vendor, full_title, offer):
        # everything after the vendor name is treated as the model name
        # (the old version accidentally read the outer `title` variable
        # instead of its own parameter)
        start = full_title.index(vendor) + len(vendor)
        offer['model'] = full_title[start:].strip()

    title = offer_element.get('title')
    if title is None:
        return False
    if shop == 'nike':
        for vendor in ('Nike', 'Jordan'):
            if vendor in title:
                model_from_title(vendor, title, offer_element)
                return True
        offer_element['model'] = re.sub(r'[а-яА-ЯёЁ]', '', title).strip()
    if shop in ('adidas', 'puma', 'aizel'):
        offer_element['model'] = re.sub(r'[а-яА-ЯёЁ]', '', title).strip()


# exceptions for offer
def add_exceptions(shop, offer_element):
    """Apply per-shop offer fix-ups that do not fit the generic pipeline."""
    if shop == 'nike' and offer_element.get('picture') is not None:
        # use the non-premium picture variant and force the 'A' view suffix
        offer_element['picture'] = offer_element['picture'].replace('_PREM', '')[:-1] + 'A'
    if shop == 'sportpoint' and 'title' in offer_element:
        offer_element['description'] = offer_element['title']
    if shop == 'puma':
        desc = offer_element.get('description')
        if isinstance(desc, str):
            # Strip entity-encoded markup (&lt;li&gt;, &lt;strong&gt;, ...)
            # left in puma descriptions.  The previous character class
            # r'[&,lt;,li,...]' deleted the individual characters
            # '&', 'l', 't', ... from the whole text instead.
            offer_element['description'] = re.sub(
                r'&lt;[\\/]?(?:li|ul|strong)&gt;', ' ', desc)


# exceptions for tag
def add_tag_exception(shop, xml_element, offer_element):
    """Handle shop-specific XML tags missed by the generic tag mapping."""
    if xml_element.tag != 'model' or xml_element.text is None:
        return
    if shop == 'lamoda':
        offer_element['model'] = xml_element.text
    elif shop == 'asos':
        offer_element['model'] = re.sub(r'[а-яА-ЯёЁ]', '', xml_element.text).strip()
and the main class in Feed.py:
import re
import datetime
import xml.etree.ElementTree as Etree
from math import ceil
from os import remove, rename, listdir, mkdir
from os.path import isfile, isdir, getsize, join, abspath
from urllib import request

from pytz import timezone

from .resources.setup_logger import *
from .resources.links import urls
from .resources.metadata import categories, tags, size_type_dict, gender_dict
from .resources.utils import *


class Feed(object):
    """Works with data feeds for importing to site.

    Represents one shop's XML feed: knows how to download it, archive the
    previous version and parse it into a list of offer dicts.
    """

    # shared root folder for all downloaded feeds
    downloads = abspath('downloads')

    def __init__(self, shop, partner='admitad', _type='.xml'):
        self.shop = shop
        self.url = urls[self.shop]
        self.partner = partner
        self.filtered = False
        self.type = _type
        self.file_name = self.shop + self.type
        self.folder = join(self.downloads, self.partner)
        self.previous_file_name = 'previous_{}'.format(self.file_name)
        self.path = join(self.downloads, self.partner, self.file_name)
        self.previous_path = join(self.downloads, self.partner, self.previous_file_name)
        if not isdir(self.folder):
            mkdir(self.folder)
        self.shoe_categories = categories[shop]
        self.replace_tags = tags[shop]

    def __eq__(self, other):
        """Two feeds compare equal when their files have the same size.

        NOTE(review): defining __eq__ makes Feed unhashable; add a matching
        __hash__ if instances ever need to go into sets/dict keys.
        """
        return getsize(self.path) == getsize(other.path)

    def downloaded(self):
        """Return True if the current feed file exists on disk."""
        return isfile(self.path)

    def previously_downloaded(self):
        """Return True if the archived (previous) feed file exists on disk."""
        return isfile(self.previous_path)

    def archive(self):
        """Move the current feed to its archive path."""
        rename(self.path, self.previous_path)
        logger.info('Archived {0} to {1}'.format(self.shop.upper(), self.previous_path))

    def remove_feed(self, previous=False):
        """Delete a feed file.

        :param previous: when True remove the archived feed,
                         otherwise remove the current one.
        """
        if self.downloaded() and not previous:
            remove(self.path)
            logger.info('Removed {} xml feed'.format(self.shop.upper()))
        if self.previously_downloaded() and previous:
            remove(self.previous_path)
            logger.info('Removed {} archive xml feed'.format(self.shop.upper()))

    def download_feed(self):
        """Download the feed from self.url.

        If a feed is already present it is archived first, and the older
        archive (if any) is deleted.
        """
        try:
            file_data = request.urlopen(self.url)
        except (ValueError, OSError):
            # ValueError: malformed URL.  OSError also covers
            # urllib.error.URLError (unreachable host), which previously
            # escaped the except clause and crashed the script.
            logger.info('ERROR: Unknown url\nNot reachable site')
            return None
        if self.previously_downloaded() and self.downloaded():
            self.remove_feed(previous=True)
        if self.downloaded():
            self.archive()
        logger.info('Downloading {} xml feed...'.format(self.shop.upper()))
        with open(self.path, 'wb') as f:
            f.write(file_data.read())
        logger.info('Saved {0} xml feed to {1}\n'.format(self.shop.upper(), self.path))

    def to_list(self):
        """Converting feed to a raw list of dicts with tags as keys
        and tag text as content.  Returns the list of offers.

        Categories are filtered against the shop's shoe categories; size,
        gender, category and model are normalised via the utils helpers,
        and a discount percentage is derived from price / oldprice.
        """
        shoe_categories = self.shoe_categories
        replace_tags = self.replace_tags
        necessary_tags = list(replace_tags.values())
        necessary_tags.remove('oldprice')  # oldprice is optional
        # data tags
        category_tag = 'category'
        offer_tag = 'offer'
        raw_category_tag = 'categoryId'
        param_tag = 'param'
        category_id = 'id'
        raw_time_tag = 'modified_time'
        time_tag = 'time'
        size_tag = 'size'
        sizetype_tag = 'sizetype'
        sex_tag = 'sex'
        price_tag = 'price'
        oldprice_tag = 'oldprice'
        discount_tag = 'discount'
        xml = self.path
        if self.type == '.xml':
            if not isfile(xml):
                # fall back to the archived copy when no fresh download exists
                xml = self.previous_path
                logger.warning('Parsing archived {} feed...'.format(self.shop.upper()))
            else:
                logger.info('Parsing {} feed...'.format(self.shop.upper()))
        categories_dict = {}
        offers = []
        raw_size_tag = [k for k, v in replace_tags.items() if v == size_tag][0]
        for event, elem in Etree.iterparse(xml, events=('start',)):
            # parsing and filtering categories
            if elem.tag == category_tag and elem.text is not None and \
                    elem.text.lower() in shoe_categories and \
                    elem.attrib[category_id] is not None:
                categories_dict[elem.attrib[category_id]] = elem.text.lower()
            # parsing offers
            if elem.tag == offer_tag and elem.find(raw_category_tag) is not None and \
                    elem.find(raw_category_tag).text in categories_dict:
                offer = {}
                # Element.getchildren() was removed in Python 3.9;
                # iterating the element directly is the supported way
                for el in list(elem):
                    # size & sizetype
                    if (el.tag == param_tag and el.attrib[el.items()[0][0]].lower() == raw_size_tag) \
                            or el.tag.lower() == size_tag:
                        add_size(el, offer, self.shop)
                        continue
                    # timestamp: the module is imported as `datetime`, so the
                    # class is datetime.datetime (bare datetime.fromtimestamp
                    # raised AttributeError)
                    if el.tag == raw_time_tag and el.text is not None:
                        offer[time_tag] = datetime.datetime.fromtimestamp(
                            int(el.text)).strftime('%Y-%m-%d %H:%M:%S')
                        continue
                    # other params (gender, size, etc)
                    if el.tag == param_tag:
                        key = el.attrib[el.items()[0][0]]
                    else:
                        key = el.tag
                    if key.lower() in replace_tags and key.lower() not in offer:
                        key = replace_tags[key.lower()]
                    offer[key] = el.text
                    # exceptional tag added
                    add_tag_exception(self.shop, el, offer)
                # discount
                if offer.get(price_tag) is not None and offer.get(oldprice_tag) is not None:
                    old = float(offer[oldprice_tag])
                    new = float(offer[price_tag])
                    offer[discount_tag] = str(ceil(100 * ((old - new) / old)))
                # exceptions (gender info, nike pic)
                add_gender(self.shop, offer)
                add_exceptions(self.shop, offer)
                add_categories(categories_dict, offer)
                add_model(self.shop, offer)
                if sizetype_tag in offer and offer[sizetype_tag] in size_type_dict:
                    offer[sizetype_tag] = size_type_dict[offer[sizetype_tag]]
                else:
                    continue  # unknown size system: skip the offer
                if sex_tag in offer and offer[sex_tag] is not None \
                        and offer[sex_tag].lower() in gender_dict:
                    offer[sex_tag] = gender_dict[offer[sex_tag].lower()]
                else:
                    continue  # unknown gender: skip the offer
                # check for necessary tags
                if set(necessary_tags).issubset(offer.keys()):
                    if isinstance(offer[size_tag], list):
                        # one offer per size; copy() so every entry gets its
                        # own dict (plain assignment aliased the same dict,
                        # making all entries end up with the last size)
                        for offer_size in offer[size_tag]:
                            sub_offer = offer.copy()
                            sub_offer[size_tag] = offer_size
                            offers.append(sub_offer)
                    else:
                        offers.append(offer)
            elem.clear()
        logger.info('Parsing {0} done\nIn total ({0} offers): {1} items'.format(
            self.shop.upper(), len(offers)))
        return offers
To use this parser, I first run `python3 upload.py`:
from lib.Feed import Feed
from lib.resources.links import shops


def main():
    """Download the current XML feed for every configured shop."""
    # `shops` from lib.resources.links is the single source of truth;
    # the previous hard-coded list duplicated it and could drift out of sync.
    for shop in shops:
        Feed(shop).download_feed()


if __name__ == '__main__':
    main()
and then run the parser in test.py:
from lib.Feed import Feed
from lib.resources.setup_logger import *

# shops whose feeds are parsed and counted
SHOPS = ['slamdunk', 'newbalance', 'lamoda', 'asos', 'yoox', 'adidas', 'puma',
         'aupontrouge', 'aizel', 'nike', 'sportpoint', 'streetball', 'vans']


def main():
    """Parse every shop's feed and log the grand total of offers."""
    total = sum(len(Feed(shop).to_list()) for shop in SHOPS)
    logger.info('total items (offers): {}'.format(total))


if __name__ == '__main__':
    main()
Please help me improve this code — you can simply give me advice or show what can be optimized. Thanks a lot!