BGP Peerings Tasks Split Plan
Overview
Split the internal create-path and update-path of sync_bgp_peerings into two
standalone, directly-callable tasks:
- create_bgp_peering: creates one or many BGP sessions in NetBox
- update_bgp_peering: updates one or many existing BGP sessions in NetBox
sync_bgp_peerings is refactored to delegate its write phase to the new tasks
(see Section 5); its external interface and return structure remain unchanged.
Coding Guidelines
These apply to all code written as part of this plan:
- Keep the codebase minimal: write only what is needed. No speculative abstractions or future-proofing.
- Avoid excessive helper functions: extract a helper only when the logic is non-trivial or reused across two or more tasks. Keep extracted functions as plain module-level functions; do not use leading underscores.
- Linear, easy-to-follow logic: code should read top-to-bottom with no surprising jumps. Prefer flat if/elif/else chains over layered function calls.
- No difficult constructs: avoid metaclasses, decorators-on-decorators, functools tricks, nested closures, generator pipelines, or anything that makes stepping through a debugger awkward.
- Simple is better than clever: if two approaches produce the same result, always choose the one a junior engineer can understand without context.
- No unnecessary classes: do not wrap logic in a class just for the sake of grouping. Module-level functions are fine.
- Use descriptive, human-readable variable names; avoid short names such as s or plo. List and dict comprehensions may be an exception.
Design Goals
- Both tasks follow the exact same conventions as all other netbox worker tasks (@Task, Pydantic Input model, Result return, job.event + log.* casing rules).
- Each task accepts single-object mode (individual keyword arguments) AND bulk mode (bulk_create / bulk_update list of dicts), matching the pattern used by create_ip_bulk.
- Shared resolution helpers (resolve_ip, resolve_asn, resolve_or_create, get_addr_family, get_p2p_peer_ip, resolve_asn_from_source) are defined at the top of bgp_peerings_tasks.py as plain module-level functions (no leading underscore) and reused by create_bgp_peering, update_bgp_peering, and the refactored sync_bgp_peerings.
- Input validation is handled by Pydantic models defined in bgp_peerings_tasks.py (alongside the existing SyncBgpPeeringsInput and BgpSessionStatusEnum).
- New tests are added to the TestSyncBgpPeerings class (or a new sibling class) in tests/test_netbox_service.py.
Feature: expand_interface_range Utility (norfab/utils/text.py)
A general-purpose string-expansion utility, modelled after the bracket-expansion
functions in the NetBox codebase, placed in norfab/utils/text.py so it is
reusable by any part of the codebase (not just the BGP tasks).
Function signature
def expand_interface_range(pattern: str) -> list[str]:
    """
    Expand a bracket-notation pattern into a list of concrete strings.

    Bracket groups ``[...]`` may contain:
    - A comma-separated list of alternatives: ``[ge,xe]``
    - A numeric range: ``[0-3]``
    - A mix of both: ``[ge,xe,0-3]`` (each token is
      treated as a literal unless it matches ``N-M`` with integer N and M)

    Multiple bracket groups in one pattern are expanded as a cartesian product.
    A pattern with no bracket groups is returned as a single-element list.

    Examples::

        expand_interface_range("[ge,xe]-0/0/[0-1]")
        # ["ge-0/0/0", "ge-0/0/1", "xe-0/0/0", "xe-0/0/1"]
        expand_interface_range("Ethernet[1-4]/1.101")
        # ["Ethernet1/1.101", "Ethernet2/1.101", "Ethernet3/1.101", "Ethernet4/1.101"]
        expand_interface_range("eth0")
        # ["eth0"]
    """
Algorithm
import re
from itertools import product

BRACKET_RE = re.compile(r"\[([^\]]+)\]")


def expand_bracket(group: str) -> list[str]:
    """Expand a single bracket group content into a list of string alternatives."""
    results = []
    for token in group.split(","):
        token = token.strip()
        range_match = re.fullmatch(r"(\d+)-(\d+)", token)
        if range_match:
            start, end = int(range_match.group(1)), int(range_match.group(2))
            step = 1 if end >= start else -1
            width = len(range_match.group(1))  # preserve leading zeros e.g. [00-03]
            results.extend(str(number).zfill(width) for number in range(start, end + step, step))
        else:
            results.append(token)
    return results


def expand_interface_range(pattern: str) -> list[str]:
    parts = BRACKET_RE.split(pattern)  # alternates: literal, group, literal, group, ...
    literals = parts[0::2]  # even indices are literal segments
    groups = parts[1::2]  # odd indices are bracket group contents
    if not groups:
        return [pattern]
    expanded_groups = [expand_bracket(group) for group in groups]
    results = []
    for combo in product(*expanded_groups):
        expanded = literals[0]
        for index, value in enumerate(combo):
            expanded += value + literals[index + 1]
        results.append(expanded)
    return results
Key properties
- No external dependencies: pure stdlib (re, itertools.product).
- Leading-zero preservation: [00-03] → "00", "01", "02", "03".
- Descending ranges: [3-1] → "3", "2", "1".
- Mixed token bracket: [ge,xe,0-2] → "ge", "xe", "0", "1", "2".
- No nested brackets (YAGNI; can be added later).
Usage in BGP tasks
Imported directly at the top of bgp_peerings_tasks.py:
from norfab.utils.text import expand_interface_range
No separate re-implementation in bgp_peerings_tasks.py.
Feature: Interface-Driven Address and ASN Resolution
local_interface argument
Allows the user to specify a local interface name (or a name pattern with range
expansion, e.g. Ethernet[1-10]/1.101) instead of an explicit local_address IP.
Resolution logic:
- Look up the interface(s) in NetBox (nb.dcim.interfaces.filter(device=device, name=local_interface)).
- Fetch the IP address(es) assigned to the interface from IPAM (nb.ipam.ip_addresses.filter(interface_id=intf.id)).
- Use the IP address (without prefix length) as local_address.
- If the prefix length indicates a point-to-point subnet (/30, /31 for IPv4; /127 for IPv6), automatically derive remote_address as the peer IP in the same subnet (e.g. for 10.0.0.1/31 the peer is 10.0.0.0; for 10.0.0.1/30 the peer is the other usable host in the /30). A /128 has no in-subnet peer, so its peer can only be found via a NetBox lookup (see the table below).
- With remote_address known, optionally look up which NetBox device owns that IP (nb.ipam.ip_addresses.filter(address=remote_address) → assigned_object) to identify the remote device; this is later used by asn_source resolution.
When local_interface is provided together with bulk_create, each item in the
bulk list may also carry its own local_interface to override on a per-session basis.
P2P prefix-length detection table:
| Prefix length | Family | Treated as P2P |
|---|---|---|
| /30 | IPv4 | yes — peer is the other usable host |
| /31 | IPv4 | yes — peer is the other address |
| /127 | IPv6 | yes — peer is the other address |
| /128 | IPv6 | yes — loopback-style; peer lookup via NetBox only |
| other | any | no — remote_address must be supplied explicitly |
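The peer derivation in the table can be sketched with the standard-library ipaddress module. This is a minimal illustration of what get_p2p_peer_ip might look like, not the final implementation; returning None when the supplied address is the network or broadcast address of a /30 is an assumption of this sketch, and /128 returns None because it has no in-subnet peer.

```python
import ipaddress
from typing import Optional


def get_p2p_peer_ip(cidr: str) -> Optional[str]:
    """Return the peer IP for a /30, /31 or /127 interface address, else None."""
    interface = ipaddress.ip_interface(cidr)
    network = interface.network
    prefix_length = network.prefixlen

    if interface.version == 4 and prefix_length == 30:
        # /30: skip the network and broadcast addresses, leaving two usable hosts
        candidates = [network[1], network[2]]
    elif (interface.version == 4 and prefix_length == 31) or (
        interface.version == 6 and prefix_length == 127
    ):
        # /31 and /127: both addresses in the subnet are usable
        candidates = [network[0], network[1]]
    else:
        # /128 and every other prefix length: no in-subnet peer can be derived
        return None

    if interface.ip not in candidates:
        # Assumption: a /30 network or broadcast address is not a valid local IP
        return None
    for candidate in candidates:
        if candidate != interface.ip:
            return str(candidate)
    return None
```

Explicit indexing (network[0], network[1], network[2]) is used instead of network.hosts() so the behaviour for /31 and /127 does not depend on the Python version.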
asn_source argument
Controls automatic ASN resolution when local_as or remote_as are not supplied
explicitly (if either value is already provided, asn_source is not invoked for
that side). Supports two input forms:
- str: dot-separated path through the NetBox device data dict/list.
- dict: kwargs passed directly to nb.ipam.asn.get(**asn_source) to look up an ASN object from the NetBox REST API; the asn attribute of the returned object is used (e.g. {"tenant": "lab", "rir": "RFC 1918"}).
Examples:
asn_source="custom_fields.asn" # str: path through device data
asn_source="config_context.bgp.local_as" # str: nested path
asn_source="custom_fields.asn.0" # str: first element of a list
asn_source={"tenant": "lab", "rir": "RFC 1918"} # dict: IPAM ASN query
Resolution logic (module-level function resolve_asn_from_source):
def resolve_asn_from_source(device_data: dict, asn_source, nb) -> str | None:
    """
    Resolve an ASN from device data or a NetBox IPAM query.

    asn_source can be:
    - str → dot-separated path through device_data dict/list
    - dict → kwargs passed to nb.ipam.asn.get(**asn_source); uses asn_obj.asn
    """
    if isinstance(asn_source, dict):
        asn_obj = nb.ipam.asn.get(**asn_source)
        if asn_obj is not None:
            return str(asn_obj.asn)
    else:
        node = device_data
        for key in asn_source.split("."):
            if isinstance(node, dict):
                node = node.get(key)
            elif isinstance(node, list):
                try:
                    node = node[int(key)]
                except (ValueError, IndexError):
                    node = None
            else:
                node = None
            if node is None:
                break
        if node is not None:
            return str(node)
    return None
When asn_source is provided:
- If local_as is already supplied by the caller, asn_source is not invoked for the local side.
- If remote_as is already supplied by the caller, asn_source is not invoked for the remote side.
- For a missing local_as: resolve_asn_from_source(local_device_data, asn_source, nb) is called.
- For a missing remote_as: resolve_asn_from_source(remote_device_data, asn_source, nb) is called (remote device identified via the P2P peer IP lookup described above).
- If the remote device cannot be identified from IPAM and remote_as is not provided explicitly, this is treated as an error: log.error is called, job.event is emitted with severity="ERROR", the message is appended to ret.errors, and the session is skipped.
- If resolve_asn_from_source returns None for either side: same ERROR treatment.
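The per-side precedence described above (explicit value first, then asn_source, otherwise an error) can be sketched as one small function. The name pick_asn, its parameters, and the error wording are hypothetical and only illustrate the rule; the resolver argument stands in for resolve_asn_from_source with nb already bound, and the log.error / job.event calls are reduced to appending to an errors list.

```python
from typing import Callable, Optional


def pick_asn(
    explicit_as: Optional[str],
    device_data: Optional[dict],
    asn_source,
    resolver: Callable[[dict, object], Optional[str]],
    side: str,
    errors: list,
) -> Optional[str]:
    """Return the ASN for one side of a session, or None if it cannot be resolved."""
    if explicit_as is not None:
        # A caller-supplied value always wins; asn_source is not invoked
        return explicit_as
    if asn_source is None:
        # Nothing to resolve from; the later required-field check handles this
        return None
    if device_data is None:
        # The device (typically the remote one) could not be identified from IPAM
        errors.append(f"cannot resolve {side} ASN: {side} device not identified")
        return None
    resolved_as = resolver(device_data, asn_source)
    if resolved_as is None:
        errors.append(f"cannot resolve {side} ASN from asn_source {asn_source!r}")
    return resolved_as
```

A caller would skip the session whenever the function returns None after asn_source was provided.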
This enables the minimal-input usage pattern:
create_bgp_peering(
    device="ceos-leaf-1",
    local_interface="Ethernet[1-4]/1.101",  # range expansion → 4 interfaces
    asn_source="custom_fields.asn",  # str path or dict query
    rir="lab",
    name_template="{device}_BGP_{name}",
)
This creates one BGP session per interface, resolving IPs from the interface IPAM record, the peer IP from the /31 subnet, and both ASNs from the device/peer device fields — without the user providing any IP addresses or AS numbers directly.
1. Shared Helper Extraction
Current state
The resolution helpers are nested closures inside sync_bgp_peerings, relying on
the enclosing nb, rir_id, job, ret, and self variables via closure.
Proposed refactor
All helpers are moved to the top of bgp_peerings_tasks.py as plain module-level
functions with no leading underscore. Each function receives every value it needs as
an explicit argument — no closures, no implicit state.
# --- top of bgp_peerings_tasks.py, above all classes ---
def resolve_ip(address, nb, job, ret, worker_name) -> int | None:
    """Resolve or create an IP address in IPAM, return its NetBox ID or None."""
    ...

def resolve_asn(asn_str, nb, rir_id, job, ret, worker_name) -> int | None:
    """Resolve or create an ASN, return its NetBox ID or None."""
    ...

def resolve_or_create(endpoint, name, obj_type, nb, job, ret, worker_name, family=None) -> int | None:
    """Resolve or create a named BGP object (peer group / routing policy / prefix list)."""
    ...

def get_addr_family(address) -> str:
    """Return 'ipv4' or 'ipv6' based on the IP address version."""
    ...

def get_p2p_peer_ip(cidr: str) -> str | None:
    """Return the peer IP for a P2P subnet (/30, /31, /127) or None for other prefixes."""
    ...

def resolve_asn_from_source(device_data: dict, asn_source, nb) -> str | None:
    """Resolve ASN from device data (dot-path str) or NetBox IPAM query (dict); return ASN string or None."""
    ...
All three tasks (create_bgp_peering, update_bgp_peering, sync_bgp_peerings)
call these module-level functions directly, passing their local nb, rir_id,
job, ret, and self.name as arguments.
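Of the helpers listed, get_addr_family is the simplest, and a possible body is sketched here using only the standard library. Accepting the address both with and without a prefix length is an assumption of this sketch; the plan only specifies the return values.

```python
import ipaddress


def get_addr_family(address: str) -> str:
    """Return 'ipv4' or 'ipv6' based on the IP address version."""
    # ip_interface accepts both "10.0.0.1" and "10.0.0.1/31"
    ip_version = ipaddress.ip_interface(address).version
    if ip_version == 4:
        return "ipv4"
    return "ipv6"
```

An invalid address string raises ValueError from ipaddress, which the calling task would need to catch and report.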
expand_interface_range lives in norfab/utils/text.py and is imported at the
top of bgp_peerings_tasks.py:
from norfab.utils.text import expand_interface_range
2. New Pydantic Models (added to bgp_peerings_tasks.py)
All models live in bgp_peerings_tasks.py alongside the existing SyncBgpPeeringsInput
and BgpSessionStatusEnum. No changes to netbox_models.py.
BgpSessionFields: shared base for a single BGP session's data fields
class BgpSessionFields(BaseModel):
    name: StrictStr  # session name as it will appear in NetBox
    device: StrictStr  # device name (used to resolve device ID)
    local_address: Optional[StrictStr] = None  # derived from local_interface when omitted
    remote_address: Optional[StrictStr] = None  # derived from P2P peer when omitted
    local_as: Optional[StrictStr] = None  # derived from asn_source when omitted
    remote_as: Optional[StrictStr] = None  # derived from asn_source on remote device
    status: BgpSessionStatusEnum = "active"
    description: Optional[StrictStr] = None
    vrf: Optional[StrictStr] = None
    peer_group: Optional[StrictStr] = None
    import_policies: Optional[List[StrictStr]] = None
    export_policies: Optional[List[StrictStr]] = None
    prefix_list_in: Optional[List[StrictStr]] = None
    prefix_list_out: Optional[List[StrictStr]] = None
    local_interface: Optional[StrictStr] = None  # per-session interface override
BgpSessionStatusEnum already exists in bgp_peerings_tasks.py and stays there.
CreateBgpPeeringInput(NetboxCommonArgs)
class CreateBgpPeeringInput(NetboxCommonArgs, use_enum_values=True):
    # --- Single-session mode ---
    name: Optional[StrictStr] = None
    device: Optional[StrictStr] = None
    local_address: Optional[StrictStr] = None  # explicit IP string
    remote_address: Optional[StrictStr] = None  # explicit IP string; derived when local_interface + P2P
    local_as: Optional[StrictStr] = None  # explicit AS string; derived when asn_source set
    remote_as: Optional[StrictStr] = None  # explicit AS string; derived when asn_source set
    status: BgpSessionStatusEnum = "active"
    description: Optional[StrictStr] = None
    vrf: Optional[StrictStr] = None
    peer_group: Optional[StrictStr] = None
    import_policies: Optional[List[StrictStr]] = None
    export_policies: Optional[List[StrictStr]] = None
    prefix_list_in: Optional[List[StrictStr]] = None
    prefix_list_out: Optional[List[StrictStr]] = None
    # --- Interface-driven resolution ---
    local_interface: Optional[StrictStr] = Field(...)
    asn_source: Optional[Union[StrictStr, Dict[StrictStr, Any]]] = Field(
        None,
        description="Dot-path string or IPAM query dict for automatic ASN resolution.",
        examples=[
            "str: dot-separated path through device data, e.g. 'custom_fields.asn'. "
            "dict: kwargs for nb.ipam.asn.get, e.g. {'tenant': 'lab', 'rir': 'RFC 1918'}."
        ],
    )
    name_template: Optional[StrictStr] = Field(...)
    # --- Mirror session ---
    create_reverse: bool = Field(
        True,
        description="When True, also create a reverse BGP session on the remote device with local and remote IPs/ASNs swapped.",
        examples=[
            "Set to False to create only the local-device session and skip the automatic "
            "mirror session on the remote device."
        ],
    )
    # --- Bulk mode ---
    bulk_create: Optional[List[BgpSessionFields]] = Field(
        None,
        description="List of BGP session objects to create in bulk.",
        examples=[
            "List of BgpSessionFields dicts, each requiring at minimum 'name', 'device', "
            "and either 'local_address' or 'local_interface'. "
            "Example: [{'name': 'leaf1_10.0.0.1_10.0.0.0', 'device': 'leaf1', "
            "'local_address': '10.0.0.1', 'remote_address': '10.0.0.0', "
            "'local_as': '65001', 'remote_as': '65002'}]."
        ],
    )
    # --- Shared resolution options ---
    rir: Optional[StrictStr] = Field(
        None,
        description="RIR name used when auto-creating ASNs in NetBox.",
        examples=["RIR name as it appears in NetBox, e.g. 'RFC 1918', 'ARIN', 'RIPE'."],
    )
    message: Optional[StrictStr] = Field(
        None,
        description="Changelog message recorded on every NetBox write.",
        examples=["Free-text string written to the NetBox changelog, e.g. 'Provisioned by NorFab'."],
    )

    @model_validator(mode="after")
    def validate_single_or_bulk(self):
        if self.bulk_create is None:
            # In single-session mode device is always required.
            # local_address OR local_interface must be provided.
            # remote_address may be omitted when local_interface resolves a P2P subnet.
            # local_as / remote_as may be omitted when asn_source is provided.
            if not self.device:
                raise ValueError("Single-session mode requires 'device'.")
            if not self.local_address and not self.local_interface:
                raise ValueError(
                    "Single-session mode requires either 'local_address' or 'local_interface'."
                )
        return self
BgpSessionUpdateFields: changeable fields for a single update
class BgpSessionUpdateFields(BaseModel):
    name: StrictStr  # existing session name to update (required)
    description: Optional[StrictStr] = None
    status: Optional[BgpSessionStatusEnum] = None
    local_address: Optional[StrictStr] = None
    remote_address: Optional[StrictStr] = None
    local_as: Optional[StrictStr] = None
    remote_as: Optional[StrictStr] = None
    vrf: Optional[StrictStr] = None
    peer_group: Optional[StrictStr] = None
    import_policies: Optional[List[StrictStr]] = None
    export_policies: Optional[List[StrictStr]] = None
    prefix_list_in: Optional[List[StrictStr]] = None
    prefix_list_out: Optional[List[StrictStr]] = None
UpdateBgpPeeringInput(NetboxCommonArgs)
class UpdateBgpPeeringInput(NetboxCommonArgs, use_enum_values=True):
    # --- Single-session mode ---
    name: Optional[StrictStr] = Field(
        None,
        description="Existing session name to update.",
        examples=["Name of the BGP session as stored in NetBox, e.g. 'leaf1_10.0.0.1_10.0.0.0'."],
    )
    description: Optional[StrictStr] = None
    status: Optional[BgpSessionStatusEnum] = None
    local_address: Optional[StrictStr] = None
    remote_address: Optional[StrictStr] = None
    local_as: Optional[StrictStr] = None
    remote_as: Optional[StrictStr] = None
    vrf: Optional[StrictStr] = None
    peer_group: Optional[StrictStr] = None
    import_policies: Optional[List[StrictStr]] = None
    export_policies: Optional[List[StrictStr]] = None
    prefix_list_in: Optional[List[StrictStr]] = None
    prefix_list_out: Optional[List[StrictStr]] = None
    # --- Bulk mode ---
    bulk_update: Optional[List[BgpSessionUpdateFields]] = Field(
        None,
        description="List of BGP sessions to update in bulk.",
        examples=[
            "List of BgpSessionUpdateFields dicts, each requiring 'name' plus any fields to change. "
            "Example: [{'name': 'leaf1_10.0.0.1_10.0.0.0', 'description': 'uplink', 'status': 'active'}]."
        ],
    )
    # --- Shared resolution options ---
    rir: Optional[StrictStr] = None
    message: Optional[StrictStr] = None

    @model_validator(mode="after")
    def validate_single_or_bulk(self):
        if self.bulk_update is None and self.name is None:
            raise ValueError(
                "Either 'name' (single-session mode) or 'bulk_update' (bulk mode) is required."
            )
        return self
3. New Task: create_bgp_peering
Location: norfab/workers/netbox_worker/bgp_peerings_tasks.py — new method on
NetboxBgpPeeringsTasks.
Decorator:
@Task(fastapi={"methods": ["POST"], "schema": NetboxFastApiArgs.model_json_schema()})
Signature:
def create_bgp_peering(
    self,
    job: Job,
    instance: Union[None, str] = None,
    # single-session mode
    name: Union[None, str] = None,
    device: Union[None, str] = None,
    local_address: Union[None, str] = None,
    remote_address: Union[None, str] = None,
    local_as: Union[None, str] = None,
    remote_as: Union[None, str] = None,
    status: str = "active",
    description: Union[None, str] = None,
    vrf: Union[None, str] = None,
    peer_group: Union[None, str] = None,
    import_policies: Union[None, list] = None,
    export_policies: Union[None, list] = None,
    prefix_list_in: Union[None, list] = None,
    prefix_list_out: Union[None, list] = None,
    # interface-driven resolution
    local_interface: Union[None, str] = None,
    asn_source: Union[None, str, dict] = None,
    name_template: Union[None, str] = None,
    # mirror session
    create_reverse: bool = True,
    # bulk mode
    bulk_create: Union[None, list] = None,
    # shared
    rir: Union[None, str] = None,
    message: Union[None, str] = None,
    branch: Union[None, str] = None,
    dry_run: bool = False,
) -> Result:
Logic flow:
1. Validate BGP plugin is installed (has_plugin).
2. Connect pynetbox: nb = self._get_pynetbox(instance, branch=branch).
3. Set changelog message header if message provided.
4. Resolve rir_id once.
5. Build the list of session specs to process:
    - Single-session mode (bulk_create is None): one spec from keyword arguments.
    - Bulk mode: iterate bulk_create list.

    5a. Resolve interfaces, IPs, and devices for every spec in one pass (before any idempotency pre-fetch). For each spec:
    - Record the local device name in all_device_names.
    - If local_interface is set (spec-level or task-level):
        - Expand any range pattern via expand_interface_range.
        - For each expanded interface name, look it up in NetBox (nb.dcim.interfaces.filter(device=device, name=intf_name)) and fetch the assigned IP from IPAM (nb.ipam.ip_addresses.filter(interface_id=intf.id)).
        - Store the IP address portion (without prefix length) as local_address on the spec.
        - Call get_p2p_peer_ip on the full CIDR string. If it returns a peer IP:
            - Store the peer IP as remote_address on the spec.
            - Look up that peer IP in IPAM (nb.ipam.ip_addresses.filter(address=remote_ip)) to find its assigned_object.device.name; store this as remote_device on the spec and add it to all_device_names.
    - After all specs are processed, all_device_names contains every local device name plus every remote device name discovered via P2P resolution.

    5b. Pre-fetch existing sessions for idempotency (single API call):
    - Call nb.plugins.bgp.session.filter(device=list(all_device_names), fields="name,id") once using the complete set built in step 5a.
    - Build existing_session_names: set[str] from the results.
    - This set is used in steps 6i and 6k; no additional API calls are made per spec.
6. For each spec, resolve ASNs and NetBox IDs:
    a. local_address, remote_address, and remote_device are already resolved from step 5a; no further interface or IP lookup is needed here.
    b. If asn_source set and local_as is not yet resolved (not supplied by caller):
        - Fetch full device data for the local device from NetBox.
        - Call resolve_asn_from_source(device_data, asn_source, nb); use result as local_as.
        - If resolve_asn_from_source returns None: call log.error, emit job.event(severity="ERROR"), append to ret.errors, skip session.
    c. If asn_source set and remote_as is not yet resolved (not supplied by caller):
        - If remote_device was not found in step 5a (peer IP not in IPAM and remote_as not supplied explicitly): call log.error, emit job.event(severity="ERROR"), append to ret.errors, skip session.
        - Otherwise fetch full device data for the remote device and call resolve_asn_from_source(remote_device_data, asn_source, nb); use result as remote_as.
        - If resolve_asn_from_source returns None: call log.error, emit job.event(severity="ERROR"), append to ret.errors, skip session.
    d. Call resolve_ip for local_address and remote_address (creates in IPAM if missing).
    e. Call resolve_asn for local_as and remote_as (creates ASN if rir_id available).
    f. Resolve device ID and site ID via nb.dcim.devices.get(name=device). Site is always taken from the device object; it is never accepted as a task argument.
    g. If any required field is still missing: append to ret.errors, skip session, emit job.event with severity="WARNING".
    h. Optionally resolve vrf, peer_group, import_policies, export_policies, prefix_list_in, prefix_list_out using resolve_or_create. Policy and prefix-list fields are accepted as List[str]; each element is resolved or created individually in NetBox and its ID collected. When sync_bgp_peerings delegates, it must split any pipe-separated strings into lists before building the bulk_create payload.
    i. Idempotency check: look up sname in existing_session_names (the set pre-fetched in step 5b). If present, add sname to exists list, emit a job.event (no severity), and skip; do not create a duplicate. No additional API call is made per spec.
    j. If dry_run: append sname (the resolved session name string) to the dry_run list. No NetBox IDs are resolved in dry-run mode; the check only produces the list of names that would be created.
    k. Mirror (reverse) session, only when create_reverse=True and the remote device was successfully identified from IPAM (step 5a):
        - Build a mirror spec by swapping every resolved value:
            - device = remote device name
            - local_address / local_as = original remote_address / remote_as
            - remote_address / remote_as = original local_address / local_as
            - All other fields (vrf, peer_group, policies, prefix lists) copied as-is.
        - Derive mirror_name using name_template (same template, swapped values); default: "{device}_{local_address}_{remote_address}" with swapped IPs.
        - Resolve device ID and site ID for the remote device via nb.dcim.devices.get(name=remote_device_name). If not found: emit job.event WARNING, append to ret.errors, skip mirror session.
        - Run idempotency check: look up mirror_name in existing_session_names. If already exists: add to exists, skip.
        - If dry_run: append mirror_name to dry_run list.
        - Otherwise: add the mirror payload to the same payloads list for bulk creation in step 7.
7. If not dry_run: call nb.plugins.bgp.session.create(payloads) in bulk.
8. Return Result.
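The mirror-spec swap in the reverse-session step can be sketched as a plain function over a spec dict. The function name build_mirror_spec and the choice to drop the original name (so it is re-derived from name_template) are assumptions of this sketch; the plan only prescribes which values are swapped and which are copied.

```python
def build_mirror_spec(spec: dict, remote_device: str) -> dict:
    """Return a copy of spec describing the same session from the remote side."""
    # Shallow copy keeps vrf, peer_group, policies and prefix lists as-is
    mirror_spec = dict(spec)
    mirror_spec["device"] = remote_device
    # Swap local and remote addresses and ASNs
    mirror_spec["local_address"] = spec["remote_address"]
    mirror_spec["local_as"] = spec["remote_as"]
    mirror_spec["remote_address"] = spec["local_address"]
    mirror_spec["remote_as"] = spec["local_as"]
    # Assumption: the mirror name is derived separately, so the original is dropped
    mirror_spec.pop("name", None)
    return mirror_spec
```

Because the copy is shallow, list-valued fields such as import_policies are shared between the two specs; that is fine as long as neither spec mutates them afterwards.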
Note on local_interface + range expansion: when a range pattern expands to
multiple interfaces (e.g. Ethernet[1-4]/1.101 → 4 interfaces), the task creates
one BGP session per interface. Session name is derived from name_template if
provided, otherwise defaults to "{device}_{local_address}_{remote_address}".
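Session-name derivation could be sketched with str.format, as below. This assumes the template is a standard Python format string and covers only the three placeholders of the default template; which further placeholders name_template supports is not specified in this plan, so none are bound here. The helper name render_session_name is hypothetical.

```python
DEFAULT_NAME_TEMPLATE = "{device}_{local_address}_{remote_address}"


def render_session_name(name_template, device, local_address, remote_address):
    """Render a session name, falling back to the default template."""
    template = name_template or DEFAULT_NAME_TEMPLATE
    # Only the placeholders of the default template are bound in this sketch
    return template.format(
        device=device,
        local_address=local_address,
        remote_address=remote_address,
    )
```

For a mirror session the same function would be called with the remote device name and the two addresses swapped.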
Return structure (normal run):
{
    "created": ["session-name-1", "session-name-2"],
    "exists": ["session-name-3"],  # already in NetBox, not re-created
}
Return structure (dry run):
{
    "create": ["session-name-1", "session-name-2"],  # names that would be created
    "exists": ["session-name-3"],  # names that already exist
}
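How the pre-fetched existing_session_names set drives both return structures can be sketched as follows. The function name partition_session_names is hypothetical, and the sketch simplifies the real flow: in a normal run a name would only be reported under "created" once the bulk create call has succeeded.

```python
def partition_session_names(session_names, existing_session_names, dry_run):
    """Split candidate names into new ones and ones that already exist."""
    # Key names follow the two return structures shown above
    new_key = "create" if dry_run else "created"
    result = {new_key: [], "exists": []}
    for session_name in session_names:
        if session_name in existing_session_names:
            result["exists"].append(session_name)
        else:
            result[new_key].append(session_name)
    return result
```

Membership tests against a set keep the check at one lookup per name, with no extra API call per spec.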
4. New Task: update_bgp_peering
Location: same file, new method on NetboxBgpPeeringsTasks.
Decorator:
@Task(fastapi={"methods": ["PATCH"], "schema": NetboxFastApiArgs.model_json_schema()})
Signature:
def update_bgp_peering(
    self,
    job: Job,
    instance: Union[None, str] = None,
    # single-session mode
    name: Union[None, str] = None,
    description: Union[None, str] = None,
    status: Union[None, str] = None,
    local_address: Union[None, str] = None,
    remote_address: Union[None, str] = None,
    local_as: Union[None, str] = None,
    remote_as: Union[None, str] = None,
    vrf: Union[None, str] = None,
    peer_group: Union[None, str] = None,
    import_policies: Union[None, list] = None,
    export_policies: Union[None, list] = None,
    prefix_list_in: Union[None, list] = None,
    prefix_list_out: Union[None, list] = None,
    # bulk mode
    bulk_update: Union[None, list] = None,
    # shared
    rir: Union[None, str] = None,
    message: Union[None, str] = None,
    branch: Union[None, str] = None,
    dry_run: bool = False,
) -> Result:
Logic flow:
1. Validate BGP plugin is installed.
2. Connect pynetbox.
3. Set changelog message header.
4. Resolve rir_id.
5. Build list of (session_name, changed_fields) pairs:
    - Single-session mode (bulk_update is None): name is required; build one dict from any non-None keyword arguments excluding name.
    - Bulk mode: iterate bulk_update list, each item has name plus optional fields.
6. For each session to update:
    a. Fetch existing session from NetBox: nb.plugins.bgp.session.get(name=sname).
    b. If not found: append to ret.errors, emit job.event WARNING, continue.
    c. Determine addr_family using get_addr_family on existing session's local IP.
    d. Build changed_payload (a dict of field → new resolved value) using the same field-dispatch logic as the current update block in sync_bgp_peerings (calls resolve_ip, resolve_asn, resolve_or_create as appropriate). Only include fields that are actually provided in the update spec (non-None).
    e. Idempotency / dry-run diff: normalise the existing session's current values into a comparable dict (same field names as changed_payload). Run deepdiff.DeepDiff(current_normalised, changed_payload, ignore_order=True).
        - If dry_run: append {"name": sname, "diff": deepdiff_result} to dry_run list and skip the write. If diff is empty, add to in_sync instead.
        - If not dry_run and diff is empty: add sname to in_sync list, skip write.
    f. If not dry_run and diff non-empty: call session.update(changed_payload), add sname to updated list.
7. Return Result.
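The idempotency decision in the diff step can be illustrated with a standard-library stand-in. This is deliberately not DeepDiff: it compares only the fields present in changed_payload, treats lists as unordered, and returns a simpler old/new mapping, so its output format differs from the DeepDiff result shown in the dry-run return structure. The function names are hypothetical.

```python
def normalise_value(value):
    """Make list values order-insensitive so reordered lists compare equal."""
    if isinstance(value, list):
        return sorted(value, key=str)
    return value


def diff_changed_fields(current_normalised: dict, changed_payload: dict) -> dict:
    """Return {field: {"old_value": ..., "new_value": ...}} for fields that differ."""
    differences = {}
    for field, new_value in changed_payload.items():
        old_value = current_normalised.get(field)
        if normalise_value(old_value) != normalise_value(new_value):
            differences[field] = {"old_value": old_value, "new_value": new_value}
    return differences
```

An empty result corresponds to the in_sync case; a non-empty result is what would be written, or reported in a dry run.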
Return structure (normal run):
{
    "updated": ["session-name-1"],
    "in_sync": ["session-name-2"],  # no changes needed
}
Return structure (dry run):
{
    "update": [
        {
            "name": "session-name-1",
            "diff": {"values_changed": {"root['description']": {"new_value": "new", "old_value": "old"}}}
        }
    ],
    "in_sync": ["session-name-2"],
}
5. sync_bgp_peerings Refactor
sync_bgp_peerings is refactored to delegate the entire write phase to the new
tasks. After computing full_diff, it builds two flat lists and calls:
# Build bulk_create list from full_diff
bulk_create = []
for device_name, actions in full_diff.items():
    for sname in actions["create"]:
        session_fields = normalised_live[device_name][sname]
        bulk_create.append({"name": sname, "device": device_name, **session_fields})

# Build bulk_update list from full_diff
bulk_update = []
for device_name, actions in full_diff.items():
    for sname, field_changes in actions["update"].items():
        entry = {"name": sname}
        for field, change in field_changes.items():
            entry[field] = change["new_value"]
        bulk_update.append(entry)

# Delegate writes
if bulk_create:
    create_result = self.create_bgp_peering(
        job=job, instance=instance, bulk_create=bulk_create,
        rir=rir, message=message, branch=branch,
    )
    ret.errors.extend(create_result.errors)
if bulk_update:
    update_result = self.update_bgp_peering(
        job=job, instance=instance, bulk_update=bulk_update,
        rir=rir, message=message, branch=branch,
    )
    ret.errors.extend(update_result.errors)
The deletion loop remains inline in sync_bgp_peerings (it is a simple
session.delete() loop not shared with any other task).
The existing nested-closure helpers inside sync_bgp_peerings are removed;
the task now calls the module-level functions directly.
External behaviour of sync_bgp_peerings is unchanged — the return structure
(created, updated, deleted, in_sync per device) is assembled from the
results of the delegated calls and the existing diff data.
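Section 3 requires sync_bgp_peerings to split pipe-separated policy and prefix-list strings into lists before building the bulk_create payload. A minimal sketch of that conversion follows; the function name split_pipe_separated_fields and the stripping of surrounding whitespace are assumptions, while the four field names come from the models above.

```python
LIST_FIELDS = ("import_policies", "export_policies", "prefix_list_in", "prefix_list_out")


def split_pipe_separated_fields(session_fields: dict) -> dict:
    """Return a copy of session_fields with pipe-separated strings turned into lists."""
    converted = dict(session_fields)
    for field in LIST_FIELDS:
        value = converted.get(field)
        if isinstance(value, str):
            # "RM-IN|RM-IN-2" -> ["RM-IN", "RM-IN-2"]; empty items are dropped
            converted[field] = [item.strip() for item in value.split("|") if item.strip()]
    return converted
```

Values that are already lists, or None, pass through unchanged, so the function is safe to apply to every entry.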
6. CLI Shell Registration
Find how sync_bgp_peerings is registered in norfab/clients/nfcli_shell/nfcli_shell_client.py
and mirror the exact same pattern for the two new tasks.
Expected command paths:
nf# netbox create bgp-peering ...
nf# netbox update bgp-peering ...
Each parameter of the task function maps to one CLI argument. Follow the same
argument-naming conventions already used (hyphen-separated, e.g. dry-run,
bulk-create, local-interface, asn-source, name-template).
netbox.create.bgp-peering
├── ...
├── local-interface
├── asn-source
├── name-template
├── create-reverse
├── bulk-create
├── rir
└── message
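The hyphen-separated naming convention amounts to a one-line mapping from task parameter names to CLI argument names. The helper below is purely illustrative (its name is hypothetical) and says nothing about how nfcli_shell_client.py actually registers commands.

```python
def to_cli_argument(parameter_name: str) -> str:
    """Map a task parameter name to its hyphen-separated CLI argument name."""
    return parameter_name.replace("_", "-")


# Parameters specific to create_bgp_peering, in the order shown in the tree above
cli_arguments = [
    to_cli_argument(parameter_name)
    for parameter_name in ("local_interface", "asn_source", "name_template", "create_reverse", "bulk_create")
]
```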
Full expected man tree output:
nf# man tree netbox.create.bgp-peering
root
└── netbox: Netbox service
    └── create: Create Netbox objects
        └── bgp-peering: Create BGP peering session(s)
            ├── timeout
            ├── workers
            ├── verbose-result
            ├── progress
            ├── instance
            ├── branch
            ├── dry-run
            ├── name
            ├── device
            ├── local-address
            ├── remote-address
            ├── local-as
            ├── remote-as
            ├── status
            ├── description
            ├── vrf
            ├── peer-group
            ├── import-policies
            ├── export-policies
            ├── prefix-list-in
            ├── prefix-list-out
            ├── local-interface
            ├── asn-source
            ├── name-template
            ├── create-reverse
            ├── bulk-create
            ├── rir
            └── message

nf# man tree netbox.update.bgp-peering
root
└── netbox: Netbox service
    └── update: Update Netbox objects
        └── bgp-peering: Update BGP peering session(s)
            ├── timeout
            ├── workers
            ├── verbose-result
            ├── progress
            ├── instance
            ├── branch
            ├── dry-run
            ├── name
            ├── description
            ├── status
            ├── local-address
            ├── remote-address
            ├── local-as
            ├── remote-as
            ├── vrf
            ├── peer-group
            ├── import-policies
            ├── export-policies
            ├── prefix-list-in
            ├── prefix-list-out
            ├── bulk-update
            ├── rir
            └── message
---
## 7. Test Plan (additions to `tests/test_netbox_service.py`)
Follow the same structure as `TestSyncBgpPeerings`:
- `setup_method` / `teardown_method` call `delete_bgp_sessions()` to ensure a clean state.
- Each test is self-contained: it creates whatever NetBox state it needs, runs the
task, asserts on the result, and cleans up any side-effect objects it created.
- Use `get_pynetbox(nfclient)` for direct NetBox verification.
- Print the raw result with `pprint.pprint(ret)` before assertions for easier debugging.
### New test class: `TestCreateBgpPeering`
| Test | What it verifies |
|---|---|
| `test_create_bgp_peering_single` | Single-session mode — `created=[name]`, session appears in NetBox |
| `test_create_bgp_peering_single_idempotent` | Session already exists — `exists=[name]`, no duplicate created |
| `test_create_bgp_peering_single_dry_run` | `dry_run=True` — `dry_run=[name]` returned, no session in NetBox |
| `test_create_bgp_peering_single_dry_run_exists` | `dry_run=True`, session already exists — `dry_run=[]`, `exists=[name]` |
| `test_create_bgp_peering_bulk` | `bulk_create=[...]` — all sessions in `created`, appear in NetBox |
| `test_create_bgp_peering_bulk_partial_idempotent` | Some sessions already exist — correct split between `created` and `exists` |
| `test_create_bgp_peering_bulk_dry_run` | `dry_run=True` + bulk — `dry_run=[names]`, no writes |
| `test_create_bgp_peering_reverse_session` | `create_reverse=True` — both local and remote sessions created |
| `test_create_bgp_peering_reverse_session_idempotent` | Remote session already exists — in `exists`, local session still created |
| `test_create_bgp_peering_reverse_disabled` | `create_reverse=False` — only local session created |
| `test_create_bgp_peering_reverse_dry_run` | `dry_run=True`, `create_reverse=True` — both names in `dry_run` |
| `test_create_bgp_peering_reverse_unknown_remote_device` | Remote device not in NetBox — mirror skipped with error, local session still created |
| `test_create_bgp_peering_missing_required` | Single mode with missing field — `failed=True` + error message |
| `test_create_bgp_peering_nonexistent_device` | Unknown device name — error appended, no crash |
| `test_create_bgp_peering_with_branch` | `branch=...` — session created in branch |
| `test_create_bgp_peering_with_peer_group_policies_prefix_lists` | Optional fields resolved/created in NetBox |
| `test_create_bgp_peering_asn_auto_create` | ASN not in NetBox — auto-created when `rir` provided |
| `test_create_bgp_peering_ip_auto_create` | IP not in NetBox — auto-created in IPAM |
| `test_create_bgp_peering_local_interface` | `local_interface` resolves `local_address` from IPAM |
| `test_create_bgp_peering_local_interface_p2p_derives_remote` | P2P /31 — `remote_address` derived automatically |
| `test_create_bgp_peering_local_interface_range` | Range pattern expands to multiple sessions |
| `test_create_bgp_peering_asn_source_custom_fields` | `asn_source="custom_fields.asn"` resolves both ASNs |
| `test_create_bgp_peering_asn_source_config_context` | `asn_source="config_context.bgp.local_as"` resolves ASNs |
| `test_create_bgp_peering_asn_source_ipam_dict` | `asn_source={"tenant": "lab"}` queries IPAM directly |
| `test_create_bgp_peering_interface_and_asn_source_combined` | Minimal input: device + interface range + asn_source |
| `test_create_bgp_peering_non_p2p_interface_requires_remote` | Non-P2P prefix — error if `remote_address` missing |
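
The two P2P cases in the table (`..._p2p_derives_remote` and `..._non_p2p_interface_requires_remote`) rest on one rule, which might look like the sketch below. The signature and return type of `get_p2p_peer_ip` are assumptions made for illustration, as is treating only two-address prefixes (/31 for IPv4, /127 for IPv6) as point-to-point; the plan does not fix either detail here.

```python
import ipaddress


def get_p2p_peer_ip(local_address):
    """Return the other address of a two-address prefix, or None if not P2P.

    local_address is an interface address with prefix length, e.g. "10.0.0.0/31".
    """
    interface = ipaddress.ip_interface(local_address)
    network = interface.network
    # Assumption: only prefixes holding exactly two addresses count as P2P.
    if network.num_addresses != 2:
        return None
    for address in network:
        if address != interface.ip:
            return str(address)
    return None


peer_ip = get_p2p_peer_ip("10.0.0.0/31")
if peer_ip is None:
    print("remote_address must be provided explicitly")
else:
    print(f"derived remote_address: {peer_ip}")
```

When the function returns `None`, the caller would fall back to the explicit `remote_address` and report an error if it is missing, which is the behaviour the non-P2P test checks.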
### New test class: `TestUpdateBgpPeering`
| Test | What it verifies |
|---|---|
| `test_update_bgp_peering_single` | Single-session mode — field updated, in `updated` list |
| `test_update_bgp_peering_single_dry_run` | `dry_run=True` — deepdiff returned in `dry_run`, no write |
| `test_update_bgp_peering_single_dry_run_in_sync` | `dry_run=True`, values already match — `in_sync=[name]`, empty `dry_run` |
| `test_update_bgp_peering_bulk` | `bulk_update=[...]` — all changed sessions in `updated` |
| `test_update_bgp_peering_bulk_dry_run` | `dry_run=True` + bulk — diffs in `dry_run` list |
| `test_update_bgp_peering_nonexistent_session` | Session not in NetBox — error appended, not in `updated` |
| `test_update_bgp_peering_no_changes` | All values already match — `in_sync=[name]`, `updated=[]`, no write |
| `test_update_bgp_peering_status` | `status` field updated correctly |
| `test_update_bgp_peering_description` | `description` field updated correctly |
| `test_update_bgp_peering_routing_policies` | `import_policies` / `export_policies` updated |
| `test_update_bgp_peering_with_branch` | `branch=...` — update applied to branch |
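
The `updated` / `in_sync` / `dry_run` split that these tests assert on can be sketched as follows. Note the substitutions: the plan uses `deepdiff.DeepDiff`, while this sketch uses a plain field-by-field comparison to stay standard-library only, so the diff shape shown is not DeepDiff's. `diff_session_fields`, `plan_updates`, and the combined result dict are hypothetical names and shapes, and the in-memory `current_sessions` dict stands in for NetBox.

```python
def diff_session_fields(current_values, proposed_values):
    """Return {field: {"old": ..., "new": ...}} for fields that would change."""
    changes = {}
    for field_name, new_value in proposed_values.items():
        old_value = current_values.get(field_name)
        if old_value != new_value:
            changes[field_name] = {"old": old_value, "new": new_value}
    return changes


def plan_updates(current_sessions, proposed_updates, dry_run=False):
    """Sort each proposed update into updated, in_sync, dry_run, or errors."""
    result = {"updated": [], "in_sync": [], "dry_run": [], "errors": []}
    for update in proposed_updates:
        session_name = update["name"]
        if session_name not in current_sessions:
            # Bad sessions are skipped and reported; the rest still run.
            result["errors"].append(f"{session_name}: session not found")
            continue
        proposed_values = {
            key: value for key, value in update.items() if key != "name"
        }
        diff = diff_session_fields(current_sessions[session_name], proposed_values)
        if not diff:
            result["in_sync"].append(session_name)
        elif dry_run:
            result["dry_run"].append({"name": session_name, "diff": diff})
        else:
            # Stand-in for the actual NetBox write.
            current_sessions[session_name].update(proposed_values)
            result["updated"].append(session_name)
    return result
```

An empty diff is checked before the dry-run flag, so a session that already matches lands in `in_sync` in both normal and dry-run mode.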
---
## 8. Documentation
Create two new files modelled **exactly** on
`docs/workers/netbox/services_netbox_service_tasks_sync_bgp_peerings.md`.
### Files to create
- `docs/workers/netbox/services_netbox_service_tasks_create_bgp_peering.md`
- `docs/workers/netbox/services_netbox_service_tasks_update_bgp_peering.md`
### Required sections (same order as the reference doc)
1. **YAML front-matter** — `tags: [netbox]`
2. **Title and task API name** — e.g. `> task api name: create_bgp_peering`
3. **One-paragraph description** of what the task does.
4. **How it Works** — numbered steps matching the logic flow in section 3 / 4 of
this plan.
5. **Prerequisites** — BGP plugin requirement, any other hard dependencies.
6. **Branching Support** paragraph (same boilerplate as sync doc).
7. **Dry Run Mode** — describe return structure with a fenced code block.
8. **Examples** — tabbed `=== "CLI"` / `=== "Python"` blocks covering:
    - Single-session mode
    - Bulk mode
    - Dry-run
    - With `branch`
    - With `local_interface` + `asn_source` (create doc only)
9. **NORFAB Shell Reference** — paste the `man tree` output from section 6.
10. **Python API Reference** — MkDocs autodoc directive:
```markdown
::: norfab.workers.netbox_worker.bgp_peerings_tasks.NetboxBgpPeeringsTasks.create_bgp_peering
::: norfab.workers.netbox_worker.bgp_peerings_tasks.NetboxBgpPeeringsTasks.update_bgp_peering
```

Also add both new pages to `mkdocs.yml` nav under the Netbox worker section,
following the same ordering as the sync page.

---
## 9. File Change Summary
| File | Change |
|---|---|
| `norfab/utils/text.py` | Add `expand_interface_range` and `expand_bracket` utilities |
| `norfab/workers/netbox_worker/bgp_peerings_tasks.py` | Add module-level helpers (`resolve_ip`, `resolve_asn`, `resolve_or_create`, `get_addr_family`, `get_p2p_peer_ip`, `resolve_asn_from_source`) at top of file; add Pydantic models; add `create_bgp_peering` and `update_bgp_peering` methods; refactor `sync_bgp_peerings` to delegate writes and use module-level helpers |
| `norfab/workers/netbox_worker/netbox_models.py` | No changes |
| `norfab/clients/nfcli_shell/nfcli_shell_client.py` | Register `create bgp-peering` and `update bgp-peering` commands |
| `tests/test_netbox_service.py` | Add `TestCreateBgpPeering` and `TestUpdateBgpPeering` classes |
| `docs/workers/netbox/services_netbox_service_tasks_create_bgp_peering.md` | New doc file (modelled on `sync_bgp_peerings` doc) |
| `docs/workers/netbox/services_netbox_service_tasks_update_bgp_peering.md` | New doc file (modelled on `sync_bgp_peerings` doc) |
| `mkdocs.yml` | Add both new doc pages to nav under Netbox worker section |

---
## 10. Resolved Decisions
| # | Question | Decision |
|---|---|---|
| 1 | Should `sync_bgp_peerings` delegate writes to the new tasks? | Yes. Refactored in Section 5; single source of truth for write logic. |
| 2 | Should `site` be an accepted argument? | No. It is always inferred from the device object; removed from all models, signatures, and CLI. |
| 3 | Should policies be `List[str]`? | Yes. All four policy/prefix-list fields (`import_policies`, `export_policies`, `prefix_list_in`, `prefix_list_out`) are `List[str]`. `sync_bgp_peerings` must split pipe-separated strings before delegating. |
| 4 | Bulk error granularity: fail all or skip bad session? | Skip and append to `errors`, same behaviour as `sync_bgp_peerings`. |
| 5 | `asn_source` on `update_bgp_peering`? | No for now. `create_bgp_peering` only; add to update as a follow-up. |
| 6 | Failure when remote ASN unresolvable via `asn_source`? | ERROR: call `log.error`, emit `job.event(severity="ERROR")`, append to `ret.errors`, skip session. Same applies when remote device cannot be identified from IPAM and `remote_as` was not provided explicitly. |
| 7 | Where does `sync_bgp_peerings` split pipe-separated policy strings? | Inside `sync_bgp_peerings`, just before building the `bulk_create` / `bulk_update` payload; no Pydantic validator needed. |
| 8 | Should `name_template` be added to `create_bgp_peering`? | Yes. Added to signature, `CreateBgpPeeringInput`, and CLI tree. Used only when `local_interface` range expansion produces multiple sessions; default is `"{device}_{local_address}_{remote_address}"`. |
| 9 | `asn_source` type: string paths only, or also NetBox REST query? | Both. `asn_source` accepts `str` (dot-path through device data) or `dict` (kwargs for `nb.ipam.asn.get`). No list fallback. `resolve_asn_from_source` takes `nb` as a third argument. |
| 10 | Idempotency on `create_bgp_peering`? | Yes. Existing sessions are pre-fetched once with `nb.plugins.bgp.session.filter(device=[...], fields="name,id")` before the per-spec loop (step 5a). The idempotency check inside the loop is a set lookup, with no extra API call per spec. Sessions already in NetBox are added to `exists` and skipped. Both normal and dry-run modes report `exists`. |
| 11 | Dry-run behaviour for `create_bgp_peering`? | Returns `{"dry_run": [names_that_would_be_created], "exists": [names_already_in_netbox]}`. No ID resolution is performed in dry-run mode; only names are reported. |
| 12 | Dry-run behaviour for `update_bgp_peering`? | Uses `deepdiff.DeepDiff` to compare the normalised current NetBox values against the proposed update. Returns `{"dry_run": [{"name": ..., "diff": ...}], "in_sync": [...]}`. Sessions with no diff go to `in_sync`. |
| 13 | Idempotency on `update_bgp_peering`? | Yes. If deepdiff returns empty, the session is added to `in_sync` and no write is performed (in both normal run and dry-run). |
| 14 | Should `create_bgp_peering` also create a reverse (mirror) session on the remote device? | Yes by default. When `create_reverse=True` (default) and the remote device is identified from IPAM, a second session is built by swapping local↔remote IPs and ASNs. `name_template` is applied with the swapped values. The mirror session goes through the same idempotency check. Set `create_reverse=False` to suppress. `sync_bgp_peerings` passes `create_reverse=False` when delegating (it manages both sides via diff independently). |
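
Decisions 3, 7, 8, and 14 can be illustrated with a short sketch. `split_policies` and `build_mirror_session` are hypothetical helper names, not part of the plan's helper list, and the session dict keys are assumed to match the task's keyword arguments. The mirror carries only the swapped fields, because the plan does not say which other fields are copied to the remote side.

```python
DEFAULT_NAME_TEMPLATE = "{device}_{local_address}_{remote_address}"


def split_policies(value):
    """Turn a pipe-separated string such as 'RP-A|RP-B' into a list of names."""
    if isinstance(value, list):
        return value
    if not value:
        return []
    return [item.strip() for item in value.split("|") if item.strip()]


def build_mirror_session(session, remote_device, name_template=DEFAULT_NAME_TEMPLATE):
    """Build the reverse session by swapping local and remote IPs and ASNs."""
    mirror = {
        "device": remote_device,
        "local_address": session["remote_address"],
        "remote_address": session["local_address"],
        "local_as": session["remote_as"],
        "remote_as": session["local_as"],
    }
    # The name template is applied with the swapped values.
    mirror["name"] = name_template.format(
        device=mirror["device"],
        local_address=mirror["local_address"],
        remote_address=mirror["remote_address"],
    )
    return mirror


local_session = {
    "name": "r1_10.0.0.0_10.0.0.1",
    "device": "r1",
    "local_address": "10.0.0.0",
    "remote_address": "10.0.0.1",
    "local_as": 65001,
    "remote_as": 65002,
}
mirror_session = build_mirror_session(local_session, remote_device="r2")
print(mirror_session["name"])
print(split_policies("RP-IN-1|RP-IN-2"))
```

The mirror would then pass through the same `exists` set lookup as the local session before anything is written.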

---
## 11. Open Questions
No open questions at this time.