Datadog DLP Tutorial

Datadog is a monitoring and analytics tool for information technology (IT) and DevOps teams. It tracks performance metrics and monitors events for infrastructure and cloud services. This tutorial demonstrates how to use the Nightfall API and Python SDK to scan your Datadog logs, metrics, and events for sensitive data.

You will need a few things first to use this tutorial:

  • A Datadog account with an API key and Application key

  • A Nightfall API key

  • An existing Nightfall Detection Rule

  • A Python 3 environment (version 3.7 or later)

  • Python Nightfall SDK

We need to install the nightfall and requests libraries using pip. All the other libraries we will be using are part of the Python standard library.

pip install nightfall==1.2.0
pip install requests

We then import the following libraries:

import argparse
import csv
import json
import os
import sys
import time
import collections

import requests
from nightfall import Nightfall

Next we define the Detection Rule with which we wish to scan our data. The Detection Rule can be pre-made in the Nightfall web app and referenced by UUID.

Note that we store the Datadog and Nightfall credentials, along with the Detection Rule UUID, in the environment variables below and read the values from there:

  • DD_API_KEY

  • DD_APPLICATION_KEY

  • NIGHTFALL_API_KEY

  • DETECTION_RULE_UUID

dd_api_key = os.environ.get('DD_API_KEY')
dd_application_key = os.environ.get('DD_APPLICATION_KEY')
nightfall_api_key = os.environ.get('NIGHTFALL_API_KEY')
detectionRuleUUID = os.environ.get('DETECTION_RULE_UUID')
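
Because os.environ.get returns None for an unset variable, a missing key would only surface later as a failed API call. The following is a minimal sketch of a fail-fast check, assuming the four variable names read above; missing_env_vars and require_env_vars are hypothetical helpers, not part of either SDK:

```python
import os
import sys

# Variable names read by this tutorial's code.
REQUIRED_VARS = (
    "DD_API_KEY",
    "DD_APPLICATION_KEY",
    "NIGHTFALL_API_KEY",
    "DETECTION_RULE_UUID",
)


def missing_env_vars(names, environ=None):
    """Return the names that are unset or empty in the given mapping."""
    # Resolve the default at call time so a plain dict can be passed in tests.
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


def require_env_vars(names=REQUIRED_VARS):
    """Exit with a readable message if any required variable is missing."""
    missing = missing_env_vars(names)
    if missing:
        sys.exit("ERROR: missing environment variables: " + ", ".join(missing))
```

Calling require_env_vars() before reading the variables stops the script with a clear message instead of an authentication error further down.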

Next we create a Nightfall client from the SDK using our API key.

nightfall = Nightfall(nightfall_api_key)

Now we will set up the connection with Datadog and retrieve the data to be scanned.

The three code samples below correspond to the three types of Datadog data available to scan:

  1. logs - Scans the 100 most recent logs from Datadog.

  2. metrics - Scans all active metric tags from the last 24 hours.

  3. events - Scans all events from the last 24 hours.

Each one of these options saves the data into a data_to_scan list of tuples where the first element in the tuple is the id of the data to scan and the second element is a string of data to scan.
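
The tuple layout matters because only the strings are sent for scanning, while the ids are kept to label the results afterwards. Here is a small sketch of that split, using made-up log ids and messages and a stand-in result list (no API call is made). It assumes results come back in the same order as the input strings, which is what the CSV step later in this tutorial relies on:

```python
# Hypothetical sample in the same shape every option produces:
# (id of the item, string to scan).
data_to_scan = [
    ("log-id-001", "user alice logged in"),
    ("log-id-002", "payment failed for card ending 4242"),
]

# Only the second element of each tuple is sent for scanning.
texts = [text for _, text in data_to_scan]

# Stand-in for per-text scan results: one list of findings per input string.
fake_results = [[], ["card ending 4242"]]

# Pair each result list with the id at the same position.
findings_by_id = {
    item_id: found
    for (item_id, _), found in zip(data_to_scan, fake_results)
}
```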

Choose one of these options and use its code sample. The three samples follow in the order listed above:

# This will return the most recent 100 logs from Datadog.

dd_url = 'https://api.datadoghq.com/api/v2/logs/events?page[limit]=100'

dd_headers = {
    'Content-Type': 'application/json',
    'DD-API-KEY': dd_api_key,
    'DD-APPLICATION-KEY': dd_application_key
}

try:
    response = requests.get(
      url=dd_url,
      headers=dd_headers
    )

    response.raise_for_status()

except requests.HTTPError:
    msg = f"ERROR: Datadog API returned: {response.status_code}"
    sys.exit(msg)


# List of log ids and their message
data_to_scan = []
for log in response.json()['data']:
    data_to_scan.append((log['id'], log['attributes']['message']))

# This will retrieve all metric names submitted to Datadog in the last 24 hours,
# then iterate over all the tags attached to each metric.

from_time = int(time.time()) - 60 * 60 * 24 * 1
dd_list_metrics_url = f"https://api.datadoghq.com/api/v1/metrics?from={from_time}"
dd_metric_metadata_url = "https://api.datadoghq.com/api/v2/metrics/{metric_name}/all-tags"

dd_headers = {
    'Content-Type': 'application/json',
    'DD-API-KEY': dd_api_key,
    'DD-APPLICATION-KEY': dd_application_key
}

try:
    response = requests.get(
        url=dd_list_metrics_url,
        headers=dd_headers
    )

    response.raise_for_status()

except requests.HTTPError:
    msg = f"ERROR: Datadog API returned: {response.status_code}"
    sys.exit(msg)

# List of metrics and their tags
data_to_scan = []
for metric_name in response.json()["metrics"]:
    try:
        response = requests.get(
            url=dd_metric_metadata_url.format(metric_name=metric_name),
            headers=dd_headers
        )

        response.raise_for_status()

    except requests.HTTPError:
        msg = f"ERROR: Datadog API returned: {response.status_code}"
        sys.exit(msg)

    json_resp = response.json()["data"]
    data_to_scan.append((metric_name, str(json_resp["attributes"]["tags"])))

# This will retrieve all events submitted to Datadog in the last 24 hours
# and extract their scannable content.

to_time = int(time.time())
from_time = to_time - 60 * 60 * 24 * 1
dd_list_events_url = "https://api.datadoghq.com/api/v1/events"

dd_headers = {
    'Content-Type': 'application/json',
    'DD-API-KEY': dd_api_key,
    'DD-APPLICATION-KEY': dd_application_key
}

events = []
while from_time < to_time:
    dd_query = {
        'start': from_time,
        'end': to_time,
    }

    try:
        response = requests.get(
            url=dd_list_events_url,
            headers=dd_headers,
            params=dd_query,
        )

        response.raise_for_status()

    except requests.HTTPError:
        msg = f"ERROR: Datadog API returned: {response.status_code}"
        sys.exit(msg)

    dd_resp = response.json()

    events += dd_resp["events"]
    if len(dd_resp["events"]) < 1000:
        break

    from_time = events[-1]["date_happened"]

# List of event urls and their tags, titles, and texts
data_to_scan = []
for e in events:
    data_to_scan.append((e["url"], str((e["tags"], e["title"], e["text"]))))
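
The loop above restarts each request at the timestamp of the last event it received, so events sharing that timestamp can be fetched twice, and a full page of same-timestamp events would never advance. The sketch below guards both cases. It assumes, like the loop above, that a full page means more events remain after the latest timestamp returned, and that each event carries a unique id field. fetch_page is a hypothetical callable standing in for the requests.get call, and collect_events is not part of any SDK:

```python
def collect_events(fetch_page, start, end, page_size=1000):
    """Collect events between start and end, skipping ids already seen.

    fetch_page(start, end) is a hypothetical callable that returns a list
    of event dicts with "id" and "date_happened" keys.
    """
    seen_ids = set()
    events = []
    while start < end:
        page = fetch_page(start, end)
        fresh = [e for e in page if e["id"] not in seen_ids]
        seen_ids.update(e["id"] for e in fresh)
        events.extend(fresh)

        # A short page means nothing is left; a page with no unseen
        # events means the cursor can no longer make progress.
        if len(page) < page_size or not fresh:
            break

        next_start = max(e["date_happened"] for e in page)
        if next_start <= start:
            break  # every event shares one timestamp; avoid looping forever
        start = next_start
    return events
```

Injecting fetch_page keeps the paging logic separate from the HTTP call, so it can be exercised with canned data before it is pointed at the real API.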

We then run a scan on the aggregated data using the Nightfall SDK. Since all of the examples create the same data_to_scan list, we can use the same code to scan them all.

findings, redactions = nightfall.scan_text(
    [data[1] for data in data_to_scan],
    detection_rule_uuids=[detectionRuleUUID]
)
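
The call above passes every string at once, which may exceed the request size the API accepts when the logs are large. Below is a minimal sketch of splitting data_to_scan into smaller batches before scanning. The limits are parameters here because the actual values should be taken from the Nightfall API documentation, and batch_by_bytes is a hypothetical helper:

```python
def batch_by_bytes(items, max_bytes, max_items):
    """Group (id, text) tuples into batches under both limits.

    A single text larger than max_bytes still gets a batch of its own.
    """
    batches = []
    current = []
    current_bytes = 0
    for item in items:
        size = len(item[1].encode("utf-8"))
        too_big = current_bytes + size > max_bytes
        too_many = len(current) >= max_items
        # Close the current batch before it would break either limit.
        if current and (too_big or too_many):
            batches.append(current)
            current = []
            current_bytes = 0
        current.append(item)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```

Each batch can then be scanned in turn, keeping the batch next to its results so that ids and findings still line up by position.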

To review the results, we will write the findings to an output csv file:

all_findings = []
all_findings.append(
    [
        'id', 'detector', 'confidence',
        'finding_start', 'finding_end', 'finding'
    ]
)

# findings holds one list of results per scanned string, in input order.
for finding_idx, item_findings in enumerate(findings):
    data_id = data_to_scan[finding_idx][0]

    for item in item_findings:
        row = [
            data_id,
            item.detector_name,
            item.confidence.value,
            item.byte_range.start,
            item.byte_range.end,
            item.finding,
        ]
        all_findings.append(row)


if len(all_findings) > 1:
    filename = "nf_datadog_output-" + str(int(time.time())) + ".csv"
    with open(filename, 'w', newline='') as output_file:
        csv_writer = csv.writer(output_file, delimiter=',')
        csv_writer.writerows(all_findings)
    print("Output findings written to", filename)

else:
    print('No sensitive data detected. Hooray!')

Note

The results of the scan will be outputted to a file named nf_datadog_output-TIMESTAMP.csv.

Note that the output file includes the full text of each finding. Since a finding may itself be sensitive data, we recommend using the Redaction feature of the Nightfall API to mask your data. More information can be found in the 'Using Redaction to Mask Findings' section below.
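
If the CSV has to be shared before redaction is configured, one stopgap is to mask each finding locally before writing the row. The sketch below is plain Python, not the Nightfall Redaction feature; mask_finding and its keep parameter are hypothetical names:

```python
def mask_finding(text, keep=4, mask_char="*"):
    """Mask all but the last `keep` characters of a finding."""
    if keep <= 0 or len(text) <= keep:
        # Nothing can be revealed safely: mask the whole value.
        return mask_char * len(text)
    return mask_char * (len(text) - keep) + text[-keep:]
```

In the CSV loop, the finding column would then hold mask_finding(item.finding) instead of the raw value.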

Using Redaction to Mask Findings

With the Nightfall API, you can also redact and mask your Datadog findings by adding a Redaction Config to your Detection Rule. For more information on how to use redaction and its specific options, please refer to the guide here.

Using the File Scanning Endpoint with Datadog

The example above is specific to the Nightfall Text Scanning API. To scan files, we can use a similar process to the one we used with the text scanning endpoint. Because the file scanning process is more involved, it is broken down in the sections below.

Prerequisites

To utilize the File Scanning API you need the following:

  • An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security

  • A Nightfall Detection Policy associated with a webhook URL

  • A web server configured to listen for file scanning results (more information below)
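
The last prerequisite is a web server that receives the scan results. The following sketch covers only the request-handling logic, with no network code. It assumes the webhook URL is first verified with a JSON body containing a challenge key whose value must be echoed back as plain text; confirm this against the Nightfall webhook documentation. handle_webhook_body is a hypothetical helper:

```python
import json


def handle_webhook_body(raw_body):
    """Return (status_code, response_text) for one webhook POST body."""
    try:
        payload = json.loads(raw_body)
    except (ValueError, TypeError):
        return 400, "invalid JSON"
    if not isinstance(payload, dict):
        return 400, "expected a JSON object"

    # Assumed verification step: echo the challenge value back as plain text.
    if "challenge" in payload:
        return 200, str(payload["challenge"])

    # Anything else is treated as a scan-result notification; a real
    # server would store or process it here instead of just acknowledging.
    return 200, "ok"
```

Keeping this logic in a plain function means it can be wired into whichever web framework hosts the listener.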

Steps to use the Endpoint

  1. Retrieve data from Datadog

Similar to the process at the beginning of this tutorial for the text scanning endpoint, we first retrieve the data we want from Datadog. This can be logs, metrics, or events. The example below uses logs:

# This will return the most recent 100 logs from Datadog.

dd_url = 'https://api.datadoghq.com/api/v2/logs/events?page[limit]=100'

dd_headers = {
    'Content-Type': 'application/json',
    'DD-API-KEY': dd_api_key,
    'DD-APPLICATION-KEY': dd_application_key
}

try:
    response = requests.get(
        url=dd_url,
        headers=dd_headers
    )

    response.raise_for_status()

except requests.HTTPError:
    msg = f"ERROR: Datadog API returned: {response.status_code}"
    sys.exit(msg)


dd_data = response.json()['data']

scan_logs = [
    ['event_id', 'message']
]
for log in dd_data:
    # The log id sits at the top level of each entry, not under 'attributes'.
    scan_logs.append([
        log['id'],
        log['attributes']['message']
    ])

Now we write the logs to a .csv file.

filename = "nf_datadog_input-" + str(int(time.time())) + ".csv"

with open(filename, 'w', newline='') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerows(scan_logs)

print("Datadog logs written to:", filename)

  2. Upload the .csv file written above to the Scan API, as shown here.

  3. Once the file has been uploaded, start the scan using the scan endpoint mentioned here. Note: As described in the documentation, the scan endpoint requires a webhook server, to which it sends the scanning results. An example webhook server setup can be seen here.

  4. The scanning endpoint works asynchronously on the uploaded files, so you can monitor the webhook server to see the API responses and file scan findings as they come in.
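
The upload step is expected to send the file in pieces, each identified by its byte offset. Here is a small sketch of producing those pieces, assuming the chunked upload flow described in the linked documentation. The chunk size of 10 is a placeholder for whatever size the upload API specifies, and iter_chunks is a hypothetical helper rather than an SDK function:

```python
import io


def iter_chunks(stream, chunk_size):
    """Yield (offset, data) pairs covering a binary stream in order."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    offset = 0
    while True:
        data = stream.read(chunk_size)
        if not data:
            break
        yield offset, data
        offset += len(data)


# Demo on an in-memory stream; for the real file, pass open(filename, "rb").
sample = b"event_id,message\n1,hello\n"
demo_chunks = list(iter_chunks(io.BytesIO(sample), 10))
```

Each (offset, data) pair would then go into one upload request, and the scan is started once every chunk has been sent.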
