Romove big files. Wrapping things up.

This commit is contained in:
Kirill Shakirov
2026-03-14 14:13:18 +01:00
parent 8d721eabdf
commit f64af0a248
27 changed files with 2840 additions and 533 deletions
-48
View File
@@ -581,15 +581,6 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
[[package]]
name = "lock_api"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"scopeguard",
]
[[package]]
name = "log"
version = "0.4.29"
@@ -662,29 +653,6 @@ version = "1.70.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe"
[[package]]
name = "parking_lot"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a"
dependencies = [
"lock_api",
"parking_lot_core",
]
[[package]]
name = "parking_lot_core"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-link",
]
[[package]]
name = "percent-encoding"
version = "2.3.2"
@@ -888,15 +856,6 @@ dependencies = [
"libc",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.12.3"
@@ -939,12 +898,6 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "serde"
version = "1.0.228"
@@ -1092,7 +1045,6 @@ dependencies = [
"bytes",
"libc",
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
+1 -1
View File
@@ -17,7 +17,7 @@ prost = "0.14.3"
rand = "0.9.2"
redb = "3.1.0"
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.49.0", features = ["full"] }
tokio = { version = "1.49.0", features = ["rt", "macros", "signal"] }
tokio-tungstenite = "0.28.0"
toml = "1.0.1"
tonic = "0.14.5"
+41 -28
View File
@@ -10,7 +10,7 @@ use std::{path::Path, u64, u128};
// const RANGE_STATUS: MultimapTableDefinition<bool, u128> =
// MultimapTableDefinition::new("range_status");
const JOBS_TABLE: TableDefinition<u64, (u16, u128, u128, u64, u64)> = TableDefinition::new("jobs");
const JOBS_TABLE: TableDefinition<u64, (u32, u128, u128, u64, u64)> = TableDefinition::new("jobs");
const JOBS_FREE_IDS: TableDefinition<u64, ()> = TableDefinition::new("jobs_free_ids");
const FOUND_KEYS_TABLE: TableDefinition<(u128, u128), u64> = TableDefinition::new("found_keys");
@@ -24,7 +24,7 @@ fn get_timestump() -> u64 {
#[derive(Copy, Clone, Debug)]
pub struct JobRecord {
pub id: u64,
pub range_id: u16,
pub range_id: u32,
pub tweak_key: u128,
pub start_key: u128,
pub len: u64,
@@ -32,7 +32,7 @@ pub struct JobRecord {
}
impl JobRecord {
pub fn from_data(k: u64, v: &(u16, u128, u128, u64, u64)) -> Self {
pub fn from_data(k: u64, v: &(u32, u128, u128, u64, u64)) -> Self {
Self {
id: k,
range_id: v.0,
@@ -47,7 +47,7 @@ impl JobRecord {
item: Result<
(
redb::AccessGuard<'_, u64>,
redb::AccessGuard<'_, (u16, u128, u128, u64, u64)>,
redb::AccessGuard<'_, (u32, u128, u128, u64, u64)>,
),
redb::StorageError,
>,
@@ -56,7 +56,7 @@ impl JobRecord {
Ok(Self::from_data(ag.0.value(), &ag.1.value()))
}
pub fn get_value(&self) -> (u16, u128, u128, u64, u64) {
pub fn get_value(&self) -> (u32, u128, u128, u64, u64) {
return (
self.range_id,
self.tweak_key,
@@ -69,7 +69,7 @@ impl JobRecord {
// search for abandoned job
// returns first job older than 3 hours
pub fn get_staled_job(
jobs_table: &mut redb::Table<'_, u64, (u16, u128, u128, u64, u64)>,
jobs_table: &mut redb::Table<'_, u64, (u32, u128, u128, u64, u64)>,
timeout: u64,
) -> Result<Option<JobRecord>, redb::Error> {
let current_time = get_timestump();
@@ -103,7 +103,7 @@ impl JobRecord {
// What we want is to check max and min ids
// and return id lower than min or grater than max
fn get_new_job_id(
jobs_table: &redb::Table<'_, u64, (u16, u128, u128, u64, u64)>,
jobs_table: &redb::Table<'_, u64, (u32, u128, u128, u64, u64)>,
) -> Result<u64, redb::Error> {
// If table is empty return 0 ID
if jobs_table.len()? == 0 {
@@ -132,9 +132,9 @@ impl JobRecord {
}
}
const RANGES: TableDefinition<u16, (u128, u128, u128, u128)> = TableDefinition::new("ranges");
const RANGES_TO_USE: TableDefinition<u16, ()> = TableDefinition::new("ranges_to_use");
const RANGES: TableDefinition<u32, (u128, u128, u128, u128)> = TableDefinition::new("ranges");
const RANGES_TO_USE: TableDefinition<u32, ()> = TableDefinition::new("ranges_to_use");
// to not owerflow u128 we calculate it in range 0..max and then substruct
// const SUB_DIV: u128 = 0x1000100010001000100010001;
// TWEAKS_PER_RANGE: u128 = 5192296858534827628530496329220096
@@ -147,7 +147,7 @@ const TWEAKS_PER_RANGE_ID_VICE: u128 = TWEAKS_PER_RANGE - 1;
#[derive(Copy, Clone, Debug)]
struct RangeRecord {
id: u16,
id: u32,
tweak_current: u128,
tweak_end: u128,
@@ -157,7 +157,7 @@ struct RangeRecord {
impl RangeRecord {
fn from_value(k: u16, v: &(u128, u128, u128, u128)) -> Self {
fn from_value(k: u32, v: &(u128, u128, u128, u128)) -> Self {
Self {
id: k,
tweak_current: v.0,
@@ -167,7 +167,7 @@ impl RangeRecord {
}
}
fn new(id: u16) -> Self {
fn new(id: u32) -> Self {
let tweak_start: u128 = id as u128 * TWEAKS_PER_RANGE;
Self {
@@ -259,8 +259,8 @@ impl RangeRecord {
#[derive(Debug)]
struct RangesTable<'a> {
ranges: redb::Table<'a, u16, (u128, u128, u128, u128)>,
ranges_to_use: redb::Table<'a, u16, ()>,
ranges: redb::Table<'a, u32, (u128, u128, u128, u128)>,
ranges_to_use: redb::Table<'a, u32, ()>,
}
impl<'a> RangesTable<'a> {
@@ -281,7 +281,7 @@ impl<'a> RangesTable<'a> {
})
}
fn get_range(&self, k: u16) -> Result<Option<RangeRecord>, redb::Error> {
fn get_range(&self, k: u32) -> Result<Option<RangeRecord>, redb::Error> {
Ok(self
.ranges
.get(k)?
@@ -501,30 +501,43 @@ pub fn db_get_progress(db: &Database) -> Result<f64, redb::Error> {
fn db_init(db: &Database) -> Result<(), redb::Error> {
let transct: redb::WriteTransaction = db.begin_write()?;
println!("Start DB init.");
let trx: redb::WriteTransaction = db.begin_write()?;
{
let mut ranges = transct.open_table(RANGES)?;
let mut ranges_to_use = transct.open_table(RANGES_TO_USE)?;
// create new ranges
for r_id in 0..=u16::MAX {
let r = RangeRecord::new(r_id);
ranges.insert(r_id, r.value())?;
ranges_to_use.insert(r_id, ())?;
let mut ranges = trx.open_table(RANGES)?;
let mut ranges_to_use = trx.open_table(RANGES_TO_USE)?;
for k in 0..=u16::MAX {
// create new ranges
let r = RangeRecord::new(k as u32);
ranges.insert(k as u32, r.value())?;
ranges_to_use.insert(k as u32, ())?;
}
println!("ensure that all tables is created");
// ensure that all tables is created
trx.open_table(FOUND_KEYS_TABLE)?.len()?;
trx.open_table(JOBS_TABLE)?.len()?;
trx.open_table(JOBS_FREE_IDS)?.len()?;
}
transct.commit()?;
println!("Commit transaction");
trx.commit()?;
println!("Done!");
Ok(())
}
pub fn db_open(db_file_path: &Path) -> Result<Database, redb::Error> {
let db = Database::create(db_file_path)?;
let trx = db.begin_read()?;
let trx = db.begin_write()?;
let ranges_len = {
println!("Opening RANGES to get len");
let ranges_table = trx.open_table(RANGES)?;
println!("Try to get len");
ranges_table.len()?
};
trx.close()?;
println!("Commit transaction");
trx.commit()?;
if ranges_len == 0 {
db_init(&db)?;
}
+128 -128
View File
@@ -4,15 +4,15 @@
// }
fn u128_to_u32arr(a: u128) -> [u32;4] {
let mut res = [0u32;4];
let a_bytes = a.to_le_bytes();
let chunks = a_bytes.as_chunks::<4>().0;
for i in 0..4 {
res[i] = u32::from_le_bytes(chunks[i]);
}
return res;
}
// fn u128_to_u32arr(a: u128) -> [u32;4] {
// let mut res = [0u32;4];
// let a_bytes = a.to_le_bytes();
// let chunks = a_bytes.as_chunks::<4>().0;
// for i in 0..4 {
// res[i] = u32::from_le_bytes(chunks[i]);
// }
// return res;
// }
pub fn u128_to_u64arr(a: u128) -> [u64;2] {
let mut res = [0u64;2];
@@ -32,143 +32,143 @@ pub fn u64arr_to_u128(a:(u64,u64)) -> u128 {
return u128::from_le_bytes(bytes_data);
}
pub fn add_u128_to_u256(a: &[u32; 8], b: u128) -> ([u32; 8], bool) {
let mut res: [u32; 8] = [0; 8];
let mut carry = false;
// pub fn add_u128_to_u256(a: &[u32; 8], b: u128) -> ([u32; 8], bool) {
// let mut res: [u32; 8] = [0; 8];
// let mut carry = false;
// convert b u128 value to bytes then to u32 and then add it to a
b.to_le_bytes()
.as_chunks::<4>()
.0
.iter()
.enumerate()
.for_each(|(i, sl)| {
(res[i], carry) = a[i].carrying_add(u32::from_le_bytes(*sl), carry);
});
// // convert b u128 value to bytes then to u32 and then add it to a
// b.to_le_bytes()
// .as_chunks::<4>()
// .0
// .iter()
// .enumerate()
// .for_each(|(i, sl)| {
// (res[i], carry) = a[i].carrying_add(u32::from_le_bytes(*sl), carry);
// });
// propagate carry till the end of a
for idx in 4..8 {
(res[idx], carry) = a[idx].carrying_add(0, carry);
}
// // propagate carry till the end of a
// for idx in 4..8 {
// (res[idx], carry) = a[idx].carrying_add(0, carry);
// }
return (res, carry);
}
// return (res, carry);
// }
fn add_u32_to_u256(a: &[u32; 8], b: u32) -> ([u32; 8], bool) {
let mut res: [u32; 8] = [0; 8];
let mut carry = false;
(res[0], carry) = a[0].carrying_add(b, carry);
// fn add_u32_to_u256(a: &[u32; 8], b: u32) -> ([u32; 8], bool) {
// let mut res: [u32; 8] = [0; 8];
// let mut carry = false;
// (res[0], carry) = a[0].carrying_add(b, carry);
for idx in 1..8 {
(res[idx], carry) = a[idx].carrying_add(0, carry);
}
// for idx in 1..8 {
// (res[idx], carry) = a[idx].carrying_add(0, carry);
// }
return (res, carry);
}
// return (res, carry);
// }
fn add_u32_to_u256_(a: &mut [u32; 8], b: u32) -> bool {
let mut carry = false;
(a[0], carry) = a[0].carrying_add(b, carry);
// fn add_u32_to_u256_(a: &mut [u32; 8], b: u32) -> bool {
// let mut carry = false;
// (a[0], carry) = a[0].carrying_add(b, carry);
for idx in 1..8 {
(a[idx], carry) = a[idx].carrying_add(0, carry);
}
// for idx in 1..8 {
// (a[idx], carry) = a[idx].carrying_add(0, carry);
// }
return carry;
}
// return carry;
// }
fn bytes_from_chars(chars_chunk: &[char]) -> [u8; 4] {
let mut res: [u8; 4] = [0; 4];
let mut idx: usize = 0;
chars_chunk.chunks_exact(2).for_each(|b_c| {
if idx < 4 {
match u8::from_str_radix(&b_c.iter().collect::<String>(), 16) {
Ok(n) => res[idx] = n,
Err(_) => (),
}
idx += 1;
}
});
return res;
}
// fn bytes_from_chars(chars_chunk: &[char]) -> [u8; 4] {
// let mut res: [u8; 4] = [0; 4];
// let mut idx: usize = 0;
// chars_chunk.chunks_exact(2).for_each(|b_c| {
// if idx < 4 {
// match u8::from_str_radix(&b_c.iter().collect::<String>(), 16) {
// Ok(n) => res[idx] = n,
// Err(_) => (),
// }
// idx += 1;
// }
// });
// return res;
// }
fn bignum_from_hex(hex: &str) -> [u32; 8] {
let mut res: [u32; 8] = [0; 8];
let mut idx: usize = 0;
let chars_hex = hex.chars().collect::<Vec<char>>();
chars_hex
.chunks_exact(8)
.rev()
.map(|chunk| bytes_from_chars(chunk))
.for_each(|b_arr| {
if idx < 8 {
res[idx] = u32::from_be_bytes(b_arr);
idx += 1;
}
});
return res;
}
// fn bignum_from_hex(hex: &str) -> [u32; 8] {
// let mut res: [u32; 8] = [0; 8];
// let mut idx: usize = 0;
// let chars_hex = hex.chars().collect::<Vec<char>>();
// chars_hex
// .chunks_exact(8)
// .rev()
// .map(|chunk| bytes_from_chars(chunk))
// .for_each(|b_arr| {
// if idx < 8 {
// res[idx] = u32::from_be_bytes(b_arr);
// idx += 1;
// }
// });
// return res;
// }
fn hex_fmt_byte(n: u32) -> String {
let res: String = n
.to_be_bytes()
.iter()
.map(|b| format!("{:02x}", b))
.collect();
return res;
}
// fn hex_fmt_byte(n: u32) -> String {
// let res: String = n
// .to_be_bytes()
// .iter()
// .map(|b| format!("{:02x}", b))
// .collect();
// return res;
// }
fn bignum_to_hex(a: &[u32; 8]) -> String {
let res: String = a
.iter()
.rev()
.map(|n| hex_fmt_byte(*n))
.collect::<Vec<String>>()
.join("");
return res;
}
// fn bignum_to_hex(a: &[u32; 8]) -> String {
// let res: String = a
// .iter()
// .rev()
// .map(|n| hex_fmt_byte(*n))
// .collect::<Vec<String>>()
// .join("");
// return res;
// }
#[cfg(test)]
mod num_utils_tests {
use super::*;
// #[cfg(test)]
// mod num_utils_tests {
// use super::*;
#[test]
fn test_add() {
use std::io::{BufRead, BufReader};
use std::process::{Command, Stdio};
// #[test]
// fn test_add() {
// use std::io::{BufRead, BufReader};
// use std::process::{Command, Stdio};
let test_gen_cmd = "/home/kira/Development/Rust/nyash-aes-xts256-plain64/nyash_client/src/tests/gen_test_data.py";
let mut child = Command::new(test_gen_cmd)
.stdout(Stdio::piped())
.spawn()
.unwrap();
// let test_gen_cmd = "/home/kira/Development/Rust/nyash-aes-xts256-plain64/nyash_client/src/tests/gen_test_data.py";
// let mut child = Command::new(test_gen_cmd)
// .stdout(Stdio::piped())
// .spawn()
// .unwrap();
let gen_stdout = child
.stdout
.take()
.ok_or("Failed to capture stdout")
.unwrap();
let gen_reader = BufReader::new(gen_stdout);
// let gen_stdout = child
// .stdout
// .take()
// .ok_or("Failed to capture stdout")
// .unwrap();
// let gen_reader = BufReader::new(gen_stdout);
for r_line in gen_reader.lines() {
let test_line: String = r_line.unwrap(); // Handle any I/O errors
let test_data_line = test_line.split(' ').collect::<Vec<&str>>();
let num_to_add = u32::from_str_radix(test_data_line[0], 10).unwrap();
let t0 = bignum_from_hex(test_data_line[1]);
// for r_line in gen_reader.lines() {
// let test_line: String = r_line.unwrap(); // Handle any I/O errors
// let test_data_line = test_line.split(' ').collect::<Vec<&str>>();
// let num_to_add = u32::from_str_radix(test_data_line[0], 10).unwrap();
// let t0 = bignum_from_hex(test_data_line[1]);
let t1_test = add_u32_to_u256(&t0, 1).0;
let t2_test = add_u32_to_u256(&t0, num_to_add).0;
// let t1_test = add_u32_to_u256(&t0, 1).0;
// let t2_test = add_u32_to_u256(&t0, num_to_add).0;
let res_actual = format!(
"{} {} {} {}",
num_to_add,
bignum_to_hex(&t0),
bignum_to_hex(&t1_test),
bignum_to_hex(&t2_test)
);
assert_eq!(test_line, res_actual);
}
// let res_actual = format!(
// "{} {} {} {}",
// num_to_add,
// bignum_to_hex(&t0),
// bignum_to_hex(&t1_test),
// bignum_to_hex(&t2_test)
// );
// assert_eq!(test_line, res_actual);
// }
let _ = child.wait().unwrap();
}
}
// let _ = child.wait().unwrap();
// }
// }
+62 -10
View File
@@ -25,6 +25,31 @@ mod database;
mod num_utils;
mod config;
async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => println!("Received Ctrl+C, initiating shutdown"),
_ = terminate => println!("Received SIGTERM, initiating shutdown"),
}
}
fn work_rec_to_work_data(work_rec: database::JobRecord) -> WorkData {
let tw_key = num_utils::u128_to_u64arr(work_rec.tweak_key);
let st_key = num_utils::u128_to_u64arr(work_rec.start_key);
@@ -42,8 +67,8 @@ fn work_rec_to_work_data(work_rec: database::JobRecord) -> WorkData {
// Background task to update progress from DB
async fn update_service_progress(progress: Arc<AsyncRwLock<f64>>, db: Arc<redb::Database>) {
loop {
async fn update_service_progress(progress: Arc<AsyncRwLock<f64>>, db: Arc<redb::Database>, term: Arc<AsyncRwLock<bool>>) {
while *term.read().await == false {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
match database::db_get_progress(&db) {
Ok(p) => {
@@ -66,7 +91,11 @@ pub struct NyashService {
#[tonic::async_trait]
impl NyashLuks for NyashService {
async fn request_work(&self, request: Request<WorkRequest>) -> Result<Response<WorkReply>, Status> {
println!("Got a request: {:?}", request);
let rem_addr = match request.remote_addr() {
Some(v) => v.to_string(),
None => "None".to_string()
};
println!("Remoute: {}, request_work.", rem_addr);
if *self.key_found.read().await == true {
@@ -98,7 +127,12 @@ impl NyashLuks for NyashService {
}
async fn commit_work(&self, request: Request<WorkCommit>) -> Result<Response<CommitReply>, Status> {
println!("Got a request: {:?}", request);
let rem_addr = match request.remote_addr() {
Some(v) => v.to_string(),
None => "None".to_string()
};
println!("Remoute: {}, commit_work.", rem_addr);
let work_commit = request.into_inner();
let put_k_no_err: bool = match work_commit.result {
@@ -133,7 +167,12 @@ impl NyashLuks for NyashService {
}
}
async fn request_progress(&self, _request: Request<ProgressRequest>) -> Result<Response<ProgressReply>, Status> {
async fn request_progress(&self, request: Request<ProgressRequest>) -> Result<Response<ProgressReply>, Status> {
let rem_addr = match request.remote_addr() {
Some(v) => v.to_string(),
None => "None".to_string()
};
println!("Remoute: {}, request_progress.", rem_addr);
Ok(Response::new(ProgressReply {
key_found: *self.key_found.read().await,
progress: *self.progress.read().await,
@@ -153,7 +192,7 @@ struct ProgArgs {
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = ProgArgs::parse();
@@ -165,7 +204,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_file = format!("{}data.rdb",db_dir);
let db_path = PathBuf::from_str(db_file.as_str()).unwrap();
let db = database::db_open(&db_path).expect("Error Opening database");
let mut db = database::db_open(&db_path).expect("Error Opening database");
println!("Compact db");
db.compact()?;
// Get the address to bind to
let addr = format!("{}:{}",server_config.bind_addr, server_config.listen_port);
@@ -190,22 +231,33 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let shared_progress = Arc::new(AsyncRwLock::new(progress));
let shared_db = Arc::new(db);
let shared_key_found = Arc::new(AsyncRwLock::new(false));
let shared_termination: Arc<AsyncRwLock<bool>> = Arc::new(AsyncRwLock::new(false));
// let service_state = Arc::new(AsyncRwLock::new(ServiceState {
// db: Arc::new(db),
// key_found: false,
// progress: progress,
// }));
let term_clone = shared_termination.clone();
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
shutdown_signal().await;
//signaling that we should stop
let mut guard = term_clone.write().await;
*guard = true;
let _ = shutdown_tx.send(());
});
// Spawn the periodic updater
tokio::spawn(update_service_progress(shared_progress.clone(), shared_db.clone()));
tokio::spawn(update_service_progress(shared_progress.clone(), shared_db.clone(), shared_termination.clone()));
let nyash_service = NyashService{db:shared_db, progress:shared_progress, key_found:shared_key_found};
Server::builder()
.add_service(NyashLuksServer::new(nyash_service))
.serve(addr)
.serve_with_shutdown(addr, async { shutdown_rx.await.ok(); })
.await?;
Ok(())
}