From 4cb5e433c11a2521496fb73f34799d61e496a45f Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 12 Feb 2026 23:15:53 +0800 Subject: [PATCH] feat: introduce catalog api --- crates/paimon/src/catalog/mod.rs | 212 +++++++++++++++++++++++++++++++ crates/paimon/src/error.rs | 18 +++ crates/paimon/src/lib.rs | 2 + crates/paimon/src/spec/schema.rs | 4 +- crates/paimon/src/table.rs | 22 ++++ 5 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 crates/paimon/src/catalog/mod.rs create mode 100644 crates/paimon/src/table.rs diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs new file mode 100644 index 0000000..d416d43 --- /dev/null +++ b/crates/paimon/src/catalog/mod.rs @@ -0,0 +1,212 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Catalog API for Apache Paimon. +//! +//! Design aligns with [Paimon Java Catalog](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java) +//! and follows API patterns from Apache Iceberg Rust. + +use std::collections::HashMap; +use std::fmt; + +use serde::{Deserialize, Serialize}; + +/// Splitter for system table names (e.g. `table$snapshots`). +#[allow(dead_code)] +pub const SYSTEM_TABLE_SPLITTER: &str = "$"; +/// Prefix for branch in object name (e.g. `table$branch_foo`). +#[allow(dead_code)] +pub const SYSTEM_BRANCH_PREFIX: &str = "branch_"; +/// Default main branch name. +#[allow(dead_code)] +pub const DEFAULT_MAIN_BRANCH: &str = "main"; +/// Database value when the database is not known; [`Identifier::full_name`] returns only the object. +pub const UNKNOWN_DATABASE: &str = "unknown"; + +// ======================= Identifier =============================== + +/// Identifies a catalog object (e.g. a table) by database and object name. +/// +/// Corresponds to [org.apache.paimon.catalog.Identifier](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/catalog/Identifier.java). +/// The object name may be a table name or a qualified name like `table$branch_foo` or +/// `table$snapshots` for system tables. +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Identifier { + /// Database name. + database: String, + /// Object name (table name, or table$branch$system for system tables). + object: String, +} + +impl Identifier { + /// Create an identifier from database and object name. + pub fn new(database: impl Into, object: impl Into) -> Self { + Self { + database: database.into(), + object: object.into(), + } + } + + /// Database name. + pub fn database(&self) -> &str { + &self.database + } + + /// Full object name (table name, or with branch/system suffix). + pub fn object(&self) -> &str { + &self.object + } + + /// Full name: when database is [`UNKNOWN_DATABASE`], returns only the object; + /// otherwise returns `database.object`. + pub fn full_name(&self) -> String { + if self.database == UNKNOWN_DATABASE { + self.object.clone() + } else { + format!("{}.{}", self.database, self.object) + } + } +} + +impl fmt::Display for Identifier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.full_name()) + } +} + +impl fmt::Debug for Identifier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Identifier") + .field("database", &self.database) + .field("object", &self.object) + .finish() + } +} + +// ======================= Catalog trait =============================== + +use async_trait::async_trait; + +use crate::spec::{Schema, SchemaChange}; +use crate::table::Table; +use crate::Result; + +/// Catalog API for reading and writing metadata (databases, tables) in Paimon. +/// +/// Corresponds to [org.apache.paimon.catalog.Catalog](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java). +#[async_trait] +pub trait Catalog: Send + Sync { + // ======================= database methods =============================== + + /// List names of all databases in this catalog. + /// + /// # Errors + /// Implementations may return other errors (e.g. I/O or backend-specific). + async fn list_databases(&self) -> Result>; + + /// Create a database. + /// + /// * `ignore_if_exists` - if true, do nothing when the database already exists; + /// if false, return [`crate::Error::DatabaseAlreadyExist`]. + /// + /// # Errors + /// * [`crate::Error::DatabaseAlreadyExist`] - database already exists when `ignore_if_exists` is false. + async fn create_database( + &self, + name: &str, + ignore_if_exists: bool, + properties: HashMap, + ) -> Result<()>; + + /// Drop a database. + /// + /// * `ignore_if_not_exists` - if true, do nothing when the database does not exist. + /// * `cascade` - if true, delete all tables in the database then delete the database; + /// if false, return [`crate::Error::DatabaseNotEmpty`] when not empty. + /// + /// # Errors + /// * [`crate::Error::DatabaseNotExist`] - database does not exist when `ignore_if_not_exists` is false. + /// * [`crate::Error::DatabaseNotEmpty`] - database is not empty when `cascade` is false. + async fn drop_database( + &self, + name: &str, + ignore_if_not_exists: bool, + cascade: bool, + ) -> Result<()>; + + // ======================= table methods =============================== + + /// Get table metadata for the given identifier. + /// + /// # Errors + /// * [`crate::Error::DatabaseNotExist`] - database in identifier does not exist. + /// * [`crate::Error::TableNotExist`] - table does not exist. + async fn get_table(&self, identifier: &Identifier) -> Result; + + /// List table names in a database. System tables are not listed. + /// + /// # Errors + /// * [`crate::Error::DatabaseNotExist`] - database does not exist. + async fn list_tables(&self, database_name: &str) -> Result>; + + /// Create a table. + /// + /// * `ignore_if_exists` - if true, do nothing when the table already exists; + /// if false, return [`crate::Error::TableAlreadyExist`]. + /// + /// # Errors + /// * [`crate::Error::DatabaseNotExist`] - database in identifier does not exist. + /// * [`crate::Error::TableAlreadyExist`] - table already exists when `ignore_if_exists` is false. + async fn create_table( + &self, + identifier: &Identifier, + creation: Schema, + ignore_if_exists: bool, + ) -> Result<()>; + + /// Drop a table. System tables cannot be dropped. + /// + /// # Errors + /// * [`crate::Error::TableNotExist`] - table does not exist when `ignore_if_not_exists` is false. + async fn drop_table(&self, identifier: &Identifier, ignore_if_not_exists: bool) -> Result<()>; + + /// Rename a table. + /// + /// # Errors + /// * [`crate::Error::TableNotExist`] - source table does not exist when `ignore_if_not_exists` is false. + /// * [`crate::Error::TableAlreadyExist`] - target table already exists. + async fn rename_table( + &self, + from: &Identifier, + to: &Identifier, + ignore_if_not_exists: bool, + ) -> Result<()>; + + /// Apply schema changes to a table. + /// + /// # Errors + /// * [`crate::Error::TableNotExist`] - table does not exist when `ignore_if_not_exists` is false. + /// * [`crate::Error::ColumnAlreadyExist`] - adding a column that already exists. + /// * [`crate::Error::ColumnNotExist`] - altering or dropping a column that does not exist. + async fn alter_table( + &self, + identifier: &Identifier, + changes: Vec, + ignore_if_not_exists: bool, + ) -> Result<()>; +} diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs index 1d7b50b..10c1f4e 100644 --- a/crates/paimon/src/error.rs +++ b/crates/paimon/src/error.rs @@ -66,6 +66,24 @@ pub enum Error { display("Paimon hitting invalid file index format: {}", message) )] FileIndexFormatInvalid { message: String }, + + // ======================= catalog errors =============================== + #[snafu(display("Database {} already exists.", database))] + DatabaseAlreadyExist { database: String }, + #[snafu(display("Database {} does not exist.", database))] + DatabaseNotExist { database: String }, + #[snafu(display("Database {} is not empty.", database))] + DatabaseNotEmpty { database: String }, + #[snafu(display("Table {} already exists.", full_name))] + TableAlreadyExist { full_name: String }, + #[snafu(display("Table {} does not exist.", full_name))] + TableNotExist { full_name: String }, + #[snafu(display("Column {} already exists in table {}.", column, full_name))] + ColumnAlreadyExist { full_name: String, column: String }, + #[snafu(display("Column {} does not exist in table {}.", column, full_name))] + ColumnNotExist { full_name: String, column: String }, + #[snafu(display("Invalid identifier: {}", message))] + IdentifierInvalid { message: String }, } impl From for Error { diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index 5296a02..5a7fb66 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -19,6 +19,8 @@ mod error; pub use error::Error; pub use error::Result; +pub mod catalog; pub mod file_index; pub mod io; pub mod spec; +mod table; diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 1729ea3..927a51e 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -111,7 +111,7 @@ pub const PARTITION_OPTION: &str = "partition"; /// Schema of a table (logical DDL schema). /// -/// Corresponds to [org.apache.paimon.schema.Schema](https://github.com/apache/paimon/blob/1.3/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java). +/// Corresponds to [org.apache.paimon.schema.Schema](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java). #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Schema { @@ -124,7 +124,7 @@ pub struct Schema { impl Schema { /// Build a schema with validation. Normalizes partition/primary keys from options if present. - pub fn new( + fn new( fields: Vec, partition_keys: Vec, primary_keys: Vec, diff --git a/crates/paimon/src/table.rs b/crates/paimon/src/table.rs new file mode 100644 index 0000000..5e8d262 --- /dev/null +++ b/crates/paimon/src/table.rs @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table API for Apache Paimon + +/// Table represents a table in the catalog. +#[derive(Debug, Clone)] +pub struct Table {}