# aes-xts-pur64 is OpenCL code for aes-xts256-plain64 encryption compatible with LUKS
#
# Copyright (C) 2025 Kirill Shakirov
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Run AES-XTS test vectors through the OpenCL kernels on every available device."""

import json
import sys
import time
import argparse

from nyanger.simple.log_writers.file_writer import FileWriter
from nyanger.simple.log_writers.console_writer import ConsoleWriter
import nyanger.simple.static as nya_stat
import pyopencl as cl

# Init logger (assigned in main() before any other function runs)
log: nya_stat.Nyanger

# Sizes, in bytes, of the records exchanged with the kernels.
KEY_BYTES = 32      # 16-byte data key followed by 16-byte tweak key
BLOCK_BYTES = 16    # one AES block per test vector
SECTOR_BYTES = 8    # sector number, little-endian
BLKNUM_BYTES = 4    # block number, little-endian


def load_program(include_dirs: list[str], file_name: str, ctx: cl.Context) -> cl.Program:
    """Read an OpenCL source file and build it for the given context.

    Args:
        include_dirs: Directories handed to the OpenCL compiler as ``-I`` options.
        file_name: Path of the OpenCL source file to compile.
        ctx: Context the program is created and built for.

    Returns:
        The built program.
    """
    with open(file_name, 'rt') as src_file:
        source_text = src_file.read()

    build_options = [f"-I {inc_dir}" for inc_dir in include_dirs]
    return cl.Program(ctx, source_text).build(options=build_options)


def _fixed_hex(hex_text: str, size: int, what: str) -> bytes:
    """Decode *hex_text* and require it to be exactly *size* bytes long.

    The staging buffers are filled by slice assignment, and assigning a
    wrong-sized value to a bytearray slice resizes the buffer and shifts
    every later record, so lengths are checked up front.

    Raises:
        ValueError: If the decoded value is not *size* bytes long.
    """
    raw = bytes.fromhex(hex_text)
    if len(raw) != size:
        raise ValueError(f"{what} must be {size} bytes, got {len(raw)}")
    return raw


def _count_mismatches(expected: bytearray, actual: bytearray, blocks: int) -> int:
    """Compare two buffers block by block, log each difference, return the count."""
    wrong = 0
    for i in range(blocks):
        of16 = i * BLOCK_BYTES
        want = expected[of16:of16 + BLOCK_BYTES]
        got = actual[of16:of16 + BLOCK_BYTES]
        if want != got:
            wrong += 1
            log.error(f"Missmatch of test vector {i} !")
            log.error(f" expected:{want.hex()}")
            log.error(f" actual:{got.hex()}")
    return wrong


def _run_kernels(cl_device, cl_src_file: str, cl_incl_dir: str, b_key: bytes,
                 b_sec_nums: bytearray, b_blk_nums: bytearray,
                 b_u_data: bytearray, b_enc_data: bytearray,
                 test_vectors_num: int) -> tuple[bytearray, bytearray]:
    """Build the program for one device and run both kernels over all vectors.

    Returns:
        ``(encrypt_output, decrypt_output)`` as host buffers holding one
        16-byte block per test vector.
    """
    cl_ctx = cl.Context(devices=[cl_device])
    cl_queue = cl.CommandQueue(cl_ctx)
    cl_prg = load_program([cl_incl_dir], cl_src_file, cl_ctx)
    encrypt_data_kernel = cl_prg.encrypt_data
    decrypt_data_kernel = cl_prg.decrypt_data

    mf = cl.mem_flags
    clb_key = cl.Buffer(cl_ctx, mf.READ_ONLY, size=KEY_BYTES)
    clb_sec_nums = cl.Buffer(cl_ctx, mf.READ_ONLY, size=SECTOR_BYTES * test_vectors_num)
    clb_blk_nums = cl.Buffer(cl_ctx, mf.READ_ONLY, size=BLKNUM_BYTES * test_vectors_num)
    # The same input/output buffer pair serves both directions: plaintext in
    # and ciphertext out for encryption, then the reverse for decryption.
    clb_in = cl.Buffer(cl_ctx, mf.READ_ONLY, size=BLOCK_BYTES * test_vectors_num)
    clb_out = cl.Buffer(cl_ctx, mf.WRITE_ONLY, size=BLOCK_BYTES * test_vectors_num)

    log.info("Executing encrypt kernel...")
    cl.enqueue_copy(cl_queue, clb_sec_nums, b_sec_nums)
    cl.enqueue_copy(cl_queue, clb_blk_nums, b_blk_nums)
    cl.enqueue_copy(cl_queue, clb_key, b_key)
    cl.enqueue_copy(cl_queue, clb_in, b_u_data)
    knl_e = encrypt_data_kernel(cl_queue, (test_vectors_num,), None,
                                clb_sec_nums, clb_blk_nums, clb_key, clb_in, clb_out)
    b_test_enc_data = bytearray(test_vectors_num * BLOCK_BYTES)
    cl.enqueue_copy(cl_queue, b_test_enc_data, clb_out, wait_for=[knl_e])

    log.info("Executing decrypt kernel...")
    cl.enqueue_copy(cl_queue, clb_sec_nums, b_sec_nums)
    cl.enqueue_copy(cl_queue, clb_blk_nums, b_blk_nums)
    cl.enqueue_copy(cl_queue, clb_key, b_key)
    cl.enqueue_copy(cl_queue, clb_in, b_enc_data)
    knl_e = decrypt_data_kernel(cl_queue, (test_vectors_num,), None,
                                clb_sec_nums, clb_blk_nums, clb_key, clb_in, clb_out)
    b_test_u_data = bytearray(test_vectors_num * BLOCK_BYTES)
    cl.enqueue_copy(cl_queue, b_test_u_data, clb_out, wait_for=[knl_e])

    cl_queue.finish()
    return b_test_enc_data, b_test_u_data


def test(cl_src_file: str, cl_incl_dir: str, vectors_file: str) -> int:
    """Check the encrypt and decrypt kernels against the test vectors on every device.

    Args:
        cl_src_file: OpenCL source file defining the ``encrypt_data`` and
            ``decrypt_data`` kernels.
        cl_incl_dir: Include directory passed to the OpenCL compiler.
        vectors_file: JSON file with an ``encryption_key`` hex string and a
            ``vectors`` list; each vector has ``unencrypted_data``,
            ``encrypted_data``, ``sector_number`` and ``block_number``.

    Returns:
        Total number of mismatching blocks over all devices and both
        directions; 0 means every comparison matched.

    Raises:
        ValueError: If the key is not 32 bytes or a data block is not 16 bytes.
    """
    log.info("Starting tests...")
    total_errors = 0
    start_time = time.time()

    log.info(f"Loading test vectors from {vectors_file}")
    with open(vectors_file, "rt") as vec_file:
        test_json = json.load(vec_file)

    b_key = _fixed_hex(test_json["encryption_key"], KEY_BYTES, "encryption_key")
    test_vectors = test_json["vectors"]
    test_vectors_num = len(test_vectors)
    log.info(f"Data key: {b_key[:16].hex()}")
    log.info(f"Tweak key: {b_key[16:].hex()}")

    # Flat host-side staging buffers, one fixed-size record per vector.
    b_u_data = bytearray(test_vectors_num * BLOCK_BYTES)
    b_enc_data = bytearray(test_vectors_num * BLOCK_BYTES)
    b_sec_nums = bytearray(test_vectors_num * SECTOR_BYTES)
    b_blk_nums = bytearray(test_vectors_num * BLKNUM_BYTES)
    for i, _vect in enumerate(test_vectors):
        of16 = i * BLOCK_BYTES
        of8 = i * SECTOR_BYTES
        of4 = i * BLKNUM_BYTES
        b_u_data[of16:of16 + BLOCK_BYTES] = _fixed_hex(
            _vect["unencrypted_data"], BLOCK_BYTES, f"vector {i} unencrypted_data")
        b_enc_data[of16:of16 + BLOCK_BYTES] = _fixed_hex(
            _vect["encrypted_data"], BLOCK_BYTES, f"vector {i} encrypted_data")
        b_sec_nums[of8:of8 + SECTOR_BYTES] = _vect["sector_number"].to_bytes(
            length=SECTOR_BYTES, byteorder="little")
        b_blk_nums[of4:of4 + BLKNUM_BYTES] = _vect["block_number"].to_bytes(
            length=BLKNUM_BYTES, byteorder="little")
    log.info(f"{test_vectors_num} test vectors loaded.")

    log.info("Getting OCL devices...")
    # get OCL devices
    cl_platforms = cl.get_platforms()
    log.info(f"{len(cl_platforms)} platforms found.")
    cl_devices = []
    for platform in cl_platforms:
        _dev = platform.get_devices(cl.device_type.ALL)
        log.info(f"Platform \"{platform.name}\" have {len(_dev)} devices: {[_d.name for _d in _dev]}")
        cl_devices.extend(_dev)
    print()

    for cl_device in cl_devices:
        log.info(f"Testing on {cl.device_type.to_string(cl_device.type)} device: {cl_device.name}")
        b_test_enc_data, b_test_u_data = _run_kernels(
            cl_device, cl_src_file, cl_incl_dir, b_key,
            b_sec_nums, b_blk_nums, b_u_data, b_enc_data, test_vectors_num)

        log.info("Comparing encryption results...")
        wrong = _count_mismatches(b_enc_data, b_test_enc_data, test_vectors_num)
        if wrong > 0:
            log.error(f"Test failed, total of {wrong} mismatch results!")
        else:
            log.info("All match! Success!")
        total_errors += wrong

        log.info("Comparing decryption results...")
        wrong = _count_mismatches(b_u_data, b_test_u_data, test_vectors_num)
        if wrong > 0:
            log.error(f"Test failed, total of {wrong} mismatch results!\n")
        else:
            log.info("All match! Success!\n")
        total_errors += wrong

    run_time = time.time() - start_time
    if total_errors > 0:
        log.error(f"--- Finish in {run_time} seconds with {total_errors} failed tests ---\n\n")
    else:
        log.info(f"--- Finish in {run_time} seconds without errors ---\n\n")
    return total_errors


def cmd_parse() -> argparse.Namespace:
    """Parse the command line.

    Returns:
        Namespace with ``cl_src_file``, ``cl_incl_dir``, ``test_vectors`` and
        ``log_file`` (``None`` when no log file was requested).
    """
    # Initialize arguments parser
    parser = argparse.ArgumentParser(
        prog="test_vectors.py",
        description="This test suite for OpenCL AES-XTS implementation",
        epilog="Have a nice day!")
    parser.add_argument("-s", "--cl-src-file", action="store",
                        default="../src/test_aes_xts256_plain.cl",
                        help="OpenCL src file with defined test kernels.",
                        required=False)
    parser.add_argument("-d", "--cl-incl-dir", action="store",
                        default="../src",
                        help="OpenCL include dir with aes256_xts_plain.cl src file.",
                        required=False)
    parser.add_argument("-v", "--test-vectors", action="store",
                        default="./data/test_vectors.json",
                        help="Path to json file with test vectors.",
                        required=False)
    parser.add_argument("-l", "--log-file", action="store",
                        default=None,
                        help="Path to optional log file.",
                        required=False)
    return parser.parse_args()


def main() -> int:
    """Set up logging, run the test suite and report the outcome.

    Returns:
        Process exit status: 0 when every comparison matched, 1 otherwise.
    """
    global log
    arguments = cmd_parse()

    # Init logger
    if arguments.log_file is not None:
        log = nya_stat.get_logger("nyan", log_writers=[ConsoleWriter(), FileWriter(arguments.log_file)])
    else:
        log = nya_stat.get_logger("nyan")
    log.start()
    try:
        total_errors = test(cl_src_file=arguments.cl_src_file,
                            cl_incl_dir=arguments.cl_incl_dir,
                            vectors_file=arguments.test_vectors)
    finally:
        # Stop the logger even when the run aborts (e.g. a kernel build error).
        log.stop()
    return 1 if total_errors else 0


if __name__ == '__main__':
    sys.exit(main())