Snowflake is a data warehouse built on top of the Amazon Web Services or Microsoft Azure cloud infrastructure. This tutorial demonstrates how to scan a Snowflake database using the Nightfall API and SDK.
To follow this tutorial, you will need:
A Snowflake account with at least one database
A Nightfall API key
An existing Nightfall Detection Rule
The most recent version of the Nightfall Python SDK
We first install the Snowflake Python connector and the Nightfall SDK, for example by running pip install snowflake-connector-python nightfall.
Next, we set the size and length limits for the data that the Nightfall API accepts per request.
size_limit = 500000
length_limit = 50000
Next, we read our API key and use it to instantiate a Nightfall client from the SDK.
Next, we define the Detection Rule with which we wish to scan our data. The Detection Rule can be created in advance in the Nightfall web app and referenced by its UUID.
We then set up the connection with Snowflake and fetch the data to be scanned.
Note that the Snowflake authentication details are stored in the environment variables below and read from there:
SNOWFLAKE_USER
SNOWFLAKE_PASSWORD
SNOWFLAKE_ACCOUNT
SNOWFLAKE_DATABASE
SNOWFLAKE_SCHEMA
SNOWFLAKE_TABLE
SNOWFLAKE_PRIMARY_KEY
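Since os.environ.get returns None for an unset variable, a missing value may only surface later as a connection or query error. The following is a minimal sketch of an up-front check; the helper name missing_env_vars is hypothetical and is not part of the Snowflake or Nightfall libraries.

```python
import os

# Variable names taken from the list above.
REQUIRED_VARS = [
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_DATABASE",
    "SNOWFLAKE_SCHEMA",
    "SNOWFLAKE_TABLE",
    "SNOWFLAKE_PRIMARY_KEY",
]


def missing_env_vars(names, env=None):
    """Return the names that are unset or empty in env (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]
```

Calling missing_env_vars(REQUIRED_VARS) before connecting, and stopping with a clear message if the returned list is not empty, keeps configuration problems separate from genuine connection failures.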
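import os

import snowflake.connector

# Connect using the credentials stored in the environment variables above.
connection = snowflake.connector.connect(
    user=os.environ.get('SNOWFLAKE_USER'),
    password=os.environ.get('SNOWFLAKE_PASSWORD'),
    account=os.environ.get('SNOWFLAKE_ACCOUNT'),
    schema=os.environ.get('SNOWFLAKE_SCHEMA'),
    database=os.environ.get('SNOWFLAKE_DATABASE'),
)

table_name = os.environ.get('SNOWFLAKE_TABLE')
primary_key = os.environ.get('SNOWFLAKE_PRIMARY_KEY')

# Fetch up to 1000 rows from the table to be scanned.
cursor = connection.cursor()
sql = f"SELECT * FROM {table_name} LIMIT 1000;"
cursor.execute(sql)

# The column name is the first field of each cursor.description entry.
# The scanning code refers to this list as `columns`.
columns = [i[0] for i in cursor.description]
data = cursor.fetchall()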
We can then check the data size; as long as it is below the limits set above, the data can be run through the API.
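One caveat when checking size: sys.getsizeof, which the scanning code in this tutorial uses, reports the memory footprint of the list object itself and not of the strings it refers to. A closer estimate can be sketched by summing the encoded length of each value. The helper name payload_fits is hypothetical, and the reading of size_limit as a byte budget and length_limit as an item count is an assumption to confirm against the Nightfall API documentation.

```python
def payload_fits(payload, size_limit=500000, length_limit=50000):
    """Return True if a list of strings is within both limits.

    Assumption: size_limit is a byte budget for the text itself and
    length_limit caps the number of items in one request.
    """
    total_bytes = sum(len(text.encode("utf-8")) for text in payload)
    return total_bytes <= size_limit and len(payload) <= length_limit
```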
If the data payloads are larger than the size or length limits of the API, extra code is required to chunk the data into smaller pieces that the Nightfall scan API can process.
The code that follows first scans columns that fit within the limits and then handles oversized items:
import json
import sys

primary_key_col = []

if len(data) == 0:
    raise Exception('Table is empty! No data to scan.')

all_findings = []
for col_idx, col in enumerate(columns):
    # Scan one column at a time, with every value converted to a string.
    payload = [str(i[col_idx]) for i in data]
    if col == primary_key:
        primary_key_col = payload
    col_size = sys.getsizeof(payload)

    if col_size < size_limit:
        resp = nightfall.scanText(
            [payload],
            detection_rule_uuids=[detectionRuleUUID])
        col_resp = json.loads(resp)

        for item_idx, item in enumerate(col_resp):
            if item is not None:
                for finding in item:
                    finding['column'] = col
                    # Label the finding with its primary key when available,
                    # otherwise fall back to the row position.
                    try:
                        finding['index'] = primary_key_col[item_idx]
                    except IndexError:
                        finding['index'] = item_idx
                    all_findings.append(finding)
# Assumption: this runs inside the column loop above, and big_items holds the
# row positions of values in payload that are too large for a single request.
for big in big_items:
    item = payload[big]
    item_size = sys.getsizeof(item)
    chunks_req = (item_size // size_limit) + 1
    # Round up so that the final characters of the item are not dropped.
    chunk_len = -(-len(item) // chunks_req)
    pos = 0
    item_findings = []
    for _ in range(chunks_req):
        p = item[pos:min(pos + chunk_len, len(item))]
        # Same call form as the scan of whole columns above.
        resp = nightfall.scanText(
            [[p]],
            detection_rule_uuids=[detectionRuleUUID])
        item_findings.extend(json.loads(resp))
        pos += chunk_len

    if item_findings == []:
        raise Exception(
            f"Error while scanning large item at column {col}, "
            f"Index {primary_key_col[big]}")

    for find_chunk in item_findings:
        if find_chunk is not None:
            for finding in find_chunk:
                finding['column'] = col
                try:
                    finding['index'] = primary_key_col[big]
                except IndexError:
                    finding['index'] = big
                all_findings.append(finding)
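The chunking idea can also be sketched as a standalone helper that is independent of the Nightfall SDK. The version below is only an illustration: the function names split_text and batch_payload are hypothetical, sizes are measured as UTF-8 bytes, and the default limits simply reuse the values set earlier in this tutorial. Each piece is paired with its row position so that findings can still be traced back to the source row.

```python
def split_text(text, max_bytes):
    """Split one string into pieces of at most max_bytes UTF-8 bytes.

    Assumes max_bytes >= 4, the largest size of one UTF-8 character.
    """
    pieces, current, current_size = [], [], 0
    for ch in text:
        ch_size = len(ch.encode("utf-8"))
        if current and current_size + ch_size > max_bytes:
            pieces.append("".join(current))
            current, current_size = [], 0
        current.append(ch)
        current_size += ch_size
    if current:
        pieces.append("".join(current))
    return pieces


def batch_payload(payload, size_limit=500000, length_limit=50000):
    """Yield lists of (row_index, text) pairs that each fit both limits.

    Empty strings produce no pieces and are skipped.
    """
    batch, batch_size = [], 0
    for row_index, text in enumerate(payload):
        for piece in split_text(text, size_limit):
            piece_size = len(piece.encode("utf-8"))
            too_big = batch_size + piece_size > size_limit
            too_long = len(batch) >= length_limit
            if batch and (too_big or too_long):
                yield batch
                batch, batch_size = [], 0
            batch.append((row_index, piece))
            batch_size += piece_size
    if batch:
        yield batch
```

Note that cutting a long value at a fixed boundary can split a sensitive string across two pieces, in which case neither piece may match. Overlapping adjacent pieces by a small margin is one possible mitigation, at the cost of occasional duplicate findings.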
To review the results, we will print the number of findings, and write the findings to an output file:
print(f"{len(all_findings)} sensitive findings in {os.environ.get('SNOWFLAKE_TABLE')}")

with open('snowflake_findings.json', 'w') as output_file:
    json.dump(all_findings, output_file)
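For a quick overview before opening the output file, the findings can be grouped by column. This is a small sketch that relies only on the 'column' key attached to each finding by the scanning code above; the helper name findings_per_column is hypothetical.

```python
from collections import Counter


def findings_per_column(findings):
    """Count findings by the 'column' key attached during scanning."""
    return Counter(f.get("column", "<unknown>") for f in findings)
```

Printing findings_per_column(all_findings).most_common() then lists the columns with the most findings first.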
The following are potential ways to continue building upon this service:
Writing Nightfall results to a database and reading that into a visualization tool
Redacting sensitive findings in place once they are detected, either automatically or as a follow-up script once findings have been reviewed
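As one possible starting point for the first idea, the findings could be written to a local SQLite database that a visualization tool can read. This is only a sketch: the function name save_findings, the table layout, and the default file name are all assumptions chosen for illustration, and each finding is stored as a JSON string because its full structure is not described here.

```python
import json
import sqlite3


def save_findings(findings, db_path="nightfall_findings.db"):
    """Store findings in a SQLite table and return the number of rows written."""
    rows = [
        (str(f.get("column")), str(f.get("index")), json.dumps(f))
        for f in findings
    ]
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS findings ("
                "column_name TEXT, row_index TEXT, finding_json TEXT)"
            )
            conn.executemany("INSERT INTO findings VALUES (?, ?, ?)", rows)
    finally:
        conn.close()
    return len(rows)
```

Because a stored finding may itself contain the sensitive text that was detected, the database file should be protected as carefully as the source table.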
Using Redaction to Mask Findings
With the Nightfall API, you can also redact and mask your Snowflake findings by adding a Redaction Config to your Detection Rule. For more information on how to use redaction and its specific options, please refer to the guide here.
Using the File Scanning Endpoint with Snowflake
The example above is specific to the Nightfall Text Scanning API. To scan files, we can use a process similar to the one we used with the text scanning endpoint. Because the file scanning process is more involved, it is broken down into the sections below.
Prerequisites
To utilize the File Scanning API you need the following:
An active API Key authorized for file scanning passed via the header Authorization: Bearer — see Authentication and Security
A Nightfall Detection Policy associated with a webhook URL
A web server configured to listen for file scanning results (more information below)
Steps to use the Endpoint
Retrieve data from Snowflake
As at the beginning of this tutorial for the text scanning endpoint, we first initialize our Snowflake connection. Once the session is established, we can query Snowflake.
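import os

import snowflake.connector

# Connect using the credentials stored in the environment variables.
connection = snowflake.connector.connect(
    user=os.environ.get('SNOWFLAKE_USER'),
    password=os.environ.get('SNOWFLAKE_PASSWORD'),
    account=os.environ.get('SNOWFLAKE_ACCOUNT'),
    schema=os.environ.get('SNOWFLAKE_SCHEMA'),
    database=os.environ.get('SNOWFLAKE_DATABASE'),
)

table_name = os.environ.get('SNOWFLAKE_TABLE')
primary_key = os.environ.get('SNOWFLAKE_PRIMARY_KEY')

# Fetch up to 1000 rows from the table to be scanned.
cursor = connection.cursor()
sql = f"SELECT * FROM {table_name} LIMIT 1000;"
cursor.execute(sql)

# The column name is the first field of each cursor.description entry.
# The CSV-writing code refers to this list as `columns`.
columns = [i[0] for i in cursor.description]
data = cursor.fetchall()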
Now we go through the data and write it to a .csv file.
import csv
import time

if len(data) == 0:
    raise Exception('Table is empty! No data to scan.')

filename = "nf_snowflake_input-" + str(int(time.time())) + ".csv"

# Open the file once and write a header row followed by every data row.
# Reopening it in 'w' mode for each column would overwrite earlier output.
with open(filename, 'w', newline='') as output_file:
    csv_writer = csv.writer(output_file, delimiter=',')
    csv_writer.writerow(columns)
    csv_writer.writerows(data)

print("Snowflake Data Written to: ", filename)
Begin the file upload process to the Scan API with the .csv file written above, as shown here.
Once the file has been uploaded, start the scan using the scan endpoint mentioned here. Note: as described in the documentation, the scan endpoint requires a webhook server, to which it sends the scanning results. An example webhook server setup can be seen here.
The scanning endpoint processes uploaded files asynchronously, so you can monitor the webhook server to see the API responses and file scan findings as they come in.
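To make the webhook requirement more concrete, here is a minimal sketch of a receiver built only on the Python standard library. It assumes that Nightfall verifies a webhook URL by sending a JSON body with a "challenge" key and expecting that value back as plain text; confirm this against the Nightfall webhook documentation. The names build_response and WebhookHandler are hypothetical, and any other JSON object is simply treated as a scan notification and logged.

```python
import json
from http.server import BaseHTTPRequestHandler


def build_response(raw_body):
    """Return (status_code, response_text) for one webhook POST body."""
    try:
        event = json.loads(raw_body)
    except (ValueError, TypeError):
        return 400, "invalid JSON"
    if not isinstance(event, dict):
        return 400, "expected a JSON object"
    if "challenge" in event:
        # Assumed verification handshake: echo the challenge as plain text.
        return 200, str(event["challenge"])
    # Anything else is treated as a scan notification; log a short preview.
    print("Received scan event:", json.dumps(event)[:200])
    return 200, "ok"


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read exactly the number of bytes announced by the client.
        length = int(self.headers.get("Content-Length", 0))
        status, text = build_response(self.rfile.read(length))
        body = text.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
```

To try it locally, import HTTPServer from http.server and run HTTPServer(("", 8000), WebhookHandler).serve_forever(); the port 8000 is arbitrary. A production receiver should also authenticate incoming requests, which this sketch omits.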