Source code for fast.fast

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from time import time
from pybgl.ipynb import ipynb_display_graph

from .cbfs import CBFS
from .find_ast_visitor import FindAstFromStringsDefaultVisitor
from .objective_func import (
    make_additive_objective_func_for_str,
    make_multiplicative_objective_func_for_str,
    make_tuple_based_objective_func_for_str,
    make_normalized_additive_objective_func_for_str,
)
from .pattern_automaton import PatternAutomaton
from .regexp_ast import RegexpAst
from .regexp_mutators import MUTATORS


(OBJ_ADD, OBJ_MULT, OBJ_ADD_NORM, OBJ_TUPLE) = range(4)


[docs]def shortness_factor(examples: iter) -> float: """ Computes the normalization factor used to ensure the shortness is always in [0, 1]. Args: examples (set): The set of positive samples Returns: The normalization factor. Notes: - The default behavior of find_ast is to choose n = 1. However, using n = 1 makes the shortness equal to the size of the inferred AST. As a sequel, the shortness is too prepondarant w.r.t. to the accuracy which is in [0.0, 1.0] and fAST waste a lot of energy to explore irrelevant ASTs. That is why we must normalize the shortness. - To do so, we should use n = sum(len(example) for example in examples). However, this scaling factor is too aggressive if len(examples) increases. - To circumvent the problem, we use n = max(len(example) for example in examples). Intuitively, this guarantees that 0 <= shortness(PTA(longest_example)) <= 1 which means that the ASTs we are searching have most of time a scaled shortness below 1. This scaling factor makes the shortness insensitive to the number of examples (only to the length of the longest example) and comparable with the accuracy. """ n = max(len(example) for example in examples) return 1 / (2 * n)
[docs]def fast_from_strings( examples: list, objective_function: callable = None, stop_condition: callable = None, mutators: list = None, visitor: FindAstFromStringsDefaultVisitor = None ) -> list: """ Regular expression inference algorithm. Args: examples (list): The list of input :py:class:`PatternAutomaton` instances corresponding to each positive examples. objective_function (callable): The objective function to be minimized. stop_condition (callable): A ``callable(candidate_solutions, elapsed_time) -> bool`` function where: - candidate_solutions (list): The list of current candidates solutions. - time_elapsed (float): The current execution time, in seconds. - the returned value is ``True`` iff the execution must be stopped. mutators (list): The list of `Mutator` instances used to update the ASTs. Returns: A list of final solutions, once the queue is empty (dream on), or once the stop condition is called (more reasonable). A final solution is a tuple `(h, prefix_regexp)` where `pref_reg: list` is a prefix regular expression of the solution, and `h` is its objective function value. """ if visitor is None: visitor = FindAstFromStringsDefaultVisitor() if objective_function is None: alphabet = set(a for w in examples for a in w) alpha = shortness_factor(examples) objective_function = make_normalized_additive_objective_func_for_str( examples=examples, alphabet=alphabet, density_factor=(1 - alpha), size_factor=alpha ) if stop_condition is None: def stop_condition(final_results, time_elapsed): return len(final_results) == 1 if mutators is None: mutators = [mutator(MUTATORS) for mutator in MUTATORS] def indices_to_depth(i: int, k: int) -> int: """ Converts `(i, k)` pair, where `i` is the index of the word being processed and `k` the index of the character being processed to the corresponding progression. In CBFS, the progression of an AST identifies in which queue it must pushed. Args: i (int): The index of the current positive example. k (int): The index of the current character in the positive example being processed. Returns: The current progression. """ return sum( len(example.w) if isinstance(example, PatternAutomaton) else len(example) for example in examples[:i] ) + k def next_symbols(example, j: int) -> iter: if isinstance(example, PatternAutomaton): pa = example return ( (pa.label(e), pa.target(e)) for e in pa.out_edges(j) ) else: # str sigma = w[j] k = j + 1 return [(sigma, k)] # Maximum progression of the problem total_depth = sum( len(example.w) if isinstance(example, PatternAutomaton) else len(example) for example in examples ) + 1 # Keep track of the pushed regexps to prevent duplicates pushed_regexps = { depth: set() for depth in range(total_depth) } def was_already_pushed(ast: RegexpAst, active_leaf: int, push_idx: int) -> bool: """ Checks whether an AST has already been seen. To check this the AST is converted to a string identifier and this key is stored in `pushed_regexps`. """ # print(ast.to_prefix_regexp_list()) # ipynb_display_graph(ast) ast_key = '$'.join(ast.to_prefix_regexp_list()), active_leaf if ast_key in pushed_regexps[push_idx]: return True else: pushed_regexps[push_idx].add(ast_key) return False # Cache to keep track of regexps objective_function's value, # to prevent redundant computations map_regexp_value = dict() def compute_objective_value( ast: RegexpAst, examples: list ) -> float: """ Computes the value returned by the objective function given a candidate solution and the list of positive examples. Args: ast (RegexpAst): The candidate solution. examples: A list of `PatternAutomaton` instances. Returns: The corresponding objective function value. The lower this value, the better the candidate solution. """ ast_key = '$'.join(ast.to_prefix_regexp_list()) if ast_key not in map_regexp_value.keys(): map_regexp_value[ast_key] = objective_function( ast, examples ) return map_regexp_value[ast_key] # List to store final results (tuples obj_value, ast) final_results = [] # Initialize the CBFS priority queue. cbfs_q = CBFS(total_depth) # Build the initial item of our pq i = 0 # index of current PA k = 0 # current progression in current string ast_counter = 0 # ast counter, acts as distinguisher in the pq initial_ast = RegexpAst() # initial empty ast active_leaf = initial_ast.root obj_func_value = 0 # any value would do, will never be compared initial_pq_item = (obj_func_value, ast_counter, initial_ast, active_leaf, i, k) cbfs_q.push(initial_pq_item, indices_to_depth(i, k)) ast_counter += 1 # Main loop t_start = time() while not cbfs_q.is_empty(): # Check the final condition time_elapsed = time() - t_start if stop_condition(final_results, time_elapsed): # print("stop condition triggered") break # Get the next item to process from the priority queue # print(cbfs_q.queues[cbfs_q.pop_idx]) pq_item = cbfs_q.pop() obj_func_value, _, ast, active_leaf, i, j = pq_item print(i, j, "___", ast.to_prefix_regexp_str()) w = ( examples[i].w if isinstance(examples[i], PatternAutomaton) else examples[i] ) visitor.visit_pop_item(pq_item) # If we have reached the end of the current positive example. if j == len(w): # print("end of word %s" % i) # move to next string i += 1 j = 0 active_leaf = ast.root # Check that the AST still recognizes all # the words that have been seen so far if any( not ast.recognizes(example) for example in examples[:i] ): print("bad", ast.to_prefix_regexp_str()) ipynb_display_graph(ast) continue # 'bad' ast. discard and move on # If we have reach the end of the last example, then the AST is a # candidate solution. if i == len(examples) or all( ast.recognizes(example) for example in examples ): print("final", ast.to_prefix_regexp_str(), obj_func_value) ipynb_display_graph(ast) final_results.append((obj_func_value, ast)) visitor.visit_final_solution(obj_func_value, ast) continue # final solution checked and stored, move on # not end of last example -> consume character and produce children active_leaf_parent = ast.map_node_parent[active_leaf] epsilon_reachable_arcs = ast.epsilon_reachables( active_leaf, active_leaf_parent ) # print("eps reachables: %s" % epsilon_reachable_arcs) num_mutants = 0 example = examples[i] for (u, v) in epsilon_reachable_arcs: for mutator in mutators: for (sigma, k) in next_symbols(example, j): # print(" **", a) new_depth = indices_to_depth(i, k) mutated_asts = mutator.mutate( ast, sigma, u, v, example.w[:k+1] if isinstance(example, PatternAutomaton) else w[:k], examples[:i], epsilon_reachable_arcs, example ) num_mutants += len(mutated_asts) for (new_ast, new_active_leaf) in mutated_asts: new_ast.simplify() if was_already_pushed(new_ast, new_active_leaf, new_depth): continue new_obj_func_value = compute_objective_value( new_ast, examples ) new_pq_item = ( new_obj_func_value, ast_counter, new_ast, new_active_leaf, i, k ) # print(" pushing %s" % new_ast.to_prefix_regexp_str()) cbfs_q.push(new_pq_item, new_depth) visitor.visit_push_item(mutator, new_depth, new_pq_item) ast_counter += 1 print(" generated %s mutants" % num_mutants) return final_results
[docs]def fast( examples: list, obj_id: int = OBJ_ADD, stop_condition: callable = lambda final_results, time_elapsed: len(final_results) == 1, mutators: list = MUTATORS, *cls, **kwargs ) -> list: """ Runs fAST algorithm to infer regular expression from a set of positive examples. Args: examples(list): A ``set(str)`` gathering the positive examples. obj_id(int): A value among ``{OBJ_ADD, OBJ_MULT, OBJ_TUPLE}``. stop(callable): Stopping ``callable(final_results: list, time_elapsed: float) -> bool`` callback. mutators(list): Defaults to ``MUTATORS``. cls, kwargs: see ``objective_func.make_*_objective_funcfor_str`` functions. Returns: The found regular expressions. """ alphabet = set(a for w in examples for a in w) make_obj = [ make_additive_objective_func_for_str, make_multiplicative_objective_func_for_str, make_normalized_additive_objective_func_for_str, make_tuple_based_objective_func_for_str ][obj_id] if obj_id == OBJ_ADD and "size_factor" not in kwargs.keys(): kwargs["size_factor"] = shortness_factor(examples) # TODO: one pattern per letter # transform examples to a list of PA f_obj = make_obj( examples=examples, alphabet=alphabet, *cls, **kwargs ) return fast_from_strings( examples=examples, objective_function=f_obj, stop_condition=stop_condition, mutators=[mutator(mutators) for mutator in mutators] )
[docs]def fast_from_re( regexp: str, num_examples: int = 10, p_stop_final: float = 0.5, repeat: bool = True, max_sampling: int = 1000, *cls, **kwargs ) -> list: """ Runs fAST algorithm to infer regular expression from a set of positive examples. Args: regexp(str): A regular expression. num_examples(int): Number of positive examples. p_stop_final (float): A ``float`` between ``0.0`` and ``1.0`` corresponding the probability to stop when reaching a final state of the automaton induced by ``regexp``. The higher ``p_stop_final``, the longer the examples. repeat(bool): Pass `True` to rerun sampling in case of reject. max_sampling (int): Maximum number of sampling. Pass ``None`` to continue sampling until finding a non-rejected value. Note that if `sample` always returns ``None``, this results to an infinite loop. cls, kwargs: see the py:func:`fast` function . Returns: The found regular expressions. """ from pybgl.regexp import compile_dfa from .random import random_word_from_automaton g = compile_dfa(regexp) examples = set() while len(examples) < num_examples: w = random_word_from_automaton(g, p_stop_final, repeat, max_sampling) examples.add(w) print(examples) for kw in ("num_examples", "p_stop_final", "repeat", "max_sampling"): kwargs.pop(kw, None) solutions = sorted( fast(list(examples), *cls, **kwargs), key=lambda solution: solution[0], ) return [(score, ast.to_infix_regexp_str()) for (score, ast) in solutions]
[docs]def fast_benchmark( map_relen_numre: dict = None, alphabet: list = None, num_examples: int = 10, *cls, **kwargs ) -> float: """ Tests fAST on a custom benchmark by crafting a set of test regular expression. The goal is to test whether fAST finds the original (or a better) regular expression. Args: map_relen_numre (dict): A dictionary which maps for each length the number of regular expressions to be randomly crafted. alphabet (list): The list of symbols. num_examples (int): The number of positive samples to be generated for this evaluation. cls, kwargs: See the :py:func:`fast` function. """ from pybgl.regexp import compile_dfa from pprint import pformat from .random import random_ast, random_word_from_automaton def is_valid(g, examples): return all(g.accepts(example) for example in examples) if map_relen_numre is None: map_relen_numre = { re_len: 10 for re_len in range(5, 20, 5) } if not alphabet: alphabet = list("abcde") for kw in ("map_relen_numre", "alphabet"): kwargs.pop(kw, None) num_tries = 0 num_successes = 0 for (re_len, num_regexps) in map_relen_numre.items(): print("=" * 80) print(f"re_len = {re_len}") for index_instance in range(num_regexps): print("-" * 80) # Generate random regexp ast = random_ast(re_len, alphabet) regexp = ast.to_expr().replace(".", "") print(f"input regexp = {regexp}") g = compile_dfa(regexp) # Generate positive examples examples = set() i = 0 while len(examples) < num_examples: w = random_word_from_automaton(g, p=0.5, repeat=True, max_sampling=1000) if w: # TODO: fix fast_from_strings to that is support the "" example. examples.add(w) i += 1 if i == 100: print(f"The target language seems very small, stopping with {len(examples)} instead of {num_examples}") break print(f"examples = {pformat(examples)}") # Run fAST inference solutions = sorted( fast( list(examples), stop_condition=lambda final_results, time_elapsed: len(final_results) == 1 or time_elapsed > 10, *cls, **kwargs ), key=lambda solution: solution[0], ) regexps = [ ast.to_infix_regexp_str().replace(".", "") for (score, ast) in solutions ] print(f"output regexps = {pformat(regexps)}") if any(is_valid(compile_dfa(regexp), examples) for regexp in regexps): print("SUCCESS") num_successes += 1 else: print("FAIL") num_tries += 1 return (num_successes, num_tries)