panthersdk 0.0.24
Panther SDK
The Panther SDK module allows you to configure detections for your Panther instance.
Getting Started
Install
The Panther SDK can be installed using pip.
pip3 install panther-sdk==0.0.24
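To confirm which version ended up in the active environment, a check along these lines may help. This is a minimal sketch that uses only the Python standard library. It assumes the distribution name is panther-sdk, as in the pip command above; the helper name installed_version is made up for illustration.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # "panther-sdk" is the name used in the pip command above.
    version = installed_version("panther-sdk")
    if version is None:
        print("panther-sdk is not installed in this environment")
    else:
        print(f"panther-sdk {version} is installed")
```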
Writing a detection
You can use the detection module in the Panther SDK to configure detections. The example below shows a detection that processes an HTTP log.
Assuming we have a log type Internal.HTTP.Traffic that has the following shape:
{
"method": "POST",
"useSSL": false,
"path": "/api/some/endpoint",
"host": "internal.megacorp.com"
}
We can write a detection to check for insecure POST and PUT requests:
# import the "detection" module, as we'll be creating a rule.
from panther_sdk import detection
# The next five functions are used in the detection definition that follows. Panther
# detections allow for parts of a detection to be defined with custom Python code.
# "filter_insecure" is an example of a Python function that will be used to create
# a detection.PythonFilter that only matches insecure events. The event argument will
# be a Python dict object corresponding to the JSON event.
def filter_insecure(event, params):
    return event["useSSL"] == False
# "filter_http_methods" is a slightly more complicated python function that we will use
# to create a detection.PythonFilter later. This function uses the second "params" argument
# to make the function's logic reusable.
def filter_http_methods(event, params):
    allowed_methods = params["methods"]
    actual_method = event["method"]
    return actual_method in allowed_methods
# "reference_generator" is another example where we will use some python code to dynamically
# change the information Panther presents on an Alert resulting from a Detection match. In
# this example, we're populating a query param with some relevant information from
# the contextual event.
def reference_generator(event):
    # The sample event above carries the host name in its "host" field.
    host = event["host"]
    return f"https://wiki.internal.megacorp.com/hosts_db?host={host}"
# "make_context" will be used to attach arbitrary data to the resulting Alert. This
# can be used to simply change how data is presented or be used to push machine readable information
# to a downstream Alert Destination.
def make_context(event):
    # The Panther SDK's hooks to arbitrary python come with some restrictions. When the
    # detection is saved, the Panther backend will capture the source of provided function to use
    # in realtime log data processing and other backend processes. Because of this, functions used
    # to define a detection must not have references that are not local to the function body. Therefore,
    # imports targeting any of the below _must_ be included in the function body:
    #
    # - The python standard library
    # - Libraries that are enabled on your Panther instance
    # - Panther "Global Helpers" that you have configured on your instance
    import fnmatch
    path = event["path"]
    return {
        "is_api_path": fnmatch.fnmatch(path, "/api/*")
    }
# "pick_severity" is a custom function we are using to define the severity of Alerts based on
# the data present in the current log. The detection.Rule declaration below shows how this method
# is registered with the definition of our Rule.
def pick_severity(event) -> str:
    from panther_sdk import detection  # required. see notes in the comment inside "make_context"
    # Traffic to a host other than the expected internal one is treated as more severe.
    if event["host"] != "internal.megacorp.com":
        return detection.SeverityHigh
    return detection.SeverityInfo
# Declare a rule. Every call to "detection.Rule" in your repo will create a Rule in the Panther backend.
# This example uses a subset of the fields available to define a Rule. See "Full Dataclass API" for
# all the available fields.
detection.Rule(
    # Give the Rule an id. This id must be unique on your Panther
    # instance. Dot-separated names following a "namespace" pattern are recommended.
    rule_id="Internal.HTTP.Traffic.Insecure",
    # Pick a human friendly name for the rule. This will be used
    # to present the Rule in the Panther Console UI.
    name="Detect insecure internal HTTP traffic",
    # Specify one or more log types for your Rule. This setting is what
    # tells the Panther backend to run this Rule against the log data depicted above.
    log_types=["Internal.HTTP.Traffic"],
    # Specify an enabled state for the Rule. Detections can be uploaded in a
    # disabled state and will not begin processing log data until enabled.
    enabled=True,
    # Optionally, you can specify a list of tags to associate with the detection.
    tags=["internal"],
    # The "filters" list defines the sequence of matching logic that an event
    # will be tested against to determine whether there is a match. Returning "True" proceeds to the
    # next check. If the final check returns "True", there is a match.
    filters=[
        # If the HTTP transaction was secure, we're not interested in that log in this detection.
        # Therefore, the first filter uses the "filter_insecure" function we defined to filter to only
        # events that have useSSL set to "false".
        detection.PythonFilter(func=filter_insecure),
        # For now, we're only interested in getting alerted for insecure POST and PUT requests.
        # We therefore define the second filter step using "filter_http_methods". Unlike "filter_insecure",
        # we pass some parameters to this function.
        detection.PythonFilter(func=filter_http_methods, params={'methods': ["POST", "PUT"]}),
    ],
    # On this line, we're defining the severity that should be associated with any resulting alerts.
    # We have the option of defining a static severity (simply: severity="INFO") but, in this case, we want
    # to make the severity dynamic based on data from the event. To do this, we reference the "pick_severity"
    # function we defined at the top of the file. We also provide a fallback value in case the dynamic function
    # cannot be processed. This fallback will also be the Severity value displayed in the Panther Console UI.
    severity=detection.DynamicStringField(
        func=pick_severity,
        fallback=detection.SeverityInfo,
    ),
    # Below we specify the "reference" field. This optional field is used to attach a link to any Alerts
    # resulting from this detection. Similar to "severity" we can make this field dynamic based on
    # event data. There is also a fallback value specified.
    reference=detection.DynamicStringField(
        func=reference_generator,
        fallback="https://wiki.internal.megacorp.com/hosts_db",
    ),
    # Fields like "runbook" and "description" are also available to further customize how any
    # resulting Alerts will be displayed. These can have dynamic handlers defined, but in this
    # example static string values will work just fine.
    runbook="Optional runbook content",
    description="A helpful description",
    # Finally, we use the optional "alert_context" field to tell the Panther backend to use our
    # "make_context" function to generate custom data that will be attached to any resulting Alerts.
    alert_context=make_context,
)
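The restriction described in the comments inside make_context (imports must live in the function body) can be illustrated without the SDK. The sketch below only approximates what happens when a function's source is stored and later run somewhere else: it executes each source string in an empty namespace. The helper load_function and the two source strings are hypothetical and are not how the Panther backend is implemented.

```python
# Source of a function that keeps its import inside the body.
SELF_CONTAINED = '''
def make_context(event):
    import fnmatch
    return {"is_api_path": fnmatch.fnmatch(event["path"], "/api/*")}
'''

# Source of a function that relies on a module-level "import fnmatch",
# which is not part of the captured source.
NEEDS_MODULE_IMPORT = '''
def make_context(event):
    return {"is_api_path": fnmatch.fnmatch(event["path"], "/api/*")}
'''


def load_function(source: str, name: str):
    """Execute captured source in a fresh namespace and return the named function."""
    namespace: dict = {}
    exec(source, namespace)
    return namespace[name]


event = {"method": "POST", "useSSL": False, "path": "/api/some/endpoint"}

ok = load_function(SELF_CONTAINED, "make_context")
print(ok(event))  # {'is_api_path': True}

broken = load_function(NEEDS_MODULE_IMPORT, "make_context")
try:
    broken(event)
except NameError as err:
    # The name "fnmatch" was only ever defined in the original module.
    print(f"fails outside its original module: {err}")
```

The second function works in the file where it was written, because the module-level import is in scope there. It fails only once its source is separated from that module, which is why the function-local import is the safer habit.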
Rules support overrides, which are helpful when writing factory functions for your detections.
A factory function can provide defaults for a detection while letting callers customize it when the function is called. For example:
from typing import Optional
from panther_sdk import detection
# This is an example of a factory rule function that might live in panther_detections or your own library of detections.
# This function accepts an optional overrides object, which is passed through to the Rule instantiation. Any attributes
# set on the RuleOverrides object will override the attributes defined for the rule.
def factory_rule_func(overrides: Optional[detection.RuleOverrides] = None) -> detection.Rule:
    return detection.Rule(
        rule_id="default.rule.id",
        filters=[...],
        log_types=["AWS.ALB"],
        severity=detection.SeverityMedium,
        overrides=overrides,
    )
# Here is an example of using the rule factory function.
def make_detections() -> None:
    # We can pass no overrides at all, and it will be the default rule the function creates
    r = factory_rule_func()
    print(r)
    # We can override the id and severity to create a more customized rule using our function from above
    r = factory_rule_func(overrides=detection.RuleOverrides(
        rule_id="not.important.rule",
        severity=detection.SeverityInfo,
    ))
    print(r)
    r = factory_rule_func(overrides=detection.RuleOverrides(
        rule_id="very.important.rule",
        severity=detection.SeverityHigh,
    ))
    print(r)
# Now we call our main detection making function to execute it when running this python script
make_detections()
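The override behavior described above can be pictured with plain dataclasses. The sketch below is a simplified stand-in and not the SDK's implementation: SketchRule, SketchOverrides and apply_overrides are hypothetical names, the severity strings are placeholders, and the rule that an unset (None) override keeps the factory default is an assumption made for illustration.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class SketchRule:
    rule_id: str
    severity: str


@dataclass(frozen=True)
class SketchOverrides:
    # None means "keep the factory default" in this sketch.
    rule_id: Optional[str] = None
    severity: Optional[str] = None


def apply_overrides(rule: SketchRule, overrides: Optional[SketchOverrides]) -> SketchRule:
    """Return a copy of rule with every non-None override applied."""
    if overrides is None:
        return rule
    changes = {
        f.name: getattr(overrides, f.name)
        for f in fields(overrides)
        if getattr(overrides, f.name) is not None
    }
    return replace(rule, **changes)


default = SketchRule(rule_id="default.rule.id", severity="MEDIUM")
custom = apply_overrides(default, SketchOverrides(severity="HIGH"))
print(custom)
```

Only the severity changes in custom; the rule_id falls through from the default because the override left it unset.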
Similar to overrides, you can add extensions to your Panther content.
Extensions are passed the same way as overrides, but they apply to list-valued fields such as filters and add to the existing values.
from typing import Optional
from panther_sdk import detection
# This is an example of a factory rule function that might live in panther_detections or your own library of detections.
# This function accepts an optional extensions object, which is passed through to the Rule instantiation. Any attributes
# set on the RuleExtensions object will extend the corresponding list attributes defined for the rule.
def factory_rule_func(extensions: Optional[detection.RuleExtensions] = None) -> detection.Rule:
    return detection.Rule(
        rule_id="default.rule.id",
        filters=[...],
        log_types=["AWS.ALB"],
        severity=detection.SeverityMedium,
        extensions=extensions,
    )
# Here is an example of using the rule factory function.
def make_detections() -> None:
    # We can pass no extensions at all, and it will be the default rule the function creates
    r = factory_rule_func()
    print(r)
    # We can extend filters to add logic to the rule.
    # "my_very_important_filter" is assumed to be a filter function defined elsewhere in your code.
    r = factory_rule_func(extensions=detection.RuleExtensions(
        filters=[detection.PythonFilter(func=my_very_important_filter)],
    ))
    print(r)
# Now we call our main detection making function to execute it when running this python script
make_detections()
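The difference between extending and overriding can be sketched with plain dataclasses. This is a hypothetical illustration, not the SDK's implementation: SketchRule, SketchExtensions and apply_extensions are invented names, and appending the extension values after the factory defaults is an assumption about ordering.

```python
from dataclasses import dataclass, field, replace
from typing import List, Optional


@dataclass(frozen=True)
class SketchRule:
    rule_id: str
    tags: List[str] = field(default_factory=list)


@dataclass(frozen=True)
class SketchExtensions:
    # None means "nothing to add" in this sketch.
    tags: Optional[List[str]] = None


def apply_extensions(rule: SketchRule, extensions: Optional[SketchExtensions]) -> SketchRule:
    """Return a copy of rule whose list fields have the extension values appended."""
    if extensions is None or extensions.tags is None:
        return rule
    # A new list is built, so the original rule's list is left untouched.
    return replace(rule, tags=rule.tags + extensions.tags)


default = SketchRule(rule_id="default.rule.id", tags=["internal"])
extended = apply_extensions(default, SketchExtensions(tags=["pci"]))
print(extended.tags)  # ['internal', 'pci']
```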
Full Dataclass API
detection module
DynamicStringField
Make a field dynamic based on the detection input
Field | Description | Type
fallback | Fallback value in case the dynamic handler fails | str
func | Dynamic handler | Optional[Callable[[PantherEvent], str]]
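The relationship between func and fallback can be sketched as follows. This is a minimal illustration under the assumption that the fallback is used whenever the handler is missing or raises. The helper resolve_dynamic_string is hypothetical, and plain dicts stand in for PantherEvent.

```python
from typing import Any, Callable, Dict, Optional

Event = Dict[str, Any]


def resolve_dynamic_string(
    event: Event,
    func: Optional[Callable[[Event], str]],
    fallback: str,
) -> str:
    """Return func(event), or fallback when func is missing or raises."""
    if func is None:
        return fallback
    try:
        return func(event)
    except Exception:
        return fallback


def reference_generator(event: Event) -> str:
    return f"https://wiki.internal.megacorp.com/hosts_db?host={event['host']}"


FALLBACK = "https://wiki.internal.megacorp.com/hosts_db"

# The handler succeeds when the event has the field it needs.
print(resolve_dynamic_string({"host": "internal.megacorp.com"}, reference_generator, FALLBACK))
# The handler raises KeyError on an event without "host", so the fallback is used.
print(resolve_dynamic_string({}, reference_generator, FALLBACK))
```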
DynamicDestinations
Make destinations dynamic based on the detection input
Field | Description | Type
fallback | Fallback value in case the dynamic handler fails | Optional[List[str]]
func | Dynamic handler | Callable[[PantherEvent], List[str]]
AlertGrouping
Configuration for how an alert is grouped
Field | Description | Type
period_minutes | How long should matches be grouped into an alert after the first match | int
group_by | Function to generate a key for grouping matches | Optional[Callable[[PantherEvent], str]]
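One way to read the two fields together is sketched below. The exact grouping semantics of the Panther backend are not specified here, so this is an assumption: a match joins the open alert for its key if it arrives less than period_minutes after that alert's first match, and otherwise opens a new alert. The function group_matches and the (minute, event) input format are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple

Event = Dict[str, Any]


def group_matches(
    matches: List[Tuple[int, Event]],
    period_minutes: int,
    group_by: Callable[[Event], str],
) -> List[List[Event]]:
    """Group (minute, event) pairs into alerts by key and time window."""
    alerts: List[List[Event]] = []
    # Maps a group key to (minute of first match, events in the open alert).
    open_alerts: Dict[str, Tuple[int, List[Event]]] = {}
    for minute, event in sorted(matches, key=lambda pair: pair[0]):
        key = group_by(event)
        current = open_alerts.get(key)
        if current is not None and minute - current[0] < period_minutes:
            current[1].append(event)
        else:
            bucket = [event]
            alerts.append(bucket)
            open_alerts[key] = (minute, bucket)
    return alerts


matches = [
    (0, {"host": "a"}),
    (10, {"host": "a"}),
    (20, {"host": "b"}),
    (70, {"host": "a"}),
]
alerts = group_matches(matches, period_minutes=60, group_by=lambda e: e["host"])
print([len(alert) for alert in alerts])  # [2, 1, 1]
```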
PythonFilter
Create a filter by referencing a python function
Field | Description | Type
func | Provide a function whose python source will be used as the filter definition | Callable[[PantherEvent], bool]
UnitTestMock
Mock for a unit test
Field | Description | Type
name | name of the object to mock | str
return_value | string to assign as the return value for the mock | str
JSONUnitTest
Unit test with json content
Field | Description | Type
name | name of the unit test | str
expect_match | whether the data should match and trigger an alert | bool
data | json string | str
mocks | list of mocks to use in the test | Optional[List[UnitTestMock]]
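To show how data and expect_match relate to a detection's filters, here is a local sketch that does not use the SDK. It assumes a test passes when the outcome of the filter chain equals expect_match, and that an event matches only if every filter returns True. The helper run_json_test and the single-argument filter functions are hypothetical.

```python
import json
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


def filter_insecure(event: Event) -> bool:
    return event["useSSL"] is False


def filter_post_or_put(event: Event) -> bool:
    return event["method"] in ("POST", "PUT")


def run_json_test(
    filters: List[Callable[[Event], bool]],
    data: str,
    expect_match: bool,
) -> bool:
    """Return True when the filter chain's result equals expect_match."""
    event = json.loads(data)
    # all() stops at the first filter that returns False.
    matched = all(check(event) for check in filters)
    return matched == expect_match


filters = [filter_insecure, filter_post_or_put]
insecure_post = json.dumps(
    {"method": "POST", "useSSL": False, "path": "/api/some/endpoint", "host": "internal.megacorp.com"}
)
secure_post = json.dumps(
    {"method": "POST", "useSSL": True, "path": "/api/some/endpoint", "host": "internal.megacorp.com"}
)

print(run_json_test(filters, insecure_post, expect_match=True))   # True
print(run_json_test(filters, secure_post, expect_match=False))    # True
```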
Policy
Define a Policy-type detection to execute against resources of the given resource types in your Panther instance
Field | Description | Type
policy_id | ID for the Policy | str
ignore_patterns | Patterns of resource ids to ignore for the policy | Optional[Union[str, List[str]]]
destinations | Alert destinations for the policy | Optional[Union[str, List[str], DynamicDestinations]]
filters | Define event filters for the policy | Union[Union[PythonFilter], List[Union[PythonFilter]]]
enabled | Whether the policy is enabled or not | bool
resource_types | What resource types this policy will apply to | Union[str, List[str]]
severity | What severity alerts generated from this policy get assigned | Union[str, DynamicStringField]
name | What name to display in the UI and alerts. The PolicyID will be displayed if this field is not set. | Optional[str]
description | Description for the policy | Optional[Union[str, DynamicStringField]]
reference | The reason this policy exists, often a link to documentation | Optional[Union[str, DynamicStringField]]
reports | A mapping of framework or report names to values this policy covers for that framework | Optional[Dict[str, List[str]]]
runbook | The actions to be carried out if this policy fails, often a link to documentation | Optional[Union[str, DynamicStringField]]
tags | Tags used to categorize this policy | Optional[Union[str, List[str]]]
unit_tests | Unit tests for this policy | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]]
alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]]
alert_context | Optional JSON to attach to alerts generated by this policy | Optional[Callable[[PantherEvent], Dict[str, Any]]]
alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping]
Rule
Define a Rule-type detection to execute against log data in your Panther instance
Field | Description | Type
rule_id | ID for the rule | str
severity | Severity for the rule | Union[str, DynamicStringField]
threshold | Number of matches received before an alert is triggered | int
name | Display name for the rule | Optional[str]
log_types | Log Types to associate with this rule | Union[str, List[str]]
filters | Define event filters for the rule | Union[Union[PythonFilter], List[Union[PythonFilter]]]
enabled | Whether the rule is enabled or not | bool
unit_tests | Unit tests for the rule | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]]
tags | Tags for the rule | Optional[Union[str, List[str]]]
reference | Reference for the rule | Optional[Union[str, DynamicStringField]]
runbook | Runbook for the rule | Optional[Union[str, DynamicStringField]]
description | Description for the rule | Optional[Union[str, DynamicStringField]]
summary_attrs | Summary Attributes for the rule | Optional[List[str]]
reports | Report mappings for the rule | Optional[Dict[str, List[str]]]
destinations | Alert destinations for the rule | Optional[Union[str, List[str], DynamicDestinations]]
alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]]
alert_context | Optional JSON to attach to alerts generated by this rule | Optional[Callable[[PantherEvent], Dict[str, Any]]]
alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping]
ScheduledRule
Define a ScheduledRule-type detection to execute against query results in your Panther instance
Field | Description | Type
rule_id | ID for the scheduled rule | str
severity | What severity alerts generated from this scheduled rule get assigned | Union[str, DynamicStringField]
threshold | Number of matches received before an alert is triggered | int
name | Display name for the scheduled rule | Optional[str]
scheduled_queries | Scheduled queries to associate with this scheduled rule | Union[str, List[str]]
filters | Define event filters for the scheduled rule | Union[Union[PythonFilter], List[Union[PythonFilter]]]
enabled | Whether the scheduled rule is enabled or not | bool
unit_tests | Unit tests for the scheduled rule | Optional[Union[Union[JSONUnitTest], List[Union[JSONUnitTest]]]]
tags | Tags for the scheduled rule | Optional[Union[str, List[str]]]
reference | Reference for the scheduled rule | Optional[Union[str, DynamicStringField]]
runbook | Runbook for the scheduled rule | Optional[Union[str, DynamicStringField]]
description | Description for the scheduled rule | Optional[Union[str, DynamicStringField]]
summary_attrs | Summary Attributes for the scheduled rule | Optional[List[str]]
reports | Report mappings for the scheduled rule | Optional[Dict[str, List[str]]]
destinations | Alert destinations for the scheduled rule | Optional[Union[str, List[str], DynamicDestinations]]
alert_title | Title to use in the alert | Optional[Callable[[PantherEvent], str]]
alert_context | Optional JSON to attach to alerts generated by this scheduled rule | Optional[Callable[[PantherEvent], Dict[str, Any]]]
alert_grouping | Configuration for how an alert is grouped | Optional[AlertGrouping]
query module
CronSchedule
Cron expression based schedule definition for a query
Field | Description | Type
expression | Defines how often queries using this schedule run | str
timeout_minutes | Defines the timeout applied to queries with this schedule | int
IntervalSchedule
Interval based schedule definition for a query
Field | Description | Type
rate_minutes | Defines how often queries using this schedule run | int
timeout_minutes | Defines the timeout applied to queries with this schedule | int
Query
A saved or scheduled query
Field | Description | Type
name | Unique name for the query | str
description | Short description for the query | str
default_database | Default database for the query | str
sql | SQL statement | str
enabled | Whether the query is enabled or not | bool
tags | Tags for the query | Optional[Union[str, List[str]]]
schedule | Schedule attached to the query | Optional[Union[IntervalSchedule, CronSchedule]]
schema module
DataModelMapping
Field | Description | Type
name | Name of the data model field. This will be the name used when accessing the field from within detections. | str
path | Path to the target value in the Panther Event. This can be a simple field name or complete JSON path starting with a $. JSON path syntax must be compatible with the jsonpath-ng Python package. | Optional[str]
func | A Python function to access the target value. The input is the Panther Event and output is the target value in the Panther Event. | Optional[Callable[[PantherEvent], Any]]
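The two ways of locating a value (path or func) can be sketched as below. This is a deliberately simplified stand-in: it handles a plain field name and dotted child paths such as $.source.ip only, and it does not use jsonpath-ng, which supports a much richer syntax. The helper resolve_mapping is hypothetical, and giving func precedence over path is an assumption made for the sketch.

```python
from typing import Any, Callable, Dict, Optional

Event = Dict[str, Any]


def resolve_mapping(
    event: Event,
    path: Optional[str] = None,
    func: Optional[Callable[[Event], Any]] = None,
) -> Any:
    """Look up a value by function, by simple field name, or by a dotted $ path."""
    if func is not None:
        return func(event)
    if path is None:
        raise ValueError("either path or func is required")
    if not path.startswith("$"):
        # Simple field name at the top level of the event.
        return event.get(path)
    current: Any = event
    # Walk "$.a.b" one key at a time, skipping the empty leading segment.
    for part in [p for p in path[1:].split(".") if p]:
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


event = {"host": "internal.megacorp.com", "source": {"ip": "10.0.0.1"}}
print(resolve_mapping(event, path="host"))         # internal.megacorp.com
print(resolve_mapping(event, path="$.source.ip"))  # 10.0.0.1
print(resolve_mapping(event, func=lambda e: e["source"]["ip"]))  # 10.0.0.1
```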
DataModel
Data Models provide a way to configure a set of unified fields across all log types.
Field | Description | Type
data_model_id | The unique identifier of the data model. | str
name | What name to display in the UI and alerts. The data_model_id will be displayed if this field is not set. | Optional[str]
enabled | Whether this data model is enabled. | Optional[bool]
log_type | What log type this data model will apply to. | str
mappings | Mapping from source field name or method to unified data model field name. | Union[DataModelMapping, List[DataModelMapping]]