Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions app/api/main/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
DeleteRequest,
ModifyDNRequest,
ModifyRequest,
RenameRequest,
)
from ldap_protocol.ldap_responses import LDAPResult
from ldap_protocol.utils.queries import set_or_update_primary_group
Expand Down Expand Up @@ -123,6 +124,15 @@ async def modify_dn_many(
return results


@entry_router.put("/rename", error_map=error_map)
async def rename(
request: RenameRequest,
req: Request,
) -> LDAPResult:
"""LDAP rename entry request."""
return await request.handle_api(req.state.dishka_container)


@entry_router.delete("/delete", error_map=error_map)
async def delete(
request: DeleteRequest,
Expand Down
3 changes: 2 additions & 1 deletion app/ldap_protocol/ldap_requests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .extended import ExtendedRequest
from .modify import ModifyRequest
from .modify_dn import ModifyDNRequest
from .rename import RenameRequest
from .search import SearchRequest

requests: list[type[BaseRequest]] = [
Expand All @@ -32,4 +33,4 @@
}


__all__ = ["protocol_id_map", "BaseRequest"]
__all__ = ["protocol_id_map", "BaseRequest", "RenameRequest"]
85 changes: 85 additions & 0 deletions app/ldap_protocol/ldap_requests/rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""Schemas for main router.

Copyright (c) 2024 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""

from dishka import AsyncContainer
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession

from ldap_protocol.ldap_requests import (
ModifyDNRequest as LDAPModifyDNRequest,
ModifyRequest as LDAPModifyRequest,
)
from ldap_protocol.ldap_responses import LDAPResult
from ldap_protocol.objects import Changes


class RenameRequest(BaseModel):
"""Rename request schema.

Combines ModifyDN and Modify operations.
"""

object: str
newrdn: str
changes: list[Changes]

@property
def _new_object(self) -> str:
return f"{self.newrdn},{','.join(self.object.split(',')[1:])}"

@property
def _oldrdn(self) -> str:
return self.object.split(",")[0]

async def _modify_dn_request(
self,
container: AsyncContainer,
entry: str,
newrdn: str,
) -> LDAPResult:
modify_dn_request = LDAPModifyDNRequest(
entry=entry,
newrdn=newrdn,
deleteoldrdn=True,
new_superior=None,
)
return await modify_dn_request.handle_api(container)

async def _expire_session_objects(self, container: AsyncContainer) -> None:
session = await container.get(AsyncSession)
session.expire_all()

async def _modify_request(self, container: AsyncContainer) -> LDAPResult:
modify_request = LDAPModifyRequest(
object=self._new_object,
changes=self.changes,
)
return await modify_request.handle_api(container)

async def handle_api(self, container: AsyncContainer) -> LDAPResult:
"""Handle RenameRequest by executing ModifyDN then Modify.

If ModifyRequest fails, rollback the ModifyDnRequest and return error.
"""
modify_dn_response = await self._modify_dn_request(
container,
self.object,
self.newrdn,
)
if not modify_dn_response or modify_dn_response.result_code != 0:
return modify_dn_response

await self._expire_session_objects(container)

modify_response = await self._modify_request(container)
if not modify_response or modify_response.result_code != 0:
await self._modify_dn_request(
container,
self._new_object,
self._oldrdn,
)

return modify_response
2 changes: 1 addition & 1 deletion interface
24 changes: 24 additions & 0 deletions tests/test_api/test_main/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ async def adding_test_user(
assert auth.cookies.get("id")


@pytest_asyncio.fixture(scope="function")
async def adding_test_computer(
http_client: AsyncClient,
) -> None:
"""Test api correct (name) add."""
response = await http_client.post(
"/entry/add",
json={
"entry": "cn=mycomputer,dc=md,dc=test",
"password": None,
"attributes": [
{"type": "name", "vals": ["mycomputer name"]},
{"type": "cn", "vals": ["mycomputer"]},
{"type": "objectClass", "vals": ["computer", "top"]},
],
},
)

data = response.json()

assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.SUCCESS


@pytest_asyncio.fixture(scope="function")
async def add_dns_settings(
session: AsyncSession,
Expand Down
138 changes: 138 additions & 0 deletions tests/test_api/test_main/test_router/test_rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Test API Rename.

Copyright (c) 2026 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""

import pytest
from httpx import AsyncClient

from ldap_protocol.ldap_codes import LDAPCodes
from ldap_protocol.ldap_requests.modify import Operation


@pytest.mark.asyncio
@pytest.mark.usefixtures("adding_test_user")
@pytest.mark.usefixtures("setup_session")
@pytest.mark.usefixtures("session")
async def test_api_correct_rename_user(http_client: AsyncClient) -> None:
response = await http_client.put(
"/entry/rename",
json={
"object": "cn=test,dc=md,dc=test",
"newrdn": "cn=admin2",
"changes": [
{
"operation": Operation.REPLACE,
"modification": {
"type": "sAMAccountName",
"vals": ["admin2"],
},
},
{
"operation": Operation.REPLACE,
"modification": {
"type": "displayName",
"vals": ["Administrator"],
},
},
],
},
)

data = response.json()
assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.SUCCESS

response = await http_client.post(
"entry/search",
json={
"base_object": "cn=admin2,dc=md,dc=test",
"scope": 0,
"deref_aliases": 0,
"size_limit": 1000,
"time_limit": 10,
"types_only": True,
"filter": "(objectClass=*)",
"attributes": ["*"],
"page_number": 1,
},
)

data = response.json()
assert data["resultCode"] == LDAPCodes.SUCCESS
assert data["search_result"][0]["object_name"] == "cn=admin2,dc=md,dc=test"

for attr in data["search_result"][0]["partial_attributes"]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут тоже можно сжать. либо собрать в 1 цикл, либо сделать функцию проверки и снизу тоже заюзать

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

выписал это и отдельным пр сделаю по всем тестам

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

из вариантов такое могу предложить, вролде должно работать

sAMAccountName_found = False
displayName_found = False

for attr in data["search_result"][0]["partial_attributes"]:
    if attr["type"] == "sAMAccountName":
        assert attr["vals"][0] == "admin2"
        sAMAccountName_found = True
    elif attr["type"] == "displayName":
        assert attr["vals"][0] == "Administrator"
        displayName_found = True

    if sAMAccountName_found and displayName_found:
        break

if not sAMAccountName_found:
    raise Exception("User without sAMAccountName")
if not displayName_found:
    raise Exception("User without displayName")
#----------------------------
attrs_dict = {attr["type"]: attr["vals"][0] 
              for attr in data["search_result"][0]["partial_attributes"]}

assert attrs_dict.get("sAMAccountName") == "admin2"
assert attrs_dict.get("displayName") == "Administrator"

if attr["type"] == "sAMAccountName":
assert attr["vals"][0] == "admin2"
break
else:
raise Exception("User without sAMAccountName")

for attr in data["search_result"][0]["partial_attributes"]:
if attr["type"] == "displayName":
assert attr["vals"][0] == "Administrator"
break
else:
raise Exception("User without displayName")


@pytest.mark.asyncio
@pytest.mark.usefixtures("adding_test_computer")
@pytest.mark.usefixtures("setup_session")
@pytest.mark.usefixtures("session")
async def test_api_correct_rename_computer(http_client: AsyncClient) -> None:
response = await http_client.put(
"/entry/rename",
json={
"object": "cn=mycomputer,dc=md,dc=test",
"newrdn": "cn=maincomputer",
"changes": [
{
"operation": Operation.REPLACE,
"modification": {
"type": "sAMAccountName",
"vals": ["__invalid name for error__"],
},
},
{
"operation": Operation.REPLACE,
"modification": {
"type": "displayName",
"vals": ["Main Computer"],
},
},
],
},
)

data = response.json()
assert isinstance(data, dict)
assert data.get("resultCode") == LDAPCodes.UNDEFINED_ATTRIBUTE_TYPE

response = await http_client.post(
"entry/search",
json={
"base_object": "cn=mycomputer,dc=md,dc=test",
"scope": 0,
"deref_aliases": 0,
"size_limit": 1000,
"time_limit": 10,
"types_only": True,
"filter": "(objectClass=*)",
"attributes": ["*"],
"page_number": 1,
},
)

data = response.json()
assert data["resultCode"] == LDAPCodes.SUCCESS
assert data["search_result"][0]["object_name"] == "cn=mycomputer,dc=md,dc=test" # noqa: E501 # fmt: skip

for attr in data["search_result"][0]["partial_attributes"]:
if attr["type"] == "name":
assert attr["vals"][0] == "mycomputer name"
break
else:
raise Exception("Computer without name")