Source code for crosshair.statespace

import ast
import builtins
import copy
import enum
import functools
import random
import re
import threading
from collections import Counter, defaultdict
from dataclasses import dataclass
from sys import _getframe
from time import monotonic
from types import FrameType
from typing import (
    Any,
    Callable,
    Dict,
    List,
    NewType,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    TypeVar,
)

import z3  # type: ignore

from crosshair import dynamic_typing
from crosshair.condition_parser import ConditionExpr
from crosshair.tracers import NoTracing, ResumedTracing, is_tracing
from crosshair.util import (
    CrosshairInternal,
    IgnoreAttempt,
    PathTimeout,
    UnknownSatisfiability,
    debug,
    in_debug,
    name_of_type,
    test_stack,
)
from crosshair.z3util import z3Aassert, z3Not, z3PopNot


@functools.total_ordering
class MessageType(enum.Enum):
    CONFIRMED = "confirmed"
    # The postcondition returns True over all execution paths.

    CANNOT_CONFIRM = "cannot_confirm"
    # The postcondition returns True over the execution paths that were
    # attempted.

    PRE_UNSAT = "pre_unsat"
    # No attempted execution path got past the precondition checks.

    POST_ERR = "post_err"
    # The postcondition raised an exception for some input.

    EXEC_ERR = "exec_err"
    # The body of the function raised an exception for some input.

    POST_FAIL = "post_fail"
    # The postcondition returned False for some input.

    SYNTAX_ERR = "syntax_err"
    # Pre/post conditions could not be determined.

    IMPORT_ERR = "import_err"
    # The requested module could not be imported.

    def __repr__(self):
        return f"MessageType.{self.name}"

    def __lt__(self, other):
        return self._order[self] < self._order[other]


MessageType._order = {  # type: ignore
    # This is the order that messages override each other (for the same source
    # file line).
    # For exmaple, we prefer to report a False-returning postcondition
    # (POST_FAIL) over an exception-raising postcondition (POST_ERR).
    MessageType.CONFIRMED: 0,
    MessageType.CANNOT_CONFIRM: 1,
    MessageType.PRE_UNSAT: 2,
    MessageType.POST_ERR: 3,
    MessageType.EXEC_ERR: 4,
    MessageType.POST_FAIL: 5,
    MessageType.SYNTAX_ERR: 6,
    MessageType.IMPORT_ERR: 7,
}


CONFIRMED = MessageType.CONFIRMED
CANNOT_CONFIRM = MessageType.CANNOT_CONFIRM
PRE_UNSAT = MessageType.PRE_UNSAT
POST_ERR = MessageType.POST_ERR
EXEC_ERR = MessageType.EXEC_ERR
POST_FAIL = MessageType.POST_FAIL
SYNTAX_ERR = MessageType.SYNTAX_ERR
IMPORT_ERR = MessageType.IMPORT_ERR


@dataclass(frozen=True)
class AnalysisMessage:
    state: MessageType
    message: str
    filename: str
    line: int
    column: int
    traceback: str
    test_fn: Optional[str] = None
    condition_src: Optional[str] = None


@functools.total_ordering
class VerificationStatus(enum.Enum):
    REFUTED = 0
    UNKNOWN = 1
    CONFIRMED = 2

    def __repr__(self):
        return f"VerificationStatus.{self.name}"

    def __str__(self):
        return self.name

    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return self.value < other.value
        return NotImplemented


@dataclass
class CallAnalysis:
    verification_status: Optional[VerificationStatus] = None  # None means "ignore"
    messages: Sequence[AnalysisMessage] = ()
    failing_precondition: Optional[ConditionExpr] = None
    failing_precondition_reason: str = ""


HeapRef = z3.DeclareSort("HeapRef")
SnapshotRef = NewType("SnapshotRef", int)


def model_value_to_python(value: z3.ExprRef) -> object:
    if z3.is_real(value):
        if isinstance(value, z3.AlgebraicNumRef):
            # Force irrational values to be rational:
            value = value.approx(precision=20)
        return float(value.as_fraction())
    elif z3.is_seq(value):
        ret = []
        while value.num_args() == 2:
            ret.append(model_value_to_python(value.arg(0).arg(0)))
            value = value.arg(1)
        if value.num_args() == 1:
            ret.append(model_value_to_python(value.arg(0)))
        return ret
    elif z3.is_int(value):
        return value.as_long()
    else:
        return ast.literal_eval(repr(value))


def prefer_true(v: Any) -> bool:
    assert not is_tracing()
    if not (hasattr(v, "var") and z3.is_bool(v.var)):
        with ResumedTracing():
            v = v.__bool__()
        if not (hasattr(v, "var")):
            return v
    space = context_statespace()
    return space.choose_possible(v.var, probability_true=1.0)


class StateSpaceCounter(Counter):
    @property
    def iterations(self) -> int:
        return sum(self[s] for s in VerificationStatus) + self[None]

    @property
    def unknown_pct(self) -> float:
        return self[VerificationStatus.UNKNOWN] / (self.iterations + 1)

    def __str__(self) -> str:
        parts = []
        for k, ct in self.items():
            if isinstance(k, enum.Enum):
                k = k.name
            parts.append(f"{k}:{ct}")
        return "{" + ", ".join(parts) + "}"


class NotDeterministic(Exception):
    pass


class AbstractPathingOracle:
    def pre_path_hook(self, root: "RootNode") -> None:
        pass

    def post_path_hook(self, path: Sequence["SearchTreeNode"]) -> None:
        pass

    def decide(
        self, root, node: "WorstResultNode", engine_probability: Optional[float]
    ) -> float:
        raise NotImplementedError


# NOTE: CrossHair's monkey-patched getattr calls this function, so we
# force ourselves to use the builtin getattr, avoiding an infinite loop.
real_getattr = builtins.getattr

_THREAD_LOCALS = threading.local()
_THREAD_LOCALS.space = None


class StateSpaceContext:
    def __init__(self, space: "StateSpace"):
        self.space = space

    def __enter__(self):
        prev = real_getattr(_THREAD_LOCALS, "space", None)
        assert prev is None, "Already in a state space context"
        space = self.space
        _THREAD_LOCALS.space = space
        space.mark_all_parent_frames()

    def __exit__(self, exc_type, exc_value, tb):
        prev = real_getattr(_THREAD_LOCALS, "space", None)
        assert prev is self.space, "State space was altered in context"
        _THREAD_LOCALS.space = None
        return False


def optional_context_statespace() -> Optional["StateSpace"]:
    return _THREAD_LOCALS.space


def context_statespace() -> "StateSpace":
    space = _THREAD_LOCALS.space
    if space is None:
        raise CrosshairInternal
    return space


def newrandom():
    return random.Random(1801243388510242075)


_N = TypeVar("_N", bound="SearchTreeNode")
_T = TypeVar("_T")


class NodeLike:
    def is_exhausted(self) -> bool:
        return False

    def get_result(self) -> CallAnalysis:
        """
        Get the result from the call.

        post: implies(_.verification_status == VerificationStatus.CONFIRMED, self.is_exhausted())
        """
        raise NotImplementedError

    def is_stem(self) -> bool:
        return False

    def grow_into(self, node: _N) -> _N:
        raise NotImplementedError

    def simplify(self) -> "NodeLike":
        return self

    def stats(self) -> StateSpaceCounter:
        raise NotImplementedError


class NodeStem(NodeLike):
    evolution: Optional["SearchTreeNode"] = None

    def is_exhausted(self) -> bool:
        return False if self.evolution is None else self.evolution.is_exhausted()

    def get_result(self) -> CallAnalysis:
        return (
            CallAnalysis(VerificationStatus.UNKNOWN)
            if self.evolution is None
            else self.evolution.get_result()
        )

    def is_stem(self) -> bool:
        return self.evolution is None

    def grow_into(self, node: _N) -> _N:
        self.evolution = node
        return node

    def simplify(self):
        return self if self.evolution is None else self.evolution

    def stats(self) -> StateSpaceCounter:
        return StateSpaceCounter() if self.evolution is None else self.evolution.stats()

    def __repr__(self) -> str:
        return "NodeStem()"


class SearchTreeNode(NodeLike):
    """
    Represent a single decision point.

    Abstract helper class for StateSpace.
    """

    stacktail: Tuple[str, ...] = ()
    result: CallAnalysis = CallAnalysis()
    exhausted: bool = False

    def choose(
        self, space: "StateSpace", probability_true: Optional[float] = None
    ) -> Tuple[bool, float, NodeLike]:
        raise NotImplementedError

    def is_exhausted(self) -> bool:
        return self.exhausted

    def get_result(self) -> CallAnalysis:
        return self.result

    def update_result(self, leaf_analysis: CallAnalysis) -> bool:
        if not self.exhausted:
            next_result, next_exhausted = self.compute_result(leaf_analysis)
            if next_exhausted != self.exhausted or next_result != self.result:
                self.result, self.exhausted = next_result, next_exhausted
                return True
        return False

    def compute_result(self, leaf_analysis: CallAnalysis) -> Tuple[CallAnalysis, bool]:
        raise NotImplementedError


def solver_is_sat(solver, *a) -> bool:
    ret = solver.check(*a)
    if ret == z3.unknown:
        solver.add(*a)
        debug("Unknown satisfiability. Solver state follows:\n", solver.sexpr())
        raise UnknownSatisfiability
    return ret == z3.sat


def node_result(node: Optional[NodeLike]) -> Optional[CallAnalysis]:
    if node is None:
        return None
    return node.get_result()


def node_status(node: Optional[NodeLike]) -> Optional[VerificationStatus]:
    result = node_result(node)
    if result is not None:
        return result.verification_status
    else:
        return None


class SearchLeaf(SearchTreeNode):
    def __init__(self, result: CallAnalysis):
        self.result = result
        self.exhausted = True
        self._stats = StateSpaceCounter({result.verification_status: 1})

    def stats(self) -> StateSpaceCounter:
        return self._stats

    def __str__(self) -> str:
        return f"{self.__class__.__name__}({self.result.verification_status})"


class SinglePathNode(SearchTreeNode):
    decision: bool
    child: NodeLike
    _random: random.Random

    def __init__(self, decision: bool):
        self.decision = decision
        self.child = NodeStem()
        self._random = newrandom()

    def choose(
        self, space: "StateSpace", probability_true: Optional[float] = None
    ) -> Tuple[bool, float, NodeLike]:
        return (self.decision, 1.0, self.child)

    def compute_result(self, leaf_analysis: CallAnalysis) -> Tuple[CallAnalysis, bool]:
        self.child = self.child.simplify()
        return (self.child.get_result(), self.child.is_exhausted())

    def stats(self) -> StateSpaceCounter:
        return self.child.stats()


class BranchCounter:
    __slots__ = ["pos_ct", "neg_ct"]
    pos_ct: int
    neg_ct: int

    def __init__(self):
        self.pos_ct = 0
        self.neg_ct = 0


class RootNode(SinglePathNode):
    def __init__(self):
        super().__init__(True)
        self._open_coverage: Dict[Tuple[str, ...], BranchCounter] = defaultdict(
            BranchCounter
        )
        from crosshair.pathing_oracle import CoveragePathingOracle  # circular import

        self.pathing_oracle = CoveragePathingOracle()


class DeatchedPathNode(SinglePathNode):
    def __init__(self):
        super().__init__(True)
        # Seems like `exhausted` should be True, but we set to False until we can
        # collect the result from path's leaf. (exhaustion prevents caches from
        # updating)
        self.exhausted = False
        self._stats = None

    def compute_result(self, leaf_analysis: CallAnalysis) -> Tuple[CallAnalysis, bool]:
        self.child = self.child.simplify()
        return (leaf_analysis, True)

    def stats(self) -> StateSpaceCounter:
        if self._stats is None:
            self._stats = StateSpaceCounter(
                {
                    k: v
                    for k, v in self.child.stats().items()
                    # We only propagate the verification status.
                    # (we should mostly look like a SearchLeaf)
                    if isinstance(k, VerificationStatus)
                }
            )
        return self._stats


class BinaryPathNode(SearchTreeNode):
    positive: NodeLike
    negative: NodeLike

    def __init__(self):
        self._stats = StateSpaceCounter()

    def stats_lookahead(self) -> Tuple[StateSpaceCounter, StateSpaceCounter]:
        return (self.positive.stats(), self.negative.stats())

    def stats(self) -> StateSpaceCounter:
        return self._stats


class RandomizedBinaryPathNode(BinaryPathNode):
    def __init__(self, rand: random.Random):
        super().__init__()
        self._random = rand
        self.positive = NodeStem()
        self.negative = NodeStem()

    def probability_true(
        self, space: "StateSpace", requested_probability: Optional[float] = None
    ) -> float:
        raise NotImplementedError

    def choose(
        self, space: "StateSpace", probability_true: Optional[float] = None
    ) -> Tuple[bool, float, NodeLike]:
        positive_ok = not self.positive.is_exhausted()
        negative_ok = not self.negative.is_exhausted()
        assert positive_ok or negative_ok
        if positive_ok and negative_ok:
            probability_true = self.probability_true(
                space, requested_probability=probability_true
            )
            randval = self._random.uniform(0.000_001, 0.999_999)
            if randval < probability_true:
                return (True, probability_true, self.positive)
            else:
                return (False, 1.0 - probability_true, self.negative)
        else:
            return (positive_ok, 1.0, self.positive if positive_ok else self.negative)

    def _simplify(self) -> None:
        self.positive = self.positive.simplify()
        self.negative = self.negative.simplify()


class ParallelNode(RandomizedBinaryPathNode):
    """Choose either path; the first complete result will be used."""

    def __init__(self, rand: random.Random, false_probability: float, desc: str):
        super().__init__(rand)
        self._false_probability = false_probability
        self._desc = desc

    def __repr__(self):
        return f"ParallelNode(false_pct={self._false_probability}, {self._desc})"

    def compute_result(self, leaf_analysis: CallAnalysis) -> Tuple[CallAnalysis, bool]:
        self._simplify()
        positive, negative = self.positive, self.negative
        pos_exhausted = positive.is_exhausted()
        neg_exhausted = negative.is_exhausted()
        if pos_exhausted and not node_status(positive) == VerificationStatus.UNKNOWN:
            self._stats = positive.stats()
            return (positive.get_result(), True)
        if neg_exhausted and not node_status(negative) == VerificationStatus.UNKNOWN:
            self._stats = negative.stats()
            return (negative.get_result(), True)
        # it's unclear whether we want to just add stats here:
        self._stats = StateSpaceCounter(positive.stats() + negative.stats())
        return merge_node_results(
            positive.get_result(),
            pos_exhausted and neg_exhausted,
            negative,
        )

    def probability_true(
        self, space: "StateSpace", requested_probability: Optional[float] = None
    ) -> float:
        if self.negative.is_exhausted():
            return 1.0
        elif requested_probability is not None:
            return requested_probability
        else:
            return 1.0 - self._false_probability


def merge_node_results(
    left: CallAnalysis, exhausted: bool, node: NodeLike
) -> Tuple[CallAnalysis, bool]:
    """
    Merge analysis from different branches of code.

    Combines messages, take the worst verification status of the two, etc.
    """
    right = node.get_result()
    if not node.is_exhausted():
        exhausted = False
    if left.verification_status is None:
        return (right, exhausted)
    if right.verification_status is None:
        return (left, exhausted)
    if left.failing_precondition and right.failing_precondition:
        lc, rc = left.failing_precondition, right.failing_precondition
        precond_side = left if lc.line > rc.line else right
    else:
        precond_side = left if left.failing_precondition else right
    return (
        CallAnalysis(
            min(left.verification_status, right.verification_status),
            list(left.messages) + list(right.messages),
            precond_side.failing_precondition,
            precond_side.failing_precondition_reason,
        ),
        exhausted,
    )


_RE_WHITESPACE_SUB = re.compile(r"\s+").sub


class WorstResultNode(RandomizedBinaryPathNode):
    forced_path: Optional[bool] = None
    expr: Optional[z3.ExprRef] = None
    normalized_expr: Tuple[bool, z3.ExprRef]

    def __init__(self, rand: random.Random, expr: z3.ExprRef, solver: z3.Solver):
        super().__init__(rand)
        is_positive, root_expr = z3PopNot(expr)
        self.normalized_expr = (is_positive, root_expr)
        notexpr = z3Not(expr) if is_positive else root_expr
        if solver_is_sat(solver, notexpr):
            if not solver_is_sat(solver, expr):
                self.forced_path = False
        else:
            # TODO: we still run into soundness issues on occasion, so I'd like to
            # leave _PERFORM_EXTRA_SAT_CHECKS enabled a little longer:
            _PERFORM_EXTRA_SAT_CHECKS = True
            if _PERFORM_EXTRA_SAT_CHECKS and not solver_is_sat(solver, expr):
                debug(" *** Reached impossible code path *** ")
                debug("Current solver state:\n", str(solver))
                raise CrosshairInternal("Reached impossible code path")
            self.forced_path = True
        self.expr = expr

    def _is_exhausted(self):
        return (
            (self.positive.is_exhausted() and self.negative.is_exhausted())
            or (self.forced_path is True and self.positive.is_exhausted())
            or (self.forced_path is False and self.negative.is_exhausted())
        )

    def __repr__(self):
        smt_expr = _RE_WHITESPACE_SUB(" ", str(self.expr))
        exhausted = " : exhausted" if self._is_exhausted() else ""
        forced = f" : force={self.forced_path}" if self.forced_path is not None else ""
        return f"{self.__class__.__name__}({smt_expr}{exhausted}{forced})"

    def choose(
        self, space: "StateSpace", probability_true: Optional[float] = None
    ) -> Tuple[bool, float, NodeLike]:
        if self.forced_path is None:
            return RandomizedBinaryPathNode.choose(self, space, probability_true)
        return (
            self.forced_path,
            1.0,
            self.positive if self.forced_path else self.negative,
        )

    def probability_true(
        self, space: "StateSpace", requested_probability: Optional[float] = None
    ) -> float:
        root = space._root
        return root.pathing_oracle.decide(
            root, self, engine_probability=requested_probability
        )

    def compute_result(self, leaf_analysis: CallAnalysis) -> Tuple[CallAnalysis, bool]:
        self._simplify()
        positive, negative = self.positive, self.negative
        exhausted = self._is_exhausted()
        if node_status(positive) == VerificationStatus.REFUTED or (
            self.forced_path is True
        ):
            self._stats = positive.stats()
            return (positive.get_result(), exhausted)
        if node_status(negative) == VerificationStatus.REFUTED or (
            self.forced_path is False
        ):
            self._stats = negative.stats()
            return (negative.get_result(), exhausted)
        self._stats = StateSpaceCounter(positive.stats() + negative.stats())
        return merge_node_results(
            positive.get_result(), positive.is_exhausted(), negative
        )


class ModelValueNode(WorstResultNode):
    condition_value: object = None

    def __init__(self, rand: random.Random, expr: z3.ExprRef, solver: z3.Solver):
        if not solver_is_sat(solver):
            debug("Solver unexpectedly unsat; solver state:", solver.sexpr())
            raise CrosshairInternal("Unexpected unsat from solver")

        self.condition_value = solver.model().evaluate(expr, model_completion=True)
        self._stats_key = f"realize_{expr}" if z3.is_const(expr) else None
        WorstResultNode.__init__(self, rand, expr == self.condition_value, solver)

    def compute_result(self, leaf_analysis: CallAnalysis) -> Tuple[CallAnalysis, bool]:
        stats_key = self._stats_key
        old_realizations = self._stats[stats_key]
        analysis, is_exhausted = super().compute_result(leaf_analysis)
        if stats_key:
            self._stats[stats_key] = old_realizations + 1
        return (analysis, is_exhausted)


def debug_path_tree(node, highlights, prefix="") -> List[str]:
    highlighted = node in highlights
    node = node.simplify()
    highlighted |= node in highlights
    if isinstance(node, BinaryPathNode):
        if isinstance(node, WorstResultNode) and node.forced_path is not None:
            return debug_path_tree(
                node.positive if node.forced_path else node.negative, highlights, prefix
            )
        lines = []
        forkstr = r"n|\ y  " if isinstance(node, WorstResultNode) else r" |\    "
        if highlighted:
            lines.append(f"{prefix}{forkstr}*{str(node)} {node.stats()}")
        else:
            lines.append(f"{prefix}{forkstr}{str(node)} {node.stats()}")
        if node.is_exhausted() and not highlighted:
            return lines  # collapse fully explored subtrees
        lines.extend(debug_path_tree(node.positive, highlights, prefix + " | "))
        lines.extend(debug_path_tree(node.negative, highlights, prefix))
        return lines
    elif isinstance(node, SinglePathNode):
        lines = []
        if highlighted:
            lines.append(f"{prefix} |     *{type(node).__name__} {node.stats()}")
        else:
            lines.append(f"{prefix} |     {type(node).__name__} {node.stats()}")
        lines.extend(debug_path_tree(node.child, highlights, prefix))
        return lines
    else:
        if highlighted:
            return [f"{prefix} -> *{str(node)} {node.stats()}"]
        else:
            return [f"{prefix} -> {str(node)} {node.stats()}"]


[docs] class StateSpace: """Holds various information about the SMT solver's current state.""" _search_position: NodeLike _deferred_assumptions: List[Tuple[str, Callable[[], bool]]] _extras: Dict[Type, object] def __init__( self, execution_deadline: float, model_check_timeout: float, search_root: RootNode, ): smt_timeout = model_check_timeout * 1000 + 1 smt_tactic = z3.Tactic("smt") if smt_timeout < 1 << 63: smt_tactic = z3.TryFor(smt_tactic, int(smt_timeout)) self.solver = smt_tactic.solver() self.solver.set(mbqi=True) # turn off every randomization thing we can think of: self.solver.set("random-seed", 42) self.solver.set("smt.random-seed", 42) # self.solver.set('randomize', False) self.choices_made: List[SearchTreeNode] = [] self.status_cap: Optional[VerificationStatus] = None self.heaps: List[List[Tuple[z3.ExprRef, Type, object]]] = [[]] self.next_uniq = 1 self.is_detached = False self._extras = {} self._already_logged: Set[z3.ExprRef] = set() self._exprs_known: Dict[z3.ExprRef, bool] = {} self.execution_deadline = execution_deadline self._root = search_root self._random = search_root._random _, _, self._search_position = search_root.choose(self) self._deferred_assumptions = [] search_root.pathing_oracle.pre_path_hook(search_root)
[docs] def add(self, expr: z3.ExprRef) -> None: with NoTracing(): # debug('Committed to ', expr) already_known = self._exprs_known.get(expr) if already_known is None: self.solver.add(expr) self._exprs_known[expr] = True elif already_known is not True: raise CrosshairInternal
def rand(self) -> random.Random: return self._random def extra(self, typ: Type[_T]) -> _T: """Get an object whose lifetime is tied to that of the SMT solver.""" value = self._extras.get(typ) if value is None: value = typ(self.solver) # type: ignore self._extras[typ] = value return value # type: ignore def stats_lookahead(self) -> Tuple[StateSpaceCounter, StateSpaceCounter]: node = self._search_position.simplify() if node.is_stem(): return (StateSpaceCounter(), StateSpaceCounter()) assert isinstance( node, BinaryPathNode ), f"node {node} {node.is_stem()} is not a binarypathnode" return node.stats_lookahead() def fork_parallel(self, false_probability: float, desc: str = "") -> bool: if self._search_position.is_stem(): node: NodeLike = self._search_position.grow_into( ParallelNode(self._random, false_probability, desc) ) assert isinstance(node, ParallelNode) self._search_position = node else: node = self._search_position.simplify() assert isinstance(node, ParallelNode) node._false_probability = false_probability self.choices_made.append(node) ret, _, next_node = node.choose(self) self._search_position = next_node return ret
[docs] def is_possible(self, expr: z3.ExprRef) -> bool: return solver_is_sat(self.solver, expr)
def mark_all_parent_frames(self): frames: Set[FrameType] = set() frame = _getframe() while frame and frame not in frames: frames.add(frame) frame = frame.f_back self.external_frames = ( frames # just to prevent dealllocation and keep the id()s valid ) self.external_frame_ids = {id(f) for f in frames} def gen_stack_descriptions(self) -> Tuple[str, ...]: f: Any = _getframe().f_back.f_back # type: ignore # TODO: if we deprecate 3.7, we could try this instead of the above: # frames = [f := f.f_back or f for _ in range(8)] frames = [] external_frame_ids = self.external_frame_ids for _ in range(8): if id(f) in external_frame_ids: break frames.append(f) f = f.f_back # TODO: To help oracles, I'd like to add sub-line resolution via f.f_lasti; # however, in Python >= 3.11, the instruction pointer can shift between # PRECALL and CALL opcodes, triggering our nondeterminism check. return tuple(f"{f.f_code.co_filename}:{f.f_lineno}" for f in frames) def check_timeout(self): if monotonic() > self.execution_deadline: debug( "Path execution timeout after making ", len(self.choices_made), " choices.", ) raise PathTimeout def choose_possible( self, expr: z3.ExprRef, probability_true: Optional[float] = None ) -> bool: known_result = self._exprs_known.get(expr) if isinstance(known_result, bool): return known_result if is_tracing(): raise CrosshairInternal if self._search_position.is_stem(): # We only allow time outs at stems - that's because we don't want # to think about how mutating an existing path branch would work: self.check_timeout() node = self._search_position.grow_into( WorstResultNode(self._random, expr, self.solver) ) else: node = self._search_position.simplify() # type: ignore if not isinstance(node, WorstResultNode): debug(" *** Begin Not Deterministic Debug *** ") debug(" Traceback: ", test_stack()) debug("Decision expression:") debug(f" {expr}") debug("Now found at incompatible node of type:") debug(f" {type(node)}") debug(" *** End Not Deterministic Debug *** ") raise NotDeterministic if not z3.eq(node.expr, expr): debug(" *** Begin Not Deterministic Debug *** ") debug(" Traceback: ", test_stack()) debug("Decision expression changed from:") debug(f" {node.expr}") debug("To:") debug(f" {expr}") debug(" *** End Not Deterministic Debug *** ") raise NotDeterministic self._search_position = node # NOTE: format_stack() is more human readable, but it pulls source file contents, # so it is (1) slow, and (2) unstable when source code changes while we are checking. stacktail = self.gen_stack_descriptions() assert isinstance(node, SearchTreeNode) if not node.stacktail: node.stacktail = stacktail else: if node.stacktail != stacktail: debug(" *** Begin Not Deterministic Debug *** ") debug(" First state: ") debug(node.stacktail) debug(" Current state: ") debug(stacktail) debug(" Decision points prior to this:") for choice in self.choices_made: debug(" ", choice) debug(" Stack Diff: ") import difflib debug("\n".join(difflib.context_diff(node.stacktail, stacktail))) debug(" *** End Not Deterministic Debug *** ") raise NotDeterministic choose_true, chosen_probability, stem = node.choose( self, probability_true=probability_true ) branch_counter = self._root._open_coverage[stacktail] if choose_true: branch_counter.pos_ct += 1 else: branch_counter.neg_ct += 1 self.choices_made.append(node) self._search_position = stem chosen_expr = expr if choose_true else z3Not(expr) if in_debug(): debug( "SMT chose:", chosen_expr, "(chance:", chosen_probability, ")", ) z3Aassert(self.solver, chosen_expr) self._exprs_known[expr] = choose_true return choose_true def find_model_value(self, expr: z3.ExprRef) -> Any: with NoTracing(): while True: if self._search_position.is_stem(): self._search_position = self._search_position.grow_into( ModelValueNode(self._random, expr, self.solver) ) node = self._search_position.simplify() if isinstance(node, SearchLeaf): raise CrosshairInternal( f"Cannot use symbolics; path is already terminated" ) if not isinstance(node, ModelValueNode): debug(" *** Begin Not Deterministic Debug *** ") debug(f"Model value node expected; found {type(node)} instead.") debug(" Traceback: ", test_stack()) debug(" *** End Not Deterministic Debug *** ") raise NotDeterministic (chosen, _, next_node) = node.choose(self, probability_true=1.0) self.choices_made.append(node) self._search_position = next_node if chosen: self.solver.add(expr == node.condition_value) ret = model_value_to_python(node.condition_value) if ( in_debug() and not self.is_detached and expr not in self._already_logged ): self._already_logged.add(expr) debug("SMT realized symbolic:", expr, "==", repr(ret)) debug("Realized at", test_stack()) return ret else: self.solver.add(expr != node.condition_value) def find_model_value_for_function(self, expr: z3.ExprRef) -> object: if not solver_is_sat(self.solver): raise CrosshairInternal("model unexpectedly became unsatisfiable") # TODO: this need to go into a tree node that returns UNKNOWN or worse # (because it just returns one example function; it's not covering the space) # TODO: note this is also unsound - after completion, the solver isn't # bound to the returned interpretation. (but don't know how to add the # right constraints) Maybe just use arrays instead. return self.solver.model()[expr] def current_snapshot(self) -> SnapshotRef: return SnapshotRef(len(self.heaps) - 1) def checkpoint(self): self.heaps.append( [(ref, typ, copy.deepcopy(val)) for (ref, typ, val) in self.heaps[-1]] ) def add_value_to_heaps(self, ref: z3.ExprRef, typ: Type, value: object) -> None: # TODO: needs more testing for heap in self.heaps[:-1]: heap.append((ref, typ, copy.deepcopy(value))) self.heaps[-1].append((ref, typ, value)) def find_key_in_heap( self, ref: z3.ExprRef, typ: Type, proxy_generator: Callable[[Type], object], snapshot: SnapshotRef = SnapshotRef(-1), ) -> object: with NoTracing(): # TODO: needs more testing for (curref, curtyp, curval) in self.heaps[snapshot]: # TODO: using unify() is almost certainly wrong; just because the types # have some instances in common does not mean that `curval` actually # satisfies the requirements of `typ`: could_match = dynamic_typing.unify(curtyp, typ) if not could_match: continue if self.smt_fork(curref == ref, probability_true=0.1): debug( "HEAP key lookup ", ref, ": Found existing. ", "type:", name_of_type(type(curval)), "id:", id(curval) % 1000, ) return curval ret = proxy_generator(typ) debug( "HEAP key lookup ", ref, ": Created new. ", "type:", name_of_type(type(ret)), "id:", id(ret) % 1000, ) self.add_value_to_heaps(ref, typ, ret) return ret def uniq(self): self.next_uniq += 1 return "_{:x}".format(self.next_uniq) def smt_fork( self, expr: Optional[z3.ExprRef] = None, desc: Optional[str] = None, probability_true: Optional[float] = None, ) -> bool: if expr is None: expr = z3.Bool((desc or "fork") + self.uniq()) return self.choose_possible(expr, probability_true) def defer_assumption(self, description: str, checker: Callable[[], bool]) -> None: self._deferred_assumptions.append((description, checker)) def detach_path(self) -> None: """ Mark the current path exhausted. Also verifies all deferred assumptions. After detaching, the space may continue to be used (for example, to print realized symbolics). """ assert is_tracing() with NoTracing(): if self.is_detached: debug("Path is already detached") return else: # Give ourselves a time extension for deferred assumptions and # (likely) counterexample generation to follow. self.execution_deadline += 2.0 for description, checker in self._deferred_assumptions: with ResumedTracing(): check_ret = checker() if not prefer_true(check_ret): raise IgnoreAttempt("deferred assumption failed: " + description) self.is_detached = True assert self._search_position.is_stem() node = self._search_position.grow_into(DeatchedPathNode()) assert node.child.is_stem() self.choices_made.append(node) self._search_position = node.child debug("Detached from search tree") def cap_result_at_unknown(self): self.status_cap = VerificationStatus.UNKNOWN def bubble_status( self, analysis: CallAnalysis ) -> Tuple[Optional[CallAnalysis], bool]: # In some cases, we might ignore an attempt while not at a leaf. if self._search_position.is_stem(): self._search_position = self._search_position.grow_into( SearchLeaf(analysis) ) else: self._search_position = self._search_position.simplify() assert isinstance(self._search_position, SearchTreeNode) self._search_position.exhausted = True self._search_position.result = analysis self._root.pathing_oracle.post_path_hook(self.choices_made) if not self.choices_made: return (analysis, True) for node in reversed(self.choices_made): node.update_result(analysis) if in_debug(): for line in debug_path_tree( self._root, set(self.choices_made + [self._search_position]) ): debug(line) # debug('Path summary:', self.choices_made) first = self.choices_made[0] analysis = first.get_result() verification_status = analysis.verification_status if self.status_cap is not None and verification_status is not None: analysis.verification_status = min(verification_status, self.status_cap) return (analysis, first.is_exhausted())
class SimpleStateSpace(StateSpace): def __init__(self): super().__init__(monotonic() + 10000.0, 10000.0, RootNode()) self.mark_all_parent_frames()