Module auto_osint_v.popular_information_finder
Finds entities (information) that are popular amongst the potentially corroborating sources.
Expand source code
"""Finds entities (information) that is popular amongst the potentially corroborating sources.
"""
import http.client
import itertools
from multiprocessing import Pool, Manager
import requests
import selenium.common.exceptions
from bs4 import BeautifulSoup
from tqdm import tqdm
from seleniumwire import webdriver
class PopularInformationFinder:
    """Finds the popular information amongst given sources.

    Class that provides methods that get text from sources and compares the number of times a
    particular entity is mentioned.
    """
    def __init__(self, file_handler_object, entity_processor_object):
        """Initialises the PopularInformationFinder object.

        Args:
            file_handler_object: gives the class access to the file_handler object.
            entity_processor_object: gives the class access to the entity_processor object.
        """
        # Lazy creation of the shared Manager as a class attribute.
        try:
            manager = getattr(type(self), 'manager')
        except AttributeError:
            manager = type(self).manager = Manager()
        self.entities = manager.dict()
        self.file_handler = file_handler_object
        self.entity_processor = entity_processor_object
    def get_text_process_entities(self, source):
        """Gets the body text from each source using its URL.

        Uses requests and BeautifulSoup to retrieve and parse the webpage's HTML into a readable
        format for entity recognition.
        This method updates the shared 'self.entities' dictionary.

        Args:
            source: the individual source from the dictionary of sources.

        Returns:
            A list of key-value pairs (tuples).

        Note:
            Key-value pairs are required so that a dictionary can be constructed from the
            mapped results.
        """
        # define entities variable
        entities = []
        # define the url
        url = source["url"]
        # set headers to try to avoid 403 errors
        headers = {
            'User-Agent':
                'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                'Chrome/112.0.0.0 Safari/537.36'}
        # request the webpage - if it times out, move on to the next source
        try:
            response = requests.get(url, headers=headers, timeout=5)
        except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError):
            return entities
        try:
            content_type = response.headers['Content-Type']
        except KeyError:
            content_type = ''
        if "application/javascript" in content_type or response.status_code != 200:
            # use selenium to avoid the 'JavaScript is not available.' error
            options = webdriver.ChromeOptions()
            options.headless = True
            options.add_argument("start-maximized")
            options.add_experimental_option("excludeSwitches", ["enable-automation"])
            options.add_experimental_option('useAutomationExtension', False)
            options.add_argument('--disable-blink-features=AutomationControlled')
            options.add_argument(
                "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, "
                "like Gecko) Chrome/98.0.4758.102 Safari/537.36")
            try:
                driver = webdriver.Chrome("chromedriver", chrome_options=options)
            except (http.client.RemoteDisconnected,
                    selenium.common.exceptions.SessionNotCreatedException):
                # the driver never started, so there is nothing to quit
                return entities
            driver.set_page_load_timeout(5)  # set timeout to 5 secs
            # request the webpage - if the source website times out, return the current entities
            try:
                driver.get(url)
            except selenium.common.exceptions.TimeoutException:
                driver.quit()
                return entities
            html = driver.page_source
            # check if we are wasting our time with a broken or inaccessible website
            try:
                response = driver.wait_for_request(url, 5).response
            except selenium.common.exceptions.TimeoutException:
                driver.quit()
                return entities
            driver.quit()
            if response is None or response.status_code != 200:
                return entities
        else:
            html = response.text
        # get the content type
        try:
            content_type = response.headers['Content-Type']
            if content_type == "text/xml" or content_type == "application/xml":
                # don't parse xml documents
                return entities
            # parse using the lxml html parser
            soup = BeautifulSoup(html, "lxml")
        except KeyError:
            # no 'Content-Type' header exists, so assume the content is html
            soup = BeautifulSoup(html, "lxml")
        # remove all script and style elements
        for script in soup(["script", "style"]):
            script.extract()
        # get the visible text
        text = soup.get_text()
        # break into lines and remove leading and trailing space on each
        lines = (line.strip() for line in text.splitlines())
        # split each line into individual space-separated chunks
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        # drop blank chunks, one chunk per line
        text = '\n'.join(chunk for chunk in chunks if chunk)
        # split into a list for the entity processor
        textlist = text.split('\n')
        if len(text) <= 100000:
            # run the text through the entity processor; entities are stored in the namesake variable
            entities = self.entity_processor.get_entities_and_count(textlist, self.entities)
        return entities
    def find_entities(self, sources):
        """Finds entities in the given text.

        Uses the same model for entity recognition as specific_entity_processor.
        Wikipedia articles tend to be too long to process, so any page whose extracted text
        exceeds 100,000 characters is skipped. Most slowdowns here have been caused by Russia's
        Wikipedia page.

        Args:
            sources: list of dictionaries of sources with corresponding URLs.

        Returns:
            A list of the most popular words amongst all the sources.
        """
        with Pool() as pool:
            # sources = tqdm(sources) # add a progress bar
            # calculate an even chunksize for the imap function using the pool size (max processes)
            chunksize = len(sources) / len(pool._pool)
            if int(chunksize) < chunksize:
                chunksize = int(chunksize) + 1
            else:
                chunksize = int(chunksize)
            tmp = tqdm(pool.imap_unordered(self.get_text_process_entities, sources, chunksize),
                       total=len(sources), desc="Finding popular entities")
            self.entities.update([tpl for sublist in tmp for tpl in sublist if tpl])
        # sort entities by number of mentions; the lambda tells sorted to use the
        # dictionary's values, in descending order
        sorted_entities = sorted(self.entities.items(), key=lambda x: x[1], reverse=True)
        # keep the top 10% of popular entities, capped at 30 entities
        cut_off_index = len(sorted_entities) * 0.10
        cut_off_index = int(min(cut_off_index, 30))
        # truncate the list based on cut_off_index
        sorted_entities = itertools.islice(sorted_entities, cut_off_index)
        sorted_entities_words = list(word for (word, count) in sorted_entities)
        return sorted_entities_words
Classes
class PopularInformationFinder (file_handler_object, entity_processor_object)
Finds the popular information amongst given sources.
Class that provides methods that get text from sources and compares the number of times a particular entity is mentioned.
Initialises the PopularInformationFinder object.
Args
file_handler_object: gives the class access to the file_handler object.
entity_processor_object: gives the class access to the entity_processor object.
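For orientation, a minimal usage sketch follows. The real file_handler and entity_processor objects come from elsewhere in auto_osint_v and are not documented on this page; the StubEntityProcessor below is hypothetical and only assumes the get_entities_and_count(textlist, shared_dict) interface that the source above calls, and the URLs are placeholders.
from auto_osint_v.popular_information_finder import PopularInformationFinder


class StubEntityProcessor:
    """Hypothetical stand-in for the project's entity_processor object."""

    def get_entities_and_count(self, textlist, shared_entities):
        # Naive example: treat every non-empty line as an "entity" and count its mentions.
        found = []
        for line in textlist:
            shared_entities[line] = shared_entities.get(line, 0) + 1
            found.append((line, shared_entities[line]))
        return found


if __name__ == "__main__":
    # Each source dictionary must carry at least a "url" key.
    sources = [
        {"url": "https://example.com/report-1"},
        {"url": "https://example.com/report-2"},
    ]
    # file_handler is stored but not used by the methods on this page, so any object works here.
    finder = PopularInformationFinder(object(), StubEntityProcessor())
    print(finder.find_entities(sources))
The __main__ guard matters because find_entities fans the sources out over a multiprocessing Pool.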
Methods
def find_entities(self, sources)
Finds entities in the given text.
Uses the same model for entity recognition as specific_entity_processor. Wikipedia articles tend to be too long to process, so any page whose extracted text exceeds 100,000 characters is skipped. Most slowdowns here have been caused by Russia's Wikipedia page.
Args
sources: list of dictionaries of sources with corresponding URLs.
Returns
A list of the most popular words amongst all the sources.
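The ranking behind this return value can be illustrated in isolation. The sketch below mirrors the sort-and-truncate logic from the source above (sort by mention count, then keep the top 10% capped at 30 entities), using made-up entity counts.
import itertools

# Hypothetical entity counts, as they might appear in self.entities once all sources are processed.
entity_counts = {f"entity_{i}": 50 - i for i in range(50)}

# Sort by number of mentions in descending order.
ranked = sorted(entity_counts.items(), key=lambda item: item[1], reverse=True)

# Keep the top 10% of entities, capped at 30.
cut_off_index = int(min(len(ranked) * 0.10, 30))
top_words = [word for word, _count in itertools.islice(ranked, cut_off_index)]

print(top_words)  # ['entity_0', 'entity_1', 'entity_2', 'entity_3', 'entity_4']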
def get_text_process_entities(self, source)
Gets the body text from each source using its URL.
Uses requests and BeautifulSoup to retrieve and parse the webpage's HTML into a readable format for entity recognition.
This method updates the shared 'self.entities' dictionary.
Args
source: the individual source from the dictionary of sources.
Returns
A list of key-value pairs (tuples).
Note
Key-value pairs are required so that a dictionary can be constructed from the mapped results.
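To make the note concrete: each worker returns a list of (entity, count) tuples, and find_entities flattens those lists and passes them to dict.update, which accepts an iterable of key-value pairs. A small sketch with made-up values (the real code updates a multiprocessing Manager dictionary rather than a plain dict):
# Hypothetical per-source results, as returned by get_text_process_entities.
results_per_source = [
    [("Kyiv", 3), ("NATO", 1)],       # source 1
    [("Kyiv", 5), ("ceasefire", 2)],  # source 2
    [],                               # source 3 timed out, so it contributed nothing
]

entities = {}
# Flatten the per-source lists and update the dictionary, as find_entities does;
# dict.update accepts an iterable of key-value tuples.
entities.update([tpl for sublist in results_per_source for tpl in sublist if tpl])

print(entities)  # {'Kyiv': 5, 'NATO': 1, 'ceasefire': 2}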