Nov 14, 2015

Process NetFlow with nProbe and Elasticsearch, Logstash, and Kibana - Part 3

Part 1: http://blog.sysadmin.live/2015/11/process-netflow-with-nprobe-and.html
Part 2: http://blog.sysadmin.live/2015/11/process-netflow-with-nprobe-and_13.html

Customize nProbe and Logstash configuration

Overview

In the previous part, we created a basic visualization and a dashboard in Kibana for our NetFlow data; however, do we really need all NetFlow fields? NetFlow v9 has more than 50 field types, so it is better to export only the meaningful ones.

Since Logstash receives NetFlow fields from nProbe, we can configure it to process that data and add more fields or tags so we can analyze our network traffic better.

Configure nProbe to export only significant NetFlow fields

We can run nProbe with the -T option followed by a template to export only the fields we are interested in. The following template is a good start:
-T "%IPV4_SRC_ADDR %L4_SRC_PORT %IPV4_DST_ADDR %L4_DST_PORT %IN_PKTS %IN_BYTES %OUT_PKTS %OUT_BYTES %SRC_MASK %DST_MASK %IN_SRC_MAC %OUT_DST_MAC %L7_PROTO_NAME %PROTOCOL_MAP %PROTOCOL"
Note: %IN_SRC_MAC and %OUT_DST_MAC only contain data when nProbe runs against a mirrored port.
We can start nProbe with this template by running:
nprobe.exe /c -b 1 -V 9 --collector-port 2055 -i none -n none --json-label --tcp 127.0.0.1:5544 -T "%IPV4_SRC_ADDR %L4_SRC_PORT %IPV4_DST_ADDR %L4_DST_PORT %IN_PKTS %IN_BYTES %OUT_PKTS %OUT_BYTES %SRC_MASK %DST_MASK %IN_SRC_MAC %OUT_DST_MAC %L7_PROTO_NAME %PROTOCOL_MAP %PROTOCOL"
We should now see those fields in Kibana Discover.
New NetFlow fields
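
With --json-label, nProbe sends each flow to Logstash as one JSON object per line whose keys match the field names from the -T template. A record should look roughly like the following sketch (all values here are made up for illustration):

{"IPV4_SRC_ADDR":"192.168.1.10","L4_SRC_PORT":49152,"IPV4_DST_ADDR":"172.16.0.5","L4_DST_PORT":80,"IN_PKTS":12,"IN_BYTES":1480,"OUT_PKTS":10,"OUT_BYTES":5230,"SRC_MASK":24,"DST_MASK":24,"IN_SRC_MAC":"00:11:22:33:44:55","OUT_DST_MAC":"66:77:88:99:AA:BB","L7_PROTO_NAME":"HTTP","PROTOCOL_MAP":"tcp","PROTOCOL":6}

This is the raw string that ends up in Logstash's message field before the json filter (described below) parses it into separate fields.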



Customize Logstash configuration

In the beginning, our logstash.conf is very simple
input {
  tcp {
    port => 5544
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["localhost:9200"]
    index => "netflow-%{+YYYY.MM.dd}"
  }
}

The input block tells Logstash which protocol and port it should listen on for incoming traffic:
input {
  tcp {
    port => 5544
  }
}

The filter block is where Logstash manipulates the data it receives before sending it to Elasticsearch.
source => "message" extracts the JSON fields from the message field. Without this filter, all JSON fields stay inside a single message field, which prevents us from searching by field or creating graphs in Kibana.
filter {
  json {
    source => "message"
  }
}

The output block tells Logstash where it should send the processed data. In this configuration, we send data both to the console for debugging and to Elasticsearch via HTTP on port 9200.
output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    hosts => ["localhost:9200"]
    index => "netflow-%{+YYYY.MM.dd}"
  }
}

index => "netflow-%{+YYYY.MM.dd}" defines the format of the index name in Elasticsearch. Here we create daily indices such as netflow-2015.11.14.
To create monthly or weekly indices instead, we can change the format to
index => "netflow-%{+YYYY.MM}"
or
index => "netflow-%{+xxxx.ww}"
(Logstash's %{+...} syntax uses Joda-Time patterns; xxxx.ww is the ISO week year and week number.)
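
Once a few flows have been indexed, we can check which indices Elasticsearch actually created, for example with curl (or by opening the same URL in a browser), assuming Elasticsearch is listening on localhost:9200:

curl "http://localhost:9200/_cat/indices/netflow-*?v"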

Logstash must be restarted after any configuration change.
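
Before restarting, it is also worth validating the file; Logstash 1.5/2.x can check a configuration for syntax errors with the --configtest flag. A sketch, assuming the file is named logstash.conf and we run it from logstash\bin (on Windows this resolves to logstash.bat):

logstash -f logstash.conf --configtest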

Remove redundant fields

Let's take a look at Kibana Discover:
Kibana Discover 
We can easily see that fields like IN_BYTES and IPV4_SRC_ADDR have been extracted from the message field. Why would we want to store the same data twice? Instead, we can keep all fields except message. To do this, we add a mutate filter to the Logstash filter block:
filter {
  json {
    source => "message"
  }

  # Remove redundant fields
  mutate {
    remove_field => [ "message" ]
  }
}


We can remove multiple fields by separating them with commas, like [ "message", "@version" ]. This mutate block should be the last item in the filter block.
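
For example, keeping everything except the raw message and Logstash's @version field would look like this (drop @version only if you are sure you do not need it later):

  mutate {
    remove_field => [ "message", "@version" ]
  }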

Save the configuration, restart Logstash, and take a look at Kibana again:
No more "message" field

Create network tags based on IP

Applying network labels to the IPs in our network is a helpful way to identify "strangers", or to distinguish between internal and external traffic. Multiple tags can be added to the same event, since each flow has both a source IP and a destination IP.

To do this, we need to install the Logstash cidr plugin (https://www.elastic.co/guide/en/logstash/current/plugins-filters-cidr.html).

With Command Prompt at logstash\bin, run
plugin install logstash-filter-cidr
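To confirm the installation, we can list the installed plugins from the same directory and check that logstash-filter-cidr appears in the output (this assumes the plugin script bundled with Logstash 1.5/2.x):

plugin list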
Add the additional lines below to the filter block and restart Logstash:

filter {
  json {
    source => "message"
  }

  # Create network tags based on IP

  cidr {
    add_tag => [ "ip-src-PrivateIP" ]
    address => [ "%{IPV4_SRC_ADDR}" ]
    network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
  }   

  cidr {
    add_tag => [ "ip-dst-PrivateIP" ]
    address => [ "%{IPV4_DST_ADDR}" ]
    network => [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16" ]
  }

  cidr {
    add_tag => [ "ip-webserver" ]
    address => [ "%{IPV4_SRC_ADDR}", "%{IPV4_DST_ADDR}" ]
    network => [ "172.16.0.0/24" ]
  }

  cidr {
    add_tag => [ "ip-database" ]
    address => [ "%{IPV4_SRC_ADDR}", "%{IPV4_DST_ADDR}" ]
    network => [ "10.0.0.0/24" ]
  }

  cidr {
    add_tag => [ "ip-workstation" ]
    address => [ "%{IPV4_SRC_ADDR}", "%{IPV4_DST_ADDR}" ]
    network => [ "192.168.1.0/24" ]
  }

  # Remove redundant fields 
  mutate {
    remove_field => [ "message" ]
  }
}
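
As a small sketch of how these tags can be combined, a conditional placed after the cidr blocks (but before the final mutate) could mark flows where either endpoint lies outside the private ranges. The external-traffic tag name is just an example of ours, not something the cidr plugin provides:

  # Hypothetical example: tag flows whose source or destination is not a private IP
  if "ip-src-PrivateIP" not in [tags] or "ip-dst-PrivateIP" not in [tags] {
    mutate {
      add_tag => [ "external-traffic" ]
    }
  }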

Those tags can be used as filters to see only the traffic we need.
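
For example, typing the following Lucene query into the Kibana search bar should show only flows that leave the private networks (the exact behavior depends on how the tags field is analyzed in your index):

tags:"ip-src-PrivateIP" AND NOT tags:"ip-dst-PrivateIP"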

Summary

This is just a basic setup for Logstash; visit https://www.elastic.co/guide/en/logstash/current/index.html for more advanced configurations.

In the next part, I will show you how to create geographic information fields based on IP and create a tile map for your traffic in Kibana. We will use the Logstash geoip plugin and get our hands on our first Elasticsearch index template.

Part 4: Map User Location within ELK stack
