Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions netbox_custom_objects/api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Meta:
model = CustomObjectTypeField
fields = (
"id",
"url",
"name",
"label",
"custom_object_type",
Expand Down Expand Up @@ -77,6 +78,10 @@ class Meta:
"weight",
"is_cloneable",
"comments",
"schema_id",
"deprecated",
"deprecated_since",
"scheduled_removal",
)

def validate(self, attrs):
Expand Down Expand Up @@ -154,15 +159,18 @@ class Meta:
"verbose_name",
"verbose_name_plural",
"slug",
"version",
"group_name",
"description",
"tags",
"created",
"last_updated",
"fields",
"schema_document",
"table_model_name",
"object_type_name",
]
read_only_fields = ("schema_document",)
brief_fields = ("id", "url", "name", "slug", "description")

def get_table_model_name(self, obj):
Expand Down
224 changes: 224 additions & 0 deletions netbox_custom_objects/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""
Exporter for the COT portable schema format (issue #388).

Converts live CustomObjectType DB state into a schema document dict that
conforms to cot_schema_v1.json. The returned dict can be serialised to YAML
or JSON by the caller.

Public API
----------
export_cot(cot) → dict # single COT definition (no top-level wrapper)
export_cots(cots) → dict # full schema document { schema_version, types }

Notes
-----
- Fields without a schema_id (created before the schema-format feature) are
skipped with a WARNING log entry. They cannot be tracked across installs.
- Attribute values that equal FIELD_DEFAULTS are omitted to keep the output
minimal (round-trip safe: the importer re-applies the same defaults).
- Tombstones (removed_fields) are read from the COT's schema_document. Until
the apply endpoint (#390) is implemented this will always be empty; once
apply is wired up, deletions will be persisted there automatically.
"""

import logging
import re

from netbox_custom_objects import constants
from netbox_custom_objects.schema_format import (
CHOICES_TO_SCHEMA_TYPE,
CUSTOM_OBJECTS_APP_LABEL_SLUG,
FIELD_DEFAULTS,
FIELD_TYPE_ATTRS,
SCHEMA_FORMAT_VERSION,
)

# Module-level logger; used to WARN about fields skipped during export.
logger = logging.getLogger(__name__)

# Matches the generated model name produced by CustomObjectType.get_table_model_name().
# Capturing group 1 is the numeric COT id.
_TABLE_MODEL_RE = re.compile(r'^table(\d+)model$', re.IGNORECASE)

# Ordered list of field_base attributes to check for non-default values.
# The tuple order determines the key order of the exported field dict.
# Type-specific attributes (validation_*, choice_set, related_*) are handled
# separately via FIELD_TYPE_ATTRS.
_BASE_ATTRS = (
    "label",
    "description",
    "group_name",
    "primary",
    "required",
    "unique",
    "default",
    "weight",
    "search_weight",
    "filter_logic",
    "ui_visible",
    "ui_editable",
    "is_cloneable",
    "deprecated",
    "deprecated_since",
    "scheduled_removal",
)


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _encode_related_object_type(rot) -> str:
    """
    Encode an ObjectType FK as a schema ``related_object_type`` string.

    Built-in NetBox objects → ``"<app_label>/<model>"`` (e.g. ``"dcim/device"``)
    Custom Object Types → ``"custom-objects/<slug>"``
    """
    match = None
    if rot.app_label == constants.APP_LABEL:
        match = _TABLE_MODEL_RE.match(rot.model)
    if match is None:
        # Not a generated COT table model: encode as a plain NetBox type.
        return f"{rot.app_label}/{rot.model}"
    # Deferred import avoids a circular dependency and keeps this module
    # loadable without the full Django app stack in unit tests.
    from netbox_custom_objects.models import CustomObjectType  # noqa: PLC0415
    slug = CustomObjectType.objects.values_list('slug', flat=True).get(
        pk=int(match.group(1))
    )
    return f"{CUSTOM_OBJECTS_APP_LABEL_SLUG}/{slug}"


def _export_field(field) -> dict:
    """
    Serialise a single ``CustomObjectTypeField`` instance to a schema field dict.

    Raises ``ValueError`` if ``field.schema_id`` is ``None``; callers should
    pre-filter or handle this case before calling this function.
    """
    if field.schema_id is None:
        raise ValueError(
            f"Field {field.name!r} on COT {field.custom_object_type_id!r} "
            "has no schema_id and cannot be exported."
        )

    schema_type = CHOICES_TO_SCHEMA_TYPE[field.type]
    out = {"id": field.schema_id, "name": field.name, "type": schema_type}

    # Base attributes: emit only values that differ from the documented
    # defaults — the importer re-applies the same defaults on the way in.
    for name in _BASE_ATTRS:
        current = getattr(field, name)
        if current != FIELD_DEFAULTS.get(name):
            out[name] = current

    # Type-specific attributes, in sorted order for stable output.
    for name in sorted(FIELD_TYPE_ATTRS[schema_type]):
        if name == "choice_set":
            # Required for select/multiselect; validate.
            if field.choice_set is None:
                raise ValueError(
                    f"Field {field.name!r} is type {schema_type!r} but has no choice_set assigned."
                )
            out["choice_set"] = field.choice_set.name
        elif name == "related_object_type":
            # Required for object/multiobject; always present.
            out["related_object_type"] = _encode_related_object_type(
                field.related_object_type
            )
        elif name == "related_object_filter":
            current = field.related_object_filter
            if current != FIELD_DEFAULTS.get("related_object_filter"):
                out["related_object_filter"] = current
        elif name in ("validation_regex", "validation_minimum", "validation_maximum"):
            current = getattr(field, name)
            if current != FIELD_DEFAULTS.get(name):
                out[name] = current

    return out


def _removed_fields_from_document(cot) -> list:
    """
    Extract the ``removed_fields`` tombstone list for *cot* from its stored
    ``schema_document``. Returns an empty list if the document is absent or
    does not reference this COT.
    """
    document = cot.schema_document
    if not document:
        return []
    # NOTE: matches by COT name. If the COT is renamed after tombstones
    # are persisted, they will not be found. This will be addressed when
    # #390 (apply) is implemented and tombstones are managed more explicitly.
    type_def = next(
        (t for t in document.get("types", []) if t.get("name") == cot.name),
        None,
    )
    if type_def is None:
        return []
    return list(type_def.get("removed_fields", []))


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------

def export_cot(cot) -> dict:
    """
    Serialise a single ``CustomObjectType`` to its schema definition dict
    (the inner object that goes inside the ``types`` list).

    Fields without a ``schema_id`` are skipped; a WARNING is logged for each.
    """
    definition: dict = {"name": cot.name, "slug": cot.slug}

    # Optional COT-level attributes — include only truthy values, in a fixed
    # order so the output is stable.
    for attr in ("version", "verbose_name", "verbose_name_plural",
                 "description", "group_name"):
        value = getattr(cot, attr)
        if value:
            definition[attr] = value

    # Active + deprecated fields, ordered by schema_id for stable output.
    exported = []
    for cot_field in cot.fields.order_by("schema_id"):
        if cot_field.schema_id is None:
            logger.warning(
                "Skipping field %r on COT %r during export: no schema_id assigned. "
                "This field was likely created before the schema-format feature was "
                "introduced and cannot be tracked portably.",
                cot_field.name,
                cot.name,
            )
        else:
            exported.append(_export_field(cot_field))
    if exported:
        definition["fields"] = exported

    # Tombstones from previous apply operations.
    tombstones = _removed_fields_from_document(cot)
    if tombstones:
        definition["removed_fields"] = tombstones

    return definition


def export_cots(cots) -> dict:
    """
    Serialise one or more ``CustomObjectType`` instances to a complete schema
    document dict (``{ schema_version, types }``) that validates against
    ``cot_schema_v1.json``.

    *cots* may be any iterable of ``CustomObjectType`` instances (list,
    queryset, generator, ...).

    Raises ``ValueError`` when *cots* yields no instances.
    """
    # Materialise first: a lazy iterable (e.g. a generator) is always truthy,
    # so testing `not cots` directly would let an empty input through and
    # silently produce a document with an empty `types` list.
    cots = list(cots)
    if not cots:
        raise ValueError("Minimum 1 Custom Object Type required.")
    return {
        "schema_version": SCHEMA_FORMAT_VERSION,
        "types": [export_cot(cot) for cot in cots],
    }
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class Migration(migrations.Migration):
dependencies = [
('core', '0021_job_queue_name'),
('extras', '0134_owner'),
('netbox_custom_objects', '0004_customobjecttype_group_name'),
('netbox_custom_objects', '0005_customobjecttypefield_context'),
]

operations = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Generated by Django 5.2.12 on 2026-04-07 01:35

from django.db import migrations, models


class Migration(migrations.Migration):
    # Schema migration for the portable COT schema format: adds schema-id
    # tracking and deprecation metadata to CustomObjectType(Field).

    dependencies = [
        ('core', '0021_job_queue_name'),
        ('extras', '0134_owner'),
        ('netbox_custom_objects', '0006_customobjecttypefield_related_name_and_more'),
    ]

    operations = [
        # Counter from which new per-COT field schema_ids are allocated.
        migrations.AddField(
            model_name='customobjecttype',
            name='next_schema_id',
            field=models.PositiveIntegerField(default=0, editable=False),
        ),
        # Stored schema document (JSON), nullable for pre-feature rows.
        migrations.AddField(
            model_name='customobjecttype',
            name='schema_document',
            field=models.JSONField(blank=True, null=True),
        ),
        # Field-level deprecation metadata.
        migrations.AddField(
            model_name='customobjecttypefield',
            name='deprecated',
            field=models.BooleanField(default=False),
        ),
        migrations.AddField(
            model_name='customobjecttypefield',
            name='deprecated_since',
            field=models.CharField(blank=True, max_length=50),
        ),
        migrations.AddField(
            model_name='customobjecttypefield',
            name='scheduled_removal',
            field=models.CharField(blank=True, max_length=50),
        ),
        # Portable per-COT field identifier; NULL for pre-feature fields
        # (backfilled by migration 0008).
        migrations.AddField(
            model_name='customobjecttypefield',
            name='schema_id',
            field=models.PositiveIntegerField(blank=True, null=True),
        ),
        migrations.AlterField(
            model_name='customobjecttype',
            name='version',
            field=models.CharField(blank=True, max_length=50),
        ),
        # schema_id must be unique per COT — but only once assigned
        # (NULLs are excluded by the condition).
        migrations.AddConstraint(
            model_name='customobjecttypefield',
            constraint=models.UniqueConstraint(
                condition=models.Q(('schema_id__isnull', False)),
                fields=('schema_id', 'custom_object_type'),
                name='netbox_custom_objects_customobjecttypefield_unique_schema_id',
            ),
        ),
    ]
63 changes: 63 additions & 0 deletions netbox_custom_objects/migrations/0008_backfill_schema_ids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""
Data migration: assign schema_id to existing CustomObjectTypeField rows that
predate the schema-format feature and never received one.

Strategy
--------
For each CustomObjectType:
1. Find the current maximum schema_id already in use (may be 0 if none).
2. Assign the next available integer to every field with schema_id=NULL,
ordered by the field's primary-key (creation order) for determinism.
3. Update next_schema_id on the parent CustomObjectType to the highest ID
now assigned, so that future field additions continue from the right value.

The reverse operation is intentionally a no-op: rolling back would leave the
schema_id column in an indeterminate state, and re-running the forward
migration is safe (it only touches NULL rows).
"""

from django.db import migrations
from django.db.models import Max


# Exposed as a module-level name so tests can import and call it directly
# without going through the migration runner.
def assign_schema_ids(apps, schema_editor):
    """
    Forward data migration: give every schema_id-less field a sequential id.

    For each CustomObjectType, numbering continues from the highest schema_id
    already in use (0 if none); unassigned fields are processed in primary-key
    (creation) order for determinism. The parent COT's next_schema_id is only
    ever raised, never lowered.
    """
    CustomObjectType = apps.get_model('netbox_custom_objects', 'CustomObjectType')
    CustomObjectTypeField = apps.get_model('netbox_custom_objects', 'CustomObjectTypeField')

    for cot in CustomObjectType.objects.all():
        # Highest schema_id already in use for this COT (0 if none).
        current_max = (
            CustomObjectTypeField.objects
            .filter(custom_object_type=cot, schema_id__isnull=False)
            .aggregate(max_id=Max('schema_id'))['max_id'] or 0
        )

        # Number the unassigned fields in memory (ordered by pk), then persist
        # with a single bulk UPDATE per COT instead of one query per row.
        unassigned = list(
            CustomObjectTypeField.objects
            .filter(custom_object_type=cot, schema_id__isnull=True)
            .order_by('id')
        )
        next_id = current_max + 1
        for field in unassigned:
            field.schema_id = next_id
            next_id += 1
        if unassigned:
            CustomObjectTypeField.objects.bulk_update(unassigned, ['schema_id'])

        # Sync next_schema_id upward. Never decrease it.
        highest_assigned = next_id - 1  # equals current_max when no NULLs existed
        if highest_assigned > cot.next_schema_id:
            CustomObjectType.objects.filter(pk=cot.pk).update(
                next_schema_id=highest_assigned
            )


class Migration(migrations.Migration):
    # Data migration only — no schema changes; backfills schema_id values.

    dependencies = [
        ('netbox_custom_objects', '0007_customobjecttype_next_schema_id_and_more'),
    ]

    operations = [
        # Reverse is a deliberate no-op: the forward pass touches only NULL
        # rows, so re-running it after a rollback is safe.
        migrations.RunPython(assign_schema_ids, migrations.RunPython.noop),
    ]
Loading
Loading