From 54fe0634d7ea8315a7db0f2726da8ccae6bc5524 Mon Sep 17 00:00:00 2001 From: Kirill Shakirov <38155247+Nyanraltotlapun@users.noreply.github.com> Date: Fri, 30 Jan 2026 02:28:28 +0100 Subject: [PATCH] Revork of Db logic again =^0_o^=. --- nyash_server/rustfmt.toml | 2 + nyash_server/src/database.rs | 564 +++++++++++++++++------------------ 2 files changed, 273 insertions(+), 293 deletions(-) create mode 100644 nyash_server/rustfmt.toml diff --git a/nyash_server/rustfmt.toml b/nyash_server/rustfmt.toml new file mode 100644 index 0000000..073de4a --- /dev/null +++ b/nyash_server/rustfmt.toml @@ -0,0 +1,2 @@ + +max_width = 120 diff --git a/nyash_server/src/database.rs b/nyash_server/src/database.rs index ab54824..2b270ce 100644 --- a/nyash_server/src/database.rs +++ b/nyash_server/src/database.rs @@ -1,386 +1,313 @@ -use std::{u64, u128}; - use crate::num_utils; -use rand::{self, Rng}; +use rand::{self, Rng, distr::weighted::Weight}; use redb::{ - Database, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, + Database, Key, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, TableDefinition, }; +// use std::collections::HashMap; +use std::{ops::Not, u64, u128}; //tables defenition const DB_FILE: &str = "./data"; -const RANGE_INFO: TableDefinition = TableDefinition::new("range_info"); -const RANGE_STATUS: MultimapTableDefinition = - MultimapTableDefinition::new("range_status"); +const COMPLETED_RANGES: TableDefinition = TableDefinition::new("completed_ranges"); -const JOBS_TABLE: TableDefinition = TableDefinition::new("jobs_state"); +// const RANGE_STATUS: MultimapTableDefinition = +// MultimapTableDefinition::new("range_status"); + +const JOBS_TABLE: TableDefinition = TableDefinition::new("jobs"); const JOBS_FREE_IDS: TableDefinition = TableDefinition::new("jobs_free_ids"); -// -- main_range -- -// (start_tweak_key) (end_tweak) (committed) (progress) -// (u128) (u128, u128, u128) -// (u32,u32,u32,u32,u32,u32,u32,u32) - -// -- jobs_state -- -//(job_id) (tweak_key, start_key, len, start_time) -// u64 (u128, u128, u64, u64) - // get time is seconds fn get_timestump() -> u64 { use std::time::{SystemTime, UNIX_EPOCH}; - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() + SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() } #[derive(Copy, Clone, Debug)] struct JobRecord { - job_id: u64, + id: u64, + range_id: u16, tweak_key: u128, start_key: u128, + on_range_tail: bool, len: u64, start_time: u64, } impl JobRecord { - pub fn from_data(rk: u64, rv: (u128, u128, u64, u64)) -> Self { + pub fn from_data(k: u64, v: &(u16, u128, u128, bool, u64, u64)) -> Self { Self { - job_id: rk, - tweak_key: rv.0, - start_key: rv.1, - len: rv.2, - start_time: rv.3, + id: k, + range_id: v.0, + tweak_key: v.1, + start_key: v.2, + on_range_tail: v.3, + len: v.4, + start_time: v.5, } } - pub fn from_acces_guard( - ag: ( - redb::AccessGuard<'_, u64>, - redb::AccessGuard<'_, (u128, u128, u64, u64)>, - ), - ) -> Self { - Self::from_data(ag.0.value(), ag.1.value()) - } - - pub fn from_option( - op: Option<( - redb::AccessGuard<'_, u64>, - redb::AccessGuard<'_, (u128, u128, u64, u64)>, - )>, - ) -> Option { - match op { - Some(ag) => Some(Self::from_acces_guard(ag)), - None => None, - } - } - - pub fn from_result( - res: Result< + pub fn from_iter_item( + item: Result< ( redb::AccessGuard<'_, u64>, - redb::AccessGuard<'_, (u128, u128, u64, u64)>, + redb::AccessGuard<'_, (u16, u128, u128, bool, u64, u64)>, ), redb::StorageError, >, - ) -> Result { - let ag_res = res?; - Ok(Self::from_data(ag_res.0.value(), ag_res.1.value())) + ) -> Result { + let ag = item?; + Ok(Self::from_data(ag.0.value(), &ag.1.value())) } - pub fn get_value(&self) -> (u128, u128, u64, u64) { - (self.tweak_key, self.start_key, self.len, self.start_time) + pub fn get_value(&self) -> (u16, u128, u128, bool, u64, u64) { + return ( + self.range_id, + self.tweak_key, + self.start_key, + self.on_range_tail, + self.len, + self.start_time, + ); + } + + // search for abandoned job + // returns first job older than 3 hours + pub fn get_staled_job( + jobs_table: &mut redb::Table<'_, u64, (u16, u128, u128, bool, u64, u64)>, + timeout: u64, + ) -> Result, redb::Error> { + let current_time = get_timestump(); + let mut maybe_job: Option = None; + for job_res in jobs_table.iter()? { + let job_rec = JobRecord::from_iter_item(job_res)?; + // if job is older that timeout in seconds + if (current_time - job_rec.start_time) > timeout { + maybe_job = Some(job_rec); + break; + } + } + if let Some(mut job) = maybe_job { + job.start_time = get_timestump(); + jobs_table.insert(job.id, job.get_value())?; + return Ok(Some(job)); + } + + Ok(None) + } + + // 1. Check if there is free ids to reuse + // If there is, returns first free ID + // and deletes it from jobs_free_ids_table + fn pop_job_free_id(free_ids_t: &mut redb::Table<'_, u64, ()>) -> Result, redb::Error> { + // 1. Check if there is free ids to reuse + Ok(free_ids_t.pop_first()?.and_then(|ag| Some(ag.0.value()))) + } + + // calling this if there was no free ids. + // What we want is to check max and min ids + // and return id lower than min or grater than max + fn get_new_job_id( + jobs_table: &redb::Table<'_, u64, (u16, u128, u128, bool, u64, u64)>, + ) -> Result { + // If table is empty return 0 ID + if jobs_table.len()? == 0 { + return Ok(0); + } + + let first_id = jobs_table + .first()? + .expect("Because we checked for empty table, we should never get None here") + .0 + .value(); + if first_id > 0 { + return Ok(first_id.strict_sub(1u64)); + } + + let last_id = jobs_table + .last()? + .expect("Because we checked for empty table, we should never get None here") + .0 + .value(); + if last_id < u64::MAX { + return Ok(last_id.strict_add(1u64)); + } + + panic!("Key value should never reach u64 max value"); } } +const RANGES: TableDefinition = TableDefinition::new("ranges"); +const RANGES_TO_USE: TableDefinition = TableDefinition::new("ranges_to_use"); #[derive(Copy, Clone, Debug)] struct RangeRecord { - start_tweak: u128, - end_tweak: u128, - committed: u128, - in_progress: u128, + id: u16, + tweak_head: u128, + tweak_tail: u128, + head_progress: u128, + tail_progress: u128, + head_in_progress: u128, + tail_in_progress: u128, } impl RangeRecord { - pub fn create_job(&mut self, job_id: u64, job_len: u64) -> JobRecord { - let job_rec = JobRecord { - job_id: job_id, - tweak_key: self.end_tweak, - start_key: self.in_progress, - len: job_len, - start_time: get_timestump(), + pub fn from_value(k: u16, v: &(u128, u128, u128, u128, u128, u128)) -> Self { + Self { + id: k, + tweak_head: v.0, + tweak_tail: v.1, + head_progress: v.2, + tail_progress: v.3, + head_in_progress: v.4, + tail_in_progress: v.5, + } + } + + pub fn create_job(&mut self, job_id: u64, pref_job_size: u64, on_tail: bool) -> JobRecord { + const MAX_U64_AS_U128: u128 = u64::MAX as u128; + + let (tweak_key, start_key, use_tail, job_len) = if on_tail == true && self.tail_in_progress < u128::MAX { + let start_key = self.tail_in_progress; + let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - start_key) as u64); + self.tail_in_progress += job_len as u128; + (self.tweak_tail, start_key, true, job_len) + } else { + let start_key = self.head_in_progress; + let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - start_key) as u64); + self.head_in_progress += job_len as u128; + (self.tweak_head, start_key, false, job_len) }; - self.in_progress = self.in_progress.strict_add(job_len as u128); // panic if owerflow - return job_rec; - } + if job_len == 0 { + panic!("Job len should not ended as 0. We should never call create_job on busy range!"); + } - pub fn new(start_tweak: u128) -> Self { - Self { - start_tweak: start_tweak, - end_tweak: start_tweak, - committed: 0, - in_progress: 0, + JobRecord { + id: job_id, + range_id: self.id, + tweak_key: tweak_key, + start_key: start_key, + on_range_tail: use_tail, + len: job_len, + start_time: get_timestump(), } } - pub fn from_data(rk: u128, rv: (u128, u128, u128)) -> Self { - Self { - start_tweak: rk, - end_tweak: rv.0, - committed: rv.1, - in_progress: rv.2, - } + pub fn value(&self) -> (u128, u128, u128, u128, u128, u128) { + ( + self.tweak_head, + self.tweak_tail, + self.head_progress, + self.tail_progress, + self.head_in_progress, + self.tail_in_progress, + ) } +} - pub fn from_acces_guard( - ag: ( - redb::AccessGuard<'_, u128>, - redb::AccessGuard<'_, (u128, u128, u128)>, - ), +#[derive(Debug)] +struct RangesTable<'a> { + ranges: redb::Table<'a, u16, (u128, u128, u128, u128, u128, u128)>, + ranges_to_use: redb::Table<'a, u16, ()>, +} + +impl<'a> RangesTable<'a> { + pub fn new( + ranges: redb::Table<'a, u16, (u128, u128, u128, u128, u128, u128)>, + ranges_to_use: redb::Table<'a, u16, ()>, ) -> Self { - Self::from_data(ag.0.value(), ag.1.value()) - } - - pub fn from_option( - op: Option<( - redb::AccessGuard<'_, u128>, - redb::AccessGuard<'_, (u128, u128, u128)>, - )>, - ) -> Option { - match op { - Some(ag) => Some(Self::from_acces_guard(ag)), - None => None, + RangesTable { + ranges: ranges, + ranges_to_use: ranges_to_use, } } - pub fn from_result( - res: Result< - ( - redb::AccessGuard<'_, u128>, - redb::AccessGuard<'_, (u128, u128, u128)>, - ), - redb::StorageError, - >, - ) -> Result { - let ag_res = res?; - Ok(Self::from_data(ag_res.0.value(), ag_res.1.value())) + pub fn open_rw(trx: &'a redb::WriteTransaction) -> Result { + Ok(RangesTable { + ranges: trx.open_table(RANGES)?, + ranges_to_use: trx.open_table(RANGES_TO_USE)?, + }) } - pub fn get_value(&self) -> (u128, u128, u128) { - (self.end_tweak, self.committed, self.in_progress) + pub fn get_range(&self, k: u16) -> Result, redb::Error> { + Ok(self + .ranges + .get(k)? + .and_then(|ag_v| Some(RangeRecord::from_value(k, &ag_v.value())))) } - pub fn increase_progress(self, job_len: u64) -> Self { - let (res, carry) = self.in_progress.carrying_add(job_len as u128, false); - if carry == true { - panic!("Increase progress resulted in u128 integer overflow, this should not happen!"); + pub fn get_useful_range(&self) -> Result, redb::Error> { + let mut res: Vec = Vec::with_capacity(self.ranges_to_use.len()? as usize); + for kres in self.ranges_to_use.iter()? { + res.push( + self.get_range(kres?.0.value())? + .expect("Randge id from ranges_to_use should allways be present in ranges table"), + ); } - Self { - in_progress: res, - ..self + + Ok(res) + } + + pub fn update_range(&mut self, range: &RangeRecord) -> Result<(), redb::Error> { + self.ranges.insert(range.id, range.value())?; + // if range head and tail reach max values - remove it from to_use table + if range.head_in_progress == u128::MAX && range.tail_in_progress == u128::MAX { + self.ranges_to_use.remove(range.id)?; } + Ok(()) } } -fn get_range( - range_info: &mut redb::Table<'_, u128, (u128, u128, u128)>, - range_status: &mut redb::MultimapTable<'_, bool, u128>, -) -> Result, redb::Error> { - // 1. Go tru active ranges and check if their in_progress less than u128::MAX - let mut ranges_list: Vec = Vec::new(); - for wrap_id in range_status.get(false)? { - let key = wrap_id?.value(); - let val = range_info - .get(key)? - .expect("range_table shuld contain key from range_active table") - .value(); - - let range = RangeRecord::from_data(key, val); - - if range.in_progress < u128::MAX { - ranges_list.push(range); - } - } - - // If found active ranges to use, choose one randomly from them - if ranges_list.len() > 0 { - let r_idx: usize = rand::rng().random_range(0..ranges_list.len()); - return Ok(Some(ranges_list[r_idx])); - } - - // Othervice, we need to create new range - // If no ranges in table at all, we creating first one - if range_info.len()? == 0 { - let start_tweak = u128::MAX / 2; - let new_range = RangeRecord::new(start_tweak); - range_info.insert(start_tweak, new_range.get_value())?; - range_status.insert(false, start_tweak)?; - return Ok(Some(new_range)); - } - - // - // if some records already exist, but no that we can use, - // we should add new range - let first_rec = - RangeRecord::from_option(range_info.first()?).expect("Table should not be empty"); - let last_rec = RangeRecord::from_option(range_info.last()?).expect("Table should not be empty"); - - let tweak_max = last_rec.end_tweak; - let tweak_min = first_rec.start_tweak; - - // check if we are done! - if tweak_max == u128::MAX && tweak_min == 0 { - return Ok(None); - } - - // randomly choosing to add new range to low or high part of the range - let start_tweak = if rand::rng().random_bool(0.5) { - // new range on upper - if tweak_max < u128::MAX { - tweak_max + 1 - } else { - tweak_min - 1 - } - } else { - // new range on lower - if tweak_min > 0 { - tweak_min - 1 - } else { - tweak_max + 1 - } - }; - - let new_range = RangeRecord::new(start_tweak); - range_info.insert(start_tweak, new_range.get_value())?; - range_status.insert(false, start_tweak)?; - return Ok(Some(new_range)); -} - -// search for abandoned job -// returns first job older than 3 hours -fn find_abandoned_job( - jobs_table: &redb::Table<'_, u64, (u128, u128, u64, u64)>, -) -> Result, redb::Error> { - let current_time = get_timestump(); - for job_res in jobs_table.iter()? { - let job_rec = JobRecord::from_result(job_res)?; - // if job is older that 3 hours - if (current_time - job_rec.start_time) > 10800 { - return Ok(Some(job_rec)); - } - } - Ok(None) -} - -// 1. Check if there is free ids to reuse -// If there is, returns first free ID -// and deletes it from jobs_free_ids_table -fn get_job_free_id( - jobs_free_ids_table: &mut redb::Table<'_, u64, ()>, -) -> Result, redb::Error> { - // 1. Check if there is free ids to reuse - if jobs_free_ids_table.len()? > 0 { - match jobs_free_ids_table.pop_first()? { - Some(ag) => Ok(Some(ag.0.value())), - None => Ok(None), - } - } else { - Ok(None) - } -} - -// calling this if there was no free ids. -// What we want is to check max and min ids -// and return id lower than min or grater than max -fn get_new_job_id( - jobs_table: &redb::Table<'_, u64, (u128, u128, u64, u64)>, -) -> Result { - // If table is empty return 0 ID - if jobs_table.len()? == 0 { - return Ok(0); - } - - // if min key greater than 0 - if let Some(ag) = jobs_table.first()? { - let min_kv = ag.0.value(); - if min_kv > 0 { - return Ok(min_kv.strict_sub(1u64)); - } - } else { - panic!("Because we checked for empty table, we should never get None here"); - } - - // now try to get new key from maximum key value - if let Some(ag) = jobs_table.last()? { - let max_kv = ag.0.value(); - if max_kv < u64::MAX { - return Ok(max_kv.strict_add(1u64)); - } else { - panic!("Key value should never reach u64 max value"); - } - } else { - panic!("Because we checked for empty table, we should never get None here"); - } -} - -const MAX_U64_AS_U128: u128 = u64::MAX as u128; -fn db_get_job(db: Database, pref_job_size: u64) -> Result, redb::Error> { +fn db_create_job(db: Database, pref_job_size: u64) -> Result, redb::Error> { let transct = db.begin_write()?; // 1. look for abandonent jobs let new_job_opt = { let mut jobs_table = transct.open_table(JOBS_TABLE)?; - - if let Some(mut abandoned_job) = find_abandoned_job(&jobs_table)? { - // update job start time - abandoned_job.start_time = get_timestump(); - jobs_table.insert(abandoned_job.job_id, abandoned_job.get_value())?; - Some(abandoned_job) - } else { - None - } + JobRecord::get_staled_job(&mut jobs_table, 1200)? }; if new_job_opt.is_some() { transct.commit()?; return Ok(new_job_opt); - } + }; // 2. look for free ids to reuse or generate new id let new_job_opt = { - let mut range_info = transct.open_table(RANGE_INFO)?; - let mut range_status = transct.open_multimap_table(RANGE_STATUS)?; + // let mut range_info = transct.open_table(RANGES)?; + // let mut range_to_use = transct.open_table(RANGES_TO_USE)?; let mut jobs_table = transct.open_table(JOBS_TABLE)?; let mut jobs_free_ids = transct.open_table(JOBS_FREE_IDS)?; // look for free ids to reuse - let job_id_to_use = match get_job_free_id(&mut jobs_free_ids)? { + let job_id_to_use = match JobRecord::pop_job_free_id(&mut jobs_free_ids)? { Some(v) => v, None => { // need to generate ID - get_new_job_id(&jobs_table)? + JobRecord::get_new_job_id(&jobs_table)? } }; // 3 Now get available range - let range_opt = get_range(&mut range_info, &mut range_status)?; - // if we found range - proceed. If None - means we already done. - match range_opt { - Some(mut range) => { - let job_len = - pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - range.in_progress) as u64); - if job_len == 0 { - panic!("Job len should not ended as 0"); - } - let new_job = range.create_job(job_id_to_use, job_len); - jobs_table.insert(new_job.job_id, new_job.get_value())?; - range_info.insert(range.start_tweak, range.get_value())?; - Some(new_job) - } - None => None, // We already done. All key space is processed. + let mut ranges_table = RangesTable::open_rw(&transct)?; + let ranges_to_choose = ranges_table.get_useful_range()?; + if ranges_to_choose.len() == 0 { + println!("No ranges to choose from!! This can happed if work is sone"); + return Ok(None); } + + // randomly choosing range from avaylable ranges + let mut range_to_use = ranges_to_choose[rand::rng().random_range(0..ranges_to_choose.len())]; + + // if we found range - proceed. If None - means we already done. + let new_job = range_to_use.create_job(job_id_to_use, pref_job_size, rand::rng().random_bool(0.5)); + + jobs_table.insert(new_job.id, new_job.get_value())?; + ranges_table.update_range(&range_to_use)?; + + // finally! + Some(new_job) }; if new_job_opt.is_some() { @@ -396,6 +323,57 @@ fn db_get_job(db: Database, pref_job_size: u64) -> Result, red // ********************************************** } +//fn commit_work() + +fn db_commit_job(db: Database, job_id: u64) -> Result { + let transct: redb::WriteTransaction = db.begin_write()?; + + { + let mut jobs_table = transct.open_table(JOBS_TABLE)?; + + // 1. get job record + let maybe_job_rec = jobs_table + .get(job_id)? + .and_then(|ag| Some(JobRecord::from_data(job_id, ag.value()))); + + if let Some(job_rec) = maybe_job_rec { + // 2. Remove job from jobs_table and add job_id to jobs_free_ids table + jobs_table.remove(job_id)?; + + // add job_id to jobs_free_ids table + let mut jobs_free_ids = transct.open_table(JOBS_FREE_IDS)?; + jobs_free_ids.insert(job_id, ())?; + + let mut range_info = transct.open_table(RANGE_INFO)?; + let job_range = RangeRecord::from_data( + job_rec.tweak_key, + range_info + .get(job_rec.tweak_key)? + .expect("Range to which Job reffering musbe present!") + .value(), + ); + + // commit job to range. Panik if overflow. + job_range.committed.checked_add_assign(&(job_rec.len as u128)).unwrap(); + + range_info.insert(job_rec.tweak_key, job_range.get_value())?; + + // check if Job is finished. + if job_range.committed == u128::MAX { + let mut range_status = transct.open_multimap_table(RANGE_STATUS)?; + range_status.insert(true, job_range.start_tweak)?; + } + } + + let maybe_job_rec = match jobs_table.get(job_id)? { + Some(ag) => JobRecord::from_data(job_id, ag.value()), + None => None, + }; + } + + return Ok(false); // Job already done by someone else +} + // fn test_db() -> Result<(), Error> { // let db = Database::create(DB_FILE)?;