- import os
- import sys
- import time
- import datetime
- import json
- import warnings
- from functools import wraps
- import numpy as np
- from eqc_models.ml.classifierbase import ClassifierBase
- class QSVMClassifier(ClassifierBase):
- """An implementation of QSVM classifier that uses QCi's Dirac-3.
-
- Parameters
- ----------
-
- relaxation_schedule: Relaxation schedule used by Dirac-3; default:
- 2.
-
- num_samples: Number of samples used by Dirac-3; default: 1.
-
- upper_limit: Coefficient upper limit; a regularization parameter;
- default: 1.0.
-
- gamma: Gaussian kernel parameter; default: 1.0.
-
- eta: A penalty multiplier; default: 1.0.
-
- zeta: A penalty multiplier; default: 1.0.
- Examples
- -----------
- >>> from sklearn import datasets
- >>> from sklearn.preprocessing import MinMaxScaler
- >>> from sklearn.model_selection import train_test_split
- >>> iris = datasets.load_iris()
- >>> X = iris.data
- >>> y = iris.target
- >>> scaler = MinMaxScaler()
- >>> X = scaler.fit_transform(X)
- >>> for i in range(len(y)):
- ... if y[i] == 0:
- ... y[i] = -1
- ... elif y[i] == 2:
- ... y[i] = 1
- >>> X_train, X_test, y_train, y_test = train_test_split(
- ... X,
- ... y,
- ... test_size=0.2,
- ... random_state=42,
- ... )
- >>> from eqc_models.ml.classifierqsvm import QSVMClassifier
- >>> obj = QSVMClassifier(
- ... relaxation_schedule=2,
- ... num_samples=1,
- ... upper_limit=1.0,
- ... gamma=1.0,
- ... eta=1.0,
- ... zeta=1.0,
- ... )
- >>> from contextlib import redirect_stdout
- >>> import io
- >>> f = io.StringIO()
- >>> with redirect_stdout(f):
- ... obj = obj.fit(X_train, y_train)
- ... y_train_prd = obj.predict(X_train)
- ... y_test_prd = obj.predict(X_test)
-
- """
-
- def __init__(
- self,
- relaxation_schedule=2,
- num_samples=1,
- upper_limit=1.0,
- gamma=1.0,
- eta=1.0,
- zeta=1.0,
- ):
- super(QSVMClassifier).__init__()
- self.relaxation_schedule = relaxation_schedule
- self.num_samples = num_samples
- self.upper_limit = upper_limit
- self.gamma = gamma
- self.eta = eta
- self.zeta = zeta
- def kernel(self, vec1, vec2):
- return np.exp(-self.gamma * np.linalg.norm(vec1 - vec2) ** 2)
- def fit(self, X, y):
- """
- Build a QSVM classifier from the training set (X, y).
-
- Parameters
- ----------
- X : {array-like, sparse matrix} of shape (n_samples, n_features)
- The training input samples.
-
- y : array-like of shape (n_samples,)
- The target values.
-
- Returns
- -------
- Response of Dirac-3 in JSON format.
- """
-
- assert X.shape[0] == y.shape[0], "Inconsistent sizes!"
- assert set(y) == {-1, 1}, "Target values should be in {-1, 1}"
- J, C, sum_constraint = self.get_hamiltonian(X, y)
- assert J.shape[0] == J.shape[1], "Inconsistent hamiltonian size!"
- assert J.shape[0] == C.shape[0], "Inconsistent hamiltonian size!"
- self.set_model(J, C, sum_constraint)
- sol, response = self.solve()
- assert len(sol) == C.shape[0], "Inconsistent solution size!"
- self.params = self.convert_sol_to_params(sol)
- self.X_train = X
- self.y_train = y
- n_records = X.shape[0]
- self.kernel_mat_train = np.zeros(
- shape=(n_records, n_records), dtype=np.float32
- )
- for m in range(n_records):
- for n in range(n_records):
- self.kernel_mat_train[m][n] = self.kernel(X[m], X[n])
- return response
- def predict(self, X: np.array):
- """
- Predict classes for X.
-
- Parameters
- ----------
- X : {array-like, sparse matrix} of shape (n_samples, n_features)
-
- Returns
- -------
- y : ndarray of shape (n_samples,)
- The predicted classes.
- """
-
- assert self.X_train is not None, "Model not trained yet!"
- assert self.y_train is not None, "Model not trained yet!"
- assert (
- X.shape[1] == self.X_train.shape[1]
- ), "Inconsistent dimensions!"
- n_records = X.shape[0]
- n_records_train = self.X_train.shape[0]
- kernel_mat = np.zeros(
- shape=(n_records, n_records_train), dtype=np.float32
- )
- for m in range(n_records):
- for n in range(n_records_train):
- kernel_mat[m][n] = self.kernel(X[m], self.X_train[n])
- intercept = 0
- tmp_vec1 = np.tensordot(
- self.params * self.y_train, self.kernel_mat_train, axes=(0, 0)
- )
- assert tmp_vec1.shape[0] == n_records_train, "Inconsistent size!"
- tmp1 = np.sum(
- self.params
- * (self.upper_limit - self.params)
- * (self.y_train - tmp_vec1)
- )
- tmp2 = np.sum(self.params * (self.upper_limit - self.params))
- assert tmp2 != 0, "Something went wrong!"
- intercept = tmp1 / tmp2
- y = np.zeros(shape=(n_records), dtype=np.float32)
- y += np.tensordot(
- self.params * self.y_train, kernel_mat, axes=(0, 1)
- )
- y += intercept
- y = np.sign(y)
- return y
- def get_hamiltonian(
- self,
- X: np.array,
- y: np.array,
- ):
- n_records = X.shape[0]
- n_dims = X.shape[1]
- J = np.zeros(
- shape=(2 * n_records, 2 * n_records), dtype=np.float32
- )
- C = np.zeros(shape=(2 * n_records,), dtype=np.float32)
- for n in range(n_records):
- for m in range(n_records):
- J[n][m] = (
- 0.5 * y[n] * y[m] * self.kernel(X[n], X[m])
- + self.zeta * y[n] * y[m]
- )
- J[n][n] += self.eta
- J[n][n + n_records] = self.eta
- J[n + n_records][n] = self.eta
- J[n + n_records][n + n_records] = self.eta
- C[n] = -1.0 - 2.0 * self.eta * self.upper_limit
- C[n + n_records] = -2.0 * self.eta * self.upper_limit
- C = C.reshape((2 * n_records, 1))
- J = 0.5 * (J + J.transpose())
- return J, C, n_records * self.upper_limit
- def convert_sol_to_params(self, sol):
- assert len(sol) % 2 == 0, "Expected an even solution size!"
- sol = sol[: int(len(sol) / 2)]
- return np.array(sol)