Skip to content

Netbox GraphQL Inventory Task

task api name: graphql

GraphQL Sample Usage

NORFAB Netbox GraphQL Command Shell Reference

NorFab shell supports these command options for Netbox graphql task:

Python API Reference¤

graphql¤

Function to query the Netbox GraphQL API (requires Netbox 4.4.0 or later).

Parameters:

Name Type Description Default
job Job

NorFab Job object containing relevant metadata

required
instance Union[None, str]

Netbox instance name

None
dry_run bool

only return query content, do not run it

False
obj Union[str, dict]

Object to query

None
filters Union[None, dict, str]

Filters to apply to the query

None
fields Union[None, list]

Fields to retrieve in the query

None
queries Union[None, dict]

Dictionary of queries to execute

None
query_string str

Raw query string to execute

None

Returns:

Name Type Description
dict Result

GraphQL request data returned by Netbox

Raises:

Type Description
RuntimeError

If required arguments are not provided

Exception

If GraphQL query fails

Source code in norfab\workers\netbox_worker\graphql_tasks.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
@Task(
    fastapi={"methods": ["POST"], "schema": NetboxFastApiArgs.model_json_schema()}
)
def graphql(
    self,
    job: Job,
    instance: Union[None, str] = None,
    dry_run: bool = False,
    obj: Union[str, dict] = None,
    filters: Union[None, dict, str] = None,
    fields: Union[None, list] = None,
    queries: Union[None, dict] = None,
    query_string: str = None,
) -> Result:
    """
    Function to query the Netbox GraphQL API (requires Netbox 4.4.0 or later).

    Exactly one query form must be supplied: ``queries``, the
    ``obj``/``filters``/``fields`` triple, or ``query_string``.

    Args:
        job: NorFab Job object containing relevant metadata
        instance: Netbox instance name, falls back to the default instance
        dry_run: only return query content, do not run it
        obj: Object to query, used together with filters and fields
        filters: Filters to apply to the query
        fields: Fields to retrieve in the query
        queries: Dictionary of queries to execute, keyed by result alias
        query_string: Raw query string to execute as-is

    Returns:
        dict: GraphQL request data returned by Netbox

    Raises:
        RuntimeError: If required arguments are not provided
        UnsupportedNetboxVersion: If Netbox version is below the minimum supported
        Exception: If GraphQL query fails
    """
    nb_params = self._get_instance_params(instance)
    instance = instance or self.default_instance
    ret = Result(task=f"{self.name}:graphql", resources=[instance])

    # form graphql query(ies) payload
    if queries:
        queries_list = []
        for alias, query_data in queries.items():
            # copy before adding the alias so the caller's dict is not mutated
            query_data = {**query_data, "alias": alias}
            if self.nb_version[instance] >= (4, 4, 0):
                queries_list.append(_form_query_v4(**query_data))
            else:
                raise UnsupportedNetboxVersion(
                    f"{self.name} - Netbox version {self.nb_version[instance]} is not supported, "
                    f"minimum required version is {self.compatible_ge_v4}"
                )
        queries_strings = "    ".join(queries_list)
        query = f"query {{{queries_strings}}}"
    elif obj and filters and fields:
        if self.nb_version[instance] >= (4, 4, 0):
            query = _form_query_v4(obj, filters, fields)
        else:
            raise UnsupportedNetboxVersion(
                f"{self.name} - Netbox version {self.nb_version[instance]} is not supported, "
                f"minimum required version is {self.compatible_ge_v4}"
            )
        query = f"query {{{query}}}"
    elif query_string:
        query = query_string
    else:
        raise RuntimeError(
            f"{self.name} - graphql method expects queries argument or obj, filters, "
            f"fields arguments or query_string argument provided"
        )
    payload = json.dumps({"query": query})

    # form and return dry run response
    if dry_run:
        log.info(
            f"{self.name} - GraphQL dry run, returning query payload without executing"
        )
        ret.result = {
            "url": f"{nb_params['url']}/graphql/",
            "data": payload,
            "verify": nb_params.get("ssl_verify", True),
            "headers": {
                "Content-Type": "application/json",
                "Accept": "application/json",
                # expose only the token tail so the dry-run output is safe to log
                "Authorization": f"Token ...{nb_params['token'][-6:]}",
            },
        }
        return ret

    # send request to Netbox GraphQL API
    log.debug(
        f"{self.name} - sending GraphQL query '{payload}' to URL '{nb_params['url']}/graphql/'"
    )
    req = requests.post(
        url=f"{nb_params['url']}/graphql/",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": f"Token {nb_params['token']}",
        },
        data=payload,
        verify=nb_params.get("ssl_verify", True),
        timeout=(self.netbox_connect_timeout, self.netbox_read_timeout),
    )
    try:
        req.raise_for_status()
    except Exception:
        raise Exception(
            f"{self.name} - Netbox GraphQL query failed, query '{query}', "
            f"URL '{req.url}', status-code '{req.status_code}', reason '{req.reason}', "
            f"response content '{req.text}'"
        )

    # return results
    reply = req.json()
    if reply.get("errors"):
        msg = f"{self.name} - GraphQL query error '{reply['errors']}', query '{payload}'"
        log.error(msg)
        ret.errors.append(msg)
        if reply.get("data"):
            ret.result = reply["data"]  # at least return some data
    elif queries or query_string:
        ret.result = reply["data"]
    else:
        # single obj query - unwrap the top-level alias for convenience
        ret.result = reply["data"][obj]

    return ret

netbox_graphql¤

Execute a paginated GraphQL query against a NetBox instance, fetching all pages in parallel.

Pages are fetched in parallel batches of up to grapqhl_max_workers concurrent requests. Results across all pages are merged into a single aggregated_data dict where list fields are extended and scalar fields are overwritten.

Parameters:

Name Type Description Default
job Job

NorFab job context.

required
instance str

Name of the NetBox instance to query.

required
query str

GraphQL query string. Must accept $offset: Int! and $limit: Int! variables to support automatic pagination.

required
variables Union[None, dict]

Optional extra GraphQL variables forwarded verbatim to the GraphQL query.

None
dry_run bool

When True, return the request parameters without executing any HTTP calls.

False
offset int

Starting pagination offset (number of records to skip before the first page).

0
limit int

Number of records per page fetched from NetBox.

50

Returns:

Type Description
Result

class:Result whose result field holds the merged GraphQL data dict.

Result

On failure failed is True and errors lists the exception messages.

Source code in norfab\workers\netbox_worker\graphql_tasks.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
@Task(fastapi={"methods": ["GET"], "schema": NetboxFastApiArgs.model_json_schema()})
def netbox_graphql(
    self,
    job: Job,
    instance: str,
    query: str,
    variables: Union[None, dict] = None,
    dry_run: bool = False,
    offset: int = 0,
    limit: int = 50,
) -> Result:
    """
    Execute a paginated GraphQL query against a NetBox instance, fetching all pages in parallel.

    Pages are fetched in parallel batches of up to ``grapqhl_max_workers`` concurrent requests.
    Results across all pages are merged into a single ``aggregated_data`` dict where list
    fields are extended and scalar fields are overwritten.

    Args:
        job: NorFab job context.
        instance: Name of the NetBox instance to query.
        query: GraphQL query string. Must accept ``$offset: Int!`` and ``$limit: Int!``
            variables to support automatic pagination.
        variables: Optional extra GraphQL variables forwarded verbatim to the GraphQL query.
        dry_run: When ``True``, return the request parameters without executing any HTTP calls.
        offset: Starting pagination offset (number of records to skip before the first page).
        limit: Number of records per page fetched from NetBox.

    Returns:
        :class:`Result` whose ``result`` field holds the merged GraphQL ``data`` dict.
        On failure ``failed`` is ``True`` and ``errors`` lists the exception messages.
    """
    nb_params = self._get_instance_params(instance)
    ret = Result(task=f"{self.name}:graphql", resources=[instance])
    # normalize once: previously only the dry-run path guarded against None,
    # and the live path crashed with TypeError on ``{**None, ...}``
    variables = variables or {}

    if dry_run:
        ret.dry_run = True
        ret.result = {
            "url": f"{nb_params['url']}/graphql/",
            "data": json.dumps({"query": query, "variables": variables}),
            "verify": nb_params.get("ssl_verify", True),
            "headers": {
                "Content-Type": "application/json",
                "Accept": "application/json",
                # expose only the token tail so the dry-run output is safe to log
                "Authorization": f"Token ...{nb_params['token'][-6:]}",
            },
        }
        return ret

    aggregated_data: dict[str, Any] = {}
    ssl_verify = nb_params.get("ssl_verify", True)
    nb_url = nb_params["url"]

    # paginate through all results, fetching grapqhl_max_workers pages per iteration
    while True:
        batch_offsets = [
            offset + (i * limit) for i in range(self.grapqhl_max_workers)
        ]
        pages: list[tuple[int, dict[str, Any]]] = []

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.grapqhl_max_workers
        ) as pool:
            futures: dict[concurrent.futures.Future, int] = {
                pool.submit(
                    graphql_fetch_page,
                    nb_params["token"],
                    nb_url,
                    ssl_verify,
                    query,
                    {**variables, "offset": page_offset, "limit": limit},
                    self.netbox_connect_timeout,
                    self.netbox_read_timeout,
                    self.name,
                ): page_offset
                for page_offset in batch_offsets
            }
            for future in concurrent.futures.as_completed(futures):
                page_offset = futures[future]
                try:
                    pages.append((page_offset, future.result()))
                except Exception as exc:
                    error_msg = (
                        f"Failed to fetch page at offset {page_offset}: {exc}"
                    )
                    log.error(f"{self.name} - {error_msg}")
                    ret.errors.append(error_msg)
                    ret.failed = True

        # stop immediately if any page fetch failed — results would be incomplete
        if ret.failed:
            break

        any_data_returned = False
        has_full_page = False

        # merge pages in offset order to maintain consistent result ordering
        for _, data in sorted(pages, key=lambda item: item[0]):
            page_sizes: list[int] = []
            for key, value in data.items():
                if isinstance(value, list):
                    if value:
                        any_data_returned = True
                    aggregated_data.setdefault(key, [])
                    aggregated_data[key].extend(value)
                    page_sizes.append(len(value))
                else:
                    # scalar fields (counts, metadata) - last page wins
                    aggregated_data[key] = value
            # a full page means there may be more data to fetch
            if page_sizes and any(size == limit for size in page_sizes):
                has_full_page = True

        # stop when no data was returned or no page was fully filled
        if not any_data_returned or not has_full_page:
            break

        offset += self.grapqhl_max_workers * limit

    if not ret.failed:
        ret.result = aggregated_data

    return ret