Source code for pyrcs.utils

"""
Utilities - Helper functions.
"""

import collections
import datetime
import os
import re
import urllib.parse

import bs4
import dateutil.parser
import measurement.measures
import numpy as np
import pandas as pd
import pkg_resources
import requests
from pyhelpers.ops import confirmed, fake_requests_headers
from pyhelpers.store import load_json, load_pickle, save_json, save_pickle


def homepage_url():
    """
    Give the URL of the homepage of the data source.

    :return: URL of the data source homepage
    :rtype: str

    **Example**::

        >>> from pyrcs.utils import homepage_url

        >>> homepage_url()
        'http://www.railwaycodes.org.uk/'
    """

    url = 'http://www.railwaycodes.org.uk/'
    return url
# -- Directory -------------------------------------------------------------------------
def cd_dat(*sub_dir, dat_dir="dat", mkdir=False, **kwargs):
    """
    Build a path under the package's `dat_dir/` directory.

    :param sub_dir: name of directory; names of directories (and/or a filename)
    :type sub_dir: str
    :param dat_dir: name of a directory to store data, defaults to ``"dat"``
    :type dat_dir: str
    :param mkdir: whether to create a directory, defaults to ``False``
    :type mkdir: bool
    :param kwargs: optional parameters of `os.makedirs`_, e.g. ``mode=0o777``
    :return: a full path to a directory (or a file) under ``dat_dir``
    :rtype: str

    .. _`os.makedirs`: https://docs.python.org/3/library/os.html#os.makedirs

    **Example**::

        >>> import os
        >>> from pyrcs.utils import cd_dat

        >>> path_to_dat_dir = cd_dat("line-data", dat_dir="dat", mkdir=False)

        >>> print(os.path.relpath(path_to_dat_dir))
        pyrcs\\dat\\line-data
    """

    base_dir = pkg_resources.resource_filename(__name__, dat_dir)
    full_path = os.path.join(base_dir, *sub_dir)

    if mkdir:
        # A path with a file extension is taken to be a file, so only its parent
        # directory is created; otherwise the path itself is created as a directory.
        _, extension = os.path.splitext(full_path)
        target_dir = os.path.dirname(full_path) if extension else full_path
        os.makedirs(target_dir, exist_ok=True, **kwargs)

    return full_path
# -- Converters ------------------------------------------------------------------------
def mile_chain_to_nr_mileage(miles_chains):
    """
    Turn mileage written as '<miles>.<chains>' into Network Rail mileage.

    :param miles_chains: mileage data presented in the form '<miles>.<chains>'
    :type miles_chains: str or numpy.nan or None
    :return: Network Rail mileage in the form '<miles>.<yards>'
        (an empty string for missing or empty input)
    :rtype: str

    **Examples**::

        >>> from pyrcs.utils import mile_chain_to_nr_mileage

        >>> mile_chain_to_nr_mileage('0.18')  # 18 chains
        '0.0396'

        >>> mile_chain_to_nr_mileage(None)  # or np.nan, or ''
        ''
    """

    if pd.isna(miles_chains) or miles_chains == '':
        return ''

    whole_miles, chain_part = str(miles_chains).split('.')
    yard_part = measurement.measures.Distance(chain=chain_part).yd

    # The yards are written as the four digits after the decimal point.
    return '%.4f' % (int(whole_miles) + round(yard_part / (10 ** 4), 4))
def nr_mileage_to_mile_chain(str_mileage):
    """
    Turn Network Rail mileage into the form '<miles>.<chains>'.

    :param str_mileage: Network Rail mileage data presented in the form
        '<miles>.<yards>'
    :type str_mileage: str or numpy.nan or None
    :return: '<miles>.<chains>' (an empty string for missing or empty input)
    :rtype: str

    **Examples**::

        >>> from pyrcs.utils import nr_mileage_to_mile_chain

        >>> nr_mileage_to_mile_chain('0.0396')
        '0.18'

        >>> nr_mileage_to_mile_chain(None)  # or np.nan, or ''
        ''
    """

    if pd.isna(str_mileage) or str_mileage == '':
        return ''

    whole_miles, yard_part = str(str_mileage).split('.')
    chain_part = measurement.measures.Distance(yard=yard_part).chain

    # The chains are written as the two digits after the decimal point.
    return '%.2f' % (int(whole_miles) + round(chain_part / (10 ** 2), 2))
def nr_mileage_str_to_num(str_mileage):
    """
    Turn a string-type Network Rail mileage into a number.

    :param str_mileage: string-type Network Rail mileage in the form
        '<miles>.<yards>'
    :type str_mileage: str
    :return: numerical-type Network Rail mileage rounded to four decimal places
        (``numpy.nan`` for an empty string)
    :rtype: float

    **Examples**::

        >>> from pyrcs.utils import nr_mileage_str_to_num

        >>> nr_mileage_str_to_num('0.0396')
        0.0396

        >>> nr_mileage_str_to_num('')
        nan
    """

    if str_mileage == '':
        return np.nan

    return round(float(str_mileage), 4)
def nr_mileage_num_to_str(num_mileage):
    """
    Turn a numerical-type Network Rail mileage into a string.

    :param num_mileage: numerical-type Network Rail mileage
    :type num_mileage: float
    :return: string-type Network Rail mileage in the form '<miles>.<yards>'
        (an empty string for a missing value)
    :rtype: str

    **Examples**::

        >>> import numpy as np_
        >>> from pyrcs.utils import nr_mileage_num_to_str

        >>> nr_mileage_num_to_str(0.0396)
        '0.0396'

        >>> nr_mileage_num_to_str(np_.nan)
        ''
    """

    # Zero is falsy but still a valid mileage, hence the explicit comparison.
    has_value = bool(num_mileage) or num_mileage == 0

    if has_value and pd.notna(num_mileage):
        return '%.4f' % round(float(num_mileage), 4)

    return ''
def nr_mileage_to_yards(nr_mileage):
    """
    Convert Network Rail mileages to yards.

    :param nr_mileage: Network Rail mileage, either a number or a string in the
        form '<miles>.<yards>' (the part after the dot is read as a whole number
        of yards, so strings are expected to carry all four yard digits)
    :type nr_mileage: float or int or str
    :return: yards
    :rtype: int

    **Examples**::

        >>> from pyrcs.utils import nr_mileage_to_yards

        >>> nr_mileage_to_yards('0.0396')
        396

        >>> nr_mileage_to_yards(0.0396)
        396
    """

    yards_per_mile = 1760

    # ``np.float`` was removed from NumPy (1.24), so the abstract scalar types
    # ``np.floating`` / ``np.integer`` are used to recognise NumPy numbers.
    if isinstance(nr_mileage, (int, float, np.integer, np.floating)):
        nr_mileage = nr_mileage_num_to_str(nr_mileage)

    parts = nr_mileage.split('.')
    miles = int(parts[0])
    yards = int(parts[1])

    # Exact integer arithmetic: a float-based unit conversion followed by int()
    # could truncate a result such as 1759.999... down by one yard.
    return miles * yards_per_mile + yards
def yards_to_nr_mileage(yards):
    """
    Convert yards to Network Rail mileages.

    :param yards: yards
    :type yards: int or float, numpy.nan, None
    :return: Network Rail mileage in the form '<miles>.<yards>'
        (an empty string for a missing or empty value)
    :rtype: str

    **Examples**::

        >>> from pyrcs.utils import yards_to_nr_mileage

        >>> yards_to_nr_mileage(396)
        '0.0396'

        >>> yards_to_nr_mileage(396.0)
        '0.0396'

        >>> yards_to_nr_mileage(1760)
        '1.0000'

        >>> yards_to_nr_mileage(None)
        ''
    """

    yards_per_mile = 1760

    if pd.notnull(yards) and yards != '':
        # Split into whole miles and leftover yards exactly; converting through
        # metre-based floats and flooring could land one mile short at exact
        # mile boundaries.
        mileage_mi, mileage_yd = divmod(yards, yards_per_mile)
        # The leftover yards become the four digits after the decimal point.
        mileage = '%.4f' % round(mileage_mi + mileage_yd / (10 ** 4), 4)
    else:
        mileage = ''

    return mileage
def shift_num_nr_mileage(nr_mileage, shift_yards):
    """
    Move a Network Rail mileage by a number of yards.

    :param nr_mileage: Network Rail mileage
    :type nr_mileage: float or int or str
    :param shift_yards: yards by which the given ``nr_mileage`` is shifted
    :type shift_yards: int or float
    :return: shifted numerical Network Rail mileage
    :rtype: float

    **Examples**::

        >>> from pyrcs.utils import shift_num_nr_mileage

        >>> shift_num_nr_mileage(nr_mileage='0.0396', shift_yards=220)
        0.0616

        >>> shift_num_nr_mileage(nr_mileage='0.0396', shift_yards=220.99)
        0.0617

        >>> shift_num_nr_mileage(nr_mileage=10, shift_yards=220)
        10.022
    """

    # Work in yards, then go back through the string form to the numeric form.
    total_yards = nr_mileage_to_yards(nr_mileage) + shift_yards

    return nr_mileage_str_to_num(yards_to_nr_mileage(total_yards))
def year_to_financial_year(date):
    """
    Give the Network Rail financial year that a date falls in.

    The date is moved back by three calendar months and the year of the result
    is returned.

    :param date: date
    :type date: datetime.datetime
    :return: Network Rail financial year of the given ``date``
    :rtype: int

    **Example**::

        >>> import datetime
        >>> from pyrcs.utils import year_to_financial_year

        >>> year_to_financial_year(datetime.datetime(2020, 3, 31))
        2019

        >>> year_to_financial_year(datetime.datetime(2020, 4, 1))
        2020
    """

    three_months_back = pd.DateOffset(months=-3)

    return (date + three_months_back).year
# -- Parsers ---------------------------------------------------------------------------
def parse_tr(header, trs):
    """
    Parse a list of parsed HTML <tr> elements.

    .. _parse-tr:

    See also [`PT-1 <https://stackoverflow.com/questions/28763891/>`_].

    The rows are processed in three steps:

    1. the text of every <td> of every row is collected;
    2. the text of every <td> carrying a ``rowspan`` attribute is inserted into
       the rows below it that the cell spans;
    3. every row is compared with the length of ``header``: shorter rows are
       padded with ``'\xa0'``, and a longer row loses its third item if that
       item is ``'\xa0'``.

    :param header: list of column names of a requested table
    :type header: list
    :param trs: contents under <tr> tags (bs4.Tag) of a web page
    :type trs: bs4.ResultSet
    :return: list of lists with each comprising a row of the requested table
    :rtype: list

    **Example**::

        >>> import bs4
        >>> import requests
        >>> from pyrcs.utils import fake_requests_headers, parse_tr

        >>> source = requests.get('http://www.railwaycodes.org.uk/elrs/elra.shtm',
        ...                       headers=fake_requests_headers())
        >>> parsed_text = bs4.BeautifulSoup(source.text, 'lxml')
        >>> header_ = []
        >>> for th in parsed_text.find_all('th'):
        ...     header_.append(th.text)
        >>> trs_dat = parsed_text.find_all('tr')

        >>> tables_list = parse_tr(header_, trs_dat)  # returns a list of lists

        >>> type(tables_list)
        <class 'list'>
        >>> print(tables_list[-1])
        ['AYT', 'Aberystwyth Branch', '0.00 - 41.15', 'Pencader Junction', '\xa0']
    """

    # NOTE(review): the nesting below was reconstructed from a source listing whose
    # indentation had been lost - confirm against the repository before relying on it.

    # Step 1: collect the text of every cell, row by row.
    tbl_lst = []
    for row in trs:
        data = []
        for dat in row.find_all('td'):
            txt = dat.get_text()
            if '\n' in txt:
                # Multi-line cell: keep the first line and append the remaining
                # lines. ``t[1:]`` is a list, so it is rendered with its list
                # brackets and quotes; it is wrapped in parentheses only when the
                # cell text contains no parenthesis of its own.
                t = txt.split('\n')
                txt = '%s (%s)' % (t[0], t[1:]) if '(' not in txt and ')' not in txt \
                    else '%s %s' % (t[0], t[1:])
                data.append(txt)
            else:
                data.append(txt)
        tbl_lst.append(data)

    # Step 2a: record every cell that spans several rows as
    # (row index, rowspan, cell index within its row, cell text).
    row_spanned = []
    for no, tr in enumerate(trs):
        for td_no, rho in enumerate(tr.find_all('td')):
            # print(data.has_attr("rowspan"))
            if rho.has_attr('rowspan'):
                row_spanned.append((no, int(rho['rowspan']), td_no, rho.text))

    if row_spanned:
        # Group the records by row index; each value is a list of
        # [rowspan, cell index, cell text] lists (lists, so they can be mutated).
        d = collections.defaultdict(list)
        for k, *v in row_spanned:
            d[k].append(v)
        row_spanned = list(d.items())

        # Step 2b: copy each spanning cell into the rows it spans.
        for x in row_spanned:
            i, to_repeat = x[0], x[1]
            for y in to_repeat:
                for j in range(1, y[0]):
                    if y[2] in tbl_lst[i] and y[2] != '\xa0':
                        # Shift the insertion position by the distance between the
                        # first occurrence of the text in the originating row and
                        # the recorded cell index.
                        y[1] += np.abs(tbl_lst[i].index(y[2]) - y[1])
                    tbl_lst[i + j].insert(y[1], y[2])

    # Earlier approach to step 2, kept for reference:
    # if row_spanned:
    #     for x in row_spanned:
    #         for j in range(1, x[2]):
    #             # Add value in next tr
    #             idx = x[0] + j
    #             # assert isinstance(idx, int)
    #             if x[1] >= len(tbl_lst[idx]):
    #                 tbl_lst[idx].insert(x[1], x[3])
    #             elif x[3] in tbl_lst[x[0]]:
    #                 tbl_lst[idx].insert(tbl_lst[x[0]].index(x[3]), x[3])
    #             else:
    #                 tbl_lst[idx].insert(x[1] + 1, x[3])

    # Step 3: reconcile the length of every row with the number of columns.
    for k in range(len(tbl_lst)):
        n = len(header) - len(tbl_lst[k])
        if n > 0:
            # Too short: pad with non-breaking spaces.
            tbl_lst[k].extend(['\xa0'] * n)
        elif n < 0 and tbl_lst[k][2] == '\xa0':
            # Too long: drop the third item if it is a non-breaking space.
            del tbl_lst[k][2]

    return tbl_lst
def parse_table(source, parser='lxml'):
    """
    Parse the HTML <tr> elements of a response for creating a data frame.

    The first <tr> of the page supplies the column names (its <th> cells); all
    remaining <tr> elements are treated as records.

    :param source: response object to connecting a URL to request a table
    :type source: requests.Response
    :param parser: ``'lxml'`` (default), ``'html5lib'`` or ``'html.parser'``
    :type parser: str
    :return: a list of lists each comprising a row of the requested table
        (see also :ref:`parse_tr() <parse-tr>`) and
        a list of column names of the requested table
    :rtype: tuple

    **Examples**::

        >>> from pyrcs.utils import fake_requests_headers, parse_table

        >>> source_ = requests.get('http://www.railwaycodes.org.uk/elrs/elra.shtm',
        ...                        headers=fake_requests_headers())

        >>> parsed_contents = parse_table(source_, parser='lxml')

        >>> type(parsed_contents)
        <class 'tuple'>
        >>> type(parsed_contents[0])
        <class 'list'>
        >>> type(parsed_contents[1])
        <class 'list'>
    """

    soup = bs4.BeautifulSoup(source.text, parser)

    all_rows = soup.find_all('tr')
    header_row, record_rows = all_rows[0], all_rows[1:]

    column_names = [th.text for th in header_row.find_all('th')]
    records = parse_tr(column_names, record_rows)

    return records, column_names
def parse_location_name(location_name):
    """
    Parse location name (and its associated note).

    The name is extracted first; whatever remains of the raw text once the name
    has been removed is then searched for a note enclosed in brackets.

    :param location_name: location name (in raw data)
    :type location_name: str or None
    :return: location name and, if any, note
    :rtype: tuple

    **Examples**::

        >>> from pyrcs.utils import parse_location_name

        >>> location_dat = 'Abbey Wood'
        >>> dat_and_note = parse_location_name(location_dat)
        >>> print(dat_and_note)
        ('Abbey Wood', '')

        >>> location_dat = None
        >>> dat_and_note = parse_location_name(location_dat)
        >>> print(dat_and_note)
        ('', '')

        >>> location_dat = 'Abercynon (formerly Abercynon South)'
        >>> dat_and_note = parse_location_name(location_dat)
        >>> print(dat_and_note)
        ('Abercynon', 'formerly Abercynon South')

        >>> location_dat = 'Allerton (reopened as Liverpool South Parkway)'
        >>> dat_and_note = parse_location_name(location_dat)
        >>> print(dat_and_note)
        ('Allerton', 'reopened as Liverpool South Parkway')

        >>> location_dat = 'Ashford International [domestic portion]'
        >>> dat_and_note = parse_location_name(location_dat)
        >>> print(dat_and_note)
        ('Ashford International', 'domestic portion')
    """

    # NOTE(review): the nesting below was reconstructed from a source listing whose
    # indentation had been lost - confirm against the repository before relying on it.

    if location_name is None:
        dat, note = '', ''

    else:
        # Location name
        # Case 1: everything before a ' [' that is followed by a quote and '('.
        d = re.search(r'.*(?= \[[\"\']\()', location_name)
        if d is not None:
            dat = d.group()
        # Case 2: the word right before the "unknown feature" remark.
        elif ' [unknown feature, labelled "do not use"]' in location_name:
            dat = re.search(r'\w+(?= \[unknown feature, )', location_name).group()
        # Case 3: a parenthesised part followed by ' [formerly ...'; everything
        # before ' [formerly' (parenthesised part included) is kept as the name.
        elif ') [formerly' in location_name:
            dat = re.search(r'.*(?= \[formerly)', location_name).group()
        else:
            # Case 4: keywords indicating that the bracketed part is a note
            # rather than part of the name.
            m_pattern = re.compile(
                r'[Oo]riginally |'
                r'[Ff]ormerly |'
                r'[Ll]ater |'
                r'[Pp]resumed |'
                r' \(was |'
                r' \(in |'
                r' \(at |'
                r' \(also |'
                r' \(second code |'
                r'\?|'
                r'\n|'
                r' \(\[\'|'
                r' \(definition unknown\)|'
                r' \(reopened |'
                r'( portion])$')
            # The bracketed part, from the first '(' or '[' to the last ')' or ']'
            # (or the whole text if there is none).
            x_tmp = re.search(r'(?=[\[(]).*(?<=[\])])|(?=\().*(?<=\) \[)', location_name)
            x_tmp = x_tmp.group() if x_tmp is not None else location_name
            if re.search(m_pattern, location_name):
                # Remove the bracketed part and normalise the whitespace.
                dat = ' '.join(location_name.replace(x_tmp, '').split())
            else:
                # No keyword: the raw text as a whole is the name.
                dat = location_name

        # Note
        # What is left of the raw text after removing (the first occurrence of)
        # the name.
        y = location_name.replace(dat, '', 1).strip()
        if y == '':
            note = ''
        else:
            # A simple note: word characters, spaces, commas and question marks
            # enclosed in one pair of brackets.
            n = re.search(r'(?<=[\[(])[\w ,?]+(?=[])])', y)
            if n is None:
                # Otherwise, text following a three-character opening such as
                # "['(", "('[" or ") [" and preceding the last matching closing.
                n = re.search(
                    r'(?<=(\[[\'\"]\()|(\([\'\"]\[)|(\) \[)).*'
                    r'(?=(\)[\'\"]\])|(\][\'\"]\))|\])', y)
            elif '"now deleted"' in y and y.startswith('(') and y.endswith(')'):
                # Take everything between the outermost parentheses instead.
                n = re.search(r'(?<=\().*(?=\))', y)
            note = n.group() if n is not None else ''
            # Drop a trailing quotation mark left over from the raw text.
            if note.endswith('\'') or note.endswith('"'):
                note = note[:-1]

        # A 'STANOX ...' remark without brackets: split it off the name as the note.
        if 'STANOX ' in dat and 'STANOX ' in location_name and note == '':
            dat = location_name[0:location_name.find('STANOX')].strip()
            note = location_name[location_name.find('STANOX'):]

    return dat, note
def parse_date(str_date, as_date_type=False):
    """
    Extract a date from a piece of text.

    The text is parsed in fuzzy mode, i.e. it may contain words other than the
    date itself.

    :param str_date: string-type date
    :type str_date: str
    :param as_date_type: whether to return the date as `datetime.date`_,
        defaults to ``False``
    :type as_date_type: bool
    :return: parsed date as a string or `datetime.date`_
    :rtype: str, datetime.date

    .. _`datetime.date`:
        https://docs.python.org/3/library/datetime.html#datetime.date

    **Examples**::

        >>> from pyrcs.utils import parse_date

        >>> parsed_date_dat = parse_date('2020-01-01', as_date_type=True)

        >>> print(parsed_date_dat)
        2020-01-01
        >>> type(parsed_date_dat)
        <class 'datetime.date'>
    """

    date_only = dateutil.parser.parse(str_date, fuzzy=True).date()

    if as_date_type:
        return date_only

    return str(date_only)
# -- Get useful information ------------------------------------------------------------
def get_last_updated_date(url, parsed=True, as_date_type=False):
    """
    Look up the date on which a web page was last updated.

    The date is read from the ``<p class="update">`` element of the page.

    :param url: URL link of a requested web page
    :type url: str
    :param parsed: whether to reformat the date, defaults to ``True``
    :type parsed: bool
    :param as_date_type: whether to return the date as `datetime.date`_,
        defaults to ``False``
    :type as_date_type: bool
    :return: date of when the specified web page was last updated
        (``None`` if the page carries no such information)
    :rtype: str, datetime.date, None

    .. _`datetime.date`:
        https://docs.python.org/3/library/datetime.html#datetime.date

    **Examples**::

        >>> from pyrcs.utils import get_last_updated_date

        >>> last_update_date_ = get_last_updated_date(
        ...     url='http://www.railwaycodes.org.uk/crs/CRSa.shtm', parsed=True,
        ...     as_date_type=False)
        >>> type(last_update_date_)
        <class 'str'>

        >>> last_update_date_ = get_last_updated_date(
        ...     url='http://www.railwaycodes.org.uk/crs/CRSa.shtm', parsed=True,
        ...     as_date_type=True)
        >>> type(last_update_date_)
        <class 'datetime.date'>

        >>> last_update_date_ = get_last_updated_date(
        ...     url='http://www.railwaycodes.org.uk/linedatamenu.shtm')
        >>> print(last_update_date_)
        None
    """

    response = requests.get(url, headers=fake_requests_headers())
    soup = bs4.BeautifulSoup(response.text, 'lxml')

    update_paragraph = soup.find('p', {'class': 'update'})
    if update_paragraph is None:
        return None

    raw_text = update_paragraph.text
    if not parsed:
        # Hand back the text of the element as it is.
        return raw_text

    # Convert the date to "yyyy-mm-dd" format (or to a ``datetime.date``).
    return parse_date(raw_text, as_date_type)
def get_catalogue(page_url, update=False, confirmation_required=True, json_it=True,
                  verbose=False):
    """
    Get the catalogue for a class.

    A locally stored copy (a .json file under the "catalogue" data directory) is
    used if it exists and ``update`` is ``False``; otherwise the catalogue is
    collected from ``page_url``.

    :param page_url: URL of the main page of a code category
    :type page_url: str
    :param update: whether to check on update and proceed to update the package
        data, defaults to ``False``
    :type update: bool
    :param confirmation_required: whether to prompt a message for confirmation to
        proceed, defaults to ``True``
    :type confirmation_required: bool
    :param json_it: whether to save the catalogue as a .json file,
        defaults to ``True``
    :type json_it: bool
    :param verbose: whether to print relevant information in console as the
        function runs, defaults to ``False``
    :type verbose: bool
    :return: catalogue in the form {'<title>': '<URL>'}
        (``None`` if the collection is not confirmed)
    :rtype: dict or None

    **Examples**::

        >>> from pyrcs.utils import get_catalogue

        >>> url = 'http://www.railwaycodes.org.uk/elrs/elr0.shtm'
        >>> catalog = get_catalogue(url)
        >>> type(catalog)
        <class 'dict'>
        >>> print(list(catalog.keys())[:5])
        ['Introduction', 'A', 'B', 'C', 'D']

        >>> url = 'http://www.railwaycodes.org.uk/linedatamenu.shtm'
        >>> catalog = get_catalogue(url)
        >>> print(list(catalog.keys())[:5])
        ['Line data']
    """

    # Derive the name of the local file from the path of the URL,
    # e.g. '/elrs/elr0.shtm' -> 'elrs-elr0.json'.
    url_path = urllib.parse.urlparse(page_url).path.replace('.shtm', '.json')
    json_filename = '-'.join(filter(None, url_path.split('/')))
    path_to_json = cd_dat("catalogue", json_filename, mkdir=True)

    if not update and os.path.isfile(path_to_json):
        return load_json(path_to_json, verbose=verbose)

    if not confirmed("To collect/update catalogue? ",
                     confirmation_required=confirmation_required):
        print("The catalogue for the requested data has not been acquired.")
        return None

    response = requests.get(page_url, headers=fake_requests_headers())
    page_text = response.text
    response.close()

    def _to_catalogue(anchors):
        # Map the text of every link to its absolute URL.
        return {a.get_text(strip=True): urllib.parse.urljoin(page_url, a.get('href'))
                for a in anchors}

    try:
        # Preferred source: links inside <div class="fixed">.
        fixed_div = bs4.BeautifulSoup(page_text, 'lxml').find(
            'div', attrs={'class': 'fixed'})
        catalogue = _to_catalogue(fixed_div.find_all('a'))
    except AttributeError:
        # No such <div> (``find`` returned None): use all links after <h1>.
        anchors_after_h1 = \
            bs4.BeautifulSoup(page_text, 'lxml').find('h1').find_all_next('a')
        catalogue = _to_catalogue(anchors_after_h1)

    if json_it:
        save_json(catalogue, path_to_json, verbose=verbose)

    return catalogue
def get_category_menu(menu_url, update=False, confirmation_required=True, json_it=True,
                      verbose=False):
    """
    Get a menu of the available classes.

    A locally stored copy (a .json file under the "catalogue" data directory) is
    used if it exists and ``update`` is ``False``; otherwise the menu is
    collected from ``menu_url``.

    :param menu_url: URL of the menu page
    :type menu_url: str
    :param update: whether to check on update and proceed to update the package
        data, defaults to ``False``
    :type update: bool
    :param confirmation_required: whether to prompt a message for confirmation to
        proceed, defaults to ``True``
    :type confirmation_required: bool
    :param json_it: whether to save the catalogue as a .json file,
        defaults to ``True``
    :type json_it: bool
    :param verbose: whether to print relevant information in console as the
        function runs, defaults to ``False``
    :type verbose: bool
    :return: menu in the form {'<category name>': {...}}, where the category name
        is the <h1> text of the page without ' menu'
        (``None`` if the collection is not confirmed)
    :rtype: dict or None

    **Example**::

        >>> from pyrcs.utils import get_category_menu

        >>> url = 'http://www.railwaycodes.org.uk/linedatamenu.shtm'
        >>> menu = get_category_menu(url)

        >>> type(menu)
        <class 'dict'>
        >>> print(list(menu.keys()))
        ['Line data']
    """

    url_path = urllib.parse.urlparse(menu_url).path.replace('.shtm', '.json')
    json_filename = '-'.join(filter(None, url_path.split('/')))
    path_to_json = cd_dat("catalogue", json_filename, mkdir=True)

    if not update and os.path.isfile(path_to_json):
        return load_json(path_to_json, verbose=verbose)

    if not confirmed("To collect/update category menu? ",
                     confirmation_required=confirmation_required):
        print("The category menu has not been acquired.")
        return None

    response = requests.get(menu_url, headers=fake_requests_headers())
    soup = bs4.BeautifulSoup(response.text, 'lxml')
    title_tag, section_tags = soup.find('h1'), soup.find_all('h2')

    cls_name = title_tag.text.replace(' menu', '')

    if len(section_tags) == 0:
        # A flat page: every link after <h1> is an item of the menu.
        cls_elem = dict(
            (a.text, urllib.parse.urljoin(menu_url, a.get('href')))
            for a in title_tag.find_all_next('a'))

    else:
        # All strings after <h1> (less line breaks, non-breaking spaces and the
        # first two strings), with colons removed.
        texts = [s.replace(':', '') for s in title_tag.find_all_next(string=True)
                 if s != '\n' and s != '\xa0'][2:]
        section_names = [h2.text.replace(':', '') for h2 in section_tags]
        links = [(a.text, urllib.parse.urljoin(menu_url, a.get('href')))
                 for a in title_tag.find_all_next('a', href=True)]

        # Put the section names into the list of links, each at the position it
        # takes among the strings, so that they separate the links by section.
        positions = [texts.index(name) for name in section_names]
        for pos in positions:
            links.insert(pos, texts[pos])

        cls_elem = {}
        for i in range(len(positions) + 1):
            if i == 0:
                # Links before the first section belong to the top level.
                part = dict(links[i:positions[i]])
            elif i < len(positions):
                # Links between two consecutive section names.
                part = {section_names[i - 1]:
                        dict(links[positions[i - 1] + 1:positions[i]])}
            else:
                # Links after the last section name.
                part = {section_names[i - 1]: dict(links[positions[i - 1] + 1:])}
            cls_elem.update(part)

    cls_menu = {cls_name: cls_elem}

    if json_it:
        save_json(cls_menu, path_to_json, verbose=verbose)

    return cls_menu
def get_station_data_catalogue(source_url, source_key, update=False):
    """
    Get catalogue of railway station data.

    A locally stored copy (a .json file under the "catalogue" data directory) is
    used if it exists and ``update`` is ``False``; otherwise the catalogue is
    collected from ``source_url`` and saved.

    :param source_url: URL to the source web page
    :type source_url: str
    :param source_key: key of the returned catalogue (which is a dictionary)
    :type source_key: str
    :param update: whether to check on update and proceed to update the package
        data, defaults to ``False``
    :type update: bool
    :return: catalogue of railway station data
    :rtype: dict

    See :py:class:`pyrcs.other_assets.Stations()
    <pyrcs.other_assets.stations.Stations>`
    """

    cat_json = '-'.join(x for x in urllib.parse.urlparse(source_url).path.replace(
        '.shtm', '.json').split('/') if x)
    path_to_cat = cd_dat("catalogue", cat_json)

    if os.path.isfile(path_to_cat) and not update:
        catalogue = load_json(path_to_cat)

    else:
        source = requests.get(source_url, headers=fake_requests_headers())
        # The links are taken from the second <p> after <p class="appeal">.
        cold_soup = bs4.BeautifulSoup(source.text, 'lxml').find(
            'p', {'class': 'appeal'}).find_next('p').find_next('p')
        hot_soup = {a.text: urllib.parse.urljoin(source_url, a.get('href'))
                    for a in cold_soup.find_all('a')}

        catalogue = {source_key: None}
        for k, v in hot_soup.items():
            sub_cat = get_catalogue(v, update=True, confirmation_required=False,
                                    json_it=False)

            if sub_cat != hot_soup:
                # The linked page has a catalogue of its own.
                if k == 'Introduction':
                    entry = {k: v, **sub_cat}
                    # Keep anything already filed under ``source_key`` instead of
                    # overwriting it.
                    if catalogue[source_key]:
                        entry.update(catalogue[source_key])
                    catalogue[source_key] = entry
                else:
                    catalogue.update({k: sub_cat})

            else:
                # The linked page only shows the same links as the source page.
                if k in ('Bilingual names', 'Not served by SFO'):
                    # ``catalogue[source_key]`` is still None if 'Introduction'
                    # has not been processed (yet); start from an empty dict
                    # rather than failing on ``None.update``.
                    if catalogue[source_key] is None:
                        catalogue[source_key] = {}
                    catalogue[source_key].update({k: v})
                else:
                    catalogue.update({k: v})

        save_json(catalogue, path_to_cat)

    return catalogue
def get_track_diagrams_items(source_url, source_key, update=False):
    """
    Get catalogue of track diagrams.

    A locally stored copy is used if it exists and ``update`` is ``False``;
    otherwise the items are collected from ``source_url`` and saved.

    :param source_url: URL to the source web page
    :type source_url: str
    :param source_key: key of the returned catalogue (which is a dictionary)
    :type source_key: str
    :param update: whether to check on update and proceed to update the package
        data, defaults to ``False``
    :type update: bool
    :return: catalogue of track diagrams, in the form
        {``source_key``: {<texts of the <h3> elements without a class>}}
    :rtype: dict

    See :py:class:`pyrcs.line_data.TrackDiagrams()
    <pyrcs.line_data.track_diagrams.TrackDiagrams>`
    """

    url_path = urllib.parse.urlparse(source_url).path.replace('.shtm', '.json')
    cat_filename = '-'.join(filter(None, url_path.split('/')))
    # NOTE(review): the file is named '*.json' but is read and written as a pickle
    # (the value is a set); consider a '.pickle' name - confirm that nothing else
    # reads this path as JSON.
    path_to_cat = cd_dat("catalogue", cat_filename)

    if not update and os.path.isfile(path_to_cat):
        return load_pickle(path_to_cat)

    response = requests.get(source_url, headers=fake_requests_headers())
    soup = bs4.BeautifulSoup(response.text, 'lxml')

    headings = soup.find_all('h3', text=True, attrs={'class': None})
    items = {source_key: {h3.get_text(strip=True) for h3 in headings}}

    save_pickle(items, path_to_cat)

    return items
# -- Rectification of location names ---------------------------------------------------
def fetch_location_names_repl_dict(k=None, regex=False, as_dataframe=False):
    """
    Load the dictionary used for rectifying location names.

    The dictionary is read from "location-names-repl.json" (or, if ``regex`` is
    ``True``, from "location-names-repl-regex.json") in the package data.

    :param k: key under which the dictionary is nested, defaults to ``None``
        (any falsy value leaves the dictionary as it is)
    :type k: str or int or float or bool or None
    :param regex: whether to create a dictionary for replacement based on regular
        expressions (the keys are then compiled patterns), defaults to ``False``
    :type regex: bool
    :param as_dataframe: whether to return the created dictionary as a
        pandas.DataFrame, defaults to ``False``
    :type as_dataframe: bool
    :return: dictionary for rectifying location names
    :rtype: dict or pandas.DataFrame

    **Examples**::

        >>> from pyrcs.utils import fetch_location_names_repl_dict

        >>> repl_dict = fetch_location_names_repl_dict()

        >>> type(repl_dict)
        <class 'dict'>

        >>> repl_dict = fetch_location_names_repl_dict(regex=True, as_dataframe=True)

        >>> type(repl_dict)
        <class 'pandas.core.frame.DataFrame'>
    """

    suffix = "-regex" if regex else ""
    json_filename = "location-names-repl{}.json".format(suffix)

    replacements = load_json(cd_dat(json_filename))

    if regex:
        replacements = {re.compile(pattern): new_value
                        for pattern, new_value in replacements.items()}

    result = {k: replacements} if k else replacements

    if as_dataframe:
        result = pd.DataFrame.from_dict(result, orient='index', columns=['new_value'])

    return result
def update_location_name_repl_dict(new_items, regex, verbose=False):
    """
    Update the location-name replacement dictionary in the package data.

    Nothing is done (and no confirmation is asked for) if ``new_items`` is empty.

    :param new_items: new items to replace; keys may be strings or compiled
        regular expressions (the latter are stored by their pattern strings)
    :type new_items: dict
    :param regex: whether this update is for regular-expression dictionary
    :type regex: bool
    :param verbose: whether to print relevant information in console as the
        function runs, defaults to ``False``
    :type verbose: bool

    **Example**::

        >>> from pyrcs.utils import update_location_name_repl_dict

        >>> new_items_ = {}
        >>> update_location_name_repl_dict(new_items_, regex=False)
    """

    json_filename = "location-names-repl{}.json".format("" if not regex else "-regex")

    if not new_items:
        # Nothing to add; also avoids indexing into an empty list of keys.
        return

    first_key = next(iter(new_items))

    if confirmed("To update \"{}\" with {{\"{}\"... }}?".format(
            json_filename, first_key)):
        path_to_json = cd_dat(json_filename)
        location_name_repl_dict = load_json(path_to_json)

        # JSON keys must be strings: store compiled patterns by their pattern
        # strings and keep every other key as it is (so that plain-string keys
        # are not lost when they are mixed with compiled patterns).
        serialisable_items = {
            (k.pattern if isinstance(k, re.Pattern) else k): v
            for k, v in new_items.items()}

        location_name_repl_dict.update(serialisable_items)

        save_json(location_name_repl_dict, path_to_json, verbose=verbose)
# -- Fixers -------------------------------------------------------------------------------------------
def fix_num_stanox(stanox_code):
    """
    Fix 'STANOX' if it is loaded as numbers.

    A numerical code is converted to a string of digits (a missing value to an
    empty string), and any non-empty code shorter than five characters is
    left-padded with zeros.

    :param stanox_code: STANOX code
    :type stanox_code: str or int or float
    :return: standard STANOX code
    :rtype: str

    **Examples**::

        >>> from pyrcs.utils import fix_num_stanox

        >>> fix_num_stanox(65630)
        '65630'

        >>> fix_num_stanox(2071)
        '02071'

        >>> fix_num_stanox(2071.0)
        '02071'
    """

    # ``isinstance`` needs a tuple of types; ``(int or float)`` is just ``int``.
    # NumPy scalar types are included as codes are typically read via pandas.
    if isinstance(stanox_code, (int, float, np.integer, np.floating)):
        stanox_code = '' if pd.isna(stanox_code) else str(int(stanox_code))

    if len(stanox_code) < 5 and stanox_code != '':
        stanox_code = stanox_code.rjust(5, '0')

    return stanox_code
def fix_nr_mileage_str(nr_mileage):
    """
    Fix Network Rail mileage.

    The part after the decimal point is right-padded with zeros to four digits,
    and a missing part before the decimal point is replaced with ``'0'``.
    An empty string, ``'0'`` and ``None`` are returned as they are; a missing
    number (NaN) gives an empty string.

    :param nr_mileage: NR mileage
    :type nr_mileage: str or float or int
    :return: conventional NR mileage code
    :rtype: str

    **Examples**::

        >>> from pyrcs.utils import fix_nr_mileage_str

        >>> fix_nr_mileage_str(29.011)
        '29.0110'

        >>> fix_nr_mileage_str('.1100')
        '0.1100'

        >>> fix_nr_mileage_str(29)
        '29.0000'
    """

    is_number = isinstance(nr_mileage, (int, float, np.integer, np.floating))
    if is_number and not isinstance(nr_mileage, bool):
        if pd.isna(nr_mileage):
            # A missing value has no mileage code (rather than 'nan.0000').
            return ''
        # Numbers are handled through their string form.
        nr_mileage = str(nr_mileage)

    if nr_mileage and nr_mileage != '0':
        if '.' in nr_mileage:
            miles, yards = nr_mileage.split('.')
            if miles == '':
                miles = '0'
        else:
            miles, yards = nr_mileage, '0'
        # Trailing zeros dropped by a numeric representation are restored.
        nr_mileage_ = '.'.join([miles, yards.ljust(4, '0')])
    else:
        nr_mileage_ = nr_mileage

    return nr_mileage_


# -- Misc ---------------------------------------------------------------------------------------------
def is_str_float(str_val):
    """
    Tell whether a string-type variable can express a float-type value.

    :param str_val: a string-type variable
    :type str_val: str
    :return: whether ``str_val`` can express a float value
    :rtype: bool

    **Examples**::

        >>> from pyrcs.utils import is_str_float

        >>> is_str_float('')
        False

        >>> is_str_float('a')
        False

        >>> is_str_float('1')
        True

        >>> is_str_float('1.1')
        True
    """

    try:
        float(str_val)
    except ValueError:
        return False

    return True