# File: aes-xts-pur64/tests/test_vectors.py
# aes-xts-pur64 is OpenCL code for aes-xts256-plain64 encryption compatible with LUKS
#
# Copyright (C) 2025 Kirill Shakirov
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import json
import time
import argparse
from nyanger.simple.log_writers.file_writer import FileWriter
from nyanger.simple.log_writers.console_writer import ConsoleWriter
import nyanger.simple.static as nya_stat
import pyopencl as cl
# Module-level logger handle. It is only declared here (no value is bound);
# main() assigns the actual logger through `global log` before test() runs.
log: nya_stat.Nyanger
def load_program(include_dirs: list[str], file_name: str, ctx: cl.Context) -> cl.Program:
    """Read an OpenCL source file and build it for the given context.

    Args:
        include_dirs: Directories passed to the OpenCL compiler, each as
            its own ``-I <dir>`` build option.
        file_name: Path of the OpenCL source file, read in text mode.
        ctx: OpenCL context the program is created in.

    Returns:
        The result of ``cl.Program(...).build(...)``.
    """
    with open(file_name, 'rt') as source_file:
        source_text = source_file.read()

    # One "-I <dir>" option per include directory.
    build_options = []
    for include_dir in include_dirs:
        build_options.append(f"-I {include_dir}")

    program = cl.Program(ctx, source_text)
    return program.build(options=build_options)
# Byte sizes of the per-vector fields as they are laid out in the OpenCL buffers.
_BLOCK_SIZE = 16
_SECTOR_NUM_SIZE = 8
_BLOCK_NUM_SIZE = 4
# Size of the key buffer handed to the kernels; the first 16 bytes are logged
# as the data key and the remaining bytes as the tweak key.
_KEY_SIZE = 32


def _pack_test_vectors(test_vectors):
    """Flatten the JSON test vectors into four contiguous byte buffers.

    Returns ``(plain, cipher, sector_nums, block_nums)`` as bytearrays holding
    16-byte data blocks, 8-byte little-endian sector numbers and 4-byte
    little-endian block numbers, one entry per vector.

    Raises:
        ValueError: If a data block does not decode to exactly 16 bytes.
            (Slice-assigning a block of another length would silently resize
            the buffer and shift every following vector.)
    """
    plain = bytearray()
    cipher = bytearray()
    sector_nums = bytearray()
    block_nums = bytearray()
    for index, vector in enumerate(test_vectors):
        plain_block = bytes.fromhex(vector["unencrypted_data"])
        cipher_block = bytes.fromhex(vector["encrypted_data"])
        if len(plain_block) != _BLOCK_SIZE or len(cipher_block) != _BLOCK_SIZE:
            raise ValueError(
                f"Test vector {index}: data blocks must be {_BLOCK_SIZE} bytes, got "
                f"{len(plain_block)} (unencrypted) and {len(cipher_block)} (encrypted)")
        plain += plain_block
        cipher += cipher_block
        sector_nums += vector["sector_number"].to_bytes(length=_SECTOR_NUM_SIZE, byteorder="little")
        block_nums += vector["block_number"].to_bytes(length=_BLOCK_NUM_SIZE, byteorder="little")
    return plain, cipher, sector_nums, block_nums


def _run_kernels_on_device(cl_device, cl_src_file, cl_incl_dir, b_key,
                           b_sec_nums, b_blk_nums, b_u_data, b_enc_data):
    """Build the test program for one device and run both kernels on all vectors.

    Returns ``(encrypt_output, decrypt_output)``: the bytes read back after
    running ``encrypt_data`` on ``b_u_data`` and ``decrypt_data`` on
    ``b_enc_data``.
    """
    vectors_num = len(b_u_data) // _BLOCK_SIZE

    cl_ctx = cl.Context(devices=[cl_device])
    cl_queue = cl.CommandQueue(cl_ctx)
    cl_prg = load_program([cl_incl_dir], cl_src_file, cl_ctx)
    encrypt_data_kernel = cl_prg.encrypt_data
    decrypt_data_kernel = cl_prg.decrypt_data

    mf = cl.mem_flags
    clb_key = cl.Buffer(cl_ctx, mf.READ_ONLY, size=_KEY_SIZE)
    clb_sec_nums = cl.Buffer(cl_ctx, mf.READ_ONLY, size=_SECTOR_NUM_SIZE * vectors_num)
    clb_blk_nums = cl.Buffer(cl_ctx, mf.READ_ONLY, size=_BLOCK_NUM_SIZE * vectors_num)
    # The same input/output pair serves both directions: the input buffer
    # receives plaintext for encryption and ciphertext for decryption.
    clb_in = cl.Buffer(cl_ctx, mf.READ_ONLY, size=_BLOCK_SIZE * vectors_num)
    clb_out = cl.Buffer(cl_ctx, mf.WRITE_ONLY, size=_BLOCK_SIZE * vectors_num)

    def run(kernel, host_input):
        # Upload all inputs, launch one work-item per vector, read the result back.
        cl.enqueue_copy(cl_queue, clb_sec_nums, b_sec_nums)
        cl.enqueue_copy(cl_queue, clb_blk_nums, b_blk_nums)
        cl.enqueue_copy(cl_queue, clb_key, b_key)
        cl.enqueue_copy(cl_queue, clb_in, host_input)
        event = kernel(cl_queue, (vectors_num,), None,
                       clb_sec_nums, clb_blk_nums, clb_key, clb_in, clb_out)
        host_output = bytearray(vectors_num * _BLOCK_SIZE)
        cl.enqueue_copy(cl_queue, host_output, clb_out, wait_for=[event])
        return host_output

    log.info("Executing encrypt kernel...")
    encrypt_output = run(encrypt_data_kernel, b_u_data)
    log.info("Executing decrypt kernel...")
    decrypt_output = run(decrypt_data_kernel, b_enc_data)
    cl_queue.finish()
    return encrypt_output, decrypt_output


def _report_mismatches(expected, actual, vectors_num, tail=""):
    """Compare two buffers block by block, log every difference, return the count.

    ``tail`` is appended to the summary message (used to add a trailing
    newline after the last comparison of a device).
    """
    wrong = 0
    for i in range(vectors_num):
        start = i * _BLOCK_SIZE
        want = expected[start:start + _BLOCK_SIZE]
        got = actual[start:start + _BLOCK_SIZE]
        if want == got:
            continue
        wrong += 1
        log.error(f"Missmatch of test vector {i} !")
        log.error(f" expected:{want.hex()}")
        log.error(f" actual:{got.hex()}")
    if wrong > 0:
        log.error(f"Test failed, total of {wrong} mismatch results!{tail}")
    else:
        log.info(f"All match! Success!{tail}")
    return wrong


def test(cl_src_file: str, cl_incl_dir: str, vectors_file: str) -> int:
    """Run the test vectors through the encrypt/decrypt kernels on every OpenCL device.

    Loads the key and vectors from ``vectors_file``, then for each device of
    each OpenCL platform builds ``cl_src_file``, runs the ``encrypt_data`` and
    ``decrypt_data`` kernels and compares their output with the expected
    blocks. All results are reported through the module-level ``log``.

    Args:
        cl_src_file: Path of the OpenCL source file defining the test kernels.
        cl_incl_dir: Include directory passed to the OpenCL compiler.
        vectors_file: Path of the JSON file with ``encryption_key`` and ``vectors``.

    Returns:
        Total number of mismatching blocks over all devices (0 on success).

    Raises:
        ValueError: If the key is not 32 bytes, the vector list is empty, or a
            data block is not 16 bytes long.
    """
    log.info("Starting tests...")
    total_errors = 0
    # perf_counter() is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments.
    start_time = time.perf_counter()

    log.info(f"Loading test vectors from {vectors_file}")
    with open(vectors_file, "rt") as vec_file:
        test_json = json.load(vec_file)

    b_key = bytes.fromhex(test_json["encryption_key"])
    if len(b_key) != _KEY_SIZE:
        raise ValueError(f"Encryption key must be {_KEY_SIZE} bytes, got {len(b_key)}")
    test_vectors = test_json["vectors"]
    test_vectors_num = len(test_vectors)
    if test_vectors_num == 0:
        # Zero-sized OpenCL buffers cannot be created; fail with a clear message.
        raise ValueError(f"No test vectors found in {vectors_file}")

    log.info(f"Data key: {b_key[:16].hex()}")
    log.info(f"Tweak key: {b_key[16:].hex()}")

    b_u_data, b_enc_data, b_sec_nums, b_blk_nums = _pack_test_vectors(test_vectors)
    log.info(f"{test_vectors_num} test vectors loaded.")

    log.info("Getting OCL devices...")
    cl_platforms = cl.get_platforms()
    log.info(f"{len(cl_platforms)} platforms found.")
    cl_devices = []
    for platform in cl_platforms:
        _dev = platform.get_devices(cl.device_type.ALL)
        log.info(f"Platform \"{platform.name}\" have {len(_dev)} devices: {[_d.name for _d in _dev]}")
        cl_devices.extend(_dev)
    print()

    for cl_device in cl_devices:
        log.info(f"Testing on {cl.device_type.to_string(cl_device.type)} device: {cl_device.name}")
        b_test_enc_data, b_test_u_data = _run_kernels_on_device(
            cl_device, cl_src_file, cl_incl_dir, b_key,
            b_sec_nums, b_blk_nums, b_u_data, b_enc_data)

        log.info("Comparing encryption results...")
        total_errors += _report_mismatches(b_enc_data, b_test_enc_data, test_vectors_num)
        log.info("Comparing decryption results...")
        total_errors += _report_mismatches(b_u_data, b_test_u_data, test_vectors_num, tail="\n")

    run_time = time.perf_counter() - start_time
    if total_errors > 0:
        log.error(f"--- Finish in {run_time} seconds with {total_errors} failed tests ---\n\n")
    else:
        log.info(f"--- Finish in {run_time} seconds without errors ---\n\n")
    return total_errors
def cmd_parse():
    """Parse the command line of the test script.

    Returns:
        The ``argparse`` namespace with ``cl_src_file``, ``cl_incl_dir``,
        ``test_vectors`` and ``log_file`` (``None`` unless ``-l`` is given).
    """
    # (flags, default value, help text) for every supported option.
    option_table = (
        (("-s", "--cl-src-file"),
         "../src/test_aes_xts256_plain.cl",
         "OpenCL src file with defined test kernels."),
        (("-d", "--cl-incl-dir"),
         "../src",
         "OpenCL include dir with aes256_xts_plain.cl src file."),
        (("-v", "--test-vectors"),
         "./data/test_vectors.json",
         "Path to json file with test vectors."),
        (("-l", "--log-file"),
         None,
         "Path to optional log file."),
    )

    arg_parser = argparse.ArgumentParser(
        prog="test_vectors.py",
        description="This test suite for OpenCL AES-XTS implementation",
        epilog="Have a nice day!",
    )
    for flags, default_value, help_text in option_table:
        arg_parser.add_argument(
            *flags,
            action="store",
            default=default_value,
            help=help_text,
            required=False,
        )
    return arg_parser.parse_args()
def main():
    """Script entry point: parse arguments, set up the logger and run the tests.

    The logger is bound to the module-level ``log`` name, started before the
    tests and always stopped afterwards, even when ``test`` raises.

    Raises:
        SystemExit: With status 1 when ``test`` returns a truthy value.
    """
    global log
    arguments = cmd_parse()

    # Init logger: console only by default, console + file when a log file is given.
    if arguments.log_file is not None:
        log = nya_stat.get_logger("nyan", log_writers=[ConsoleWriter(), FileWriter(arguments.log_file)])
    else:
        log = nya_stat.get_logger("nyan")

    log.start()
    try:
        failures = test(cl_src_file=arguments.cl_src_file,
                        cl_incl_dir=arguments.cl_incl_dir,
                        vectors_file=arguments.test_vectors)
    finally:
        # Stop the logger on every path so a failing run does not leave it running.
        log.stop()

    # A truthy result is treated as a failure count and turned into a non-zero
    # exit status; None or 0 leaves the exit status untouched.
    if failures:
        raise SystemExit(1)
# Run the test suite only when this file is executed as a script.
if __name__ == '__main__':
    main()