Skip to content

Nornir Worker

WatchDog(worker) ¤

Bases: WorkerWatchDog

Class to monitor Nornir worker performance.

Parameters:

Name Type Description Default
worker Worker

The worker instance that this WatchDog will monitor.

required

Attributes:

Name Type Description
worker Worker

The worker instance being monitored.

connections_idle_timeout int

Timeout value for idle connections.

connections_data dict

Dictionary to store connection use timestamps.

started_at float

Timestamp when the watchdog was started.

idle_connections_cleaned int

Counter for idle connections cleaned.

dead_connections_cleaned int

Counter for dead connections cleaned.

watchdog_tasks list

List of tasks for the watchdog to run in a given order.

Source code in norfab\workers\nornir_worker\nornir_worker.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(self, worker):
    """
    Set up watchdog state for the given worker.

    Args:
        worker: worker instance to monitor; its ``nornir_worker_inventory``
            is read for the ``connections_idle_timeout`` setting.
    """
    super().__init__(worker)
    self.worker = worker

    # stays None when the setting is absent from the worker inventory
    nornir_settings = worker.nornir_worker_inventory
    self.connections_idle_timeout = nornir_settings.get("connections_idle_timeout")

    # connections use timestamps, keyed by host name and then by plugin name
    self.connections_data = {}

    # counters reported by stats()
    self.dead_connections_cleaned = 0
    self.idle_connections_cleaned = 0

    # tasks for the watchdog to run, in this order
    self.watchdog_tasks = [self.connections_clean, self.connections_keepalive]

stats() -> Dict ¤

Collects and returns statistics about the worker.

Returns:

Name Type Description
dict Dict

A dictionary containing the following keys:

  • watchdog_runs (int): The number of runs executed by the watchdog.
  • timestamp (str): The current time in a human-readable format.
  • uptime: The time since the watchdog started, formatted as a duration.
  • uptime_seconds (int): The time in seconds since the watchdog started.
  • dead_connections_cleaned (int): The number of dead connections cleaned.
  • idle_connections_cleaned (int): The number of idle connections cleaned.
  • worker_ram_usage_mbyte (float): The current RAM usage of the worker in megabytes.
Source code in norfab\workers\nornir_worker\nornir_worker.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def stats(self) -> Dict:
    """
    Collects and returns statistics about the watchdog and its worker.

    Returns:
        dict: A dictionary containing the following keys:

            - watchdog_runs (int): The number of runs executed by the watchdog.
            - timestamp (str): The current time as produced by ``time.ctime()``.
            - uptime: Time elapsed since ``started_at`` as formatted by
              ``format_duration`` (presumably a human-readable string).
            - uptime_seconds (int): Whole seconds elapsed since ``started_at``.
            - dead_connections_cleaned (int): The number of dead connections cleaned.
            - idle_connections_cleaned (int): The number of idle connections cleaned.
            - worker_ram_usage_mbyte: Value returned by ``self.get_ram_usage()``.
    """
    # sample the clock once so that "uptime" and "uptime_seconds" always
    # describe the same instant
    uptime_seconds = int(time.time() - self.started_at)
    return {
        "watchdog_runs": self.runs,
        "timestamp": time.ctime(),
        "uptime": format_duration(uptime_seconds),
        "uptime_seconds": uptime_seconds,
        "dead_connections_cleaned": self.dead_connections_cleaned,
        "idle_connections_cleaned": self.idle_connections_cleaned,
        "worker_ram_usage_mbyte": self.get_ram_usage(),
    }

configuration() -> Dict ¤

Returns the configuration settings for the worker.

Returns:

Name Type Description
Dict Dict

A dictionary containing the configuration settings:

  • "watchdog_interval" (int): The interval for the watchdog timer.
  • "connections_idle_timeout" (int): The timeout for idle connections.
Source code in norfab\workers\nornir_worker\nornir_worker.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def configuration(self) -> Dict:
    """
    Report the watchdog settings currently in effect.

    Returns:
        Dict: Mapping with two entries:

            - "watchdog_interval": value of ``self.watchdog_interval``.
            - "connections_idle_timeout": value of ``self.connections_idle_timeout``.
    """
    setting_names = ("watchdog_interval", "connections_idle_timeout")
    return {name: getattr(self, name) for name in setting_names}

connections_get() -> Dict ¤

Retrieve the current connections data.

Returns:

Name Type Description
Dict Dict

A dictionary containing the current connections data.

Source code in norfab\workers\nornir_worker\nornir_worker.py
128
129
130
131
132
133
134
135
136
137
def connections_get(self) -> Dict:
    """
    Expose the connection usage records tracked by the watchdog.

    Returns:
        Dict: Single-key mapping whose "connections" entry is the
            ``self.connections_data`` dictionary itself (not a copy).
    """
    tracked = self.connections_data
    return dict(connections=tracked)

connections_update(nr: Any, plugin: str) -> None ¤

Function to update connection use timestamps for each host

Parameters:

Name Type Description Default
nr Any

Nornir object

required
plugin str

connection plugin name

required
Source code in norfab\workers\nornir_worker\nornir_worker.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def connections_update(self, nr: Any, plugin: str) -> None:
    """
    Stamp the current time as last use of ``plugin`` for every host in ``nr``.

    Hosts and plugins seen for the first time get a new record with empty
    keepalive statistics; existing records only have ``last_use`` refreshed.

    Args:
        nr: Nornir object
        plugin: connection plugin name
    """
    for host_name in nr.inventory.hosts:
        host_record = self.connections_data.setdefault(host_name, {})
        plugin_record = host_record.setdefault(
            plugin,
            {"last_use": None, "last_keepalive": None, "keepalive_count": 0},
        )
        plugin_record["last_use"] = time.ctime()
    log.info(
        f"{self.worker.name} - updated connections use timestamps for '{plugin}'"
    )

connections_clean() ¤

Cleans up idle connections based on the configured idle timeout.

This method checks for connections that have been idle for longer than the specified connections_idle_timeout and disconnects them. The behavior varies depending on the value of connections_idle_timeout:

  • If connections_idle_timeout is None, no connections are disconnected.
  • If connections_idle_timeout is 0, all connections are disconnected.
  • If connections_idle_timeout is greater than 0, only connections that have been idle for longer than the specified timeout are disconnected.

The method acquires a lock to ensure thread safety while modifying the connections data. It logs the disconnection actions and updates the idle_connections_cleaned counter.

Raises:

Type Description
Exception

If an error occurs while attempting to disconnect idle connections, an error message is logged.

Source code in norfab\workers\nornir_worker\nornir_worker.py
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def connections_clean(self):
    """
    Close connections that have been unused for too long.

    What gets closed is decided by ``connections_idle_timeout``:

    - ``None`` - connections never age out, nothing is closed.
    - ``0`` - all connections of every tracked host are closed.
    - greater than ``0`` - a connection is closed once the time elapsed
      since its ``last_use`` timestamp exceeds the timeout.

    The worker's ``connections_lock`` is acquired without blocking; when it
    is already held this run is skipped. The lock is always released before
    returning. Closed connections are removed from ``connections_data`` and
    counted in ``idle_connections_cleaned``.

    Errors raised while cleaning are not propagated to the caller, they are
    caught and reported with an error log message.
    """
    if not self.worker.connections_lock.acquire(blocking=False):
        return
    # plugin name -> names of hosts whose connection has to be closed
    expired = {}
    try:
        idle_timeout = self.connections_idle_timeout
        if idle_timeout is None:
            # ageing is switched off
            pass
        elif idle_timeout == 0:
            # close everything for every tracked host
            expired["all"] = list(self.connections_data)
        elif idle_timeout > 0:
            # pick only connections idle for longer than the timeout
            for host_name, plugins in self.connections_data.items():
                for plugin, conn_data in plugins.items():
                    # last_use is parsed with strptime default format back
                    # into seconds since the epoch
                    used_at = time.mktime(time.strptime(conn_data["last_use"]))
                    if time.time() - used_at > idle_timeout:
                        expired.setdefault(plugin, []).append(host_name)
        # close connections plugin by plugin
        for conn_name, host_names in expired.items():
            if not host_names:
                continue
            FFun(self.worker.nr, FL=host_names).run(
                task=nr_connections, call="close", conn_name=conn_name
            )
            log.debug(
                f"{self.worker.name} watchdog, disconnected '{conn_name}' "
                f"connections for '{', '.join(host_names)}'"
            )
            self.idle_connections_cleaned += len(host_names)
            if conn_name == "all":
                # everything was closed, nothing left to track
                self.connections_data = {}
                break
            # forget the closed plugin, and the host once it has none left
            for host_name in host_names:
                remaining = self.connections_data[host_name]
                remaining.pop(conn_name)
                if not remaining:
                    self.connections_data.pop(host_name)
    except Exception as err:
        log.error(
            f"{self.worker.name} - watchdog failed to close idle connections, error: {err}"
        )
    finally:
        self.worker.connections_lock.release()

connections_keepalive() ¤

Keepalive connections and clean up dead connections if any.

This method performs the following tasks:

  • If connections_idle_timeout is 0, it returns immediately without performing any actions.
  • Attempts to acquire a lock on worker.connections_lock to ensure thread safety.
  • Logs a debug message indicating that the keepalive process is running.
  • Uses HostsKeepalive to check and clean up dead connections, updating the dead_connections_cleaned counter.
  • Removes connections that are no longer present in the Nornir inventory.
  • Removes hosts from connections_data if they have no remaining connections.
  • Updates the keepalive statistics for each connection plugin, including the last keepalive time and keepalive count.
  • Logs an error message if an exception occurs during the keepalive process.
  • Releases the lock on worker.connections_lock in the finally block to ensure it is always released.

Raises:

Type Description
Exception

If an error occurs during the keepalive process, it is logged as an error.

Source code in norfab\workers\nornir_worker\nornir_worker.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def connections_keepalive(self):
    """
    Keepalive connections and clean up dead connections if any.

    This method performs the following tasks:

    - If `connections_idle_timeout` is 0, it returns immediately without performing any actions.
    - Tries to acquire `worker.connections_lock` without blocking and returns if the lock is already held.
    - Logs a debug message indicating that the keepalive process is running.
    - Runs `HostsKeepalive` against the worker's Nornir instance and adds the reported
      `dead_connections_cleaned` value to the `dead_connections_cleaned` counter.
    - Removes from `connections_data` hosts that are no longer present in the Nornir inventory
      and connections that the inventory host no longer has.
    - Removes hosts from `connections_data` if they have no remaining connections.
    - Updates the keepalive statistics for each connection plugin, including the last keepalive time and keepalive count.
    - Releases the lock on `worker.connections_lock` in the `finally` block to ensure it is always released.

    Exceptions raised during the keepalive process are caught and logged as
    an error, they are not propagated to the caller.
    """
    if self.connections_idle_timeout == 0:  # do not keepalive if idle is 0
        return
    if not self.worker.connections_lock.acquire(blocking=False):
        return
    try:
        log.debug(f"{self.worker.name} - watchdog running connections keepalive")
        stats = HostsKeepalive(self.worker.nr)
        self.dead_connections_cleaned += stats["dead_connections_cleaned"]
        # remove connections that are no longer present in Nornir inventory;
        # iterate over a snapshot because hosts are popped from
        # connections_data inside the loop - removing entries from a dict
        # while iterating it raises RuntimeError, which would abort the
        # rest of this keepalive run
        for host_name, host_connections in list(self.connections_data.items()):
            # check if host is still in Nornir inventory
            if host_name not in self.worker.nr.inventory.hosts:
                self.connections_data.pop(host_name, None)
                continue
            # clean up specific connections for host
            for connection_name in list(host_connections):
                if not self.worker.nr.inventory.hosts[host_name].connections.get(
                    connection_name
                ):
                    host_connections.pop(connection_name)
        # remove host if no connections left
        for host_name in list(self.connections_data):
            if not self.connections_data[host_name]:
                self.connections_data.pop(host_name)
        # update connections statistics
        for plugins in self.connections_data.values():
            for plugin in plugins.values():
                plugin["last_keepalive"] = time.ctime()
                plugin["keepalive_count"] += 1
    except Exception as e:
        msg = f"{self.worker.name} - watchdog HostsKeepalive check error: {e}"
        log.error(msg)
    finally:
        self.worker.connections_lock.release()

NornirWorker(inventory: str, broker: str, worker_name: str, exit_event=None, init_done_event=None, log_level: str = None, log_queue: object = None) ¤

Bases: NFPWorker, TaskTask, CliTask, CfgTask, TestTask, NetworkTask, RuntimeInventoryTask, ParseTask, FileCopyTask, NetconfTask

NornirWorker class for managing Nornir Service tasks.

Parameters:

Name Type Description Default
inventory str

Path to the inventory file.

required
broker str

Broker address.

required
worker_name str

Name of the worker.

required
exit_event Event

Event to signal worker exit. Defaults to None.

None
init_done_event Event

Event to signal initialization completion. Defaults to None.

None
log_level str

Logging level. Defaults to None.

None
log_queue object

Queue for logging. Defaults to None.

None

Attributes:

Name Type Description
init_done_event Event

Event to signal initialization completion.

tf_base_path str

Base path for files folder saved using tf processor.

connections_lock Lock

Lock for managing connections.

nornir_inventory dict

Inventory data for Nornir.

watchdog WatchDog

Watchdog instance for monitoring.

Source code in norfab\workers\nornir_worker\nornir_worker.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def __init__(
    self,
    inventory: str,
    broker: str,
    worker_name: str,
    exit_event=None,
    init_done_event=None,
    log_level: str = None,
    log_queue: object = None,
):
    """
    Initialise the worker: build the Nornir instance, start the watchdog
    and run the "nornir-startup" hooks.

    Args:
        inventory (str): Path to the inventory file.
        broker (str): Broker address.
        worker_name (str): Name of the worker.
        exit_event: Event to signal worker exit. Defaults to None.
        init_done_event: Event that is set once initialization completes,
            if provided. Defaults to None.
        log_level (str): Logging level. Defaults to None.
        log_queue (object): Queue for logging. Defaults to None.
    """
    super().__init__(
        inventory, broker, SERVICE, worker_name, exit_event, log_level, log_queue
    )
    self.init_done_event = init_done_event
    # base path for the "tf" files folder under the worker base directory
    self.tf_base_path = os.path.join(self.base_dir, "tf")

    # misc attributes
    # lock guarding Nornir connections, it is also used by the watchdog
    # tasks and by init_nornir
    self.connections_lock = Lock()

    # initiate Nornir, a default-constructed Job is supplied as the
    # required job argument
    self.refresh_nornir(job=Job())

    # initiate watchdog, has to happen after refresh_nornir because
    # WatchDog reads worker.nornir_worker_inventory which is set there
    self.watchdog = WatchDog(self)
    self.watchdog.start()

    # run startup hooks, each hook function receives this worker as its
    # first argument followed by the hook's optional args and kwargs
    for f in self.inventory.hooks.get("nornir-startup", []):
        f["function"](self, *f.get("args", []), **f.get("kwargs", {}))

    # signal that initialization is complete, if an event was supplied
    if self.init_done_event is not None:
        self.init_done_event.set()

    log.info(f"{self.name} - Started")

worker_exit() ¤

Executes all functions registered under the "nornir-exit" hook in the inventory.

This method iterates through the list of hooks associated with the "nornir-exit" key in the inventory's hooks.

For each hook, it calls the function specified in the hook, passing the current instance (self) as the first argument, followed by any additional positional and keyword arguments specified in the hook.

Source code in norfab\workers\nornir_worker\nornir_worker.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
def worker_exit(self):
    """
    Run every hook registered under "nornir-exit" in the inventory.

    Hooks are invoked in the order they are listed. Each hook function
    receives this worker (`self`) as its first argument, followed by the
    hook's optional "args" and "kwargs". Nothing happens when no
    "nornir-exit" hooks are registered.
    """
    exit_hooks = self.inventory.hooks.get("nornir-exit", [])
    for hook in exit_hooks:
        callback = hook["function"]
        positional = hook.get("args", [])
        named = hook.get("kwargs", {})
        callback(self, *positional, **named)

init_nornir(inventory: dict) -> None ¤

Initializes the Nornir automation framework with the provided inventory.

This method first closes any existing Nornir connections if a Nornir instance with hosts is already present. It then creates a new Nornir instance using the supplied inventory dictionary, which should contain configuration for logging, runner, hosts, groups, defaults, and user-defined settings.

Parameters:

Name Type Description Default
inventory dict

A dictionary containing Nornir inventory and configuration options.

required
Source code in norfab\workers\nornir_worker\nornir_worker.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def init_nornir(self, inventory: dict) -> None:
    """
    (Re)create the Nornir instance from the supplied inventory.

    While holding ``connections_lock``, connections of the current Nornir
    instance are closed (when an instance exists and has hosts) and a new
    instance, built with the ``DictInventory`` plugin, replaces it.

    Args:
        inventory (dict): Nornir inventory and configuration options, the
            keys read are "logging", "runner", "hosts", "groups",
            "defaults" and "user_defined"; missing keys fall back to
            empty dictionaries ("logging" falls back to disabled logging).
    """
    with self.connections_lock:
        # tear down connections of the instance that is being replaced
        current = self.nr
        if current is not None and current.inventory.hosts:
            current.close_connections()

        # collect settings for the new instance
        logging_cfg = inventory.get("logging", {"enabled": False})
        runner_cfg = inventory.get("runner", {})
        dict_inventory = {
            "plugin": "DictInventory",
            "options": {
                "hosts": inventory.get("hosts", {}),
                "groups": inventory.get("groups", {}),
                "defaults": inventory.get("defaults", {}),
            },
        }
        user_defined_cfg = inventory.get("user_defined", {})

        # initiate Nornir
        self.nr = InitNornir(
            logging=logging_cfg,
            runner=runner_cfg,
            inventory=dict_inventory,
            user_defined=user_defined_cfg,
        )

filter_hosts_and_validate(kwargs: Dict[str, Any], ret: Result) -> Tuple[Any, Result] ¤

Helper method to filter hosts and validate results.

Returns:

Name Type Description
tuple Tuple[Any, Result]

(filtered_nornir, Result) where Result status set to no_match if no hosts matched.

Source code in norfab\workers\nornir_worker\nornir_worker.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def filter_hosts_and_validate(
    self, kwargs: Dict[str, Any], ret: Result
) -> Tuple[Any, Result]:
    """
    Narrow down Nornir hosts using the filter arguments found in ``kwargs``.

    Keys of ``kwargs`` that are listed in ``FFun_functions`` are removed
    from ``kwargs`` in place and handed over to ``FFun``.

    Args:
        kwargs: task arguments, filter keys are popped out of this dictionary.
        ret: Result object, updated when no hosts matched.

    Returns:
        tuple: (filtered_nornir, Result) where Result status is set to
            `no_match` and a message is appended if no hosts matched.
    """
    # failed hosts are reset before filtering
    self.nr.data.reset_failed_hosts()

    # move filter arguments out of kwargs
    filters = {}
    for key in list(kwargs):
        if key in FFun_functions:
            filters[key] = kwargs.pop(key)

    filtered_nornir = FFun(self.nr, **filters)
    if filtered_nornir.inventory.hosts:
        return filtered_nornir, ret

    # nothing matched, record that on the result
    msg = f"{self.name} - nothing to do, no hosts matched by filters '{filters}'"
    log.debug(msg)
    ret.messages.append(msg)
    ret.status = "no_match"
    return filtered_nornir, ret

refresh_nornir(job: Job, progress: bool = False) -> Result ¤

Refreshes the Nornir instance by reloading the inventory from configured sources.

This method performs the following steps:

1. Loads the inventory configuration from the broker.
2. If Netbox is specified in the inventory, pulls inventory data from Netbox.
3. If Containerlab is specified in the inventory, pulls inventory data from Containerlab.
4. Initializes the Nornir instance with the refreshed inventory.
5. Optionally emits progress events at each stage if `progress` is True.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required
progress bool

If True, emits progress events during the refresh process. Defaults to False.

False

The inventory configuration is expected to be a dictionary with the following keys:

  • "logging": A dictionary specifying logging configuration (default: {"enabled": False}).
  • "runner": A dictionary specifying runner options (default: {}).
  • "hosts": A dictionary specifying host details (default: {}).
  • "groups": A dictionary specifying group details (default: {}).
  • "defaults": A dictionary specifying default values (default: {}).
  • "user_defined": A dictionary specifying user-defined options (default: {}).

Returns:

Name Type Description
Result Result

A Result object indicating the outcome of the refresh operation.

Source code in norfab\workers\nornir_worker\nornir_worker.py
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
@Task(fastapi={"methods": ["POST"]})
def refresh_nornir(
    self,
    job: Job,
    progress: bool = False,
) -> Result:
    """
    Refreshes the Nornir instance by reloading the inventory from configured sources.

    This method performs the following steps:

        1. Loads the inventory configuration from the broker.
        2. If Netbox is specified in the inventory, pulls inventory data from Netbox.
        3. If Containerlab is specified in the inventory, pulls inventory data from Containerlab.
        4. Initializes the Nornir instance with the refreshed inventory.
        5. Emits job events after each stage.

    Args:
        job: NorFab Job object containing relevant metadata
        progress (bool, optional): Not referenced in this method's body, job
            events are emitted regardless of its value (possibly consumed by
            the ``Task`` decorator - TODO confirm). Defaults to False.

    The inventory configuration is expected to be a dictionary with the following keys:

    - "logging": A dictionary specifying logging configuration (default: {"enabled": False}).
    - "runner": A dictionary specifying runner options (default: {}).
    - "hosts": A dictionary specifying host details (default: {}).
    - "groups": A dictionary specifying group details (default: {}).
    - "defaults": A dictionary specifying default values (default: {}).
    - "user_defined": A dictionary specifying user-defined options (default: {}).

    Returns:
        Result: A Result object created with ``result=True``; results
            returned by the Netbox and Containerlab loaders are not
            inspected here.
    """
    ret = Result(task=f"{self.name}:refresh_nornir", result=True)

    # get inventory from broker
    self.nornir_worker_inventory = self.load_inventory()

    # pull Nornir inventory from Netbox
    if "netbox" in self.nornir_worker_inventory:
        self.nornir_inventory_load_netbox(job=job)
        job.event("pulled Nornir inventory data from Netbox")

    # pull Nornir inventory from Containerlab, the "containerlab" inventory
    # section is passed as keyword arguments; re_init_nornir=False is
    # requested since init_nornir is called once below
    if "containerlab" in self.nornir_worker_inventory:
        self.nornir_inventory_load_containerlab(
            job=job,
            **self.nornir_worker_inventory["containerlab"],
            re_init_nornir=False,
        )
        job.event("pulled Nornir inventory data from Containerlab")

    job.event("pulled inventories, refreshing Nornir instance")

    # build the Nornir instance out of the combined inventory
    self.init_nornir(self.nornir_worker_inventory)

    job.event("nornir instance refreshed")

    return ret

nornir_inventory_load_netbox(job: Job, progress: bool = False) -> Result ¤

Queries inventory data from Netbox Service and merges it into the Nornir inventory.

This function checks if there is Netbox data in the inventory and retrieves it if available. It handles retries and timeout configurations, and ensures that necessary filters or devices are specified. The retrieved inventory data is then merged into the existing Nornir inventory.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required
Logs
  • Warning: If the inventory has no hosts, filters, or devices defined.
  • Error: If no inventory data is returned from Netbox (a RuntimeError is raised as well).
  • Error: If the Netbox workers return no hosts data.
Source code in norfab\workers\nornir_worker\nornir_worker.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
@Task(fastapi={"methods": ["POST"]})
def nornir_inventory_load_netbox(
    self,
    job: Job,
    progress: bool = False,
) -> Result:
    """
    Queries inventory data from Netbox Service and merges it into the Nornir inventory.

    The "netbox" section of the worker inventory supplies the arguments for
    the Netbox ``get_nornir_inventory`` task: a dictionary is used as
    keyword arguments, ``True`` means no extra arguments. An optional
    "timeout" argument (default 100, never less than 10) is used as the job
    timeout. If neither "filters" nor "devices" are given, the names of the
    hosts defined in the worker inventory are sent as "devices". The first
    non-failed worker result that contains hosts is merged into the worker
    inventory.

    Args:
        job: NorFab Job object containing relevant metadata
        progress (bool, optional): Not referenced in this method's body.
            Defaults to False.

    Returns:
        Result: ``result`` is True unless the "netbox" section is missing or
            invalid, or there are no hosts, filters or devices to query
            Netbox with; in those cases ``result`` is False and ``messages``
            holds the reason.

    Raises:
        RuntimeError: If the Netbox job returns no inventory data at all.

    Logs:
        - Warning: If the "netbox" inventory section is neither a dictionary nor True.
        - Warning: If the inventory has no hosts, filters, or devices defined.
        - Error: If no inventory data is returned from Netbox.
        - Error: If the Netbox workers return no hosts data.
    """
    ret = Result(task=f"{self.name}:nornir_inventory_load_netbox", result=True)

    # form Netbox inventory load arguments
    netbox_config = self.nornir_worker_inventory.get("netbox")
    if isinstance(netbox_config, dict):
        # work on a copy so that popping "timeout" and adding "devices"
        # below does not alter the worker inventory itself
        kwargs = dict(netbox_config)
    elif netbox_config is True:
        kwargs = {}
    else:
        # without this branch kwargs would be unbound and the method would
        # fail with UnboundLocalError
        msg = (
            f"{self.name} - inventory 'netbox' section is missing or invalid, "
            f"expected dictionary or True"
        )
        log.warning(msg)
        ret.result = False
        ret.messages = [msg]
        return ret
    timeout = max(10, kwargs.pop("timeout", 100))

    # check if need to add devices list
    if "filters" not in kwargs and "devices" not in kwargs:
        if self.nornir_worker_inventory.get("hosts"):
            kwargs["devices"] = list(self.nornir_worker_inventory["hosts"])
        else:
            msg = f"{self.name} - inventory has no hosts, Netbox filters or devices defined"
            log.warning(msg)
            ret.result = False
            ret.messages = [msg]
            return ret

    nb_inventory_data = self.client.run_job(
        service="netbox",
        task="get_nornir_inventory",
        workers="any",
        kwargs=kwargs,
        timeout=timeout,
    )

    if nb_inventory_data is None:
        msg = f"{self.name} - Netbox get_nornir_inventory no inventory returned"
        log.error(msg)
        raise RuntimeError(msg)

    # merge Netbox inventory into Nornir inventory, only the first
    # non-failed worker result that has hosts is used
    for wname, wdata in nb_inventory_data.items():
        if wdata["failed"] is False and wdata["result"].get("hosts"):
            merge_recursively(self.nornir_worker_inventory, wdata["result"])
            break
    else:
        # no worker returned usable hosts data
        msg = (
            f"{self.name} - Netbox worker(s) "
            f"'{', '.join(list(nb_inventory_data.keys()))}' returned no hosts data."
        )
        log.error(msg)
        job.event(msg, severity="ERROR")

    job.event("completed processing Nornir inventory from Netbox")

    return ret

nornir_inventory_load_containerlab(job: Job, lab_name: str = None, groups: Union[None, list] = None, clab_workers: str = 'all', use_default_credentials: bool = True, progress: bool = False, dry_run: bool = False, re_init_nornir: bool = True) -> Result ¤

Pulls the Nornir inventory from a Containerlab lab instance and merges it with the existing Nornir inventory.

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required
lab_name str

The name of the Containerlab lab to retrieve the inventory from.

None
groups list

A list of group names to include into the hosts' inventory.

None
use_default_credentials bool

Whether to use default credentials for the hosts.

True

Returns:

Name Type Description
Result Result

A Result object indicating the success or failure of the operation. If successful, the Nornir inventory is updated with the retrieved data.

Notes
  • The method retrieves inventory data from a Containerlab lab using a client job.
  • If the retrieved inventory contains host data, it is merged into the existing Nornir inventory using the merge_recursively function.
  • If no inventory or host data is returned, the method logs an error and marks the operation as failed.
  • After successful merging of inventory, Nornir instance is re-initialized with the updated inventory.
Source code in norfab\workers\nornir_worker\nornir_worker.py
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
@Task(fastapi={"methods": ["POST"]})
def nornir_inventory_load_containerlab(
    self,
    job: Job,
    lab_name: str = None,
    groups: Union[None, list] = None,
    clab_workers: str = "all",
    use_default_credentials: bool = True,
    progress: bool = False,
    dry_run: bool = False,
    re_init_nornir: bool = True,
) -> Result:
    """
    Pulls the Nornir inventory from a Containerlab lab instance and merges it with the
    existing Nornir inventory.

    Args:
        job: NorFab Job object containing relevant metadata
        lab_name (str): The name of the Containerlab lab to retrieve the inventory from,
            event messages refer to the lab as 'all' when no name is given.
        groups (list, optional): A list of group names to include into the hosts' inventory.
        clab_workers (str): Containerlab service workers to target, passed as ``workers``
            argument to the client ``run_job`` call.
        use_default_credentials (bool): Whether to use default credentials for the hosts.
        progress (bool): Not referenced by this method body.
            NOTE(review): presumably consumed outside of this method - confirm.
        dry_run (bool): If True, return per-worker ``result`` data as is without
            merging it into the Nornir inventory.
        re_init_nornir (bool): If True, call ``init_nornir`` with the merged inventory.

    Returns:
        Result: A Result object. With ``dry_run`` its result is a dictionary keyed by
            Containerlab worker name, otherwise result is ``True`` and the Nornir
            inventory is updated with the retrieved data.

    Raises:
        RuntimeError: If Containerlab service returned no data or none of the workers
            returned hosts data.

    Notes:
        - The method retrieves inventory data from a Containerlab lab using a client job.
        - Inventory of the first worker that did not fail and returned hosts data is
          merged into the existing Nornir inventory using the `merge_recursively` function.
        - If no inventory or host data is returned, the method logs an error and raises
          RuntimeError.
        - After successful merging of inventory, Nornir instance is re-initialized with the
          updated inventory unless ``re_init_nornir`` is False.
    """
    groups = groups or []
    # single label used by all messages so that missing lab name never shows as 'None'
    lab_label = lab_name or "all"
    ret = Result(
        task=f"{self.name}:nornir_inventory_load_containerlab", result=True
    )
    job.event(
        f"pulling Containerlab '{lab_label}' inventory from '{clab_workers}' workers"
    )

    clab_inventory_data = self.client.run_job(
        service="containerlab",
        task="get_nornir_inventory",
        workers=clab_workers,
        kwargs={
            "lab_name": lab_name,
            "groups": groups,
            "use_default_credentials": use_default_credentials,
        },
    )

    if clab_inventory_data is None:
        msg = f"{self.name} - Containerlab get_nornir_inventory no data returned"
        log.error(msg)
        raise RuntimeError(msg)

    job.event(f"pulled Containerlab '{lab_label}' lab inventory")

    # dry run - hand back what workers returned, runtime inventory stays untouched
    if dry_run is True:
        ret.result = {w: r["result"] for w, r in clab_inventory_data.items()}
        return ret

    for wname, wdata in clab_inventory_data.items():
        # use inventory from first worker that returned hosts data
        if wdata["failed"] is False and wdata["result"].get("hosts"):
            merge_recursively(self.nornir_worker_inventory, wdata["result"])
            break
    else:
        # loop finished without break - no worker provided usable hosts data
        msg = (
            f"{self.name} - Containerlab worker(s) '{', '.join(clab_inventory_data)}' "
            f"returned no hosts data for '{lab_label}' lab."
        )
        log.error(msg)
        raise RuntimeError(msg)

    job.event(
        f"merged Containerlab '{lab_label}' lab inventory with Nornir runtime inventory"
    )

    if re_init_nornir is True:
        self.init_nornir(self.nornir_worker_inventory)
        job.event("nornir instance re-initialized")

    return ret

load_job_data(job_data: Any) -> Dict ¤

Helper function to download job data YAML files and load it.

Parameters:

Name Type Description Default
job_data str

job data NorFab file path to download and load using YAML.

required

Returns: dict – the job data loaded from the downloaded YAML file content.

Raises:

Type Description
FileNotFoundError

If the job data is a URL and the file download fails.

Source code in norfab\workers\nornir_worker\nornir_worker.py
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
def load_job_data(self, job_data: Any) -> Dict:
    """
    Helper function to download job data YAML files and load it.

    Only values recognized by ``self.is_url`` are downloaded and parsed,
    any other ``job_data`` value is returned unchanged.

    Args:
        job_data (Any): job data NorFab file path to download and load using YAML.

    Returns:
        dict: The job data loaded from the downloaded YAML content, or the
            original ``job_data`` value if it is not a URL.

    Raises:
        FileNotFoundError: If the job data is a URL and the file download fails.
    """
    if not self.is_url(job_data):
        return job_data

    # keep downloaded content under its own name, otherwise the URL is lost
    # and error message below would always report 'None'
    content = self.fetch_file(job_data)
    if content is None:
        msg = f"{self.name} - '{job_data}' job data file download failed"
        raise FileNotFoundError(msg)

    return yaml.safe_load(content)

jinja2_network_hosts(network: str, pfxlen: bool = False) -> list ¤

Custom Jinja2 filter that returns a list of hosts for a given IP network.

Parameters:

Name Type Description Default
network str

The network address in CIDR notation.

required
pfxlen bool

If True, include the prefix length in the returned host addresses. Defaults to False.

False

Returns:

Name Type Description
list list

A list of host addresses as strings. If pfxlen is True, each address will include the prefix length.

Source code in norfab\workers\nornir_worker\nornir_worker.py
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
def jinja2_network_hosts(self, network: str, pfxlen: bool = False) -> list:
    """
    Custom Jinja2 filter producing the host addresses of an IP network.

    Args:
        network (str): Network in CIDR notation, an interface address with
            prefix length is accepted as well and resolved to its network.
        pfxlen (bool, optional): When True every returned address carries
            the network prefix length suffix. Defaults to False.

    Returns:
        list: Host addresses as strings, formatted as ``address/prefixlen``
            if pfxlen is True.
    """
    subnet = ipaddress.ip_interface(network).network
    if pfxlen:
        return [f"{address}/{subnet.prefixlen}" for address in subnet.hosts()]
    return [str(address) for address in subnet.hosts()]

jinja2_nb_create_ip(prefix: str, device: str = None, interface: str = None, **kwargs) -> str ¤

Jinja2 filter to get or create next available IP address from prefix using Netbox service.

Source code in norfab\workers\nornir_worker\nornir_worker.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
def jinja2_nb_create_ip(
    self, prefix: str, device: str = None, interface: str = None, **kwargs
) -> str:
    """
    Jinja2 filter that runs Netbox service ``create_ip`` task to get or
    create next available IP address from prefix.

    Args:
        prefix (str): Prefix passed to the task as ``prefix`` argument.
        device (str, optional): Passed to the task as ``device`` argument.
        interface (str, optional): Passed to the task as ``interface`` argument.
        **kwargs: Any additional arguments forwarded to the task as is.

    Returns:
        str: ``address`` value out of the first worker's result.

    Raises:
        RuntimeError: If the first worker's reply is marked as failed.
    """
    kwargs.update(prefix=prefix, device=device, interface=interface)
    replies = self.client.run_job(
        "netbox",
        "create_ip",
        kwargs=kwargs,
        workers="any",
        timeout=30,
    )
    # replies is a {worker_name: results_dict} mapping, only first entry is used
    first_reply = list(replies.values())[0]
    if not first_reply["failed"]:
        return first_reply["result"]["address"]
    raise RuntimeError(first_reply["messages"])

jinja2_nb_create_prefix(parent: str, description: str, prefixlen: int = 30, **kwargs) -> str ¤

Jinja2 filter to get or create next available prefix from parent prefix using Netbox service.

Source code in norfab\workers\nornir_worker\nornir_worker.py
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
def jinja2_nb_create_prefix(
    self, parent: str, description: str, prefixlen: int = 30, **kwargs
) -> str:
    """
    Jinja2 filter that runs Netbox service ``create_prefix`` task to get or
    create next available prefix from parent prefix.

    Args:
        parent (str): Parent prefix passed to the task as ``parent`` argument.
        description (str): Passed to the task as ``description`` argument.
        prefixlen (int): Passed to the task as ``prefixlen`` argument,
            defaults to 30.
        **kwargs: Any additional arguments forwarded to the task as is.

    Returns:
        str: ``prefix`` value out of the first worker's result.

    Raises:
        RuntimeError: If the first worker's reply is marked as failed.
    """
    task_kwargs = {
        **kwargs,
        "parent": parent,
        "description": description,
        "prefixlen": prefixlen,
    }
    replies = self.client.run_job(
        "netbox",
        "create_prefix",
        kwargs=task_kwargs,
        workers="any",
        timeout=30,
    )
    # replies is a {worker_name: results_dict} mapping, only first entry is used
    first_reply = list(replies.values())[0]
    if not first_reply["failed"]:
        return first_reply["result"]["prefix"]
    raise RuntimeError(first_reply["messages"])

jinja2_call_netbox(netbox_task: str) -> callable ¤

Returns a callable function to execute arbitrary NetBox service task.

Source code in norfab\workers\nornir_worker\nornir_worker.py
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
def jinja2_call_netbox(self, netbox_task: str) -> callable:
    """
    Build a function that runs given NetBox service task when called.

    Args:
        netbox_task (str): Name of the NetBox service task to run.

    Returns:
        callable: Function that forwards its positional and keyword arguments
            to the task and returns the first worker's result. If ``devices``
            keyword argument holds exactly one item, only that item's entry
            of the result is returned. RuntimeError is raised by the function
            if the first worker's reply is marked as failed.
    """

    def call_netbox(*args, **kwargs) -> dict:
        replies = self.client.run_job(
            "netbox",
            netbox_task,
            args=args,
            kwargs=kwargs,
            workers="any",
            timeout=300,
        )
        first_reply = list(replies.values())[0]

        # surface task failure to the template renderer
        if first_reply["failed"]:
            raise RuntimeError(first_reply["messages"])

        devices = kwargs.get("devices", [])
        # anything but a single device gets full results
        if len(devices) != 1:
            return first_reply["result"]
        # single device requested - unwrap its entry
        return first_reply["result"][devices[0]]

    return call_netbox

add_jinja2_netbox() -> Dict ¤

Aggregates Jinja2 NetBox-related methods and functions into a dictionary for the ease of use within Jinja2 templates.

Source code in norfab\workers\nornir_worker\nornir_worker.py
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
def add_jinja2_netbox(self) -> Dict:
    """
    Collect NetBox helpers for Jinja2 templates into a single dictionary.

    Returns:
        Dict: Dictionary keyed by helper name. Task helpers are functions
            produced by ``jinja2_call_netbox`` for the task of the same name,
            ``create_ip`` and ``create_prefix`` map to the dedicated methods.
    """
    netbox_tasks = (
        "get_connections",
        "get_interfaces",
        "get_circuits",
        "get_devices",
        "rest",
        "graphql",
    )
    helpers = {task: self.jinja2_call_netbox(task) for task in netbox_tasks}
    helpers["create_ip"] = self.jinja2_nb_create_ip
    helpers["create_prefix"] = self.jinja2_nb_create_prefix
    return helpers

add_jinja2_filters() -> Dict ¤

Adds custom filters for use in Jinja2 templates using | syntax.

Returns:

Name Type Description
dict Dict

A dictionary where the keys are the names of the filters and the values are the corresponding filter functions.

  • "netbox.create_ip": Method to get the next IP address.
  • "netbox.create_prefix": Method to get next available prefix.
  • "network_hosts": Method to get IP network hosts.
Source code in norfab\workers\nornir_worker\nornir_worker.py
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
def add_jinja2_filters(self) -> Dict:
    """
    Provide custom filters for use in Jinja2 templates with `|` syntax.

    Returns:
        dict (Dict): Filter functions keyed by filter name:

            - "netbox.create_ip": Method to get the next IP address.
            - "netbox.create_prefix": Method to get next available prefix.
            - "network_hosts": Method to get IP network hosts.
    """
    filters = {}
    filters["netbox.create_ip"] = self.jinja2_nb_create_ip
    filters["netbox.create_prefix"] = self.jinja2_nb_create_prefix
    filters["network_hosts"] = self.jinja2_network_hosts
    return filters

get_nornir_hosts(details: bool = False, **kwargs: dict) -> Result ¤

Retrieve a list of Nornir hosts managed by this worker.

Parameters:

Name Type Description Default
details bool

If True, returns detailed information about each host.

False
**kwargs dict

Hosts filters to apply when retrieving hosts.

{}

Returns:

Type Description
Result

List[Dict]: A list of hosts with optional detailed information.

Source code in norfab\workers\nornir_worker\nornir_worker.py
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
@Task(
    fastapi={"methods": ["GET"]},
    input=GetNornirHosts,
    output=GetNornirHostsResponse,
)
def get_nornir_hosts(self, details: bool = False, **kwargs: dict) -> Result:
    """
    Produce Nornir hosts managed by this worker that match given filters.

    Args:
        details (bool): If True, report platform, hostname, port, groups
            and username for each host instead of host names only.
        **kwargs (dict): Hosts filters to apply when retrieving hosts.

    Returns:
        Result: Result object with ``result`` set to a list of host names, to
            a dictionary of per-host details keyed by host name when ``details``
            is True, or to None when hosts filtering reports ``no_match`` status.
    """
    initial = {} if details else []
    ret = Result(task=f"{self.name}:get_nornir_hosts", result=initial)
    filtered_nornir, ret = self.filter_hosts_and_validate(kwargs, ret)

    # nothing matched the filters - inventory is not consulted at all
    if ret.status == "no_match":
        ret.result = None
        return ret

    if not details:
        ret.result = list(filtered_nornir.inventory.hosts)
        return ret

    detailed = {}
    for host_name, host in filtered_nornir.inventory.hosts.items():
        # all values are converted to strings, groups become list of strings
        detailed[host_name] = {
            "platform": str(host.platform),
            "hostname": str(host.hostname),
            "port": str(host.port),
            "groups": [str(g) for g in host.groups],
            "username": str(host.username),
        }
    ret.result = detailed
    return ret

get_inventory(**kwargs: dict) -> Result ¤

Retrieve running Nornir inventory for requested hosts

Parameters:

Name Type Description Default
**kwargs dict

Fx filters used to filter the inventory.

{}

Returns:

Name Type Description
Dict Result

A dictionary representation of the filtered inventory.

Source code in norfab\workers\nornir_worker\nornir_worker.py
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
@Task(fastapi={"methods": ["GET"]})
def get_inventory(self, **kwargs: dict) -> Result:
    """
    Produce running Nornir inventory of the hosts matching given filters.

    Args:
        **kwargs (dict): Fx filters used to filter the inventory.

    Returns:
        Result: Result object with ``result`` set to a dictionary representation
            of the filtered inventory, result is left as returned by hosts
            filtering when it reports ``no_match`` status.
    """
    ret = Result(task=f"{self.name}:get_inventory", result={})
    filtered_nornir, ret = self.filter_hosts_and_validate(kwargs, ret)
    # no hosts matched - return early without touching the inventory
    if ret.status == "no_match":
        return ret
    ret.result = filtered_nornir.inventory.dict()
    return ret

get_version() -> Result ¤

Retrieve the versions of various libraries and system information.

This method collects the version information of a predefined set of libraries and system details such as the Python version and platform. It attempts to import each library and fetch its version. If a library is not found, it is skipped.

Returns:

Name Type Description
dict Result

a dictionary with the library names as keys and their respective version numbers as values. If a library is not found, its value will be an empty string.

Source code in norfab\workers\nornir_worker\nornir_worker.py
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
@Task(fastapi={"methods": ["GET"]})
def get_version(self) -> Result:
    """
    Retrieve the versions of various libraries and system information.

    This method looks up installed distribution version for a predefined set
    of libraries using ``importlib.metadata`` and adds system details such as
    the Python version and platform.

    Returns:
        Result: Result object with ``result`` set to a dictionary with the
            library names as keys and their respective version numbers as
            values. If a library is not found, its value is an empty string.
            ``python`` and ``platform`` keys hold interpreter version and
            ``sys.platform`` value.
    """
    packages = (
        "norfab",
        "scrapli",
        "scrapli-netconf",
        "scrapli-community",
        "paramiko",
        "netmiko",
        "napalm",
        "nornir",
        "ncclient",
        "nornir-netmiko",
        "nornir-napalm",
        "nornir-scrapli",
        "nornir-utils",
        "tabulate",
        "xmltodict",
        "puresnmp",
        "pygnmi",
        "pyyaml",
        "jmespath",
        "jinja2",
        "ttp",
        "nornir-salt",
        "lxml",
        "ttp-templates",
        "ntc-templates",
        "cerberus",
        "pydantic",
        "requests",
        "textfsm",
        "N2G",
        "dnspython",
        "pythonping",
    )
    libs = {}
    # get version of packages installed
    for pkg in packages:
        try:
            libs[pkg] = importlib.metadata.version(pkg)
        except importlib.metadata.PackageNotFoundError:
            # package is not installed - report empty version
            libs[pkg] = ""

    # system details are added after the lookup so that they are never
    # probed as distributions and cannot be overwritten by same-named packages
    libs["python"] = sys.version.split(" ")[0]
    libs["platform"] = sys.platform

    return Result(result=libs)

get_watchdog_connections() -> Result ¤

Retrieve the list of connections currently managed by watchdog.

Returns:

Name Type Description
Result Result

An instance of the Result class containing the current watchdog connections.

Source code in norfab\workers\nornir_worker\nornir_worker.py
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
@Task(fastapi={"methods": ["GET"]})
def get_watchdog_connections(self) -> Result:
    """
    Report connections currently tracked by the watchdog.

    Returns:
        Result: Result object wrapping the value returned by watchdog
            ``connections_get`` method.
    """
    connections = self.watchdog.connections_get()
    return Result(result=connections)