
Nornir Worker

WatchDog(worker) ¤

Bases: WorkerWatchDog

Class to monitor Nornir worker performance.

Parameters:

Name Type Description Default
worker Worker

The worker instance that this watchdog will monitor.

required

Attributes:

Name Type Description
worker Worker

The worker instance being monitored.

connections_idle_timeout int

Timeout value for idle connections.

connections_data dict

Dictionary to store connection use timestamps.

started_at float

Timestamp when the watchdog was started.

idle_connections_cleaned int

Counter for idle connections cleaned.

dead_connections_cleaned int

Counter for dead connections cleaned.

watchdog_tasks list

List of tasks for the watchdog to run in a given order.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def __init__(self, worker) -> None:
    super().__init__(worker)
    self.worker = worker
    self.connections_idle_timeout = worker.nornir_worker_inventory.get(
        "connections_idle_timeout", None
    )
    self.connections_data = {}  # store connections use timestamps

    # stats attributes
    self.idle_connections_cleaned = 0
    self.dead_connections_cleaned = 0

    # list of tasks for watchdog to run in given order
    self.watchdog_tasks = [
        self.connections_clean,
        self.connections_keepalive,
    ]

stats() -> Dict ¤

Collects and returns statistics about the worker.

Returns:

Name Type Description
dict Dict

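A dictionary containing the following keys:

  • watchdog_runs (int): The number of runs executed by the watchdog.
  • timestamp (str): The current time in a human-readable format.
  • uptime: The time since the watchdog started, as formatted by format_duration.
  • uptime_seconds (int): The time in seconds since the watchdog started.
  • dead_connections_cleaned (int): The number of dead connections cleaned.
  • idle_connections_cleaned (int): The number of idle connections cleaned.
  • worker_ram_usage_mbyte (float): The current RAM usage of the worker in megabytes.
  • nornir_hosts (int): The number of hosts in the worker's Nornir inventory, or 0 if Nornir is not initialized.
  • sid_inventory_status, netbox_inventory_status, containerlab_inventory_status: The inventory status values read from worker.status.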
Source code in norfab\workers\nornir_worker\nornir_worker.py
def stats(self) -> Dict:
    """
    Collects and returns statistics about the worker.

    Returns:
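        dict: A dictionary containing the following keys:

            - watchdog_runs (int): The number of runs executed by the watchdog.
            - timestamp (str): The current time in a human-readable format.
            - uptime: The time since the watchdog started, as formatted by `format_duration`.
            - uptime_seconds (int): The time in seconds since the watchdog started.
            - dead_connections_cleaned (int): The number of dead connections cleaned.
            - idle_connections_cleaned (int): The number of idle connections cleaned.
            - worker_ram_usage_mbyte (float): The current RAM usage of the worker in megabytes.
            - nornir_hosts (int): The number of hosts in the worker's Nornir inventory, or 0 if Nornir is not initialized.
            - sid_inventory_status, netbox_inventory_status, containerlab_inventory_status: The inventory status values read from `worker.status`.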
    """
    return {
        "watchdog_runs": self.runs,
        "timestamp": time.ctime(),
        "uptime": format_duration(int(time.time() - self.started_at)),
        "uptime_seconds": int(time.time() - self.started_at),
        "dead_connections_cleaned": self.dead_connections_cleaned,
        "idle_connections_cleaned": self.idle_connections_cleaned,
        "worker_ram_usage_mbyte": self.get_ram_usage(),
        "nornir_hosts": (
            len(self.worker.nr.inventory.hosts) if self.worker.nr else 0
        ),
        "sid_inventory_status": self.worker.status.get("sid_inventory_status"),
        "netbox_inventory_status": self.worker.status.get(
            "netbox_inventory_status"
        ),
        "containerlab_inventory_status": self.worker.status.get(
            "containerlab_inventory_status"
        ),
    }

configuration() -> Dict ¤

Returns the configuration settings for the worker.

Returns:

Name Type Description
Dict Dict

A dictionary containing the configuration settings:

  • "watchdog_interval" (int): The interval for the watchdog timer.
  • "connections_idle_timeout" (int): The timeout for idle connections.
Source code in norfab\workers\nornir_worker\nornir_worker.py
def configuration(self) -> Dict:
    """
    Returns the configuration settings for the worker.

    Returns:
        Dict: A dictionary containing the configuration settings:

            - "watchdog_interval" (int): The interval for the watchdog timer.
            - "connections_idle_timeout" (int): The timeout for idle connections.
    """
    return {
        "watchdog_interval": self.watchdog_interval,
        "connections_idle_timeout": self.connections_idle_timeout,
    }

connections_get() -> Dict ¤

Retrieve the current connections data.

Returns:

Name Type Description
Dict Dict

A dictionary containing the current connections data.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def connections_get(self) -> Dict:
    """
    Retrieve the current connections data.

    Returns:
        Dict: A dictionary containing the current connections data.
    """
    return {
        "connections": self.connections_data,
    }

connections_update(nr: Any, plugin: str) -> None ¤

Updates connection use timestamps for each host in the given Nornir object.

Parameters:

Name Type Description Default
nr Any

Nornir object

required
plugin str

connection plugin name

required
Source code in norfab\workers\nornir_worker\nornir_worker.py
def connections_update(self, nr: Any, plugin: str) -> None:
    """
    Updates connection use timestamps for each host in the given Nornir object.

    Args:
        nr: Nornir object
        plugin: connection plugin name
    """
    conn_stats = {
        "last_use": None,
        "last_keepalive": None,
        "keepalive_count": 0,
    }
    for host_name in nr.inventory.hosts:
        self.connections_data.setdefault(host_name, {})
        self.connections_data[host_name].setdefault(plugin, conn_stats.copy())
        self.connections_data[host_name][plugin]["last_use"] = time.ctime()
    log.info(
        f"{self.worker.name} - updated connections use timestamps for '{plugin}'"
    )
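
The bookkeeping above can be illustrated outside of NorFab. The following is a minimal sketch, assuming plain host-name strings instead of a Nornir inventory; the `update_connections` helper and the host names are hypothetical and exist only for illustration.

```python
import time


def update_connections(connections_data, host_names, plugin):
    """Record a fresh last_use timestamp for every host/plugin pair."""
    conn_stats = {"last_use": None, "last_keepalive": None, "keepalive_count": 0}
    for host_name in host_names:
        # setdefault only creates missing entries, so existing keepalive
        # counters survive repeated updates
        plugins = connections_data.setdefault(host_name, {})
        plugins.setdefault(plugin, conn_stats.copy())
        plugins[plugin]["last_use"] = time.ctime()
    return connections_data


# hypothetical host names, used purely as an example
data = update_connections({}, ["ceos-spine-1", "ceos-leaf-1"], "netmiko")
data["ceos-spine-1"]["netmiko"]["keepalive_count"] = 5
update_connections(data, ["ceos-spine-1"], "netmiko")
```

Copying `conn_stats` per entry matters here: without `.copy()` every host would share one dictionary and a timestamp update for one host would show up on all of them.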

connections_clean() -> None ¤

Cleans up idle connections based on the configured idle timeout.

This method checks for connections that have been idle for longer than the specified connections_idle_timeout and disconnects them. The behavior varies depending on the value of connections_idle_timeout:

  • If connections_idle_timeout is None, no connections are disconnected.
  • If connections_idle_timeout is 0, all connections are disconnected.
  • If connections_idle_timeout is greater than 0, only connections that have been idle for longer than the specified timeout are disconnected.

The method tries to acquire worker.connections_lock without blocking and returns immediately if the lock is already held, which keeps modifications to the connections data thread-safe. It logs the disconnection actions and updates the idle_connections_cleaned counter.

Raises:

Type Description
Exception

Caught internally, not propagated. If an error occurs while attempting to disconnect idle connections, an error message is logged.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def connections_clean(self) -> None:
    """
    Cleans up idle connections based on the configured idle timeout.

    This method checks for connections that have been idle for longer than the
    specified `connections_idle_timeout` and disconnects them. The behavior
    varies depending on the value of `connections_idle_timeout`:

    - If `connections_idle_timeout` is None, no connections are disconnected.
    - If `connections_idle_timeout` is 0, all connections are disconnected.
    - If `connections_idle_timeout` is greater than 0, only connections that
      have been idle for longer than the specified timeout are disconnected.

    The method tries to acquire `worker.connections_lock` without blocking and
    returns immediately if the lock is already held, which keeps modifications
    to the connections data thread-safe. It logs the disconnection actions and
    updates the `idle_connections_cleaned` counter.

    Raises:
        Exception: Caught internally, not propagated. If an error occurs while attempting to disconnect idle connections, an error message is logged.
    """
    # dictionary keyed by plugin name and value as a list of hosts
    disconnect = {}
    if not self.worker.connections_lock.acquire(blocking=False):
        return
    try:
        # if idle timeout not set, connections don't age out
        if self.connections_idle_timeout is None:
            disconnect = {}
        # disconnect all connections for all hosts
        elif self.connections_idle_timeout == 0:
            disconnect = {"all": list(self.connections_data.keys())}
        # only disconnect aged/idle connections
        elif self.connections_idle_timeout > 0:
            for host_name, plugins in self.connections_data.items():
                for plugin, conn_data in plugins.items():
                    last_use = time.mktime(time.strptime(conn_data["last_use"]))
                    age = time.time() - last_use
                    if age > self.connections_idle_timeout:
                        disconnect.setdefault(plugin, [])
                        disconnect[plugin].append(host_name)
        # run task to disconnect connections for aged hosts
        for plugin, hosts in disconnect.items():
            if not hosts:
                continue
            aged_hosts = FFun(self.worker.nr, FL=hosts)
            aged_hosts.run(task=nr_connections, call="close", conn_name=plugin)
            log.debug(
                f"{self.worker.name} watchdog, disconnected '{plugin}' "
                f"connections for '{', '.join(hosts)}'"
            )
            self.idle_connections_cleaned += len(hosts)
            # wipe out connections data if all connection closed
            if plugin == "all":
                self.connections_data = {}
                break
            # remove disconnected plugin from host's connections_data
            for host in hosts:
                self.connections_data[host].pop(plugin)
                if not self.connections_data[host]:
                    self.connections_data.pop(host)
    except Exception as e:
        msg = f"{self.worker.name} - watchdog failed to close idle connections, error: {e}"
        log.error(msg)
    finally:
        self.worker.connections_lock.release()
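
The three timeout cases can be sketched as a standalone function. This is only an illustration of the selection logic, assuming `last_use` values were produced by `time.ctime()` as in `connections_update`; the `idle_hosts` helper and the host names are hypothetical, and no connections are actually closed.

```python
import time


def idle_hosts(connections_data, idle_timeout, now=None):
    """Return {plugin: [host, ...]} for connections that should be closed."""
    now = time.time() if now is None else now
    disconnect = {}
    if idle_timeout is None:
        # connections never age out
        return disconnect
    if idle_timeout == 0:
        # close everything for every tracked host
        return {"all": list(connections_data)}
    for host_name, plugins in connections_data.items():
        for plugin, conn_data in plugins.items():
            # time.strptime without a format parses the time.ctime() layout
            last_use = time.mktime(time.strptime(conn_data["last_use"]))
            if now - last_use > idle_timeout:
                disconnect.setdefault(plugin, []).append(host_name)
    return disconnect


now = time.time()
data = {
    "r1": {"netmiko": {"last_use": time.ctime(now - 600)}},
    "r2": {"netmiko": {"last_use": time.ctime(now - 5)}},
}
aged = idle_hosts(data, 300, now)
```

Because `time.ctime()` drops sub-second precision, the computed age can be up to one second larger than the real one, which is negligible for timeouts measured in minutes.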

connections_keepalive() -> None ¤

Keepalive connections and clean up dead connections if any.

This method performs the following tasks:

  • If connections_idle_timeout is 0, it returns immediately without performing any actions.
  • Attempts to acquire a lock on worker.connections_lock to ensure thread safety.
  • Logs a debug message indicating that the keepalive process is running.
  • Uses HostsKeepalive to check and clean up dead connections, updating the dead_connections_cleaned counter.
  • Removes connections that are no longer present in the Nornir inventory.
  • Removes hosts from connections_data if they have no remaining connections.
  • Updates the keepalive statistics for each connection plugin, including the last keepalive time and keepalive count.
  • Logs an error message if an exception occurs during the keepalive process.
  • Releases the lock on worker.connections_lock in the finally block to ensure it is always released.

Raises:

Type Description
Exception

Caught internally, not propagated. If an error occurs during the keepalive process, it is logged as an error.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def connections_keepalive(self) -> None:
    """
    Keepalive connections and clean up dead connections if any.

    This method performs the following tasks:

    - If `connections_idle_timeout` is 0, it returns immediately without performing any actions.
    - Attempts to acquire a lock on `worker.connections_lock` to ensure thread safety.
    - Logs a debug message indicating that the keepalive process is running.
    - Uses `HostsKeepalive` to check and clean up dead connections, updating the `dead_connections_cleaned` counter.
    - Removes connections that are no longer present in the Nornir inventory.
    - Removes hosts from `connections_data` if they have no remaining connections.
    - Updates the keepalive statistics for each connection plugin, including the last keepalive time and keepalive count.
    - Logs an error message if an exception occurs during the keepalive process.
    - Releases the lock on `worker.connections_lock` in the `finally` block to ensure it is always released.

    Raises:
        Exception: Caught internally, not propagated. If an error occurs during the keepalive process, it is logged as an error.
    """
    if self.connections_idle_timeout == 0:  # do not keepalive if idle is 0
        return
    if not self.worker.connections_lock.acquire(blocking=False):
        return
    try:
        log.debug(f"{self.worker.name} - watchdog running connections keepalive")
        stats = HostsKeepalive(self.worker.nr)
        self.dead_connections_cleaned += stats["dead_connections_cleaned"]
        # remove connections that are no longer present in Nornir inventory;
        # iterate over a snapshot because hosts may be popped inside the loop
        for host_name, host_connections in list(self.connections_data.items()):
            # check if host is still in Nornir inventory
            if host_name not in self.worker.nr.inventory.hosts:
                self.connections_data.pop(host_name, None)
                continue
            # clean up specific connections for host
            for connection_name in list(host_connections.keys()):
                if not self.worker.nr.inventory.hosts[host_name].connections.get(
                    connection_name
                ):
                    self.connections_data[host_name].pop(connection_name)
        # remove host if no connections left
        for host_name in list(self.connections_data.keys()):
            if self.connections_data[host_name] == {}:
                self.connections_data.pop(host_name)
        # update connections statistics
        for plugins in self.connections_data.values():
            for plugin in plugins.values():
                plugin["last_keepalive"] = time.ctime()
                plugin["keepalive_count"] += 1
    except Exception as e:
        msg = f"{self.worker.name} - watchdog HostsKeepalive check error: {e}"
        log.error(msg)
    finally:
        self.worker.connections_lock.release()
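
The locking and pruning steps can be sketched independently of Nornir. In this minimal example, `live_connections` is a hypothetical stand-in for the connections held by the Nornir inventory hosts, and `prune_connections` is an illustrative helper rather than part of NorFab.

```python
from threading import Lock


def prune_connections(connections_data, live_connections, lock):
    """Drop tracked connections that no longer exist; skip if the lock is busy."""
    if not lock.acquire(blocking=False):
        return False  # another thread is using the connections, try next run
    try:
        # iterate over snapshots so entries can be removed safely
        for host_name in list(connections_data):
            live = live_connections.get(host_name)
            if live is None:
                connections_data.pop(host_name)
                continue
            for plugin in list(connections_data[host_name]):
                if plugin not in live:
                    connections_data[host_name].pop(plugin)
            if not connections_data[host_name]:
                connections_data.pop(host_name)
        return True
    finally:
        lock.release()


lock = Lock()
data = {
    "r1": {"netmiko": {}, "scrapli": {}},
    "r2": {"netmiko": {}},
    "r3": {"napalm": {}},
}
live = {"r1": {"netmiko"}, "r3": set()}
pruned = prune_connections(data, live, lock)
```

Acquiring with `blocking=False` means a watchdog run never waits behind a long-running job; it simply skips the cycle and retries on the next interval.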

NornirWorker(inventory: str, broker: str, worker_name: str, exit_event=None, init_done_event=None, log_level: str = None, log_queue: object = None) ¤

Bases: NFPWorker, TaskTask, CliTask, CfgTask, TestTask, NetworkTask, InventoryTasks, ParseTask, FileCopyTask, NetconfTask, SnmpTask

NornirWorker class for managing Nornir Service tasks.

Parameters:

Name Type Description Default
inventory str

Path to the inventory file.

required
broker str

Broker address.

required
worker_name str

Name of the worker.

required
exit_event Event

Event to signal worker exit. Defaults to None.

None
init_done_event Event

Event to signal initialization completion. Defaults to None.

None
log_level str

Logging level. Defaults to None.

None
log_queue object

Queue for logging. Defaults to None.

None

Attributes:

Name Type Description
init_done_event Event

Event to signal initialization completion.

tf_base_path str

Base path of the folder where files saved by the tf processor are stored.

connections_lock Lock

Lock for managing connections.

nornir_inventory dict

Inventory data for Nornir.

watchdog WatchDog

Watchdog instance for monitoring.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def __init__(
    self,
    inventory: str,
    broker: str,
    worker_name: str,
    exit_event=None,
    init_done_event=None,
    log_level: str = None,
    log_queue: object = None,
) -> None:
    super().__init__(
        inventory, broker, SERVICE, worker_name, exit_event, log_level, log_queue
    )
    self.init_done_event = init_done_event
    self.tf_base_path = os.path.join(self.base_dir, "tf")
    self.status.update(
        {
            "netbox_inventory_status": None,
            "containerlab_inventory_status": None,
        }
    )

    # misc attributes
    self.connections_lock = Lock()

    # initiate Nornir
    self.refresh_nornir(job=Job())

    # initiate watchdog
    self.watchdog = WatchDog(self)
    self.watchdog.start()

    # run startup hooks
    for f in self.inventory.hooks.get("nornir-startup", []):
        f["function"](self, *f.get("args", []), **f.get("kwargs", {}))

    if self.init_done_event is not None:
        self.init_done_event.set()

    log.info(f"{self.name} - Started")
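
The role of `init_done_event` can be illustrated with a short sketch. How NorFab launches workers is not shown here, so the example assumes a plain thread and a hypothetical `worker_main` function; only the set-and-wait handshake mirrors the constructor above.

```python
import threading


def worker_main(init_done_event=None):
    """Stand-in for a worker constructor that signals when it is ready."""
    # ... initialization work would happen here ...
    if init_done_event is not None:
        init_done_event.set()


init_done = threading.Event()
thread = threading.Thread(target=worker_main, kwargs={"init_done_event": init_done})
thread.start()
# the caller blocks until the worker reports readiness or the timeout expires
ready = init_done.wait(timeout=5)
thread.join()
```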

worker_exit() -> None ¤

Executes all functions registered under the "nornir-exit" hook in the inventory.

This method iterates through the list of hooks associated with the "nornir-exit" key in the inventory's hooks.

For each hook, it calls the function specified in the hook, passing the current instance (self) as the first argument, followed by any additional positional and keyword arguments specified in the hook.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def worker_exit(self) -> None:
    """
    Executes all functions registered under the "nornir-exit" hook in the inventory.

    This method iterates through the list of hooks associated with the "nornir-exit"
    key in the inventory's hooks.

    For each hook, it calls the function specified in the hook, passing the current
    instance (`self`) as the first argument, followed by any additional positional
    and keyword arguments specified in the hook.
    """
    # run exit hooks
    for f in self.inventory.hooks.get("nornir-exit", []):
        f["function"](self, *f.get("args", []), **f.get("kwargs", {}))
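
The hook calling convention can be sketched with plain dictionaries. This assumes the hooks container behaves like a dictionary of lists, as the `.get("nornir-exit", [])` call suggests; the `run_hooks` and `announce` functions and the worker name are hypothetical.

```python
def run_hooks(hooks, hook_name, worker):
    """Call every function registered under hook_name, worker first."""
    for hook in hooks.get(hook_name, []):
        hook["function"](worker, *hook.get("args", []), **hook.get("kwargs", {}))


calls = []


def announce(worker, message, level="info"):
    # hypothetical hook function that records how it was called
    calls.append((worker, message, level))


hooks = {
    "nornir-exit": [
        {"function": announce, "args": ["shutting down"], "kwargs": {"level": "warning"}},
        {"function": announce, "args": ["bye"]},
    ]
}
run_hooks(hooks, "nornir-exit", "nornir-worker-1")
# no hooks registered under this name, so nothing is called
run_hooks(hooks, "nornir-startup", "nornir-worker-1")
```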

init_nornir(inventory: dict) -> None ¤

Initializes the Nornir automation framework with the provided inventory.

This method first closes any existing Nornir connections if present. It then creates a new Nornir instance using the supplied inventory dictionary, which should contain configuration for logging, runner, hosts, groups, defaults, and user-defined settings. Missing keys fall back to defaults, and both steps run while holding connections_lock.

Parameters:

Name Type Description Default
inventory dict

A dictionary containing Nornir inventory and configuration options.

required
Source code in norfab\workers\nornir_worker\nornir_worker.py
def init_nornir(self, inventory: dict) -> None:
    """
    Initializes the Nornir automation framework with the provided inventory.

    This method first closes any existing Nornir connections if present. It then creates
    a new Nornir instance using the supplied inventory dictionary, which should contain
    configuration for logging, runner, hosts, groups, defaults, and user-defined settings.
    Missing keys fall back to defaults, and both steps run while holding `connections_lock`.

    Args:
        inventory (dict): A dictionary containing Nornir inventory and configuration options.
    """
    # clean up existing Nornir instance
    with self.connections_lock:
        if self.nr is not None and self.nr.inventory.hosts:
            self.nr.close_connections()

        # initiate Nornir
        self.nr = InitNornir(
            logging=inventory.get("logging", {"enabled": False}),
            runner=inventory.get("runner", {}),
            inventory={
                "plugin": "DictInventory",
                "options": {
                    "hosts": inventory.get("hosts", {}),
                    "groups": inventory.get("groups", {}),
                    "defaults": inventory.get("defaults", {}),
                },
            },
            user_defined=inventory.get("user_defined", {}),
        )
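
To show how the inventory dictionary maps onto the `InitNornir` arguments, here is a minimal sketch that only builds the keyword arguments and does not import Nornir. The `build_init_kwargs` helper and the sample host and group data are hypothetical.

```python
def build_init_kwargs(inventory):
    """Translate a worker inventory dictionary into InitNornir keyword arguments."""
    return {
        "logging": inventory.get("logging", {"enabled": False}),
        "runner": inventory.get("runner", {}),
        "inventory": {
            "plugin": "DictInventory",
            "options": {
                "hosts": inventory.get("hosts", {}),
                "groups": inventory.get("groups", {}),
                "defaults": inventory.get("defaults", {}),
            },
        },
        "user_defined": inventory.get("user_defined", {}),
    }


# sample data for illustration only
inventory = {
    "hosts": {"r1": {"hostname": "192.0.2.1", "platform": "arista_eos", "groups": ["lab"]}},
    "groups": {"lab": {"username": "admin"}},
}
kwargs = build_init_kwargs(inventory)
```

Only `hosts` and `groups` are supplied here; logging is disabled and the runner, defaults, and user-defined sections are empty dictionaries.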

filter_hosts_and_validate(kwargs: Dict[str, Any], ret: Result) -> Tuple[Any, Result] ¤

Helper method to filter hosts and validate results.

Returns:

Name Type Description
tuple Tuple[Any, Result]

(filtered_nornir, Result), where the Result status is set to no_match if no hosts matched the filters.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def filter_hosts_and_validate(
    self, kwargs: Dict[str, Any], ret: Result
) -> Tuple[Any, Result]:
    """
    Helper method to filter hosts and validate results.

    Returns:
        tuple: (filtered_nornir, Result), where the Result status is set to
            `no_match` if no hosts matched the filters.
    """
    self.nr.data.reset_failed_hosts()  # reset failed hosts before filtering
    filters = {k: kwargs.pop(k) for k in list(kwargs.keys()) if k in FFun_functions}
    filtered_nornir = FFun(self.nr, **filters)

    if not filtered_nornir.inventory.hosts:
        msg = (
            f"{self.name} - nothing to do, no hosts matched by filters '{filters}'"
        )
        log.debug(msg)
        ret.messages.append(msg)
        ret.status = "no_match"

    return filtered_nornir, ret
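
The way filter arguments are separated from the remaining task arguments can be sketched without Nornir. `FILTER_KEYS` below is an illustrative subset assumed to stand in for `FFun_functions`, and `split_filters` is a hypothetical helper.

```python
# illustrative subset of filter names, assumed to stand in for FFun_functions
FILTER_KEYS = ["FO", "FB", "FG", "FL"]


def split_filters(kwargs, filter_keys=FILTER_KEYS):
    """Pop host-filter arguments out of kwargs and return them separately."""
    # list(kwargs.keys()) takes a snapshot so popping inside the loop is safe
    return {k: kwargs.pop(k) for k in list(kwargs.keys()) if k in filter_keys}


kwargs = {"FB": "spine*", "FL": ["r1"], "commands": ["show version"]}
filters = split_filters(kwargs)
```

Note that the caller's `kwargs` dictionary is modified in place, so after the call it holds only the arguments meant for the task itself.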

load_job_data(job_data: Any) -> Dict ¤

Helper function to download a job data YAML file and load it.

Parameters:

Name Type Description Default
job_data str

NorFab file path of the job data to download and load using YAML.

required

Returns:

Name Type Description
dict Dict

The job data loaded from the YAML string.

Raises:

Type Description
FileNotFoundError

If the job data is a URL and the file download fails.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def load_job_data(self, job_data: Any) -> Dict:
    """
    Helper function to download a job data YAML file and load it.

    Args:
        job_data (str): NorFab file path of the job data to download and load using YAML.

    Returns:
        dict: The job data loaded from the YAML string.

    Raises:
        FileNotFoundError: If the job data is a URL and the file download fails.
    """
    if self.is_url(job_data):
        # keep the original URL in job_data so the error message can report it
        job_data_content = self.fetch_file(job_data)
        if job_data_content is None:
            msg = f"{self.name} - '{job_data}' job data file download failed"
            raise FileNotFoundError(msg)
        job_data = yaml.safe_load(job_data_content)

    return job_data

jinja2_network_hosts(network: str, pfxlen: bool = False) -> list ¤

Custom Jinja2 filter that returns a list of hosts for a given IP network.

Parameters:

Name Type Description Default
network str

The network address in CIDR notation.

required
pfxlen bool

If True, include the prefix length in the returned host addresses. Defaults to False.

False

Returns:

Name Type Description
list list

A list of host addresses as strings. If pfxlen is True, each address will include the prefix length.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def jinja2_network_hosts(self, network: str, pfxlen: bool = False) -> list:
    """
    Custom Jinja2 filter that returns a list of hosts for a given IP network.

    Args:
        network (str): The network address in CIDR notation.
        pfxlen (bool, optional): If True, include the prefix length
            in the returned host addresses. Defaults to False.

    Returns:
        list: A list of host addresses as strings. If pfxlen is True,
            each address will include the prefix length.
    """
    ret = []
    ip_interface = ipaddress.ip_interface(network)
    prefixlen = ip_interface.network.prefixlen
    for ip in ip_interface.network.hosts():
        ret.append(f"{ip}/{prefixlen}" if pfxlen else str(ip))
    return ret
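
As a quick illustration of the filter's behavior, the same logic can be run as a standalone function using only the standard library. The `network_hosts` name and the sample networks are chosen for this example.

```python
import ipaddress


def network_hosts(network, pfxlen=False):
    """Return usable host addresses of the network that contains `network`."""
    ip_interface = ipaddress.ip_interface(network)
    prefixlen = ip_interface.network.prefixlen
    return [
        f"{ip}/{prefixlen}" if pfxlen else str(ip)
        for ip in ip_interface.network.hosts()
    ]


plain = network_hosts("192.168.1.0/30")
# → ['192.168.1.1', '192.168.1.2']
with_len = network_hosts("192.168.1.0/30", pfxlen=True)
# → ['192.168.1.1/30', '192.168.1.2/30']
from_host = network_hosts("10.0.0.5/30")
# → ['10.0.0.5', '10.0.0.6']
```

Because the input is parsed with `ipaddress.ip_interface`, a host address with a prefix length is accepted as well, and the hosts of its enclosing network are returned.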

jinja2_nb_create_ip(prefix: str, device: str = None, interface: str = None, **kwargs: object) -> str ¤

Jinja2 filter to get or create the next available IP address from a prefix using the NetBox service.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def jinja2_nb_create_ip(
    self, prefix: str, device: str = None, interface: str = None, **kwargs: object
) -> str:
    """
    Jinja2 filter to get or create the next available IP address from
    a prefix using the NetBox service.
    """
    kwargs["prefix"] = prefix
    kwargs["device"] = device
    kwargs["interface"] = interface
    reply = self.client.run_job(
        "netbox",
        "create_ip",
        kwargs=kwargs,
        workers="any",
        timeout=30,
    )
    # reply is a dict of {worker_name: results_dict}
    res = list(reply.values())[0]
    if res["failed"]:
        raise RuntimeError(res["messages"])
    return res["result"]["address"]

jinja2_nb_create_prefix(parent: str, description: str, prefixlen: int = 30, **kwargs: object) -> str ¤

Jinja2 filter to get or create the next available prefix from a parent prefix using the NetBox service.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def jinja2_nb_create_prefix(
    self, parent: str, description: str, prefixlen: int = 30, **kwargs: object
) -> str:
    """
    Jinja2 filter to get or create the next available prefix from
    a parent prefix using the NetBox service.
    """
    kwargs["parent"] = parent
    kwargs["description"] = description
    kwargs["prefixlen"] = prefixlen
    reply = self.client.run_job(
        "netbox",
        "create_prefix",
        kwargs=kwargs,
        workers="any",
        timeout=30,
    )
    # reply is a dict of {worker_name: results_dict}
    res = list(reply.values())[0]
    if res["failed"]:
        raise RuntimeError(res["messages"])

    return res["result"]["prefix"]

jinja2_call_netbox(netbox_task: str) -> callable ¤

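Returns a callable that executes an arbitrary NetBox service task. The callable raises RuntimeError if the task failed and returns a single device's result when exactly one device is requested.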

Source code in norfab\workers\nornir_worker\nornir_worker.py
def jinja2_call_netbox(self, netbox_task: str) -> callable:
    """
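    Returns a callable that executes an arbitrary NetBox service task. The callable
    raises RuntimeError if the task failed and returns a single device's result
    when exactly one device is requested.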
    """

    def call_netbox(*args: object, **kwargs: object) -> dict:
        reply = self.client.run_job(
            "netbox",
            netbox_task,
            args=args,
            kwargs=kwargs,
            workers="any",
            timeout=300,
        )
        res = list(reply.values())[0]

        # check if has an error
        if res["failed"]:
            raise RuntimeError(res["messages"])

        # return result for single host only
        if len(kwargs.get("devices", [])) == 1:
            return res["result"][kwargs["devices"][0]]
        # return full results
        else:
            return res["result"]

    return call_netbox
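
The closure pattern used here can be exercised with a stub in place of the NorFab client. In this sketch, `FakeClient` and its reply contents are assumptions made for illustration; only the shape of the reply (`{worker_name: results_dict}` with `failed`, `messages`, and `result` keys) follows the code above.

```python
class FakeClient:
    """Hypothetical stand-in for the NorFab client, returns canned replies."""

    def run_job(self, service, task, args=None, kwargs=None, workers="any", timeout=300):
        devices = (kwargs or {}).get("devices", [])
        return {
            "netbox-worker-1": {
                "failed": False,
                "messages": [],
                "result": {device: {"task": task} for device in devices},
            }
        }


def make_caller(client, netbox_task):
    """Bind a NetBox task name to a callable usable from templates."""

    def call_netbox(*args, **kwargs):
        reply = client.run_job(
            "netbox", netbox_task, args=args, kwargs=kwargs, workers="any", timeout=300
        )
        res = list(reply.values())[0]
        if res["failed"]:
            raise RuntimeError(res["messages"])
        # unwrap the result when exactly one device was requested
        if len(kwargs.get("devices", [])) == 1:
            return res["result"][kwargs["devices"][0]]
        return res["result"]

    return call_netbox


get_interfaces = make_caller(FakeClient(), "get_interfaces")
single = get_interfaces(devices=["r1"])
many = get_interfaces(devices=["r1", "r2"])
```

The single-device shortcut spares template authors one level of indexing, at the cost of a return shape that depends on the number of devices passed.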

add_jinja2_netbox() -> Dict ¤

Aggregates Jinja2 NetBox-related methods and functions into a dictionary for ease of use within Jinja2 templates.

Source code in norfab\workers\nornir_worker\nornir_worker.py
def add_jinja2_netbox(self) -> Dict:
    """
    Aggregates Jinja2 NetBox-related methods and functions into a dictionary
    for ease of use within Jinja2 templates.
    """
    return {
        "get_connections": self.jinja2_call_netbox("get_connections"),
        "get_interfaces": self.jinja2_call_netbox("get_interfaces"),
        "get_circuits": self.jinja2_call_netbox("get_circuits"),
        "get_devices": self.jinja2_call_netbox("get_devices"),
        "rest": self.jinja2_call_netbox("rest"),
        "graphql": self.jinja2_call_netbox("graphql"),
        "create_ip": self.jinja2_nb_create_ip,
        "create_prefix": self.jinja2_nb_create_prefix,
    }

add_jinja2_filters() -> Dict ¤

Adds custom filters for use in Jinja2 templates using the | syntax.

Returns:

Name Type Description
dict Dict

A dictionary where the keys are the names of the filters and the values are the corresponding filter functions.

  • "netbox.create_ip": Method to get or create the next available IP address.
  • "netbox.create_prefix": Method to get or create the next available prefix.
  • "network_hosts": Method to get IP network hosts.
Source code in norfab\workers\nornir_worker\nornir_worker.py
def add_jinja2_filters(self) -> Dict:
    """
    Adds custom filters for use in Jinja2 templates using the `|` syntax.

    Returns:
        dict (Dict): A dictionary where the keys are the names of the filters
            and the values are the corresponding filter functions.

            - "netbox.create_ip": Method to get or create the next available IP address.
            - "netbox.create_prefix": Method to get or create the next available prefix.
            - "network_hosts": Method to get IP network hosts.
    """
    return {
        "netbox.create_ip": self.jinja2_nb_create_ip,
        "netbox.create_prefix": self.jinja2_nb_create_prefix,
        "network_hosts": self.jinja2_network_hosts,
    }

get_version() -> Result ¤

Retrieve the versions of various libraries and system information.

This method collects the version information of a predefined set of libraries and system details such as the Python version and platform. It looks up the installed version of each library. If a library is not found, its version is left as an empty string.

Returns:

Name Type Description
Result Result

A Result object whose result attribute is a dictionary with the library names as keys and their respective version numbers as values. If a library is not found, its value will be an empty string.

Source code in norfab\workers\nornir_worker\nornir_worker.py
@Task(
    fastapi={"methods": ["GET"]},
    input=GetVersionInput,
    output=GetVersionResult,
    mcp={
        "annotations": {
            "title": "Get Version",
            "readOnlyHint": True,
            "destructiveHint": False,
            "idempotentHint": True,
            "openWorldHint": False,
        }
    },
)
def get_version(self) -> Result:
    """
    Retrieve the versions of various libraries and system information.

    This method collects the version information of a predefined set of libraries
    and system details such as the Python version and platform. It looks up the
    installed version of each library. If a library is not found, its version is
    left as an empty string.

    Returns:
        Result: a Result object whose `result` attribute is a dictionary with
            the library names as keys and their respective version numbers as
            values. If a library is not found, its value will be an empty string.
    """
    libs = {
        "norfab": "",
        "scrapli": "",
        "scrapli-netconf": "",
        "scrapli-community": "",
        "paramiko": "",
        "netmiko": "",
        "napalm": "",
        "nornir": "",
        "ncclient": "",
        "nornir-netmiko": "",
        "nornir-napalm": "",
        "nornir-scrapli": "",
        "nornir-utils": "",
        "tabulate": "",
        "xmltodict": "",
        "puresnmp": "",
        "pygnmi": "",
        "pyyaml": "",
        "jmespath": "",
        "jinja2": "",
        "ttp": "",
        "nornir-salt": "",
        "lxml": "",
        "ttp-templates": "",
        "ntc-templates": "",
        "cerberus": "",
        "pydantic": "",
        "requests": "",
        "textfsm": "",
        "N2G": "",
        "dnspython": "",
        "pythonping": "",
        "python": sys.version.split(" ")[0],
        "platform": sys.platform,
    }
    # get version of packages installed
    for pkg in libs.keys():
        try:
            libs[pkg] = importlib.metadata.version(pkg)
        except importlib.metadata.PackageNotFoundError:
            pass

    return Result(result=libs)
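
The lookup pattern used above can be tried outside of NorFab. The following is a minimal standalone sketch, not the worker's own code: `collect_versions` and the package names passed to it are hypothetical, and it returns a plain dictionary rather than a `Result` object.

```python
import importlib.metadata
import sys


def collect_versions(packages):
    """Return {name: version}, using "" for packages that are not installed.

    Hypothetical helper for illustration; it is not part of NorFab.
    """
    versions = {}
    for pkg in packages:
        try:
            # Reads the installed distribution metadata; does not import the package.
            versions[pkg] = importlib.metadata.version(pkg)
        except importlib.metadata.PackageNotFoundError:
            versions[pkg] = ""
    # System details are added after the package lookup so they are never
    # treated as distribution names.
    versions["python"] = sys.version.split(" ")[0]
    versions["platform"] = sys.platform
    return versions


report = collect_versions(["pip", "hypothetical-package-that-is-not-installed"])
for name, version in report.items():
    print(f"{name}: {version or 'not installed'}")
```

An empty string in the report therefore means the distribution metadata was not found, which is the same signal the task result carries.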

get_watchdog_connections() -> Result ¤

Retrieve the list of connections currently managed by watchdog.

Returns:

Name Type Description
Result Result

An instance of the Result class containing the current watchdog connections.

Source code in norfab\workers\nornir_worker\nornir_worker.py
@Task(
    fastapi={"methods": ["GET"]},
    input=GetWatchdogConnectionsInput,
    output=GetWatchdogConnectionsResult,
    mcp={
        "annotations": {
            "title": "Get Watchdog Connections",
            "readOnlyHint": True,
            "destructiveHint": False,
            "idempotentHint": True,
            "openWorldHint": True,
        }
    },
)
def get_watchdog_connections(self) -> Result:
    """
    Retrieve the list of connections currently managed by watchdog.

    Returns:
        Result: An instance of the Result class containing the current
            watchdog connections.
    """
    return Result(result=self.watchdog.connections_get())

refresh_nornir(job: Job) -> Result ¤

Refreshes the Nornir instance by reloading the inventory from configured sources.

This method performs the following steps:

1. Loads the inventory configuration from the broker.
2. If Netbox is specified in the inventory, pulls inventory data from Netbox.
3. If Containerlab is specified in the inventory, pulls inventory data from Containerlab.
4. Initializes the Nornir instance with the refreshed inventory.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata.

required

The inventory configuration is expected to be a dictionary with the following keys:

  • "logging": A dictionary specifying logging configuration (default: {"enabled": False}).
  • "runner": A dictionary specifying runner options (default: {}).
  • "hosts": A dictionary specifying host details (default: {}).
  • "groups": A dictionary specifying group details (default: {}).
  • "defaults": A dictionary specifying default values (default: {}).
  • "user_defined": A dictionary specifying user-defined options (default: {}).
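
To make the expected shape concrete, here is a minimal sketch of an inventory dictionary with the defaults listed above filled in. It is an illustration only: `INVENTORY_DEFAULTS` and `with_inventory_defaults` are hypothetical names, the host entry is made-up sample data, and this is not how NorFab itself applies the defaults.

```python
from copy import deepcopy

# Defaults exactly as listed in the text above.
INVENTORY_DEFAULTS = {
    "logging": {"enabled": False},
    "runner": {},
    "hosts": {},
    "groups": {},
    "defaults": {},
    "user_defined": {},
}


def with_inventory_defaults(inventory):
    """Hypothetical helper: return a copy of inventory with missing keys filled in."""
    merged = deepcopy(INVENTORY_DEFAULTS)
    # Top-level keys supplied by the caller replace the defaults wholesale.
    merged.update(deepcopy(inventory))
    return merged


# Sample inventory that only defines hosts; the address is a documentation IP.
example = with_inventory_defaults(
    {"hosts": {"r1": {"hostname": "192.0.2.1", "platform": "arista_eos"}}}
)
print(sorted(example))
```

Keys that are absent from the supplied inventory, such as "logging" here, end up with the default values from the list.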

Returns:

Name Type Description
Result Result

A Result object indicating the outcome of the refresh operation.

Source code in norfab\workers\nornir_worker\nornir_worker.py
@Task(
    fastapi={"methods": ["POST"]},
    input=RefreshNornirInput,
    output=RefreshNornirResult,
    mcp={
        "annotations": {
            "title": "Refresh Nornir Inventory",
            "readOnlyHint": False,
            "destructiveHint": False,
            "idempotentHint": True,
            "openWorldHint": True,
        }
    },
)
def refresh_nornir(
    self,
    job: Job,
) -> Result:
    """
    Refreshes the Nornir instance by reloading the inventory from configured sources.

    This method performs the following steps:

        1. Loads the inventory configuration from the broker.
        2. If Netbox is specified in the inventory, pulls inventory data from Netbox.
        3. If Containerlab is specified in the inventory, pulls inventory data from Containerlab.
        4. Initializes the Nornir instance with the refreshed inventory.

    Args:
        job: NorFab Job object containing relevant metadata

    The inventory configuration is expected to be a dictionary with the following keys:

    - "logging": A dictionary specifying logging configuration (default: {"enabled": False}).
    - "runner": A dictionary specifying runner options (default: {}).
    - "hosts": A dictionary specifying host details (default: {}).
    - "groups": A dictionary specifying group details (default: {}).
    - "defaults": A dictionary specifying default values (default: {}).
    - "user_defined": A dictionary specifying user-defined options (default: {}).

    Returns:
        Result: A Result object indicating the outcome of the refresh operation.
    """
    ret = Result(task=f"{self.name}:refresh_nornir", result=True)

    # get inventory from broker
    self.nornir_worker_inventory = self.load_inventory()

    # pull Nornir inventory from Netbox
    if "netbox" in self.nornir_worker_inventory:
        self.nornir_inventory_load_netbox(job=job)
        job.event("pulled Nornir inventory data from Netbox")

    # pull Nornir inventory from Containerlab
    if "containerlab" in self.nornir_worker_inventory:
        self.nornir_inventory_load_containerlab(
            job=job,
            **self.nornir_worker_inventory["containerlab"],
            re_init_nornir=False,
        )
        job.event("pulled Nornir inventory data from Containerlab")

    job.event("pulled inventories, refreshing Nornir instance")

    self.init_nornir(self.nornir_worker_inventory)

    job.event("nornir instance refreshed")

    return ret