Module mdd.mdd
Expand source code
from __future__ import annotations
import re
import itertools
import uuid
from functools import reduce
from typing import Any, Callable, Dict, Hashable
from typing import Iterable, Optional, Sequence, Set, Tuple, Union
from typing import FrozenSet
import aiger_bv as BV
import aiger_bdd
import attr
from aiger_bv.bundle import Bundle
from aiger_bv.expr import UnsignedBVExpr
from attr._make import Attribute
from dd.autoref import BDD
# Pair of (mapping from variable name to value, associated value).
# NOTE(review): not referenced elsewhere in this listing - confirm intended use.
Assignment = Tuple[Dict[str, Any], Any]
# Splits a bit-level name of the form "name[idx]" into the groups (name, idx).
INDEX_SPLITTER = re.compile(r"(.*)\[(.*)\]")
@attr.s(frozen=True, auto_attribs=True)
class Variable:
    """A multi-valued variable encoded over a single bitvector input."""

    # Circuit over exactly one bitvector input (enforced by the validator).
    valid: BV.UnsignedBVExpr = attr.ib()
    # Maps a value of the variable to its integer encoding.
    encode: Callable[[Any], int]
    # Maps an integer encoding back to the value it stands for.
    decode: Callable[[int], Any]

    @valid.validator
    def check_bitvector_input(self, _: Attribute, value: UnsignedBVExpr):
        """Reject `valid` circuits that are not over exactly one input."""
        if len(value.inputs) == 1:
            return
        raise ValueError("valid must be over single bitvector input!")

    def size(self) -> int:
        """Return the number of values this Variable can take on."""
        return aiger_bdd.count(self.valid)

    @property
    def _name_bundle(self) -> Tuple[str, int]:
        # Only the first entry of the input map matters: the validator
        # guarantees there is exactly one input.
        first_entry, *_ = self.valid.aigbv.imap.items()
        name, bundle = first_entry
        return name, bundle

    @property
    def name(self) -> str:
        """Name of the single bitvector input of `valid`."""
        name, _ = self._name_bundle
        return name

    @property
    def bundle(self) -> Bundle:
        """Input-map entry of `valid` registered under this variable's name."""
        input_map = self.valid.aigbv.imap
        return input_map[self.name]

    @property
    def _encoding_size(self) -> int:
        return self.bundle.size

    def with_name(self, name: str) -> Variable:
        """Return this Variable with its input renamed to `name`.

        The same object is returned when the name is already `name`.
        """
        if name != self.name:
            renamed = self.valid.aigbv["i", {self.name: name}]
            return attr.evolve(self, valid=BV.UnsignedBVExpr(renamed))
        return self

    def expr(self) -> BV.UnsignedBVExpr:
        """Return an unsigned bitvector atom with this variable's size and name."""
        width = self._encoding_size
        return BV.uatom(width, self.name)
# Either a sequence of hashable values or an existing Variable.
# NOTE(review): unused in this listing - confirm it is still needed.
VariableLike = Union[Sequence[Hashable], Variable]
def pow2_exponent(val: int) -> int:
    """Return the exponent k such that `val == 2**k`.

    The value is halved until nothing but a leading bit can remain, so for
    inputs that are not exact powers of two the result is the position of
    the highest set bit.  Values below 1 trip the assertion.
    """
    exponent = 0
    # Peel off one low-order bit per iteration.
    while val > 1:
        val, exponent = val >> 1, exponent + 1
    assert val == 1
    return exponent
def to_bdd(circ_or_expr, manager: Optional[BDD] = None) -> BDD:
    """Build a BDD from a py-aiger compatible circuit or expression.

    Names are passed through unchanged (the renamer returns each name as
    is), and only the first element of `aiger_bdd.to_bdd`'s result is
    returned.
    """

    def keep_name(_, name):
        return name

    result = aiger_bdd.to_bdd(circ_or_expr, manager=manager, renamer=keep_name)
    return result[0]
# Accepted by `to_var`: an iterable of values or an existing Variable.
Domain = Union[Iterable[Any], Variable]
def to_var(domain: Domain, name: Optional[str] = None) -> Variable:
    """Create a named variable ranging over the values in `domain`.

    The domain is 1-hot encoded: one bit per element, with exactly one bit
    set in every valid encoding.  For a more compact encoding, construct a
    `Variable` directly.

    An existing `Variable` is returned as is, or renamed when `name` is
    given.  When `name` is None for a fresh domain, a unique name is
    generated.
    """
    if isinstance(domain, Variable):
        if name is None:
            return domain
        return domain.with_name(name)

    if name is None:
        name = str(uuid.uuid1())  # Unique name.

    domain = tuple(domain)

    def encode(val):
        # Element i of the domain is represented by bit i.
        return 1 << domain.index(val)

    def decode(val):
        return domain[pow2_exponent(val)]

    bits = BV.atom(len(domain), name, signed=True)
    # Classic 1-hot test: non-zero, and clearing the lowest set bit
    # leaves nothing behind.
    is_nonzero = bits != 0
    has_single_bit = (bits & (bits - 1)) == 0
    one_hot = BV.UnsignedBVExpr((is_nonzero & has_single_bit).aigbv)  # Drop sign.

    return Variable(
        valid=one_hot.with_output("valid"),
        encode=encode,
        decode=decode,
    )
# Variables keyed by their name (the shape produced by `to_vars`).
Variables = Dict[str, Variable]
def to_vars(vals: Union[Iterable[Variable], Dict[str, Domain]]) -> Variables:
    """Index variables by name.

    A dict is read as `{name: domain}` and every entry is converted with
    `to_var`; any other iterable is taken to already contain Variables.
    When two variables share a name, the later one wins.
    """
    if isinstance(vals, dict):
        vals = [to_var(domain=domain, name=name) for name, domain in vals.items()]

    by_name: Variables = {}
    for variable in vals:
        by_name[variable.name] = variable
    return by_name
@attr.s(frozen=True, auto_attribs=True)
class Interface:
    """Input output interface of Multi-valued Decision Diagram."""

    # Input variables keyed by name.  The converter accepts either an
    # iterable of Variables or a dict mapping names to domains.
    _inputs: Variables = attr.ib(converter=to_vars)
    # Output variable; anything that is not a Variable goes through `to_var`.
    output: Variable = attr.ib(converter=to_var)
    # Names of variables already fixed through `DecisionDiagram.let`.
    applied: FrozenSet[str] = frozenset()

    def __attrs_post_init__(self):
        """Reject an output whose name collides with an input name.

        Raises:
            ValueError: If the input names plus the output name are not
                pairwise distinct.
        """
        names = list(self._inputs.keys()) + [self.output.name]
        if len(names) != len(set(names)):
            raise ValueError("All input names must be unique!")

    @property
    def inputs(self) -> Iterable[Variable]:
        """Input variables, in the order they are stored."""
        return list(self._inputs.values())

    def valid(self) -> BV.UnsignedBVExpr:
        """Circuit testing if input assignment is valid.

        This is the conjunction of every input variable's `valid` circuit.
        """
        valid_tests = (var.valid for var in self.inputs)
        return reduce(lambda x, y: x & y, valid_tests)

    def constantly(self, output: Any, manager: Optional[BDD] = None) -> MDD:
        """Return the MDD that yields `output` for any valid input.

        Args:
            output: Value of the output variable to return.
            manager: Optional BDD manager to build the diagram in.
        """
        encoded = self.output.encode(output)
        assert self.output.valid({self.output.name: encoded})[0]

        # Create BDD that only depends on hot variable in encoded.
        index = pow2_exponent(encoded)
        expr = self.output.expr()[index] & self.valid()
        bdd = to_bdd(expr, manager=manager)
        return DecisionDiagram(interface=self, bdd=bdd)

    def lift(self, bdd_or_aig, manager: Optional[BDD] = None) -> MDD:
        """Wrap bdd or py-aiger object using this interface.

        Note: Output is assumed to be 1-hot encoded!

        Args:
            bdd_or_aig: Either a BDD, which is wrapped as is, or an object
                exposing an `aig` attribute, which is converted with
                `to_bdd` first.
            manager: Optional BDD manager used when a conversion is needed.
        """
        if hasattr(bdd_or_aig, "aig"):
            bdd = to_bdd(bdd_or_aig, manager=manager)
        else:
            # Already a BDD: nothing to convert.
            bdd = bdd_or_aig
        return DecisionDiagram(interface=self, bdd=bdd)

    def var(self, name: str) -> Variable:
        """Get `Variable` for `name` in this interface.

        Any name that is not an input resolves to the output variable.
        """
        return self._inputs.get(name, self.output)
@attr.s(frozen=True, auto_attribs=True)
class DecisionDiagram:
    """A BDD paired with the `Interface` describing its variables."""

    interface: Interface
    bdd: BDD

    def __attrs_post_init__(self):
        """Check that bdd conforms to interface.

        Raises:
            ValueError: If the variables known to the BDD's manager differ
                from the bit-level names of the interface's variables.
        """
        bdd_vars: Set[str] = set(self.bdd.bdd.vars)
        io = self.interface
        interface_vars: Set[str] = set()
        for var in itertools.chain(io.inputs, [io.output]):
            interface_vars |= set(var.bundle)

        if bdd_vars != interface_vars:
            diff = bdd_vars.symmetric_difference(interface_vars)
            raise ValueError(
                "Input AIG or BDD does not agree with this "
                f"interface.\n symmetric difference={diff}"
            )

    def let(self, inputs: Dict[str, Any]) -> MDD:
        """Return MDD where subset of inputs have been applied.

        Args:
            inputs: Mapping from variable name to the value to fix it to.
        """
        vals: Dict[str, bool] = {}
        for name, value in inputs.items():
            var = self.interface.var(name)
            encoded = var.encode(value)
            assert var.valid({var.name: encoded})[0]

            # Turn bitvector into individual assignments.
            bundle = var.bundle
            encoded = BV.encode_int(bundle.size, encoded, signed=False)
            vals.update(bundle.blast(encoded))

        bdd = self.bdd.let(**vals)
        io = self.interface
        # Record which names have been fixed so far.
        io2 = attr.evolve(io, applied=io.applied | set(inputs))
        return attr.evolve(self, bdd=bdd, interface=io2)

    def __call__(self, inputs: Dict[str, Any]) -> Any:
        """Evaluate MDD on inputs.

        After applying `inputs`, the BDD must collapse to a single output
        bit; the value that bit stands for is returned.
        """
        bdd = self.let(inputs).bdd
        assert bdd.dag_size == 2, "Result should be single variable BDD."

        # Return which decision this was.
        match = INDEX_SPLITTER.match(bdd.var)
        assert match is not None
        name, idx_str = match.groups()
        idx = int(idx_str)

        output_var = self.interface.output
        assert name == output_var.name
        assert 0 <= idx < output_var._encoding_size
        return output_var.decode(1 << idx)

    def order(self, var_names: Sequence[str]):
        """Reorder underlying BDD to respect order seen in inputs.

        As a side effect, this function turns off reordering.

        Args:
            var_names: Names of the input/output variables, listed in the
                desired order.  Together they must cover every variable of
                the BDD's manager.
        """
        io = self.interface
        levels: Dict[str, int] = {}
        for name in var_names:
            offset = len(levels)
            var = io.var(name)
            assert var.name == name, "Name doesn't match input or output."

            # The bits of one variable occupy consecutive levels.
            size = var._encoding_size
            levels.update(var.bundle.blast(range(offset, offset + size)))

        assert len(levels) == len(self.bdd.bdd.vars)
        self.bdd.bdd.reorder(levels)
        self.bdd.bdd.configure(reordering=False)

    def override(self, test, value: Union[Any, MDD]) -> MDD:
        """Return MDD where `value if test else self`.

        Args:
            test: Can be a BDD or any py-aiger compatible object.
            value: Either an element of co-domain or another compatible
                MDD.
        """
        manager = self.bdd.bdd
        if isinstance(value, DecisionDiagram):
            # Unwrap so the boolean operators below act on raw BDDs.
            value = value.bdd
        else:
            value = self.interface.constantly(value, manager=manager).bdd

        if hasattr(test, "aig"):
            test = to_bdd(test, manager=manager)

        # test and value are BDDs now.
        # test => value, ~test => self.bdd.
        bdd = ((~test) | value) & (test | self.bdd)
        return attr.evolve(self, bdd=bdd)
# Short alias used in the annotations of this module.
MDD = DecisionDiagram
# Names exported by `from mdd.mdd import *`.
__all__ = ["DecisionDiagram", "Interface", "Variable", "to_var", "to_bdd"]
Functions
def to_bdd(circ_or_expr, manager: Optional[BDD] = None) ‑> dd.autoref.BDD
-
Convert py-aiger compatible object into a BDD.
Expand source code
def to_bdd(circ_or_expr, manager: Optional[BDD] = None) -> BDD: """Convert py-aiger compatible object into a BDD.""" return aiger_bdd.to_bdd(circ_or_expr, manager=manager, renamer=lambda _, x: x)[0]
def to_var(domain: Domain, name: Optional[str] = None) ‑> Variable
-
Create a named variable taking on values in `domain`.
Uses a 1-hot encoding of domain, i.e., one variable per element. For a more efficient encoding, consider creating a Variable directly.
If `name is None`, then a unique name is selected when creating the Variable.
Expand source code
def to_var(domain: Domain, name: Optional[str] = None) -> Variable: """Create a named variable taking on values in `domain`. Uses a 1-hot encoding of domain, i.e., one variable per element. For more efficient encoding consider creating Variable directly. If `name is None`, then unique name selected when creating Variable. """ if isinstance(domain, Variable): return domain if (name is None) else domain.with_name(name) if name is None: name = str(uuid.uuid1()) # Create unique name. domain = tuple(domain) tmp = BV.atom(len(domain), name, signed=True) # Create circuit representing classic onehot bit trick. one_hot = (tmp != 0) & ((tmp & (tmp - 1)) == 0) one_hot = BV.UnsignedBVExpr(one_hot.aigbv) # Forget about sign. return Variable( valid=one_hot.with_output("valid"), encode=lambda val: 1 << domain.index(val), decode=lambda val: domain[pow2_exponent(val)], )
Classes
class DecisionDiagram (interface: Interface, bdd: BDD)
-
Expand source code
class DecisionDiagram: interface: Interface bdd: BDD def __attrs_post_init__(self): """Check that bdd conforms to interface.""" bdd_vars: Set[str] = set(self.bdd.bdd.vars) interface_vars: Set[str] = set() io = self.interface for var in itertools.chain(io.inputs, [io.output]): interface_vars |= set(var.bundle) if bdd_vars != interface_vars: diff = bdd_vars.symmetric_difference(interface_vars) raise ValueError( "Input AIG or BDD does not agree with this" f"interface.\n symmetric difference={diff}" ) def let(self, inputs: Dict[str, Any]) -> MDD: """Return MDD where subset of inputs have been applied.""" vals: Dict[str, bool] = {} for name, value in inputs.items(): var = self.interface.var(name) encoded = var.encode(value) assert var.valid({var.name: encoded})[0] # Turn bitvector into individual assignments. bundle = var.bundle encoded = BV.encode_int(bundle.size, encoded, signed=False) vals.update(bundle.blast(encoded)) bdd = self.bdd.let(**vals) io = self.interface io2 = attr.evolve(io, applied=io.applied | set(inputs)) return attr.evolve(self, bdd=bdd, interface=io2) def __call__(self, inputs: Dict[str, Any]) -> Any: """Evaluate MDD on inputs.""" bdd = self.let(inputs).bdd assert bdd.dag_size == 2, "Result should be single variable BDD." # Return which decision this was. match = INDEX_SPLITTER.match(bdd.var) assert match is not None name, idx_str = match.groups() idx = int(idx_str) output_var = self.interface.output assert name == output_var.name assert 0 <= idx < output_var._encoding_size return output_var.decode(1 << idx) def order(self, var_names: Sequence[str]): """Reorder underlying BDD to respect order seen in inputs. As a side effect, this function turns off reordering. """ io = self.interface levels: Dict[str, int] = {} for name in var_names: offset = len(levels) var = io._inputs.get(name, io.output) assert var.name == name, "Name doesn't match input or output." 
size = var._encoding_size levels.update(var.bundle.blast(range(offset, offset + size))) assert len(levels) == len(self.bdd.bdd.vars) self.bdd.bdd.reorder(levels) self.bdd.bdd.configure(reordering=False) def override(self, test, value: Union[Any, MDD]) -> MDD: """Return MDD where `value if test else self`. Args: test: Can be a BDD or and py-aiger compatible object. value: Either an element of co-domain or another compatible MDD. """ manager = self.bdd.bdd if not isinstance(value, DecisionDiagram): value = self.interface.constantly(value, manager=manager).bdd if hasattr(test, "aig"): test = to_bdd(test, manager=manager) # Assuming test and value are BDDs now. # test => value ~test => self.bdd. bdd = ((~test) | value) & (test | self.bdd) return attr.evolve(self, bdd=bdd)
Class variables
var bdd : dd.autoref.BDD
var interface : Interface
Methods
def let(self, inputs: Dict[str, Any]) ‑> DecisionDiagram
-
Return MDD where subset of inputs have been applied.
Expand source code
def let(self, inputs: Dict[str, Any]) -> MDD: """Return MDD where subset of inputs have been applied.""" vals: Dict[str, bool] = {} for name, value in inputs.items(): var = self.interface.var(name) encoded = var.encode(value) assert var.valid({var.name: encoded})[0] # Turn bitvector into individual assignments. bundle = var.bundle encoded = BV.encode_int(bundle.size, encoded, signed=False) vals.update(bundle.blast(encoded)) bdd = self.bdd.let(**vals) io = self.interface io2 = attr.evolve(io, applied=io.applied | set(inputs)) return attr.evolve(self, bdd=bdd, interface=io2)
def order(self, var_names: Sequence[str])
-
Reorder underlying BDD to respect order seen in inputs.
As a side effect, this function turns off reordering.
Expand source code
def order(self, var_names: Sequence[str]): """Reorder underlying BDD to respect order seen in inputs. As a side effect, this function turns off reordering. """ io = self.interface levels: Dict[str, int] = {} for name in var_names: offset = len(levels) var = io._inputs.get(name, io.output) assert var.name == name, "Name doesn't match input or output." size = var._encoding_size levels.update(var.bundle.blast(range(offset, offset + size))) assert len(levels) == len(self.bdd.bdd.vars) self.bdd.bdd.reorder(levels) self.bdd.bdd.configure(reordering=False)
def override(self, test, value: Union[Any, MDD]) ‑> DecisionDiagram
-
Return the MDD computing `value if test else self`.
Args
test
- Can be a BDD or any py-aiger compatible object.
value
- Either an element of the co-domain or another compatible MDD.
Expand source code
def override(self, test, value: Union[Any, MDD]) -> MDD:
    """Return MDD where `value if test else self`.

    Args:
        test: Can be a BDD or any py-aiger compatible object.
        value: Either an element of co-domain or another compatible
            MDD.
    """
    manager = self.bdd.bdd
    if isinstance(value, DecisionDiagram):
        # Unwrap so the boolean operators below act on raw BDDs.
        value = value.bdd
    else:
        value = self.interface.constantly(value, manager=manager).bdd

    if hasattr(test, "aig"):
        test = to_bdd(test, manager=manager)

    # test and value are BDDs now.
    # test => value, ~test => self.bdd.
    bdd = ((~test) | value) & (test | self.bdd)
    return attr.evolve(self, bdd=bdd)
class Interface (inputs, output, applied: FrozenSet[str] = frozenset())
-
Input output interface of Multi-valued Decision Diagram.
Expand source code
class Interface: """Input output interface of Multi-valued Decision Diagram.""" _inputs: Variables = attr.ib(converter=to_vars) output: Variable = attr.ib(converter=to_var) applied: FrozenSet[str] = frozenset() def __attrs_post_init__(self): names = list(self._inputs.keys()) + [self.output.name] if len(names) != len(set(names)): raise ValueError("All input names must be unique!") @property def inputs(self) -> Iterable[Variable]: return [var for var in self._inputs.values()] def valid(self) -> BV.UnsignedBVExpr: """Circuit testing if input assignment is valid.""" valid_tests = (var.valid for var in self.inputs) return reduce(lambda x, y: x & y, valid_tests) def constantly(self, output: Any, manager: Optional[BDD] = None) -> MDD: """Return MDD returns `output` for any input.""" encoded = self.output.encode(output) assert self.output.valid({self.output.name: encoded})[0] # Create BDD that only depends on hot variable in encoded. index = pow2_exponent(encoded) expr = self.output.expr()[index] & self.valid() bdd = to_bdd(expr, manager=manager) return DecisionDiagram(interface=self, bdd=bdd) def lift(self, bdd_or_aig, manager: None = None) -> MDD: """Wrap bdd or py-aiger object using this interface. Note: Output is assumed to be 1-hot encoded! """ if hasattr(bdd_or_aig, "aig"): bdd = to_bdd(bdd_or_aig) return DecisionDiagram(interface=self, bdd=bdd) def var(self, name: str) -> Variable: """Get `Variable` for `name` in this interface.""" return self._inputs.get(name, self.output)
Class variables
var applied : FrozenSet[str]
var output : Variable
Instance variables
var inputs : Iterable[Variable]
-
Expand source code
@property def inputs(self) -> Iterable[Variable]: return [var for var in self._inputs.values()]
Methods
def constantly(self, output: Any, manager: Optional[BDD] = None) ‑> DecisionDiagram
-
Return the MDD that returns `output` for any input.
Expand source code
def constantly(self, output: Any, manager: Optional[BDD] = None) -> MDD: """Return MDD returns `output` for any input.""" encoded = self.output.encode(output) assert self.output.valid({self.output.name: encoded})[0] # Create BDD that only depends on hot variable in encoded. index = pow2_exponent(encoded) expr = self.output.expr()[index] & self.valid() bdd = to_bdd(expr, manager=manager) return DecisionDiagram(interface=self, bdd=bdd)
def lift(self, bdd_or_aig, manager: None = None) ‑> DecisionDiagram
-
Wrap bdd or py-aiger object using this interface.
Note: Output is assumed to be 1-hot encoded!
Expand source code
def lift(self, bdd_or_aig, manager: Optional[BDD] = None) -> MDD:
    """Wrap bdd or py-aiger object using this interface.

    Note: Output is assumed to be 1-hot encoded!

    Args:
        bdd_or_aig: Either a BDD, which is wrapped as is, or an object
            exposing an `aig` attribute, which is converted with
            `to_bdd` first.
        manager: Optional BDD manager used when a conversion is needed.
    """
    if hasattr(bdd_or_aig, "aig"):
        bdd = to_bdd(bdd_or_aig, manager=manager)
    else:
        # Already a BDD: nothing to convert.
        bdd = bdd_or_aig
    return DecisionDiagram(interface=self, bdd=bdd)
def valid(self) ‑> aiger_bv.expr.UnsignedBVExpr
-
Circuit testing if input assignment is valid.
Expand source code
def valid(self) -> BV.UnsignedBVExpr: """Circuit testing if input assignment is valid.""" valid_tests = (var.valid for var in self.inputs) return reduce(lambda x, y: x & y, valid_tests)
def var(self, name: str) ‑> Variable
-
Get the `Variable` for `name` in this interface.
Expand source code
def var(self, name: str) -> Variable: """Get `Variable` for `name` in this interface.""" return self._inputs.get(name, self.output)
class Variable (valid: BV.UnsignedBVExpr, encode: Callable[[Any], int], decode: Callable[[int], Any])
-
BDD representation of a multi-valued variable.
Expand source code
class Variable: """BDD representation of a multi-valued variable.""" valid: BV.UnsignedBVExpr = attr.ib() encode: Callable[[Any], int] decode: Callable[[int], Any] def size(self) -> int: """Returns number of values Variable can take on.""" return aiger_bdd.count(self.valid) @valid.validator def check_bitvector_input(self, _: Attribute, value: UnsignedBVExpr): if len(value.inputs) != 1: raise ValueError("valid must be over single bitvector input!") @property def _name_bundle(self) -> Tuple[str, int]: imap = self.valid.aigbv.imap (name, bundle), *_ = imap.items() return name, bundle @property def name(self) -> str: return self._name_bundle[0] @property def bundle(self) -> Bundle: return self.valid.aigbv.imap[self.name] def with_name(self, name: str) -> Variable: """Create a copy of this Variable with a new name.""" if self.name == name: return self valid_circ = self.valid.aigbv["i", {self.name: name}] return attr.evolve(self, valid=BV.UnsignedBVExpr(valid_circ)) @property def _encoding_size(self) -> int: return self.bundle.size def expr(self) -> BV.UnsignedBVExpr: """Returns Aiger BitVector representing this variable.""" return BV.uatom(self._encoding_size, self.name)
Class variables
var decode : Callable[[int], Any]
var encode : Callable[[Any], int]
var valid : aiger_bv.expr.UnsignedBVExpr
Instance variables
var bundle : aiger_bv.bundle.Bundle
-
Expand source code
@property def bundle(self) -> Bundle: return self.valid.aigbv.imap[self.name]
var name : str
-
Expand source code
@property def name(self) -> str: return self._name_bundle[0]
Methods
def check_bitvector_input(self, _: Attribute, value: UnsignedBVExpr)
-
Expand source code
@valid.validator def check_bitvector_input(self, _: Attribute, value: UnsignedBVExpr): if len(value.inputs) != 1: raise ValueError("valid must be over single bitvector input!")
def expr(self) ‑> aiger_bv.expr.UnsignedBVExpr
-
Returns Aiger BitVector representing this variable.
Expand source code
def expr(self) -> BV.UnsignedBVExpr: """Returns Aiger BitVector representing this variable.""" return BV.uatom(self._encoding_size, self.name)
def size(self) ‑> int
-
Returns number of values Variable can take on.
Expand source code
def size(self) -> int: """Returns number of values Variable can take on.""" return aiger_bdd.count(self.valid)
def with_name(self, name: str) ‑> Variable
-
Create a copy of this Variable with a new name.
Expand source code
def with_name(self, name: str) -> Variable: """Create a copy of this Variable with a new name.""" if self.name == name: return self valid_circ = self.valid.aigbv["i", {self.name: name}] return attr.evolve(self, valid=BV.UnsignedBVExpr(valid_circ))