-
Notifications
You must be signed in to change notification settings - Fork 144
feat: add CatalogProviderList support #1363
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
timsaucer
wants to merge
3
commits into
apache:main
Choose a base branch
from
timsaucer:feat/catalog-provider-list
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+494
−20
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,13 +38,61 @@ | |
|
|
||
| __all__ = [ | ||
| "Catalog", | ||
| "CatalogList", | ||
| "CatalogProvider", | ||
| "CatalogProviderList", | ||
| "Schema", | ||
| "SchemaProvider", | ||
| "Table", | ||
| ] | ||
|
|
||
|
|
||
| class CatalogList: | ||
| """DataFusion data catalog list.""" | ||
|
|
||
| def __init__(self, catalog_list: df_internal.catalog.RawCatalogList) -> None: | ||
| """This constructor is not typically called by the end user.""" | ||
| self.catalog_list = catalog_list | ||
|
|
||
| def __repr__(self) -> str: | ||
| """Print a string representation of the catalog list.""" | ||
| return self.catalog_list.__repr__() | ||
|
|
||
| def names(self) -> set[str]: | ||
| """This is an alias for `catalog_names`.""" | ||
| return self.catalog_names() | ||
|
|
||
| def catalog_names(self) -> set[str]: | ||
| """Returns the list of schemas in this catalog.""" | ||
| return self.catalog_list.catalog_names() | ||
|
|
||
| @staticmethod | ||
| def memory_catalog(ctx: SessionContext | None = None) -> CatalogList: | ||
| """Create an in-memory catalog provider list.""" | ||
| catalog_list = df_internal.catalog.RawCatalogList.memory_catalog(ctx) | ||
| return CatalogList(catalog_list) | ||
|
|
||
| def catalog(self, name: str = "datafusion") -> Schema: | ||
| """Returns the catalog with the given ``name`` from this catalog.""" | ||
| catalog = self.catalog_list.catalog(name) | ||
|
|
||
| return ( | ||
| Catalog(catalog) | ||
| if isinstance(catalog, df_internal.catalog.RawCatalog) | ||
| else catalog | ||
| ) | ||
|
|
||
| def register_catalog( | ||
| self, | ||
| name: str, | ||
| catalog: Catalog | CatalogProvider | CatalogProviderExportable, | ||
| ) -> Catalog | None: | ||
| """Register a catalog with this catalog list.""" | ||
| if isinstance(catalog, Catalog): | ||
| return self.catalog_list.register_catalog(name, catalog.catalog) | ||
| return self.catalog_list.register_catalog(name, catalog) | ||
|
|
||
|
|
||
| class Catalog: | ||
| """DataFusion data catalog.""" | ||
|
|
||
|
|
@@ -195,6 +243,38 @@ def kind(self) -> str: | |
| return self._inner.kind | ||
|
|
||
|
|
||
| class CatalogProviderList(ABC): | ||
| """Abstract class for defining a Python based Catalog Provider List.""" | ||
|
|
||
| @abstractmethod | ||
| def catalog_names(self) -> set[str]: | ||
| """Set of the names of all catalogs in this catalog list.""" | ||
| ... | ||
|
|
||
| @abstractmethod | ||
| def catalog(self, name: str) -> Catalog | None: | ||
| """Retrieve a specific catalog from this catalog list.""" | ||
| ... | ||
|
|
||
| def register_catalog( # noqa: B027 | ||
| self, name: str, catalog: CatalogProviderExportable | CatalogProvider | Catalog | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also is the |
||
| ) -> None: | ||
| """Add a catalog to this catalog list. | ||
|
|
||
| This method is optional. If your catalog provides a fixed list of catalogs, you | ||
| do not need to implement this method. | ||
| """ | ||
|
|
||
|
|
||
| class CatalogProviderListExportable(Protocol): | ||
| """Type hint for object that has __datafusion_catalog_provider_list__ PyCapsule. | ||
|
|
||
| https://docs.rs/datafusion/latest/datafusion/catalog/trait.CatalogProviderList.html | ||
| """ | ||
|
|
||
| def __datafusion_catalog_provider_list__(self, session: Any) -> object: ... | ||
|
|
||
|
|
||
| class CatalogProvider(ABC): | ||
| """Abstract class for defining a Python based Catalog Provider.""" | ||
|
|
||
|
|
@@ -229,6 +309,15 @@ def deregister_schema(self, name: str, cascade: bool) -> None: # noqa: B027 | |
| """ | ||
|
|
||
|
|
||
| class CatalogProviderExportable(Protocol): | ||
| """Type hint for object that has __datafusion_catalog_provider__ PyCapsule. | ||
|
|
||
| https://docs.rs/datafusion/latest/datafusion/catalog/trait.CatalogProvider.html | ||
| """ | ||
|
|
||
| def __datafusion_catalog_provider__(self, session: Any) -> object: ... | ||
|
|
||
|
|
||
| class SchemaProvider(ABC): | ||
| """Abstract class for defining a Python based Schema Provider.""" | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,11 +16,16 @@ | |
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| from typing import TYPE_CHECKING | ||
|
|
||
| import datafusion as dfn | ||
| import pyarrow as pa | ||
| import pyarrow.dataset as ds | ||
| import pytest | ||
| from datafusion import SessionContext, Table, udtf | ||
| from datafusion import Catalog, SessionContext, Table, udtf | ||
|
|
||
| if TYPE_CHECKING: | ||
| from datafusion.catalog import CatalogProvider, CatalogProviderExportable | ||
|
|
||
|
|
||
| # Note we take in `database` as a variable even though we don't use | ||
|
|
@@ -93,6 +98,34 @@ def deregister_schema(self, name, cascade: bool): | |
| del self.schemas[name] | ||
|
|
||
|
|
||
| class CustomCatalogProviderList(dfn.catalog.CatalogProviderList): | ||
| def __init__(self): | ||
| self.catalogs = {"my_catalog": CustomCatalogProvider()} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe this could have an additional catalog just to show it supports multiple ones. |
||
|
|
||
| def catalog_names(self) -> set[str]: | ||
| return set(self.catalogs.keys()) | ||
|
|
||
| def catalog(self, name: str) -> Catalog | None: | ||
| return self.catalogs[name] | ||
|
|
||
| def register_catalog( | ||
| self, name: str, catalog: CatalogProviderExportable | CatalogProvider | Catalog | ||
| ) -> None: | ||
| self.catalogs[name] = catalog | ||
|
|
||
|
|
||
| def test_python_catalog_provider_list(ctx: SessionContext): | ||
| ctx.register_catalog_provider_list(CustomCatalogProviderList()) | ||
|
|
||
| # Ensure `datafusion` catalog does not exist since | ||
| # we replaced the catalog list | ||
| assert ctx.catalog_names() == {"my_catalog"} | ||
|
|
||
| # Ensure registering works | ||
| ctx.register_catalog_provider("second_catalog", CustomCatalogProvider()) | ||
| assert ctx.catalog_names() == {"my_catalog", "second_catalog"} | ||
|
|
||
|
|
||
| def test_python_catalog_provider(ctx: SessionContext): | ||
| ctx.register_catalog_provider("my_catalog", CustomCatalogProvider()) | ||
|
|
||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the return type here be
CatalogProvider?