Download

Source code for eqc_direct.eqc_client

  • """
  • :class:`.EqcClient` contains all RPC calls to process, get system status,
  • and fetch results.
  • """
  • import logging
  • import time
  • import os
  • import warnings
  • from typing import TypedDict, List, Optional
  • import grpc
  • from grpc._channel import _InactiveRpcError
  • import numpy as np
  • from . import eqc_pb2, eqc_pb2_grpc
  • from .utils import (
  • SysStatus,
  • message_to_dict,
  • )
  • [docs]
  • class InactiveRpcError(Exception):
  • """Custom exception wrapper around grpc._channel._InactiveRpcError."""
  • [docs]
  • class EqcResult(TypedDict):
  • """
  • EQC results object. Will not contain a ground state or spins if err_code not 0.
  • :param err_code: the error code for a given job. Full list of :code:`err_code`
  • values can be found :class:`eqc_direct.utils.JobCodes`
  • :param err_desc: the error description for a given job submission. Full list of
  • :code:`err_desc` values can be found in :class:`eqc_direct.utils.JobCodes`
  • :param runtime: solving time in seconds
  • :param energy: energy for best solution found
  • :param solution: vector of floats representing the lowest energy solution
  • :note:
  • * Eqc1 only support ising formulation where possible solution values are {-1, 1}
  • * all other formulations have length n solution vector of floats \
  • that sum to the device constraint (Eqc2 and Eqc3)
  • """
  • err_code: int
  • err_desc: str
  • runtime: float
  • energy: Optional[float]
  • solution: Optional[List[float]]
  • [docs]
  • class HealthCheckResponse(TypedDict):
  • """
  • Health check response object structure. Unless :code:`debug` is specified
  • at submission only returns pass/fail for each test.
  • :param debug: whether health check was run in debug mode
  • :param err_code: if non-zero indicates an error in health check
  • :param err_desc: describes errors that occurred during health check run
  • :param entropy_pass: pass/fail for entropy test
  • :param stability_pass: pass/fail for stability test
  • :param extinction_ratio_pass: pass/fail for extinction ratio tests
  • :param small_problem_pass: pass/fail for small problem ground state test
  • :param entropy_data: test data for entropy tests only returned if debug=True
  • :param stability_data: test data for stability tests only returned if debug=True
  • :param extinction_ratio_data: test data for extinction ratio tests only
  • returned if debug=True
  • :param small_problem_result: Eqc results object for small problem test if debug=True
  • """
  • debug: bool
  • err_code: int
  • err_desc: str
  • entropy_pass: Optional[bool]
  • stability_pass: Optional[bool]
  • extinction_ratio_pass: Optional[bool]
  • small_problem_pass: Optional[bool]
  • entropy_data: Optional[List[float]]
  • stability_data: Optional[List[float]]
  • extinction_ratio_data: Optional[List[float]]
  • small_problem_result: Optional[EqcResult]
  • [docs]
  • class EqcClient:
  • """
  • Provides calls to process jobs using EQC RPC server
  • :param ip_addr: The IP address of the RPC server
  • :param port: The port that the RPC server is running on
  • :param max_data_size: the max send and receive message length for RPC server
  • .. note::
  • :code:`lock_id` is used by a variety of class functions.
  • It is set to an empty string by default since default for device server
  • :code:`lock_id` is also an empty string. This allows for single user
  • processing without having to acquire a device lock.
  • .. All GRPC calls follow a specific pattern:
  • .. 1. Fill in data to be sent in message stub
  • .. 2. Send data using stub service method
  • .. 3. Parse response
  • """
  • def __init__(
  • self,
  • ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
  • port: str = os.getenv("DEVICE_PORT", "50051"),
  • max_data_size: int = 512 * 1024 * 1024,
  • ):
  • self._ip_addr = ip_addr
  • self._max_data_size = max_data_size
  • self._ip_add_port = ip_addr + ":" + port
  • self._channel_opt = [
  • ("grpc.max_send_message_length", max_data_size),
  • ("grpc.max_receive_message_length", max_data_size),
  • ]
  • self.channel = grpc.insecure_channel(
  • self._ip_add_port,
  • options=self._channel_opt,
  • )
  • self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)
  • [docs]
  • def submit_job(
  • self,
  • problem_data: np.ndarray,
  • lock_id: str = "",
  • sum_constraint: float = 1,
  • relaxation_schedule: int=2,
  • continuous_soln: bool=True,
  • ) -> dict:
  • """
  • Submits data to be processed by EQC device
  • :param problem_data: an array of problem data to be optimized
  • :param lock_id: a UUID to allow for multi-user processing
  • :param sum_constraint: a normalization constraint that is applied to the
  • problem space that is used to calculate :code:`ground_state` energy.
  • Value must be greater than or equal to 1.
  • :param relaxation_schedule: four different schedules represented
  • in integer parameter. Higher values reduce the variation in
  • the analog spin values and therefore, lead to better ground state
  • for input problem. Accepts range of values in set [1,4].
  • :param continuous_soln: whether solutions should be returned as integer
  • or continuous values. In order to obtain integer solutions a
  • distillation method is applied to the continuous solutions to map
  • them to integer values.
  • :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
  • with the following keys:
  • - **err_code**: `int`- job submission error code
  • - **err_desc**: `str`- error code description for submission
  • """
  • if problem_data.dtype==np.float64:
  • warn_dtype_msg = "Max precision for EQC device is float32 input type was float64. Input matrix will be rounded"
  • logging.warning(warn_dtype_msg)
  • warnings.warn(warn_dtype_msg, Warning)
  • # flatten columnwise rather for matrix
  • prob_data = problem_data.flatten(order="F")
  • # dimension may need to change when we introduce multibody problems
  • try:
  • dimx, _ = problem_data.shape
  • except ValueError as err:
  • err_msg = "Input data must be two dimensions"
  • logging.error(err_msg, exc_info=True)
  • raise ValueError(err_msg) from err
  • job_input = eqc_pb2.JobInput(
  • nvars=dimx,
  • sum_constraint=sum_constraint,
  • relaxation_schedule = relaxation_schedule,
  • prob_data=prob_data,
  • continuous_soln = continuous_soln,
  • lock_id=lock_id,
  • )
  • try:
  • job_results = self.eqc_stub.SubmitJob(job_input)
  • except _InactiveRpcError as exc:
  • # Make error easier to read/detect by consumers.
  • raise InactiveRpcError(
  • "EQC submit_job failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(job_results)
  • [docs]
  • def fetch_result(self, lock_id: str = "") -> EqcResult:
  • """
  • Request last EQC job results. Returns results from the most recent
  • run on the device.
  • :param lock_id: a valid :code:`lock_id` that matches current device
  • :code:`lock_id`
  • :return: an :class:`.EqcResult` object
  • """
  • fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • eqc_results = self.eqc_stub.FetchResults(fetch_input)
  • except _InactiveRpcError as exc:
  • # Make error easier to read/detect by consumers.
  • raise InactiveRpcError(
  • "EQC fetch_results failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • result = message_to_dict(eqc_results)
  • # need to interpret result with correct precision
  • # grpc serialization deserialization causes a change in the values
  • result["solution"] = [np.float32(val) for val in result["solution"]]
  • result["energy"] = np.float32(result["energy"])
  • return result
  • [docs]
  • def system_status(self) -> dict:
  • """
  • Client call to obtain EQC system status
  • :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:
  • - **status_code**: `int`- current system status code
  • - **status_desc**: `str`- description of current system status
  • """
  • try:
  • sys_resp = self.eqc_stub.SystemStatus(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC system_status failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(sys_resp)
  • [docs]
  • def acquire_lock(self) -> dict:
  • """
  • Makes a single attempt to acquire exclusive lock on hardware execution.
  • Locking can be used to ensure orderly processing in multi-user environments.
  • Lock can only be acquired when no other user has acquired the lock or when
  • the system has been idle for 60 seconds while another user has the lock.
  • This idle timeout prevents one user from blocking other users from using
  • the machine even if they are not active.
  • :return:
  • a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
  • with an additional key :code:`lock_id`:
  • - **lock_id**: `str`- if acquired the current device `lock_id`
  • else empty string
  • - **status_code**: `int`- status code for lock id acquisition
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • try:
  • acquire_lock_resp = self.eqc_stub.AcquireLock(eqc_pb2.Empty())
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC acquire_lock failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(acquire_lock_resp)
  • [docs]
  • def release_lock(self, lock_id: str = "") -> dict:
  • """
  • Releases exclusive lock for running health check or submitting job
  • :param lock_id: a UUID with currently acquired exclusive device lock
  • :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:
  • - **status_code**: `int`- status code for lock id acquisition
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • release_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • release_lock_resp = self.eqc_stub.ReleaseLock(release_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC release_lock failed due to grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(release_lock_resp)
  • [docs]
  • def check_lock(self, lock_id: str = "") -> dict:
  • """
  • Checks if submitted :code:`lock_id` has execution lock on the device
  • :param lock_id: a UUID which will be checked to determine if has exclusive
  • device execution lock
  • :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:
  • - **status_code**: `int`- status code for lock check
  • - **status_desc**: `str`- a description for the associated status code
  • """
  • check_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • check_output = self.eqc_stub.CheckLock(check_input)
  • return message_to_dict(check_output)
  • [docs]
  • def start_health_check(
  • self,
  • lock_id: str = "",
  • entropy: bool = False,
  • stability: bool = False,
  • extinction_ratio: bool = False,
  • small_problem: bool = False,
  • debug: bool = False,
  • ) -> dict:
  • """
  • Runs health checks for an Eqc device must have lock to run.
  • :param lock_id: the execution lock_id as acquired by acquire_lock
  • :param entropy: request run of entropy test on Eqc device (more info)
  • :param stability: request run of stability test on Eqc device (more info)
  • :param extinction_ratio: request test of extinction ratio on Eqc device
  • (more info)
  • :param small_problem: run small problem and test valid result (more info)
  • :param debug: return verbose output from health check
  • :return:
  • one of the members of :class:`eqc_direct.utils.JobCodes`
  • as a dict with the following keys:
  • - **err_code**: `int`- non-zero value indicates error
  • - **err_desc**: `str`- a description for associated error code
  • """
  • health_input = eqc_pb2.HealthInput(
  • entropy=entropy,
  • stability=stability,
  • extinction_ratio=extinction_ratio,
  • small_problem=small_problem,
  • lock_id=lock_id,
  • debug=debug,
  • )
  • health_resp = self.eqc_stub.HealthCheck(health_input)
  • return message_to_dict(health_resp)
  • [docs]
  • def fetch_health_check_result(self, lock_id="") -> HealthCheckResponse:
  • """
  • Fetch health check data from previous run of health check tests
  • :param lock_id: requires a lock_id that was acquired by
  • :return: dict object :class:`.HealthCheckResponse`
  • .. note::
  • This result structure hasn't been finalized.
  • When C++ code is written will know exact format of augmented data.
  • """
  • health_result_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • health_result_resp = self.eqc_stub.FetchHealth(health_result_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC fetch_health_check_result failed due to "
  • "grpc._channel._InactiveRpcError."
  • ) from exc
  • health_dict = message_to_dict(health_result_resp)
  • if health_dict["debug"]:
  • # could consider recursive unnesting in message to dict
  • health_dict["small_problem_result"] = message_to_dict(
  • health_dict["small_problem_result"]
  • )
  • return health_dict
  • # drop keys from debug view that have blank data
  • drop_keys = [
  • "entropy_data",
  • "stability_data",
  • "extinction_ratio_data",
  • "small_problem_result",
  • ]
  • return {
  • key: value for key, value in health_dict.items() if key not in drop_keys
  • }
  • [docs]
  • def stop_running_process(self, lock_id: str = "") -> dict:
  • """
  • Stops a running process either a health check or a Eqc job.
  • Process locks will release automatically based on a timeout
  • which is maintained in the server code if they are
  • not released using this.
  • :param lock_id: requires a lock_id that was acquired by
  • :return:
  • a member of :class:`eqc_direct.utils.SysStatus`
  • as dict with following keys:
  • - **status_code**: `int`- the system code after stopping
  • - **status_desc**: `str`- the associated system status description
  • """
  • stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
  • try:
  • stop_resp = self.eqc_stub.StopRunning(stop_input)
  • except _InactiveRpcError as exc:
  • raise InactiveRpcError(
  • "EQC fetch_health_check_result failed due to "
  • "grpc._channel._InactiveRpcError."
  • ) from exc
  • return message_to_dict(stop_resp)
  • [docs]
  • def run_health_check(
  • self,
  • lock_id: str = "",
  • entropy: bool = False,
  • stability: bool = False,
  • extinction_ratio: bool = False,
  • small_problem: bool = False,
  • debug: bool = False,
  • ) -> HealthCheckResponse:
  • """
  • Runs health checks for an Eqc device. Requires a validate lock on the device.
  • :param lock_id: the execution lock_id as acquired by acquire_lock
  • :param entropy: request run of entropy test on Eqc device (more info)
  • :param stability: request run of stability test on Eqc device (more info)
  • :param extinction_ratio: request test of extinction ratio on Eqc device
  • (more info)
  • :param small_problem: run small problem and test valid result (more info)
  • :param debug: return verbose output from health check
  • :param lock_id: requires a lock_id that was acquired by
  • :return: dict object :class:`.HealthCheckResponse`
  • .. note::
  • This result structure hasn't been finalized.
  • When C++ code is written will know exact format of augmented data.
  • .. What happens when all health checks turned off just return blank message?
  • """
  • health_start_resp = self.start_health_check(
  • lock_id=lock_id,
  • entropy=entropy,
  • stability=stability,
  • extinction_ratio=extinction_ratio,
  • small_problem=small_problem,
  • debug=debug,
  • )
  • if health_start_resp["err_code"] != 0:
  • err_msg = f"Failed to start health check with response: {health_start_resp}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • sys_code = self.system_status()["sys_code"]
  • while sys_code != SysStatus.IDLE["sys_code"]:
  • sys_code = self.system_status()["sys_code"]
  • # this is based on the error statuses are 3 and above
  • if sys_code >= 3:
  • raise RuntimeError(f"System unavailable sys_code: {sys_code}")
  • # only sleep if not idle
  • if sys_code != SysStatus.IDLE["sys_code"]:
  • time.sleep(1)
  • # pull in results after is idle
  • health_result = self.fetch_health_check_result(lock_id=lock_id)
  • lock_status = self.release_lock(lock_id=lock_id)
  • if not lock_status["lock_released"]:
  • err_msg = f"Failed to release lock with message: {lock_status['message']}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • return health_result
  • [docs]
  • def wait_for_lock(self) -> tuple:
  • """
  • Waits for lock indefinitely calling :func:`acquire_lock`
  • :return: a tuple of the following items:
  • - **lock_id**: `str`- exclusive lock for device execution with a timeout
  • - **start_queue_ts**: `int`- time in ns on which lock was acquired is an int
  • - **end_queue_ts**: `int`- time in ns on which queue for
  • lock ended is an int.
  • """
  • lock_id = ""
  • start_queue_ts = time.time_ns()
  • while lock_id == "":
  • sys_code = self.system_status()["status_code"]
  • # this is based on the error statuses are 3 and above
  • if sys_code >= 3:
  • raise RuntimeError(f"System unavailable status_code: {sys_code}")
  • lock_id = self.acquire_lock()["lock_id"]
  • # only sleep if didn't get lock on device
  • if lock_id == "":
  • time.sleep(1)
  • end_queue_ts = time.time_ns()
  • return lock_id, start_queue_ts, end_queue_ts
  • [docs]
  • def process_job(
  • self,
  • hamiltonian: np.ndarray,
  • sum_constraint: float = 1,
  • relaxation_schedule: int = 4,
  • continuous_soln: bool=True,
  • lock_id: str = "",
  • ) -> dict:
  • """
  • Processes a job by:
  • 1. submitting job
  • 2. checks for status, until completes or fails
  • 3. returns results
  • :param hamiltonian: np.ndarray
  • an (n,n+1) array representing the problem hamiltonian
  • :param sum_constraint: a normalization constraint that is applied to the
  • problem space that is used to calculate :code:`ground_state` energy.
  • Value must be greater than or equal to 1.
  • :param relaxation_schedule: four different schedules represented in
  • integer parameter. Higher values reduce the variation in the
  • analog spin values and therefore, lead to better ground state
  • for input problem. Accepts range of values in set [1,4].
  • :param continuous_soln: whether solutions should be returned as integer or
  • continuous values.
  • :param lock_id: a str with exclusive lock for device execution with a timeout
  • :return: dict of results and timings with the following keys:
  • - results: :class:`.EqcResult` dict
  • - start_job_ts: time in ns marking start of job_submission
  • - end_job_ts: time in ns marking end of job submission complete
  • """
  • start_job = time.time_ns()
  • submit_job_resp = self.submit_job(
  • problem_data=hamiltonian,
  • sum_constraint=sum_constraint,
  • relaxation_schedule = relaxation_schedule,
  • continuous_soln = continuous_soln,
  • lock_id=lock_id,
  • )
  • if submit_job_resp["err_code"] != 0:
  • err_msg = f"Job submission failed with response: {submit_job_resp}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • sys_code = self.system_status()["status_code"]
  • while sys_code != SysStatus.IDLE["status_code"]:
  • sys_code = self.system_status()["status_code"]
  • # this is based on the error statuses are 3 and above
  • if sys_code >= 3:
  • err_msg = f"System unavailable status_code: {sys_code}"
  • logging.error(err_msg, exc_info=True)
  • raise RuntimeError(err_msg)
  • # only sleep if not idle
  • if sys_code != SysStatus.IDLE["status_code"]:
  • time.sleep(1)
  • end_job = time.time_ns()
  • # pull in results after is idle
  • job_result = self.fetch_result(lock_id=lock_id)
  • if job_result["err_code"] != 0:
  • raise RuntimeError(
  • f"Job execution error\n"
  • f"err_code: {job_result['err_code']}\n"
  • f"err_desc: {job_result['err_desc']}"
  • )
  • job_result["start_job_ts"] = start_job
  • job_result["end_job_ts"] = end_job
  • return job_result