diff --git a/nyash_server/src/database.rs b/nyash_server/src/database.rs index 2b270ce..e520e2e 100644 --- a/nyash_server/src/database.rs +++ b/nyash_server/src/database.rs @@ -1,8 +1,7 @@ use crate::num_utils; use rand::{self, Rng, distr::weighted::Weight}; use redb::{ - Database, Key, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, - TableDefinition, + Database, Key, MultimapTableDefinition, ReadableDatabase, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, TableDefinition }; // use std::collections::HashMap; use std::{ops::Not, u64, u128}; @@ -15,7 +14,7 @@ const COMPLETED_RANGES: TableDefinition = TableDefinition::new("comp // const RANGE_STATUS: MultimapTableDefinition = // MultimapTableDefinition::new("range_status"); -const JOBS_TABLE: TableDefinition = TableDefinition::new("jobs"); +const JOBS_TABLE: TableDefinition = TableDefinition::new("jobs"); const JOBS_FREE_IDS: TableDefinition = TableDefinition::new("jobs_free_ids"); // get time is seconds @@ -30,21 +29,19 @@ struct JobRecord { range_id: u16, tweak_key: u128, start_key: u128, - on_range_tail: bool, len: u64, start_time: u64, } impl JobRecord { - pub fn from_data(k: u64, v: &(u16, u128, u128, bool, u64, u64)) -> Self { + pub fn from_data(k: u64, v: &(u16, u128, u128, u64, u64)) -> Self { Self { id: k, range_id: v.0, tweak_key: v.1, start_key: v.2, - on_range_tail: v.3, - len: v.4, - start_time: v.5, + len: v.3, + start_time: v.4, } } @@ -52,7 +49,7 @@ impl JobRecord { item: Result< ( redb::AccessGuard<'_, u64>, - redb::AccessGuard<'_, (u16, u128, u128, bool, u64, u64)>, + redb::AccessGuard<'_, (u16, u128, u128, u64, u64)>, ), redb::StorageError, >, @@ -61,12 +58,11 @@ impl JobRecord { Ok(Self::from_data(ag.0.value(), &ag.1.value())) } - pub fn get_value(&self) -> (u16, u128, u128, bool, u64, u64) { + pub fn get_value(&self) -> (u16, u128, u128, u64, u64) { return ( self.range_id, self.tweak_key, self.start_key, - self.on_range_tail, self.len, self.start_time, ); @@ -75,7 +71,7 @@ impl JobRecord { // search for abandoned job // returns first job older than 3 hours pub fn get_staled_job( - jobs_table: &mut redb::Table<'_, u64, (u16, u128, u128, bool, u64, u64)>, + jobs_table: &mut redb::Table<'_, u64, (u16, u128, u128, u64, u64)>, timeout: u64, ) -> Result, redb::Error> { let current_time = get_timestump(); @@ -109,7 +105,7 @@ impl JobRecord { // What we want is to check max and min ids // and return id lower than min or grater than max fn get_new_job_id( - jobs_table: &redb::Table<'_, u64, (u16, u128, u128, bool, u64, u64)>, + jobs_table: &redb::Table<'_, u64, (u16, u128, u128, u64, u64)>, ) -> Result { // If table is empty return 0 ID if jobs_table.len()? == 0 { @@ -138,46 +134,84 @@ impl JobRecord { } } -const RANGES: TableDefinition = TableDefinition::new("ranges"); +const RANGES: TableDefinition = TableDefinition::new("ranges"); const RANGES_TO_USE: TableDefinition = TableDefinition::new("ranges_to_use"); + +// to not owerflow u128 we calculate it in range 0..max and then substruct +// const SUB_DIV: u128 = 0x1000100010001000100010001; +// TWEAKS_PER_RANGE: u128 = 5192296858534827628530496329220096 +const TWEAKS_PER_RANGE: u128 = (u128::MAX / (u16::MAX as u128 + 1)) + 1; +const TWEAKS_PER_RANGE_F64: f64 = TWEAKS_PER_RANGE as f64; +//assert!(TWEAKS_PER_RANGE == 5192296858534827628530496329220096u128); + +const TWEAKS_PER_RANGE_ID_VICE: u128 = TWEAKS_PER_RANGE - 1; +//const HALF_TWEAKS_PER_RANGE: u128 = TWEAKS_PER_RANGE / 2; + #[derive(Copy, Clone, Debug)] struct RangeRecord { id: u16, - tweak_head: u128, - tweak_tail: u128, - head_progress: u128, - tail_progress: u128, - head_in_progress: u128, - tail_in_progress: u128, + + tweak_current: u128, + tweak_end: u128, + key_progress: u128, + key_committed_progress: u128 } impl RangeRecord { - pub fn from_value(k: u16, v: &(u128, u128, u128, u128, u128, u128)) -> Self { + + fn from_value(k: u16, v: &(u128, u128, u128, u128)) -> Self { Self { id: k, - tweak_head: v.0, - tweak_tail: v.1, - head_progress: v.2, - tail_progress: v.3, - head_in_progress: v.4, - tail_in_progress: v.5, + tweak_current: v.0, + tweak_end: v.1, + key_progress: v.2, + key_committed_progress: v.3, } } - pub fn create_job(&mut self, job_id: u64, pref_job_size: u64, on_tail: bool) -> JobRecord { - const MAX_U64_AS_U128: u128 = u64::MAX as u128; + fn new(id: u16) -> Self { - let (tweak_key, start_key, use_tail, job_len) = if on_tail == true && self.tail_in_progress < u128::MAX { - let start_key = self.tail_in_progress; - let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - start_key) as u64); - self.tail_in_progress += job_len as u128; - (self.tweak_tail, start_key, true, job_len) - } else { - let start_key = self.head_in_progress; - let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - start_key) as u64); - self.head_in_progress += job_len as u128; - (self.tweak_head, start_key, false, job_len) - }; + let tweak_start: u128 = id as u128 * TWEAKS_PER_RANGE; + Self { + id: id, + tweak_current: tweak_start, + tweak_end: tweak_start.strict_add(TWEAKS_PER_RANGE_ID_VICE) , + key_progress: 0, + key_committed_progress: 0 + } + } + + fn tweaks_left(&self) -> u128 { + let t_left = (self.tweak_end - self.tweak_current) + 1; + if t_left == 1 && self.key_committed_progress == u128::MAX { + 0u128 + } + else { + t_left + } + } + + fn completed_ratio(&self) -> f64 { + let t_left = self.tweak_end - self.tweak_current; + if t_left == 0 && self.key_committed_progress == u128::MAX { + 1.0f64 + } + else { + 1.0f64 - ((t_left as f64 + 1.0)/ TWEAKS_PER_RANGE_F64) + } + } + + fn create_job(&mut self, job_id: u64, pref_job_size: u64) -> JobRecord { + const MAX_U64_AS_U128: u128 = u64::MAX as u128; + let start_key = self.key_progress + 1; // Key progrees point to the LAST key in previos job + + // using here self.key_progress enshures proper job len + // in client we just need increment start_key this many times. + let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - self.key_progress) as u64); + + // update range progress + // New key_progress point to end of current job + self.key_progress += job_len as u128; if job_len == 0 { panic!("Job len should not ended as 0. We should never call create_job on busy range!"); @@ -186,35 +220,54 @@ impl RangeRecord { JobRecord { id: job_id, range_id: self.id, - tweak_key: tweak_key, + tweak_key: self.tweak_current, start_key: start_key, - on_range_tail: use_tail, len: job_len, start_time: get_timestump(), } } - pub fn value(&self) -> (u128, u128, u128, u128, u128, u128) { - ( - self.tweak_head, - self.tweak_tail, - self.head_progress, - self.tail_progress, - self.head_in_progress, - self.tail_in_progress, - ) + // return true if after commiting range is active for new jobs + fn commit_work(&mut self, work_len: u64) -> bool { + self.key_committed_progress = self.key_committed_progress.strict_add(work_len as u128); + + // if we finished one tweak bruteforcing. + if self.key_committed_progress == u128::MAX { + // Check if range is finished + if self.tweak_current == self.tweak_end { + false + } + else { + // Increment key + self.key_progress = 0; + self.key_committed_progress = 0; + self.tweak_current += 1; + true + } + } + else if self.key_progress == u128::MAX { + // range is waiting for jobs to finish + false + } + else { + true + } + } + + fn value(&self) -> (u128, u128, u128, u128) { + (self.tweak_current, self.tweak_end, self.key_progress, self.key_committed_progress) } } #[derive(Debug)] struct RangesTable<'a> { - ranges: redb::Table<'a, u16, (u128, u128, u128, u128, u128, u128)>, + ranges: redb::Table<'a, u16, (u128, u128, u128, u128)>, ranges_to_use: redb::Table<'a, u16, ()>, } impl<'a> RangesTable<'a> { - pub fn new( - ranges: redb::Table<'a, u16, (u128, u128, u128, u128, u128, u128)>, + fn new( + ranges: redb::Table<'a, u16, (u128, u128, u128, u128)>, ranges_to_use: redb::Table<'a, u16, ()>, ) -> Self { RangesTable { @@ -223,40 +276,72 @@ impl<'a> RangesTable<'a> { } } - pub fn open_rw(trx: &'a redb::WriteTransaction) -> Result { + fn open_rw(trx: &'a redb::WriteTransaction) -> Result { Ok(RangesTable { ranges: trx.open_table(RANGES)?, ranges_to_use: trx.open_table(RANGES_TO_USE)?, }) } - pub fn get_range(&self, k: u16) -> Result, redb::Error> { + fn get_range(&self, k: u16) -> Result, redb::Error> { Ok(self .ranges .get(k)? .and_then(|ag_v| Some(RangeRecord::from_value(k, &ag_v.value())))) } - pub fn get_useful_range(&self) -> Result, redb::Error> { - let mut res: Vec = Vec::with_capacity(self.ranges_to_use.len()? as usize); + fn get_useful_ranges(&self) -> Result, redb::Error> { + let mut u_ranges: Vec = Vec::with_capacity(self.ranges_to_use.len()? as usize); for kres in self.ranges_to_use.iter()? { - res.push( + u_ranges.push( self.get_range(kres?.0.value())? .expect("Randge id from ranges_to_use should allways be present in ranges table"), ); } - Ok(res) + Ok(u_ranges) } - pub fn update_range(&mut self, range: &RangeRecord) -> Result<(), redb::Error> { + fn update_range(&mut self, range: &RangeRecord) -> Result<(), redb::Error> { self.ranges.insert(range.id, range.value())?; - // if range head and tail reach max values - remove it from to_use table - if range.head_in_progress == u128::MAX && range.tail_in_progress == u128::MAX { + // if range key in progress reach max values - remove it from to_use table + if range.key_progress == u128::MAX { self.ranges_to_use.remove(range.id)?; } Ok(()) } + + fn commit_job(&mut self, job: &JobRecord) -> Result<(), redb::Error> { + + let mut range_rec = self.get_range(job.range_id)? + .expect("Randge id is not in ranges_table!"); + + let is_active = range_rec.commit_work(job.len); + + // update range in db + self.ranges.insert(range_rec.id, range_rec.value())?; + + if is_active { + self.ranges_to_use.insert(range_rec.id, ())?; + } + else { + self.ranges_to_use.remove(range_rec.id)?; + } + + Ok(()) + } + + fn compute_progress(&self) -> Result { + let mut mean_prog: f64 = 0.0; + for range_res in self.ranges.iter()? { + let ag = range_res?; + let range = RangeRecord::from_value(ag.0.value(), &ag.1.value()); + mean_prog += range.completed_ratio(); + } + + Ok(mean_prog / 65536.0f64) + } + } fn db_create_job(db: Database, pref_job_size: u64) -> Result, redb::Error> { @@ -291,7 +376,7 @@ fn db_create_job(db: Database, pref_job_size: u64) -> Result, // 3 Now get available range let mut ranges_table = RangesTable::open_rw(&transct)?; - let ranges_to_choose = ranges_table.get_useful_range()?; + let ranges_to_choose = ranges_table.get_useful_ranges()?; if ranges_to_choose.len() == 0 { println!("No ranges to choose from!! This can happed if work is sone"); return Ok(None); @@ -301,7 +386,7 @@ fn db_create_job(db: Database, pref_job_size: u64) -> Result, let mut range_to_use = ranges_to_choose[rand::rng().random_range(0..ranges_to_choose.len())]; // if we found range - proceed. If None - means we already done. - let new_job = range_to_use.create_job(job_id_to_use, pref_job_size, rand::rng().random_bool(0.5)); + let new_job = range_to_use.create_job(job_id_to_use, pref_job_size); jobs_table.insert(new_job.id, new_job.get_value())?; ranges_table.update_range(&range_to_use)?; @@ -323,18 +408,18 @@ fn db_create_job(db: Database, pref_job_size: u64) -> Result, // ********************************************** } -//fn commit_work() +// returns false if no such job found in DB fn db_commit_job(db: Database, job_id: u64) -> Result { let transct: redb::WriteTransaction = db.begin_write()?; - { + let job_found = { let mut jobs_table = transct.open_table(JOBS_TABLE)?; // 1. get job record let maybe_job_rec = jobs_table .get(job_id)? - .and_then(|ag| Some(JobRecord::from_data(job_id, ag.value()))); + .and_then(|ag| Some(JobRecord::from_data(job_id, &ag.value()))); if let Some(job_rec) = maybe_job_rec { // 2. Remove job from jobs_table and add job_id to jobs_free_ids table @@ -344,59 +429,35 @@ fn db_commit_job(db: Database, job_id: u64) -> Result { let mut jobs_free_ids = transct.open_table(JOBS_FREE_IDS)?; jobs_free_ids.insert(job_id, ())?; - let mut range_info = transct.open_table(RANGE_INFO)?; - let job_range = RangeRecord::from_data( - job_rec.tweak_key, - range_info - .get(job_rec.tweak_key)? - .expect("Range to which Job reffering musbe present!") - .value(), - ); - + let mut ranges_table = RangesTable::open_rw(&transct)?; // commit job to range. Panik if overflow. - job_range.committed.checked_add_assign(&(job_rec.len as u128)).unwrap(); - - range_info.insert(job_rec.tweak_key, job_range.get_value())?; - - // check if Job is finished. - if job_range.committed == u128::MAX { - let mut range_status = transct.open_multimap_table(RANGE_STATUS)?; - range_status.insert(true, job_range.start_tweak)?; - } + ranges_table.commit_job(&job_rec)?; + true } + else { + false + } + }; + transct.commit()?; - let maybe_job_rec = match jobs_table.get(job_id)? { - Some(ag) => JobRecord::from_data(job_id, ag.value()), - None => None, - }; - } - - return Ok(false); // Job already done by someone else + return Ok(job_found); // Job already done by someone else } -// fn test_db() -> Result<(), Error> { -// let db = Database::create(DB_FILE)?; -// let write_txn = db.begin_write()?; -// { -// let mut table = write_txn.open_table(RANGE_TABLE)?; -// table.insert([0,1,2,3,0,124,6,7], (42u128, 42u128, false))?; +fn db_get_progress(db: Database) -> Result { + let trx: redb::ReadTransaction = db.begin_read()?; + let progress = { + let ranges_t = trx.open_table(RANGES)?; + let mut mean_prog: f64 = 0.0; + for range_res in ranges_t.iter()? { + let ag = range_res?; + let range = RangeRecord::from_value(ag.0.value(), &ag.1.value()); + mean_prog += range.completed_ratio(); + } -// } -// write_txn.commit()?; + mean_prog / 65536.0f64 + }; + trx.close()?; + return Ok(progress); +} -// let read_txn = db.begin_read()?; -// let table = read_txn.open_table(RANGE_TABLE)?; -// // println!("{}", table.get([0,1,2,3,3,5,6,7])?.unwrap().value()); -// let a = table.get([0,1,2,1,4,5,6,7]); -// match a { -// Ok(data) => match data { -// Some(d) => println!("some data {}", d.value().0), -// None => println!("None") -// } -// Err(_) => println!("Error") -// } -// assert_eq!(table.get([0,1,2,3,3,5,6,7])?.unwrap().value().0, 42u128); - -// Ok(()) -// }