Broker

NFPService(name: str)

Bases: object

A single NFP Service

Source code in norfab\core\broker.py
def __init__(self, name: str):
    self.name = name  # Service name
    self.workers = []  # list of known workers

NFPWorker(address: str, socket, socket_lock, multiplier: int, keepalive: int, service: NFPService = None)

Bases: object

An NFP Worker convenience class.

Attributes:

Name Type Description
service NFPService

The service instance.

ready bool

Indicates if the worker is ready.

exit_event Event

Event to signal exit.

keepalive int

Keepalive interval in milliseconds.

multiplier int

Multiplier value.

Methods:

Name Description
start_keepalives

Starts the keepalive process for the worker.

is_ready

Checks if the worker has signaled W.READY.

destroy

Cleans up the worker, optionally disconnecting it.

Parameters:

Name Type Description Default
address str

Address to route to.

required
socket

The socket object used for communication.

required
socket_lock

The lock object to synchronize socket access.

required
multiplier int

Multiplier value, e.g., 6 times.

required
keepalive int

Keepalive interval in milliseconds, e.g., 5000 ms.

required
service NFPService

The service instance. Defaults to None.

None
Source code in norfab\core\broker.py
def __init__(
    self,
    address: str,
    socket,
    socket_lock,
    multiplier: int,  # e.g. 6 times
    keepalive: int,  # e.g. 5000 ms
    service: NFPService = None,
):
    self.address = address  # Address to route to
    self.service = service
    self.ready = False
    self.socket = socket
    self.exit_event = threading.Event()
    self.keepalive = keepalive
    self.multiplier = multiplier
    self.socket_lock = socket_lock

is_ready()

Check if the worker is ready.

Returns:

Name Type Description
bool

True if the worker has signaled readiness (W.READY) and the service is not None, otherwise False.

Source code in norfab\core\broker.py
def is_ready(self):
    """
    Check if the worker is ready.

    Returns:
        bool: True if the worker has signaled readiness (W.READY) and the service is not None, otherwise False.
    """
    return self.service is not None and self.ready is True

destroy(disconnect=False)

Clean-up routine for the worker.

This method performs the following actions:

  1. Sets the exit event to signal termination.
  2. Stops the keepaliver if it exists.
  3. Removes the current worker from the service's worker list.
  4. Optionally sends a disconnect message to the worker if disconnect is True.

Parameters:

Name Type Description Default
disconnect bool

If True, sends a disconnect message to the worker.

False
Source code in norfab\core\broker.py
def destroy(self, disconnect=False):
    """
    Clean-up routine for the worker.

    This method performs the following actions:

    1. Sets the exit event to signal termination.
    2. Stops the keepaliver if it exists.
    3. Removes the current worker from the service's worker list.
    4. Optionally sends a disconnect message to the worker if `disconnect` is True.

    Args:
        disconnect (bool): If True, sends a disconnect message to the worker.
    """
    self.exit_event.set()
    if hasattr(self, "keepaliver"):
        self.keepaliver.stop()
    self.service.workers.remove(self)

    if disconnect is True:
        msg = [self.address, b"", NFP.WORKER, self.service.name, NFP.DISCONNECT]
        with self.socket_lock:
            self.socket.send_multipart(msg)
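
The layout of the disconnect message built in the listing above can be sketched in isolation. This is a minimal illustration rather than NorFab code: `WORKER` and `DISCONNECT` are placeholder byte strings standing in for `NFP.WORKER` and `NFP.DISCONNECT` (their real values are not shown on this page), and `build_disconnect_frames` is a hypothetical helper name.

```python
# Placeholder protocol constants; the real NFP values are not shown here.
WORKER = b"WORKER-PLACEHOLDER"
DISCONNECT = b"DISCONNECT-PLACEHOLDER"


def build_disconnect_frames(address: bytes, service_name: bytes) -> list:
    """Return the multipart frames for a worker disconnect message."""
    # Frame order mirrors the listing above:
    # routing address, empty delimiter, header, service name, command.
    return [address, b"", WORKER, service_name, DISCONNECT]


frames = build_disconnect_frames(b"worker-1", b"demo-service")
```

The first frame is the routing address, so a ROUTER socket would deliver the remaining frames to the peer with that identity.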

NFPBroker(endpoint: str, exit_event: Event, inventory: NorFabInventory, log_level: str = None, log_queue: object = None, multiplier: int = 6, keepalive: int = 2500, init_done_event: Event = None)

Attributes:

Name Type Description
private_keys_dir str

Directory for private keys.

public_keys_dir str

Directory for public keys.

broker_private_key_file str

File path for broker's private key.

broker_public_key_file str

File path for broker's public key.

keepalive int

The keepalive interval.

multiplier int

The multiplier value.

services dict

A dictionary to store services.

workers dict

A dictionary to store workers.

exit_event Event

The event to signal the broker to exit.

inventory NorFabInventory

The inventory object.

base_dir str

The base directory path from the inventory.

broker_base_dir str

The broker's base directory path.

ctx Context

The ZeroMQ context.

auth ThreadAuthenticator

The authenticator for the ZeroMQ context.

socket Socket

The ZeroMQ socket.

poller Poller

The ZeroMQ poller.

socket_lock Lock

The lock to protect the socket object.

Methods:

Name Description
setup_logging

Applies the logging configuration.

mediate

Main broker work happens here.

destroy

Disconnect all workers, destroy context.

delete_worker

Deletes the worker from all data structures and destroys it.

purge_workers

Look for and delete expired workers.

send_to_worker

Send a message to a worker.

send_to_client

Send a message to a client.

process_worker

Process message received from worker.

require_worker

Finds the worker, creates if necessary.

require_service

Locates the service (creates if necessary).

process_client

Process a request coming from a client.

filter_workers

Helper function to filter workers.

dispatch

Dispatch requests to waiting workers where possible.

mmi_service

Handle internal service according to 8/MMI specification.

inventory_service

Handle inventory service requests.

file_sharing_service

Handle file sharing service requests.

Parameters:

Name Type Description Default
endpoint str

The endpoint address for the broker to bind to.

required
exit_event Event

An event to signal the broker to exit.

required
inventory NorFabInventory

The inventory object containing configuration and state.

required
log_level str

The logging level. Defaults to None.

None
log_queue object

The logging queue. Defaults to None.

None
multiplier int

A multiplier value for internal use. Defaults to 6.

6
keepalive int

The keepalive interval in milliseconds. Defaults to 2500.

2500
init_done_event Event

An event to signal that initialization is done. Defaults to None.

None
Source code in norfab\core\broker.py
def __init__(
    self,
    endpoint: str,
    exit_event: Event,
    inventory: NorFabInventory,
    log_level: str = None,
    log_queue: object = None,
    multiplier: int = 6,
    keepalive: int = 2500,
    init_done_event: Event = None,
):
    self.setup_logging(log_queue, log_level)
    self.keepalive = keepalive
    self.multiplier = multiplier
    init_done_event = init_done_event or Event()

    self.services = {}
    self.workers = {}
    self.exit_event = exit_event
    self.inventory = inventory

    self.base_dir = self.inventory.base_dir
    self.broker_base_dir = os.path.join(
        self.base_dir, "__norfab__", "files", "broker"
    )
    os.makedirs(self.base_dir, exist_ok=True)
    os.makedirs(self.broker_base_dir, exist_ok=True)

    # generate certificates, create directories and load certs
    generate_certificates(
        self.broker_base_dir, cert_name="broker", inventory=inventory
    )
    self.private_keys_dir = os.path.join(self.broker_base_dir, "private_keys")
    self.public_keys_dir = os.path.join(self.broker_base_dir, "public_keys")
    self.broker_private_key_file = os.path.join(
        self.private_keys_dir, "broker.key_secret"
    )
    self.broker_public_key_file = os.path.join(self.public_keys_dir, "broker.key")
    server_public, server_secret = zmq.auth.load_certificate(
        self.broker_private_key_file
    )

    self.ctx = zmq.Context()

    # Start an authenticator for this context.
    self.auth = ThreadAuthenticator(self.ctx)
    self.auth.start()
    # self.auth.allow("0.0.0.0")
    self.auth.allow_any = True
    # Tell the authenticator how to handle CURVE requests
    self.auth.configure_curve(location=zmq.auth.CURVE_ALLOW_ANY)

    self.socket = self.ctx.socket(zmq.ROUTER)
    self.socket.curve_secretkey = server_secret
    self.socket.curve_publickey = server_public
    self.socket.curve_server = True  # must come before bind
    self.socket.linger = 0
    self.poller = zmq.Poller()
    self.poller.register(self.socket, zmq.POLLIN)
    self.socket.bind(endpoint)
    self.socket_lock = (
        threading.Lock()
    )  # used for keepalives to protect socket object

    init_done_event.set()  # signal finished initializing broker
    log.debug(f"NFPBroker - is ready and listening on {endpoint}")
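
The directory and key-file attributes set by the constructor are all derived from the inventory's base directory. The sketch below reproduces only that path arithmetic using the standard library; `broker_paths` is a hypothetical helper and `/tmp/norfab-demo` is an arbitrary example path. Nothing is created on disk.

```python
import os


def broker_paths(base_dir: str) -> dict:
    """Compute the paths the constructor derives from base_dir."""
    broker_base_dir = os.path.join(base_dir, "__norfab__", "files", "broker")
    private_keys_dir = os.path.join(broker_base_dir, "private_keys")
    public_keys_dir = os.path.join(broker_base_dir, "public_keys")
    return {
        "broker_base_dir": broker_base_dir,
        "private_keys_dir": private_keys_dir,
        "public_keys_dir": public_keys_dir,
        "broker_private_key_file": os.path.join(
            private_keys_dir, "broker.key_secret"
        ),
        "broker_public_key_file": os.path.join(public_keys_dir, "broker.key"),
    }


paths = broker_paths("/tmp/norfab-demo")
```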

setup_logging(log_queue, log_level: str) -> None

Configures logging for the application.

This method sets up the logging configuration using a provided log queue and log level. It updates the logging configuration dictionary with the given log queue and log level, and then applies the configuration using logging.config.dictConfig.

Parameters:

Name Type Description Default
log_queue Queue

The queue to be used for logging.

required
log_level str

The logging level to be set. If None, the default level is used.

required

Returns:

Type Description
None

None

Source code in norfab\core\broker.py
def setup_logging(self, log_queue, log_level: str) -> None:
    """
    Configures logging for the application.

    This method sets up the logging configuration using a provided log queue and log level.
    It updates the logging configuration dictionary with the given log queue and log level,
    and then applies the configuration using `logging.config.dictConfig`.

    Args:
        log_queue (queue.Queue): The queue to be used for logging.
        log_level (str): The logging level to be set. If None, the default level is used.

    Returns:
        None
    """
    logging_config_producer["handlers"]["queue"]["queue"] = log_queue
    if log_level is not None:
        logging_config_producer["root"]["level"] = log_level
    logging.config.dictConfig(logging_config_producer)
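
The same pattern can be tried with the standard library alone. In this sketch `example_config` is a hypothetical stand-in for `logging_config_producer`, whose real contents are not shown on this page; only the two keys the method touches are assumed to have the same shape.

```python
import logging
import logging.config
import queue

# Hypothetical stand-in for the producer logging configuration.
example_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "queue": {
            "class": "logging.handlers.QueueHandler",
            "queue": None,  # filled in at runtime
        }
    },
    "root": {"handlers": ["queue"], "level": "INFO"},
}


def setup_logging(log_queue, log_level=None) -> None:
    """Point the queue handler at log_queue and optionally override the level."""
    example_config["handlers"]["queue"]["queue"] = log_queue
    if log_level is not None:
        example_config["root"]["level"] = log_level
    logging.config.dictConfig(example_config)


log_queue = queue.Queue()
setup_logging(log_queue, "DEBUG")
logging.getLogger("demo").debug("broker starting")
record = log_queue.get_nowait()  # the LogRecord produced by the call above
```

Every record emitted through the root logger ends up on the queue, where a separate consumer (for example a `logging.handlers.QueueListener`) can format and write it.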

mediate()

Main broker work happens here.

This method continuously polls for incoming messages and processes them based on their headers. It handles messages from clients and workers, and purges inactive workers periodically. The method also checks for an exit event to gracefully shut down the broker.

Raises:

Type Description
KeyboardInterrupt

If the process is interrupted by a keyboard signal.

Source code in norfab\core\broker.py
def mediate(self):
    """
    Main broker work happens here.

    This method continuously polls for incoming messages and processes them
    based on their headers. It handles messages from clients and workers,
    and purges inactive workers periodically. The method also checks for an
    exit event to gracefully shut down the broker.

    Raises:
        KeyboardInterrupt: If the process is interrupted by a keyboard signal.
    """
    while True:
        try:
            items = self.poller.poll(self.keepalive)
        except KeyboardInterrupt:
            break  # Interrupted

        if items:
            msg = self.socket.recv_multipart()
            log.debug(f"NFPBroker - received '{msg}'")

            sender = msg.pop(0)
            empty = msg.pop(0)
            header = msg.pop(0)

            if header == NFP.CLIENT:
                self.process_client(sender, msg)
            elif header == NFP.WORKER:
                self.process_worker(sender, msg)

        self.purge_workers()

        # check if need to stop
        if self.exit_event.is_set():
            self.destroy()
            break
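
The envelope handling inside the loop can be sketched without a socket. This is an illustration under assumptions: `CLIENT` and `WORKER` are placeholder byte strings for `NFP.CLIENT` and `NFP.WORKER`, and `split_envelope` and `route` are hypothetical helper names.

```python
# Placeholder header values; the real NFP constants are not shown here.
CLIENT = b"CLIENT-PLACEHOLDER"
WORKER = b"WORKER-PLACEHOLDER"


def split_envelope(msg: list) -> tuple:
    """Split a received multipart message into (sender, header, body)."""
    frames = list(msg)  # work on a copy so the caller's list is untouched
    sender = frames.pop(0)
    frames.pop(0)  # empty delimiter frame
    header = frames.pop(0)
    return sender, header, frames


def route(msg: list) -> str:
    """Name the handler that the loop above would call for this message."""
    _, header, _ = split_envelope(msg)
    if header == CLIENT:
        return "process_client"
    if header == WORKER:
        return "process_worker"
    return "ignored"  # unknown headers are skipped without an error
```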

destroy()

Disconnect all workers and destroy the context.

This method performs the following actions:

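  1. Logs an interrupt message indicating that the broker is being killed.
  2. Iterates through all registered workers and deletes each one, sending it a disconnect message.
  3. Stops the authenticator thread.
  4. Destroys the ZeroMQ context.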
Source code in norfab\core\broker.py
def destroy(self):
    """
    Disconnect all workers and destroy the context.

    This method performs the following actions:

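    1. Logs an interrupt message indicating that the broker is being killed.
    2. Iterates through all registered workers and deletes each one, sending it a disconnect message.
    3. Stops the authenticator thread.
    4. Destroys the ZeroMQ context.
    """
    log.info("NFPBroker - interrupt received, killing broker")
    for name in list(self.workers.keys()):
        # in case the worker destroyed itself while we are iterating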
        if self.workers.get(name):
            self.delete_worker(self.workers[name], True)
    self.auth.stop()
    self.ctx.destroy(0)

delete_worker(worker, disconnect)

Deletes a worker from all data structures and destroys the worker.

Parameters:

Name Type Description Default
worker Worker

The worker instance to be deleted.

required
disconnect bool

A flag indicating whether to disconnect the worker before deletion.

required

Returns:

Type Description

None

Source code in norfab\core\broker.py
def delete_worker(self, worker, disconnect):
    """
    Deletes a worker from all data structures and destroys the worker.

    Args:
        worker (Worker): The worker instance to be deleted.
        disconnect (bool): A flag indicating whether to disconnect the worker before deletion.

    Returns:
        None
    """
    worker.destroy(disconnect)
    self.workers.pop(worker.address, None)

purge_workers()

Look for and delete expired workers.

This method iterates through the list of workers and checks if each worker's keepalive thread is still alive. If a worker's keepalive thread is not alive, the worker is considered expired and is deleted from the list of workers. Additionally, a log message is generated indicating that the worker's keepalive has expired.

Note

The method handles the case where a worker might be destroyed while iterating through the list of workers.

Logging

Logs an info message when a worker's keepalive has expired, including the worker's address.

Source code in norfab\core\broker.py
def purge_workers(self):
    """
    Look for and delete expired workers.

    This method iterates through the list of workers and checks if each worker's
    keepalive thread is still alive. If a worker's keepalive thread is not alive,
    the worker is considered expired and is deleted from the list of workers.
    Additionally, a log message is generated indicating that the worker's keepalive
    has expired.

    Note:
        The method handles the case where a worker might be destroyed while
        iterating through the list of workers.

    Logging:
        Logs an info message when a worker's keepalive has expired, including the
        worker's address.
    """
    for name in list(self.workers.keys()):
        # in case the worker destroyed itself while we are iterating
        w = self.workers.get(name)
        if w is None:
            continue
        if not w.keepaliver.is_alive():
            self.delete_worker(w, False)
            log.info(
                f"NFPBroker - {w.address.decode(encoding='utf-8')} worker keepalives expired"
            )
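
The purge loop relies on iterating over a snapshot of the keys so that entries can be removed safely. A minimal sketch of that pattern, using hypothetical `FakeWorker` and `FakeKeepaliver` stand-ins instead of real NorFab objects:

```python
class FakeKeepaliver:
    """Stand-in exposing only the is_alive() check used by the purge loop."""

    def __init__(self, alive: bool):
        self.alive = alive

    def is_alive(self) -> bool:
        return self.alive


class FakeWorker:
    def __init__(self, address: bytes, alive: bool):
        self.address = address
        self.keepaliver = FakeKeepaliver(alive)


def purge(workers: dict) -> list:
    """Remove workers whose keepaliver is no longer alive; return their addresses."""
    expired = []
    # Snapshot the keys so entries can be removed while looping.
    for name in list(workers.keys()):
        worker = workers.get(name)
        if worker is None:
            continue  # already removed elsewhere
        if not worker.keepaliver.is_alive():
            workers.pop(name, None)
            expired.append(worker.address)
    return expired


workers = {b"w1": FakeWorker(b"w1", True), b"w2": FakeWorker(b"w2", False)}
expired = purge(workers)
```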

send_to_worker(worker: NFPWorker, command: bytes, sender: bytes, uuid: bytes, data: bytes) -> None

Send a message to a worker, prepending the routing and protocol envelope to the payload.

Parameters:

Name Type Description Default
worker NFPWorker

The worker to send the message to.

required
command bytes

The command to send (e.g., NFP.POST or NFP.GET).

required
sender bytes

The sender's identifier.

required
uuid bytes

The unique identifier for the message.

required
data bytes

The data to be sent with the message.

required
Logs

Logs an error if the command is invalid. Logs a debug message when sending the message to the worker.

Source code in norfab\core\broker.py
def send_to_worker(
    self, worker: NFPWorker, command: bytes, sender: bytes, uuid: bytes, data: bytes
) -> None:
    """
    Send a message to a worker. If a message is provided, sends that message.

    Args:
        worker (NFPWorker): The worker to send the message to.
        command (bytes): The command to send (e.g., NFP.POST or NFP.GET).
        sender (bytes): The sender's identifier.
        uuid (bytes): The unique identifier for the message.
        data (bytes): The data to be sent with the message.

    Logs:
        Logs an error if the command is invalid.
        Logs a debug message when sending the message to the worker.
    """
    # Stack routing and protocol envelopes to start of message
    if command == NFP.POST:
        msg = [worker.address, b"", NFP.WORKER, NFP.POST, sender, b"", uuid, data]
    elif command == NFP.GET:
        msg = [worker.address, b"", NFP.WORKER, NFP.GET, sender, b"", uuid, data]
    else:
        log.error(f"NFPBroker - invalid worker command: {command}")
        return
    with self.socket_lock:
        log.debug(f"NFPBroker - sending to worker '{msg}'")
        self.socket.send_multipart(msg)
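
The frame layout for worker-bound messages can be sketched as a pure function. The constants below are placeholders for `NFP.WORKER`, `NFP.POST` and `NFP.GET` (real values not shown on this page), and `build_worker_frames` is a hypothetical helper. Because both valid commands use the same layout, the sketch folds the two branches into one.

```python
# Placeholder protocol constants; the real NFP values are not shown here.
WORKER = b"WORKER-PLACEHOLDER"
POST = b"POST-PLACEHOLDER"
GET = b"GET-PLACEHOLDER"


def build_worker_frames(address, command, sender, uuid, data):
    """Return the frames for a worker message, or None for an invalid command."""
    if command not in (POST, GET):
        return None  # the method above logs an error and returns here
    # Routing envelope (address, delimiter), then protocol header and command,
    # then the client envelope (sender, delimiter), then the payload.
    return [address, b"", WORKER, command, sender, b"", uuid, data]


frames = build_worker_frames(b"w1", POST, b"client-1", b"uuid-1", b"{}")
```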

send_to_client(client: str, command: str, service: str, message: list) -> None

Send a message to a specified client.

Parameters:

Name Type Description Default
client str

The identifier of the client to send the message to.

required
command str

The command type, either 'RESPONSE' or 'EVENT'.

required
service str

The service associated with the message.

required
message list

The message content to be sent.

required
Source code in norfab\core\broker.py
def send_to_client(
    self, client: str, command: str, service: str, message: list
) -> None:
    """
    Send a message to a specified client.

    Args:
        client (str): The identifier of the client to send the message to.
        command (str): The command type, either 'RESPONSE' or 'EVENT'.
        service (str): The service associated with the message.
        message (list): The message content to be sent.
    """

    # Stack routing and protocol envelopes to start of message
    if command == NFP.RESPONSE:
        msg = [client, b"", NFP.CLIENT, NFP.RESPONSE, service] + message
    elif command == NFP.EVENT:
        msg = [client, b"", NFP.CLIENT, NFP.EVENT, service] + message
    else:
        log.error(f"NFPBroker - invalid client command: {command}")
        return
    with self.socket_lock:
        log.debug(f"NFPBroker - sending to client '{msg}'")
        self.socket.send_multipart(msg)

process_worker(sender: str, msg: list)

Process message received from worker.

Parameters:

Name Type Description Default
sender str

The identifier of the sender (worker).

required
msg list

The message received from the worker, where the first element is the command.

required

Commands:

  • NFP.READY: Marks the worker as ready and assigns a service to it.
  • NFP.RESPONSE: Sends a response to a client.
  • NFP.KEEPALIVE: Processes a keepalive message from the worker.
  • NFP.DISCONNECT: Handles worker disconnection.
  • NFP.EVENT: Sends an event to a client.

If the worker is not ready and an invalid command is received, the worker is deleted.

Raises:

Type Description
AttributeError

If the worker does not have the required attributes for certain commands.

IndexError

If the message list does not contain the expected number of elements for certain commands.

Source code in norfab\core\broker.py
def process_worker(self, sender: str, msg: list):
    """
    Process message received from worker.

    Parameters:
        sender (str): The identifier of the sender (worker).
        msg (list): The message received from the worker, where the first element is the command.

    Commands:

    - NFP.READY: Marks the worker as ready and assigns a service to it.
    - NFP.RESPONSE: Sends a response to a client.
    - NFP.KEEPALIVE: Processes a keepalive message from the worker.
    - NFP.DISCONNECT: Handles worker disconnection.
    - NFP.EVENT: Sends an event to a client.

    If the worker is not ready and an invalid command is received, the worker is deleted.

    Raises:
        AttributeError: If the worker does not have the required attributes for certain commands.
        IndexError: If the message list does not contain the expected number of elements for certain commands.
    """
    command = msg.pop(0)
    worker = self.require_worker(sender)

    if NFP.READY == command and not worker.is_ready():
        service = msg.pop(0)
        worker.service = self.require_service(service)
        worker.ready = True
        worker.start_keepalives()
        worker.service.workers.append(worker)
    elif NFP.RESPONSE == command and worker.is_ready():
        client = msg.pop(0)
        empty = msg.pop(0)
        self.send_to_client(client, NFP.RESPONSE, worker.service.name, msg)
    elif NFP.KEEPALIVE == command and hasattr(worker, "keepaliver"):
        worker.keepaliver.received_heartbeat([worker.address] + msg)
    elif NFP.DISCONNECT == command and worker.is_ready():
        self.delete_worker(worker, False)
    elif NFP.EVENT == command and worker.is_ready():
        client = msg.pop(0)
        empty = msg.pop(0)
        self.send_to_client(client, NFP.EVENT, worker.service.name, msg)
    elif not worker.is_ready():
        self.delete_worker(worker, disconnect=True)
    else:
        log.error(f"NFPBroker - invalid message: {msg}")
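
The branch order in the listing above matters: each command is accepted only in a particular worker state, and anything else from a worker that is not ready leads to deletion. The decision logic can be sketched as a pure function. The command constants are placeholders for the `NFP.*` values, and `classify` with its returned labels is a hypothetical illustration, not part of NorFab.

```python
# Placeholder command values; the real NFP constants are not shown here.
READY = b"READY"
RESPONSE = b"RESPONSE"
KEEPALIVE = b"KEEPALIVE"
DISCONNECT = b"DISCONNECT"
EVENT = b"EVENT"


def classify(command: bytes, is_ready: bool, has_keepaliver: bool) -> str:
    """Return a label for the branch that would handle this command."""
    if command == READY and not is_ready:
        return "register"
    if command == RESPONSE and is_ready:
        return "forward_response"
    if command == KEEPALIVE and has_keepaliver:
        return "heartbeat"
    if command == DISCONNECT and is_ready:
        return "delete"
    if command == EVENT and is_ready:
        return "forward_event"
    if not is_ready:
        return "delete_and_disconnect"
    return "log_invalid"
```

For example, a second `READY` from a worker that is already ready falls through to the final branch and is only logged, while a `RESPONSE` from a worker that never registered causes that worker to be deleted.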

require_worker(address)

Finds the worker associated with the given address, creating a new worker if necessary.

Parameters:

Name Type Description Default
address str

The address of the worker to find or create.

required

Returns:

Name Type Description
NFPWorker

The worker associated with the given address.

Source code in norfab\core\broker.py
def require_worker(self, address):
    """
    Finds the worker associated with the given address, creating a new worker if necessary.

    Args:
        address (str): The address of the worker to find or create.

    Returns:
        NFPWorker: The worker associated with the given address.
    """
    if not self.workers.get(address):
        self.workers[address] = NFPWorker(
            address=address,
            socket=self.socket,
            multiplier=self.multiplier,
            keepalive=self.keepalive,
            socket_lock=self.socket_lock,
        )
        log.info(
            f"NFPBroker - registered new worker {address.decode(encoding='utf-8')}"
        )

    return self.workers[address]

require_service(name)

Locates the service by name, creating it if necessary.

Parameters:

Name Type Description Default
name str

The name of the service to locate or create.

required

Returns:

Name Type Description
NFPService

The located or newly created service instance.

Source code in norfab\core\broker.py
def require_service(self, name):
    """
    Locates the service by name, creating it if necessary.

    Args:
        name (str): The name of the service to locate or create.

    Returns:
        NFPService: The located or newly created service instance.
    """
    if not self.services.get(name):
        service = NFPService(name)
        self.services[name] = service
        log.debug(
            f"NFPBroker - registered new service {name.decode(encoding='utf-8')}"
        )

    return self.services[name]
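
Both `require_worker` and `require_service` follow the same get-or-create pattern over a dictionary. A minimal sketch of that pattern, with a hypothetical `Service` class standing in for `NFPService` and the registry passed in explicitly:

```python
class Service:
    """Stand-in for NFPService: a name plus a list of known workers."""

    def __init__(self, name: bytes):
        self.name = name
        self.workers = []


def require_service(services: dict, name: bytes) -> Service:
    """Return the service registered under name, creating it on first use."""
    if name not in services:
        services[name] = Service(name)
    return services[name]


services = {}
first = require_service(services, b"demo")
second = require_service(services, b"demo")
```

Repeated lookups return the same instance, so workers appended to `first.workers` are visible through `second` as well.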

process_client(sender: str, msg: list) -> None

Process a request coming from a client.

Parameters:

Name Type Description Default
sender str

The identifier of the client sending the request.

required
msg list

The message received from the client, expected to be a list whose first five elements are:

  • command (str): The command issued by the client.
  • service (str): The service to which the command is directed.
  • target (str): The target of the command.
  • uuid (str): The unique identifier for the request.
  • data (any): The data associated with the request.

required

Raises:

Type Description
ValueError

If the command is not recognized as a valid client command.

The method processes the command by:
  • Checking if the command is valid.
  • Routing the request to the appropriate service handler based on the service specified.
  • Sending an error message back to the client if the command is unsupported.
Source code in norfab\core\broker.py
def process_client(self, sender: str, msg: list) -> None:
    """
    Process a request coming from a client.

    Args:
        sender (str): The identifier of the client sending the request.
        msg (list): The message received from the client, expected to be a list where the first five elements are:
            - command (str): The command issued by the client.
            - service (str): The service to which the command is directed.
            - target (str): The target of the command.
            - uuid (str): The unique identifier for the request.
            - data (any): The data associated with the request.

    Raises:
        ValueError: If the command is not recognized as a valid client command.

    The method processes the command by:
        - Checking if the command is valid.
        - Routing the request to the appropriate service handler based on the service specified.
        - Sending an error message back to the client if the command is unsupported.
    """
    command = msg.pop(0)
    service = msg.pop(0)
    target = msg.pop(0)
    uuid = msg.pop(0)
    data = msg.pop(0)

    # check if valid command from client
    if command not in NFP.client_commands:
        message = f"NFPBroker - Unsupported client command '{command}'"
        log.error(message)
        self.send_to_client(
            sender, NFP.RESPONSE, service, [message.encode("utf-8")]
        )
    # Management Interface
    elif service == b"mmi.service.broker":
        self.mmi_service(sender, command, target, uuid, data)
    elif service == b"sid.service.broker":
        self.inventory_service(sender, command, target, uuid, data)
    elif service == b"fss.service.broker":
        self.file_sharing_service(sender, command, target, uuid, data)
    else:
        self.dispatch(
            sender, command, self.require_service(service), target, uuid, data
        )
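
The routing decision made by this method can be sketched as a lookup. The three reserved service names are taken from the listing above; `CLIENT_COMMANDS` is a placeholder for `NFP.client_commands`, whose contents are not shown on this page, and `select_handler` is a hypothetical helper.

```python
# Placeholder for NFP.client_commands; the real contents are not shown here.
CLIENT_COMMANDS = (b"POST-PLACEHOLDER", b"GET-PLACEHOLDER")

# Reserved broker service names, as they appear in the listing above.
BUILTIN_SERVICES = {
    b"mmi.service.broker": "mmi_service",
    b"sid.service.broker": "inventory_service",
    b"fss.service.broker": "file_sharing_service",
}


def select_handler(command: bytes, service: bytes) -> str:
    """Name the code path that would handle a client request."""
    if command not in CLIENT_COMMANDS:
        return "error_response"  # an error message is sent back to the client
    # Any service that is not built into the broker goes to the workers.
    return BUILTIN_SERVICES.get(service, "dispatch")
```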

filter_workers(target: bytes, service: NFPService) -> list

Helper function to filter workers

Parameters:

Name Type Description Default
target bytes

Byte string identifying the target workers.

required
service NFPService

The NFPService instance whose workers are considered.

required
Source code in norfab\core\broker.py
def filter_workers(self, target: bytes, service: NFPService) -> list:
    """
    Helper function to filter workers

    Args:
        target: byte string identifying the target workers
        service: NFPService object
    """
    ret = []
    if not service.workers:
        log.warning(
            f"NFPBroker - '{service.name}' has no active workers registered, try later"
        )
        ret = []
    elif target == b"any":
        ret = [service.workers[random.randint(0, len(service.workers) - 1)]]
    elif target == b"all":
        ret = service.workers
    elif target in self.workers:  # single worker
        ret = [self.workers[target]]
    else:  # target list of workers
        try:
            target = json.loads(target)
            if isinstance(target, list):
                for w in target:
                    w = w.encode("utf-8")
                    if w in self.workers:
                        ret.append(self.workers[w])
                ret = list(set(ret))  # dedup workers
        except Exception as e:
            log.error(
                f"NFPBroker - Failed to load target '{target}' with error '{e}'"
            )
    return ret
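
The target forms handled above (`any`, `all`, a single worker address, or a JSON list of names) can be exercised with plain data. This sketch is a hypothetical re-expression, not the NorFab code: workers are represented by simple strings, `random.choice` replaces the index arithmetic, and duplicates are removed while preserving order instead of going through `set`.

```python
import json
import random


def resolve_targets(target: bytes, service_workers: list, all_workers: dict) -> list:
    """Resolve a target specification to a list of worker objects."""
    if not service_workers:
        return []  # the service has no registered workers
    if target == b"any":
        return [random.choice(service_workers)]
    if target == b"all":
        return list(service_workers)
    if target in all_workers:  # a single worker address
        return [all_workers[target]]
    try:
        names = json.loads(target)  # a JSON-encoded list of worker names
    except ValueError:
        return []
    if not isinstance(names, list):
        return []
    resolved = []
    for name in names:
        if not isinstance(name, str):
            continue
        worker = all_workers.get(name.encode("utf-8"))
        if worker is not None and worker not in resolved:
            resolved.append(worker)
    return resolved


all_workers = {b"w1": "W1", b"w2": "W2"}
service_workers = ["W1", "W2"]
```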

dispatch(sender: str, command: bytes, service: object, target: Union[str, List[str]], uuid: str, data: Any) -> None

Dispatch requests to waiting workers where possible.

Parameters:

Name Type Description Default
sender str

The sender of the request.

required
command bytes

The command to be executed by the workers.

required
service Service

The service object associated with the request.

required
target str

A string indicating the addresses of the workers to dispatch to.

required
uuid str

A unique identifier for the request.

required
data Any

The data to be sent to the workers.

required
Source code in norfab\core\broker.py
def dispatch(
    self,
    sender: str,
    command: bytes,
    service: object,
    target: Union[str, List[str]],
    uuid: str,
    data: Any,
) -> None:
    """
    Dispatch requests to waiting workers where possible.

    Args:
        sender (str): The sender of the request.
        command (bytes): The command to be executed by the workers.
        service (Service): The service object associated with the request.
        target (str): A string indicating the addresses of the workers to dispatch to.
        uuid (str): A unique identifier for the request.
        data (Any): The data to be sent to the workers.
    """
    log.debug(
        f"NFPBroker - dispatching request to workers: sender '{sender}', "
        f"command '{command}', service '{service.name}', target '{target}', "
        f"data '{data}', uuid '{uuid}'"
    )
    self.purge_workers()
    workers = self.filter_workers(target, service)

    # handle case when service has no workers registered
    if not workers:
        message = f"NFPBroker - {service.name} service failed to target workers '{target}'"
        log.error(message)
        self.send_to_client(
            sender,
            NFP.RESPONSE,
            service.name,
            [uuid, b"400", message.encode("utf-8")],
        )
    else:
        # inform client that JOB dispatched
        w_addresses = [w.address.decode("utf-8") for w in workers]
        self.send_to_client(
            sender,
            NFP.RESPONSE,
            service.name,
            [
                uuid,
                b"202",
                json.dumps(
                    {
                        "workers": w_addresses,
                        "uuid": uuid.decode("utf-8"),
                        "target": target.decode("utf-8"),
                        "status": "DISPATCHED",
                        "service": service.name.decode("utf-8"),
                    }
                ).encode("utf-8"),
            ],
        )
        # send job to workers
        for worker in workers:
            self.send_to_worker(worker, command, sender, uuid, data)
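
The acknowledgement sent to the client when a job is dispatched carries the request UUID, a `202` status frame and a JSON body. The sketch below builds just those three frames; `build_dispatched_reply` is a hypothetical helper and the sample values are arbitrary.

```python
import json


def build_dispatched_reply(
    uuid: bytes, service_name: bytes, target: bytes, worker_addresses: list
) -> list:
    """Return [uuid, status, body] frames for a DISPATCHED acknowledgement."""
    payload = {
        "workers": [address.decode("utf-8") for address in worker_addresses],
        "uuid": uuid.decode("utf-8"),
        "target": target.decode("utf-8"),
        "status": "DISPATCHED",
        "service": service_name.decode("utf-8"),
    }
    return [uuid, b"202", json.dumps(payload).encode("utf-8")]


reply = build_dispatched_reply(b"1234", b"demo", b"all", [b"w1", b"w2"])
decoded = json.loads(reply[2])
```

A client can use the `workers` list in the body to know how many worker responses to expect for the request.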

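The two reply branches in the source above can be illustrated in isolation. This is a minimal sketch, not the broker's own code: `build_dispatch_reply` is a hypothetical helper, and it assumes that the UUID, service name, target, and worker addresses all arrive as `bytes`, as the `.decode("utf-8")` calls in the source suggest.

```python
import json
from typing import List


def build_dispatch_reply(
    uuid: bytes, service_name: bytes, target: bytes, worker_addresses: List[bytes]
) -> List[bytes]:
    """Hypothetical helper: return [uuid, status, body] frames for the client."""
    if not worker_addresses:
        # No workers matched the target: reply with status 400 and an error text
        message = (
            f"NFPBroker - {service_name} service failed to target workers '{target}'"
        )
        return [uuid, b"400", message.encode("utf-8")]
    # Workers matched: reply with status 202 and a JSON description of the dispatch
    body = {
        "workers": [address.decode("utf-8") for address in worker_addresses],
        "uuid": uuid.decode("utf-8"),
        "target": target.decode("utf-8"),
        "status": "DISPATCHED",
        "service": service_name.decode("utf-8"),
    }
    return [uuid, b"202", json.dumps(body).encode("utf-8")]
```

A client receiving the `202` frames can decode the last frame with `json.loads` to learn which workers accepted the job.
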
mmi_service(sender, command, target, uuid, data) ¤

Handle internal broker Management Interface (MMI) service tasks.

Parameters:

Name Type Description Default
sender str

The sender of the request.

required
command str

The command to be executed.

required
target str

The target of the command.

required
uuid str

The unique identifier for the request.

required
data str

The data payload in JSON format.

required

Supported MMI Tasks:

  • "show_workers": Returns a list of workers with their details.
  • "show_broker": Returns broker details including endpoint, status, keepalives, workers count, services count, directories, and security.
  • "show_broker_version": Returns the version of various packages and the platform.
  • "show_broker_inventory": Returns the broker's inventory.

The response is sent back to the client as a JSON-formatted string.
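
The request payload that `mmi_service` expects can be sketched as follows. Based on the source for this method, `data` is a JSON object with a `task` key and optional `args` and `kwargs` keys. The helper names `build_mmi_request` and `parse_mmi_reply` are hypothetical and exist only for illustration, as is the `nornir` service name.

```python
import json
from typing import Any


def build_mmi_request(task: str, **kwargs: Any) -> bytes:
    """Hypothetical helper: encode an MMI task request as UTF-8 JSON bytes."""
    payload = {"task": task, "args": [], "kwargs": kwargs}
    return json.dumps(payload).encode("utf-8")


def parse_mmi_reply(reply: bytes) -> Any:
    """Hypothetical helper: decode the JSON reply frame sent by the broker."""
    return json.loads(reply.decode("utf-8"))


# Ask for workers of an assumed "nornir" service that are currently alive
request = build_mmi_request("show_workers", service="nornir", status="alive")
```

For `show_workers`, the `service` and `status` keyword arguments act as optional filters. The other tasks in the list above take no keyword arguments in the source shown here.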

Source code in norfab\core\broker.py
def mmi_service(self, sender, command, target, uuid, data):
    """
    Handle internal broker Management Interface (MMI) service tasks.

    Parameters:
        sender (str): The sender of the request.
        command (str): The command to be executed.
        target (str): The target of the command.
        uuid (str): The unique identifier for the request.
        data (str): The data payload in JSON format.

    Supported MMI Tasks:

    - "show_workers": Returns a list of workers with their details.
    - "show_broker": Returns broker details including endpoint, status, keepalives, workers count, services count, directories, and security.
    - "show_broker_version": Returns the version of various packages and the platform.
    - "show_broker_inventory": Returns the broker's inventory.

    The response is sent back to the client as a JSON-formatted string.
    """
    log.debug(
        f"mmi.service.broker - processing request: sender '{sender}', "
        f"command '{command}', target '{target}', "
        f"data '{data}', uuid '{uuid}'"
    )
    data = json.loads(data)
    task = data.get("task")
    args = data.get("args", [])
    kwargs = data.get("kwargs", {})
    ret = f"Unsupported task '{task}'"
    if task == "show_workers":
        if self.workers:
            ret = [
                {
                    "name": w.address.decode("utf-8"),
                    "service": w.service.name.decode("utf-8"),
                    "status": "alive" if w.keepaliver.is_alive() else "dead",
                    "holdtime": str(w.keepaliver.show_holdtime()),
                    "keepalives tx/rx": f"{w.keepaliver.keepalives_send} / {w.keepaliver.keepalives_received}",
                    "alive (s)": str(w.keepaliver.show_alive_for()),
                }
                for k, w in self.workers.items()
            ]
            # filter reply
            service = kwargs.get("service")
            status = kwargs.get("status")
            if service and service != "all":
                ret = [w for w in ret if w["service"] == service]
            if status in ["alive", "dead"]:
                ret = [w for w in ret if w["status"] == status]
            if not ret:
                ret = [{"name": "", "service": "", "status": ""}]
        else:
            ret = [{"name": "", "service": "", "status": ""}]
    elif task == "show_broker":
        ret = {
            "endpoint": self.socket.getsockopt_string(zmq.LAST_ENDPOINT),
            "status": "active",
            "keepalives": {
                "interval": self.keepalive,
                "multiplier": self.multiplier,
            },
            "workers count": len(self.workers),
            "services count": len(self.services),
            "directories": {
                "base-dir": self.base_dir,
                "private-keys-dir": self.private_keys_dir,
                "public-keys-dir": self.public_keys_dir,
            },
            "security": {
                "broker-private-key-file": self.broker_private_key_file,
                "broker-public-key-file": self.broker_public_key_file,
            },
        }
    elif task == "show_broker_version":
        ret = {
            "norfab": "",
            "pyyaml": "",
            "pyzmq": "",
            "psutil": "",
            "tornado": "",
            "jinja2": "",
            "python": sys.version.split(" ")[0],
            "platform": sys.platform,
        }
        # get version of packages installed
        for pkg in ret.keys():
            try:
                ret[pkg] = importlib.metadata.version(pkg)
            except importlib.metadata.PackageNotFoundError:
                pass
    elif task == "show_broker_inventory":
        ret = self.inventory.dict()
    reply = json.dumps(ret).encode("utf-8")
    self.send_to_client(
        sender, NFP.RESPONSE, b"mmi.service.broker", [uuid, b"200", reply]
    )
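
The reply filtering done for the `show_workers` task can be tried on its own. This is a minimal sketch that mirrors the filtering logic in the source above. `filter_worker_rows` is a hypothetical standalone function, and the sample rows are made-up data, not real broker output.

```python
from typing import Dict, List, Optional


def filter_worker_rows(
    rows: List[Dict[str, str]],
    service: Optional[str] = None,
    status: Optional[str] = None,
) -> List[Dict[str, str]]:
    """Hypothetical helper: filter worker rows by service name and status."""
    # A missing service or the special value "all" disables the service filter
    if service and service != "all":
        rows = [row for row in rows if row["service"] == service]
    # Only "alive" and "dead" are recognized; any other value is ignored
    if status in ("alive", "dead"):
        rows = [row for row in rows if row["status"] == status]
    # An empty result is replaced by a single placeholder row
    return rows or [{"name": "", "service": "", "status": ""}]


# Made-up sample data for illustration only
sample_rows = [
    {"name": "worker-1", "service": "nornir", "status": "alive"},
    {"name": "worker-2", "service": "nornir", "status": "dead"},
    {"name": "worker-3", "service": "netbox", "status": "alive"},
]
alive_nornir = filter_worker_rows(sample_rows, service="nornir", status="alive")
```

Returning a placeholder row instead of an empty list keeps the reply shape stable for clients that render the result as a table.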