Source code for pewtils.http

from __future__ import division
from bs4 import BeautifulSoup
from builtins import str
from pewtils import get_hash, decode_text, is_not_null
from six.moves.urllib import parse as urlparse
from unidecode import unidecode
import pandas as pd
import re
import os
import requests
import tldextract
import warnings
from requests.exceptions import ReadTimeout
from stopit import ThreadingTimeout as Timeout


_ = pd.read_csv(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "general_link_shorteners.csv"
    )
)
GENERAL_LINK_SHORTENERS = _["shortener"].values


_ = pd.read_csv(
    os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "vanity_link_shorteners.csv"
    )
)
_current = _[_["historical"] == 0]
VANITY_LINK_SHORTENERS = dict(zip(_current["shortener"], _current["expanded"]))

_historical = _[_["historical"] == 1]
HISTORICAL_VANITY_LINK_SHORTENERS = dict(
    zip(_historical["shortener"], _historical["expanded"])
)

VANITY_LINK_SHORTENERS.update(HISTORICAL_VANITY_LINK_SHORTENERS)

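
# Illustrative sketch (not part of pewtils): the shortener dictionaries are plain
# {shortener_domain: expanded_domain} mappings, so expanding a recognized shortener
# is just a dict lookup with the original domain as the fallback, which is exactly how
# extract_domain_from_url uses VANITY_LINK_SHORTENERS below. The mapping used here is
# hypothetical and only stands in for whatever the bundled CSV actually contains.
def _expand_shortener_sketch(domain, shortener_map=None):
    if shortener_map is None:
        shortener_map = {"examp.le": "example.com"}  # hypothetical entry
    return shortener_map.get(domain, domain)
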

def hash_url(url):

    """
    Clears out http/https prefix and returns an MD5 hash of the URL. More effective \
    when used in conjunction with :py:func:`pewtils.http.canonical_link`.

    :param url: The URL to hash
    :type url: str
    :return: Hashed string representation of the URL using the md5 hashing algorithm.
    :rtype: str

    Usage::

        from pewtils.http import hash_url

        >>> hash_url("http://www.example.com")
        "7c1767b30512b6003fd3c2e618a86522"
        >>> hash_url("www.example.com")
        "7c1767b30512b6003fd3c2e618a86522"

    """

    http_regex = re.compile(r"^http(s)?\:\/\/")
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        result = get_hash(
            unidecode(http_regex.sub("", url.lower())), hash_function="md5"
        )
    return result

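
# Illustrative sketch (not part of pewtils): because hash_url lowercases the URL and
# strips the http/https prefix before hashing, scheme and capitalization variants of
# the same link collapse to a single hash, which makes the hash a convenient
# deduplication key. For example, _dedupe_urls_sketch(["http://www.Example.com",
# "https://www.example.com"]) keeps only the first entry.
def _dedupe_urls_sketch(urls):
    seen = set()
    unique = []
    for url in urls:
        key = hash_url(url)
        if key not in seen:
            seen.add(key)
            unique.append(url)
    return unique
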
def strip_html(html, simple=False, break_tags=None):

    """
    Attempts to strip out HTML code from an arbitrary string while preserving meaningful text components. \
    By default, the function will use BeautifulSoup to parse the HTML. Setting ``simple=True`` will make the \
    function use a much simpler regular expression approach to parsing.

    :param html: The HTML to process
    :type html: str
    :param simple: Whether or not to use a simple regex or more complex parsing rules (default=False)
    :type simple: bool
    :param break_tags: A custom list of tags on which to break (default is ["strong", "em", "i", "b", "p"])
    :type break_tags: list
    :return: The text with HTML components removed
    :rtype: str

    .. note:: This function might not be effective for *all* variations of HTML structures, but it produces fairly \
        reliable results in removing the vast majority of HTML without stripping out valuable content.

    Usage::

        from pewtils.http import strip_html

        >>> my_html = "<html><head>Header text</head><body>Body text</body></html>"
        >>> strip_html(my_html)
        'Header text Body text'

    """

    html = re.sub(r"\n", " ", html)
    html = re.sub(r"\s+", " ", html)
    if not break_tags:
        break_tags = ["strong", "em", "i", "b", "p"]
    if not simple:
        try:

            split_re = re.compile(r"\s{2,}")
            soup = BeautifulSoup(html, "lxml")
            for tag in soup():
                if (
                    "class" in tag.attrs
                    and (
                        "menu" in tag.attrs["class"]
                        or "header" in tag.attrs["class"]
                    )
                ) or ("menu" in str(tag.id) or "header" in str(tag.id)):
                    tag.extract()
            for tag in soup(["script", "style"]):
                tag.extract()
            for br in soup.find_all("br"):
                br.replace_with("\n")
            for t in soup(break_tags):
                try:
                    t.replace_with("\n{0}\n".format(t.text))
                except (UnicodeDecodeError, UnicodeEncodeError):
                    t.replace_with("\n{0}\n".format(decode_text(t.text)))
            if hasattr(soup, "body") and soup.body:
                text = soup.body.get_text()
            else:
                text = soup.get_text()
            lines = [l.strip() for l in text.splitlines()]
            lines = [l2.strip() for l in lines for l2 in split_re.split(l)]
            text = "\n".join([l for l in lines if l])
            text = re.sub(r"(\sA){2,}\s", " ", text)
            text = re.sub(r"\n+(\s+)?", "\n\n", text)
            text = re.sub(r" +", " ", text)
            text = re.sub(r"\t+", " ", text)
            return text

        except Exception as e:

            print("strip_html error")
            print(e)
            text = re.sub(r"<[^>]*>", " ", re.sub("\\s+", " ", html)).strip()
            return text

    else:

        return "\n".join(
            [
                re.sub(r"\s+", " ", re.sub(r"\<[^\>]+\>", " ", section))
                for section in re.sub(
                    r"\<\/?div\>|\<\/?p\>|\<br\>", "\n", html
                ).split("\n")
            ]
        )

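
# Illustrative sketch (not part of pewtils): break_tags controls which tags have their
# text placed on its own line; extending the default list with headings and list items
# can make the extracted text easier to split into sections afterwards. The extra tags
# chosen here are only an example.
def _strip_html_sections_sketch(html):
    extra_break_tags = ["strong", "em", "i", "b", "p", "h1", "h2", "li"]
    return strip_html(html, break_tags=extra_break_tags)
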
def trim_get_parameters(url, session=None, timeout=30, user_agent=None):

    """
    Takes a URL (presumed to be the final end point) and iterates over GET parameters, attempting to find \
    optional ones that can be removed without generating any redirects.

    :param url: The URL to trim
    :type url: str
    :param session: (Optional) A persistent session that can optionally be passed (useful if you're processing many \
        links at once)
    :type session: :py:class:`requests.Session` object
    :param user_agent: User agent for the auto-created requests Session to use, if a preconfigured requests Session \
        is not provided
    :type user_agent: str
    :param timeout: Timeout for requests
    :type timeout: int or float
    :return: The original URL with optional GET parameters removed
    :rtype: str

    Usage::

        from pewtils.http import trim_get_parameters

        >>> trim_get_parameters("https://httpbin.org/status/200?param=1")
        "https://httpbin.org/status/200"

    """

    close_session = False
    if not session:
        close_session = True
        session = requests.Session()
        session.headers.update({"User-Agent": user_agent})

    # Often there's extra information about social sharing and referral sources that can be removed
    ditch_params = []
    parsed = urlparse.urlparse(url)
    if parsed.query:
        params = urlparse.parse_qs(parsed.query)
        for k, v in params.items():
            # We iterate over all of the GET parameters and try holding each one out
            check = True
            for skipper in ["document", "article", "id", "qs"]:
                # If the parameter is named something that's probably a unique ID, we'll keep it
                if skipper in k.lower():
                    check = False
            for skipper in ["html", "http"]:
                # Same goes for parameters that contain URL information
                if skipper in v[0].lower():
                    check = False
            if check:
                new_params = {
                    k2: v2[0]
                    for k2, v2 in params.items()
                    if k2 != k and len(v2) == 1
                }
                new_params = urlparse.urlencode(new_params)
                new_parsed = parsed._replace(query=new_params)
                new_url = urlparse.urlunparse(new_parsed)
                try:
                    resp = session.head(new_url, allow_redirects=True, timeout=timeout)
                except ReadTimeout:
                    resp = None
                if is_not_null(resp):
                    new_parsed = urlparse.urlparse(resp.url)
                    if new_parsed.query != "" or new_parsed.path not in ["", "/"]:
                        # If removing a parameter didn't redirect to a root domain...
                        new_url = resp.url
                        compare_new = (
                            new_url.split("?")[0] if "?" in new_url else new_url
                        )
                        compare_old = url.split("?")[0] if "?" in url else url
                        if compare_new == compare_old:
                            # And the domain is the same as it was before, then the parameter was probably unnecessary
                            ditch_params.append(k)

    if len(ditch_params) > 0:
        # Now we remove all of the unnecessary get parameters and finalize the URL
        new_params = {
            k: v[0]
            for k, v in params.items()
            if len(v) == 1 and k not in ditch_params
        }
        new_params = urlparse.urlencode(new_params)
        parsed = parsed._replace(query=new_params)
        url = urlparse.urlunparse(parsed)

    if close_session:
        session.close()

    return url

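
# Illustrative sketch (not part of pewtils): when trimming many URLs it is worth
# passing a single persistent requests.Session (as the docstring above suggests) so
# that connections are reused rather than opened and torn down once per URL. The
# user agent string here is arbitrary.
def _trim_many_urls_sketch(urls, user_agent="pewtils-example"):
    trimmed = []
    with requests.Session() as session:
        session.headers.update({"User-Agent": user_agent})
        for url in urls:
            trimmed.append(trim_get_parameters(url, session=session, timeout=10))
    return trimmed
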
def extract_domain_from_url(
    url,
    include_subdomain=True,
    resolve_url=False,
    timeout=1.0,
    session=None,
    user_agent=None,
    expand_shorteners=True,
):

    """
    Attempts to extract a standardized domain from a URL by following the link and extracting the TLD.

    :param url: The link from which to extract the domain
    :type url: str
    :param include_subdomain: Whether or not to include the subdomain (e.g. 'news.google.com'); default is True
    :type include_subdomain: bool
    :param resolve_url: Whether to fully resolve the URL. If False (default), it will operate on the URL as-is; if \
        True, the URL will be passed to :py:func:`pewtils.http.canonical_link` to be standardized prior to \
        extracting the domain.
    :type resolve_url: bool
    :param timeout: (Optional, for use with ``resolve_url``) Maximum number of seconds to wait on a request before \
        timing out (default is 1)
    :type timeout: int or float
    :param session: (Optional, for use with ``resolve_url``) A persistent session that can optionally be passed \
        (useful if you're processing many links at once)
    :type session: :py:class:`requests.Session` object
    :param user_agent: (Optional, for use with ``resolve_url``) User agent for the auto-created requests Session to \
        use, if a preconfigured requests Session is not provided
    :type user_agent: str
    :param expand_shorteners: If True, shortened URLs that don't successfully expand will be checked against a list \
        of known URL shorteners and expanded if recognized. (Default = True)
    :type expand_shorteners: bool
    :return: The domain for the link
    :rtype: str

    .. note:: If ``resolve_url`` is set to True, the link will be standardized prior to domain extraction (in which \
        case you can provide optional timeout, session, and user_agent parameters that will be passed to \
        :py:func:`pewtils.http.canonical_link`). By default, however, the link will be operated on as-is. The final \
        extracted domain is then checked against known URL shorteners (see :ref:`vanity_link_shorteners`) and if it \
        is recognized, the expanded domain will be returned instead. Shortened URLs that are not standardized and \
        do not follow patterns included in this dictionary of known shorteners may be returned with an incorrect \
        domain.

    Usage::

        from pewtils.http import extract_domain_from_url

        >>> extract_domain_from_url("http://forums.bbc.co.uk", include_subdomain=False)
        "bbc.co.uk"
        >>> extract_domain_from_url("http://forums.bbc.co.uk", include_subdomain=True)
        "forums.bbc.co.uk"

    """

    if resolve_url:
        url = canonical_link(
            url, timeout=timeout, session=session, user_agent=user_agent
        )
    domain = tldextract.extract(url)
    if domain:
        if include_subdomain and domain.subdomain and domain.subdomain != "www":
            domain = ".".join([domain.subdomain, domain.domain, domain.suffix])
        else:
            domain = ".".join([domain.domain, domain.suffix])
        if expand_shorteners:
            domain = VANITY_LINK_SHORTENERS.get(domain, domain)
    return domain

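
# Illustrative sketch (not part of pewtils): a common use of extract_domain_from_url is
# tallying which domains appear in a collection of links. Operating on the URLs as-is
# (resolve_url=False, the default) avoids any network requests; pass resolve_url=True
# only if you need redirects and shorteners resolved first.
def _count_domains_sketch(urls):
    from collections import Counter

    return Counter(
        extract_domain_from_url(url, include_subdomain=False) for url in urls
    )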