Module auto_osint_v.priority_manager
This module assigns scores to each source, prioritising the most relevant sources.
Expand source code
"""This module assigns scores to each source, prioritising the most relevant sources.
"""
import http.client
import inspect
from typing import List
from multiprocessing import Pool
import requests
import selenium.common.exceptions
from tqdm import tqdm
from bs4 import BeautifulSoup
from seleniumwire import webdriver
from auto_osint_v.popular_information_finder import PopularInformationFinder
def count_entities(entities, source_text):
"""Counts the number of entities appearing in a given source.
Args:
entities: the entities to look for.
source_text: the source text to look for entities within.
Returns:
Integer number of appearances of all entities in the source
"""
entity_count = 0
for entity in entities:
# built-in method find() returns the index of a given substring within
# a string, returns -1 if not found
if source_text.find(entity) != -1:
entity_count += 1
return entity_count
class PriorityManager:
"""Provides methods for assigning source scores based on relevancy to the user's statement.
"""
def __init__(self, fh_object, entity_processor_object, potential_corroboration: List[dict]):
"""Initialises the PriorityManager object.
Args:
fh_object: file handler object to use for extracting info from data files.
entity_processor_object: object to use for processing entities
potential_corroboration: list of dictionaries of source information.
"""
self._target_entity_multiplier = 10 # multiplier for mentions of target info
self._popular_entity_multiplier = 5 # multiplier for mentions of popular info
self._entities = []
self.file_handler = fh_object
self.entity_processor = entity_processor_object
self.sources = potential_corroboration
def manager(self):
"""This method controls the order of execution for counting target and popular info.
Returns:
self.sources: list of dictionaries of source information
"""
self.target_info_scorer() # generates a score for each source
# remove sources with 0 score (or could remove bottom x% of sources)
self.remove_sources()
# clear entities list
self._entities.clear()
# generate a popular info score for each source
self.popular_info_scorer()
# sort sources by score in descending order
self.sort_sources_desc()
# return scored sources list(dict)
return self.sources
def get_sources(self):
"""Method for getting the list of source dictionaries."""
return self.sources
@staticmethod
def get_text_from_site(url):
"""Gets the body text from each source using its URL.
Uses requests and BeautifulSoup to retrieve and parse the webpage's HTML into a readable
format for entity recognition.
Args:
url: url fetched from sources dictionary.
Returns:
The content of the webpage in UTF-8 format.
"""
# initialise the webpage text variable
text = ""
# set headers to try to avoid 403 errors
headers = {
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/112.0.0.0 Safari/537.36'}
# request the webpage - if timeout, move on to next source
try:
response = requests.get(url, headers, timeout=5)
except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError):
return text
try:
content_type = response.headers['Content-Type']
except KeyError:
content_type = ''
if "application/javascript" in content_type or response.status_code != 200:
# using selenium to avoid 'JavaScript is not available." error
options = webdriver.ChromeOptions()
options.headless = True
options.add_argument("start-maximized")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, "
"like Gecko) Chrome/98.0.4758.102 Safari/537.36")
try:
driver = webdriver.Chrome("chromedriver", chrome_options=options)
except (http.client.RemoteDisconnected,
selenium.common.exceptions.SessionNotCreatedException):
try:
driver.quit()
finally:
return text
# driver.set_page_load_timeout(5) # set timeout to 5 secs
# request the webpage. If source website timeout, return the current list of entities.
try:
driver.get(url)
except selenium.common.exceptions.TimeoutException:
driver.quit()
return text
html = driver.page_source
# check if we are wasting our time with a broken or inaccessible website
try:
request = driver.wait_for_request(url, 5)
response = request.response
driver.quit()
except selenium.common.exceptions.TimeoutException:
driver.quit()
return text
if request.response.status_code in {400, 401, 403, 404, 429}:
driver.quit()
return text
else:
html = response.text
# get the content type
try:
# response is either the requests or selenium response
content_type = response.headers['Content-Type']
# if xml use xml parser
if content_type == "text/xml" or content_type == "application/xml":
# don't parse xml
return text
else:
# parse using the lxml html parser
soup = BeautifulSoup(html, "lxml")
except KeyError:
# except on KeyError if no 'content-type' header exists
soup = BeautifulSoup(html, "lxml")
# kill all script and style elements
for script in soup(["script", "style"]):
script.extract() # rip it out
# get text
text = soup.get_text()
# break into lines and remove leading and trailing space on each
lines = (line.strip() for line in text.splitlines())
# break multi-headlines into a line each
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
# drop blank lines
text = '\n'.join(chunk for chunk in chunks if chunk)
# return string text formatted and with line breaks
return text
def target_info_scorer(self):
"""Assigns scores based on the amount of target entities identified.
Updates the 'self.sources' list of dicts
"""
# Gather saved target entities
self._entities = self.file_handler.get_keywords_from_target_info()
# Count number of appearances in each source
# for source in tqdm(self.sources, desc="Counting target entity appearances in "
# "sources"):
with Pool() as pool:
self.sources = list(tqdm(pool.imap_unordered(self.get_text_get_score_target_inf,
self.sources), total=len(self.sources),
desc="Assigning scores to sources based on target info"))
# Updated 'self.sources' list of dictionaries
def popular_info_scorer(self):
"""Assigns scores to each source based on the amount of popular entities identified.
Updates the 'self.sources' list of dicts.
"""
# initialise popular info finder object
popular_info_object = PopularInformationFinder(self.file_handler, self.entity_processor)
# Gather popular entities
entities = popular_info_object.find_entities(self.sources)
self._entities = entities
# Count number of appearances in each source
# new approach using multiprocessing map function
with Pool() as pool:
self.sources = list(tqdm(pool.imap_unordered(self.get_text_get_score_pop_inf,
self.sources), total=len(self.sources),
desc="Assigning scores to sources based on popular info"))
# Updated 'self.sources' list of dictionaries
def get_text_get_score_target_inf(self, source):
"""Gets the text from the source URL and assigns a score.
Args:
source: the individual source dictionary of information
Returns:
the updated source dictionary with a new 'score' field.
"""
# get the text from the source
text = self.get_text_from_site(source["url"])
# return score for target info
score = int(count_entities(self._entities, text) * self._target_entity_multiplier)
# adds score to the source dictionary
try:
source["score"] += score
except KeyError:
source["score"] = score
return source
def get_text_get_score_pop_inf(self, source):
"""Gets the text from the source URL and assigns a score.
Args:
source: the individual source dictionary of information
Returns:
the updated source dictionary with a new 'score' field.
"""
# get the text from the source
text = self.get_text_from_site(source["url"])
# return score for target info
score = int(count_entities(self._entities, text) * self._target_entity_multiplier)
# adds score to the source dictionary
try:
source["score"] += score
except KeyError:
source["score"] = score
return source
def get_text_assign_score(self, source):
"""Gets the text from the source URL and examines it to count the number of entities.
Args:
source: the individual source dictionary of information.
"""
# get the text from the source
text = self.get_text_from_site(source["url"])
# assign score based on entity appearance count
# use different multiplier depending on which method has called 'get_text_assign_score()'
if inspect.stack()[1].function == "target_info_scorer()":
score = count_entities(self._entities, text) * self._target_entity_multiplier
elif inspect.stack()[1].function == "popular_info_scorer()":
score = count_entities(self._entities, text) * self._popular_entity_multiplier
else:
score = 10
# adds score to the source dictionary
try:
source["score"] += score
except KeyError:
source["score"] = score
return source
def remove_sources(self):
"""Removes sources that have a score of 0."""
self.sources = [dict_ for dict_ in self.sources if dict_["score"] != 0]
def sort_sources_desc(self):
"""Sorts the 'self.sources' list of dicts in descending order based on score."""
# sort sources list of dictionaries by highest score.
# lambda function specifies sorted to use the values of the dictionary in desc. order
self.sources.sort(key=lambda x: x["score"], reverse=True)
Functions
def count_entities(entities, source_text)
-
Counts the number of entities appearing in a given source.
Args
entities
- the entities to look for.
source_text
- the source text to look for entities within.
Returns
Integer number of appearances of all entities in the source
Expand source code
def count_entities(entities, source_text): """Counts the number of entities appearing in a given source. Args: entities: the entities to look for. source_text: the source text to look for entities within. Returns: Integer number of appearances of all entities in the source """ entity_count = 0 for entity in entities: # built-in method find() returns the index of a given substring within # a string, returns -1 if not found if source_text.find(entity) != -1: entity_count += 1 return entity_count
Classes
class PriorityManager (fh_object, entity_processor_object, potential_corroboration: List[dict])
-
Provides methods for assigning source scores based on relevancy to the user's statement.
Initialises the PriorityManager object.
Args
fh_object
- file handler object to use for extracting info from data files.
entity_processor_object
- object to use for processing entities
potential_corroboration
- list of dictionaries of source information.
Expand source code
class PriorityManager: """Provides methods for assigning source scores based on relevancy to the user's statement. """ def __init__(self, fh_object, entity_processor_object, potential_corroboration: List[dict]): """Initialises the PriorityManager object. Args: fh_object: file handler object to use for extracting info from data files. entity_processor_object: object to use for processing entities potential_corroboration: list of dictionaries of source information. """ self._target_entity_multiplier = 10 # multiplier for mentions of target info self._popular_entity_multiplier = 5 # multiplier for mentions of popular info self._entities = [] self.file_handler = fh_object self.entity_processor = entity_processor_object self.sources = potential_corroboration def manager(self): """This method controls the order of execution for counting target and popular info. Returns: self.sources: list of dictionaries of source information """ self.target_info_scorer() # generates a score for each source # remove sources with 0 score (or could remove bottom x% of sources) self.remove_sources() # clear entities list self._entities.clear() # generate a popular info score for each source self.popular_info_scorer() # sort sources by score in descending order self.sort_sources_desc() # return scored sources list(dict) return self.sources def get_sources(self): """Method for getting the list of source dictionaries.""" return self.sources @staticmethod def get_text_from_site(url): """Gets the body text from each source using its URL. Uses requests and BeautifulSoup to retrieve and parse the webpage's HTML into a readable format for entity recognition. Args: url: url fetched from sources dictionary. Returns: The content of the webpage in UTF-8 format. """ # initialise the webpage text variable text = "" # set headers to try to avoid 403 errors headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/112.0.0.0 Safari/537.36'} # request the webpage - if timeout, move on to next source try: response = requests.get(url, headers, timeout=5) except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError): return text try: content_type = response.headers['Content-Type'] except KeyError: content_type = '' if "application/javascript" in content_type or response.status_code != 200: # using selenium to avoid 'JavaScript is not available." error options = webdriver.ChromeOptions() options.headless = True options.add_argument("start-maximized") options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option('useAutomationExtension', False) options.add_argument('--disable-blink-features=AutomationControlled') options.add_argument( "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, " "like Gecko) Chrome/98.0.4758.102 Safari/537.36") try: driver = webdriver.Chrome("chromedriver", chrome_options=options) except (http.client.RemoteDisconnected, selenium.common.exceptions.SessionNotCreatedException): try: driver.quit() finally: return text # driver.set_page_load_timeout(5) # set timeout to 5 secs # request the webpage. If source website timeout, return the current list of entities. try: driver.get(url) except selenium.common.exceptions.TimeoutException: driver.quit() return text html = driver.page_source # check if we are wasting our time with a broken or inaccessible website try: request = driver.wait_for_request(url, 5) response = request.response driver.quit() except selenium.common.exceptions.TimeoutException: driver.quit() return text if request.response.status_code in {400, 401, 403, 404, 429}: driver.quit() return text else: html = response.text # get the content type try: # response is either the requests or selenium response content_type = response.headers['Content-Type'] # if xml use xml parser if content_type == "text/xml" or content_type == "application/xml": # don't parse xml return text else: # parse using the lxml html parser soup = BeautifulSoup(html, "lxml") except KeyError: # except on KeyError if no 'content-type' header exists soup = BeautifulSoup(html, "lxml") # kill all script and style elements for script in soup(["script", "style"]): script.extract() # rip it out # get text text = soup.get_text() # break into lines and remove leading and trailing space on each lines = (line.strip() for line in text.splitlines()) # break multi-headlines into a line each chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) # drop blank lines text = '\n'.join(chunk for chunk in chunks if chunk) # return string text formatted and with line breaks return text def target_info_scorer(self): """Assigns scores based on the amount of target entities identified. Updates the 'self.sources' list of dicts """ # Gather saved target entities self._entities = self.file_handler.get_keywords_from_target_info() # Count number of appearances in each source # for source in tqdm(self.sources, desc="Counting target entity appearances in " # "sources"): with Pool() as pool: self.sources = list(tqdm(pool.imap_unordered(self.get_text_get_score_target_inf, self.sources), total=len(self.sources), desc="Assigning scores to sources based on target info")) # Updated 'self.sources' list of dictionaries def popular_info_scorer(self): """Assigns scores to each source based on the amount of popular entities identified. Updates the 'self.sources' list of dicts. """ # initialise popular info finder object popular_info_object = PopularInformationFinder(self.file_handler, self.entity_processor) # Gather popular entities entities = popular_info_object.find_entities(self.sources) self._entities = entities # Count number of appearances in each source # new approach using multiprocessing map function with Pool() as pool: self.sources = list(tqdm(pool.imap_unordered(self.get_text_get_score_pop_inf, self.sources), total=len(self.sources), desc="Assigning scores to sources based on popular info")) # Updated 'self.sources' list of dictionaries def get_text_get_score_target_inf(self, source): """Gets the text from the source URL and assigns a score. Args: source: the individual source dictionary of information Returns: the updated source dictionary with a new 'score' field. """ # get the text from the source text = self.get_text_from_site(source["url"]) # return score for target info score = int(count_entities(self._entities, text) * self._target_entity_multiplier) # adds score to the source dictionary try: source["score"] += score except KeyError: source["score"] = score return source def get_text_get_score_pop_inf(self, source): """Gets the text from the source URL and assigns a score. Args: source: the individual source dictionary of information Returns: the updated source dictionary with a new 'score' field. """ # get the text from the source text = self.get_text_from_site(source["url"]) # return score for target info score = int(count_entities(self._entities, text) * self._target_entity_multiplier) # adds score to the source dictionary try: source["score"] += score except KeyError: source["score"] = score return source def get_text_assign_score(self, source): """Gets the text from the source URL and examines it to count the number of entities. Args: source: the individual source dictionary of information. """ # get the text from the source text = self.get_text_from_site(source["url"]) # assign score based on entity appearance count # use different multiplier depending on which method has called 'get_text_assign_score()' if inspect.stack()[1].function == "target_info_scorer()": score = count_entities(self._entities, text) * self._target_entity_multiplier elif inspect.stack()[1].function == "popular_info_scorer()": score = count_entities(self._entities, text) * self._popular_entity_multiplier else: score = 10 # adds score to the source dictionary try: source["score"] += score except KeyError: source["score"] = score return source def remove_sources(self): """Removes sources that have a score of 0.""" self.sources = [dict_ for dict_ in self.sources if dict_["score"] != 0] def sort_sources_desc(self): """Sorts the 'self.sources' list of dicts in descending order based on score.""" # sort sources list of dictionaries by highest score. # lambda function specifies sorted to use the values of the dictionary in desc. order self.sources.sort(key=lambda x: x["score"], reverse=True)
Static methods
def get_text_from_site(url)
-
Gets the body text from each source using its URL.
Uses requests and BeautifulSoup to retrieve and parse the webpage's HTML into a readable format for entity recognition.
Args
url
- url fetched from sources dictionary.
Returns
The content of the webpage in UTF-8 format.
Expand source code
@staticmethod def get_text_from_site(url): """Gets the body text from each source using its URL. Uses requests and BeautifulSoup to retrieve and parse the webpage's HTML into a readable format for entity recognition. Args: url: url fetched from sources dictionary. Returns: The content of the webpage in UTF-8 format. """ # initialise the webpage text variable text = "" # set headers to try to avoid 403 errors headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/112.0.0.0 Safari/537.36'} # request the webpage - if timeout, move on to next source try: response = requests.get(url, headers, timeout=5) except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectionError): return text try: content_type = response.headers['Content-Type'] except KeyError: content_type = '' if "application/javascript" in content_type or response.status_code != 200: # using selenium to avoid 'JavaScript is not available." error options = webdriver.ChromeOptions() options.headless = True options.add_argument("start-maximized") options.add_experimental_option("excludeSwitches", ["enable-automation"]) options.add_experimental_option('useAutomationExtension', False) options.add_argument('--disable-blink-features=AutomationControlled') options.add_argument( "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, " "like Gecko) Chrome/98.0.4758.102 Safari/537.36") try: driver = webdriver.Chrome("chromedriver", chrome_options=options) except (http.client.RemoteDisconnected, selenium.common.exceptions.SessionNotCreatedException): try: driver.quit() finally: return text # driver.set_page_load_timeout(5) # set timeout to 5 secs # request the webpage. If source website timeout, return the current list of entities. try: driver.get(url) except selenium.common.exceptions.TimeoutException: driver.quit() return text html = driver.page_source # check if we are wasting our time with a broken or inaccessible website try: request = driver.wait_for_request(url, 5) response = request.response driver.quit() except selenium.common.exceptions.TimeoutException: driver.quit() return text if request.response.status_code in {400, 401, 403, 404, 429}: driver.quit() return text else: html = response.text # get the content type try: # response is either the requests or selenium response content_type = response.headers['Content-Type'] # if xml use xml parser if content_type == "text/xml" or content_type == "application/xml": # don't parse xml return text else: # parse using the lxml html parser soup = BeautifulSoup(html, "lxml") except KeyError: # except on KeyError if no 'content-type' header exists soup = BeautifulSoup(html, "lxml") # kill all script and style elements for script in soup(["script", "style"]): script.extract() # rip it out # get text text = soup.get_text() # break into lines and remove leading and trailing space on each lines = (line.strip() for line in text.splitlines()) # break multi-headlines into a line each chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) # drop blank lines text = '\n'.join(chunk for chunk in chunks if chunk) # return string text formatted and with line breaks return text
Methods
def get_sources(self)
-
Method for getting the list of source dictionaries.
Expand source code
def get_sources(self): """Method for getting the list of source dictionaries.""" return self.sources
def get_text_assign_score(self, source)
-
Gets the text from the source URL and examines it to count the number of entities.
Args
source
- the individual source dictionary of information.
Expand source code
def get_text_assign_score(self, source): """Gets the text from the source URL and examines it to count the number of entities. Args: source: the individual source dictionary of information. """ # get the text from the source text = self.get_text_from_site(source["url"]) # assign score based on entity appearance count # use different multiplier depending on which method has called 'get_text_assign_score()' if inspect.stack()[1].function == "target_info_scorer()": score = count_entities(self._entities, text) * self._target_entity_multiplier elif inspect.stack()[1].function == "popular_info_scorer()": score = count_entities(self._entities, text) * self._popular_entity_multiplier else: score = 10 # adds score to the source dictionary try: source["score"] += score except KeyError: source["score"] = score return source
def get_text_get_score_pop_inf(self, source)
-
Gets the text from the source URL and assigns a score.
Args
source
- the individual source dictionary of information
Returns
the updated source dictionary with a new 'score' field.
Expand source code
def get_text_get_score_pop_inf(self, source): """Gets the text from the source URL and assigns a score. Args: source: the individual source dictionary of information Returns: the updated source dictionary with a new 'score' field. """ # get the text from the source text = self.get_text_from_site(source["url"]) # return score for target info score = int(count_entities(self._entities, text) * self._target_entity_multiplier) # adds score to the source dictionary try: source["score"] += score except KeyError: source["score"] = score return source
def get_text_get_score_target_inf(self, source)
-
Gets the text from the source URL and assigns a score.
Args
source
- the individual source dictionary of information
Returns
the updated source dictionary with a new 'score' field.
Expand source code
def get_text_get_score_target_inf(self, source): """Gets the text from the source URL and assigns a score. Args: source: the individual source dictionary of information Returns: the updated source dictionary with a new 'score' field. """ # get the text from the source text = self.get_text_from_site(source["url"]) # return score for target info score = int(count_entities(self._entities, text) * self._target_entity_multiplier) # adds score to the source dictionary try: source["score"] += score except KeyError: source["score"] = score return source
def manager(self)
-
This method controls the order of execution for counting target and popular info.
Returns
self.sources
- list of dictionaries of source information
Expand source code
def manager(self): """This method controls the order of execution for counting target and popular info. Returns: self.sources: list of dictionaries of source information """ self.target_info_scorer() # generates a score for each source # remove sources with 0 score (or could remove bottom x% of sources) self.remove_sources() # clear entities list self._entities.clear() # generate a popular info score for each source self.popular_info_scorer() # sort sources by score in descending order self.sort_sources_desc() # return scored sources list(dict) return self.sources
def popular_info_scorer(self)
-
Assigns scores to each source based on the amount of popular entities identified.
Updates the 'self.sources' list of dicts.
Expand source code
def popular_info_scorer(self): """Assigns scores to each source based on the amount of popular entities identified. Updates the 'self.sources' list of dicts. """ # initialise popular info finder object popular_info_object = PopularInformationFinder(self.file_handler, self.entity_processor) # Gather popular entities entities = popular_info_object.find_entities(self.sources) self._entities = entities # Count number of appearances in each source # new approach using multiprocessing map function with Pool() as pool: self.sources = list(tqdm(pool.imap_unordered(self.get_text_get_score_pop_inf, self.sources), total=len(self.sources), desc="Assigning scores to sources based on popular info")) # Updated 'self.sources' list of dictionaries
def remove_sources(self)
-
Removes sources that have a score of 0.
Expand source code
def remove_sources(self): """Removes sources that have a score of 0.""" self.sources = [dict_ for dict_ in self.sources if dict_["score"] != 0]
def sort_sources_desc(self)
-
Sorts the 'self.sources' list of dicts in descending order based on score.
Expand source code
def sort_sources_desc(self): """Sorts the 'self.sources' list of dicts in descending order based on score.""" # sort sources list of dictionaries by highest score. # lambda function specifies sorted to use the values of the dictionary in desc. order self.sources.sort(key=lambda x: x["score"], reverse=True)
def target_info_scorer(self)
-
Assigns scores based on the amount of target entities identified.
Updates the 'self.sources' list of dicts
Expand source code
def target_info_scorer(self): """Assigns scores based on the amount of target entities identified. Updates the 'self.sources' list of dicts """ # Gather saved target entities self._entities = self.file_handler.get_keywords_from_target_info() # Count number of appearances in each source # for source in tqdm(self.sources, desc="Counting target entity appearances in " # "sources"): with Pool() as pool: self.sources = list(tqdm(pool.imap_unordered(self.get_text_get_score_target_inf, self.sources), total=len(self.sources), desc="Assigning scores to sources based on target info")) # Updated 'self.sources' list of dictionaries