Writing Logstash Configs

General Design

Logstash in Sift Security

In any Sift installation, the logstash files that do the main field transformation of incoming data will be found in /etc/sift/logstash.d/. The files in this directory follow this model:

Convention Purpose Description
10-* Input Filters 10-kafka-s3.conf is used to pull in data from the kafka topic to start transformation, typically no editing required.
20-* Pre-processing Determination of incoming message types happens here, as well as other pre-processing logic.
30-* Data Parsing Data source-specific processing rules.
40-* Post-processing Checking case, removing unwanted fields, etc.
50-* Output Filters Output transformed data, typically no editing required.

The Logstash service will load the files in lexicographical order, so the naming convention ensures that configuration files are loaded in the right order. With all files in a single directory, customers can easily start ingesting additional data sources that are already supported.

Input Filters

We rely on the input filters to label the data source correctly. There must be logic that allows us to say that a particular location holds data of a certain type, or give us some other way to identify it (such as regular expressions). The input filters add a field called “type” to the data, and that type is used by our data-source specific filters to apply the correct filter. For example, the input from our S3 bucket for CloudTrail is given a type of “cloudtrail”. That way, the other files that apply to other data sources will not attempt to transform cloudtrail records. Therefore, all data-source specific filters must begin with an if statement that checks the value of the “type” field.

For example, if you want to send your Palo Alto networks data to Sift Security via TCP port 7438, then you would configure the following input plugin in /etc/sift/logstash.d/10-tcp.conf:

input {
  tcp {
    port => 12345
    type => "panw"
  }
}

Once created, test the validity of the logstash configs using sudo -u sift-user /opt/sift/logstash -t -f /etc/sift/logstash.d/. If the tests pass, restart logstash using sudo systemctl restart sift-logstash. Logstash runs as a systemd service and its logs are accessible using sudo journalctl -u sift-logstash.

For a complete list of supported input plugins, see the official Logstash Documentation. For a list of currently supported types, see

Structure of Data Parsing files

The data parsing files should follow the convention that 30-* files start the processing of the data source. However, if supporting the data source will take more than one file (most files should be 100 lines or less and should not exceed 400 lines), then it can be broken into 30-, 31-, 32-, etc. For example, with AWS CloudTrail, the 30- file deals with the general fields available in every type of CloudTrail event, and then those values are used to branch to various 31-* files, and then a 32-* does some cleanup specific to CloudTrail.

General Guidance on Logstash Filtering

Grok

Using the “grok” plugin is expensive. Don’t use this if you can use another filter plugin. However, using grok is invaluable when transforming data via regular expressions. There are a number of grok patterns out of the box, you can find out more about them here: https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns If a grok pattern fails, it will add a _grokparsefailure to the record (you can customize the tag to your choosing). Also, you can have a hierarchy of patterns to match against, and break when the first matches (this is useful to keep grok patterns simpler).

Mutate Blocks

In some cases, if you try to perform multiple transformations on a field in one mutate block, it may not work. If this is causing difficulty, try performing the transformations in 2 separate mutate blocks.

Convert (part of Mutate)

Make sure to check for the presence of the field prior to using this filter plugin. If it’s not there, logstash will error-out and ingestion will stop. For string types this looks like if “” in [field_name] , for non-strings if [fieldname]

Rename (part of Mutate)

This is a great way to just change field names, and you don’t need to check for field presence before doing it. If the field doesn’t exist, logstash will just skip that field and continue execution.

Replace (part of Mutate)

This should be used if you have 2 fields with the same value

Adding (part of Mutate)

If you add to a field that already exists, it changes the value to an array, and adds your value to the end of that. This may or may not be desired. If it is not desired to get an array like this, make sure to check if the field exists first (see above). Also, you can use the value in one field as part of the value in another using strings like “%{other_field}”. If the other field does not exist, it will put that string verbatim with the percent sign into your record, which is usually undesirable (so check if the other field exists first).

Output from Logstash Filters

In general, the author of the Logstash files should realize that the records coming out of logstash must contain field names that match what is expected in Sift’s graph mappings.

CloudHunter Logstash Configuration

AWS CloudTrail Data

General reference for the structure of a CloudTrail event is available here: http://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-event-reference-record-contents.html In the 30-aws-cloudtrail.conf file, we handle the fields from all cloudtrail events. The nested json format is flattened, and we use the “aws_event_source” and/or the “aws_event_name” field to designate what additional data filters will transform the event. As shown in the documentation from AWS, the eventSource field (which becomes aws_event_source) tells us the service that generated the event. The eventName (which becomes aws_event_name) tells us specifically what action was taken.

General CloudTrail Background

CloudTrail logs are saved in json format within compressed files to a S3 bucket. Each file could contain multiple events from all different services. Each file contains a “Records” array, which must be split into individual events, and then each event is transformed. As noted in the documentation, each event may have a “requestParameters” field and a “responseElements” field. These two fields that are handled very differently from one AWS service to another. The request parameters contains information about the request (like the request from a user on the console), while response elements contain information about the response from the AWS service. There can be a lot of fields nested in both, which contain valuable data about the entities and relationships we would like to graph.

CloudTrail Event Source Guidelines

Non-mutating events, such as List*, Get*, Describe* are typically not very interesting. These types of API calls should not be put into the graph, nor should they be used for anomaly detection. The only except to this so far is the “GetObject” event from S3, which indicates a file download. When creating a new logstash config file, the author should not spend much time to extract data from these events. The 30-aws-cloudtrail.conf will do most of the transformation needed. For any events triggered by a user, cloudtrail will indicate the IAM username and IP address linked to the event. These are always designated as the src_user and src_ip. If a service made the call, there’s a src_host instead and no src_user. When dealing with multiple event names, the author of the logstash config file should break them up into different if blocks, or at least put comments in the logstash file. Other people should be able to review the logstash config and have a good idea of what fields to expect from the events being transformed. This understanding is necessary for graph mappings.

Reconciling Request and Response Fields

Since we could have data duplicated between the request and response fields, it is helpful to consolidate this information for ingestion. Although being able to see the request and subsequent response contents can be useful, most users of Sift will only care about this if the two values are different. It’s easy enough within logstash to check if the fields are the same, and if so, just save the value to one field name. If the values are not the same, then we should save the requested value in one field, denoted with “_request” postfix, and the response is the regular field name. In some cases, there is no request field, so you can just rename the response field accordingly. The config should handle both conditions. See the example below

if "" in [aws_access_key_id] and "" in [aws_access_key_id_response] {
  mutate {
    replace => { "aws_access_key_id" => "%{aws_access_key_id_response}" }
  }
}
if ![aws_access_key_id] {
  mutate {
    rename => { "[aws_access_key_id_response]" => "aws_access_key_id" }
  }
}

Non-CloudHunter Logstash Configuration

How to add a new data source to the graph

The graph model that we use is based off of the Common Information Model defined by Splunk. This model has a number of “data models” which are categories such as “Network Traffic”, “Application State”, etc. For each one of these data models, we have a graph mapping. In order to get a new data source into the graph, you need to identify which data model(s) the records in your data belong in (where different records can have different data models they map to and a single record could belong to multiple data models). This decision is usually based on the inherent correlation between the data and data model as well as the graph edges that this data model writes. In order for your data to match into a data model, you need to add the relevant tag(s) that will get you there. Then, you need to make sure that your fields match the ones that our system expects (which are standardized by CIM)

Concise steps to add a data source

  1. Examine available data models, and decide which are relevant to your data models.
  2. Create a logstash file which does the following:
  • Uses fields in the events to add tags to match into a data source
  • Transforms fields in CIM-compliant versions
  • Adds fields that CIM and our model expects (e.g. sourcettype, pretty_sourcetype, eventtype)
  • If there are any special edges you want, make sure that the appropriate status and action-generating fields are set (e.g. in the web model, http status code is used to distinguish between successes and failures)
  1. Use that logstash to put data into the graph
  2. Examine the output to ensure entities are present and relationships make sense according to your data.

Finding sourcetypes and eventtypes

The sourcetype field contains two pieces of information about your record: where it came from and the data format that it came it (e.g. cisco:esa:textmail, cisco:esa:http). These are somewhat standardized (search the Splunk docs for your data source), but are user-defined in the end.

Since sourcetypes are usually not nice enough to display to the user, and since the user does not usually care about the format a log comes in as, we have defined an additional field named pretty_sourcetype which will be shown to the user (e.g. “Cisco ESA”, “Carbonblack”). If this is not provided, it is inferred from sourcetype.

In addition to sourcetype, CIM provides another level of detail about events called eventtype. This field usually contains strings which divide the data into natural categories (e.g. cisco_esa_email, cisco_esa_authentication).

Examples of how to set sourcetype and eventtype are found within Splunk docs and plugins such as this for windows. More sources and information are available within the splunkbase, which can be searched for your data source. Download the TA for your data source and look in the default folder, which has files that are easily readable describing eventtypes, sourcetypes and tags.

Setting action and status fields

Depending on which SIM category you are putting an event into, you will need to specify key fields in order to let the product know which edges to write to the graph. For example, a failed network traffic event will write out a failed (red) edge, whereas a success will not. For that category, this is specified by setting the action to “blocked”. If your data is already is Splunk CIM format, then there is usually nothing to do here.

If you are mapping a new source, you should go to the row that corresponds to the graph model you are using and use the suggested approach. While any of the approaches for a graph model will work, the suggestions are given in decreasing order of preference.

Graph Model If the field Has the value Will label as
All status, action fail (anywhere in value) failure
All eventtype fail (anywhere in value) failure
All eventtype modified or modify (anywhere in value) modified
All tags modified or modify (anywhere in any tag) modified
All tags fail (anywhere in any tag) failure
Application State status critical or warning failure
Authentication action fail (anywhere in value) failure
Change Management action acl_modified, cleared, created, deleted, modified, updated modified
Email, Malware action blocked, quarantined, deleted modified
Email, IDS, Malware action blocked, dropped, quarantined failure
Network Resolution (DNS) reply_code_id, reply_code 0, noerror success (otherwise failure)
Network Resolution (DNS) query_type update modified
Network Communicate, Network Session action blocked failure
Update status invalid failure
Web http_method PUT, POST, DELETE modified
Web status 4xx or 5xx (any 400 or 500 http status code) failure
Cloud (event name starts with) AssumeRole, SwitchRole, PassRole, ExitRole, ConsoleLogin, CheckMfa, Describe, Get, Head, List accessed (otherwise modified)

For Example

As an example, let us consider an example data format and walk through the process of converting this to get into the graph.

We will use Cisco IronPort emails as an example. A natural data model to use for the is the email data model. In order to match into that data model, we need to apply the email tag. This leads to a logstash configuration such as follows:

filter {
  if [type] == "cisco-ironport" {

    # add tags and metadata

      mutate {
          add_tag => ["email"]
          add_field => {
              "sourcetype" => "cisco:esa:textmail"
              "pretty_sourcetype" => "Cisco ESA"
              "eventtype" => "cisco_esa_email"
          }
      }

    # parse the original message to fields

      grok {
        break_on_match => "false"
        match => { "message" =>
            [ "%{SYSLOGTIMESTAMP:email_time}",
              "MID %{NUMBER:message_id}",
              "ICID %{NUMBER:email_icid}",
              "[Ss]ubject:? %{QUOTEDSTRING:message_subject}",
              "[Ff]rom:? <%{EMAILADDRESS:src_user}>",
              "[Tt]o:? <%{EMAILADDRESS:recipient}>"
            ]
        }
      }


        mutate {lowercase => ["src_user"]}
        mutate {lowercase => ["recipient"]}

    mutate {
      remove_field => [ "message" ]
    }
    date {
     match => [ "email_time", "MMM dd HH:mm:ss" ]
     target => "@timestamp"
    }
  }
}

An event that comes in looking like:

2015-08-18T14:35:26-05:00 esa textmaillog: Info: MID 320793005 ICID 850203967 From: <AUuG/qjrLSx+5bJvrumTSFg==_1103829160382==@in.constantcontact.com>

Will be transformed into:

{
  "action_type": "access",
  "status_type": "success",
  "email_icid": "850203967",
  "email_info": "From: <AUuG/qjrLSx+5bJvrumTSFg==_1103829160382==@in.constantcontact.com>",
  "eventtype": "cisco_esa_email",
  "ingestion_datasource": "cisco-ironport",
  "logstash_info": "{\"@timestamp\":\"2017-07-14T15:18:50.008Z\",\"@version\":\"1\",\"host\":\"alan2.dev.siftsec.com\",\"type\":\"cisco-ironport\",\"ingestion_datasource\":\"cisco-ironport\",\"tags\":[]}",
  "message_id": "320793005",
  "pretty_sourcetype": "Cisco ESA",
  "sourcetype": "cisco:esa:textmail",
  "src_user": "auug/qjrlsx+5bjvrumtsfg==_1103829160382==@in.constantcontact.com",
  "tags": [
    "email"
  ],
  "type": "cisco-ironport"
}