From 2f42fc60424596ec23c24f40f676398856372e8e Mon Sep 17 00:00:00 2001 From: Takashi Imamichi Date: Wed, 29 May 2024 22:52:30 +0900 Subject: [PATCH] port sampling_vqe and qaoa --- .pylintdict | 10 + qiskit_optimization/compat/__init__.py | 35 ++ .../compat/diagonal_estimator.py | 209 ++++++++++ qiskit_optimization/compat/qaoa.py | 144 +++++++ qiskit_optimization/compat/sampling_vqe.py | 389 ++++++++++++++++++ 5 files changed, 787 insertions(+) create mode 100644 qiskit_optimization/compat/__init__.py create mode 100644 qiskit_optimization/compat/diagonal_estimator.py create mode 100644 qiskit_optimization/compat/qaoa.py create mode 100644 qiskit_optimization/compat/sampling_vqe.py diff --git a/.pylintdict b/.pylintdict index b4bd3457..33d7dcdc 100644 --- a/.pylintdict +++ b/.pylintdict @@ -6,6 +6,7 @@ aer's al annealers ansatz +ansatzes apidocs applegate args @@ -16,6 +17,7 @@ backend backends barkoutsos benchmarking +bfgs bitstring bitstrings bixby @@ -65,12 +67,14 @@ farhi fmin formatter func +functools fred fval fx gambella geq getter +getters glover goemans goldstone @@ -81,6 +85,7 @@ gurobi gurobioptimizer gurobipy gutmann +hadfield hamilton hamiltonian hamiltonians @@ -99,6 +104,7 @@ iprint ising iter iteratively +jac july karimi kirkpatrick @@ -115,6 +121,7 @@ lp lucas macos makefile +marecek masahito matplotlib maxcut @@ -149,6 +156,7 @@ optimizationresultstatus optimizers panchenko param +parameterizations params parikh passmanager @@ -207,6 +215,7 @@ simonetto slsqp smode smoothen +spedalieri spsa src statevector @@ -249,6 +258,7 @@ wecker whitespace wiesner williamson +woerner xs ys zemlin diff --git a/qiskit_optimization/compat/__init__.py b/qiskit_optimization/compat/__init__.py new file mode 100644 index 00000000..acb3b53e --- /dev/null +++ b/qiskit_optimization/compat/__init__.py @@ -0,0 +1,35 @@ +# This code is part of a Qiskit project. +# +# (C) Copyright IBM 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. +""" +Compatibility module (:mod:`qiskit_optimization.compat`) +======================================================= + +Algorithms copied from qiskit-algorithms, which are compatible with Sampler V2. + +.. currentmodule:: qiskit_optimization.compat + +Algorithms +---------- + +.. autosummary:: + :toctree: ../stubs/ + :nosignatures: + + SamplingVQE + QAOA + +""" + +from .qaoa import QAOA +from .sampling_vqe import SamplingVQE + +__all__ = ["SamplingVQE", "QAOA"] diff --git a/qiskit_optimization/compat/diagonal_estimator.py b/qiskit_optimization/compat/diagonal_estimator.py new file mode 100644 index 00000000..d7109b15 --- /dev/null +++ b/qiskit_optimization/compat/diagonal_estimator.py @@ -0,0 +1,209 @@ +# This code is part of a Qiskit project. +# +# (C) Copyright IBM 2022, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""Expectation value for a diagonal observable using a sampler primitive.""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable, Mapping, MappingView, Sequence +from typing import Any + +import numpy as np +from qiskit.circuit import QuantumCircuit +from qiskit.primitives import BaseEstimator, BaseSamplerV1, BaseSamplerV2 +from qiskit.primitives.utils import _circuit_key, init_observable +from qiskit.quantum_info import SparsePauliOp +from qiskit.quantum_info.operators.base_operator import BaseOperator +from qiskit_algorithms.algorithm_job import AlgorithmJob +from qiskit_algorithms.minimum_eigensolvers.diagonal_estimator import _DiagonalEstimatorResult + + +class _DiagonalEstimator(BaseEstimator): + """An estimator for diagonal observables.""" + + def __init__( + self, + sampler: BaseSamplerV1 | BaseSamplerV2, + aggregation: float | Callable[[Iterable[tuple[float, float]]], float] | None = None, + callback: Callable[[Sequence[Mapping[str, Any]]], None] | None = None, + **options, + ) -> None: + r"""Evaluate the expectation of quantum state with respect to a diagonal operator. + + Args: + sampler: The sampler used to evaluate the circuits. + aggregation: The aggregation function to aggregate the measurement outcomes. If a float + this specified the CVaR :math:`\alpha` parameter. + callback: A callback which is given the best measurements of all circuits in each + evaluation. + run_options: Options for the sampler. + + """ + super().__init__(options=options) + self._circuits: list[QuantumCircuit] = [] # See Qiskit pull request 11051 + self._parameters: list[MappingView] = [] + self._observables: list[SparsePauliOp] = [] + + self.sampler = sampler + if not callable(aggregation): + aggregation = _get_cvar_aggregation(aggregation) + + self.aggregation = aggregation + self.callback = callback + self._circuit_ids: dict[int, QuantumCircuit] = {} + self._observable_ids: dict[int, BaseOperator] = {} + + def _run( + self, + circuits: Sequence[QuantumCircuit], + observables: Sequence[BaseOperator], + parameter_values: Sequence[Sequence[float]], + **run_options, + ) -> AlgorithmJob: + circuit_indices = [] + for circuit in circuits: + key = _circuit_key(circuit) + index = self._circuit_ids.get(key) + if index is not None: + circuit_indices.append(index) + else: + circuit_indices.append(len(self._circuits)) + self._circuit_ids[key] = len(self._circuits) + self._circuits.append(circuit) + self._parameters.append(circuit.parameters) + observable_indices = [] + for observable in observables: + index = self._observable_ids.get(id(observable)) + if index is not None: + observable_indices.append(index) + else: + observable_indices.append(len(self._observables)) + self._observable_ids[id(observable)] = len(self._observables) + converted_observable = init_observable(observable) + _check_observable_is_diagonal(converted_observable) # check it's diagonal + self._observables.append(converted_observable) + job = AlgorithmJob( + self._call, circuit_indices, observable_indices, parameter_values, **run_options + ) + job.submit() + return job + + def _call( + self, + circuits: Sequence[int], + observables: Sequence[int], + parameter_values: Sequence[Sequence[float]], + **run_options, + ) -> _DiagonalEstimatorResult: + if isinstance(self.sampler, BaseSamplerV1): + job = self.sampler.run( + [self._circuits[i] for i in circuits], + parameter_values, + **run_options, + ) + sampler_result = job.result() + metadata = sampler_result.metadata + samples = sampler_result.quasi_dists + else: # BaseSamplerV2 + job = self.sampler.run( + [(self._circuits[i], val) for i, val in zip(circuits, parameter_values)], + **run_options, + ) + sampler_pub_result = job.result() + metadata = [] + samples = [] + for i, result in zip(circuits, sampler_pub_result): + creg = self._circuits[i].cregs[0].name + counts = getattr(result.data, creg).get_int_counts() + shots = sum(counts.values()) + samples.append({key: val / shots for key, val in counts.items()}) + metadata.append(result.metadata) + + # a list of dictionaries containing: {state: (measurement probability, value)} + evaluations: list[dict[int, tuple[float, float]]] = [ + { + state: (probability, _evaluate_sparsepauli(state, self._observables[i])) + for state, probability in sampled.items() + } + for i, sampled in zip(observables, samples) + ] + + results = np.array([self.aggregation(evaluated.values()) for evaluated in evaluations]) + + # get the best measurements + best_measurements = [] + num_qubits = self._circuits[0].num_qubits + for evaluated in evaluations: + best_result = min(evaluated.items(), key=lambda x: x[1][1]) + best_measurements.append( + { + "state": best_result[0], + "bitstring": bin(best_result[0])[2:].zfill(num_qubits), + "value": best_result[1][1], + "probability": best_result[1][0], + } + ) + + if self.callback is not None: + self.callback(best_measurements) + + return _DiagonalEstimatorResult( + values=results, metadata=metadata, best_measurements=best_measurements + ) + + +def _get_cvar_aggregation(alpha: float | None) -> Callable[[Iterable[tuple[float, float]]], float]: + """Get the aggregation function for CVaR with confidence level ``alpha``.""" + if alpha is None: + alpha = 1 + elif not 0 <= alpha <= 1: + raise ValueError(f"alpha must be in [0, 1] but was {alpha}") + + # if alpha is close to 1 we can avoid the sorting + if np.isclose(alpha, 1): + + def aggregate(measurements: Iterable[tuple[float, float]]) -> float: + return sum(probability * value for probability, value in measurements) + + else: + + def aggregate(measurements: Iterable[tuple[float, float]]) -> float: + # sort by values + sorted_measurements = sorted(measurements, key=lambda x: x[1]) + + accumulated_percent = 0.0 # once alpha is reached, stop + cvar = 0.0 + for probability, value in sorted_measurements: + cvar += value * min(probability, alpha - accumulated_percent) + accumulated_percent += probability + if accumulated_percent >= alpha: + break + + return cvar / alpha + + return aggregate + + +_PARITY = np.array([-1 if bin(i).count("1") % 2 else 1 for i in range(256)], dtype=np.complex128) + + +def _evaluate_sparsepauli(state: int, observable: SparsePauliOp) -> float: + packed_uint8 = np.packbits(observable.paulis.z, axis=1, bitorder="little") + state_bytes = np.frombuffer(state.to_bytes(packed_uint8.shape[1], "little"), dtype=np.uint8) + reduced = np.bitwise_xor.reduce(packed_uint8 & state_bytes, axis=1) + return np.sum(observable.coeffs * _PARITY[reduced]) + + +def _check_observable_is_diagonal(observable: SparsePauliOp) -> None: + is_diagonal = not np.any(observable.paulis.x) + if not is_diagonal: + raise ValueError("The observable must be diagonal.") diff --git a/qiskit_optimization/compat/qaoa.py b/qiskit_optimization/compat/qaoa.py new file mode 100644 index 00000000..1393f60c --- /dev/null +++ b/qiskit_optimization/compat/qaoa.py @@ -0,0 +1,144 @@ +# This code is part of a Qiskit project. +# +# (C) Copyright IBM 2022, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""The quantum approximate optimization algorithm.""" + +from __future__ import annotations + +from typing import Any, Callable + +import numpy as np +from qiskit.circuit import QuantumCircuit +from qiskit.circuit.library.n_local.qaoa_ansatz import QAOAAnsatz +from qiskit.passmanager import BasePassManager +from qiskit.primitives import BaseSamplerV1, BaseSamplerV2 +from qiskit.quantum_info.operators.base_operator import BaseOperator +from qiskit_algorithms.optimizers import Minimizer, Optimizer +from qiskit_algorithms.utils.validation import validate_min + +from .sampling_vqe import SamplingVQE + + +class QAOA(SamplingVQE): + r""" + The Quantum Approximate Optimization Algorithm (QAOA). + + QAOA is a well-known algorithm for finding approximate solutions to combinatorial-optimization + problems [1]. + + The QAOA implementation directly extends :class:`.SamplingVQE` and inherits its optimization + structure. However, unlike VQE, which can be configured with arbitrary ansatzes, QAOA uses its + own fine-tuned ansatz, which comprises :math:`p` parameterized global :math:`x` rotations and + :math:`p` different parameterizations of the problem hamiltonian. QAOA is thus principally + configured by the single integer parameter, ``reps``, which dictates the depth of the ansatz, + and thus affects the approximation quality. + + An optional array of :math:`2p` parameter values, as the :attr:`initial_point`, may be provided + as the starting :math:`\beta` and :math:`\gamma` parameters for the QAOA ansatz [1]. + + An operator or a parameterized quantum circuit may optionally also be provided as a custom + :attr:`mixer` Hamiltonian. This allows in the case of quantum annealing [2] and QAOA [3], to run + constrained optimization problems where the mixer constrains the evolution to a feasible + subspace of the full Hilbert space. + + The following attributes can be set via the initializer but can also be read and updated once + the QAOA object has been constructed. + + Attributes: + sampler (BaseSampler): The sampler primitive to sample the circuits. + optimizer (Optimizer | Minimizer): A classical optimizer to find the minimum energy. This + can either be an :class:`.Optimizer` or a callable implementing the + :class:`.Minimizer` protocol. + reps (int): The integer parameter :math:`p`. Has a minimum valid value of 1. + initial_state: An optional initial state to prepend the QAOA circuit with. + mixer (QuantumCircuit | BaseOperator): The mixer Hamiltonian to evolve with or + a custom quantum circuit. Allows support of optimizations in constrained subspaces [2, + 3] as well as warm-starting the optimization [4]. + aggregation (float | Callable[[list[float]], float] | None): A float or callable to specify + how the objective function evaluated on the basis states should be aggregated. If a + float, this specifies the :math:`\alpha \in [0,1]` parameter for a CVaR expectation + value. + callback (Callable[[int, np.ndarray, float, dict[str, Any]], None] | None): A callback + that can access the intermediate data at each optimization step. These data are: the + evaluation count, the optimizer parameters for the ansatz, the evaluated value, and + the metadata dictionary. + + References: + [1]: Farhi, E., Goldstone, J., Gutmann, S., "A Quantum Approximate Optimization Algorithm" + `arXiv:1411.4028 `__ + [2]: Hen, I., Spedalieri, F. M., "Quantum Annealing for Constrained Optimization" + `PhysRevApplied.5.034007 `__ + [3]: Hadfield, S. et al, "From the Quantum Approximate Optimization Algorithm to a Quantum + Alternating Operator Ansatz" `arXiv:1709.03489 `__ + [4]: Egger, D. J., Marecek, J., Woerner, S., "Warm-starting quantum optimization" + `arXiv: 2009.10095 `__ + """ + + def __init__( + self, + sampler: BaseSamplerV1 | BaseSamplerV2, + optimizer: Optimizer | Minimizer, + *, + reps: int = 1, + initial_state: QuantumCircuit | None = None, + mixer: QuantumCircuit | BaseOperator = None, + initial_point: np.ndarray | None = None, + aggregation: float | Callable[[list[float]], float] | None = None, + callback: Callable[[int, np.ndarray, float, dict[str, Any]], None] | None = None, + passmanager: BasePassManager | None = None, + ) -> None: + r""" + Args: + sampler: The sampler primitive to sample the circuits. + optimizer: A classical optimizer to find the minimum energy. This can either be + an :class:`.Optimizer` or a callable implementing the :class:`.Minimizer` + protocol. + reps: The integer parameter :math:`p`. Has a minimum valid value of 1. + initial_state: An optional initial state to prepend the QAOA circuit with. + mixer: The mixer Hamiltonian to evolve with or a custom quantum circuit. Allows support + of optimizations in constrained subspaces [2, 3] as well as warm-starting the + optimization [4]. + initial_point: An optional initial point (i.e. initial parameter values) for the + optimizer. The length of the initial point must match the number of :attr:`ansatz` + parameters. If ``None``, a random point will be generated within certain parameter + bounds. ``QAOA`` will look to the ansatz for these bounds. If the ansatz does not + specify bounds, bounds of :math:`-2\pi`, :math:`2\pi` will be used. + aggregation: A float or callable to specify how the objective function evaluated on the + basis states should be aggregated. If a float, this specifies the :math:`\alpha \in + [0,1]` parameter for a CVaR expectation value. + callback: A callback that can access the intermediate data at each optimization step. + These data are: the evaluation count, the optimizer parameters for the ansatz, the + evaluated value, the metadata dictionary. + passmanager: A pass manager to transpile the circuits. + """ + validate_min("reps", reps, 1) + + self.reps = reps + self.mixer = mixer + self.initial_state = initial_state + self._cost_operator = None + + super().__init__( + sampler=sampler, + ansatz=None, + optimizer=optimizer, + initial_point=initial_point, + aggregation=aggregation, + callback=callback, + passmanager=passmanager, + ) + + def _check_operator_ansatz(self, operator: BaseOperator): + # Recreates a circuit based on operator parameter. + self.ansatz = QAOAAnsatz( + operator, self.reps, initial_state=self.initial_state, mixer_operator=self.mixer + ).decompose() # TODO remove decompose once #6674 is fixed diff --git a/qiskit_optimization/compat/sampling_vqe.py b/qiskit_optimization/compat/sampling_vqe.py new file mode 100644 index 00000000..283e5409 --- /dev/null +++ b/qiskit_optimization/compat/sampling_vqe.py @@ -0,0 +1,389 @@ +# This code is part of a Qiskit project. +# +# (C) Copyright IBM 2022, 2024. +# +# This code is licensed under the Apache License, Version 2.0. You may +# obtain a copy of this license in the LICENSE.txt file in the root directory +# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. +# +# Any modifications or derivative works of this code must retain this +# copyright notice, and modified files need to carry a notice indicating +# that they have been altered from the originals. + +"""The Variational Quantum Eigensolver algorithm, optimized for diagonal Hamiltonians.""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from time import time +from typing import Any + +import numpy as np +from qiskit.circuit import QuantumCircuit +from qiskit.passmanager import BasePassManager +from qiskit.primitives import BaseSamplerV1, BaseSamplerV2 +from qiskit.primitives.utils import init_observable +from qiskit.quantum_info.operators.base_operator import BaseOperator +from qiskit.result import QuasiDistribution +from qiskit_algorithms.exceptions import AlgorithmError +from qiskit_algorithms.list_or_dict import ListOrDict +from qiskit_algorithms.minimum_eigensolvers.sampling_mes import ( + SamplingMinimumEigensolver, + SamplingMinimumEigensolverResult, +) +from qiskit_algorithms.minimum_eigensolvers.sampling_vqe import ( + SamplingVQEResult, + _compare_measurements, +) +from qiskit_algorithms.observables_evaluator import estimate_observables +from qiskit_algorithms.optimizers import Minimizer, Optimizer, OptimizerResult +from qiskit_algorithms.utils import validate_bounds, validate_initial_point + +# private function as we expect this to be updated in the next released +from qiskit_algorithms.utils.set_batching import _set_default_batchsize +from qiskit_algorithms.variational_algorithm import VariationalAlgorithm + +from .diagonal_estimator import _DiagonalEstimator + +logger = logging.getLogger(__name__) + + +class SamplingVQE(VariationalAlgorithm, SamplingMinimumEigensolver): + r"""The Variational Quantum Eigensolver algorithm, optimized for diagonal Hamiltonians. + + VQE is a hybrid quantum-classical algorithm that uses a variational technique to find the + minimum eigenvalue of a given diagonal Hamiltonian operator :math:`H_{\text{diag}}`. + + In contrast to the :class:`~qiskit_algorithms.minimum_eigensolvers.VQE` class, the + ``SamplingVQE`` algorithm is executed using a :attr:`sampler` primitive. + + An instance of ``SamplingVQE`` also requires an :attr:`ansatz`, a parameterized + :class:`.QuantumCircuit`, to prepare the trial state :math:`|\psi(\vec\theta)\rangle`. It also + needs a classical :attr:`optimizer` which varies the circuit parameters :math:`\vec\theta` to + minimize the objective function, which depends on the chosen :attr:`aggregation`. + + The optimizer can either be one of Qiskit's optimizers, such as + :class:`~qiskit_algorithms.optimizers.SPSA` or a callable with the following signature: + + .. code-block:: python + + from qiskit_algorithms.optimizers import OptimizerResult + + def my_minimizer(fun, x0, jac=None, bounds=None) -> OptimizerResult: + # Note that the callable *must* have these argument names! + # Args: + # fun (callable): the function to minimize + # x0 (np.ndarray): the initial point for the optimization + # jac (callable, optional): the gradient of the objective function + # bounds (list, optional): a list of tuples specifying the parameter bounds + + result = OptimizerResult() + result.x = # optimal parameters + result.fun = # optimal function value + return result + + The above signature also allows one to use any SciPy minimizer, for instance as + + .. code-block:: python + + from functools import partial + from scipy.optimize import minimize + + optimizer = partial(minimize, method="L-BFGS-B") + + The following attributes can be set via the initializer but can also be read and updated once + the ``SamplingVQE`` object has been constructed. + + Attributes: + sampler (BaseSamplerV1 or BaseSamplerV2): The sampler primitive to sample the circuits. + ansatz (QuantumCircuit): A parameterized quantum circuit to prepare the trial state. + optimizer (Optimizer | Minimizer): A classical optimizer to find the minimum energy. This + can either be an :class:`.Optimizer` or a callable implementing the + :class:`.Minimizer` protocol. + aggregation (float | Callable[[list[tuple[float, complex]], float] | None): + A float or callable to specify how the objective function evaluated on the basis states + should be aggregated. If a float, this specifies the :math:`\alpha \in [0,1]` parameter + for a CVaR expectation value [1]. If a callable, it takes a list of basis state + measurements specified as ``[(probability, objective_value)]`` and return an objective + value as float. If None, all an ordinary expectation value is calculated. + callback (Callable[[int, np.ndarray, float, dict[str, Any]], None] | None): A callback that + can access the intermediate data at each optimization step. These data are: the + evaluation count, the optimizer parameters for the ansatz, the evaluated value, and the + metadata dictionary. + + References: + [1]: Barkoutsos, P. K., Nannicini, G., Robert, A., Tavernelli, I., and Woerner, S., + "Improving Variational Quantum Optimization using CVaR" + `arXiv:1907.04769 `_ + """ + + def __init__( + self, + sampler: BaseSamplerV1 | BaseSamplerV2, + ansatz: QuantumCircuit, + optimizer: Optimizer | Minimizer, + *, + initial_point: np.ndarray | None = None, + aggregation: float | Callable[[list[float]], float] | None = None, + callback: Callable[[int, np.ndarray, float, dict[str, Any]], None] | None = None, + passmanager: BasePassManager | None = None, + ) -> None: + r""" + Args: + sampler: The sampler primitive to sample the circuits. + ansatz: A parameterized quantum circuit to prepare the trial state. + optimizer: A classical optimizer to find the minimum energy. This can either be an + :class:`.Optimizer` or a callable implementing the :class:`.Minimizer` protocol. + initial_point: An optional initial point (i.e. initial parameter values) for the + optimizer. The length of the initial point must match the number of :attr:`ansatz` + parameters. If ``None``, a random point will be generated within certain parameter + bounds. ``SamplingVQE`` will look to the ansatz for these bounds. If the ansatz does + not specify bounds, bounds of :math:`-2\pi`, :math:`2\pi` will be used. + aggregation: A float or callable to specify how the objective function evaluated on the + basis states should be aggregated. + callback: A callback that can access the intermediate data at each optimization step. + These data are: the evaluation count, the optimizer parameters for the ansatz, the + estimated value, and the metadata dictionary. + """ + super().__init__() + + self.sampler = sampler + self.ansatz = ansatz + self.optimizer = optimizer + self.aggregation = aggregation + self.callback = callback + self.passmanager = passmanager + + # this has to go via getters and setters due to the VariationalAlgorithm interface + self._initial_point = initial_point + + @property + def initial_point(self) -> np.ndarray | None: + """Return the initial point.""" + return self._initial_point + + @initial_point.setter + def initial_point(self, value: np.ndarray | None) -> None: + """Set the initial point.""" + self._initial_point = value + + def _check_operator_ansatz(self, operator: BaseOperator): + """Check that the number of qubits of operator and ansatz match and that the ansatz is + parameterized. + """ + if operator.num_qubits != self.ansatz.num_qubits: + try: + logger.info( + "Trying to resize ansatz to match operator on %s qubits.", operator.num_qubits + ) + self.ansatz.num_qubits = operator.num_qubits + except AttributeError as error: + raise AlgorithmError( + "The number of qubits of the ansatz does not match the " + "operator, and the ansatz does not allow setting the " + "number of qubits using `num_qubits`." + ) from error + + if self.ansatz.num_parameters == 0: + raise AlgorithmError("The ansatz must be parameterized, but has no free parameters.") + + @classmethod + def supports_aux_operators(cls) -> bool: + return True + + def compute_minimum_eigenvalue( + self, + operator: BaseOperator, + aux_operators: ListOrDict[BaseOperator] | None = None, + ) -> SamplingMinimumEigensolverResult: + # check that the number of qubits of operator and ansatz match, and resize if possible + self._check_operator_ansatz(operator) + + if len(self.ansatz.clbits) > 0: + self.ansatz.remove_final_measurements() + self.ansatz.measure_all() + + initial_point = validate_initial_point(self.initial_point, self.ansatz) + + bounds = validate_bounds(self.ansatz) + + if self.passmanager: + ansatz: QuantumCircuit = self.passmanager.run(self.ansatz) + layout = ansatz.layout + operator = init_observable(operator) + operator = operator.apply_layout(layout) + if aux_operators: + if isinstance(aux_operators, list): + aux_operators = [op.apply_layout(layout) for op in aux_operators] + else: + aux_operators = { + key: op.apply_layout(layout) for key, op in aux_operators.items() + } + else: + ansatz = self.ansatz + + # NOTE: we type ignore below because the `return_best_measurement=True` is guaranteed to + # return a tuple + evaluate_energy, best_measurement = self._get_evaluate_energy( # type: ignore[misc] + operator, ansatz, return_best_measurement=True + ) + + start_time = time() + + if callable(self.optimizer): + optimizer_result = self.optimizer( + fun=evaluate_energy, + x0=initial_point, + jac=None, + bounds=bounds, + ) + else: + # we always want to submit as many estimations per job as possible for minimal + # overhead on the hardware + was_updated = _set_default_batchsize(self.optimizer) + + optimizer_result = self.optimizer.minimize( + fun=evaluate_energy, + x0=initial_point, + bounds=bounds, + ) + + # reset to original value + if was_updated: + self.optimizer.set_max_evals_grouped(None) + + optimizer_time = time() - start_time + + logger.info( + "Optimization complete in %s seconds.\nFound opt_params %s.", + optimizer_time, + optimizer_result.x, + ) + + if isinstance(self.sampler, BaseSamplerV1): + final_state = self.sampler.run([ansatz], [optimizer_result.x]).result().quasi_dists[0] + else: + result = self.sampler.run([(ansatz, optimizer_result.x)]).result()[0] + creg = ansatz.cregs[0].name + counts = getattr(result.data, creg).get_counts() + shots = sum(counts.values()) + final_state = QuasiDistribution( + {key: val / shots for key, val in counts.items()}, shots=shots + ) + + if aux_operators is not None: + aux_operators_evaluated = estimate_observables( + _DiagonalEstimator(sampler=self.sampler), + ansatz, + aux_operators, + optimizer_result.x, + ) + else: + aux_operators_evaluated = None + + return self._build_sampling_vqe_result( + self.ansatz.copy(), + optimizer_result, + aux_operators_evaluated, + best_measurement, + final_state, + optimizer_time, + ) + + def _get_evaluate_energy( + self, + operator: BaseOperator, + ansatz: QuantumCircuit, + return_best_measurement: bool = False, + ) -> ( + Callable[[np.ndarray], np.ndarray | float] + | tuple[Callable[[np.ndarray], np.ndarray | float], dict[str, Any]] + ): + """Returns a function handle to evaluate the energy at given parameters. + + This is the objective function to be passed to the optimizer that is used for evaluation. + + Args: + operator: The operator whose energy to evaluate. + ansatz: The ansatz preparing the quantum state. + return_best_measurement: If True, a handle to a dictionary containing the best + measurement evaluated with the cost function. + + Returns: + A tuple of a callable evaluating the energy and (optionally) a dictionary containing the + best measurement of the energy evaluation. + + Raises: + AlgorithmError: If the circuit is not parameterized (i.e. has 0 free parameters). + + """ + num_parameters = ansatz.num_parameters + if num_parameters == 0: + raise AlgorithmError("The ansatz must be parameterized, but has 0 free parameters.") + + # avoid creating an instance variable to remain stateless regarding results + eval_count = 0 + + best_measurement = {"best": None} + + def store_best_measurement(best): + for best_i in best: + if best_measurement["best"] is None or _compare_measurements( + best_i, best_measurement["best"] + ): + best_measurement["best"] = best_i + + estimator = _DiagonalEstimator( + sampler=self.sampler, + callback=store_best_measurement, + aggregation=self.aggregation, # type: ignore[arg-type] + ) + + def evaluate_energy(parameters: np.ndarray) -> np.ndarray | float: + nonlocal eval_count + # handle broadcasting: ensure parameters is of shape [array, array, ...] + parameters = np.reshape(parameters, (-1, num_parameters)).tolist() + batch_size = len(parameters) + + estimator_result = estimator.run( + batch_size * [ansatz], batch_size * [operator], parameters + ).result() + values = estimator_result.values + + if self.callback is not None: + metadata = estimator_result.metadata + for params, value, meta in zip(parameters, values, metadata): + eval_count += 1 + self.callback(eval_count, params, value, meta) + + result = values if len(values) > 1 else values[0] + return np.real(result) + + if return_best_measurement: + return evaluate_energy, best_measurement + + return evaluate_energy + + def _build_sampling_vqe_result( + self, + ansatz: QuantumCircuit, + optimizer_result: OptimizerResult, + aux_operators_evaluated: ListOrDict[tuple[complex, tuple[complex, int]]], + best_measurement: dict[str, Any], + final_state: QuasiDistribution, + optimizer_time: float, + ) -> SamplingVQEResult: + result = SamplingVQEResult() + result.eigenvalue = optimizer_result.fun + result.cost_function_evals = optimizer_result.nfev + result.optimal_point = optimizer_result.x + result.optimal_parameters = dict(zip(self.ansatz.parameters, optimizer_result.x)) + result.optimal_value = optimizer_result.fun + result.optimizer_time = optimizer_time + result.aux_operators_evaluated = aux_operators_evaluated + result.optimizer_result = optimizer_result + result.best_measurement = best_measurement["best"] + result.eigenstate = final_state + result.optimal_circuit = ansatz + return result