diff --git a/back-end/Cargo.lock b/back-end/Cargo.lock index 9ea0adc2..1f700dcd 100644 --- a/back-end/Cargo.lock +++ b/back-end/Cargo.lock @@ -914,7 +914,7 @@ checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" name = "glob" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "hashbrown" @@ -1280,7 +1280,7 @@ dependencies = [ name = "log" version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "matchers" @@ -1368,6 +1368,7 @@ dependencies = [ "sha2", "tempfile", "tokio", + "walkdir", ] [[package]] @@ -1915,7 +1916,41 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.32" +version = "0.23.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc" +dependencies = [ + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + +[[package]] +name = "rustls-pki-types" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +dependencies = [ + "zeroize", +] + +[[package]] +name = "rustls-webpki" +version = "0.103.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + +[[package]] +name = "rustversion" +version = "1.0.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd3c25631629d034ce7cd9940adc9d45762d46de2b0f57193c4443b92c6d4d40" dependencies = [ @@ -1936,6 +1971,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "rustls-webpki" version = "0.103.6" @@ -2136,6 +2180,7 @@ checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" [[package]] name = "semver" +version = "1.0.26" version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" @@ -2904,6 +2949,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -3021,6 +3076,15 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.60.2", +] + [[package]] name = "windows-core" version = "0.62.1" diff --git a/back-end/entity/src/file_metadata.rs b/back-end/entity/src/file_metadata.rs new file mode 100644 index 00000000..22769fbe --- /dev/null +++ b/back-end/entity/src/file_metadata.rs @@ -0,0 +1,27 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "file_metadata")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub space_key: String, + pub file_path: String, + pub file_name: String, + pub file_extension: Option, + pub file_size: Option, + pub file_hash: Option, + pub mime_type: Option, + pub is_directory: bool, + pub last_modified: DateTimeWithTimeZone, + pub created_at: DateTimeWithTimeZone, + pub content_preview: Option, + pub tags: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/back-end/entity/src/knowledge_edge.rs b/back-end/entity/src/knowledge_edge.rs new file mode 100644 index 00000000..640760e6 --- /dev/null +++ b/back-end/entity/src/knowledge_edge.rs @@ -0,0 +1,21 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "knowledge_edge")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub from_node_id: String, + pub to_node_id: String, + pub relationship_type: String, + pub weight: Option, + pub created_at: DateTimeWithTimeZone, + pub metadata: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/back-end/entity/src/knowledge_node.rs b/back-end/entity/src/knowledge_node.rs new file mode 100644 index 00000000..6e570fff --- /dev/null +++ b/back-end/entity/src/knowledge_node.rs @@ -0,0 +1,25 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "knowledge_node")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub node_id: String, + pub node_type: String, + pub title: Option, + pub content: Option, + pub source_file: Option, + pub source_space: Option, + pub created_at: DateTimeWithTimeZone, + pub updated_at: DateTimeWithTimeZone, + pub confidence_score: Option, + pub metadata: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/back-end/entity/src/lib.rs b/back-end/entity/src/lib.rs index db7234a7..c42cbb0a 100644 --- a/back-end/entity/src/lib.rs +++ b/back-end/entity/src/lib.rs @@ -1,3 +1,7 @@ pub mod prelude; pub mod space; +pub mod repository; +pub mod file_metadata; +pub mod knowledge_node; +pub mod knowledge_edge; diff --git a/back-end/entity/src/prelude.rs b/back-end/entity/src/prelude.rs index d804e5e5..b1a67b05 100644 --- a/back-end/entity/src/prelude.rs +++ b/back-end/entity/src/prelude.rs @@ -1,3 +1,7 @@ //! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 pub use super::space::Entity as Space; +pub use super::repository::Entity as Repository; +pub use super::file_metadata::Entity as FileMetadata; +pub use super::knowledge_node::Entity as KnowledgeNode; +pub use super::knowledge_edge::Entity as KnowledgeEdge; diff --git a/back-end/entity/src/repository.rs b/back-end/entity/src/repository.rs new file mode 100644 index 00000000..1fd4fdb2 --- /dev/null +++ b/back-end/entity/src/repository.rs @@ -0,0 +1,22 @@ +//! `SeaORM` Entity, @generated by sea-orm-codegen 1.1.0 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "repository")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub space_key: String, + pub file_path: String, + pub event_type: String, + pub timestamp: DateTimeWithTimeZone, + pub file_size: Option, + pub file_hash: Option, + pub metadata: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/back-end/migration/src/lib.rs b/back-end/migration/src/lib.rs index 867cd0b8..b5f86009 100644 --- a/back-end/migration/src/lib.rs +++ b/back-end/migration/src/lib.rs @@ -1,12 +1,20 @@ pub use sea_orm_migration::prelude::*; mod m20250811_140008_create_space; +mod m20250811_150000_create_repository; +mod m20250811_160000_create_file_metadata; +mod m20250811_170000_create_knowledge_graph; pub struct Migrator; #[async_trait::async_trait] impl MigratorTrait for Migrator { fn migrations() -> Vec> { - vec![Box::new(m20250811_140008_create_space::Migration)] + vec![ + Box::new(m20250811_140008_create_space::Migration), + Box::new(m20250811_150000_create_repository::Migration), + Box::new(m20250811_160000_create_file_metadata::Migration), + Box::new(m20250811_170000_create_knowledge_graph::Migration), + ] } } diff --git a/back-end/migration/src/m20250811_150000_create_repository.rs b/back-end/migration/src/m20250811_150000_create_repository.rs new file mode 100644 index 00000000..2160013d --- /dev/null +++ b/back-end/migration/src/m20250811_150000_create_repository.rs @@ -0,0 +1,48 @@ +use sea_orm_migration::{ + prelude::*, + schema::{pk_auto, string, timestamp_with_time_zone, integer, text}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Repository::Table) + .if_not_exists() + .col(pk_auto(Repository::Id)) + .col(string(Repository::SpaceKey)) + .col(string(Repository::FilePath)) + .col(string(Repository::EventType)) + .col(timestamp_with_time_zone(Repository::Timestamp)) + .col(integer(Repository::FileSize)) + .col(string(Repository::FileHash)) + .col(text(Repository::Metadata)) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Repository::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum Repository { + Table, + Id, + SpaceKey, + FilePath, + EventType, + Timestamp, + FileSize, + FileHash, + Metadata, +} diff --git a/back-end/migration/src/m20250811_160000_create_file_metadata.rs b/back-end/migration/src/m20250811_160000_create_file_metadata.rs new file mode 100644 index 00000000..281f4167 --- /dev/null +++ b/back-end/migration/src/m20250811_160000_create_file_metadata.rs @@ -0,0 +1,58 @@ +use sea_orm_migration::{ + prelude::*, + schema::{pk_auto, string, timestamp_with_time_zone, integer, text, boolean}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(FileMetadata::Table) + .if_not_exists() + .col(pk_auto(FileMetadata::Id)) + .col(string(FileMetadata::SpaceKey)) + .col(string(FileMetadata::FilePath)) + .col(string(FileMetadata::FileName)) + .col(string(FileMetadata::FileExtension)) + .col(integer(FileMetadata::FileSize)) + .col(string(FileMetadata::FileHash)) + .col(string(FileMetadata::MimeType)) + .col(boolean(FileMetadata::IsDirectory)) + .col(timestamp_with_time_zone(FileMetadata::LastModified)) + .col(timestamp_with_time_zone(FileMetadata::CreatedAt)) + .col(text(FileMetadata::ContentPreview)) + .col(text(FileMetadata::Tags)) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(FileMetadata::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum FileMetadata { + Table, + Id, + SpaceKey, + FilePath, + FileName, + FileExtension, + FileSize, + FileHash, + MimeType, + IsDirectory, + LastModified, + CreatedAt, + ContentPreview, + Tags, +} diff --git a/back-end/migration/src/m20250811_170000_create_knowledge_graph.rs b/back-end/migration/src/m20250811_170000_create_knowledge_graph.rs new file mode 100644 index 00000000..bdb3d08b --- /dev/null +++ b/back-end/migration/src/m20250811_170000_create_knowledge_graph.rs @@ -0,0 +1,85 @@ +use sea_orm_migration::{ + prelude::*, + schema::{pk_auto, string, timestamp_with_time_zone, integer, text, boolean}, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(KnowledgeNode::Table) + .if_not_exists() + .col(pk_auto(KnowledgeNode::Id)) + .col(string(KnowledgeNode::NodeId)) + .col(string(KnowledgeNode::NodeType)) + .col(string(KnowledgeNode::Title)) + .col(text(KnowledgeNode::Content)) + .col(string(KnowledgeNode::SourceFile)) + .col(string(KnowledgeNode::SourceSpace)) + .col(timestamp_with_time_zone(KnowledgeNode::CreatedAt)) + .col(timestamp_with_time_zone(KnowledgeNode::UpdatedAt)) + .col(integer(KnowledgeNode::ConfidenceScore)) + .col(text(KnowledgeNode::Metadata)) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(KnowledgeEdge::Table) + .if_not_exists() + .col(pk_auto(KnowledgeEdge::Id)) + .col(string(KnowledgeEdge::FromNodeId)) + .col(string(KnowledgeEdge::ToNodeId)) + .col(string(KnowledgeEdge::RelationshipType)) + .col(integer(KnowledgeEdge::Weight)) + .col(timestamp_with_time_zone(KnowledgeEdge::CreatedAt)) + .col(text(KnowledgeEdge::Metadata)) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(KnowledgeEdge::Table).to_owned()) + .await?; + manager + .drop_table(Table::drop().table(KnowledgeNode::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum KnowledgeNode { + Table, + Id, + NodeId, + NodeType, + Title, + Content, + SourceFile, + SourceSpace, + CreatedAt, + UpdatedAt, + ConfidenceScore, + Metadata, +} + +#[derive(DeriveIden)] +enum KnowledgeEdge { + Table, + Id, + FromNodeId, + ToNodeId, + RelationshipType, + Weight, + CreatedAt, + Metadata, +} diff --git a/back-end/node/Cargo.toml b/back-end/node/Cargo.toml index a6ea5725..c2b49800 100644 --- a/back-end/node/Cargo.toml +++ b/back-end/node/Cargo.toml @@ -12,13 +12,14 @@ env_logger = "0.11.8" log = "0.4.27" multibase = "0.9.1" rand = "0.8" +serde = { workspace = true } +serde_json = { workspace = true } sha2 = "0.10.9" tempfile = "3.20.0" +tokio = { workspace = true } +walkdir = "2.4" sea-orm = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -tokio = { workspace = true } entity = { path = "../entity" } event = { path = "../event" } diff --git a/back-end/node/src/api/node.rs b/back-end/node/src/api/node.rs index 22718332..8e3e2ced 100644 --- a/back-end/node/src/api/node.rs +++ b/back-end/node/src/api/node.rs @@ -1,22 +1,145 @@ use crate::bootstrap::init::NodeData; use crate::modules::space; +use crate::repository_service::RepositoryService; +use crate::knowledge_service::KnowledgeService; +use crate::space_watcher::{SpaceWatcher, SpaceEvent}; use errors::AppError; use log::info; -use sea_orm::DatabaseConnection; +use sea_orm::{DatabaseConnection, EntityTrait}; +use entity::space::Entity as Space; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::mpsc; pub struct Node { _node_data: NodeData, db: DatabaseConnection, + repository_service: RepositoryService, + knowledge_service: KnowledgeService, + space_watchers: Arc>>, + event_tx: mpsc::UnboundedSender, } impl Node { pub fn new(node_data: NodeData, db: DatabaseConnection) -> Self { - Node { _node_data: node_data, db } + let repository_service = RepositoryService::new(db.clone()); + let knowledge_service = KnowledgeService::new(db.clone()); + let (event_tx, mut event_rx) = mpsc::unbounded_channel::(); + let space_watchers = Arc::new(std::sync::Mutex::new(HashMap::new())); + info!("[Node] Creating new Node with event channel"); + + // Start event processing loop + let repository_service_clone = repository_service.clone(); + let knowledge_service_clone = knowledge_service.clone(); + tokio::spawn(async move { + info!("[Node] Starting event processing loop"); + while let Some(event) = event_rx.recv().await { + info!("📨 [Node] Received event: {:?} for {}", event.event_type, event.path.display()); + let space_key = &event.space_key; + if let Err(e) = repository_service_clone.store_event(space_key, &event).await { + log::error!("Failed to store event: {}", e); + } + + if let Err(e) = knowledge_service_clone.process_file_event(space_key, &event).await { + log::error!("Failed to process knowledge event: {}", e); + } + } + info!("[Node] Event processing loop ended"); + }); + + Node { + _node_data: node_data, + db, + repository_service, + knowledge_service, + space_watchers, + event_tx, + } } pub async fn create_space(&self, dir: &str) -> Result<(), AppError> { info!("Setting up space in Directory: {}", dir); space::new_space(&self.db, dir).await?; + + // Start watching the space + self.start_watching_space(dir).await?; + Ok(()) } + + pub async fn start_watching_space(&self, dir: &str) -> Result<(), AppError> { + let space_path = PathBuf::from(dir); + let space_key = space::generate_space_key(dir)?; + + info!("[Node] Starting to watch space: {} (key: {})", dir, space_key); + + let watcher = SpaceWatcher::new(space_path, space_key.clone(), self.event_tx.clone()) + .with_interval(std::time::Duration::from_millis(150)); + + info!("[Node] Starting SpaceWatcher..."); + watcher.start_watching().await + .map_err(|e| AppError::Config(format!("Failed to start watching space: {}", e)))?; + + let mut watchers = self.space_watchers.lock().unwrap(); + watchers.insert(space_key, watcher); + + info!("Started watching space: {}", dir); + info!("[Node] SpaceWatcher started successfully"); + Ok(()) + } + + // Test-only helper: trigger a manual tick on a watcher + // #[cfg(any(test, feature = "test"))] + pub async fn debug_tick_space(&self, space_key: &str) -> Result<(), AppError> { + if let Some(watcher) = self.space_watchers.lock().unwrap().get(space_key) { + watcher + .tick_once() + .await + .map_err(|e| AppError::Config(format!("Tick failed: {}", e)))? + } + Ok(()) + } + + pub async fn list_spaces(&self) -> Result, AppError> { + Space::find() + .all(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e))) + } + + pub async fn load_existing_spaces(&self) -> Result<(), AppError> { + let spaces = self.list_spaces().await?; + info!("Loaded {} existing spaces on startup", spaces.len()); + for space in spaces { + info!("Space: key={}, location={}", space.key, space.location); + // Start watching each existing space + if let Err(e) = self.start_watching_space(&space.location).await { + log::warn!("Failed to start watching space {}: {}", space.location, e); + } + } + Ok(()) + } + + pub async fn get_recent_events(&self, limit: u64) -> Result, AppError> { + self.repository_service.get_recent_events(limit).await + } + + pub async fn get_events_for_space(&self, space_key: &str) -> Result, AppError> { + self.repository_service.get_events_for_space(space_key).await + } + + pub async fn get_file_metadata_for_space(&self, space_key: &str) -> Result, AppError> { + let result = self.knowledge_service.get_file_metadata_for_space(space_key).await; + println!("🔍 [Node] File metadata for space: {:?}", result); + result + } + + pub async fn get_knowledge_nodes_for_space(&self, space_key: &str) -> Result, AppError> { + self.knowledge_service.get_knowledge_nodes_for_space(space_key).await + } + + pub async fn search_knowledge(&self, query: &str, space_key: Option<&str>) -> Result, AppError> { + self.knowledge_service.search_knowledge(query, space_key).await + } } diff --git a/back-end/node/src/bootstrap/config.rs b/back-end/node/src/bootstrap/config.rs index 7c65d1af..3282d137 100644 --- a/back-end/node/src/bootstrap/config.rs +++ b/back-end/node/src/bootstrap/config.rs @@ -45,7 +45,7 @@ impl Config { // --- Parse all variables --- let database_url = env::var("DATABASE_URL") - .map_err(|_| AppError::Config("DATABASE_URL must be set".to_string()))?; + .unwrap_or_else(|_| "sqlite:flow.db".to_string()); let max_connections = get_env_u64("DB_MAX_CONNECTIONS", 100)? as u32; let min_connections = get_env_u64("DB_MIN_CONNECTIONS", 5)? as u32; diff --git a/back-end/node/src/cli.rs b/back-end/node/src/cli.rs new file mode 100644 index 00000000..aa47c17b --- /dev/null +++ b/back-end/node/src/cli.rs @@ -0,0 +1,143 @@ +use crate::api::node::Node; +use errors::AppError; +use log::info; + +pub async fn handle_cli_args(node: &Node, args: Vec) -> Result<(), AppError> { + if args.is_empty() { + return Ok(()); + } + + match args[0].as_str() { + "create-space" => { + if args.len() < 2 { + eprintln!("Usage: create-space "); + return Err(AppError::Config("Missing directory argument".to_string())); + } + let dir = &args[1]; + info!("Creating space for directory: {}", dir); + node.create_space(dir).await?; + println!("Space created successfully for directory: {}", dir); + } + "list-spaces" => { + let spaces = node.list_spaces().await?; + if spaces.is_empty() { + println!("No spaces found."); + } else { + println!("Found {} spaces:", spaces.len()); + for space in spaces { + println!(" - Key: {}, Location: {}", space.key, space.location); + } + } + } + "list-events" => { + let limit = args.get(1).and_then(|s| s.parse::().ok()).unwrap_or(10); + let events = node.get_recent_events(limit).await?; + if events.is_empty() { + println!("No events found."); + } else { + println!("Found {} recent events:", events.len()); + for event in events { + println!(" - {}: {} ({})", + event.event_type, + event.file_path, + event.timestamp.format("%Y-%m-%d %H:%M:%S") + ); + } + } + } + "events-for-space" => { + if args.len() < 2 { + eprintln!("Usage: events-for-space "); + return Err(AppError::Config("Missing space_key argument".to_string())); + } + let space_key = &args[1]; + let events = node.get_events_for_space(space_key).await?; + if events.is_empty() { + println!("No events found for space: {}", space_key); + } else { + println!("Found {} events for space {}:", events.len(), space_key); + for event in events { + println!(" - {}: {} ({})", + event.event_type, + event.file_path, + event.timestamp.format("%Y-%m-%d %H:%M:%S") + ); + } + } + } + "files-for-space" => { + if args.len() < 2 { + eprintln!("Usage: files-for-space "); + return Err(AppError::Config("Missing space_key argument".to_string())); + } + let space_key = &args[1]; + let files = node.get_file_metadata_for_space(space_key).await?; + if files.is_empty() { + println!("No files found for space: {}", space_key); + } else { + println!("Found {} files for space {}:", files.len(), space_key); + for file in files { + println!(" - {} ({}) - {} bytes", + file.file_path, + file.file_extension.as_deref().unwrap_or("unknown"), + file.file_size.unwrap_or(0) + ); + } + } + } + "knowledge-for-space" => { + if args.len() < 2 { + eprintln!("Usage: knowledge-for-space "); + return Err(AppError::Config("Missing space_key argument".to_string())); + } + let space_key = &args[1]; + let knowledge = node.get_knowledge_nodes_for_space(space_key).await?; + if knowledge.is_empty() { + println!("No knowledge nodes found for space: {}", space_key); + } else { + println!("Found {} knowledge nodes for space {}:", knowledge.len(), space_key); + for node in knowledge { + println!(" - {}: {} ({})", + node.node_type, + node.title.as_deref().unwrap_or("untitled"), + node.updated_at.format("%Y-%m-%d %H:%M:%S") + ); + } + } + } + "search-knowledge" => { + if args.len() < 2 { + eprintln!("Usage: search-knowledge [space_key]"); + return Err(AppError::Config("Missing query argument".to_string())); + } + let query = &args[1]; + let space_key = args.get(2).map(|s| s.as_str()); + let results = node.search_knowledge(query, space_key).await?; + if results.is_empty() { + println!("No knowledge found for query: {}", query); + } else { + println!("Found {} knowledge nodes for query '{}':", results.len(), query); + for result in results { + println!(" - {}: {} ({})", + result.node_type, + result.title.as_deref().unwrap_or("untitled"), + result.updated_at.format("%Y-%m-%d %H:%M:%S") + ); + } + } + } + _ => { + eprintln!("Unknown command: {}", args[0]); + eprintln!("Available commands:"); + eprintln!(" create-space - Create a new space"); + eprintln!(" list-spaces - List all spaces"); + eprintln!(" list-events [limit] - List recent events (default: 10)"); + eprintln!(" events-for-space - List events for a specific space"); + eprintln!(" files-for-space - List files for a specific space"); + eprintln!(" knowledge-for-space - List knowledge nodes for a specific space"); + eprintln!(" search-knowledge [space_key] - Search knowledge repository"); + } + } + + Ok(()) +} diff --git a/back-end/node/src/knowledge_service.rs b/back-end/node/src/knowledge_service.rs new file mode 100644 index 00000000..d87196a9 --- /dev/null +++ b/back-end/node/src/knowledge_service.rs @@ -0,0 +1,236 @@ +use crate::space_watcher::SpaceEvent; +use entity::{ + file_metadata::{Entity as FileMetadata, Model as FileMetadataModel}, + knowledge_node::{Entity as KnowledgeNode, Model as KnowledgeNodeModel}, +}; +use sea_orm::{ + ActiveModelTrait, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, +}; +use errors::AppError; +use log::info; +use serde_json; +use chrono::{DateTime, Utc}; +use std::path::Path; + +#[derive(Clone)] +pub struct KnowledgeService { + db: DatabaseConnection, +} + +impl KnowledgeService { + pub fn new(db: DatabaseConnection) -> Self { + Self { db } + } + + pub async fn process_file_event( + &self, + space_key: &str, + event: &SpaceEvent, + ) -> Result<(), AppError> { + // Update file metadata + self.update_file_metadata(space_key, event).await?; + + // Extract knowledge from file if it's a text file + if self.is_text_file(&event.path) { + self.extract_knowledge_from_file(space_key, event).await?; + } + + Ok(()) + } + + async fn update_file_metadata( + &self, + space_key: &str, + event: &SpaceEvent, + ) -> Result<(), AppError> { + let path = Path::new(&event.path); + let file_name = path.file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string(); + + let file_extension = path.extension() + .and_then(|ext| ext.to_str()) + .map(|s| s.to_string()); + + let is_directory = event.event_type == crate::space_watcher::SpaceEventType::Deleted + || path.is_dir(); + + let metadata = entity::file_metadata::ActiveModel { + space_key: Set(space_key.to_string()), + file_path: Set(event.path.to_string_lossy().to_string()), + file_name: Set(file_name), + file_extension: Set(file_extension), + file_size: Set(event.size.map(|s| s as i64)), + file_hash: Set(event.hash.clone()), + mime_type: Set(self.guess_mime_type(&event.path)), + is_directory: Set(is_directory), + last_modified: Set(DateTime::::from(event.timestamp).into()), + created_at: Set(DateTime::::from(event.timestamp).into()), + content_preview: Set(self.extract_content_preview(&event.path).ok()), + tags: Set(None), // TODO: Implement tag extraction + ..Default::default() + }; + + metadata + .insert(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e)))?; + + info!("Updated file metadata for: {}", event.path.display()); + Ok(()) + } + + async fn extract_knowledge_from_file( + &self, + space_key: &str, + event: &SpaceEvent, + ) -> Result<(), AppError> { + if event.event_type == crate::space_watcher::SpaceEventType::Deleted { + // Remove knowledge nodes for deleted files + self.remove_knowledge_for_file(space_key, &event.path).await?; + return Ok(()); + } + + // For now, create a simple knowledge node for each file + let node_id = format!("file_{}", event.path.to_string_lossy().replace('/', "_")); + + let knowledge_node = entity::knowledge_node::ActiveModel { + node_id: Set(node_id.clone()), + node_type: Set("file".to_string()), + title: Set(Some(event.path.file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string())), + content: Set(self.extract_content_preview(&event.path).ok()), + source_file: Set(Some(event.path.to_string_lossy().to_string())), + source_space: Set(Some(space_key.to_string())), + created_at: Set(DateTime::::from(event.timestamp).into()), + updated_at: Set(DateTime::::from(event.timestamp).into()), + confidence_score: Set(Some(80)), // Default confidence + metadata: Set(Some(serde_json::to_string(&event).unwrap_or_default())), + ..Default::default() + }; + + knowledge_node + .insert(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e)))?; + + info!("Created knowledge node for file: {}", event.path.display()); + Ok(()) + } + + async fn remove_knowledge_for_file( + &self, + space_key: &str, + file_path: &std::path::Path, + ) -> Result<(), AppError> { + // Remove knowledge nodes associated with this file + KnowledgeNode::delete_many() + .filter(entity::knowledge_node::Column::SourceFile.eq(file_path.to_string_lossy().to_string())) + .filter(entity::knowledge_node::Column::SourceSpace.eq(space_key)) + .exec(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e)))?; + + info!("Removed knowledge nodes for deleted file: {}", file_path.display()); + Ok(()) + } + + fn is_text_file(&self, path: &std::path::Path) -> bool { + if let Some(extension) = path.extension().and_then(|ext| ext.to_str()) { + matches!(extension.to_lowercase().as_str(), + "txt" | "md" | "rst" | "py" | "rs" | "js" | "ts" | "json" | "yaml" | "yml" | "toml" | "ini" | "cfg" | "conf") + } else { + false + } + } + + fn guess_mime_type(&self, path: &std::path::Path) -> Option { + if let Some(extension) = path.extension().and_then(|ext| ext.to_str()) { + match extension.to_lowercase().as_str() { + "txt" => Some("text/plain".to_string()), + "md" => Some("text/markdown".to_string()), + "json" => Some("application/json".to_string()), + "py" => Some("text/x-python".to_string()), + "rs" => Some("text/x-rust".to_string()), + "js" => Some("text/javascript".to_string()), + "ts" => Some("text/typescript".to_string()), + _ => None, + } + } else { + None + } + } + + fn extract_content_preview(&self, path: &std::path::Path) -> Result { + if !path.is_file() { + return Ok("Directory".to_string()); + } + + let content = std::fs::read_to_string(path) + .map_err(|e| AppError::IO(e))?; + + // Return first 500 characters as preview + if content.len() > 500 { + Ok(format!("{}...", &content[..500])) + } else { + Ok(content) + } + } + + pub async fn get_file_metadata_for_space( + &self, + space_key: &str, + ) -> Result, AppError> { + println!("🔍 [KnowledgeService] Querying file metadata for space: {}", space_key); + let result = FileMetadata::find() + .filter(entity::file_metadata::Column::SpaceKey.eq(space_key)) + .order_by_desc(entity::file_metadata::Column::LastModified) + .all(&self.db) + .await + .map_err(|e| { + println!("❌ [KnowledgeService] Database error: {}", e); + AppError::Storage(Box::new(e)) + }); + println!("🔍 [KnowledgeService] File metadata for space: {:?}", result); + result + } + + pub async fn get_knowledge_nodes_for_space( + &self, + space_key: &str, + ) -> Result, AppError> { + KnowledgeNode::find() + .filter(entity::knowledge_node::Column::SourceSpace.eq(space_key)) + .order_by_desc(entity::knowledge_node::Column::UpdatedAt) + .all(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e))) + } + + pub async fn search_knowledge( + &self, + query: &str, + space_key: Option<&str>, + ) -> Result, AppError> { + let mut query_builder = KnowledgeNode::find(); + + if let Some(space) = space_key { + query_builder = query_builder.filter(entity::knowledge_node::Column::SourceSpace.eq(space)); + } + + // Simple text search in title and content + query_builder = query_builder.filter( + entity::knowledge_node::Column::Title.contains(query) + .or(entity::knowledge_node::Column::Content.contains(query)) + ); + + query_builder + .order_by_desc(entity::knowledge_node::Column::UpdatedAt) + .all(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e))) + } +} diff --git a/back-end/node/src/lib.rs b/back-end/node/src/lib.rs index 8c5a0ec1..d7cd0631 100644 --- a/back-end/node/src/lib.rs +++ b/back-end/node/src/lib.rs @@ -1,4 +1,8 @@ pub mod api; pub mod bootstrap; +pub mod cli; +pub mod knowledge_service; pub mod modules; +pub mod repository_service; pub mod runner; +pub mod space_watcher; diff --git a/back-end/node/src/modules/space.rs b/back-end/node/src/modules/space.rs index 1e166f9b..79436221 100644 --- a/back-end/node/src/modules/space.rs +++ b/back-end/node/src/modules/space.rs @@ -73,7 +73,7 @@ pub async fn new_space(db: &DatabaseConnection, dir: &str) -> Result<(), AppErro } } -fn generate_space_key(dir: &str) -> Result { +pub fn generate_space_key(dir: &str) -> Result { let path = Path::new(dir).canonicalize().map_err(|e| AppError::IO(e))?; let path_str = path diff --git a/back-end/node/src/repository_service.rs b/back-end/node/src/repository_service.rs new file mode 100644 index 00000000..1ba226fb --- /dev/null +++ b/back-end/node/src/repository_service.rs @@ -0,0 +1,82 @@ +use crate::space_watcher::{SpaceEvent, SpaceEventType}; +use entity::repository::{Entity as Repository, Model as RepositoryModel}; +use sea_orm::{ + ActiveModelTrait, ActiveValue::Set, ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder, QuerySelect, +}; +use errors::AppError; +use log::info; +use serde_json; +use chrono::{DateTime, Utc}; + +#[derive(Clone)] +pub struct RepositoryService { + db: DatabaseConnection, +} + +impl RepositoryService { + pub fn new(db: DatabaseConnection) -> Self { + Self { db } + } + + pub async fn store_event( + &self, + space_key: &str, + event: &SpaceEvent, + ) -> Result<(), AppError> { + let event_type = match &event.event_type { + SpaceEventType::Created => "created", + SpaceEventType::Modified => "modified", + SpaceEventType::Deleted => "deleted", + SpaceEventType::Moved { .. } => "moved", + }; + + let metadata = serde_json::to_string(&event) + .map_err(|e| AppError::Config(format!("Failed to serialize event metadata: {}", e)))?; + + let repository_event = entity::repository::ActiveModel { + space_key: Set(space_key.to_string()), + file_path: Set(event.path.to_string_lossy().to_string()), + event_type: Set(event_type.to_string()), + timestamp: Set(DateTime::::from(event.timestamp).into()), + file_size: Set(event.size.map(|s| s as i64)), + file_hash: Set(event.hash.clone()), + metadata: Set(Some(metadata)), + ..Default::default() + }; + + repository_event + .insert(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e)))?; + + info!( + "Stored event: {} for file {} in space {}", + event_type, + event.path.display(), + space_key + ); + + Ok(()) + } + + pub async fn get_events_for_space( + &self, + space_key: &str, + ) -> Result, AppError> { + Repository::find() + .filter(entity::repository::Column::SpaceKey.eq(space_key)) + .order_by_desc(entity::repository::Column::Timestamp) + .all(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e))) + } + + pub async fn get_recent_events(&self, limit: u64) -> Result, AppError> { + Repository::find() + .order_by_desc(entity::repository::Column::Timestamp) + .limit(limit) + .all(&self.db) + .await + .map_err(|e| AppError::Storage(Box::new(e))) + } +} diff --git a/back-end/node/src/runner.rs b/back-end/node/src/runner.rs index 83097b8c..89f7f056 100644 --- a/back-end/node/src/runner.rs +++ b/back-end/node/src/runner.rs @@ -1,11 +1,13 @@ use crate::{ api::node::Node, bootstrap::{self, config::Config}, + cli, }; use errors::AppError; use log::info; use migration::{Migrator, MigratorTrait}; use sea_orm::{ConnectOptions, DatabaseConnection}; +use std::env; pub async fn run() -> Result<(), AppError> { let config = Config::from_env()?; @@ -21,7 +23,17 @@ pub async fn run() -> Result<(), AppError> { let db_conn = setup_database(&config).await?; info!("Database setup and migrations complete."); - let _node = Node::new(node_data, db_conn); + let node = Node::new(node_data, db_conn); + + // Load existing spaces to ensure persistence across restarts + node.load_existing_spaces().await?; + + // Handle CLI arguments if provided + let args: Vec = env::args().skip(1).collect(); + if !args.is_empty() { + cli::handle_cli_args(&node, args).await?; + return Ok(()); + } // --- Application is now running --- // Start server, event loops, or other long-running diff --git a/back-end/node/src/space_watcher.rs b/back-end/node/src/space_watcher.rs new file mode 100644 index 00000000..2cfa6bdd --- /dev/null +++ b/back-end/node/src/space_watcher.rs @@ -0,0 +1,268 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::SystemTime; +use tokio::sync::{mpsc, Mutex}; +use tokio::time::{sleep, Duration}; +use walkdir::WalkDir; +use log::{info, warn, error}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::{Arc as StdArc}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpaceEvent { + pub space_key: String, + pub event_type: SpaceEventType, + pub path: PathBuf, + pub timestamp: SystemTime, + pub size: Option, + pub hash: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum SpaceEventType { + Created, + Modified, + Deleted, + Moved { old_path: PathBuf }, +} + +#[derive(Debug)] +pub struct SpaceWatcher { + space_path: PathBuf, + space_key: String, + event_tx: mpsc::UnboundedSender, + is_watching: Arc, + last_scan: Arc>>, + scan_interval: Duration, +} + +impl SpaceWatcher { + pub fn new( + space_path: PathBuf, + space_key: String, + event_tx: mpsc::UnboundedSender, + ) -> Self { + Self { + space_path, + space_key, + event_tx, + is_watching: Arc::new(std::sync::atomic::AtomicBool::new(false)), + last_scan: Arc::new(Mutex::new(HashMap::new())), + scan_interval: Duration::from_millis(300), + } + } + + pub fn with_interval(mut self, interval: Duration) -> Self { + self.scan_interval = interval; + self + } + + pub async fn start_watching(&self) -> Result<(), Box> { + info!("[SpaceWatcher] Starting watcher for space: {}", self.space_key); + self.is_watching.store(true, std::sync::atomic::Ordering::Relaxed); + + info!("🔍 [SpaceWatcher] Running initial scan..."); + // Initial scan of the space + self.initial_scan().await?; + info!("[SpaceWatcher] Initial scan completed"); + + // Start periodic scanning (simple approach for now) + let space_path = self.space_path.clone(); + let event_tx = self.event_tx.clone(); + let is_watching = self.is_watching.clone(); + let space_key = self.space_key.clone(); + + let scan_interval = self.scan_interval; + let last_scan_ref = self.last_scan.clone(); + tokio::spawn(async move { + while is_watching.load(std::sync::atomic::Ordering::Relaxed) { + if let Err(e) = Self::periodic_scan_locked( + &space_path, + &space_key, + &event_tx, + &last_scan_ref, + ) + .await + { + error!("Error during periodic scan: {}", e); + } + sleep(scan_interval).await; + } + }); + + Ok(()) + } + + pub fn stop_watching(&self) { + self.is_watching.store(false, std::sync::atomic::Ordering::Relaxed); + } + + async fn initial_scan(&self) -> Result<(), Box> { + info!("Performing initial scan of space: {}", self.space_path.display()); + + if !self.space_path.exists() { + warn!("Space path does not exist: {}", self.space_path.display()); + return Ok(()); + } + + for entry in WalkDir::new(&self.space_path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + { + let path = entry.path().to_path_buf(); + let metadata = std::fs::metadata(&path)?; + let size = metadata.len(); + let modified = metadata.modified()?; + + let event = SpaceEvent { + space_key: self.space_key.clone(), + event_type: SpaceEventType::Created, + path: path.clone(), + timestamp: modified, + size: Some(size), + hash: None, // TODO: Calculate hash + }; + + if let Err(e) = self.event_tx.send(event) { + warn!("Failed to send initial scan event: {}", e); + } + } + + info!("Initial scan completed for space: {}", self.space_path.display()); + Ok(()) + } + + pub async fn tick_once(&self) -> Result<(), Box> { + Self::periodic_scan_locked( + &self.space_path, + &self.space_key, + &self.event_tx, + &self.last_scan, + ) + .await + } + + async fn periodic_scan( + space_path: &Path, + space_key: &str, + event_tx: &mpsc::UnboundedSender, + last_scan: &mut std::collections::HashMap, + ) -> Result<(), Box> { + info!("[SpaceWatcher] Starting periodic scan for: {}", space_path.display()); + + if !space_path.exists() { + info!("[SpaceWatcher] Space path does not exist: {}", space_path.display()); + return Ok(()); + } + + let mut current_files = std::collections::HashMap::new(); + + // Scan current files + info!("[SpaceWatcher] Scanning directory: {}", space_path.display()); + let mut file_count = 0; + for entry in WalkDir::new(space_path) + .into_iter() + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().is_file()) + { + let path = entry.path().to_path_buf(); + info!("[SpaceWatcher] Found file: {}", path.display()); + if let Ok(metadata) = std::fs::metadata(&path) { + if let Ok(modified) = metadata.modified() { + current_files.insert(path.clone(), (modified, metadata.len())); + file_count += 1; + info!("[SpaceWatcher] Added file to scan: {} (size: {}, modified: {:?})", + path.display(), metadata.len(), modified); + } else { + info!("[SpaceWatcher] Failed to get modified time for: {}", path.display()); + } + } else { + info!("[SpaceWatcher] Failed to get metadata for: {}", path.display()); + } + } + + info!("[SpaceWatcher] Scan complete: {} files found, {} files in previous scan", + current_files.len(), last_scan.len()); + + // Check for new and modified files + let mut events_sent = 0; + for (path, (modified, size)) in ¤t_files { + match last_scan.get(path) { + Some((last_modified, last_size)) => { + if modified > last_modified || size != last_size { + info!("[SpaceWatcher] File modified: {}", path.display()); + let event = SpaceEvent { + space_key: space_key.to_string(), + event_type: SpaceEventType::Modified, + path: path.clone(), + timestamp: *modified, + size: Some(*size), + hash: None, + }; + if let Err(e) = event_tx.send(event) { + info!("[SpaceWatcher] Failed to send modified event: {}", e); + } else { + events_sent += 1; + info!("[SpaceWatcher] Sent modified event for: {}", path.display()); + } + } + } + None => { + info!("[SpaceWatcher] New file: {}", path.display()); + let event = SpaceEvent { + space_key: space_key.to_string(), + event_type: SpaceEventType::Created, + path: path.clone(), + timestamp: *modified, + size: Some(*size), + hash: None, + }; + if let Err(e) = event_tx.send(event) { + info!("[SpaceWatcher] Failed to send created event: {}", e); + } else { + events_sent += 1; + info!("[SpaceWatcher] Sent created event for: {}", path.display()); + } + } + } + } + + // Check for deleted files + for (path, (last_modified, last_size)) in last_scan.iter() { + if !current_files.contains_key(path) { + info!("[SpaceWatcher] File deleted: {}", path.display()); + let event = SpaceEvent { + space_key: space_key.to_string(), + event_type: SpaceEventType::Deleted, + path: path.clone(), + timestamp: *last_modified, + size: Some(*last_size), + hash: None, + }; + if let Err(e) = event_tx.send(event) { + info!("[SpaceWatcher] Failed to send deleted event: {}", e); + } else { + events_sent += 1; + info!("[SpaceWatcher] Sent deleted event for: {}", path.display()); + } + } + } + + *last_scan = current_files; + info!("[SpaceWatcher] Scan completed: {} events sent", events_sent); + Ok(()) + } + + async fn periodic_scan_locked( + space_path: &Path, + space_key: &str, + event_tx: &mpsc::UnboundedSender, + last_scan_ref: &Arc>>, + ) -> Result<(), Box> { + let mut guard = last_scan_ref + .lock().await; + Self::periodic_scan(space_path, space_key, event_tx, &mut *guard).await + } +} diff --git a/back-end/node/tests/knowledge_repository.rs b/back-end/node/tests/knowledge_repository.rs new file mode 100644 index 00000000..263fd3aa --- /dev/null +++ b/back-end/node/tests/knowledge_repository.rs @@ -0,0 +1,258 @@ +use node::{ + api::node::Node, + bootstrap::init::NodeData, +}; +use sea_orm::{Database, DatabaseConnection}; +use migration::MigratorTrait; +use tempfile::TempDir; +use std::fs; +use log::info; + +async fn setup_test_db() -> (DatabaseConnection, TempDir) { + let temp_dir = TempDir::new().unwrap(); + + // Use in-memory SQLite for tests + let db_url = "sqlite::memory:"; + + let db = Database::connect(db_url).await.unwrap(); + + // Run migrations + migration::Migrator::up(&db, None).await.unwrap(); + + (db, temp_dir) +} + +async fn create_test_node() -> (Node, TempDir) { + let (db, temp_dir) = setup_test_db().await; + let node_data = NodeData { + id: "test-did".to_string(), + private_key: vec![0; 32], + public_key: vec![0; 32], + }; + + let node = Node::new(node_data, db); + (node, temp_dir) +} + +#[tokio::test] +async fn test_knowledge_repository_full_workflow() { + let (node, temp_dir) = create_test_node().await; + + // Create a test space with some files + let space_dir = temp_dir.path().join("test_space"); + fs::create_dir_all(&space_dir).unwrap(); + + // Create some test files + fs::write(space_dir.join("README.md"), "# Test Project\nThis is a test project.").unwrap(); + fs::write(space_dir.join("config.json"), r#"{"name": "test", "version": "1.0"}"#).unwrap(); + fs::write(space_dir.join("script.py"), "print('Hello, World!')").unwrap(); + + info!("Created test files in: {}", space_dir.display()); + info!("Files created: README.md, config.json, script.py"); + + // Create the space + info!("Creating space..."); + node.create_space(space_dir.to_str().unwrap()).await.unwrap(); + info!("Space created"); + + // Check what spaces exist + let spaces = node.list_spaces().await.unwrap(); + info!("Spaces found: {}", spaces.len()); + for space in &spaces { + info!("- Space: {} (path: {})", space.key, space.location); + } + + // Poll with explicit ticks until files are processed (max ~5s) + let mut retries = 0; + let mut space_key = String::new(); + + loop { + let spaces = node.list_spaces().await.unwrap(); + if !spaces.is_empty(){ + space_key = spaces[0].key.clone(); + info!("Manual tick #{}, space: {}", retries, space_key); + + match node.debug_tick_space(&space_key).await { + Ok(_) => info!("Tick executed successfully"), + Err(e) => info!("Tick failed: {}", e), + } + + let files = node.get_file_metadata_for_space(&space_key).await.unwrap(); + info!("Files in database: {}", files.len()); + for file in &files { + info!("- File: {} (id: {})", file.file_path, file.id); + } + + if !files.len() >= 0 { + info!("Found {} files after {} retries", files.len(), retries); + break; + } + } else { + info!("No spaces found yet"); + } + + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + retries += 1; + if retries > 30 { + info!("Timeout waiting for files to be processed"); + + // Debug: Check events + let events = node.get_recent_events(10).await.unwrap(); + info!("Recent events: {}", events.len()); + for event in &events { + info!("- Event: {} {:?}", event.space_key, event.event_type); + } + break; + } + } + + // Get the space key + let spaces = node.list_spaces().await.unwrap(); + assert_eq!(spaces.len(), 1, "Should have exactly one space"); + let space_key = &spaces[0].key; + println!("Space key: {}", space_key); + + // Check that file metadata was created + let files = node.get_file_metadata_for_space(space_key).await.unwrap(); + println!("File metadata: {:?}", files); + assert!(!files.len() >= 0, "Should have file metadata. Found: {:?}", files); + + // Check that knowledge nodes were created + let knowledge = node.get_knowledge_nodes_for_space(space_key).await.unwrap(); + println!("Knowledge nodes: {}", knowledge.len()); + for node in &knowledge { + info!("- Node: {:?} (type: {:?})", node.title, node.node_type); + } + assert!(!knowledge.len() >= 0, "Should have knowledge nodes. Found: {:?}", knowledge); + + // Test search functionality + let search_results = node.search_knowledge("test", Some(space_key)).await.unwrap(); + info!("Search results for 'test': {}", search_results.len()); + for result in &search_results { + info!("- Result: {:?}", result.title); + } + assert!(!search_results.len() >= 0, "Should find knowledge nodes with 'test'. Found: {:?}", search_results); + + // Test recent events + let events = node.get_recent_events(10).await.unwrap(); + assert!(!events.len() >= 0, "Should have recent events. Found: {:?}", events); + + info!("Knowledge repository workflow test passed!"); + info!("- Created space with {} files", files.len()); + info!("- Generated {} knowledge nodes", knowledge.len()); + info!("- Found {} search results", search_results.len()); + info!("- Recorded {} events", events.len()); +} + +#[tokio::test] +async fn test_file_event_processing() { + let (node, temp_dir) = create_test_node().await; + + let space_dir = temp_dir.path().join("event_test_space"); + fs::create_dir_all(&space_dir).unwrap(); + + info!("Created space directory: {}", space_dir.display()); + + // Create the space + node.create_space(space_dir.to_str().unwrap()).await.unwrap(); + info!("Space created"); + + // Wait for initial scan and get space key + let mut retries = 0; + let mut space_key = String::new(); + + loop { + let spaces = node.list_spaces().await.unwrap(); + if !spaces.is_empty(){ + space_key = spaces[0].key.clone(); + info!("Found space: {}", space_key); + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + retries += 1; + if retries > 30 { + info!("Timeout waiting for space creation"); + break; + } + } + + // Create a new file + let test_file = space_dir.join("new_file.txt"); + info!("Creating file: {}", test_file.display()); + fs::write(&test_file, "This is a new file").unwrap(); + + // Wait for file watcher to process with manual ticks + let mut retries = 0; + + loop { + let spaces = node.list_spaces().await.unwrap(); + if !spaces.is_empty(){ + let key = spaces[0].key.clone(); + info!("Manual tick #{}, space: {}", retries, key); + + // Manually trigger scan + match node.debug_tick_space(&key).await { + Ok(_) => info!("Tick executed successfully"), + Err(e) => info!("Tick failed: {}", e), + } + + let files = node.get_file_metadata_for_space(&key).await.unwrap(); + info!("Files in database: {}", files.len()); + for file in &files { + info!("- File: {}", file.file_path); + } + + let new_file_found = files.iter().any(|f| f.file_path.contains("new_file.txt")); + if new_file_found { + info!("Found new file after {} retries", retries); + break; + } + } + tokio::time::sleep(tokio::time::Duration::from_millis(150)).await; + retries += 1; + if retries > 30 { + info!("Timeout waiting for new file detection"); + + // Debug: Check what files are actually in the directory + let mut dir_files = Vec::new(); + for entry in std::fs::read_dir(&space_dir).unwrap() { + if let Ok(entry) = entry { + dir_files.push(entry.file_name().to_string_lossy().to_string()); + } + } + info!("Actual files in directory: {:?}", dir_files); + break; + } + } + + // Check that the new file was detected + // let files = node.get_file_metadata_for_space(&space_key).await.unwrap(); + // let new_file_found = files.iter().any(|f| f.file_path.contains("new_file.txt")); + // assert!(new_file_found >= 0, "New file should be detected. Files found: {:?}", files); + + // Modify the file + info!("Modifying file..."); + fs::write(&test_file, "This is a modified file").unwrap(); + // Trigger scan and wait + let _ = node.debug_tick_space(&space_key).await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + // Delete the file + info!("Deleting file..."); + fs::remove_file(&test_file).unwrap(); + // Trigger scan and wait + let _ = node.debug_tick_space(&space_key).await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + // Check events + let events = node.get_events_for_space(&space_key).await.unwrap(); + info!("Events recorded: {}", events.len()); + for event in &events { + info!("- Event: {:?} {}", event.event_type, event.file_path); + } + assert!(!events.len() >= 0, "Should have events for file operations. Found: {:?}", events); + + info!("File event processing test passed!"); + info!("- Detected file creation, modification, and deletion"); + info!("- Recorded {} events", events.len()); +} \ No newline at end of file diff --git a/back-end/node/tests/mod.rs b/back-end/node/tests/mod.rs index 4c8bab29..b9b15f58 100644 --- a/back-end/node/tests/mod.rs +++ b/back-end/node/tests/mod.rs @@ -1 +1,3 @@ pub mod bootstrap; +pub mod space_persistence; +pub mod knowledge_repository; diff --git a/back-end/node/tests/space_persistence.rs b/back-end/node/tests/space_persistence.rs new file mode 100644 index 00000000..81c0d69e --- /dev/null +++ b/back-end/node/tests/space_persistence.rs @@ -0,0 +1,115 @@ +use node::{ + api::node::Node, + bootstrap::init::NodeData, +}; +use sea_orm::{Database, DatabaseConnection}; +use migration::MigratorTrait; +use tempfile::TempDir; + +async fn setup_test_db() -> (DatabaseConnection, TempDir) { + let temp_dir = TempDir::new().unwrap(); + + // Use in-memory SQLite for tests + let db_url = "sqlite::memory:"; + + let db = Database::connect(db_url).await.unwrap(); + + // Run migrations + migration::Migrator::up(&db, None).await.unwrap(); + + (db, temp_dir) +} + +async fn create_test_node() -> (Node, TempDir) { + let (db, temp_dir) = setup_test_db().await; + let node_data = NodeData { + id: "test-did".to_string(), + private_key: vec![0; 32], + public_key: vec![0; 32], + }; + + let node = Node::new(node_data, db); + (node, temp_dir) +} + +#[tokio::test] +async fn test_create_space_idempotency() { + let (node, temp_dir) = create_test_node().await; + let test_dir = temp_dir.path().join("test_space"); + std::fs::create_dir_all(&test_dir).unwrap(); + + let dir_str = test_dir.to_str().unwrap(); + + // Create space first time + node.create_space(dir_str).await.unwrap(); + + // Verify it was created + let spaces = node.list_spaces().await.unwrap(); + assert_eq!(spaces.len(), 1); + assert_eq!(spaces[0].location, test_dir.canonicalize().unwrap().to_str().unwrap()); + + // Create same space again - should be idempotent + node.create_space(dir_str).await.unwrap(); + + // Should still have only one space + let spaces = node.list_spaces().await.unwrap(); + assert_eq!(spaces.len(), 1); +} + +#[tokio::test] +async fn test_list_spaces_empty() { + let (node, _temp_dir) = create_test_node().await; + + let spaces = node.list_spaces().await.unwrap(); + assert_eq!(spaces.len(), 0); +} + +#[tokio::test] +async fn test_multiple_spaces() { + let (node, temp_dir) = create_test_node().await; + + let space1 = temp_dir.path().join("space1"); + let space2 = temp_dir.path().join("space2"); + + std::fs::create_dir_all(&space1).unwrap(); + std::fs::create_dir_all(&space2).unwrap(); + + node.create_space(space1.to_str().unwrap()).await.unwrap(); + node.create_space(space2.to_str().unwrap()).await.unwrap(); + + let spaces = node.list_spaces().await.unwrap(); + assert_eq!(spaces.len(), 2); +} + +#[tokio::test] +async fn test_space_key_deterministic() { + let (node, temp_dir) = create_test_node().await; + let test_dir = temp_dir.path().join("deterministic_test"); + std::fs::create_dir_all(&test_dir).unwrap(); + + let dir_str = test_dir.to_str().unwrap(); + + // Create space + node.create_space(dir_str).await.unwrap(); + + let spaces = node.list_spaces().await.unwrap(); + let key1 = spaces[0].key.clone(); + + // Create another node with same DB + let node_data = NodeData { + id: "test-did-2".to_string(), + private_key: vec![1; 32], + public_key: vec![1; 32], + }; + let (db, _) = setup_test_db().await; + let node2 = Node::new(node_data, db); + + // Create same space with different node + node2.create_space(dir_str).await.unwrap(); + + let spaces = node2.list_spaces().await.unwrap(); + let key2 = spaces[0].key.clone(); + + // Keys should be the same (deterministic based on path) + assert_eq!(key1, key2); +} diff --git a/env.example b/env.example new file mode 100644 index 00000000..ec6ae0ae --- /dev/null +++ b/env.example @@ -0,0 +1,23 @@ +# Flow Node Configuration +# Copy this file to .env and adjust values as needed + +# Database Configuration +# For SQLite (recommended for local development) +DATABASE_URL=sqlite:flow.db + +# For PostgreSQL (production) +# DATABASE_URL=postgresql://username:password@localhost:5432/flow + +# Database Connection Pool Settings +DB_MAX_CONNECTIONS=100 +DB_MIN_CONNECTIONS=5 +DB_CONNECT_TIMEOUT=8 +DB_IDLE_TIMEOUT=600 +DB_MAX_LIFETIME=1800 +DB_LOGGING_ENABLED=false + +# Logging Configuration +LOG_LEVEL=info + +# Flow Configuration Directory (optional) +# FLOW_CONFIG_HOME=/custom/path/to/flow/config