Lots of work on database part.
This commit is contained in:
+174
-113
@@ -1,8 +1,7 @@
|
|||||||
use crate::num_utils;
|
use crate::num_utils;
|
||||||
use rand::{self, Rng, distr::weighted::Weight};
|
use rand::{self, Rng, distr::weighted::Weight};
|
||||||
use redb::{
|
use redb::{
|
||||||
Database, Key, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, ReadableTableMetadata,
|
Database, Key, MultimapTableDefinition, ReadableDatabase, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, TableDefinition
|
||||||
TableDefinition,
|
|
||||||
};
|
};
|
||||||
// use std::collections::HashMap;
|
// use std::collections::HashMap;
|
||||||
use std::{ops::Not, u64, u128};
|
use std::{ops::Not, u64, u128};
|
||||||
@@ -15,7 +14,7 @@ const COMPLETED_RANGES: TableDefinition<u128, u128> = TableDefinition::new("comp
|
|||||||
// const RANGE_STATUS: MultimapTableDefinition<bool, u128> =
|
// const RANGE_STATUS: MultimapTableDefinition<bool, u128> =
|
||||||
// MultimapTableDefinition::new("range_status");
|
// MultimapTableDefinition::new("range_status");
|
||||||
|
|
||||||
const JOBS_TABLE: TableDefinition<u64, (u16, u128, u128, bool, u64, u64)> = TableDefinition::new("jobs");
|
const JOBS_TABLE: TableDefinition<u64, (u16, u128, u128, u64, u64)> = TableDefinition::new("jobs");
|
||||||
const JOBS_FREE_IDS: TableDefinition<u64, ()> = TableDefinition::new("jobs_free_ids");
|
const JOBS_FREE_IDS: TableDefinition<u64, ()> = TableDefinition::new("jobs_free_ids");
|
||||||
|
|
||||||
// get time is seconds
|
// get time is seconds
|
||||||
@@ -30,21 +29,19 @@ struct JobRecord {
|
|||||||
range_id: u16,
|
range_id: u16,
|
||||||
tweak_key: u128,
|
tweak_key: u128,
|
||||||
start_key: u128,
|
start_key: u128,
|
||||||
on_range_tail: bool,
|
|
||||||
len: u64,
|
len: u64,
|
||||||
start_time: u64,
|
start_time: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JobRecord {
|
impl JobRecord {
|
||||||
pub fn from_data(k: u64, v: &(u16, u128, u128, bool, u64, u64)) -> Self {
|
pub fn from_data(k: u64, v: &(u16, u128, u128, u64, u64)) -> Self {
|
||||||
Self {
|
Self {
|
||||||
id: k,
|
id: k,
|
||||||
range_id: v.0,
|
range_id: v.0,
|
||||||
tweak_key: v.1,
|
tweak_key: v.1,
|
||||||
start_key: v.2,
|
start_key: v.2,
|
||||||
on_range_tail: v.3,
|
len: v.3,
|
||||||
len: v.4,
|
start_time: v.4,
|
||||||
start_time: v.5,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -52,7 +49,7 @@ impl JobRecord {
|
|||||||
item: Result<
|
item: Result<
|
||||||
(
|
(
|
||||||
redb::AccessGuard<'_, u64>,
|
redb::AccessGuard<'_, u64>,
|
||||||
redb::AccessGuard<'_, (u16, u128, u128, bool, u64, u64)>,
|
redb::AccessGuard<'_, (u16, u128, u128, u64, u64)>,
|
||||||
),
|
),
|
||||||
redb::StorageError,
|
redb::StorageError,
|
||||||
>,
|
>,
|
||||||
@@ -61,12 +58,11 @@ impl JobRecord {
|
|||||||
Ok(Self::from_data(ag.0.value(), &ag.1.value()))
|
Ok(Self::from_data(ag.0.value(), &ag.1.value()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_value(&self) -> (u16, u128, u128, bool, u64, u64) {
|
pub fn get_value(&self) -> (u16, u128, u128, u64, u64) {
|
||||||
return (
|
return (
|
||||||
self.range_id,
|
self.range_id,
|
||||||
self.tweak_key,
|
self.tweak_key,
|
||||||
self.start_key,
|
self.start_key,
|
||||||
self.on_range_tail,
|
|
||||||
self.len,
|
self.len,
|
||||||
self.start_time,
|
self.start_time,
|
||||||
);
|
);
|
||||||
@@ -75,7 +71,7 @@ impl JobRecord {
|
|||||||
// search for abandoned job
|
// search for abandoned job
|
||||||
// returns first job older than 3 hours
|
// returns first job older than 3 hours
|
||||||
pub fn get_staled_job(
|
pub fn get_staled_job(
|
||||||
jobs_table: &mut redb::Table<'_, u64, (u16, u128, u128, bool, u64, u64)>,
|
jobs_table: &mut redb::Table<'_, u64, (u16, u128, u128, u64, u64)>,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
) -> Result<Option<JobRecord>, redb::Error> {
|
) -> Result<Option<JobRecord>, redb::Error> {
|
||||||
let current_time = get_timestump();
|
let current_time = get_timestump();
|
||||||
@@ -109,7 +105,7 @@ impl JobRecord {
|
|||||||
// What we want is to check max and min ids
|
// What we want is to check max and min ids
|
||||||
// and return id lower than min or grater than max
|
// and return id lower than min or grater than max
|
||||||
fn get_new_job_id(
|
fn get_new_job_id(
|
||||||
jobs_table: &redb::Table<'_, u64, (u16, u128, u128, bool, u64, u64)>,
|
jobs_table: &redb::Table<'_, u64, (u16, u128, u128, u64, u64)>,
|
||||||
) -> Result<u64, redb::Error> {
|
) -> Result<u64, redb::Error> {
|
||||||
// If table is empty return 0 ID
|
// If table is empty return 0 ID
|
||||||
if jobs_table.len()? == 0 {
|
if jobs_table.len()? == 0 {
|
||||||
@@ -138,46 +134,84 @@ impl JobRecord {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const RANGES: TableDefinition<u16, (u128, u128, u128, u128, u128, u128)> = TableDefinition::new("ranges");
|
const RANGES: TableDefinition<u16, (u128, u128, u128, u128)> = TableDefinition::new("ranges");
|
||||||
const RANGES_TO_USE: TableDefinition<u16, ()> = TableDefinition::new("ranges_to_use");
|
const RANGES_TO_USE: TableDefinition<u16, ()> = TableDefinition::new("ranges_to_use");
|
||||||
|
|
||||||
|
// to not owerflow u128 we calculate it in range 0..max and then substruct
|
||||||
|
// const SUB_DIV: u128 = 0x1000100010001000100010001;
|
||||||
|
// TWEAKS_PER_RANGE: u128 = 5192296858534827628530496329220096
|
||||||
|
const TWEAKS_PER_RANGE: u128 = (u128::MAX / (u16::MAX as u128 + 1)) + 1;
|
||||||
|
const TWEAKS_PER_RANGE_F64: f64 = TWEAKS_PER_RANGE as f64;
|
||||||
|
//assert!(TWEAKS_PER_RANGE == 5192296858534827628530496329220096u128);
|
||||||
|
|
||||||
|
const TWEAKS_PER_RANGE_ID_VICE: u128 = TWEAKS_PER_RANGE - 1;
|
||||||
|
//const HALF_TWEAKS_PER_RANGE: u128 = TWEAKS_PER_RANGE / 2;
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
struct RangeRecord {
|
struct RangeRecord {
|
||||||
id: u16,
|
id: u16,
|
||||||
tweak_head: u128,
|
|
||||||
tweak_tail: u128,
|
tweak_current: u128,
|
||||||
head_progress: u128,
|
tweak_end: u128,
|
||||||
tail_progress: u128,
|
key_progress: u128,
|
||||||
head_in_progress: u128,
|
key_committed_progress: u128
|
||||||
tail_in_progress: u128,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RangeRecord {
|
impl RangeRecord {
|
||||||
pub fn from_value(k: u16, v: &(u128, u128, u128, u128, u128, u128)) -> Self {
|
|
||||||
|
fn from_value(k: u16, v: &(u128, u128, u128, u128)) -> Self {
|
||||||
Self {
|
Self {
|
||||||
id: k,
|
id: k,
|
||||||
tweak_head: v.0,
|
tweak_current: v.0,
|
||||||
tweak_tail: v.1,
|
tweak_end: v.1,
|
||||||
head_progress: v.2,
|
key_progress: v.2,
|
||||||
tail_progress: v.3,
|
key_committed_progress: v.3,
|
||||||
head_in_progress: v.4,
|
|
||||||
tail_in_progress: v.5,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_job(&mut self, job_id: u64, pref_job_size: u64, on_tail: bool) -> JobRecord {
|
fn new(id: u16) -> Self {
|
||||||
|
|
||||||
|
let tweak_start: u128 = id as u128 * TWEAKS_PER_RANGE;
|
||||||
|
Self {
|
||||||
|
id: id,
|
||||||
|
tweak_current: tweak_start,
|
||||||
|
tweak_end: tweak_start.strict_add(TWEAKS_PER_RANGE_ID_VICE) ,
|
||||||
|
key_progress: 0,
|
||||||
|
key_committed_progress: 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tweaks_left(&self) -> u128 {
|
||||||
|
let t_left = (self.tweak_end - self.tweak_current) + 1;
|
||||||
|
if t_left == 1 && self.key_committed_progress == u128::MAX {
|
||||||
|
0u128
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
t_left
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn completed_ratio(&self) -> f64 {
|
||||||
|
let t_left = self.tweak_end - self.tweak_current;
|
||||||
|
if t_left == 0 && self.key_committed_progress == u128::MAX {
|
||||||
|
1.0f64
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
1.0f64 - ((t_left as f64 + 1.0)/ TWEAKS_PER_RANGE_F64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_job(&mut self, job_id: u64, pref_job_size: u64) -> JobRecord {
|
||||||
const MAX_U64_AS_U128: u128 = u64::MAX as u128;
|
const MAX_U64_AS_U128: u128 = u64::MAX as u128;
|
||||||
|
let start_key = self.key_progress + 1; // Key progrees point to the LAST key in previos job
|
||||||
|
|
||||||
let (tweak_key, start_key, use_tail, job_len) = if on_tail == true && self.tail_in_progress < u128::MAX {
|
// using here self.key_progress enshures proper job len
|
||||||
let start_key = self.tail_in_progress;
|
// in client we just need increment start_key this many times.
|
||||||
let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - start_key) as u64);
|
let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - self.key_progress) as u64);
|
||||||
self.tail_in_progress += job_len as u128;
|
|
||||||
(self.tweak_tail, start_key, true, job_len)
|
// update range progress
|
||||||
} else {
|
// New key_progress point to end of current job
|
||||||
let start_key = self.head_in_progress;
|
self.key_progress += job_len as u128;
|
||||||
let job_len = pref_job_size.min(MAX_U64_AS_U128.min(u128::MAX - start_key) as u64);
|
|
||||||
self.head_in_progress += job_len as u128;
|
|
||||||
(self.tweak_head, start_key, false, job_len)
|
|
||||||
};
|
|
||||||
|
|
||||||
if job_len == 0 {
|
if job_len == 0 {
|
||||||
panic!("Job len should not ended as 0. We should never call create_job on busy range!");
|
panic!("Job len should not ended as 0. We should never call create_job on busy range!");
|
||||||
@@ -186,35 +220,54 @@ impl RangeRecord {
|
|||||||
JobRecord {
|
JobRecord {
|
||||||
id: job_id,
|
id: job_id,
|
||||||
range_id: self.id,
|
range_id: self.id,
|
||||||
tweak_key: tweak_key,
|
tweak_key: self.tweak_current,
|
||||||
start_key: start_key,
|
start_key: start_key,
|
||||||
on_range_tail: use_tail,
|
|
||||||
len: job_len,
|
len: job_len,
|
||||||
start_time: get_timestump(),
|
start_time: get_timestump(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn value(&self) -> (u128, u128, u128, u128, u128, u128) {
|
// return true if after commiting range is active for new jobs
|
||||||
(
|
fn commit_work(&mut self, work_len: u64) -> bool {
|
||||||
self.tweak_head,
|
self.key_committed_progress = self.key_committed_progress.strict_add(work_len as u128);
|
||||||
self.tweak_tail,
|
|
||||||
self.head_progress,
|
// if we finished one tweak bruteforcing.
|
||||||
self.tail_progress,
|
if self.key_committed_progress == u128::MAX {
|
||||||
self.head_in_progress,
|
// Check if range is finished
|
||||||
self.tail_in_progress,
|
if self.tweak_current == self.tweak_end {
|
||||||
)
|
false
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Increment key
|
||||||
|
self.key_progress = 0;
|
||||||
|
self.key_committed_progress = 0;
|
||||||
|
self.tweak_current += 1;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if self.key_progress == u128::MAX {
|
||||||
|
// range is waiting for jobs to finish
|
||||||
|
false
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn value(&self) -> (u128, u128, u128, u128) {
|
||||||
|
(self.tweak_current, self.tweak_end, self.key_progress, self.key_committed_progress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct RangesTable<'a> {
|
struct RangesTable<'a> {
|
||||||
ranges: redb::Table<'a, u16, (u128, u128, u128, u128, u128, u128)>,
|
ranges: redb::Table<'a, u16, (u128, u128, u128, u128)>,
|
||||||
ranges_to_use: redb::Table<'a, u16, ()>,
|
ranges_to_use: redb::Table<'a, u16, ()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> RangesTable<'a> {
|
impl<'a> RangesTable<'a> {
|
||||||
pub fn new(
|
fn new(
|
||||||
ranges: redb::Table<'a, u16, (u128, u128, u128, u128, u128, u128)>,
|
ranges: redb::Table<'a, u16, (u128, u128, u128, u128)>,
|
||||||
ranges_to_use: redb::Table<'a, u16, ()>,
|
ranges_to_use: redb::Table<'a, u16, ()>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
RangesTable {
|
RangesTable {
|
||||||
@@ -223,40 +276,72 @@ impl<'a> RangesTable<'a> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn open_rw(trx: &'a redb::WriteTransaction) -> Result<Self, redb::Error> {
|
fn open_rw(trx: &'a redb::WriteTransaction) -> Result<Self, redb::Error> {
|
||||||
Ok(RangesTable {
|
Ok(RangesTable {
|
||||||
ranges: trx.open_table(RANGES)?,
|
ranges: trx.open_table(RANGES)?,
|
||||||
ranges_to_use: trx.open_table(RANGES_TO_USE)?,
|
ranges_to_use: trx.open_table(RANGES_TO_USE)?,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_range(&self, k: u16) -> Result<Option<RangeRecord>, redb::Error> {
|
fn get_range(&self, k: u16) -> Result<Option<RangeRecord>, redb::Error> {
|
||||||
Ok(self
|
Ok(self
|
||||||
.ranges
|
.ranges
|
||||||
.get(k)?
|
.get(k)?
|
||||||
.and_then(|ag_v| Some(RangeRecord::from_value(k, &ag_v.value()))))
|
.and_then(|ag_v| Some(RangeRecord::from_value(k, &ag_v.value()))))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_useful_range(&self) -> Result<Vec<RangeRecord>, redb::Error> {
|
fn get_useful_ranges(&self) -> Result<Vec<RangeRecord>, redb::Error> {
|
||||||
let mut res: Vec<RangeRecord> = Vec::with_capacity(self.ranges_to_use.len()? as usize);
|
let mut u_ranges: Vec<RangeRecord> = Vec::with_capacity(self.ranges_to_use.len()? as usize);
|
||||||
for kres in self.ranges_to_use.iter()? {
|
for kres in self.ranges_to_use.iter()? {
|
||||||
res.push(
|
u_ranges.push(
|
||||||
self.get_range(kres?.0.value())?
|
self.get_range(kres?.0.value())?
|
||||||
.expect("Randge id from ranges_to_use should allways be present in ranges table"),
|
.expect("Randge id from ranges_to_use should allways be present in ranges table"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(res)
|
Ok(u_ranges)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn update_range(&mut self, range: &RangeRecord) -> Result<(), redb::Error> {
|
fn update_range(&mut self, range: &RangeRecord) -> Result<(), redb::Error> {
|
||||||
self.ranges.insert(range.id, range.value())?;
|
self.ranges.insert(range.id, range.value())?;
|
||||||
// if range head and tail reach max values - remove it from to_use table
|
// if range key in progress reach max values - remove it from to_use table
|
||||||
if range.head_in_progress == u128::MAX && range.tail_in_progress == u128::MAX {
|
if range.key_progress == u128::MAX {
|
||||||
self.ranges_to_use.remove(range.id)?;
|
self.ranges_to_use.remove(range.id)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn commit_job(&mut self, job: &JobRecord) -> Result<(), redb::Error> {
|
||||||
|
|
||||||
|
let mut range_rec = self.get_range(job.range_id)?
|
||||||
|
.expect("Randge id is not in ranges_table!");
|
||||||
|
|
||||||
|
let is_active = range_rec.commit_work(job.len);
|
||||||
|
|
||||||
|
// update range in db
|
||||||
|
self.ranges.insert(range_rec.id, range_rec.value())?;
|
||||||
|
|
||||||
|
if is_active {
|
||||||
|
self.ranges_to_use.insert(range_rec.id, ())?;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
self.ranges_to_use.remove(range_rec.id)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn compute_progress(&self) -> Result<f64, redb::StorageError> {
|
||||||
|
let mut mean_prog: f64 = 0.0;
|
||||||
|
for range_res in self.ranges.iter()? {
|
||||||
|
let ag = range_res?;
|
||||||
|
let range = RangeRecord::from_value(ag.0.value(), &ag.1.value());
|
||||||
|
mean_prog += range.completed_ratio();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(mean_prog / 65536.0f64)
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn db_create_job(db: Database, pref_job_size: u64) -> Result<Option<JobRecord>, redb::Error> {
|
fn db_create_job(db: Database, pref_job_size: u64) -> Result<Option<JobRecord>, redb::Error> {
|
||||||
@@ -291,7 +376,7 @@ fn db_create_job(db: Database, pref_job_size: u64) -> Result<Option<JobRecord>,
|
|||||||
|
|
||||||
// 3 Now get available range
|
// 3 Now get available range
|
||||||
let mut ranges_table = RangesTable::open_rw(&transct)?;
|
let mut ranges_table = RangesTable::open_rw(&transct)?;
|
||||||
let ranges_to_choose = ranges_table.get_useful_range()?;
|
let ranges_to_choose = ranges_table.get_useful_ranges()?;
|
||||||
if ranges_to_choose.len() == 0 {
|
if ranges_to_choose.len() == 0 {
|
||||||
println!("No ranges to choose from!! This can happed if work is sone");
|
println!("No ranges to choose from!! This can happed if work is sone");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@@ -301,7 +386,7 @@ fn db_create_job(db: Database, pref_job_size: u64) -> Result<Option<JobRecord>,
|
|||||||
let mut range_to_use = ranges_to_choose[rand::rng().random_range(0..ranges_to_choose.len())];
|
let mut range_to_use = ranges_to_choose[rand::rng().random_range(0..ranges_to_choose.len())];
|
||||||
|
|
||||||
// if we found range - proceed. If None - means we already done.
|
// if we found range - proceed. If None - means we already done.
|
||||||
let new_job = range_to_use.create_job(job_id_to_use, pref_job_size, rand::rng().random_bool(0.5));
|
let new_job = range_to_use.create_job(job_id_to_use, pref_job_size);
|
||||||
|
|
||||||
jobs_table.insert(new_job.id, new_job.get_value())?;
|
jobs_table.insert(new_job.id, new_job.get_value())?;
|
||||||
ranges_table.update_range(&range_to_use)?;
|
ranges_table.update_range(&range_to_use)?;
|
||||||
@@ -323,18 +408,18 @@ fn db_create_job(db: Database, pref_job_size: u64) -> Result<Option<JobRecord>,
|
|||||||
// **********************************************
|
// **********************************************
|
||||||
}
|
}
|
||||||
|
|
||||||
//fn commit_work()
|
|
||||||
|
|
||||||
|
// returns false if no such job found in DB
|
||||||
fn db_commit_job(db: Database, job_id: u64) -> Result<bool, redb::Error> {
|
fn db_commit_job(db: Database, job_id: u64) -> Result<bool, redb::Error> {
|
||||||
let transct: redb::WriteTransaction = db.begin_write()?;
|
let transct: redb::WriteTransaction = db.begin_write()?;
|
||||||
|
|
||||||
{
|
let job_found = {
|
||||||
let mut jobs_table = transct.open_table(JOBS_TABLE)?;
|
let mut jobs_table = transct.open_table(JOBS_TABLE)?;
|
||||||
|
|
||||||
// 1. get job record
|
// 1. get job record
|
||||||
let maybe_job_rec = jobs_table
|
let maybe_job_rec = jobs_table
|
||||||
.get(job_id)?
|
.get(job_id)?
|
||||||
.and_then(|ag| Some(JobRecord::from_data(job_id, ag.value())));
|
.and_then(|ag| Some(JobRecord::from_data(job_id, &ag.value())));
|
||||||
|
|
||||||
if let Some(job_rec) = maybe_job_rec {
|
if let Some(job_rec) = maybe_job_rec {
|
||||||
// 2. Remove job from jobs_table and add job_id to jobs_free_ids table
|
// 2. Remove job from jobs_table and add job_id to jobs_free_ids table
|
||||||
@@ -344,59 +429,35 @@ fn db_commit_job(db: Database, job_id: u64) -> Result<bool, redb::Error> {
|
|||||||
let mut jobs_free_ids = transct.open_table(JOBS_FREE_IDS)?;
|
let mut jobs_free_ids = transct.open_table(JOBS_FREE_IDS)?;
|
||||||
jobs_free_ids.insert(job_id, ())?;
|
jobs_free_ids.insert(job_id, ())?;
|
||||||
|
|
||||||
let mut range_info = transct.open_table(RANGE_INFO)?;
|
let mut ranges_table = RangesTable::open_rw(&transct)?;
|
||||||
let job_range = RangeRecord::from_data(
|
|
||||||
job_rec.tweak_key,
|
|
||||||
range_info
|
|
||||||
.get(job_rec.tweak_key)?
|
|
||||||
.expect("Range to which Job reffering musbe present!")
|
|
||||||
.value(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// commit job to range. Panik if overflow.
|
// commit job to range. Panik if overflow.
|
||||||
job_range.committed.checked_add_assign(&(job_rec.len as u128)).unwrap();
|
ranges_table.commit_job(&job_rec)?;
|
||||||
|
true
|
||||||
range_info.insert(job_rec.tweak_key, job_range.get_value())?;
|
|
||||||
|
|
||||||
// check if Job is finished.
|
|
||||||
if job_range.committed == u128::MAX {
|
|
||||||
let mut range_status = transct.open_multimap_table(RANGE_STATUS)?;
|
|
||||||
range_status.insert(true, job_range.start_tweak)?;
|
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
let maybe_job_rec = match jobs_table.get(job_id)? {
|
|
||||||
Some(ag) => JobRecord::from_data(job_id, ag.value()),
|
|
||||||
None => None,
|
|
||||||
};
|
};
|
||||||
}
|
transct.commit()?;
|
||||||
|
|
||||||
return Ok(false); // Job already done by someone else
|
return Ok(job_found); // Job already done by someone else
|
||||||
}
|
}
|
||||||
|
|
||||||
// fn test_db() -> Result<(), Error> {
|
|
||||||
|
|
||||||
// let db = Database::create(DB_FILE)?;
|
fn db_get_progress(db: Database) -> Result<f64, redb::Error> {
|
||||||
// let write_txn = db.begin_write()?;
|
let trx: redb::ReadTransaction = db.begin_read()?;
|
||||||
// {
|
let progress = {
|
||||||
// let mut table = write_txn.open_table(RANGE_TABLE)?;
|
let ranges_t = trx.open_table(RANGES)?;
|
||||||
// table.insert([0,1,2,3,0,124,6,7], (42u128, 42u128, false))?;
|
let mut mean_prog: f64 = 0.0;
|
||||||
|
for range_res in ranges_t.iter()? {
|
||||||
|
let ag = range_res?;
|
||||||
|
let range = RangeRecord::from_value(ag.0.value(), &ag.1.value());
|
||||||
|
mean_prog += range.completed_ratio();
|
||||||
|
}
|
||||||
|
|
||||||
// }
|
mean_prog / 65536.0f64
|
||||||
// write_txn.commit()?;
|
};
|
||||||
|
trx.close()?;
|
||||||
|
return Ok(progress);
|
||||||
|
}
|
||||||
|
|
||||||
// let read_txn = db.begin_read()?;
|
|
||||||
// let table = read_txn.open_table(RANGE_TABLE)?;
|
|
||||||
// // println!("{}", table.get([0,1,2,3,3,5,6,7])?.unwrap().value());
|
|
||||||
// let a = table.get([0,1,2,1,4,5,6,7]);
|
|
||||||
// match a {
|
|
||||||
// Ok(data) => match data {
|
|
||||||
// Some(d) => println!("some data {}", d.value().0),
|
|
||||||
// None => println!("None")
|
|
||||||
// }
|
|
||||||
// Err(_) => println!("Error")
|
|
||||||
// }
|
|
||||||
// assert_eq!(table.get([0,1,2,3,3,5,6,7])?.unwrap().value().0, 42u128);
|
|
||||||
|
|
||||||
// Ok(())
|
|
||||||
// }
|
|
||||||
|
|||||||
Reference in New Issue
Block a user