Source code for eqc_models.ml.classifierqsvm

# (C) Quantum Computing Inc., 2024.
# Import libs
import os
import sys
import time
import datetime
import json
import warnings
from functools import wraps

import numpy as np

from eqc_models.ml.classifierbase import ClassifierBase

class QSVMClassifier(ClassifierBase):
  • """An implementation of QSVM classifier that uses QCi's Dirac-3.
  • Parameters
  • ----------
  • relaxation_schedule: Relaxation schedule used by Dirac-3; default:
  • 2.
  • num_samples: Number of samples used by Dirac-3; default: 1.
  • upper_limit: Coefficient upper limit; a regularization parameter;
  • default: 1.0.
  • gamma: Gaussian kernel parameter; default: 1.0.
  • eta: A penalty multiplier; default: 1.0.
  • zeta: A penalty multiplier; default: 1.0.
  • Examples
  • -----------
  • >>> from sklearn import datasets
  • >>> from sklearn.preprocessing import MinMaxScaler
  • >>> from sklearn.model_selection import train_test_split
  • >>> iris = datasets.load_iris()
  • >>> X = iris.data
  • >>> y = iris.target
  • >>> scaler = MinMaxScaler()
  • >>> X = scaler.fit_transform(X)
  • >>> for i in range(len(y)):
  • ... if y[i] == 0:
  • ... y[i] = -1
  • ... elif y[i] == 2:
  • ... y[i] = 1
  • >>> X_train, X_test, y_train, y_test = train_test_split(
  • ... X,
  • ... y,
  • ... test_size=0.2,
  • ... random_state=42,
  • ... )
  • >>> from eqc_models.ml.classifierqsvm import QSVMClassifier
  • >>> obj = QSVMClassifier(
  • ... relaxation_schedule=2,
  • ... num_samples=1,
  • ... upper_limit=1.0,
  • ... gamma=1.0,
  • ... eta=1.0,
  • ... zeta=1.0,
  • ... )
  • >>> from contextlib import redirect_stdout
  • >>> import io
  • >>> f = io.StringIO()
  • >>> with redirect_stdout(f):
  • ... obj = obj.fit(X_train, y_train)
  • ... y_train_prd = obj.predict(X_train)
  • ... y_test_prd = obj.predict(X_test)
  • """
    def __init__(
        self,
        relaxation_schedule=2,
        num_samples=1,
        upper_limit=1.0,
        gamma=1.0,
        eta=1.0,
        zeta=1.0,
    ):
        super().__init__()

        self.relaxation_schedule = relaxation_schedule
        self.num_samples = num_samples
        self.upper_limit = upper_limit
        self.gamma = gamma
        self.eta = eta
        self.zeta = zeta

        # Set by fit(); predict() asserts that these are populated.
        self.X_train = None
        self.y_train = None

    def kernel(self, vec1, vec2):
        """Gaussian (RBF) kernel: exp(-gamma * ||vec1 - vec2||^2)."""
        return np.exp(-self.gamma * np.linalg.norm(vec1 - vec2) ** 2)

    def fit(self, X, y):
        """
        Build a QSVM classifier from the training set (X, y).

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            The training input samples.

        y : array-like of shape (n_samples,)
            The target values.

        Returns
        -------
        Response of Dirac-3 in JSON format.
        """
        assert X.shape[0] == y.shape[0], "Inconsistent sizes!"
        assert set(y) == {-1, 1}, "Target values should be in {-1, 1}"

        # Build the quadratic Hamiltonian and solve it on Dirac-3.
        J, C, sum_constraint = self.get_hamiltonian(X, y)

        assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
        assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"

        self.set_model(J, C, sum_constraint)
        sol, response = self.solve()

        assert len(sol) == C.shape[0], "Inconsistent solution size!"

        self.params = self.convert_sol_to_params(sol)

        # Cache the training set and its kernel matrix; both are needed
        # by predict() to evaluate the decision function.
        self.X_train = X
        self.y_train = y

        n_records = X.shape[0]
        self.kernel_mat_train = np.zeros(
            shape=(n_records, n_records), dtype=np.float32
        )
        for m in range(n_records):
            for n in range(n_records):
                self.kernel_mat_train[m][n] = self.kernel(X[m], X[n])

        return response
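
    # Usage note: fit() returns the raw Dirac-3 response (the JSON-format
    # output mentioned in the docstring above), while the fitted state
    # (params, X_train, y_train, kernel_mat_train) is kept on the instance.
    # A typical call pattern, sketched here rather than quoted from the
    # upstream documentation, is therefore:
    #
    #     clf = QSVMClassifier()
    #     response = clf.fit(X_train, y_train)  # Dirac-3 response
    #     y_pred = clf.predict(X_test)          # uses the fitted state
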
    def predict(self, X: np.array):
        """
        Predict classes for X.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)

        Returns
        -------
        y : ndarray of shape (n_samples,)
            The predicted classes.
        """
        assert self.X_train is not None, "Model not trained yet!"
        assert self.y_train is not None, "Model not trained yet!"
        assert X.shape[1] == self.X_train.shape[1], "Inconsistent dimensions!"

        n_records = X.shape[0]
        n_records_train = self.X_train.shape[0]

        # Kernel matrix between the prediction points and the training points.
        kernel_mat = np.zeros(
            shape=(n_records, n_records_train), dtype=np.float32
        )
        for m in range(n_records):
            for n in range(n_records_train):
                kernel_mat[m][n] = self.kernel(X[m], self.X_train[n])

        # Intercept: a weighted average over the training points, with
        # weights params * (upper_limit - params).
        tmp_vec1 = np.tensordot(
            self.params * self.y_train, self.kernel_mat_train, axes=(0, 0)
        )

        assert tmp_vec1.shape[0] == n_records_train, "Inconsistent size!"

        tmp1 = np.sum(
            self.params
            * (self.upper_limit - self.params)
            * (self.y_train - tmp_vec1)
        )
        tmp2 = np.sum(self.params * (self.upper_limit - self.params))

        assert tmp2 != 0, "Something went wrong!"

        intercept = tmp1 / tmp2

        # Decision function: sign of the kernel expansion plus intercept.
        y = np.zeros(shape=(n_records,), dtype=np.float32)
        y += np.tensordot(
            self.params * self.y_train, kernel_mat, axes=(0, 1)
        )
        y += intercept
        y = np.sign(y)

        return y
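
    # The math implemented by predict() above, written out for reference
    # (this is a reading of the code, not a formula quoted from QCi docs):
    # with coefficients a_n returned by Dirac-3, labels y_n, kernel K and
    # U = upper_limit, the intercept is a weighted average over the
    # training points,
    #
    #     b = sum_n a_n (U - a_n) (y_n - sum_m a_m y_m K(x_m, x_n))
    #         / sum_n a_n (U - a_n),
    #
    # and predictions are
    #
    #     f(x) = sign( sum_n a_n y_n K(x_n, x) + b ),
    #
    # which matches the standard soft-margin kernel-SVM decision function.
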
    def get_hamiltonian(
        self,
        X: np.array,
        y: np.array,
    ):
        """Build the quadratic Hamiltonian (J, C) and sum constraint used by fit()."""
        n_records = X.shape[0]
        n_dims = X.shape[1]

        J = np.zeros(
            shape=(2 * n_records, 2 * n_records), dtype=np.float32
        )
        C = np.zeros(shape=(2 * n_records,), dtype=np.float32)

        for n in range(n_records):
            for m in range(n_records):
                J[n][m] = (
                    0.5 * y[n] * y[m] * self.kernel(X[n], X[m])
                    + self.zeta * y[n] * y[m]
                )

            J[n][n] += self.eta
            J[n][n + n_records] = self.eta
            J[n + n_records][n] = self.eta
            J[n + n_records][n + n_records] = self.eta

            C[n] = -1.0 - 2.0 * self.eta * self.upper_limit
            C[n + n_records] = -2.0 * self.eta * self.upper_limit

        C = C.reshape((2 * n_records, 1))
        J = 0.5 * (J + J.transpose())

        return J, C, n_records * self.upper_limit
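
    # What get_hamiltonian() appears to encode (an interpretation of the
    # code above, assuming Dirac-3 minimizes x^T J x + C^T x subject to the
    # returned sum constraint): with N = n_records, variables
    # x = (a_1..a_N, s_1..s_N) >= 0 and U = upper_limit,
    #
    #     E(x) = 0.5 * sum_{n,m} a_n a_m y_n y_m K(x_n, x_m) - sum_n a_n
    #            + zeta * (sum_n y_n a_n)^2
    #            + eta  * sum_n [ (a_n + s_n - U)^2 - U^2 ],
    #
    # i.e. the soft-margin SVM dual objective plus penalty terms that push
    # sum_n y_n a_n toward 0 and keep each a_n inside the box [0, U] via the
    # slack variables s_n. The sum constraint N * U is consistent with
    # a_n + s_n = U for every n.
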
    def convert_sol_to_params(self, sol):
        assert len(sol) % 2 == 0, "Expected an even solution size!"

        # The first half of the solution holds the coefficients used by
        # predict(); the second half (the auxiliary slack variables in the
        # Hamiltonian above) is discarded.
        sol = sol[: len(sol) // 2]

        return np.array(sol)
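

# Minimal smoke-test sketch (illustrative only; not part of the documented
# API). It assumes access to QCi's Dirac-3 is already configured for
# eqc_models (e.g. an API token in the environment); the data and
# hyperparameters below are synthetic.
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    # Two Gaussian blobs labeled +1 and -1.
    X_pos = rng.normal(loc=1.0, scale=0.5, size=(20, 2))
    X_neg = rng.normal(loc=-1.0, scale=0.5, size=(20, 2))
    X_all = np.vstack([X_pos, X_neg])
    y_all = np.array([1] * 20 + [-1] * 20)

    clf = QSVMClassifier(relaxation_schedule=2, gamma=1.0, upper_limit=1.0)
    clf.fit(X_all, y_all)

    y_prd = clf.predict(X_all)
    print("Training accuracy:", np.mean(y_prd == y_all))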