Source code for textdirectory.textdirectory

# -*- coding: utf-8 -*-

"""Main module."""
import os
import sys
import difflib
from functools import wraps
from pathlib import Path
import numpy as np

sys.path.insert(0, os.path.abspath('..'))
from textdirectory import transformations
from textdirectory import helpers


class TextDirectory:
    """A directory of text files that can be filtered, transformed and aggregated.

    ``self.files`` holds one metadata dict per loaded file. ``self.aggregation``
    is a list of indices into ``self.files`` describing the current selection;
    filters narrow that selection and every filter call records a checkpoint in
    ``self.aggregation_states``.
    """

    def __init__(self, directory, encoding='utf8', autoload=False):
        """
        :param directory: path to the text directory
        :type directory: str
        :param encoding: the encoding used to read and write files
        :type encoding: str
        :param autoload: call ``load_files()`` with its defaults right away
        :type autoload: bool
        :raises NotADirectoryError: if ``directory`` is not an existing directory
        """
        self.directory = Path(directory)
        self.files = []
        self.filenames = []
        self.aggregation = []
        self.staged_transformations = []
        self.applied_filters = []
        self.aggregation_states = []
        self.current_state = 0
        self.encoding = encoding
        self.iterator = 0

        # is_dir() (rather than exists()) also rejects paths to regular files.
        if not self.directory.is_dir():
            raise NotADirectoryError(f'{self.directory} is not a directory')

        if autoload:
            self.load_files()

    def __iter__(self):
        """Restart iteration over the files in the current aggregation."""
        self.iterator = 0
        return self

    def __next__(self):
        """Return the next file dict of the current aggregation."""
        if self.iterator >= len(self.aggregation):
            raise StopIteration()
        file = self.files[self.aggregation[self.iterator]]
        self.iterator += 1
        return file

    def __str__(self):
        """Return the aggregation table followed by the staged transformations."""
        # __str__ has to return a string; printing is left to print_aggregation().
        table = helpers.tabulate_flat_list_of_dicts(list(self.get_aggregation()))
        return f'{table}\n\nStaged Transformations: {self.staged_transformations}'

    def _read_text(self, path):
        """Return the content of ``path`` decoded with ``self.encoding``.

        Bytes that cannot be decoded are dropped (``errors='ignore'``).
        """
        with Path(path).open(encoding=self.encoding, errors='ignore') as f:
            return f.read()

    def _keep_matching(self, predicate):
        """Narrow the aggregation to the files for which ``predicate(file)`` is true."""
        self.set_aggregation([file for file in self.get_aggregation() if predicate(file)])

    def save_aggregation_state(self):
        """Saves the current self.aggregation state."""
        # The aggregation already consists of indices into self.files, so it
        # can be copied directly instead of looking every file up again.
        current_state = [int(file_id) for file_id in self.aggregation]
        # Copy the filter list so later filters do not alter this checkpoint.
        self.aggregation_states.append([current_state, list(self.applied_filters)])
        self.current_state = len(self.aggregation_states)

    def load_aggregation_state(self, state=0):
        """
        :param state: index of the saved state to restore
        :type state: int
        :raises ValueError: if no state with that index has been saved
        """
        if state not in range(len(self.aggregation_states)):
            raise ValueError(f'There is no saved state {state!r}.')

        saved_aggregation, saved_filters = self.aggregation_states[state]
        # Work on copies: filters applied after loading append to
        # self.applied_filters and must not rewrite the stored checkpoint.
        self.aggregation = list(saved_aggregation)
        self.applied_filters = list(saved_filters)
        self.current_state = state

    def get_aggregation(self):
        """A generator that provides the current aggregation."""
        for file_id in self.aggregation:
            yield self.files[file_id]

    def set_aggregation(self, aggregation):
        """Set the aggregation.

        :param aggregation: an iterable of file dicts taken from ``self.files``
        :raises ValueError: if a file is not part of ``self.files``
        """
        self.aggregation = [self.files.index(file) for file in aggregation]

    # NOTE(review): the filter docstrings below use a fixed field layout
    # (``:param:``, ``:type:``, ``:human_name:``) that looks machine-read;
    # the layout is kept as it was -- confirm before restructuring.
    def filter(filter):
        """A wrapper for filters.

        Records the filter's name in ``applied_filters`` and saves an
        aggregation state before the wrapped filter runs.
        """
        # NOTE(review): the checkpoint is taken *before* the filter changes the
        # aggregation, yet it is labelled with the filter's name. The order is
        # kept because state indices are part of the public behaviour.
        @wraps(filter)
        def filter_wrapper(*args, **kwargs):
            self = args[0]
            self.applied_filters.append(filter.__name__)
            self.save_aggregation_state()
            return filter(*args, **kwargs)
        return filter_wrapper

    def get_file_length(self, path):
        """
        :param path: path to a textfile
        :return: the files length in characters
        """
        return len(self._read_text(path))

    def get_file_tokens(self, path):
        """
        :param path: path to a textfile
        :return: the files length in tokens
        """
        # split() without an argument splits on any run of whitespace, so blank
        # lines, repeated spaces and a trailing newline do not count as tokens
        # and an empty file has zero tokens.
        return len(self._read_text(path).split())

    def get_text(self, file_id):
        """
        :param file_id: the file_id in files
        :return: the (transformed) text of the given file
        """
        file = self.files[file_id]
        # False marks "not transformed"; an empty string is a valid result of
        # a transformation and must not trigger a re-read of the raw file.
        if file['transformed_text'] is not False:
            return file['transformed_text']
        return self._read_text(file['path'])

    def load_files(self, recursive=True, sort=True, filetype='txt'):
        """
        :param recursive: recursive search
        :type recursive: bool
        :param sort: sort the files by name
        :type sort: bool
        :param filetype: filetype to look for (e.g. txt)
        :type filetype: str
        :raises FileNotFoundError: if no matching file is found
        """
        pattern = '*.' + filetype
        if recursive:
            pattern = '**/' + pattern

        # Directories can match the pattern as well; only regular files can be read.
        files = [path for path in self.directory.glob(pattern) if path.is_file()]
        if not files:
            raise FileNotFoundError(f'No files matching {pattern} in {self.directory}')

        if sort:
            files.sort()

        for file in files:
            self.files.append({'path': file,
                               'filename': file.name,
                               'characters': self.get_file_length(file),
                               'tokens': self.get_file_tokens(file),
                               'transformed_text': False})
            self.filenames.append(file.name)

        # Initial population of self.aggregation
        self.set_aggregation(self.files)

        # Initial checkpoint
        self.save_aggregation_state()

    @filter
    def filter_by_max_chars(self, max_chars=100):
        """
        :param max_chars: the maximum number of characters a file can have
        :type max_chars: int
        :human_name: Maximum characters
        """
        limit = int(max_chars)
        self._keep_matching(lambda file: file['characters'] <= limit)

    @filter
    def filter_by_min_chars(self, min_chars=100):
        """
        :param min_chars: the minimum number of characters a file can have
        :type min_chars: int
        :human_name: Minimum characters
        """
        limit = int(min_chars)
        self._keep_matching(lambda file: file['characters'] >= limit)

    @filter
    def filter_by_max_tokens(self, max_tokens=100):
        """
        :param max_tokens: the maximum number of tokens a file can have
        :type max_tokens: int
        :human_name: Maximum tokens
        """
        limit = int(max_tokens)
        self._keep_matching(lambda file: file['tokens'] <= limit)

    @filter
    def filter_by_min_tokens(self, min_tokens=1):
        """
        :param min_tokens: the minimum number of tokens a file can have
        :type min_tokens: int
        :human_name: Minimum tokens
        """
        limit = int(min_tokens)
        self._keep_matching(lambda file: file['tokens'] >= limit)

    @filter
    def filter_by_contains(self, contains):
        """
        :param contains: A string that needs to be present in the file
        :type contains: str
        :human_name: Contains string
        """
        self._keep_matching(lambda file: contains in self._read_text(file['path']))

    @filter
    def filter_by_not_contains(self, not_contains):
        """
        :param not_contains: A string that is not allowed to be present in the file
        :type not_contains: str
        :human_name: Does not contain string
        """
        self._keep_matching(lambda file: not_contains not in self._read_text(file['path']))

    @filter
    def filter_by_filename_contains(self, contains):
        """
        :param contains: A string that needs to be present in the filename
        :type contains: str
        :human_name: Filename contains string
        """
        self._keep_matching(lambda file: contains in file['path'].name)

    @filter
    def filter_by_random_sampling(self, n, replace=False):
        """
        :param n: the number of documents in the sample
        :type n: int
        :param replace: Should valued be replaced
        :type replace: bool
        :human_name: Random sampling
        """
        sample = np.random.choice(self.aggregation, int(n), replace=replace)
        # Keep self.aggregation a plain list of ints like everywhere else.
        self.aggregation = sample.tolist()

    @filter
    def filter_by_chars_outliers(self, sigmas=2):
        """
        :param sigmas: The number of stds that qualifies an outlier.
        :type sigmas: int
        :human_name: Character outliers
        """
        chars_list = [file['characters'] for file in self.get_aggregation()]
        std = np.std(chars_list)
        mean = np.mean(chars_list)
        spread = float(sigmas) * std
        lower = round(mean - spread, 1)
        upper = round(mean + spread, 1)

        # The delegated filters truncate their argument with int(); rounding
        # the bounds inwards first keeps the comparison against whole
        # character counts equivalent to comparing against the float bounds.
        self.filter_by_min_chars(int(np.ceil(lower)))
        self.filter_by_max_chars(int(np.floor(upper)))

        return std, mean, lower, upper

    @filter
    def filter_by_max_filesize(self, max_kb=100):
        """
        :param max_kb: The maximum number of kB a file is allowed to have.
        :type max_kb: int
        :human_name: Maximum filesize
        """
        limit = float(max_kb)
        self._keep_matching(lambda file: os.stat(file['path']).st_size / 1024 <= limit)

    @filter
    def filter_by_min_filesize(self, min_kb=10):
        """
        :param min_kb: The minimum number of kB a file is allowed to have.
        :type min_kb: int
        :human_name: Minimum Filesize
        """
        limit = float(min_kb)
        self._keep_matching(lambda file: os.stat(file['path']).st_size / 1024 >= limit)

    @filter
    def filter_by_similar_documents(self, reference_file, threshold=0.8):
        """
        :param reference_file: Path to the reference file
        :type reference_file: str
        :param threshold: A value between 0.0 and 1.0 indicating the min. similarity between the file and the reference.
        :type threshold: float
        :human_name: Similar documents
        """
        threshold = float(threshold)
        if not 0.0 <= threshold <= 1.0:
            raise ValueError('threshold must be between 0.0 and 1.0')

        reference = self._read_text(reference_file)

        def is_similar(file):
            target = self._read_text(file['path'])
            return difflib.SequenceMatcher(None, reference, target).ratio() >= threshold

        self._keep_matching(is_similar)

    def stage_transformation(self, transformation):
        """
        :param transformation: the transformation that should be staged and its parameters
        :type transformation: list
        :raises NameError: if the transformations module has no such callable
        """
        name = transformation[0]
        # A bare membership test on dir() would also accept modules imported
        # by the transformations module and dunder attributes.
        if not (isinstance(name, str) and callable(getattr(transformations, name, None))):
            raise NameError(f'Unknown transformation: {name!r}')
        self.staged_transformations.append(transformation)

    def destage_transformation(self, transformation):
        """
        :param transformation: the transformation that should be de-staged and its parameters
        :type transformation: list
        :raises NameError: if the transformation has not been staged
        """
        # Staged entries are whole [name, *parameters] lists, so the entry
        # itself (not just its name) has to be looked up.
        if transformation not in self.staged_transformations:
            raise NameError(f'Transformation is not staged: {transformation!r}')
        self.staged_transformations.remove(transformation)

    def run_transformations(self, text):
        """
        :param text: the text to run staged transformations on
        :type text: str
        :return: the transformed text
        """
        transformed_text = text
        for name, *parameters in self.staged_transformations:
            transformation_method = getattr(transformations, name)
            transformed_text = transformation_method(transformed_text, *parameters)
        return transformed_text

    def run_filters(self, filters):
        """
        :param filters: A list of tuples with filters and their arguments.
        :type filters: list
        """
        for filter_name, *args in filters:
            getattr(self, filter_name)(*args)

    def aggregate_to_memory(self):
        """
        :return: a string containing the aggregated text files
        :type: str
        """
        texts = []
        for file in self.get_aggregation():
            text = self.run_transformations(self._read_text(file['path']))
            file['transformed_text'] = text
            texts.append(text)
        return ''.join(texts)

    def transform_to_memory(self):
        """Runs all transformations and stores the transformed texts in memory."""
        for file in self.get_aggregation():
            file['transformed_text'] = self.run_transformations(self._read_text(file['path']))

    def clear_transformation(self):
        """Destage all transformations and clear memory."""
        self.staged_transformations = []
        for file in self.files:
            file['transformed_text'] = False

    def aggregate_to_file(self, filename='aggregated.txt'):
        """
        :param filename: the path/filename to write to
        :type filename: str
        """
        with open(filename, 'w', encoding=self.encoding, errors='ignore') as aggregation_file:
            for file in self.get_aggregation():
                aggregation_file.write(self.run_transformations(self._read_text(file['path'])))

    def print_aggregation(self):
        """Print the aggregated files as a table."""
        print(str(self))

    def print_saved_states(self):
        """Print all saved states."""
        print('Saved States:')
        for i, state in enumerate(self.aggregation_states):
            print(f'[{i}] - {len(state[0])} files after applying {state[1]}')

    def print_pipeline(self):
        """Print the current pipeline."""
        print('Applied Filters:')
        if self.aggregation_states:
            print(f'> {len(self.aggregation_states)} states have been saved.')
            print(f'> Currently on state {self.current_state} / {len(self.aggregation_states)}')

        if not self.applied_filters:
            print('None')
        else:
            for applied_filter in self.applied_filters:
                print(applied_filter)

        print('\nStaged Transformations:')
        if not self.staged_transformations:
            print('None')
        else:
            for transformation in self.staged_transformations:
                print(transformation)