-
Notifications
You must be signed in to change notification settings - Fork 0
Add: RenameRequest for entry (LDAP object) #918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
1af18af
7de6ccd
3afa37b
944ce52
66fc7dd
a278193
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| """Schemas for main router. | ||
|
|
||
| Copyright (c) 2024 MultiFactor | ||
| License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE | ||
| """ | ||
|
|
||
| from dishka import AsyncContainer | ||
| from pydantic import BaseModel | ||
| from sqlalchemy.ext.asyncio import AsyncSession | ||
|
|
||
| from ldap_protocol.ldap_requests import ( | ||
| ModifyDNRequest as LDAPModifyDNRequest, | ||
| ModifyRequest as LDAPModifyRequest, | ||
| ) | ||
| from ldap_protocol.ldap_responses import LDAPResult | ||
| from ldap_protocol.objects import Changes | ||
|
|
||
|
|
||
| class RenameRequest(BaseModel): | ||
| """Rename request schema. | ||
|
|
||
| Combines ModifyDN and Modify operations. | ||
| """ | ||
|
|
||
| object: str | ||
| newrdn: str | ||
| changes: list[Changes] | ||
|
|
||
| @property | ||
| def _new_object(self) -> str: | ||
| return f"{self.newrdn},{','.join(self.object.split(',')[1:])}" | ||
|
|
||
| @property | ||
| def _oldrdn(self) -> str: | ||
| return self.object.split(",")[0] | ||
|
|
||
| async def _modify_dn_request( | ||
| self, | ||
| container: AsyncContainer, | ||
| entry: str, | ||
| newrdn: str, | ||
| ) -> LDAPResult: | ||
| modify_dn_request = LDAPModifyDNRequest( | ||
| entry=entry, | ||
| newrdn=newrdn, | ||
| deleteoldrdn=True, | ||
| new_superior=None, | ||
| ) | ||
| return await modify_dn_request.handle_api(container) | ||
|
|
||
| async def _expire_session_objects(self, container: AsyncContainer) -> None: | ||
| session = await container.get(AsyncSession) | ||
| session.expire_all() | ||
|
|
||
| async def _modify_request(self, container: AsyncContainer) -> LDAPResult: | ||
| modify_request = LDAPModifyRequest( | ||
| object=self._new_object, | ||
| changes=self.changes, | ||
| ) | ||
| return await modify_request.handle_api(container) | ||
|
|
||
| async def handle_api(self, container: AsyncContainer) -> LDAPResult: | ||
| """Handle RenameRequest by executing ModifyDN then Modify. | ||
|
|
||
| If ModifyRequest fails, rollback the ModifyDnRequest and return error. | ||
| """ | ||
| modify_dn_response = await self._modify_dn_request( | ||
| container, | ||
| self.object, | ||
| self.newrdn, | ||
| ) | ||
| if not modify_dn_response or modify_dn_response.result_code != 0: | ||
| return modify_dn_response | ||
|
|
||
| await self._expire_session_objects(container) | ||
|
|
||
| modify_response = await self._modify_request(container) | ||
| if not modify_response or modify_response.result_code != 0: | ||
| await self._modify_dn_request( | ||
| container, | ||
| self._new_object, | ||
| self._oldrdn, | ||
| ) | ||
|
|
||
| return modify_response |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,138 @@ | ||
| """Test API Rename. | ||
|
|
||
| Copyright (c) 2026 MultiFactor | ||
| License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE | ||
| """ | ||
|
|
||
| import pytest | ||
| from httpx import AsyncClient | ||
|
|
||
| from ldap_protocol.ldap_codes import LDAPCodes | ||
| from ldap_protocol.ldap_requests.modify import Operation | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.usefixtures("adding_test_user") | ||
| @pytest.mark.usefixtures("setup_session") | ||
| @pytest.mark.usefixtures("session") | ||
| async def test_api_correct_rename_user(http_client: AsyncClient) -> None: | ||
| response = await http_client.put( | ||
| "/entry/rename", | ||
| json={ | ||
| "object": "cn=test,dc=md,dc=test", | ||
| "newrdn": "cn=admin2", | ||
| "changes": [ | ||
| { | ||
| "operation": Operation.REPLACE, | ||
| "modification": { | ||
| "type": "sAMAccountName", | ||
| "vals": ["admin2"], | ||
| }, | ||
| }, | ||
| { | ||
| "operation": Operation.REPLACE, | ||
| "modification": { | ||
| "type": "displayName", | ||
| "vals": ["Administrator"], | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| ) | ||
|
|
||
| data = response.json() | ||
| assert isinstance(data, dict) | ||
| assert data.get("resultCode") == LDAPCodes.SUCCESS | ||
|
|
||
| response = await http_client.post( | ||
| "entry/search", | ||
milov-dmitriy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| json={ | ||
| "base_object": "cn=admin2,dc=md,dc=test", | ||
| "scope": 0, | ||
| "deref_aliases": 0, | ||
| "size_limit": 1000, | ||
| "time_limit": 10, | ||
| "types_only": True, | ||
milov-dmitriy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| "filter": "(objectClass=*)", | ||
| "attributes": ["*"], | ||
| "page_number": 1, | ||
| }, | ||
| ) | ||
|
|
||
| data = response.json() | ||
| assert data["resultCode"] == LDAPCodes.SUCCESS | ||
| assert data["search_result"][0]["object_name"] == "cn=admin2,dc=md,dc=test" | ||
|
|
||
| for attr in data["search_result"][0]["partial_attributes"]: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. тут тоже можно сжать. либо собрать в 1 цикл, либо сделать функцию проверки и снизу тоже заюзать
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. выписал это и отдельным пр сделаю по всем тестам
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. из вариантов такое могу предложить, вролде должно работать sAMAccountName_found = False
displayName_found = False
for attr in data["search_result"][0]["partial_attributes"]:
if attr["type"] == "sAMAccountName":
assert attr["vals"][0] == "admin2"
sAMAccountName_found = True
elif attr["type"] == "displayName":
assert attr["vals"][0] == "Administrator"
displayName_found = True
if sAMAccountName_found and displayName_found:
break
if not sAMAccountName_found:
raise Exception("User without sAMAccountName")
if not displayName_found:
raise Exception("User without displayName")
#----------------------------
attrs_dict = {attr["type"]: attr["vals"][0]
for attr in data["search_result"][0]["partial_attributes"]}
assert attrs_dict.get("sAMAccountName") == "admin2"
assert attrs_dict.get("displayName") == "Administrator" |
||
| if attr["type"] == "sAMAccountName": | ||
| assert attr["vals"][0] == "admin2" | ||
milov-dmitriy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| break | ||
| else: | ||
| raise Exception("User without sAMAccountName") | ||
milov-dmitriy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| for attr in data["search_result"][0]["partial_attributes"]: | ||
| if attr["type"] == "displayName": | ||
| assert attr["vals"][0] == "Administrator" | ||
| break | ||
| else: | ||
| raise Exception("User without displayName") | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| @pytest.mark.usefixtures("adding_test_computer") | ||
| @pytest.mark.usefixtures("setup_session") | ||
| @pytest.mark.usefixtures("session") | ||
| async def test_api_correct_rename_computer(http_client: AsyncClient) -> None: | ||
| response = await http_client.put( | ||
| "/entry/rename", | ||
| json={ | ||
| "object": "cn=mycomputer,dc=md,dc=test", | ||
| "newrdn": "cn=maincomputer", | ||
| "changes": [ | ||
| { | ||
| "operation": Operation.REPLACE, | ||
| "modification": { | ||
| "type": "sAMAccountName", | ||
| "vals": ["__invalid name for error__"], | ||
| }, | ||
| }, | ||
| { | ||
| "operation": Operation.REPLACE, | ||
| "modification": { | ||
| "type": "displayName", | ||
| "vals": ["Main Computer"], | ||
| }, | ||
| }, | ||
| ], | ||
| }, | ||
| ) | ||
|
|
||
| data = response.json() | ||
| assert isinstance(data, dict) | ||
| assert data.get("resultCode") == LDAPCodes.UNDEFINED_ATTRIBUTE_TYPE | ||
|
|
||
| response = await http_client.post( | ||
| "entry/search", | ||
milov-dmitriy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| json={ | ||
| "base_object": "cn=mycomputer,dc=md,dc=test", | ||
| "scope": 0, | ||
| "deref_aliases": 0, | ||
| "size_limit": 1000, | ||
| "time_limit": 10, | ||
| "types_only": True, | ||
| "filter": "(objectClass=*)", | ||
| "attributes": ["*"], | ||
| "page_number": 1, | ||
| }, | ||
| ) | ||
|
|
||
| data = response.json() | ||
| assert data["resultCode"] == LDAPCodes.SUCCESS | ||
| assert data["search_result"][0]["object_name"] == "cn=mycomputer,dc=md,dc=test" # noqa: E501 # fmt: skip | ||
|
|
||
| for attr in data["search_result"][0]["partial_attributes"]: | ||
| if attr["type"] == "name": | ||
| assert attr["vals"][0] == "mycomputer name" | ||
| break | ||
| else: | ||
| raise Exception("Computer without name") | ||
Uh oh!
There was an error while loading. Please reload this page.