Skip to content

Containerlab Worker

ContainerlabWorker(inventory: str, broker: str, worker_name: str, exit_event=None, init_done_event=None, log_level: str = None, log_queue: object = None) ¤

Bases: NFPWorker

ContainerlabWorker is a worker class that integrates with containerlab to run network topologies.

Parameters:

Name Type Description Default
inventory str

Inventory configuration for the worker.

required
broker str

Broker URL to connect to.

required
worker_name str

Name of this worker.

required
exit_event Event

Event to signal worker to stop/exit.

None
init_done_event Event

Event to signal when worker is done initializing.

None
log_level str

Logging level for this worker.

None
log_queue object

Queue for logging.

None
Source code in norfab\workers\containerlab_worker.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(
    self,
    inventory: str,
    broker: str,
    worker_name: str,
    exit_event=None,
    init_done_event=None,
    log_level: str = None,
    log_queue: object = None,
):
    """
    Initialise the Containerlab worker.

    Args:
        inventory (str): Inventory configuration for the worker.
        broker (str): Broker URL to connect to.
        worker_name (str): Name of this worker.
        exit_event (Event, optional): Event to signal worker to stop/exit.
        init_done_event (Event, optional): Event that is set once the worker
            finished initializing. When omitted, no event is signalled.
        log_level (str, optional): Logging level for this worker.
        log_queue (object, optional): Queue for logging.
    """
    super().__init__(
        inventory, broker, SERVICE, worker_name, exit_event, log_level, log_queue
    )
    self.init_done_event = init_done_event
    self.exit_event = exit_event

    # create directory to store lab topologies
    self.topologies_dir = os.path.join(self.base_dir, "topologies")
    os.makedirs(self.topologies_dir, exist_ok=True)

    # merge local inventory with inventory from broker
    merge_recursively(self.inventory[self.name], self.load_inventory())

    # the event is optional (defaults to None), only signal it when provided
    if self.init_done_event is not None:
        self.init_done_event.set()

worker_exit() ¤

Terminates the current process by sending a SIGTERM signal to itself.

This method retrieves the current process ID using os.getpid() and then sends a SIGTERM signal to terminate the process using os.kill().

Source code in norfab\workers\containerlab_worker.py
64
65
66
67
68
69
70
71
def worker_exit(self):
    """
    Stop the worker by delivering SIGTERM to the current process.

    The process ID is looked up with `os.getpid()` and the signal is sent
    to that same process with `os.kill()`.
    """
    own_pid = os.getpid()
    os.kill(own_pid, signal.SIGTERM)

get_version() ¤

Produce a report of the versions of various Python packages.

This method collects the versions of several specified Python packages and returns them in a dictionary.

Returns:

Name Type Description
Result

An object containing the task name and a dictionary with the package names as keys and their respective versions as values.

Source code in norfab\workers\containerlab_worker.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def get_version(self):
    """
    Produce a report of the versions of various Python packages.

    This method collects the versions of several specified Python packages,
    the Python interpreter version, the platform name and the output of the
    `containerlab version` command, and returns them in a dictionary.

    Returns:
        Result: An object containing the task name and a dictionary with
                the package names as keys and their respective versions as values.
                `failed` is set to True and `errors` is populated when the
                `containerlab version` command cannot be started or exits
                with a non-zero return code.
    """
    libs = {
        "norfab": "",
        "pydantic": "",
        "python": sys.version.split(" ")[0],
        "platform": sys.platform,
        "containerlab": "",
    }
    ret = Result(task=f"{self.name}:get_version", result=libs)

    # get version of packages installed, entries without a matching
    # installed distribution keep their pre-populated value
    for pkg in libs:
        try:
            libs[pkg] = importlib.metadata.version(pkg)
        except importlib.metadata.PackageNotFoundError:
            pass

    # get containerlab version
    try:
        clab_version = subprocess.run(
            ["containerlab", "version"], capture_output=True, text=True
        )
    except OSError as e:
        # containerlab executable is missing or cannot be executed
        ret.failed = True
        ret.errors = [f"failed to run 'containerlab version': {e}"]
        return ret

    if clab_version.returncode == 0:
        # drop the first six lines of the command output
        # (presumably the ASCII-art banner - TODO confirm)
        libs["containerlab"] = "\n".join(clab_version.stdout.splitlines()[6:])
    else:
        ret.failed = True
        # text=True makes stderr a str already, no decoding required
        ret.errors = [clab_version.stderr]

    return ret

get_inventory() -> Dict ¤

Retrieve the inventory of the Containerlab worker.

Returns:

Name Type Description
Dict Dict

A dictionary containing the combined inventory of Containerlab.

Source code in norfab\workers\containerlab_worker.py
113
114
115
116
117
118
119
120
121
122
123
def get_inventory(self) -> Result:
    """
    Retrieve the inventory of the Containerlab worker.

    Returns:
        Result: A result object whose `result` attribute holds the inventory
            stored for this worker under `self.inventory[self.name]`.
    """
    return Result(
        task=f"{self.name}:get_inventory",
        result=self.inventory[self.name],
    )

get_containerlab_status() -> Result ¤

Retrieve the status of the Containerlab worker.

Returns:

Name Type Description
Result Result

A result object containing the status of the Containerlab worker.

Source code in norfab\workers\containerlab_worker.py
125
126
127
128
129
130
131
132
133
134
135
136
def get_containerlab_status(self) -> Result:
    """
    Report whether this worker can run Containerlab on the current platform.

    Returns:
        Result: A result object with a `status` key set to "OS NOT SUPPORTED"
            when `sys.platform` starts with "win" and to "READY" otherwise.
    """
    if sys.platform.startswith("win"):
        status = "OS NOT SUPPORTED"
    else:
        status = "READY"

    return Result(
        result={"status": status},
        task=f"{self.name}:get_containerlab_status",
    )

get_running_labs(timeout: int = None) -> Result ¤

Retrieve a list of running containerlab lab names.

This method inspects the current state of containerlab and returns the names of labs that are currently running. The names are sorted and duplicates are removed.

Parameters:

Name Type Description Default
timeout int

The timeout value in seconds for the inspection operation. Defaults to None.

None

Returns:

Name Type Description
Result Result

A Result object containing the task name and a list of running

Result

lab names.

Source code in norfab\workers\containerlab_worker.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def get_running_labs(self, timeout: int = None) -> Result:
    """
    List the names of the labs that containerlab currently reports.

    The method calls `inspect` and collects the `lab_name` value of every
    entry under the `containers` key of the inspection result. Duplicate
    names are removed and the list is returned sorted.

    Args:
        timeout (int, optional): The timeout value in seconds passed to the
            inspection operation. Defaults to None.

    Returns:
        Result: A Result object containing the task name and a list of lab
        names, the list is empty when inspection produced no result.
    """
    ret = Result(task=f"{self.name}:get_running_labs", result=[])
    inspection = self.inspect(timeout=timeout)

    # nothing reported by containerlab - return the empty list as is
    if not inspection.result:
        return ret

    # a set comprehension de-duplicates names shared by containers of one lab
    lab_names = {
        container["lab_name"] for container in inspection.result["containers"]
    }
    ret.result = sorted(lab_names)

    return ret

run_containerlab_command(args: list, cwd: str = None, timeout: int = 600, ret: Result = None, env: dict = None) -> Tuple ¤

Executes a containerlab command using subprocess and processes its output.

Parameters:

Name Type Description Default
args list

The list of command-line arguments to execute.

required
cwd str

The working directory to execute the command in. Defaults to None.

None
timeout int

The timeout for the command execution in seconds. Defaults to 600; a value of None or 0 also falls back to 600.

600
ret Result

An optional Norfab result object to populate with the command's output. Defaults to None.

None
env (dict, Optional)

OS environment variables to use when running the process. Defaults to a copy of the current process environment.

None

Returns:

Name Type Description
Tuple Tuple

If ret is None, returns a tuple containing: - output (str): The standard output of the command. - logs (list): A list of log messages from the command's standard error. - proc (subprocess.Popen): The subprocess object for the executed command.

Result Tuple

If ret is provided, returns the populated Result object with the following attributes: - result: The parsed JSON output or raw output of the command. - failed (bool): Indicates if the command execution failed. - errors (list): A list of error messages if the command failed. - messages (list): A list of log messages if the command succeeded.

Raises:

Type Description
Exception

If the output cannot be parsed as JSON when ret is provided.

Notes
  • The method reads the command's standard error line by line and processes messages containing "msg=".
  • If the command fails (non-zero return code), the ret.failed attribute is set to True, and errors are populated.
  • If the command succeeds, the ret.messages attribute is populated with log messages.
Source code in norfab\workers\containerlab_worker.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def run_containerlab_command(
    self,
    args: list,
    cwd: str = None,
    timeout: int = 600,
    ret: Result = None,
    env: dict = None,
) -> Tuple:
    """
    Executes a containerlab command using subprocess and processes its output.

    Standard output and standard error of the command are consumed by two
    background reader threads, so a command producing a large amount of output
    on one stream cannot block on a full pipe while the other stream is being
    read. Log lines are emitted as events from the calling thread.

    Args:
        args (list): The list of command-line arguments to execute.
        cwd (str, optional): The working directory to execute the command in. Defaults to None.
        timeout (int, optional): The timeout for the command execution in seconds.
            Defaults to 600, a falsy value (None or 0) also falls back to 600.
        ret (Result, optional): An optional Norfab result object to populate with the command's output. Defaults to None.
        env (dict, optional): OS environment variables to use when running the process,
            defaults to a copy of the current process environment.

    Returns:
        Tuple: If `ret` is None, returns a tuple containing:
            - output (str): The standard output of the command.
            - logs (list): A list of log messages from the command's standard error.
            - proc (subprocess.Popen): The subprocess object for the executed command.
        Result: If `ret` is provided, returns the populated `Result` object with the following attributes:
            - result: The parsed JSON output or raw output of the command.
            - failed (bool): Indicates if the command execution failed.
            - errors (list): A list of error messages if the command failed.
            - messages (list): A list of log messages if the command succeeded.

    Raises:
        TimeoutError: If the command did not complete within `timeout` seconds,
            the process is killed before the exception is raised.

    Notes:
        - Every non-empty standard error line is recorded in logs and emitted as an
          event, the event text is the part of the line following the last "msg=".
        - If the output cannot be parsed as JSON when `ret` is provided, the error is
          logged and the raw output is stored in `ret.result`.
        - If the command fails (non-zero return code), the `ret.failed` attribute is set to True, and errors are populated.
        - If the command succeeds, the `ret.messages` attribute is populated with log messages.
    """
    # imported locally so that the method does not depend on file-level imports
    import queue
    import threading

    output, logs = "", []
    timeout = timeout or 600
    deadline = time.time() + timeout
    env = env or dict(os.environ)

    stderr_lines = queue.Queue()
    stdout_chunks = []
    end_of_stream = object()  # sentinel marking stderr EOF

    def pump_stderr(stream):
        # forward stderr lines to the main thread, always finish with the sentinel
        try:
            for line in stream:
                stderr_lines.put(line)
        except ValueError:
            pass  # stream was closed during teardown
        finally:
            stderr_lines.put(end_of_stream)

    def drain_stdout(stream):
        # keep the stdout pipe empty so the child never blocks writing to it
        try:
            stdout_chunks.append(stream.read())
        except ValueError:
            pass  # stream was closed during teardown

    def timeout_error():
        return TimeoutError(
            f"Containerlab output collection {timeout}s timeout expired."
        )

    with subprocess.Popen(
        args,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        env=env,
    ) as proc:
        stderr_reader = threading.Thread(
            target=pump_stderr, args=(proc.stderr,), daemon=True
        )
        stdout_reader = threading.Thread(
            target=drain_stdout, args=(proc.stdout,), daemon=True
        )
        stderr_reader.start()
        stdout_reader.start()
        try:
            # relay log lines until stderr is exhausted or the deadline passes
            while True:
                remaining = deadline - time.time()
                if remaining <= 0:
                    raise timeout_error()
                try:
                    line = stderr_lines.get(timeout=min(remaining, 0.1))
                except queue.Empty:
                    continue
                if line is end_of_stream:
                    break
                msg = line.strip()
                if msg:
                    self.event(msg.split("msg=")[-1].replace('\\"', "").strip('"'))
                    logs.append(msg)
            # wait for stdout EOF and process exit within the same deadline
            stdout_reader.join(timeout=max(deadline - time.time(), 0))
            if stdout_reader.is_alive():
                raise timeout_error()
            try:
                proc.wait(timeout=max(deadline - time.time(), 0))
            except subprocess.TimeoutExpired:
                raise timeout_error() from None
        except BaseException:
            # never leave the command running behind, Popen.__exit__ would
            # otherwise wait for it indefinitely
            proc.kill()
            stderr_reader.join(timeout=1)
            stdout_reader.join(timeout=1)
            raise
        # read process output
        output = "".join(stdout_chunks)

    # populate Norfab result object
    if ret is not None:
        try:
            ret.result = json.loads(output)
        except Exception as e:
            ret.result = output
            log.error(
                f"{self.name} - failed to load containerlab results into JSON, error: {e}, result: '{output}'"
            )
        # check if command failed
        if proc.returncode != 0:
            ret.failed = True
            ret.errors = ["\n".join(logs)]
        # check if got errors
        elif not output and any(
            error in entry
            for entry in logs
            for error in [
                "no containers found",
            ]
        ):
            ret.failed = True
            ret.errors = ["\n".join(logs)]
        else:
            ret.messages = ["\n".join(logs)]
        return ret
    # return command results as is
    else:
        return output, logs, proc

deploy(topology: str, reconfigure: bool = False, timeout: int = None, node_filter: str = None) -> Result ¤

Deploys a containerlab topology.

This method handles the deployment of a containerlab topology by downloading the topology file, organizing it into a specific folder structure, and executing the containerlab deploy command with the appropriate arguments.

Parameters:

Name Type Description Default
topology str

The path to the topology file to be deployed.

required
reconfigure bool

If True, reconfigures an already deployed lab. Defaults to False.

False
timeout int

The timeout in seconds for the deployment process. Defaults to None, in which case the 600 second default of run_containerlab_command applies.

None
node_filter str

A filter to specify which nodes to deploy.

None

Returns:

Name Type Description
Result Result

deployment results with a list of nodes deployed

Raises:

Type Description
Exception

If the topology file cannot be fetched.

Source code in norfab\workers\containerlab_worker.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
@Task(input=DeployTask, output=DeployTaskResponse)
def deploy(
    self,
    topology: str,
    reconfigure: bool = False,
    timeout: int = None,
    node_filter: str = None,
) -> Result:
    """
    Deploy a containerlab topology.

    The topology file is fetched with `fetch_file`, moved into a sub-folder
    of the worker's topologies directory named after the topology's parent
    folder, and `containerlab deploy` is then run from that folder with
    `CLAB_VERSION_CHECK` set to `disable`.

    Args:
        topology (str): The path to the topology file to be deployed.
        reconfigure (bool, optional): If True, adds `--reconfigure` to the
            deploy command. Defaults to False.
        timeout (int, optional): The timeout in seconds handed over to
            `run_containerlab_command`. Defaults to None.
        node_filter (str, optional): Value for the `--node-filter` argument
            selecting which nodes to deploy.

    Returns:
        Result: deployment results as populated by `run_containerlab_command`

    Raises:
        Exception: If the topology file cannot be fetched.
    """
    ret = Result(task=f"{self.name}:deploy")

    # the lab folder is named after the folder the topology file lives in
    source_dir = os.path.dirname(topology)
    topology_folder = os.path.join(
        self.topologies_dir, os.path.basename(source_dir)
    )
    os.makedirs(topology_folder, exist_ok=True)

    # fetch the topology file and move it into the lab folder
    topology_file = os.path.join(topology_folder, os.path.basename(topology))
    fetched_file = self.fetch_file(topology, raise_on_fail=True, read=False)
    os.rename(fetched_file, topology_file)

    # assemble the containerlab command line
    args = ["containerlab", "deploy", "-f", "json", "-t", topology_file]
    if reconfigure is True:
        args.append("--reconfigure")
        self.event(f"Re-deploying lab {os.path.split(topology_file)[-1]}")
    else:
        self.event(f"Deploying lab {os.path.split(topology_file)[-1]}")
    if node_filter is not None:
        args.extend(["--node-filter", node_filter])

    # current environment plus the version check switch
    env = dict(os.environ, CLAB_VERSION_CHECK="disable")

    return self.run_containerlab_command(
        args, cwd=topology_folder, timeout=timeout, ret=ret, env=env
    )

destroy_lab(lab_name: str, timeout: int = None) -> Result ¤

Destroys a specified lab.

Parameters:

Name Type Description Default
lab_name str

The name of the lab to be destroyed.

required
timeout int

The timeout value in seconds for the operation. Defaults to None.

None

Returns:

Name Type Description
Result Result

An object containing the status of the operation, errors (if any), and the result indicating whether the lab was successfully destroyed.

Behavior
  • Retrieves the lab details using the inspect method.
  • If the lab is not found, marks the operation as failed and returns an error.
  • If the lab is found, retrieves the topology file and its folder.
  • Executes the containerlab destroy command using the topology file.
  • Updates the result to indicate success or failure of the destruction process.
Source code in norfab\workers\containerlab_worker.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
def destroy_lab(self, lab_name: str, timeout: int = None) -> Result:
    """
    Destroy the lab with the given name.

    Args:
        lab_name (str): The name of the lab to be destroyed.
        timeout (int, optional): The timeout value in seconds, used for both
            the inspect and the destroy commands. Defaults to None.

    Returns:
        Result: An object whose `result` maps the lab name to True when the
                lab was destroyed and to False when the lab was not found,
                with `failed` and `errors` set on failure.

    Behavior:
        - Looks the lab up with the `inspect` method using `details=True`.
        - When inspection returns nothing, reports the lab as not found.
        - Otherwise takes the topology file path from the `clab-topo-file`
          label of the first inspected container.
        - Runs `containerlab destroy` from the folder holding that file.
    """
    ret = Result(task=f"{self.name}:destroy_lab")
    lab_details = self.inspect(timeout=timeout, lab_name=lab_name, details=True)

    # guard clause - nothing to destroy when the lab is unknown
    if not lab_details.result:
        ret.failed = True
        ret.errors = [f"'{lab_name}' lab not found"]
        ret.result = {lab_name: False}
        return ret

    topology_file = lab_details.result[0]["Labels"]["clab-topo-file"]

    ret = self.run_containerlab_command(
        ["containerlab", "destroy", "-t", topology_file],
        cwd=os.path.dirname(topology_file),
        timeout=timeout,
        ret=ret,
    )
    if not ret.failed:
        ret.result = {lab_name: True}

    return ret

inspect(lab_name: str = None, timeout: int = None, details: bool = False) -> Result ¤

Inspect the container lab containers configuration and status.

This method retrieves information about a specific container lab or all container labs, optionally including detailed information.

Parameters:

Name Type Description Default
lab_name str

The name of the container lab to inspect. If not provided, all container labs will be inspected.

None
timeout int

The maximum time in seconds to wait for the inspection command to complete. Defaults to None.

None
details bool

Whether to include detailed information in the inspection output. Defaults to False.

False

Returns:

Name Type Description
Result Result

An object containing the result of the inspection task.

Source code in norfab\workers\containerlab_worker.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
def inspect(
    self, lab_name: str = None, timeout: int = None, details: bool = False
) -> Result:
    """
    Run `containerlab inspect` and return its JSON formatted output.

    Args:
        lab_name (str, optional): The name of the container lab to inspect,
            passed as `--name`. When not provided, `--all` is used instead.
        timeout (int, optional): The maximum time in seconds to wait for the
            inspection command to complete. Defaults to None.
        details (bool, optional): Adds the `--details` argument to the
            command when True. Defaults to False.

    Returns:
        Result: An object containing the result of the inspection task.
    """
    # select a single lab by name or fall back to all labs
    scope = ["--name", lab_name] if lab_name else ["--all"]
    args = ["containerlab", "inspect", "-f", "json", *scope]
    if details:
        args.append("--details")

    return self.run_containerlab_command(
        args, timeout=timeout, ret=Result(task=f"{self.name}:inspect")
    )

save(lab_name: str, timeout: int = None) -> Result ¤

Saves the config of a specified lab devices by invoking the containerlab save command.

Parameters:

Name Type Description Default
lab_name str

The name of the lab to save.

required
timeout int

The maximum time in seconds to wait for the operation to complete. Defaults to None.

None

Returns:

Name Type Description
Result Result

An object containing the outcome of the save operation. If successful, result will contain a dictionary with the lab name as the key and True as the value. If unsuccessful, failed will be set to True, and errors will contain a list of error messages.

Source code in norfab\workers\containerlab_worker.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
def save(self, lab_name: str, timeout: int = None) -> Result:
    """
    Save the configuration of a lab's devices using the `containerlab save` command.

    The lab is looked up with `inspect`, the topology file path is read from
    the `clab-topo-file` label of the first inspected container and the save
    command is run from the folder holding that file.

    Args:
        lab_name (str): The name of the lab to save.
        timeout (int, optional): The maximum time in seconds, used for both the
            inspect and the save commands. Defaults to None.

    Returns:
        Result: An object containing the outcome of the save operation. If successful,
            `result` maps the lab name to `True`. If the lab is not found, `result`
            maps the lab name to `False`, `failed` is set to True and `errors`
            contains an error message.
    """
    ret = Result(task=f"{self.name}:save")
    lab_details = self.inspect(timeout=timeout, lab_name=lab_name, details=True)

    # guard clause - nothing to save when the lab is unknown
    if not lab_details.result:
        ret.failed = True
        ret.errors = [f"'{lab_name}' lab not found"]
        ret.result = {lab_name: False}
        return ret

    topology_file = lab_details.result[0]["Labels"]["clab-topo-file"]

    # run save command from the topology folder
    ret = self.run_containerlab_command(
        ["containerlab", "save", "-t", topology_file],
        cwd=os.path.dirname(topology_file),
        timeout=timeout,
        ret=ret,
    )
    if not ret.failed:
        ret.result = {lab_name: True}

    return ret

restart_lab(lab_name: str, timeout: int = None) -> Result ¤

Restart a specified Containerlab lab.

This method retrieves the lab details and redeploys the lab from its topology file by running containerlab deploy with the --reconfigure argument.

Parameters:

Name Type Description Default
lab_name str

The name of the lab to restart.

required
timeout int

The timeout value for the operation in seconds. Defaults to None.

None

Returns:

Name Type Description
Result Result

An object containing the status of the operation, any errors encountered, and the result indicating whether the lab was successfully restarted.

Source code in norfab\workers\containerlab_worker.py
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def restart_lab(self, lab_name: str, timeout: int = None) -> Result:
    """
    Restart a specified Containerlab lab.

    The lab is looked up with `inspect`, the topology file path is read from
    the `clab-topo-file` label of the first inspected container, and
    `containerlab deploy --reconfigure` is run against that file with
    `CLAB_VERSION_CHECK` set to `disable`.

    Args:
        lab_name (str): The name of the lab to restart.
        timeout (int, optional): The timeout value in seconds, used for both the
            inspect and the deploy commands. Defaults to None.

    Returns:
        Result: An object whose `result` maps the lab name to True when the
                lab was re-deployed and to False when the lab was not found,
                with `failed` and `errors` set on failure.
    """
    ret = Result(task=f"{self.name}:restart_lab")
    lab_details = self.inspect(timeout=timeout, lab_name=lab_name, details=True)

    # guard clause - nothing to restart when the lab is unknown
    if not lab_details.result:
        ret.failed = True
        ret.errors = [f"'{lab_name}' lab not found"]
        ret.result = {lab_name: False}
        return ret

    topology_file = lab_details.result[0]["Labels"]["clab-topo-file"]

    # current environment plus the version check switch
    env = dict(os.environ, CLAB_VERSION_CHECK="disable")

    # re-deploy the lab in place
    args = ["containerlab", "deploy", "-f", "json", "-t", topology_file]
    args.append("--reconfigure")
    ret = self.run_containerlab_command(
        args,
        cwd=os.path.dirname(topology_file),
        timeout=timeout,
        ret=ret,
        env=env,
    )
    if not ret.failed:
        ret.result = {lab_name: True}

    return ret

get_nornir_inventory(lab_name: str = None, timeout: int = None, groups: list = None, use_default_credentials: bool = True) -> Result ¤

Retrieves the Nornir inventory for a specified lab.

This method inspects the container lab environment and generates a Nornir-compatible inventory of hosts based on the lab's configuration. It maps containerlab node kinds to Netmiko SSH platform types and extracts relevant connection details.

Parameters:

Name Type Description Default
lab_name str

The name of the container lab to inspect. If not given loads inventory for all labs.

None
timeout int

The timeout value for the inspection operation. Defaults to None.

None
groups list

A list of group names to assign to the hosts in the inventory. Defaults to None.

None
use_default_credentials bool

Whether to use Containerlab default credentials for hosts or not.

True

Returns:

Name Type Description
Result Result

A Result object containing the Nornir inventory. The result attribute

Result

includes a dictionary with host details. If the lab is not found or an error occurs,

Result

the failed attribute is set to True, and the errors attribute contains error messages.

Notes
  • The method uses a predefined mapping (norfab.utils.platform_map) to translate containerlab node kinds to Netmiko platform types.
  • If a container's SSH port cannot be determined, it is skipped, and an error is logged.
  • The primary host IP address is determined dynamically using a UDP socket connection or by checking the host IP address in the container's port configuration.
Example of returned inventory structure

{ "hosts": { "host_name": { "hostname": "host_ip", "platform": "netmiko_platform", "groups": ["group1", "group2"], }, ...

Source code in norfab\workers\containerlab_worker.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
def get_nornir_inventory(
    self,
    lab_name: str = None,
    timeout: int = None,
    groups: list = None,
    use_default_credentials: bool = True,
) -> Result:
    """
    Retrieves the Nornir inventory for a specified lab.

    This method inspects the container lab environment and generates a Nornir-compatible
    inventory of hosts based on the lab's configuration. It maps containerlab node kinds
    to Netmiko SSH platform types and extracts relevant connection details.

    Args:
        lab_name (str): The name of the container lab to inspect. If not given loads inventory for all labs.
        timeout (int, optional): The timeout value for the inspection operation. Defaults to None.
        groups (list, optional): A list of group names to assign to the hosts in the inventory. Defaults to None.
        use_default_credentials (bool, optional): Whether to use Containerlab default credentials for hosts or not.

    Returns:
        Result: A `Result` object containing the Nornir inventory. The `result` attribute
        includes a dictionary with host details. If the lab is not found or an error occurs,
        the `failed` attribute is set to True, and the `errors` attribute contains error messages.

    Notes:
        - The method uses a predefined mapping (`norfab.utils.platform_map`) to translate containerlab
          node kinds to Netmiko platform types.
        - If a container's SSH port cannot be determined, it is skipped, and an error is logged.
        - A host whose node kind has no Netmiko platform mapping is still added to the
          inventory, but without `platform` and without default credentials.
        - The primary host IP address is determined dynamically using a UDP socket connection or
          by checking the host IP address in the container's port configuration.

    Example of returned inventory structure:
        {
            "hosts": {
                "host_name": {
                    "hostname": "host_ip",
                    "port": 12001,
                    "platform": "netmiko_platform",
                    "groups": ["group1", "group2"],
                    "username": "default_username",
                    "password": "default_password",
                },
                ...
            }
        }
    """
    groups = groups or []
    ret = Result(task=f"{self.name}:get_nornir_inventory", result={"hosts": {}})

    # get lab details
    inspect = self.inspect(lab_name=lab_name, timeout=timeout, details=True)

    # return empty result if lab not found
    if not inspect.result:
        ret.failed = True
        ret.errors = [
            f"'{lab_name}' lab not found" if lab_name else "no labs found"
        ]
        return ret

    # get host primary IP address; connecting a UDP socket only makes the OS pick
    # the outgoing interface, the context manager makes sure the socket is closed
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        s.connect(("1.2.3.4", 12345))
        primary_host_ip = s.getsockname()[0]
    log.debug(
        f"{self.name} - determined Containerlab host primary IP - '{primary_host_ip}'"
    )

    # bind addresses that do not identify a reachable host IP
    wildcard_ips = (None, "0.0.0.0", "127.0.0.1", "localhost", "::")

    # form hosts inventory
    for container in inspect.result:
        host_name = container["Labels"]["clab-node-name"]
        host_port = None
        host_ip = primary_host_ip

        # get ssh port, port entries may lack some of the keys, e.g. when
        # the port is not published on the host
        for port in container["Ports"]:
            if (
                port.get("port") == 22
                and port.get("protocol") == "tcp"
                and "host_port" in port
            ):
                host_port = port["host_port"]
                # prefer explicit host ip address the port is bound to
                if port.get("host_ip") not in wildcard_ips:
                    host_ip = port["host_ip"]
                break
        else:
            log.error(f"{self.name} - {host_name} failed to map SSH port.")
            continue

        # add host to Nornir inventory, copy groups so that hosts do not
        # share the same list object
        host_data = {
            "hostname": host_ip,
            "port": host_port,
            "groups": list(groups),
        }
        ret.result["hosts"][host_name] = host_data

        # get netmiko platform
        clab_platform_name = container["Labels"]["clab-node-kind"]
        netmiko_platform = PlatformMap.convert(
            "containerlab", "netmiko", clab_platform_name
        )
        if not netmiko_platform:
            log.warning(
                f"{self.name} - {host_name} clab-node-kind '{clab_platform_name}' not mapped to Netmiko platform."
            )
            continue
        host_data["platform"] = netmiko_platform["platform"]

        # get default credentials
        if use_default_credentials:
            clab_platform = PlatformMap.get("containerlab", clab_platform_name)
            if not clab_platform:
                log.warning(
                    f"{self.name} - {host_name} clab-node-kind '{clab_platform_name}' not found."
                )
                continue
            for key in ("username", "password"):
                if clab_platform.get(key):
                    host_data[key] = clab_platform[key]

    return ret

deploy_netbox(lab_name: str = None, tenant: str = None, filters: list = None, devices: list = None, instance: str = None, image: str = None, ipv4_subnet: str = '172.100.100.0/24', ports: tuple = (12000, 15000), progress: bool = False, reconfigure: bool = False, timeout: int = 600, node_filter: str = None, dry_run: bool = False) -> Result ¤

Deploys a containerlab topology using device data from the Netbox database.

This method orchestrates the deployment of a containerlab topology by:

  • Inspecting existing containers to determine subnets and ports in use.
  • Allocating a management IPv4 subnet for the new lab, avoiding conflicts.
  • Downloading inventory data from Netbox for the specified lab and filters.
  • Saving the generated topology file to a dedicated folder.
  • Executing the containerlab deploy command with appropriate arguments.

To retrieve topology data from Netbox at least one of these arguments must be provided to identify a set of devices to include into Containerlab topology:

  • tenant - deploys lab using all devices and links that belong to this tenant
  • devices - lab deployed only using devices in the lists
  • filters - list of device filters to retrieve from Netbox

If multiple of above arguments provided, resulting lab topology is a sum of all devices matched.

Parameters:

Name Type Description Default
lab_name str

The name to use for the lab to deploy.

None
tenant str

Deploy lab for given tenant, lab name if not set becomes equal to tenant name.

None
filters list

List of filters to apply when fetching devices from Netbox.

None
devices list

List of specific devices to include in the topology.

None
instance str

Netbox instance identifier.

None
image str

Container image to use for devices.

None
ipv4_subnet str

Management IPv4 subnet for the lab.

'172.100.100.0/24'
ports tuple

Tuple specifying the range of ports to allocate.

(12000, 15000)
progress bool

If True, emits progress events.

False
reconfigure bool

If True, reconfigures an already deployed lab.

False
timeout int

Timeout in seconds for the deployment process.

600
node_filter str

Comma-separated string of nodes to deploy.

None
dry_run bool

If True, only generates and returns the topology inventory without deploying.

False

Returns:

Name Type Description
Result Result

deployment results with a list of nodes deployed

Raises:

Type Description
Exception

If the topology file cannot be fetched.

Source code in norfab\workers\containerlab_worker.py
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
def deploy_netbox(
    self,
    lab_name: str = None,
    tenant: str = None,
    filters: list = None,
    devices: list = None,
    instance: str = None,
    image: str = None,
    ipv4_subnet: str = "172.100.100.0/24",
    ports: tuple = (12000, 15000),
    progress: bool = False,
    reconfigure: bool = False,
    timeout: int = 600,
    node_filter: str = None,
    dry_run: bool = False,
) -> Result:
    """
    Deploys a containerlab topology using device data from the Netbox database.

    This method orchestrates the deployment of a containerlab topology by:

    - Inspecting existing containers to determine subnets and ports in use.
    - Allocating a management IPv4 subnet for the new lab, avoiding conflicts.
    - Downloading inventory data from Netbox for the specified lab and filters.
    - Saving the generated topology file to a dedicated folder.
    - Executing the `containerlab deploy` command with appropriate arguments.

    To retrieve topology data from Netbox at least one of these arguments must be provided
    to identify a set of devices to include into Containerlab topology:

    - `tenant` - deploys lab using all devices and links that belong to this tenant
    - `devices` - lab deployed only using devices in the list
    - `filters` - list of device filters to retrieve from Netbox

    If multiple of above arguments provided, resulting lab topology is a sum of all
    devices matched.

    Args:
        lab_name (str, optional): The name to use for the lab to deploy.
        tenant (str, optional): Deploy lab for given tenant, lab name if not set
            becomes equal to tenant name.
        filters (list, optional): List of filters to apply when fetching devices from Netbox.
        devices (list, optional): List of specific devices to include in the topology.
        instance (str, optional): Netbox instance identifier.
        image (str, optional): Container image to use for devices.
        ipv4_subnet (str, optional): Management IPv4 subnet for the lab. Replaced by the
            subnet of an already deployed lab with the same name, or by a free
            `172.100.X.0/24` subnet if it is in use by another lab.
        ports (tuple, optional): Tuple specifying the range of ports to allocate.
        progress (bool, optional): If True, emits progress events.
        reconfigure (bool, optional): If True, reconfigures an already deployed lab.
        timeout (int, optional): Timeout in seconds for the deployment process.
        node_filter (str, optional): Comma-separated string of nodes to deploy.
        dry_run (bool, optional): If True, only generates and returns the topology
            inventory without deploying.

    Returns:
        Result: deployment results with a list of nodes deployed

    Raises:
        RuntimeError: If Netbox returned no topology data, if no free management
            subnet is left to allocate, or if lab name cannot be determined.
    """
    ret = Result(task=f"{self.name}:deploy_netbox")
    subnets_in_use = set()
    ports_in_use = {}

    # handle lab name and tenant name
    if lab_name is None and tenant:
        lab_name = tenant

    # inspect existing containers
    if progress:
        self.event("Checking existing containers")
    get_containers = self.inspect(details=True)
    if get_containers.failed is False:
        if progress:
            self.event("Existing containers found, retrieving details")
        for container in get_containers.result:
            clab_name = container["Labels"]["containerlab"]
            clab_topo = container["Labels"]["clab-topo-file"]
            node_name = container["Labels"]["clab-node-name"]
            # collect ports that are in use, de-duplicated
            ports_in_use[node_name] = list(
                {
                    f"{p['host_port']}:{p['port']}/{p['protocol']}"
                    for p in container["Ports"]
                    if "host_port" in p and "port" in p and "protocol" in p
                }
            )
            # check existing subnets
            net_settings = container["NetworkSettings"]
            if net_settings["IPv4addr"] and net_settings["IPv4pLen"]:
                ip = ipaddress.ip_interface(
                    f"{net_settings['IPv4addr']}/{net_settings['IPv4pLen']}"
                )
                subnet = str(ip.network.with_prefixlen)
            else:
                # fall back on the lab topology file; a missing, unreadable or
                # empty file must not abort deployment of an unrelated lab
                subnet = None
                try:
                    with open(clab_topo, encoding="utf-8") as f:
                        clab_topo_data = yaml.safe_load(f.read()) or {}
                    subnet = (clab_topo_data.get("mgmt") or {}).get("ipv4-subnet")
                except (OSError, yaml.YAMLError) as e:
                    log.warning(
                        f"{clab_name} lab failed to read topology file '{clab_topo}': {e}"
                    )
                if not subnet:
                    msg = f"{clab_name} lab {node_name} node failed to determine mgmt subnet"
                    log.warning(msg)
                    if progress:
                        self.event(msg, severity="WARNING")
                    continue
            subnets_in_use.add(subnet)
            # re-use existing lab subnet
            if clab_name == lab_name:
                ipv4_subnet = subnet
                if progress:
                    self.event(
                        f"{lab_name} lab already deployed, re-using its subnet {ipv4_subnet}"
                    )
            # allocate new subnet if its in use by other lab
            elif ipv4_subnet == subnet:
                msg = f"{ipv4_subnet} already in use, allocating new subnet"
                log.info(msg)
                if progress:
                    self.event(msg)
                ipv4_subnet = None
        if progress:
            self.event("Collected TCP/UDP ports used by existing containers")

    # allocate new subnet
    if ipv4_subnet is None:
        pool = {f"172.100.{i}.0/24" for i in range(100, 255)}
        free_subnets = sorted(pool.difference(subnets_in_use))
        if not free_subnets:
            msg = (
                f"{self.name} - no free management subnets left in "
                f"172.100.100.0/24 - 172.100.254.0/24 range for '{lab_name}' lab"
            )
            log.error(msg)
            raise RuntimeError(msg)
        ipv4_subnet = free_subnets[0]
        if progress:
            self.event(f"{lab_name} allocated new subnet {ipv4_subnet}")

    if progress:
        self.event(f"{lab_name} fetching lab topology data from Netbox")

    # download inventory data from Netbox
    netbox_reply = self.client.run_job(
        service="netbox",
        task="get_containerlab_inventory",
        workers="any",
        timeout=timeout,
        retry=3,
        kwargs={
            "lab_name": lab_name,
            "tenant": tenant,
            "filters": filters,
            "devices": devices,
            "instance": instance,
            "image": image,
            "ipv4_subnet": ipv4_subnet,
            "ports": ports,
            "ports_map": ports_in_use,
            "progress": progress,
        },
    )

    # use inventory from first worker that returned hosts data
    for wdata in netbox_reply.values():
        if wdata["failed"] is False and wdata["result"]:
            topology_inventory = wdata["result"]
            break
    else:
        msg = f"{self.name} - Netbox returned no data for '{lab_name}' lab"
        log.error(msg)
        raise RuntimeError(msg)

    if progress:
        self.event(f"{lab_name} topology data retrieved from Netbox")

    if dry_run is True:
        ret.result = topology_inventory
        return ret

    # lab name is required to name topology folder and file; fall back on the
    # name from the topology data - assumes it follows containerlab topology
    # file format with top-level `name` key, TODO confirm against Netbox service
    if not lab_name and isinstance(topology_inventory, dict):
        lab_name = topology_inventory.get("name")
    if not lab_name:
        msg = f"{self.name} - failed to determine lab name, provide 'lab_name' or 'tenant'"
        log.error(msg)
        raise RuntimeError(msg)

    # create folder to store topology
    topology_folder = os.path.join(self.topologies_dir, lab_name)
    os.makedirs(topology_folder, exist_ok=True)

    # create topology file
    topology_file = os.path.join(topology_folder, f"{lab_name}.yaml")
    with open(topology_file, "w", encoding="utf-8") as tf:
        tf.write(yaml.dump(topology_inventory, default_flow_style=False))

    if progress:
        self.event(f"{lab_name} topology data saved to '{topology_file}'")

    # form command arguments
    args = ["containerlab", "deploy", "-f", "json", "-t", topology_file]
    if reconfigure is True:
        args.append("--reconfigure")
        action = "re-deploying"
    else:
        action = "deploying"
    if progress:
        self.event(
            f"{lab_name} {action} lab using {os.path.basename(topology_file)} topology file"
        )
    if node_filter is not None:
        args.append("--node-filter")
        args.append(node_filter)

    # add needed env variables
    env = dict(os.environ)
    env["CLAB_VERSION_CHECK"] = "disable"

    # run containerlab command
    return self.run_containerlab_command(
        args, cwd=topology_folder, timeout=timeout, ret=ret, env=env
    )