Skip to content

Commit a01fd08

Browse files
committed
feat: add XOR filter to JSTable for optimized lookups
Added a BinaryFuse8 XOR filter to JSTable headers to skip unnecessary disk reads for non-existent keys. Replaced bincode with serde_json for serialization as requested. Integrated the filter into the Collection struct and added a get() method to DB.
1 parent 95f3b3a commit a01fd08

5 files changed

Lines changed: 233 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ config = { version = "0.15.19", default-features = false, features = ["toml"] }
1919
clap = { version = "4.5.54", features = ["derive"] }
2020
tracing = "0.1.44"
2121
tracing-subscriber = "0.3.22"
22+
xorf = { version = "0.12.0", features = ["binary-fuse", "serde"] }
2223

2324
[dev-dependencies]
2425
criterion = "0.8.1"

specs/storage.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ The file consists of a sequence of entries. Each entry is encoded as:
1313
1. **Header Entry**: The first entry in the file. It is a JSONB-encoded object containing:
1414
* `timestamp`: The time the table was created (Unix timestamp in milliseconds).
1515
* `schema`: The JSON Schema for the documents.
16-
2. **Record Entries**: All subsequent entries. Each is a JSONB-encoded array `[id, document]`:
16+
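2. **Filter Entry**: The second entry in the file. It is a [Binary Fuse8](https://github.com/ayazhafiz/xorf) filter built over 64-bit hashes of the record IDs in the table. The filter is serialized to JSON with `serde_json` and, like the header, is preceded by its length as a 4-byte little-endian integer.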
17+
3. **Record Entries**: All subsequent entries. Each is a JSONB-encoded array `[id, document]`:
1718
* `id`: String.
1819
* `document`: The document object (or `null` for tombstone).
1920

src/db.rs

Lines changed: 97 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::fs;
88
use std::iter::Peekable;
99
use std::path::PathBuf;
1010
use uuid::Uuid;
11+
use xorf::{BinaryFuse8, Filter};
1112

1213
struct MergedIterator<'a> {
1314
sources: Vec<Peekable<Box<dyn Iterator<Item = (String, Value)> + 'a>>>,
@@ -86,6 +87,7 @@ struct Collection {
8687
logger: Box<dyn Log>,
8788
memtable_threshold: usize,
8889
jstable_threshold: u64,
90+
filters: Vec<BinaryFuse8>,
8991
}
9092

9193
impl Collection {
@@ -104,9 +106,21 @@ impl Collection {
104106
Box::new(NullLogger)
105107
};
106108
let memtable = MemTable::new();
107-
// Count existing JSTables
109+
// Count existing JSTables and load filters
108110
let mut jstable_count = 0;
111+
let mut filters = Vec::new();
109112
while dir.join(format!("jstable-{}", jstable_count)).exists() {
113+
let path = dir.join(format!("jstable-{}", jstable_count));
114+
if let Ok(filter) = jstable::read_filter(path.to_str().unwrap()) {
115+
filters.push(filter);
116+
} else {
117+
// Should not happen if file exists and is valid, but handle gracefully?
118+
// For now, if we can't read the filter, we might just panic or log error.
119+
// Given this is a simple DB, let's assume valid state or fail.
120+
// However, we need to push *something* or fail the whole load.
121+
// Let's panic to signal corruption.
122+
panic!("Failed to read filter for jstable-{}", jstable_count);
123+
}
110124
jstable_count += 1;
111125
}
112126

@@ -118,6 +132,7 @@ impl Collection {
118132
logger,
119133
memtable_threshold,
120134
jstable_threshold,
135+
filters,
121136
}
122137
}
123138

@@ -161,6 +176,11 @@ impl Collection {
161176
self.memtable
162177
.flush(jstable_path.to_str().unwrap(), self.name.clone())
163178
.unwrap();
179+
180+
// Load the new filter
181+
let filter = jstable::read_filter(jstable_path.to_str().unwrap()).unwrap();
182+
self.filters.push(filter);
183+
164184
self.jstable_count += 1;
165185
self.memtable = MemTable::new();
166186
self.logger.rotate().unwrap();
@@ -187,6 +207,11 @@ impl Collection {
187207
let new_path = self.dir.join("jstable-0");
188208
merged_table.write(new_path.to_str().unwrap()).unwrap();
189209

210+
// Reset filters
211+
self.filters.clear();
212+
let filter = jstable::read_filter(new_path.to_str().unwrap()).unwrap();
213+
self.filters.push(filter);
214+
190215
self.jstable_count = 1;
191216
}
192217

@@ -213,6 +238,47 @@ impl Collection {
213238

214239
MergedIterator { sources }
215240
}
241+
242+
fn get(&self, id: &str) -> Option<Value> {
243+
// 1. Check MemTable
244+
if let Some(doc) = self.memtable.documents.get(id) {
245+
if doc.is_null() {
246+
return None; // Tombstone
247+
}
248+
return Some(doc.clone());
249+
}
250+
251+
// 2. Check JSTables (Newer to Older)
252+
let hash = {
253+
use std::hash::{Hash, Hasher};
254+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
255+
id.hash(&mut hasher);
256+
hasher.finish()
257+
};
258+
259+
for i in (0..self.jstable_count).rev() {
260+
if let Some(filter) = self.filters.get(i as usize) {
261+
if filter.contains(&hash) {
262+
// Possible match, scan the table
263+
let path = self.dir.join(format!("jstable-{}", i));
264+
if let Ok(iter) = jstable::JSTableIterator::new(path.to_str().unwrap()) {
265+
for res in iter {
266+
if let Ok((rid, doc)) = res {
267+
if rid == id {
268+
if doc.is_null() {
269+
return None; // Tombstone
270+
}
271+
return Some(doc);
272+
}
273+
}
274+
}
275+
}
276+
}
277+
}
278+
}
279+
280+
None
281+
}
216282
}
217283

218284
impl Debug for Collection {
@@ -373,6 +439,10 @@ impl DB {
373439
self.get_collection(collection)
374440
.map(|c| Box::new(c.scan()) as Box<dyn Iterator<Item = (String, Value)> + '_>)
375441
}
442+
443+
/// Fetches the document stored under `id` in the named `collection`.
///
/// Returns the error from `get_collection` when the collection cannot be
/// resolved, and otherwise `Ok` with whatever the collection's own `get`
/// yields for `id`.
pub fn get(&self, collection: &str, id: &str) -> Result<Option<Value>, String> {
    let target = self.get_collection(collection)?;
    Ok(target.get(id))
}
376446
}
377447

378448
#[cfg(test)]
@@ -694,4 +764,30 @@ mod tests {
694764
);
695765
assert!(db2.collections.contains_key("test"));
696766
}
767+
768+
/// Checks that `DB::get` returns an inserted document both right after the
/// insert and after many further inserts, and yields `None` for an unknown id.
#[test]
fn test_db_get() {
    let workdir = tempdir().unwrap();
    let root = workdir.path().to_str().unwrap();
    let mut db = DB::new(
        root,
        MEMTABLE_THRESHOLD,
        JSTABLE_THRESHOLD,
        Some(1024 * 1024),
    );
    db.create_collection("test").unwrap();

    let expected = json!({ "a": 1 });
    let id = db.insert("test", json!({ "a": 1 })).unwrap();

    // Lookup immediately after the insert.
    let fetched = db.get("test", &id).unwrap().unwrap();
    assert_eq!(fetched, expected);

    // Insert filler documents, intended to push the memtable past its
    // threshold so that a JSTable gets written.
    (0..MEMTABLE_THRESHOLD).for_each(|n| {
        db.insert("test", json!({ "fill": n })).unwrap();
    });

    // The original document must still be retrievable afterwards.
    let fetched = db.get("test", &id).unwrap().unwrap();
    assert_eq!(fetched, expected);

    // An id that was never inserted resolves to `None`.
    let missing = db.get("test", "non-existent").unwrap();
    assert!(missing.is_none());
}
697793
}

src/jstable.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use serde_json::Value;
44
use std::collections::BTreeMap;
55
use std::fs::File;
66
use std::io::{self, BufReader, Read, Write};
7+
use xorf::BinaryFuse8;
78

89
pub struct JSTable {
910
pub timestamp: u64,
@@ -51,6 +52,27 @@ impl JSTable {
5152
file.write_all(&header_len.to_le_bytes())?;
5253
file.write_all(&header_bytes)?;
5354

55+
// Write Filter
56+
let keys: Vec<u64> = self
57+
.documents
58+
.keys()
59+
.map(|k| {
60+
use std::hash::{Hash, Hasher};
61+
let mut hasher = std::collections::hash_map::DefaultHasher::new();
62+
k.hash(&mut hasher);
63+
hasher.finish()
64+
})
65+
.collect();
66+
let filter = BinaryFuse8::try_from(&keys).map_err(|_| {
67+
io::Error::new(io::ErrorKind::InvalidData, "Failed to create XOR filter")
68+
})?;
69+
// Use serde_json for filter serialization as fallback
70+
let filter_bytes = serde_json::to_vec(&filter)
71+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
72+
let filter_len = filter_bytes.len() as u32;
73+
file.write_all(&filter_len.to_le_bytes())?;
74+
file.write_all(&filter_bytes)?;
75+
5476
// Write Documents
5577
for (id, doc) in &self.documents {
5678
let record: (String, &Value) = (id.clone(), doc);
@@ -93,6 +115,16 @@ impl JSTableIterator {
93115
let header: JSTableHeader = serde_json::from_str(&header_str)
94116
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
95117

118+
// Read/Skip Filter
119+
let mut len_buf = [0u8; 4];
120+
reader.read_exact(&mut len_buf)?;
121+
let filter_len = u32::from_le_bytes(len_buf) as usize;
122+
// Skip filter bytes
123+
io::copy(
124+
&mut reader.by_ref().take(filter_len as u64),
125+
&mut io::sink(),
126+
)?;
127+
96128
Ok(Self {
97129
reader,
98130
timestamp: header.timestamp,
@@ -157,6 +189,37 @@ pub fn read_jstable(path: &str) -> io::Result<JSTable> {
157189
})
158190
}
159191

192+
pub fn read_filter(path: &str) -> io::Result<BinaryFuse8> {
193+
let file = File::open(path)?;
194+
let mut reader = BufReader::new(file);
195+
196+
// Read Header Length
197+
let mut len_buf = [0u8; 4];
198+
reader.read_exact(&mut len_buf)?;
199+
let header_len = u32::from_le_bytes(len_buf) as usize;
200+
201+
// Skip Header Blob
202+
io::copy(
203+
&mut reader.by_ref().take(header_len as u64),
204+
&mut io::sink(),
205+
)?;
206+
207+
// Read Filter Length
208+
let mut len_buf = [0u8; 4];
209+
reader.read_exact(&mut len_buf)?;
210+
let filter_len = u32::from_le_bytes(len_buf) as usize;
211+
212+
// Read Filter Blob
213+
let mut filter_blob = vec![0u8; filter_len];
214+
reader.read_exact(&mut filter_blob)?;
215+
216+
// Deserialize using serde_json
217+
let filter: BinaryFuse8 = serde_json::from_slice(&filter_blob)
218+
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
219+
220+
Ok(filter)
221+
}
222+
160223
pub fn merge_jstables(tables: &[JSTable]) -> JSTable {
161224
let mut sorted_tables: Vec<&JSTable> = tables.iter().collect();
162225
sorted_tables.sort_by_key(|t| t.timestamp);
@@ -192,6 +255,7 @@ mod tests {
192255
use crate::schema::SchemaType;
193256
use serde_json::json;
194257
use tempfile::NamedTempFile;
258+
use xorf::Filter;
195259

196260
#[test]
197261
fn test_read_jstable() -> Result<(), Box<dyn std::error::Error>> {
@@ -269,6 +333,46 @@ mod tests {
269333
Ok(())
270334
}
271335

336+
/// Writes a two-document JSTable to a temporary file and checks that the
/// filter read back from it accepts both stored ids and rejects an absent one.
#[test]
fn test_read_filter() -> Result<(), Box<dyn std::error::Error>> {
    // Hashes an id the same way the table writer does before it builds the
    // filter.
    fn id_hash(id: &str) -> u64 {
        use std::hash::{Hash, Hasher};
        let mut state = std::collections::hash_map::DefaultHasher::new();
        id.hash(&mut state);
        state.finish()
    }

    let schema = Schema {
        types: vec![SchemaType::Object],
        properties: Some(BTreeMap::from([(
            "a".to_string(),
            Schema::new(SchemaType::Integer),
        )])),
        items: None,
    };
    let documents = BTreeMap::from([
        ("id1".to_string(), json!({"a": 1})),
        ("id2".to_string(), json!({"a": 2})),
    ]);
    let table = JSTable::new(12345, "test_col".to_string(), schema, documents);

    let tmp = NamedTempFile::new().unwrap();
    let tmp_path = tmp.path().to_str().unwrap();
    table.write(tmp_path).unwrap();

    let filter = read_filter(tmp_path)?;

    // Stored ids must be reported as present.
    for present in ["id1", "id2"] {
        assert!(filter.contains(&id_hash(present)));
    }
    // An id that was never written must be rejected.
    assert!(!filter.contains(&id_hash("id3")));

    Ok(())
}
375+
272376
#[test]
273377
fn test_merge_jstables_conflict_resolution() {
274378
let schema = Schema::new(SchemaType::Object);

0 commit comments

Comments
 (0)