Thanks, @umutsahin, for your prompt response. Here is the code snippet where I’ve been using Concrete Numpy:
import concrete.numpy as cnp
#from concrete import fhe
import numpy as np
import csv
import time
def _PSI_impl(g_key, given_key):
    """Classify how two key masks overlap.

    The function uses only NumPy operations so that it can be handed to the
    Concrete compiler (with both arguments encrypted) and also be evaluated
    directly on plain integer arrays.

    Args:
        g_key: Integer array holding the mask of the document keys.
        given_key: Integer array holding the mask of the queried keys.
            Assumed to have the same shape as ``g_key`` and to contain only
            0/1 entries in 32 positions -- TODO confirm against the CSV data;
            the constant 32 below is hard-coded.

    Returns:
        ``if_no_match * 2 + (1 + if_sub_match) - if_match``. For 0/1 masks of
        length 32 this evaluates to:

        * 1 when the masks are identical and not all zero,
        * 2 when the masks differ but share at least one set position,
        * 3 when the masks differ in all 32 positions.

        NOTE(review): two disjoint masks that differ in fewer than 32
        positions also evaluate to 1, and two all-zero masks evaluate to 0.
        Confirm whether that is intended.
    """
    intersect = np.bitwise_xor(g_key, given_key)
    # NOTE(review): during compilation this presumably prints a traced value,
    # not a ciphertext -- confirm before relying on this output.
    print("Encrypted Intersection is :", intersect)

    # Number of positions where the two masks disagree (computed once and
    # reused for both the "identical" and the "nothing in common" checks).
    differing = np.sum(intersect)
    if_match = (differing == 0)

    # Non-zero when at least one position is set in both masks.
    is_sub_match = np.sum(np.bitwise_and(g_key, given_key))
    if_sub_match = np.logical_xor(is_sub_match, 0)

    if_no_match = (differing == 32)

    match_status = if_no_match * 2 + (1 + if_sub_match) - if_match
    return match_status
class HomomorphicPSIS:
    """Compile ``_PSI_impl`` into a Concrete circuit and run it on key masks."""

    # Compiled circuit, created in __init__. The annotation is quoted so that
    # it is not evaluated when the class body is executed.
    _PSI_circuit: "cnp.Circuit"

    def __init__(self):
        """Compile the PSI circuit with both inputs marked as encrypted."""
        # The "unsafe"/"insecure" options enable an on-disk key cache in
        # ".keys"; as their names state, this is not a secure setup.
        configuration = cnp.Configuration(
            enable_unsafe_features=True,
            use_insecure_key_cache=True,
            insecure_key_cache_location=".keys",
        )
        PSI_compiler = cnp.Compiler(
            _PSI_impl,
            {"g_key": "encrypted", "given_key": "encrypted"},
        )
        # A single sample of two all-ones vectors of length 32 is the whole
        # inputset used for compilation.
        inputset_PSI = [
            (
                np.ones(32, dtype=np.int32),
                np.ones(32, dtype=np.int32),
            )
        ]
        print("Compiling PSI circuit...")
        start = time.time()
        self._PSI_circuit = PSI_compiler.compile(inputset_PSI, configuration)
        end = time.time()
        print(f"(took {end - start:.3f} seconds)")

    def load_data_from_csv(self, csv_file_path):
        """Read the third column of a CSV file as an ``int32`` array.

        The first row is treated as a header and skipped. Completely blank
        rows are ignored; an empty file yields an empty array.

        Args:
            csv_file_path: Path of the CSV file to read.

        Returns:
            A one-dimensional ``np.int32`` array with one entry per data row.

        Raises:
            IndexError: If a non-blank data row has fewer than three columns.
            ValueError: If a value in the third column is not an integer.
        """
        mask_array = []
        # newline='' is what the csv module expects for files it reads.
        with open(csv_file_path, 'r', newline='') as csv_file:
            csv_reader = csv.reader(csv_file)
            # Skip the header; the default avoids StopIteration on an
            # empty file.
            next(csv_reader, None)
            for row in csv_reader:
                if not row:
                    # csv.reader yields [] for blank lines.
                    continue
                mask_array.append(int(row[2]))
        return np.array(mask_array, dtype=np.int32)

    def PSI(self, g_key, given_key):
        """Encrypt both keys, evaluate the circuit, and decrypt the result.

        Args:
            g_key: Mask array passed as the circuit's ``g_key`` argument.
            given_key: Mask array passed as the circuit's ``given_key``
                argument.

        Returns:
            The decrypted output of the compiled circuit.
        """
        print("\nProcessing keys:", g_key, given_key)

        # This timer covers both the encryption and the circuit run.
        start = time.time()
        encrypted_keys = self._PSI_circuit.encrypt(g_key, given_key)
        result_keys = self._PSI_circuit.run(encrypted_keys)
        end = time.time()
        print(f"Encryption (took {end - start:.3f} seconds)")

        start = time.time()
        decrypted_result = self._PSI_circuit.decrypt(result_keys)
        end = time.time()
        print(f"Decryption (took {end - start:.3f} seconds)")

        print("Decrypted results for intersecting keys:", decrypted_result)
        return decrypted_result
if __name__ == "__main__":
    # Script entry point: compile the circuit once, then compare the key mask
    # of each of the five documents against the queried key mask and group
    # the documents by the decrypted match status.
    server = HomomorphicPSIS()
    decrypted_results_array = []

    for i in range(5):
        start = time.time()
        g_key = server.load_data_from_csv(f'g_keys_dataset_with_third_column_{i}.csv')
        end = time.time()
        print(f"Load g_key from documet{i} (took {end - start:.3f} seconds)")

        start = time.time()
        given_key = server.load_data_from_csv("snomed_code_mask_with_conditions.csv")
        end = time.time()
        print(f"Load given key from document (took {end - start:.3f} seconds)")

        print(" g_key is:", g_key)
        print("given_key is:", given_key)

        # This timer covers the whole encrypt / run / decrypt round trip
        # performed by server.PSI.
        start = time.time()
        print("encrypt and serialize data")
        decrypted_result = server.PSI(g_key, given_key)
        decrypted_results_array.append(decrypted_result)
        end = time.time()
        print(f"Pair encryption and serialization (took {end - start:.3f} seconds)")

    # Group document indices by match status:
    # 1 -> full match, 2 -> sub match, 3 -> no match.
    # Any other status value is not placed in any list.
    documents_with_full_match = []
    documents_with_sub_match = []
    documents_without_match = []
    for i, intersection_result in enumerate(decrypted_results_array):
        if intersection_result == 1:
            documents_with_full_match.append(i)
        elif intersection_result == 2:
            documents_with_sub_match.append(i)
        elif intersection_result == 3:
            documents_without_match.append(i)

    # Print the lists to see the categorized documents
    print("Documents with full match:", documents_with_full_match)
    print("Documents with sub match:", documents_with_sub_match)
    print("Documents without match:", documents_without_match)
This function is designed to compute the encrypted intersection of two sets represented by `g_key` and `given_key`, and to determine the match status based on logical operations. Additionally, this script has been executed as described previously, using both Concrete Numpy (`cnp`) and Concrete FHE (`fhe`), with noted differences in performance and accuracy outcomes.
Thank you in advance for your help and insights on this matter.