Skip to content

Netbox Worker

NetboxWorker(inventory, broker, worker_name, exit_event=None, init_done_event=None, log_level: str = None, log_queue: object = None) ¤

Bases: NFPWorker, NetboxGraphqlTasks, NetboxDesignTasks, NetboxInterfacesTasks, NetboxDevicesTasks, NetboxConnectionsTasks, NetboxCircuitsTasks, NetboxNornirInventoryTasks, NetboxBgpPeeringsTasks, NetboxPrefixTasks, NetboxContainerlabInventoryTasks, NetboxIpTasks, NetboxBranchTasks, NetboxCrudTasks

NetboxWorker class for interacting with Netbox API and managing inventory.

Parameters:

Name Type Description Default
inventory dict

The inventory data.

required
broker object

The broker instance.

required
worker_name str

The name of the worker.

required
exit_event Event

Event to signal exit.

None
init_done_event Event

Event to signal initialization completion.

None
log_level str

Logging level.

None
log_queue object

Queue for logging.

None

Raises:

Type Description
AssertionError

If the inventory has no Netbox instances.

Attributes:

Name Type Description
default_instance str

Default Netbox instance name.

inventory dict

Inventory data.

nb_version tuple

Netbox version.

compatible_ge_v4 tuple

Minimum supported Netbox v4 version (4.4.0+).

Source code in norfab\workers\netbox_worker\netbox_worker.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def __init__(
    self,
    inventory,
    broker,
    worker_name,
    exit_event=None,
    init_done_event=None,
    log_level: str = None,
    log_queue: object = None,
) -> None:
    """
    Initialize the NetboxWorker: load inventory from the broker, extract
    worker parameters, pick the default Netbox instance, verify Netbox
    compatibility and set up the on-disk cache.

    Args:
        inventory: The inventory data.
        broker: The broker instance.
        worker_name: The name of the worker.
        exit_event: Event to signal exit.
        init_done_event: Event to signal initialization completion.
        log_level: Logging level.
        log_queue: Queue for logging.

    Raises:
        AssertionError: If the inventory has no Netbox instances.
    """
    super().__init__(
        inventory, broker, SERVICE, worker_name, exit_event, log_level, log_queue
    )
    self.init_done_event = init_done_event
    self.cache = None

    # get inventory from broker
    self.netbox_inventory = self.load_inventory()
    if not self.netbox_inventory:
        log.critical(
            f"{self.name} - Broker {self.broker} returned no inventory for {self.name}, killing myself..."
        )
        self.destroy()

    assert self.netbox_inventory.get(
        "instances"
    ), f"{self.name} - inventory has no Netbox instances"

    # extract parameters from inventory
    self.netbox_connect_timeout = self.netbox_inventory.get(
        "netbox_connect_timeout", 10
    )
    self.netbox_read_timeout = self.netbox_inventory.get("netbox_read_timeout", 300)
    self.cache_use = self.netbox_inventory.get("cache_use", True)
    self.cache_ttl = self.netbox_inventory.get("cache_ttl", 31557600)  # 1 Year
    self.branch_create_timeout = self.netbox_inventory.get(
        "branch_create_timeout", 120
    )
    self.grapqhl_max_workers = self.netbox_inventory.get("grapqhl_max_workers", 4)

    # find default instance; if none is explicitly marked with
    # "default: true", fall back to the last iterated instance
    for name, params in self.netbox_inventory["instances"].items():
        if params.get("default") is True:
            self.default_instance = name
            break
    else:
        self.default_instance = name

    log.info(f"{self.name} - Default Netbox instance: '{self.default_instance}'")

    # check Netbox compatibility
    self._verify_compatibility()

    # instantiate cache
    self.cache_dir = os.path.join(self.base_dir, "cache")
    os.makedirs(self.cache_dir, exist_ok=True)
    self.cache = self._get_diskcache()

    # init_done_event defaults to None - only signal when one was provided,
    # otherwise .set() raises AttributeError
    if self.init_done_event is not None:
        self.init_done_event.set()
    log.info(f"{self.name} - Started")

worker_exit() -> None ¤

Worker exit sanity checks. Closes the cache if it exists.

This method checks if the cache attribute is present and not None. If the cache exists, it closes the cache to release any resources associated with it.

Source code in norfab\workers\netbox_worker\netbox_worker.py
148
149
150
151
152
153
154
155
156
157
def worker_exit(self) -> None:
    """
    Worker exit sanity checks. Closes the cache if it exists.

    Checks whether the ``cache`` attribute is set to a truthy value and,
    if so, closes the cache to release the resources it holds.
    """
    cache = self.cache
    if cache:
        cache.close()

get_inventory() -> Result ¤

NorFab Task to return running inventory for NetBox worker.

Returns:

Name Type Description
dict Result

A dictionary containing the NetBox inventory.

Source code in norfab\workers\netbox_worker\netbox_worker.py
163
164
165
166
167
168
169
170
171
172
173
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def get_inventory(self) -> Result:
    """
    NorFab Task to return running inventory for NetBox worker.

    Returns:
        dict: A dictionary containing the NetBox inventory.
    """
    # return a shallow copy so callers cannot mutate worker state
    inventory_snapshot = dict(self.netbox_inventory)
    return Result(task=f"{self.name}:get_inventory", result=inventory_snapshot)

get_version(**kwargs: Any) -> Result ¤

Retrieves the version information of Netbox instances.

Returns:

Name Type Description
dict Result

A dictionary containing the version information of the Netbox

Source code in norfab\workers\netbox_worker\netbox_worker.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def get_version(self, **kwargs: Any) -> Result:
    """
    Retrieves the version information of Netbox instances.

    Returns:
        dict: A dictionary containing the version information of the Netbox
            worker environment - Python/platform details, key dependency
            package versions and the known Netbox instance versions.
    """
    libs = {
        "norfab": "",
        "pynetbox": "",
        "requests": "",
        "python": sys.version.split(" ")[0],
        "platform": sys.platform,
        "diskcache": "",
        "netbox_version": self.nb_version,
    }
    # probe installed versions only for keys that are actual packages;
    # previously every key was probed, which could clobber the
    # "python"/"platform"/"netbox_version" entries if a distribution
    # with the same name happened to be installed
    for pkg in ("norfab", "pynetbox", "requests", "diskcache"):
        try:
            libs[pkg] = importlib.metadata.version(pkg)
        except importlib.metadata.PackageNotFoundError:
            pass

    return Result(task=f"{self.name}:get_version", result=libs)

get_netbox_status(instance: Union[None, str] = None) -> Result ¤

Retrieve the status of NetBox instances.

This method queries the status of a specific NetBox instance if the instance parameter is provided. If no instance is specified, it queries the status of all instances in the NetBox inventory.

Parameters:

Name Type Description Default
instance str

The name of the specific NetBox instance to query.

None

Returns:

Name Type Description
dict Result

A dictionary containing the status of the requested NetBox instance(s).

Source code in norfab\workers\netbox_worker\netbox_worker.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def get_netbox_status(self, instance: Union[None, str] = None) -> Result:
    """
    Retrieve the status of NetBox instances.

    Queries the status of a single NetBox instance when ``instance`` is
    provided, otherwise queries every instance defined in the NetBox
    inventory.

    Args:
        instance (str, optional): The name of the specific NetBox instance to query.

    Returns:
        dict: A dictionary containing the status of the requested NetBox
              instance(s).
    """
    ret = Result(result={}, task=f"{self.name}:get_netbox_status")
    # single named instance or all inventory instances
    targets = [instance] if instance else list(self.netbox_inventory["instances"])
    for name in targets:
        log.info(f"{self.name} - fetching '{name}' Netbox status")
        ret.result[name] = self._query_netbox_status(name)
    log.info(f"{self.name} - Netbox instance(s) status retrieval completed")
    return ret

get_compatibility(job: Job) -> Result ¤

Checks the compatibility of Netbox instances based on their version.

This method retrieves the status and version of Netbox instances and determines if they are compatible with the required versions. It logs a warning if any instance is not reachable.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required

Returns:

Name Type Description
dict Result

A dictionary where the keys are the instance names and the values are booleans indicating compatibility (True/False) or None if the instance is not reachable.

Source code in norfab\workers\netbox_worker\netbox_worker.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def get_compatibility(self, job: Job) -> Result:
    """
    Checks the compatibility of Netbox instances based on their version.

    Retrieves status and version for every Netbox instance and verifies
    each against the minimum supported version, logging a warning for any
    instance that is not reachable.

    Args:
        job: NorFab Job object containing relevant metadata

    Returns:
        dict: A dictionary where the keys are the instance names and the values are
              booleans indicating compatibility (True/False) or None if the instance
              is not reachable.
    """
    ret = Result(task=f"{self.name}:get_compatibility", result={})
    netbox_status = self.get_netbox_status(job=job)
    for instance, params in netbox_status.result.items():
        if params["status"] is not True:
            log.warning(f"{self.name} - {instance} Netbox instance not reachable")
            job.event(f"instance '{instance}' is not reachable")
            ret.result[instance] = None
            continue
        # strip docker image suffix (e.g. "4.4.0-Docker-3.0") before parsing
        raw_version = params["netbox-version"]
        if "-docker-" in raw_version.lower():
            raw_version = raw_version.lower().split("-docker-")[0]
        self.nb_version[instance] = tuple(
            int(part) for part in raw_version.split(".")
        )
        # check Netbox 4.4+ compatibility
        if self.nb_version[instance] >= self.compatible_ge_v4:
            ret.result[instance] = True
            msg = f"instance '{instance}' version {'.'.join(str(v) for v in self.nb_version[instance])} is compatible"
            log.info(msg)
            job.event(msg)
        else:
            ret.result[instance] = False
            msg = (
                f"{self.name} - {instance} Netbox version {self.nb_version[instance]} is not supported, "
                f"minimum required version is {self.compatible_ge_v4}"
            )
            log.error(msg)
            job.event(msg)

    return ret

has_plugin(plugin_name: str, instance: str, strict: bool = False) -> bool ¤

Check if a specified plugin is installed in a given NetBox instance.

Parameters:

Name Type Description Default
plugin_name str

The name of the plugin to check for.

required
instance str

The identifier or address of the NetBox instance.

required
strict bool

If True, raises a RuntimeError when the plugin is not found.

False

Returns:

Name Type Description
bool bool

True if the plugin is installed, False otherwise.

Source code in norfab\workers\netbox_worker\netbox_worker.py
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
def has_plugin(self, plugin_name: str, instance: str, strict: bool = False) -> bool:
    """
    Check if a specified plugin is installed in a given NetBox instance.

    Args:
        plugin_name (str): The name of the plugin to check for.
        instance (str): The identifier or address of the NetBox instance.
        strict (bool, optional): If True, raises a RuntimeError when the plugin is not found.

    Returns:
        bool: True if the plugin is installed, False otherwise.
    """
    status = self._query_netbox_status(instance)
    installed = plugin_name in status["plugins"]
    if not installed and strict is True:
        raise RuntimeError(
            f"'{instance}' Netbox instance has no '{plugin_name}' plugin installed"
        )
    return installed

make_diff(source_data: dict, target_data: dict) -> dict ¤

Compute an actionable diff between two nested dictionaries and classify each entity as create, delete, update, or in_sync.

Both arguments share the same two-level structure: the outer key typically is a device name and the inner key is a unique entity identifier (name, slug, or any hashable value). The inner value is a flat dict of comparable fields:

{
    "<device_name>": {
        "<entity_id>": {<field>: <value>, ...},
    }
}

source_data represents the desired or discovered state (e.g. live device data), while target_data represents the current state stored in the target system (e.g. NetBox). Entities present in source_data but absent from target_data are classified as create; entities present in target_data but absent from source_data are classified as delete; entities present in both with differing field values are classified as update; identical entities are in_sync.

Parameters:

Name Type Description Default
source_data dict

Nested dict representing the discovered/live state. Outer key is the device name; inner key is the entity identifier; value is a flat dict of entity fields.

required
target_data dict

Nested dict representing the desired/managed state. Same structure as source_data.

required

Returns:

Name Type Description
dict dict

Keyed by device name, each value contains:

dict
  • create (list[str]): Entity identifiers to be created.
dict
  • delete (list[str]): Entity identifiers to be deleted.
dict
  • update (dict): Entities with field-level changes, keyed by entity identifier. Each entry maps changed field names to a dict with old_value (current) and new_value (desired):

{ "<entity_id>": { "<field>": {"old_value": <current>, "new_value": <desired>} } }

dict
  • in_sync (list[str]): Entity identifiers that are identical in both datasets.
Source code in norfab\workers\netbox_worker\netbox_worker.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
def make_diff(
    self,
    source_data: dict,
    target_data: dict,
) -> dict:
    """
    Compute an actionable diff between two nested dictionaries and classify
    each entity as ``create``, ``delete``, ``update``, or ``in_sync``.

    Both arguments share the same two-level structure: the outer key typically is a
    device name and the inner key is a unique entity identifier (name, slug, or any
    hashable value). The inner value is a flat dict of comparable fields:

    ```
    {
        "<device_name>": {
            "<entity_id>": {<field>: <value>, ...},
        }
    }
    ```

    ``source_data`` represents the *desired* or *discovered* state (e.g. live
    device data), while ``target_data`` represents the *current* state stored
    in the target system (e.g. NetBox). Entities present in ``source_data``
    but absent from ``target_data`` are classified as ``create``; entities
    present in ``target_data`` but absent from ``source_data`` are classified
    as ``delete``; entities present in both with differing field values are
    classified as ``update``; identical entities are ``in_sync``.

    Args:
        source_data: Nested dict representing the discovered/live state.
            Outer key is the device name; inner key is the entity identifier;
            value is a flat dict of entity fields.
        target_data: Nested dict representing the current state stored in
            the target system. Same structure as ``source_data``.

    Returns:
        dict: Keyed by device name, each value contains:

        - ``create`` (list[str]): Entity identifiers to be created.
        - ``delete`` (list[str]): Entity identifiers to be deleted.
        - ``update`` (dict): Entities with field-level changes, keyed by
            entity identifier. Each entry maps changed field names to a dict
            with ``old_value`` (current) and ``new_value`` (desired):

              ```
              {
                  "<entity_id>": {
                      "<field>": {"old_value": <current>, "new_value": <desired>}
                  }
              }
            ```

        - ``in_sync`` (list[str]): Entity identifiers that are identical in
            both datasets.
    """
    result = {}
    # diff target (t1/old) against source (t2/new); tree view exposes
    # per-change path objects
    diff = DeepDiff(
        target_data,
        source_data,
        ignore_order=True,
        view="tree",
        threshold_to_diff_deeper=0,
    )

    all_devices = set(source_data.keys()) | set(target_data.keys())
    for device_name in all_devices:
        result[device_name] = {
            "create": [],
            "delete": [],
            "update": {},
            "in_sync": [],
        }

    for item in diff.get("dictionary_item_added", []):
        path = item.path(output_format="list")
        if len(path) == 1:
            # Entire device is new in source - all its entities are missing in target
            device_name = path[0]
            result[device_name]["create"].extend(source_data[device_name].keys())
        elif len(path) == 2:
            # Individual entity is new within an existing device
            device_name, sname = path
            result[device_name]["create"].append(sname)

    for item in diff.get("dictionary_item_removed", []):
        path = item.path(output_format="list")
        if len(path) == 1:
            # Entire device is absent in source - all its entities are missing in source
            device_name = path[0]
            result[device_name]["delete"].extend(target_data[device_name].keys())
        elif len(path) == 2:
            # Individual entity removed within an existing device
            device_name, sname = path
            result[device_name]["delete"].append(sname)

    # values_changed and type_changes both map to field-level updates
    for change_type in ("values_changed", "type_changes"):
        for item in diff.get(change_type, []):
            path = item.path(output_format="list")
            if len(path) == 3:
                device_name, sname, field = path
                result[device_name]["update"].setdefault(sname, {})[field] = {
                    "old_value": item.t1,
                    "new_value": item.t2,
                }

    # list-type fields: an item added OR removed means the whole field
    # changed; previously only additions were handled, so a removed list
    # item left the entity misclassified as in_sync
    for change_type in ("iterable_item_added", "iterable_item_removed"):
        for item in diff.get(change_type, []):
            path = item.path(output_format="list")
            if len(path) == 4:
                device_name, sname, field, _ = path
                entity_updates = result[device_name]["update"].setdefault(sname, {})
                if field not in entity_updates:
                    entity_updates[field] = {
                        "old_value": target_data[device_name][sname][field],
                        "new_value": source_data[device_name][sname][field],
                    }

    for device_name in all_devices:
        result[device_name]["create"] = sorted(result[device_name]["create"])
        result[device_name]["delete"] = sorted(result[device_name]["delete"])

        # calculate in sync entities - present in both and not updated
        src_entities_keys = set(source_data.get(device_name, {}).keys())
        tgt_entities_keys = set(target_data.get(device_name, {}).keys())
        common = src_entities_keys & tgt_entities_keys
        result[device_name]["in_sync"] = sorted(
            i for i in common if i not in result[device_name]["update"]
        )

    return result

cache_list(keys: str = '*', details: bool = False) -> Result ¤

Retrieve a list of cache keys, optionally with details about each key.

Parameters:

Name Type Description Default
keys str

A pattern to match cache keys against. Defaults to "*".

'*'
details bool

If True, include detailed information about each cache key. Defaults to False.

False

Returns:

Name Type Description
list Result

A list of cache keys or a list of dictionaries with detailed information if details is True.

Source code in norfab\workers\netbox_worker\netbox_worker.py
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def cache_list(self, keys: str = "*", details: bool = False) -> Result:
    """
    Retrieve a list of cache keys, optionally with details about each key.

    Args:
        keys (str): A pattern to match cache keys against. Defaults to "*".
        details (bool): If True, include detailed information about each cache key. Defaults to False.

    Returns:
        list: A list of cache keys or a list of dictionaries with detailed information if `details` is True.
    """
    self.cache.expire()  # evict expired entries before listing
    ret = Result(task=f"{self.name}:cache_list", result=[])
    for cache_key in self.cache:
        if not fnmatchcase(cache_key, keys):
            continue
        if not details:
            ret.result.append(cache_key)
            continue
        _, expires = self.cache.get(cache_key, expire_time=True)
        expires = datetime.fromtimestamp(expires)
        # creation time is derived backwards from the configured TTL
        creation = expires - timedelta(seconds=self.cache_ttl)
        age = datetime.now() - creation
        ret.result.append(
            {
                "key": cache_key,
                "age": str(age),
                "creation": str(creation),
                "expires": str(expires),
            }
        )
    return ret

cache_clear(job: Job, key: str = None, keys: str = None) -> Result ¤

Clears specified keys from the cache.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required
key str

A specific key to remove from the cache.

None
keys str

A glob pattern to match multiple keys to remove from the cache.

None

Returns:

Name Type Description
list Result

A list of keys that were successfully removed from the cache.

Raises:

Type Description
RuntimeError

If a specified key or a key matching the glob pattern could not be removed from the cache.

Notes:

  • If neither key nor keys is provided, the function will return a message indicating that there is nothing to clear.
  • If key is provided, it will attempt to remove that specific key from the cache.
  • If keys is provided, it will attempt to remove all keys matching the glob pattern from the cache.
Source code in norfab\workers\netbox_worker\netbox_worker.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
@Task(
    fastapi={"methods": ["DELETE"], "schema": NetboxFastApiArgs.model_json_schema()}
)
def cache_clear(self, job: Job, key: str = None, keys: str = None) -> Result:
    """
    Clears specified keys from the cache.

    Args:
        job: NorFab Job object containing relevant metadata
        key (str, optional): A specific key to remove from the cache.
        keys (str, optional): A glob pattern to match multiple keys to remove from the cache.

    Returns:
        list: A list of keys that were successfully removed from the cache.

    Raises:
        RuntimeError: If a specified key or a key matching the glob pattern could not be removed from the cache.

    Notes:

    - If neither `key` nor `keys` is provided, the function will return a message indicating that there is nothing to clear.
    - If `key` is provided, it will attempt to remove that specific key from the cache.
    - If `keys` is provided, it will attempt to remove all keys matching the glob pattern from the cache.
    """
    ret = Result(task=f"{self.name}:cache_clear", result=[])
    # check if has keys to clear; compare with None by identity per PEP 8
    # (previously used chained equality `key == keys == None`)
    if key is None and keys is None:
        ret.result = "Nothing to clear, specify key or keys"
        return ret
    # remove specific key from cache
    if key:
        if key in self.cache:
            if self.cache.delete(key, retry=True):
                ret.result.append(key)
                log.debug(f"{self.name} - Removed cache key '{key}'")
                job.event(f"removed cache key '{key}'")
            else:
                raise RuntimeError(f"Failed to remove {key} from cache")
        else:
            log.warning(f"{self.name} - Cache key '{key}' not found")
            ret.messages.append(f"Key {key} not in cache.")
    # remove all keys matching glob pattern
    if keys:
        log.info(f"{self.name} - Clearing cache keys matching pattern '{keys}'")
        for cache_key in self.cache:
            if fnmatchcase(cache_key, keys):
                if self.cache.delete(cache_key, retry=True):
                    ret.result.append(cache_key)
                    log.info(f"{self.name} - Removed cache key '{cache_key}'")
                else:
                    raise RuntimeError(f"Failed to remove {cache_key} from cache")
        job.event(
            f"removed {len(ret.result)} cache key(s) matching pattern '{keys}'"
        )
    return ret

cache_get(job: Job, key: str = None, keys: str = None, raise_missing: bool = False) -> Result ¤

Retrieve values from the cache based on a specific key or a pattern of keys.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required
key str

A specific key to retrieve from the cache.

None
keys str

A glob pattern to match multiple keys in the cache.

None
raise_missing bool

If True, raises a KeyError if the specific key is not found in the cache. Defaults to False.

False

Returns:

Name Type Description
dict Result

A dictionary containing the results of the cache retrieval. The keys are the cache keys and the values are the corresponding cache values.

Raises:

Type Description
KeyError

If raise_missing is True and the specific key is not found in the cache.

Source code in norfab\workers\netbox_worker\netbox_worker.py
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def cache_get(
    self, job: Job, key: str = None, keys: str = None, raise_missing: bool = False
) -> Result:
    """
    Retrieve values from the cache based on a specific key or a pattern of keys.

    Args:
        job: NorFab Job object containing relevant metadata
        key (str, optional): A specific key to retrieve from the cache.
        keys (str, optional): A glob pattern to match multiple keys in the cache.
        raise_missing (bool, optional): If True, raises a KeyError if the specific
            key is not found in the cache. Defaults to False.

    Returns:
        dict: A dictionary containing the results of the cache retrieval. The keys are
            the cache keys and the values are the corresponding cache values.

    Raises:
        KeyError: If raise_missing is True and the specific key is not found in the cache.
    """
    # task name was previously mislabeled as "cache_clear"
    ret = Result(task=f"{self.name}:cache_get", result={})
    # get specific key from cache
    if key:
        if key in self.cache:
            ret.result[key] = self.cache[key]
        elif raise_missing:
            raise KeyError(f"Key {key} not in cache.")
    # get all keys matching glob pattern
    if keys:
        for cache_key in self.cache:
            if fnmatchcase(cache_key, keys):
                ret.result[cache_key] = self.cache[cache_key]
    return ret

rest(job: Job, instance: Union[None, str] = None, method: str = 'get', api: str = '', **kwargs: Any) -> Result ¤

Sends a request to the Netbox REST API.

Parameters:

Name Type Description Default
instance str

The Netbox instance name to get parameters for.

None
method str

The HTTP method to use for the request (e.g., 'get', 'post'). Defaults to "get".

'get'
api str

The API endpoint to send the request to. Defaults to "".

''
**kwargs Any

Additional arguments to pass to the request (e.g., params, data, json).

{}

Returns:

Type Description
Result

Union[dict, list]: The JSON response from the API, parsed into a dictionary or list.

Raises:

Type Description
HTTPError

If the HTTP request returned an unsuccessful status code.

Source code in norfab\workers\netbox_worker\netbox_worker.py
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
@Task(
    fastapi={"methods": ["POST"], "schema": NetboxFastApiArgs.model_json_schema()}
)
def rest(
    self,
    job: Job,
    instance: Union[None, str] = None,
    method: str = "get",
    api: str = "",
    **kwargs: Any,
) -> Result:
    """
    Sends a request to the Netbox REST API.

    Args:
        job: NorFab Job object containing relevant metadata
        instance (str, optional): The Netbox instance name to get parameters for.
        method (str, optional): The HTTP method to use for the request (e.g., 'get', 'post'). Defaults to "get".
        api (str, optional): The API endpoint to send the request to. Defaults to "".
        **kwargs: Additional arguments to pass to the request (e.g., params, data, json).

    Returns:
        Union[dict, list]: The JSON response from the API, parsed into a
            dictionary or list; on an unsuccessful HTTP status the error is
            logged and the status code is returned instead, falling back to
            the raw response text when the body is not valid JSON.
    """
    ret = Result(task=f"{self.name}:rest", result={})
    nb_params = self._get_instance_params(instance)
    url = f"{nb_params['url']}/api/{api.strip('/')}/"

    log.info(f"{self.name} - REST {method.upper()} '{url}'")

    # send request to Netbox REST API using the token from instance params
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Authorization": f"Token {nb_params['token']}",
    }
    response = getattr(requests, method)(
        url=url,
        headers=headers,
        verify=nb_params.get("ssl_verify", True),
        **kwargs,
    )

    try:
        response.raise_for_status()
    except Exception as e:
        log.error(
            f"{self.name} - REST {method.upper()} '{url}' failed, status {response.status_code}, error: {e}"
        )
        ret.result = response.status_code
        return ret

    try:
        ret.result = response.json()
    except Exception as e:
        log.debug(f"Failed to decode json, error: {e}")
        ret.result = response.text if response.text else response.status_code

    return ret

compare_netbox_object_state(desired_state: dict, current_state: dict, ignore_fields: Union[list, None] = None, ignore_if_not_empty: Union[list, None] = None, diff: dict = None) -> tuple ¤

Compare desired state with current NetBox object state and return fields that need updating.

Parameters:

Name Type Description Default
desired_state dict

Dictionary with desired field values.

required
current_state dict

Dictionary with current NetBox object field values.

required
ignore_fields list

List of field names to ignore completely.

None
ignore_if_not_empty list

List of field names to ignore if they have non-empty values in current_state (won't overwrite existing data).

None
diff dict

Dictionary to accumulate field differences. If not provided, a new dictionary will be created.

None

Returns:

Name Type Description
tuple tuple

A tuple containing: - updates (dict): Dictionary containing only fields that need to be updated with their new values. - diff (dict): Dictionary containing the differences with '+' (new value) and '-' (old value) keys.

Example

desired = {"serial": "ABC123", "asset_tag": "TAG001", "comments": "New comment"} current = {"serial": "OLD123", "asset_tag": "", "comments": "Existing"} ignore_fields = [] ignore_if_not_empty = ["comments"] updates, diff = compare_netbox_object_state(desired, current, ignore_fields, ignore_if_not_empty) updates {"serial": "ABC123", "asset_tag": "TAG001"}

Source code in norfab\workers\netbox_worker\netbox_worker.py
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
def compare_netbox_object_state(
    self,
    desired_state: dict,
    current_state: dict,
    ignore_fields: Union[list, None] = None,
    ignore_if_not_empty: Union[list, None] = None,
    diff: Union[dict, None] = None,
) -> tuple:
    """
    Compare desired state with current NetBox object state and return fields that need updating.

    Args:
        desired_state (dict): Dictionary with desired field values.
        current_state (dict): Dictionary with current NetBox object field values.
        ignore_fields (list, optional): List of field names to ignore completely.
        ignore_if_not_empty (list, optional): List of field names to ignore if they have
            non-empty values in current_state (won't overwrite existing data).
        diff (dict, optional): Dictionary to accumulate field differences; mutated
            in place so callers can merge diffs across multiple comparisons. If not
            provided, a new dictionary will be created.

    Returns:
        tuple: A tuple containing:
            - updates (dict): Dictionary containing only fields that need to be updated with their new values.
            - diff (dict): Dictionary containing the differences with '+' (new value) and '-' (old value) keys.

    Example:
        >>> desired = {"serial": "ABC123", "asset_tag": "TAG001", "comments": "New comment"}
        >>> current = {"serial": "OLD123", "asset_tag": "", "comments": "Existing"}
        >>> ignore_fields = []
        >>> ignore_if_not_empty = ["comments"]
        >>> updates, diff = compare_netbox_object_state(desired, current, ignore_fields, ignore_if_not_empty)
        >>> updates
        {"serial": "ABC123", "asset_tag": "TAG001"}
    """
    ignore_fields = ignore_fields or []
    ignore_if_not_empty = ignore_if_not_empty or []
    updates = {}
    # explicit None check: a caller-supplied empty dict must still be used as
    # the accumulator ("diff or {}" would silently replace it)
    diff = {} if diff is None else diff

    for field, desired_value in desired_state.items():
        # Skip if field is in ignore list
        if field in ignore_fields:
            continue

        # Get current value, default to None if field doesn't exist
        current_value = current_state.get(field)

        # Skip if field is in ignore_if_not_empty and current value is not empty
        if field in ignore_if_not_empty and current_value:
            continue

        # Compare values and add to updates if different
        if current_value != desired_value:
            updates[field] = desired_value
            diff[field] = {
                "-": current_value,
                "+": desired_value,
            }

    return updates, diff

get_nornir_hosts(kwargs: dict, timeout: int) -> List[str] ¤

Retrieves a list of unique Nornir hosts from Nornir service based on provided filter criteria.

Parameters:

Name Type Description Default
kwargs dict

Dictionary of keyword arguments, where keys starting with 'F' are used as filters.

required
timeout int

Timeout value (in seconds) for the job execution.

required

Returns:

Name Type Description
list List[str]

Sorted list of unique Nornir host names that match the filter criteria.

Notes
  • Only filters with keys starting with 'F' are considered.
  • Hosts are collected from all workers where the job did not fail.
Source code in norfab\workers\netbox_worker\netbox_worker.py
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
def get_nornir_hosts(self, kwargs: dict, timeout: int) -> List[str]:
    """
    Resolve unique Nornir host names via the Nornir service using filter criteria.

    Args:
        kwargs (dict): Keyword arguments; only keys starting with 'F' are
            treated as host filters.
        timeout (int): Job execution timeout in seconds.

    Returns:
        list: Sorted list of unique Nornir host names matching the filters.

    Notes:
        - Keys that do not start with 'F' are ignored.
        - Hosts are gathered from every worker whose job did not fail.
    """
    collected = []
    filters = {key: value for key, value in kwargs.items() if key.startswith("F")}
    if filters:
        log.info(
            f"{self.name} - get_nornir_hosts querying Nornir service with filters: {filters}"
        )
        job_results = self.client.run_job(
            "nornir",
            "get_nornir_hosts",
            kwargs=filters,
            workers="all",
            timeout=timeout,
        )
        for w, r in job_results.items():
            # strict "is False" check: a None/absent failed flag is neither
            # treated as success nor logged as failure
            if r["failed"] is False and isinstance(r["result"], list):
                collected.extend(r["result"])
            elif r["failed"]:
                log.warning(
                    f"{self.name} - Get nornir hosts worker '{w}' failed: {r.get('errors')}"
                )

    unique_hosts = list(sorted(set(collected)))
    log.info(f"{self.name} - get_nornir_hosts resolved {len(unique_hosts)} host(s)")
    return unique_hosts