Netbox Create BGP Peering Task

task api name: create_bgp_peering

Creates one or many BGP sessions in NetBox. Supports single-session mode (individual keyword arguments) and bulk mode (bulk_create list of dicts). IP addresses and ASNs are resolved from IPAM or created on demand. When local_interface is provided, the local address is resolved from IPAM; for P2P subnets (/30, /31, /127) the remote address is derived automatically. Optionally creates a mirror (reverse) session on the remote device.

How it Works

  1. Client submits create_bgp_peering request to NetBox worker
  2. NetBox worker validates that the BGP plugin is installed
  3. Worker resolves the RIR ID once (when rir is provided) for on-demand ASN creation
  4. For each spec — when local_interface is supplied, the interface is looked up in IPAM to derive local_address; P2P peer IP and remote device are derived from the subnet
  5. Worker pre-fetches all existing sessions for targeted devices (single API call) to support idempotency
  6. For each session spec:
    • Idempotency check — if the session name already exists, it is added to exists and skipped
    • In dry-run mode — the name is added to create and processing continues
    • Local and remote IPs are resolved from IPAM (or created when not found)
    • Local and remote ASNs are resolved from IPAM (or created when rir is provided)
    • Optional fields (peer group, routing policies, prefix lists) are resolved or created by name
    • When create_reverse=True a mirror session is built by swapping local/remote IPs and ASNs
  7. All prepared payloads are sent to NetBox in a single bulk-create call
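
The P2P derivation in step 4 can be illustrated with the standard-library ipaddress module. This is a minimal sketch, not the worker's implementation: p2p_peer_ip is a hypothetical helper name, and treating /30 as "two usable hosts" while /31 and /127 use both addresses is an assumption about how the peer is chosen.

```python
import ipaddress
from typing import Optional


def p2p_peer_ip(cidr: str) -> Optional[str]:
    """Return the other usable address of a point-to-point subnet, else None.

    Hypothetical helper for illustration only.
    """
    iface = ipaddress.ip_interface(cidr)
    net = iface.network
    is_v4_31 = net.version == 4 and net.prefixlen == 31
    is_v6_127 = net.version == 6 and net.prefixlen == 127
    is_v4_30 = net.version == 4 and net.prefixlen == 30

    if is_v4_31 or is_v6_127:
        candidates = list(net)  # both addresses of the subnet are usable
    elif is_v4_30:
        candidates = list(net.hosts())  # drops network and broadcast addresses
    else:
        return None  # not a point-to-point subnet

    peers = [str(ip) for ip in candidates if ip != iface.ip]
    # Exactly one address must remain; otherwise the input was not a host address
    return peers[0] if len(peers) == 1 else None


print(p2p_peer_ip("10.0.0.1/31"))
# 10.0.0.0
```

For any other prefix length the function returns None, which corresponds to the case where remote_address has to be supplied explicitly.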

Prerequisites

  • NetBox BGP plugin (netbox-bgp) must be installed and enabled on the NetBox instance.

Branching Support

create_bgp_peering is branch-aware. Pass branch=<name> to write all sessions into a NetBox Branching Plugin branch instead of main.

Dry Run Mode

dry_run=True returns session names without any NetBox writes:

{
    "create": ["<session_name>", ...],
    "exists": ["<session_name>", ...],
}

Sessions that already exist in NetBox appear in exists regardless of dry-run mode.
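
The split between create and exists can be sketched as a simple partition of the requested session names against the names already present in NetBox. The function name preview_sessions and its arguments are illustrative assumptions, not part of the NorFab API.

```python
from typing import Dict, Iterable, List, Set


def preview_sessions(requested: Iterable[str], existing: Set[str]) -> Dict[str, List[str]]:
    """Partition requested session names the way a dry run reports them."""
    result: Dict[str, List[str]] = {"create": [], "exists": []}
    for name in requested:
        # A name already present in NetBox is reported and skipped
        key = "exists" if name in existing else "create"
        result[key].append(name)
    return result


print(preview_sessions(
    ["ceos-spine-1_10.0.0.1_10.0.0.2", "ceos-spine-2_10.0.0.3_10.0.0.4"],
    existing={"ceos-spine-2_10.0.0.3_10.0.0.4"},
))
# {'create': ['ceos-spine-1_10.0.0.1_10.0.0.2'], 'exists': ['ceos-spine-2_10.0.0.3_10.0.0.4']}
```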

Session Naming

By default, sessions are named using the name_template default:

{device}_{vrf}_{remote_address}

Use the name_template parameter for a custom naming scheme. The template is a Python format string; session fields such as device, vrf, local_address, and remote_address are available as variables.

Example:

name_template="{device}_BGP_{local_address}"
# ceos-spine-1_BGP_10.0.0.1
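
Template rendering relies on Python's str.format. The sketch below shows the idea on a plain dict; render_session_name is a hypothetical helper, and the session dict is sample data.

```python
def render_session_name(template: str, session: dict) -> str:
    """Render a session name from a format string and the session fields."""
    # Every key of the session dict is available as a template variable;
    # an unknown variable in the template raises KeyError
    return template.format(**session)


session = {
    "device": "ceos-spine-1",
    "vrf": "PROD_VRF",
    "local_address": "10.0.0.1",
    "remote_address": "10.0.0.2",
}
print(render_session_name("{device}_BGP_{local_address}", session))
# ceos-spine-1_BGP_10.0.0.1
```

A template that references a variable missing from the session raises KeyError, so it is worth checking a custom template with a dry run first.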

Reverse (Mirror) Session

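When create_reverse=True (the default), the task also creates a mirror session on the remote device by swapping local and remote IPs and ASNs. The remote device is identified from IPAM by looking up the interface that holds the remote address. In the source listing below, the mirror is built only when the session is resolved from local_interface and the derived peer address is assigned to an interface on a device.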

Set create_reverse=False to suppress mirror session creation.

Note

sync_bgp_peerings passes create_reverse=False when delegating to create_bgp_peering because it manages both sides of a session independently via the diff.
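
The swap itself is straightforward. A minimal sketch on plain dicts follows; build_mirror is a hypothetical name, and clearing name so that it is re-derived from the name template is modeled on the source listing rather than a documented contract.

```python
def build_mirror(session: dict, remote_device: str) -> dict:
    """Return a mirror of the session as seen from the remote device."""
    mirror = dict(session)  # copy so the original session is untouched
    mirror.update(
        device=remote_device,
        local_address=session["remote_address"],
        remote_address=session["local_address"],
        local_as=session.get("remote_as"),
        remote_as=session.get("local_as"),
        name=None,  # the mirror name is derived again from the name template
    )
    return mirror


session = {
    "name": "ceos-spine-1_10.0.0.1_10.0.0.2",
    "device": "ceos-spine-1",
    "local_address": "10.0.0.1",
    "remote_address": "10.0.0.2",
    "local_as": 65001,
    "remote_as": 65002,
}
mirror = build_mirror(session, "ceos-leaf-1")
print(mirror["device"], mirror["local_address"], mirror["local_as"])
# ceos-leaf-1 10.0.0.2 65002
```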

VRF Custom Field

The VRF reference is always stored in a BGP session custom field that must be configured as type Object in NetBox pointing to the VRF content-type. This means NetBox stores a reference to a single VRF object, not a plain string.

By default vrf_custom_field="vrf" means the VRF object reference is written into custom_fields["vrf"]. Pass a different name to target a different custom field:

result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "name": "ceos-spine-1_10.0.0.1_10.0.0.2",
        "device": "ceos-spine-1",
        "local_address": "10.0.0.1",
        "remote_address": "10.0.0.2",
        "local_as": 65001,
        "remote_as": 65002,
        "rir": "lab",
        "vrf": "PROD_VRF",
        "vrf_custom_field": "tenant_vrf",  # Object-type custom field -> VRF
    },
)

The vrf parameter accepts a VRF name which is resolved to a NetBox VRF object ID before being stored as an object reference in the custom field.
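
As a rough illustration of that resolution, the sketch below maps a VRF name to an ID and places it under the chosen custom field. The helper name, the in-memory name-to-ID mapping (standing in for a NetBox lookup), and the exact payload shape are all assumptions made for the example.

```python
from typing import Dict


def vrf_custom_field_fragment(
    vrf_name: str,
    vrf_ids_by_name: Dict[str, int],
    vrf_custom_field: str = "vrf",
) -> dict:
    """Build the custom_fields part of a session payload for one VRF."""
    if vrf_name not in vrf_ids_by_name:
        raise LookupError(f"VRF '{vrf_name}' not found")
    # Assumed shape: the Object-type custom field is written as the VRF ID
    return {"custom_fields": {vrf_custom_field: vrf_ids_by_name[vrf_name]}}


fragment = vrf_custom_field_fragment("PROD_VRF", {"PROD_VRF": 7}, "tenant_vrf")
print(fragment)
# {'custom_fields': {'tenant_vrf': 7}}
```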

Examples

Create a single BGP session:

nf#netbox create bgp-peering name ceos-spine-1_10.0.0.1_10.0.0.2 device ceos-spine-1 local-address 10.0.0.1 remote-address 10.0.0.2 local-as 65001 remote-as 65002 rir lab

Dry run — preview what would be created:

nf#netbox create bgp-peering name ceos-spine-1_10.0.0.1_10.0.0.2 device ceos-spine-1 local-address 10.0.0.1 remote-address 10.0.0.2 local-as 65001 remote-as 65002 dry-run

Create session from interface (P2P peer derived automatically):

nf#netbox create bgp-peering device ceos-spine-1 local-interface Ethernet1 local-as 65001 rir lab

Create session into a NetBox branch:

nf#netbox create bgp-peering name ceos-spine-1_10.0.0.1_10.0.0.2 device ceos-spine-1 local-address 10.0.0.1 remote-address 10.0.0.2 local-as 65001 remote-as 65002 rir lab branch my-bgp-branch

Examples using the Python API:

from norfab.core.nfapi import NorFab

nf = NorFab(inventory="./inventory.yaml")
nf.start()
client = nf.make_client()

# create a single BGP session
result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "name": "ceos-spine-1_10.0.0.1_10.0.0.2",
        "device": "ceos-spine-1",
        "local_address": "10.0.0.1",
        "remote_address": "10.0.0.2",
        "local_as": "65001",
        "remote_as": "65002",
        "rir": "lab",
    },
)

# dry run — preview without writing
result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "name": "ceos-spine-1_10.0.0.1_10.0.0.2",
        "device": "ceos-spine-1",
        "local_address": "10.0.0.1",
        "remote_address": "10.0.0.2",
        "local_as": "65001",
        "remote_as": "65002",
        "dry_run": True,
    },
)

# bulk create
result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "bulk_create": [
            {
                "name": "ceos-spine-1_10.0.0.1_10.0.0.2",
                "device": "ceos-spine-1",
                "local_address": "10.0.0.1",
                "remote_address": "10.0.0.2",
                "local_as": "65001",
                "remote_as": "65002",
            },
            {
                "name": "ceos-spine-2_10.0.0.3_10.0.0.4",
                "device": "ceos-spine-2",
                "local_address": "10.0.0.3",
                "remote_address": "10.0.0.4",
                "local_as": "65001",
                "remote_as": "65003",
            },
        ],
        "rir": "lab",
    },
)

# create from interface — local address resolved from IPAM, P2P remote derived
result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "device": "ceos-spine-1",
        "local_interface": "Ethernet1",
        "local_as": "65001",
        "rir": "lab",
    },
)

# ASN source from device custom fields
result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "device": "ceos-spine-1",
        "local_interface": "Ethernet1",
        "asn_source": "custom_fields.asn",
        "rir": "lab",
    },
)

# create into a NetBox branch
result = client.run_job(
    "netbox",
    "create_bgp_peering",
    workers="any",
    kwargs={
        "name": "ceos-spine-1_10.0.0.1_10.0.0.2",
        "device": "ceos-spine-1",
        "local_address": "10.0.0.1",
        "remote_address": "10.0.0.2",
        "local_as": "65001",
        "remote_as": "65002",
        "rir": "lab",
        "branch": "my-bgp-branch",
    },
)

nf.destroy()
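
The asn_source example above uses the dot-path "custom_fields.asn". A dot-path of this kind can be walked through nested device data as sketched below; resolve_dot_path is a hypothetical helper, and the device data is sample input rather than real NetBox output.

```python
from typing import Any, Optional


def resolve_dot_path(data: dict, path: str) -> Optional[Any]:
    """Follow a dot-separated path through nested dicts, or return None."""
    current: Any = data
    for key in path.split("."):
        # Stop as soon as the path leaves the nested-dict structure
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


device_data = {"name": "ceos-spine-1", "custom_fields": {"asn": 65001}}
print(resolve_dot_path(device_data, "custom_fields.asn"))
# 65001
```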

NORFAB Netbox Create BGP Peering Command Shell Reference

NorFab shell supports these command options for the Netbox create_bgp_peering task:

nf# man tree netbox.create.bgp-peering
root
└── netbox:    Netbox service
    └── create:    Create Netbox objects
        └── bgp-peering:    Create BGP peering session(s)
            ├── timeout:    Job timeout
            ├── workers:    Filter worker to target, default 'any'
            ├── verbose-result:    Control output details, default 'False'
            ├── progress:    Display progress events, default 'True'
            ├── instance:    Netbox instance name to target
            ├── branch:    Branching plugin branch name to use
            ├── dry-run:    Return names without writing to NetBox
            ├── name:    Session name (single mode)
            ├── device:    Local device name (single mode)
            ├── local-address:    Local IP address string
            ├── remote-address:    Remote IP address string
            ├── local-as:    Local AS number string
            ├── remote-as:    Remote AS number string
            ├── status:    Session status, default 'active'
            ├── description:    Session description
            ├── vrf:    VRF name
            ├── peer-group:    Peer group name
            ├── import-policies:    Import routing policy names
            ├── export-policies:    Export routing policy names
            ├── prefix-list-in:    Inbound prefix list name
            ├── prefix-list-out:    Outbound prefix list name
            ├── local-interface:    Local interface name or bracket-range pattern
            ├── asn-source:    Dot-path or IPAM query dict for ASN resolution
            ├── name-template:    Format string for session names
            ├── create-reverse:    Also create mirror session on remote device, default 'True'
            ├── bulk-create:    JSON list of session dicts for bulk creation
            ├── rir:    RIR name for ASN creation
            ├── vrf-custom-field:    BGP session field for VRF reference, default 'vrf'
            └── message:    Changelog message for NetBox write operations
nf#
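
The local-interface option accepts a bracket-range pattern such as Ethernet[1-4]/1.101, which expands into one session per interface. The sketch below covers numeric ranges only and is not the helper used by the worker; expand_numeric_range is a hypothetical name.

```python
import re
from typing import List

# Matches a numeric range in brackets, for example "[1-4]"
_RANGE = re.compile(r"\[(\d+)-(\d+)\]")


def expand_numeric_range(pattern: str) -> List[str]:
    """Expand "[start-end]" ranges in an interface name, left to right."""
    match = _RANGE.search(pattern)
    if not match:
        return [pattern]  # nothing to expand
    start, end = int(match.group(1)), int(match.group(2))
    names: List[str] = []
    for number in range(start, end + 1):
        expanded = pattern[: match.start()] + str(number) + pattern[match.end():]
        # Recurse to expand any further ranges in the same name
        names.extend(expand_numeric_range(expanded))
    return names


print(expand_numeric_range("Ethernet[1-4]/1.101"))
# ['Ethernet1/1.101', 'Ethernet2/1.101', 'Ethernet3/1.101', 'Ethernet4/1.101']
```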

Create one or many BGP sessions in NetBox.

Supports single-session mode (individual keyword arguments) and bulk mode (bulk_create list of dicts). IP addresses and ASNs are resolved from IPAM or created on demand. When local_interface is provided the local address is resolved from IPAM; for P2P subnets (/30, /31, /127) the remote address is derived automatically.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job | Job | NorFab Job object. | required |
| instance | str | NetBox instance name. | None |
| name | str | Session name. Derived from name_template when omitted. | None |
| device | str | Local device name. Required in single-session mode. | None |
| local_address | str | Local IP address string. Derived from local_interface when omitted. | None |
| remote_address | str | Remote IP address string. Derived from P2P peer when local_interface is used. | None |
| local_as | int | Local AS number string. Derived from asn_source when omitted. | None |
| remote_as | int | Remote AS number string. Derived from asn_source on remote device when omitted. | None |
| status | str | Session status. Default 'active'. | 'active' |
| description | str | Session description. | None |
| vrf | str | VRF name. | None |
| peer_group | str | Peer group name (resolved or created). | None |
| import_policies | list | List of import routing-policy names. | None |
| export_policies | list | List of export routing-policy names. | None |
| prefix_list_in | str | Inbound prefix-list name. | None |
| prefix_list_out | str | Outbound prefix-list name. | None |
| local_interface | str | Local interface name or bracket-range pattern. | None |
| asn_source | str or dict | Dot-path string through device data or dict of kwargs for nb.ipam.asn.get for automatic ASN resolution. | None |
| name_template | str | Format string for session names. Default '{device}_{vrf}_{remote_address}'. | '{device}_{vrf}_{remote_address}' |
| create_reverse | bool | When True also create a mirror session on the remote device with local and remote IPs/ASNs swapped. Default True. | True |
| bulk_create | list | List of session dicts for bulk creation. | None |
| rir | str | RIR name used when auto-creating ASNs. | None |
| message | str | Changelog message recorded on every NetBox write. | None |
| branch | str | NetBox branching plugin branch name. | None |
| dry_run | bool | When True return session names without writing. | False |
| vrf_custom_field | str | Name of the BGP session custom field that stores the VRF object reference. The custom field must be of type Object in NetBox pointing to the VRF content-type. The value is always a single VRF object reference written into custom_fields[vrf_custom_field]. Default 'vrf' means custom_fields['vrf']. | 'vrf' |

Returns:

| Type | Description |
| --- | --- |
| Result | Normal run: {"created": ["name1", ...], "exists": ["name2", ...]} |
| Result | Dry run: {"create": ["name1", ...], "exists": ["name2", ...]} |

Source code in norfab\workers\netbox_worker\bgp_peerings_tasks.py
@Task(
    fastapi={"methods": ["POST"], "schema": NetboxFastApiArgs.model_json_schema()},
    input=CreateBgpPeeringInput,
)
def create_bgp_peering(
    self,
    job: Job,
    instance: Union[None, str] = None,
    # single-session mode
    name: Union[None, str] = None,
    device: Union[None, str] = None,
    local_address: Union[None, str] = None,
    remote_address: Union[None, str] = None,
    local_as: Union[None, int] = None,
    remote_as: Union[None, int] = None,
    status: str = "active",
    description: Union[None, str] = None,
    vrf: Union[None, str] = None,
    peer_group: Union[None, str] = None,
    import_policies: Union[None, list] = None,
    export_policies: Union[None, list] = None,
    prefix_list_in: Union[None, str] = None,
    prefix_list_out: Union[None, str] = None,
    # interface-driven resolution
    local_interface: Union[None, str] = None,
    asn_source: Union[None, str, dict] = None,
    name_template: Union[None, str] = "{device}_{vrf}_{remote_address}",
    # mirror session
    create_reverse: bool = True,
    # bulk mode
    bulk_create: Union[None, list] = None,
    # shared
    rir: Union[None, str] = None,
    message: Union[None, str] = None,
    branch: Union[None, str] = None,
    dry_run: bool = False,
    vrf_custom_field: str = "vrf",
) -> Result:
    """
    Create one or many BGP sessions in NetBox.

    Supports single-session mode (individual keyword arguments) and bulk mode
    (``bulk_create`` list of dicts).  IP addresses and ASNs are resolved from
    IPAM or created on demand.  When ``local_interface`` is provided the local
    address is resolved from IPAM; for P2P subnets (/30, /31, /127) the remote
    address is derived automatically.

    Args:
        job: NorFab Job object.
        instance (str, optional): NetBox instance name.
        name (str, optional): Session name. Derived from ``name_template`` when omitted.
        device (str, optional): Local device name. Required in single-session mode.
        local_address (str, optional): Local IP address string. Derived from ``local_interface`` when omitted.
        remote_address (str, optional): Remote IP address string. Derived from P2P peer when ``local_interface`` is used.
        local_as (int, optional): Local AS number string. Derived from ``asn_source`` when omitted.
        remote_as (int, optional): Remote AS number string. Derived from ``asn_source`` on remote device when omitted.
        status (str): Session status. Default ``'active'``.
        description (str, optional): Session description.
        vrf (str, optional): VRF name.
        peer_group (str, optional): Peer group name (resolved or created).
        import_policies (list, optional): List of import routing-policy names.
        export_policies (list, optional): List of export routing-policy names.
        prefix_list_in (str, optional): Inbound prefix-list name.
        prefix_list_out (str, optional): Outbound prefix-list name.
        local_interface (str, optional): Local interface name or bracket-range pattern.
        asn_source (str or dict, optional): Dot-path string through device data or
            dict of kwargs for ``nb.ipam.asn.get`` for automatic ASN resolution.
        name_template (str, optional): Format string for session names. Default
            ``'{device}_{vrf}_{remote_address}'``.
        create_reverse (bool): When ``True`` also create a mirror session on the
            remote device with local and remote IPs/ASNs swapped. Default ``True``.
        bulk_create (list, optional): List of session dicts for bulk creation.
        rir (str, optional): RIR name used when auto-creating ASNs.
        message (str, optional): Changelog message recorded on every NetBox write.
        branch (str, optional): NetBox branching plugin branch name.
        dry_run (bool): When ``True`` return session names without writing.
        vrf_custom_field (str): Name of the BGP session custom field that stores
            the VRF object reference.  The custom field must be of type Object in
            NetBox pointing to the VRF content-type.  The value is always a single
            VRF object reference written into ``custom_fields[vrf_custom_field]``.
            Default ``'vrf'`` means ``custom_fields['vrf']``.

    Returns:
        Normal run::

            {"created": ["name1", ...], "exists": ["name2", ...]}

        Dry run::

            {"create": ["name1", ...], "exists": ["name2", ...]}
    """
    instance = instance or self.default_instance
    ret = Result(
        task=f"{self.name}:create_bgp_peering",
        result={},
        resources=[instance],
    )

    # Validate BGP plugin
    if not self.has_plugin("netbox_bgp", instance, strict=True):
        ret.errors.append(
            f"'{instance}' NetBox instance has no BGP plugin installed"
        )
        ret.failed = True
        return ret

    nb = self._get_pynetbox(instance, branch=branch)

    if message:
        nb.http_session.headers["X-Changelog-Message"] = message

    # Validate VRF custom field and RIR
    vrf_custom_field = _resolve_vrf_custom_field(
        vrf_custom_field, nb, job, self.name
    )
    rir_id = _resolve_rir_id(rir, nb, job, self.name)

    # Build initial bgp_sessions list
    bgp_sessions = bulk_create or [
        {
            "name": name,
            "device": device,
            "local_address": local_address,
            "remote_address": remote_address,
            "local_as": local_as,
            "remote_as": remote_as,
            "status": status,
            "description": description,
            "vrf": vrf,
            "peer_group": peer_group,
            "import_policies": import_policies,
            "export_policies": export_policies,
            "prefix_list_in": prefix_list_in,
            "prefix_list_out": prefix_list_out,
            "local_interface": local_interface,
        }
    ]

    # Step 5a: Resolve interfaces, IPs, and discover remote devices for every
    # bgp_session in one pass.  Range expansion turns one bgp_session into many when a bracket
    # pattern is used in local_interface.
    resolved_bgp_sessions = []
    # dict keyed by device name; values populated with {id, site_id, data} after batch fetch
    all_device_names = {}

    for bgp_session in bgp_sessions:
        bgp_session_device = bgp_session["device"]
        bgp_session_local_interface = bgp_session.get("local_interface")

        if bgp_session_local_interface:
            # Expand bracket-range pattern e.g. "Ethernet[1-4]/1.101"
            intf_names = expand_alphanumeric_range(bgp_session_local_interface)

            # Batch fetch interfaces and their IPs in two calls instead of 2N
            intf_by_name = {
                i.name: i
                for i in nb.dcim.interfaces.filter(
                    device=bgp_session_device, name=intf_names, fields="id,name"
                )
            }
            found_intf_ids = [i.id for i in intf_by_name.values()]
            ips_by_intf_id: Dict[int, list] = {}
            if found_intf_ids:
                for _ip in nb.ipam.ip_addresses.filter(
                    interface_id=found_intf_ids,
                    fields="id,address,assigned_object_id",
                ):
                    ips_by_intf_id.setdefault(_ip.assigned_object_id, []).append(
                        _ip
                    )

            for intf_name in intf_names:
                intf = intf_by_name.get(intf_name)
                if not intf:
                    msg = f"interface '{intf_name}' not found on device '{bgp_session_device}'"
                    job.event(msg, severity="WARNING")
                    log.warning(f"{self.name} - {msg}")
                    ret.errors.append(msg)
                    continue

                ip_list = ips_by_intf_id.get(intf.id, [])
                if not ip_list:
                    msg = f"no IP address assigned to interface '{intf_name}' on '{bgp_session_device}'"
                    job.event(msg, severity="WARNING")
                    log.warning(f"{self.name} - {msg}")
                    ret.errors.append(msg)
                    continue

                ip_cidr = ip_list[0].address  # e.g. "10.0.0.1/31"
                local_addr = ip_cidr.split("/")[0]

                bgp_session_remote_address = bgp_session.get("remote_address")
                remote_device_name = None

                if not bgp_session_remote_address:
                    peer_ip = get_p2p_peer_ip(ip_cidr)
                    if peer_ip:
                        bgp_session_remote_address = peer_ip
                        peer_ip_list = list(
                            nb.ipam.ip_addresses.filter(address=peer_ip)
                        )
                        if peer_ip_list and peer_ip_list[0].assigned_object:
                            obj = peer_ip_list[0].assigned_object
                            if hasattr(obj, "device") and obj.device:
                                remote_device_name = obj.device.name
                                all_device_names.setdefault(
                                    remote_device_name, None
                                )

                new_bgp_session = dict(bgp_session)
                new_bgp_session["device"] = bgp_session_device
                new_bgp_session["local_address"] = local_addr
                new_bgp_session["remote_address"] = bgp_session_remote_address
                new_bgp_session["remote_device_name"] = remote_device_name

                if not new_bgp_session.get("name"):
                    try:
                        # "device" is already a key of new_bgp_session; passing it
                        # again as a keyword would raise TypeError
                        new_bgp_session["name"] = name_template.format(
                            **new_bgp_session
                        )
                    except Exception as exc:
                        msg = (
                            f"name_template '{name_template}' failed for session "
                            f"'{new_bgp_session.get('name')}' on '{bgp_session_device}': {exc}"
                        )
                        ret.errors.append(msg)
                        log.error(f"{self.name} - {msg}")
                        continue

                all_device_names.setdefault(bgp_session_device, None)
                resolved_bgp_sessions.append(new_bgp_session)

                # Build mirror (reverse) session at resolution time if remote device identified
                if create_reverse and remote_device_name:
                    mirror_session = dict(bgp_session)
                    mirror_session["device"] = remote_device_name
                    mirror_session["local_address"] = bgp_session_remote_address
                    mirror_session["remote_address"] = local_addr
                    mirror_session["local_as"] = bgp_session.get("remote_as")
                    mirror_session["remote_as"] = bgp_session.get("local_as")
                    mirror_session["remote_device_name"] = bgp_session_device
                    mirror_session["local_interface"] = None
                    mirror_session["name"] = None
                    try:
                        # "device" is already set on mirror_session; passing it
                        # again as a keyword would raise TypeError
                        mirror_session["name"] = name_template.format(
                            **mirror_session
                        )
                    except Exception as exc:
                        msg = (
                            f"name_template '{name_template}' failed for mirror session "
                            f"on '{remote_device_name}': {exc}"
                        )
                        ret.errors.append(msg)
                        log.error(f"{self.name} - {msg}")
                    else:
                        resolved_bgp_sessions.append(mirror_session)
        else:
            bgp_session_copy = dict(bgp_session)
            bgp_session_copy["remote_device_name"] = None
            if bgp_session_device:
                all_device_names.setdefault(bgp_session_device, None)
            resolved_bgp_sessions.append(bgp_session_copy)

    # Step 5b: Pre-fetch device data for all collected device names (single API call)
    if all_device_names:
        try:
            for dev_obj in nb.dcim.devices.filter(name=list(all_device_names)):
                all_device_names[dev_obj.name] = {
                    "id": dev_obj.id,
                    "site_id": dev_obj.site.id if dev_obj.site else None,
                    "data": dict(dev_obj),
                }
        except Exception as exc:
            log.warning(
                f"{self.name} - could not pre-fetch device data for "
                f"{list(all_device_names)}: {exc}"
            )

    # Step 5c: Pre-fetch existing sessions for idempotency (single API call)
    existing_session_names = set()
    if all_device_names:
        try:
            existing = nb.plugins.bgp.session.filter(
                device=list(all_device_names), fields="name,id"
            )
            existing_session_names = {s.name for s in existing}
        except Exception as exc:
            log.warning(
                f"{self.name} - could not pre-fetch BGP sessions for "
                f"{list(all_device_names)}: {exc}; will check per-session"
            )

    # Step 6: Process each resolved bgp_session
    if dry_run:
        result = {"create": [], "exists": []}
    else:
        result = {"created": [], "exists": []}

    _lookup_cache: dict = {}
    payloads = []

    for bgp_session in resolved_bgp_sessions:
        bgp_session_device = bgp_session.get("device")
        bgp_session_local_address = bgp_session.get("local_address")
        bgp_session_remote_address = bgp_session.get("remote_address")
        bgp_session_local_as = bgp_session.get("local_as")
        bgp_session_remote_as = bgp_session.get("remote_as")
        remote_device_name = bgp_session.get("remote_device_name")

        # Determine session name
        sname = bgp_session.get("name")
        if not sname:
            try:
                # "device" is already a key of bgp_session; passing it again
                # as a keyword would raise TypeError
                sname = name_template.format(**bgp_session)
            except Exception as exc:
                msg = (
                    f"name_template '{name_template}' failed for session "
                    f"'{bgp_session.get('name')}' on '{bgp_session_device}': {exc}"
                )
                ret.errors.append(msg)
                log.error(f"{self.name} - {msg}")
                continue

        # Idempotency check (step 6i)
        if sname in existing_session_names:
            result["exists"].append(sname)
            continue

        # Dry run — report name and move on (step 6j)
        if dry_run:
            result["create"].append(sname)
            continue

        # --- Full resolution (non-dry-run) ---

        # Resolve ASNs from asn_source if not supplied (steps 6b / 6c)
        if asn_source and not bgp_session_local_as:
            dev_data = (all_device_names.get(bgp_session_device) or {}).get(
                "data", {}
            )
            bgp_session_local_as = resolve_asn_from_source(dev_data, asn_source, nb)
            if not bgp_session_local_as:
                msg = f"could not resolve local AS for '{sname}' via asn_source"
                job.event(msg, severity="ERROR")
                log.error(f"{self.name} - {msg}")
                ret.errors.append(msg)
                continue

        if asn_source and not bgp_session_remote_as:
            if not remote_device_name:
                msg = (
                    f"cannot resolve remote AS for '{sname}': remote device not "
                    f"identified and remote_as not provided"
                )
                job.event(msg, severity="ERROR")
                log.error(f"{self.name} - {msg}")
                ret.errors.append(msg)
                continue
            remote_dev_data = (all_device_names.get(remote_device_name) or {}).get(
                "data", {}
            )
            bgp_session_remote_as = resolve_asn_from_source(
                remote_dev_data, asn_source, nb
            )
            if not bgp_session_remote_as:
                msg = f"could not resolve remote AS for '{sname}' via asn_source"
                job.event(msg, severity="ERROR")
                log.error(f"{self.name} - {msg}")
                ret.errors.append(msg)
                continue

        # Resolve IP IDs and ASN IDs (steps 6d / 6e)
        local_ip_id = resolve_ip(bgp_session_local_address, nb, job, ret, self.name)
        remote_ip_id = resolve_ip(
            bgp_session_remote_address, nb, job, ret, self.name
        )
        local_as_id = resolve_asn(
            bgp_session_local_as, nb, rir_id, job, ret, self.name
        )
        remote_as_id = resolve_asn(
            bgp_session_remote_as, nb, rir_id, job, ret, self.name
        )

        resolution_errors = []
        if not local_ip_id:
            resolution_errors.append(f"local IP '{bgp_session_local_address}'")
        if not remote_ip_id:
            resolution_errors.append(f"remote IP '{bgp_session_remote_address}'")
        if not local_as_id:
            resolution_errors.append(f"local ASN '{bgp_session_local_as}'")
        if not remote_as_id:
            resolution_errors.append(f"remote ASN '{bgp_session_remote_as}'")
        if resolution_errors:
            msg = f"skipping '{sname}': could not resolve {', '.join(resolution_errors)}"
            job.event(msg, severity="WARNING")
            log.warning(f"{self.name} - {msg}")
            ret.errors.append(msg)
            continue

        # Resolve device ID and site ID (step 6f)
        dev_info = all_device_names.get(bgp_session_device)
        if not dev_info:
            msg = (
                f"device '{bgp_session_device}' not found in NetBox, "
                f"skipping '{sname}'"
            )
            job.event(msg, severity="WARNING")
            log.warning(f"{self.name} - {msg}")
            ret.errors.append(msg)
            continue

        device_id = dev_info["id"]
        site_id = dev_info["site_id"]
        addr_family = get_addr_family(bgp_session_local_address)

        payload = {
            "name": sname,
            "description": bgp_session.get("description") or "",
            "device": device_id,
            "local_address": local_ip_id,
            "local_as": local_as_id,
            "remote_address": remote_ip_id,
            "remote_as": remote_as_id,
            "status": bgp_session.get("status", "active"),
            "site": site_id,
        }

        # Optional fields (step 6h)
        payload.update(
            resolve_bgp_session_payload_fields(
                {
                    k: bgp_session[k]
                    for k in (
                        "vrf",
                        "peer_group",
                        "import_policies",
                        "export_policies",
                        "prefix_list_in",
                        "prefix_list_out",
                    )
                    if bgp_session.get(k)
                },
                nb,
                rir_id,
                job,
                ret,
                self.name,
                addr_family,
                _lookup_cache=_lookup_cache,
                vrf_custom_field=vrf_custom_field,
            )
        )

        payloads.append(payload)

    # Bulk create (step 7)
    if payloads:
        try:
            nb.plugins.bgp.session.create(payloads)
            result["created"].extend(p["name"] for p in payloads)
            msg = f"created {len(payloads)} BGP session(s)"
            job.event(msg)
            log.info(f"{self.name} - {msg}")
        except Exception as e:
            msg = f"failed to create BGP sessions: {e}"
            ret.errors.append(msg)
            log.error(f"{self.name} - {msg}")

    ret.result = result
    return ret