Source code for eqc_direct.eqc_client
- """
- :class:`.EqcClient` contains all RPC calls to process, get system status,
- and fetch results.
- """
- import logging
- import time
- import os
- import warnings
- from typing import TypedDict, List, Optional
- import grpc
- from grpc._channel import _InactiveRpcError
- import numpy as np
- from . import eqc_pb2, eqc_pb2_grpc
- from .utils import (
- SysStatus,
- message_to_dict,
- )
- [docs]
class InactiveRpcError(Exception):
    """
    Custom exception wrapper around grpc._channel._InactiveRpcError.

    Raised by client code in this module in place of the private grpc
    exception so that callers do not need to import from ``grpc._channel``.
    """
- [docs]
class EqcResult(TypedDict):
    """
    EQC results object. Will not contain a ground state or spins if err_code not 0.

    :param err_code: the error code for a given job. Full list of :code:`err_code`
        values can be found in :class:`eqc_direct.utils.JobCodes`
    :param err_desc: the error description for a given job submission. Full list of
        :code:`err_desc` values can be found in :class:`eqc_direct.utils.JobCodes`
    :param runtime: solving time in seconds
    :param energy: energy for best solution found
    :param solution: vector of floats representing the lowest energy solution

    :note:
        * Eqc1 only supports ising formulation where possible solution
          values are {-1, 1}
        * all other formulations have length n solution vector of floats
          that sum to the device constraint (Eqc2 and Eqc3)
    """

    err_code: int
    err_desc: str
    runtime: float
    # energy and solution are Optional: per the docstring above they are not
    # populated when err_code is non-zero.
    energy: Optional[float]
    solution: Optional[List[float]]
- [docs]
class HealthCheckResponse(TypedDict):
    """
    Health check response object structure. Unless :code:`debug` is specified
    at submission only returns pass/fail for each test.

    :param debug: whether health check was run in debug mode
    :param err_code: if non-zero indicates an error in health check
    :param err_desc: describes errors that occurred during health check run
    :param entropy_pass: pass/fail for entropy test
    :param stability_pass: pass/fail for stability test
    :param extinction_ratio_pass: pass/fail for extinction ratio tests
    :param small_problem_pass: pass/fail for small problem ground state test
    :param entropy_data: test data for entropy tests only returned if debug=True
    :param stability_data: test data for stability tests only returned if debug=True
    :param extinction_ratio_data: test data for extinction ratio tests only
        returned if debug=True
    :param small_problem_result: Eqc results object for small problem test
        if debug=True
    """

    debug: bool
    err_code: int
    err_desc: str
    entropy_pass: Optional[bool]
    stability_pass: Optional[bool]
    extinction_ratio_pass: Optional[bool]
    small_problem_pass: Optional[bool]
    entropy_data: Optional[List[float]]
    stability_data: Optional[List[float]]
    extinction_ratio_data: Optional[List[float]]
    # Quoted forward reference to the EqcResult TypedDict declared in this module.
    small_problem_result: Optional["EqcResult"]
- [docs]
class EqcClient:
    """
    Provides calls to process jobs using EQC RPC server

    :param ip_addr: The IP address of the RPC server
    :param port: The port that the RPC server is running on
    :param max_data_size: the max send and receive message length for RPC server

    .. note::

       :code:`lock_id` is used by a variety of class functions.
       It is set to an empty string by default since default for device server
       :code:`lock_id` is also an empty string. This allows for single user
       processing without having to acquire a device lock.

    .. All GRPC calls follow a specific pattern:
    ..   1. Fill in data to be sent in message stub
    ..   2. Send data using stub service method
    ..   3. Parse response
    """

    def __init__(
        self,
        ip_addr: str = os.getenv("DEVICE_IP_ADDRESS", "localhost"),
        port: str = os.getenv("DEVICE_PORT", "50051"),
        max_data_size: int = 512 * 1024 * 1024,
    ):
        # NOTE: the os.getenv defaults in the signature are evaluated once,
        # when this class body is executed, not on every instantiation.
        self._ip_addr = ip_addr
        self._max_data_size = max_data_size
        self._ip_add_port = ip_addr + ":" + port
        # The same size limit is applied to both directions of the channel.
        self._channel_opt = [
            ("grpc.max_send_message_length", max_data_size),
            ("grpc.max_receive_message_length", max_data_size),
        ]
        self.channel = grpc.insecure_channel(
            self._ip_add_port,
            options=self._channel_opt,
        )
        self.eqc_stub = eqc_pb2_grpc.EqcServiceStub(self.channel)

    def submit_job(
        self,
        problem_data: np.ndarray,
        lock_id: str = "",
        sum_constraint: float = 1,
        relaxation_schedule: int = 2,
        continuous_soln: bool = True,
    ) -> dict:
        """
        Submits data to be processed by EQC device

        :param problem_data: an array of problem data to be optimized
        :param lock_id: a UUID to allow for multi-user processing
        :param sum_constraint: a normalization constraint that is applied to the
            problem space that is used to calculate :code:`ground_state` energy.
            Value must be greater than or equal to 1.
        :param relaxation_schedule: four different schedules represented
            in integer parameter. Higher values reduce the variation in
            the analog spin values and therefore, lead to better ground state
            for input problem. Accepts range of values in set [1,4].
        :param continuous_soln: whether solutions should be returned as integer
            or continuous values. In order to obtain integer solutions a
            distillation method is applied to the continuous solutions to map
            them to integer values.
        :return: a member of :class:`eqc_direct.utils.JobCodes` as a dict
            with the following keys:

            - **err_code**: `int`- job submission error code
            - **err_desc**: `str`- error code description for submission
        :raises ValueError: if :code:`problem_data` is not two dimensional
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError
        """
        if problem_data.dtype == np.float64:
            warn_dtype_msg = "Max precision for EQC device is float32 input type was float64. Input matrix will be rounded"
            # Emitted both to the log and as a Python warning.
            logging.warning(warn_dtype_msg)
            warnings.warn(warn_dtype_msg, Warning)

        # The matrix is sent as a flat vector in column-major (Fortran) order.
        prob_data = problem_data.flatten(order="F")

        try:
            # Unpacking into exactly two names fails for any non-2D shape.
            dimx, _ = problem_data.shape
        except ValueError as err:
            err_msg = "Input data must be two dimensions"
            logging.error(err_msg, exc_info=True)
            raise ValueError(err_msg) from err

        job_input = eqc_pb2.JobInput(
            nvars=dimx,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            prob_data=prob_data,
            continuous_soln=continuous_soln,
            lock_id=lock_id,
        )
        try:
            job_results = self.eqc_stub.SubmitJob(job_input)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "EQC submit_job failed due to grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(job_results)

    def fetch_result(self, lock_id: str = "") -> "EqcResult":
        """
        Request last EQC job results. Returns results from the most recent
        run on the device.

        :param lock_id: a valid :code:`lock_id` that matches current device
            :code:`lock_id`
        :return: an :class:`.EqcResult` object; :code:`solution` entries and
            :code:`energy` are converted to :code:`numpy.float32` when present
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError
        """
        fetch_input = eqc_pb2.LockMessage(lock_id=lock_id)
        try:
            eqc_results = self.eqc_stub.FetchResults(fetch_input)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "EQC fetch_results failed due to grpc._channel._InactiveRpcError."
            ) from exc
        result = message_to_dict(eqc_results)

        # EqcResult declares solution/energy as Optional (not populated when
        # err_code is non-zero), so only convert the values that are present.
        solution = result.get("solution")
        if solution is not None:
            result["solution"] = [np.float32(val) for val in solution]
        energy = result.get("energy")
        if energy is not None:
            result["energy"] = np.float32(energy)
        return result

    def system_status(self) -> dict:
        """
        Client call to obtain EQC system status

        :returns: a member of :class:`eqc_direct.utils.SysStatus` as a dict:

            - **status_code**: `int`- current system status code
            - **status_desc**: `str`- description of current system status
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError
        """
        try:
            sys_resp = self.eqc_stub.SystemStatus(eqc_pb2.Empty())
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "EQC system_status failed due to grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(sys_resp)

    def acquire_lock(self) -> dict:
        """
        Makes a single attempt to acquire exclusive lock on hardware execution.
        Locking can be used to ensure orderly processing in multi-user environments.
        Lock can only be acquired when no other user has acquired the lock or when
        the system has been idle for 60 seconds while another user has the lock.
        This idle timeout prevents one user from blocking other users from using
        the machine even if they are not active.

        :return:
            a member of :class:`eqc_direct.utils.LockManageStatus` as a dict along
            with an additional key :code:`lock_id`:

            - **lock_id**: `str`- if acquired the current device `lock_id`
              else empty string
            - **status_code**: `int`- status code for lock id acquisition
            - **status_desc**: `str`- a description for the associated status code
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError
        """
        try:
            acquire_lock_resp = self.eqc_stub.AcquireLock(eqc_pb2.Empty())
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "EQC acquire_lock failed due to grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(acquire_lock_resp)

    def release_lock(self, lock_id: str = "") -> dict:
        """
        Releases exclusive lock for running health check or submitting job

        :param lock_id: a UUID with currently acquired exclusive device lock
        :return: a member of :class:`eqc_direct.utils.LockManageStatus` as a dict:

            - **status_code**: `int`- status code for lock id acquisition
            - **status_desc**: `str`- a description for the associated status code
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError
        """
        release_input = eqc_pb2.LockMessage(lock_id=lock_id)
        try:
            release_lock_resp = self.eqc_stub.ReleaseLock(release_input)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "EQC release_lock failed due to grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(release_lock_resp)

    def check_lock(self, lock_id: str = "") -> dict:
        """
        Checks if submitted :code:`lock_id` has execution lock on the device

        :param lock_id: a UUID which will be checked to determine if has exclusive
            device execution lock
        :return: a member of :class:`eqc_direct.utils.LockCheckStatus` as a dict:

            - **status_code**: `int`- status code for lock check
            - **status_desc**: `str`- a description for the associated status code
        """
        # NOTE(review): unlike the other RPC wrappers in this class, this call
        # does not translate grpc._channel._InactiveRpcError into
        # InactiveRpcError. Left as-is so the exception type seen by existing
        # callers does not change -- confirm whether it should be aligned.
        check_input = eqc_pb2.LockMessage(lock_id=lock_id)
        check_output = self.eqc_stub.CheckLock(check_input)
        return message_to_dict(check_output)

    def start_health_check(
        self,
        lock_id: str = "",
        entropy: bool = False,
        stability: bool = False,
        extinction_ratio: bool = False,
        small_problem: bool = False,
        debug: bool = False,
    ) -> dict:
        """
        Runs health checks for an Eqc device must have lock to run.

        :param lock_id: the execution lock_id as acquired by acquire_lock
        :param entropy: request run of entropy test on Eqc device (more info)
        :param stability: request run of stability test on Eqc device (more info)
        :param extinction_ratio: request test of extinction ratio on Eqc device
            (more info)
        :param small_problem: run small problem and test valid result (more info)
        :param debug: return verbose output from health check
        :return:
            one of the members of :class:`eqc_direct.utils.JobCodes`
            as a dict with the following keys:

            - **err_code**: `int`- non-zero value indicates error
            - **err_desc**: `str`- a description for associated error code
        """
        # NOTE(review): unlike the other RPC wrappers in this class, this call
        # does not translate grpc._channel._InactiveRpcError into
        # InactiveRpcError. Left as-is so the exception type seen by existing
        # callers does not change -- confirm whether it should be aligned.
        health_input = eqc_pb2.HealthInput(
            entropy=entropy,
            stability=stability,
            extinction_ratio=extinction_ratio,
            small_problem=small_problem,
            lock_id=lock_id,
            debug=debug,
        )
        health_resp = self.eqc_stub.HealthCheck(health_input)
        return message_to_dict(health_resp)

    def fetch_health_check_result(self, lock_id: str = "") -> "HealthCheckResponse":
        """
        Fetch health check data from previous run of health check tests

        :param lock_id: the execution lock_id as acquired by acquire_lock
        :return: dict object :class:`.HealthCheckResponse`; the detailed test
            data keys are only included when the response has :code:`debug` set
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError

        .. note::

           This result structure hasn't been finalized.
           When C++ code is written will know exact format of augmented data.
        """
        health_result_input = eqc_pb2.LockMessage(lock_id=lock_id)
        try:
            health_result_resp = self.eqc_stub.FetchHealth(health_result_input)
        except _InactiveRpcError as exc:
            raise InactiveRpcError(
                "EQC fetch_health_check_result failed due to "
                "grpc._channel._InactiveRpcError."
            ) from exc
        health_dict = message_to_dict(health_result_resp)

        if health_dict["debug"]:
            # In debug mode the nested small-problem result is itself passed
            # through message_to_dict before the full response is returned.
            health_dict["small_problem_result"] = message_to_dict(
                health_dict["small_problem_result"]
            )
            return health_dict

        # Outside debug mode the detailed test data is stripped from the response.
        drop_keys = (
            "entropy_data",
            "stability_data",
            "extinction_ratio_data",
            "small_problem_result",
        )
        return {
            key: value for key, value in health_dict.items() if key not in drop_keys
        }

    def stop_running_process(self, lock_id: str = "") -> dict:
        """
        Stops a running process either a health check or a Eqc job.
        Process locks will release automatically based on a timeout
        which is maintained in the server code if they are
        not released using this.

        :param lock_id: the execution lock_id as acquired by acquire_lock
        :return:
            a member of :class:`eqc_direct.utils.SysStatus`
            as dict with following keys:

            - **status_code**: `int`- the system code after stopping
            - **status_desc**: `str`- the associated system status description
        :raises InactiveRpcError: if the RPC call fails with
            grpc._channel._InactiveRpcError
        """
        stop_input = eqc_pb2.LockMessage(lock_id=lock_id)
        try:
            stop_resp = self.eqc_stub.StopRunning(stop_input)
        except _InactiveRpcError as exc:
            # Message names this method (previously it named
            # fetch_health_check_result due to a copy/paste slip).
            raise InactiveRpcError(
                "EQC stop_running_process failed due to "
                "grpc._channel._InactiveRpcError."
            ) from exc
        return message_to_dict(stop_resp)

    def run_health_check(
        self,
        lock_id: str = "",
        entropy: bool = False,
        stability: bool = False,
        extinction_ratio: bool = False,
        small_problem: bool = False,
        debug: bool = False,
    ) -> "HealthCheckResponse":
        """
        Runs health checks for an Eqc device. Requires a valid lock on the device.

        Starts the health check, polls :func:`system_status` once per second
        until the device reports idle, fetches the result and then releases
        the lock.

        :param lock_id: the execution lock_id as acquired by acquire_lock
        :param entropy: request run of entropy test on Eqc device (more info)
        :param stability: request run of stability test on Eqc device (more info)
        :param extinction_ratio: request test of extinction ratio on Eqc device
            (more info)
        :param small_problem: run small problem and test valid result (more info)
        :param debug: return verbose output from health check
        :return: dict object :class:`.HealthCheckResponse`
        :raises RuntimeError: if the health check fails to start, the system
            reports a status code of 3 or higher while polling, or the lock
            is not released

        .. note::

           This result structure hasn't been finalized.
           When C++ code is written will know exact format of augmented data.

        .. What happens when all health checks turned off just return blank message?
        """
        health_start_resp = self.start_health_check(
            lock_id=lock_id,
            entropy=entropy,
            stability=stability,
            extinction_ratio=extinction_ratio,
            small_problem=small_problem,
            debug=debug,
        )
        if health_start_resp["err_code"] != 0:
            err_msg = f"Failed to start health check with response: {health_start_resp}"
            # No exception is active here, so exc_info is not requested.
            logging.error(err_msg)
            raise RuntimeError(err_msg)

        # Uses the "status_code" key, matching system_status() as documented
        # and as used by wait_for_lock / process_job.
        idle_code = SysStatus.IDLE["status_code"]
        sys_code = self.system_status()["status_code"]
        while sys_code != idle_code:
            sys_code = self.system_status()["status_code"]
            # Status codes of 3 and above are treated as "system unavailable".
            if sys_code >= 3:
                raise RuntimeError(f"System unavailable sys_code: {sys_code}")
            if sys_code != idle_code:
                time.sleep(1)

        health_result = self.fetch_health_check_result(lock_id=lock_id)
        lock_status = self.release_lock(lock_id=lock_id)
        # NOTE(review): release_lock documents its response keys as
        # status_code / status_desc, while this reads "lock_released" and
        # "message". The success code is not visible from this module, so the
        # lookup is left unchanged -- confirm against the server response.
        if not lock_status["lock_released"]:
            err_msg = f"Failed to release lock with message: {lock_status['message']}"
            logging.error(err_msg)
            raise RuntimeError(err_msg)
        return health_result

    def wait_for_lock(self) -> tuple:
        """
        Waits for lock indefinitely calling :func:`acquire_lock`

        Checks :func:`system_status` before every attempt and sleeps one
        second between unsuccessful attempts.

        :return: a tuple of the following items:

            - **lock_id**: `str`- exclusive lock for device execution with a timeout
            - **start_queue_ts**: `int`- time in ns at which waiting for the
              lock started
            - **end_queue_ts**: `int`- time in ns at which the lock was
              acquired and queueing ended
        :raises RuntimeError: if the system reports a status code of 3 or higher
        """
        lock_id = ""
        start_queue_ts = time.time_ns()
        while lock_id == "":
            sys_code = self.system_status()["status_code"]
            # Status codes of 3 and above are treated as "system unavailable".
            if sys_code >= 3:
                raise RuntimeError(f"System unavailable status_code: {sys_code}")
            lock_id = self.acquire_lock()["lock_id"]
            # An empty lock_id means the lock was not granted; back off and retry.
            if lock_id == "":
                time.sleep(1)
        end_queue_ts = time.time_ns()
        return lock_id, start_queue_ts, end_queue_ts

    def process_job(
        self,
        hamiltonian: np.ndarray,
        sum_constraint: float = 1,
        relaxation_schedule: int = 4,
        continuous_soln: bool = True,
        lock_id: str = "",
    ) -> dict:
        """
        Processes a job by:

        1. submitting job
        2. checks for status, until completes or fails
        3. returns results

        :param hamiltonian: np.ndarray
            an (n,n+1) array representing the problem hamiltonian
        :param sum_constraint: a normalization constraint that is applied to the
            problem space that is used to calculate :code:`ground_state` energy.
            Value must be greater than or equal to 1.
        :param relaxation_schedule: four different schedules represented in
            integer parameter. Higher values reduce the variation in the
            analog spin values and therefore, lead to better ground state
            for input problem. Accepts range of values in set [1,4].
        :param continuous_soln: whether solutions should be returned as integer or
            continuous values.
        :param lock_id: a str with exclusive lock for device execution with a timeout
        :return: the :class:`.EqcResult` dict returned by :func:`fetch_result`
            with two additional keys:

            - start_job_ts: time in ns marking start of job_submission
            - end_job_ts: time in ns marking end of job submission complete
        :raises RuntimeError: if submission fails, the system reports a status
            code of 3 or higher while polling, or the fetched result has a
            non-zero :code:`err_code`
        """
        start_job = time.time_ns()
        submit_job_resp = self.submit_job(
            problem_data=hamiltonian,
            sum_constraint=sum_constraint,
            relaxation_schedule=relaxation_schedule,
            continuous_soln=continuous_soln,
            lock_id=lock_id,
        )
        if submit_job_resp["err_code"] != 0:
            err_msg = f"Job submission failed with response: {submit_job_resp}"
            # No exception is active here, so exc_info is not requested.
            logging.error(err_msg)
            raise RuntimeError(err_msg)

        # Poll once per second until the device reports idle again.
        idle_code = SysStatus.IDLE["status_code"]
        sys_code = self.system_status()["status_code"]
        while sys_code != idle_code:
            sys_code = self.system_status()["status_code"]
            # Status codes of 3 and above are treated as "system unavailable".
            if sys_code >= 3:
                err_msg = f"System unavailable status_code: {sys_code}"
                logging.error(err_msg)
                raise RuntimeError(err_msg)
            if sys_code != idle_code:
                time.sleep(1)
        end_job = time.time_ns()

        job_result = self.fetch_result(lock_id=lock_id)
        if job_result["err_code"] != 0:
            raise RuntimeError(
                f"Job execution error\n"
                f"err_code: {job_result['err_code']}\n"
                f"err_desc: {job_result['err_desc']}"
            )
        # Timing information is attached directly to the result dict.
        job_result["start_job_ts"] = start_job
        job_result["end_job_ts"] = end_job
        return job_result