2026-06-04 15:35:38 +02:00

344 lines
11 KiB
Rust

//! In-memory backend, keyed by relation name. Always available.
use std::collections::HashMap;
use crate::id::RowId;
use crate::value::Value;
use crate::{CommittedTx, RowStream, Storage, StorageError, Transaction};
/// In-memory backend, useful as the default in tests and as a correctness
/// oracle for other backends.
///
/// All state lives in a single map from relation name to that relation's
/// rows; the `Default` value is a storage with no relations.
#[derive(Debug, Default)]
pub struct MemoryStorage {
/// Every relation created so far, keyed by relation name.
relations: HashMap<String, MemoryRelation>,
}
/// Contents and bookkeeping of a single relation held by `MemoryStorage`.
#[derive(Debug)]
pub(crate) struct MemoryRelation {
/// Number of cells every row of this relation must have; inserts with a
/// different row length are rejected.
pub(crate) arity: usize,
/// Numeric value of the next row id to hand out. Starts at 0 and is
/// advanced when a transaction that inserted into this relation commits.
pub(crate) next_id: u64,
/// Stored rows, each paired with its id, in the order they were appended.
pub(crate) rows: Vec<(RowId, Vec<Value>)>,
}
impl MemoryStorage {
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
impl Storage for MemoryStorage {
    /// Registers a new, empty relation called `name` whose rows have `arity`
    /// cells. Row ids for the new relation start at 0.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationExists`] when `name` is already taken.
    fn create_relation(&mut self, name: &str, arity: usize) -> Result<(), StorageError> {
        match self.relations.entry(name.to_string()) {
            std::collections::hash_map::Entry::Occupied(_) => {
                Err(StorageError::RelationExists(name.to_string()))
            }
            std::collections::hash_map::Entry::Vacant(slot) => {
                slot.insert(MemoryRelation {
                    arity,
                    next_id: 0,
                    rows: Vec::new(),
                });
                Ok(())
            }
        }
    }

    /// Looks up the arity `name` was created with.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationNotFound`] for an unknown relation.
    fn arity(&self, name: &str) -> Result<usize, StorageError> {
        match self.relations.get(name) {
            Some(relation) => Ok(relation.arity),
            None => Err(StorageError::RelationNotFound(name.to_string())),
        }
    }

    /// Streams the rows of `name` in stored order. Each row is cloned only
    /// when the stream is advanced, and every item is `Ok`.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationNotFound`] for an unknown relation.
    fn scan_iter<'a>(&'a self, name: &str) -> Result<RowStream<'a>, StorageError> {
        match self.relations.get(name) {
            Some(relation) => Ok(Box::new(relation.rows.iter().cloned().map(Ok))),
            None => Err(StorageError::RelationNotFound(name.to_string())),
        }
    }

    /// Opens a transaction that borrows this storage exclusively and starts
    /// with no buffered inserts, deletes, or id counters.
    fn transaction<'a>(&'a mut self) -> Result<Box<dyn Transaction + 'a>, StorageError> {
        let tx = MemoryTx {
            storage: self,
            next_ids: HashMap::new(),
            pending: Vec::new(),
            deletes: Vec::new(),
        };
        Ok(Box::new(tx))
    }
}
/// In-flight memory transaction. Buffers inserts and deletes; applies on commit.
///
/// Nothing is written to the backing storage before `commit` runs, so a
/// transaction that is dropped without committing leaves the storage as it was.
pub(crate) struct MemoryTx<'a> {
/// Exclusive borrow of the storage the buffered changes will be applied to.
storage: &'a mut MemoryStorage,
/// Local next-id-per-relation; initialized lazily from storage on first
/// insert into a relation, then incremented per buffered row.
next_ids: HashMap<String, u64>,
/// (relation name, assigned `RowId`, row cells) for each buffered insert.
pending: Vec<(String, RowId, Vec<Value>)>,
/// (relation name, `RowId`) for each buffered delete. Applied after
/// inserts on commit, so insert+delete of the same id in one tx is a
/// net no-op.
deletes: Vec<(String, RowId)>,
}
impl MemoryTx<'_> {
    /// Returns the numeric id the next insert into `name` should receive.
    ///
    /// The transaction-local counter wins when one exists; otherwise the
    /// relation's stored counter is read and cached locally. The counter is
    /// not advanced here.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationNotFound`] when no local counter exists
    /// and the storage has no relation called `name`.
    fn next_id_for(&mut self, name: &str) -> Result<u64, StorageError> {
        match self.next_ids.get(name).copied() {
            Some(cached) => Ok(cached),
            None => {
                let seed = self
                    .storage
                    .relations
                    .get(name)
                    .map(|relation| relation.next_id)
                    .ok_or_else(|| StorageError::RelationNotFound(name.to_string()))?;
                self.next_ids.insert(name.to_string(), seed);
                Ok(seed)
            }
        }
    }
}
impl Transaction for MemoryTx<'_> {
    /// Buffers `row` for insertion into `name` and returns the id it will
    /// carry once the transaction commits.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationNotFound`] for an unknown relation and
    /// [`StorageError::ArityMismatch`] when `row` has the wrong number of cells.
    fn insert(&mut self, name: &str, row: Vec<Value>) -> Result<RowId, StorageError> {
        let expected = self.storage.arity(name)?;
        let got = row.len();
        if got != expected {
            return Err(StorageError::ArityMismatch { expected, got });
        }

        let assigned = self.next_id_for(name)?;
        let id = RowId::from(assigned);
        // `next_id_for` has just ensured a counter exists for `name`, so the
        // bump can be done in place.
        if let Some(counter) = self.next_ids.get_mut(name) {
            *counter = assigned + 1;
        }
        self.pending.push((name.to_string(), id.clone(), row));
        Ok(id)
    }

    /// Buffers the removal of row `id` from `name`.
    ///
    /// Only the relation's existence is checked here; whether `id` matches a
    /// row is not, and the removal itself happens on commit.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationNotFound`] for an unknown relation.
    fn delete(&mut self, name: &str, id: &RowId) -> Result<(), StorageError> {
        self.storage.arity(name).map(|_arity| {
            self.deletes.push((name.to_string(), id.clone()));
        })
    }

    /// Applies the buffered changes to the storage: first every insert, then
    /// every delete, and finally the advanced id counters.
    ///
    /// # Errors
    /// Returns [`StorageError::RelationNotFound`] if a buffered insert or
    /// delete names a relation the storage does not contain.
    fn commit(self: Box<Self>) -> Result<CommittedTx, StorageError> {
        let MemoryTx {
            storage,
            next_ids: counters,
            pending: inserts,
            deletes: removals,
        } = *self;

        for (name, id, row) in inserts {
            match storage.relations.get_mut(&name) {
                Some(relation) => relation.rows.push((id, row)),
                None => return Err(StorageError::RelationNotFound(name)),
            }
        }

        // Deletes run after inserts so a row inserted and deleted within the
        // same transaction does not survive the commit.
        for (name, doomed) in removals {
            match storage.relations.get_mut(&name) {
                Some(relation) => relation.rows.retain(|(rid, _)| *rid != doomed),
                None => return Err(StorageError::RelationNotFound(name)),
            }
        }

        for (name, next_id) in counters {
            if let Some(relation) = storage.relations.get_mut(&name) {
                relation.next_id = next_id;
            }
        }

        // The ids handed out by `insert` are already the final ones, so there
        // is nothing to report back to the caller.
        Ok(CommittedTx::empty())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scan_as_table;

    // Shorthand for an integer cell.
    fn i(x: i64) -> Value {
        Value::Int(x)
    }

    // Storage that already contains one empty relation.
    fn fresh(name: &str, arity: usize) -> Result<MemoryStorage, StorageError> {
        let mut storage = MemoryStorage::new();
        storage.create_relation(name, arity)?;
        Ok(storage)
    }

    #[test]
    fn create_insert_scan_roundtrip() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;
        let first = storage.insert("edge", vec![i(1), i(2)])?;
        let second = storage.insert("edge", vec![i(2), i(3)])?;

        let expected = vec![(first, vec![i(1), i(2)]), (second, vec![i(2), i(3)])];
        assert_eq!(storage.scan("edge")?, expected);
        Ok(())
    }

    #[test]
    fn batched_inserts_share_one_commit() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;

        let mut tx = storage.transaction()?;
        let a = tx.insert("edge", vec![i(1), i(2)])?;
        let b = tx.insert("edge", vec![i(3), i(4)])?;
        tx.commit()?;

        let expected = vec![(a, vec![i(1), i(2)]), (b, vec![i(3), i(4)])];
        assert_eq!(storage.scan("edge")?, expected);
        Ok(())
    }

    #[test]
    fn dropped_transaction_is_rolled_back() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;

        let mut tx = storage.transaction()?;
        tx.insert("edge", vec![i(1), i(2)])?;
        tx.insert("edge", vec![i(3), i(4)])?;
        // Abandon the transaction without committing it.
        drop(tx);

        assert!(storage.scan("edge")?.is_empty());
        Ok(())
    }

    #[test]
    fn inserted_row_ids_are_distinct_and_increment() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 1)?;
        let first = storage.insert("edge", vec![i(1)])?;
        let second = storage.insert("edge", vec![i(2)])?;

        assert_ne!(first, second);
        assert_eq!(first, RowId::from(0u64));
        assert_eq!(second, RowId::from(1u64));
        Ok(())
    }

    #[test]
    fn duplicate_create_returns_err() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;
        let second_attempt = storage.create_relation("edge", 2);
        assert!(matches!(
            second_attempt,
            Err(StorageError::RelationExists(_))
        ));
        Ok(())
    }

    #[test]
    fn scan_unknown_relation_returns_err() {
        let storage = MemoryStorage::new();
        let outcome = storage.scan("missing");
        assert!(matches!(outcome, Err(StorageError::RelationNotFound(_))));
    }

    #[test]
    fn arity_unknown_relation_returns_err() {
        let storage = MemoryStorage::new();
        let outcome = storage.arity("missing");
        assert!(matches!(outcome, Err(StorageError::RelationNotFound(_))));
    }

    #[test]
    fn insert_wrong_arity_returns_err() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;
        let outcome = storage.insert("edge", vec![i(1)]);
        assert!(matches!(
            outcome,
            Err(StorageError::ArityMismatch {
                expected: 2,
                got: 1
            })
        ));
        Ok(())
    }

    #[test]
    fn delete_removes_row_then_idempotent_on_missing() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 1)?;
        let gone = storage.insert("edge", vec![i(1)])?;
        let kept = storage.insert("edge", vec![i(2)])?;
        let survivors = vec![(kept, vec![i(2)])];

        storage.delete("edge", &gone)?;
        assert_eq!(storage.scan("edge")?, survivors);

        // Deleting the same id a second time succeeds and changes nothing.
        storage.delete("edge", &gone)?;
        assert_eq!(storage.scan("edge")?, survivors);
        Ok(())
    }

    #[test]
    fn delete_within_transaction_is_atomic() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 1)?;
        let victim = storage.insert("edge", vec![i(1)])?;
        storage.insert("edge", vec![i(2)])?;

        let mut tx = storage.transaction()?;
        tx.delete("edge", &victim)?;
        // Never committed, so the buffered delete must not take effect.
        drop(tx);

        assert_eq!(storage.scan("edge")?.len(), 2);
        Ok(())
    }

    #[test]
    fn scan_where_filters_by_column_value() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;
        storage.insert("edge", vec![i(1), i(10)])?;
        let hit_a = storage.insert("edge", vec![i(2), i(20)])?;
        storage.insert("edge", vec![i(3), i(10)])?;
        let hit_b = storage.insert("edge", vec![i(2), i(30)])?;

        // Column 0 equal to 2 selects exactly the two rows kept above.
        let hits: Vec<_> = storage
            .scan_where("edge", 0, &i(2))?
            .collect::<Result<_, _>>()?;
        let expected = vec![(hit_a, vec![i(2), i(20)]), (hit_b, vec![i(2), i(30)])];
        assert_eq!(hits, expected);

        // A column index past the arity matches nothing.
        let misses: Vec<_> = storage
            .scan_where("edge", 5, &i(2))?
            .collect::<Result<_, _>>()?;
        assert!(misses.is_empty());
        Ok(())
    }

    #[test]
    fn scan_iter_yields_rows_lazily() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 1)?;
        for cell in [10, 20, 30] {
            storage.insert("edge", vec![i(cell)])?;
        }

        // Consume only the first two items of the stream.
        let head: Vec<_> = storage
            .scan_iter("edge")?
            .take(2)
            .collect::<Result<_, _>>()?;
        assert_eq!(head.len(), 2);
        assert_eq!(head[0].1, vec![i(10)]);
        assert_eq!(head[1].1, vec![i(20)]);
        Ok(())
    }

    #[test]
    fn scan_as_table_drops_row_ids() -> Result<(), StorageError> {
        let mut storage = fresh("edge", 2)?;
        storage.insert("edge", vec![i(1), i(2)])?;

        let table = scan_as_table(&storage, "edge")?;
        assert_eq!(table.arity, 2);
        assert_eq!(table.rows, vec![vec![i(1), i(2)]]);
        Ok(())
    }
}