Packet Capture with Wireshark and Elasticsearch
Network packet capture and analysis are commonly done with tools like tcpdump, Snort, and Wireshark. These tools capture packets live from networks and store the captures in PCAP files for later analysis. A more flexible way to store packets is to index them in Elasticsearch, where you can search for packets based on any combination of packet fields.
The Wireshark command line application tshark works much like tcpdump, with the added capabilities to recognize a wide range of protocols and to output captured packets in JSON format. This output capability makes it a natural fit for Elasticsearch. I developed the Python application Espcap to take advantage of this capability: it siphons off JSON packets from tshark, then sends them to Elasticsearch to be indexed. This article discusses the design and usage of Espcap.
Packet Capture with Tshark
tshark supports numerous options that control how it captures and handles packets. The options we are interested in using for Espcap include:
-i <interface> – The network interface from which packets will be captured.
-T ek – Output packet contents in Elasticsearch compatible JSON format.
-c <count> – The number of packets to capture. If omitted, capture packets indefinitely.
Following one or more of these options on the command line, you can add a packet filter expression to capture specific packets. For example, to capture all TCP packets use the expression tcp, or to capture DNS packets use the expression udp port 53.
Here are some examples of how to use tshark to do various captures. All of the examples format output packets as Elasticsearch compatible JSON and assume macOS with the network interface en0. If running on Linux, you would use an interface such as eth0 instead.
- All outbound and inbound HTTPS packets:
  tshark -i en0 -T ek tcp port 443
- All outbound HTTPS packets:
  tshark -i en0 -T ek tcp dst port 443
- All outbound and inbound ICMP packets:
  tshark -i en0 -T ek icmp
- All inbound DNS packets:
  tshark -i en0 -T ek udp src port 53
Espcap Application Structure
Application Modules
Espcap is organized into these three modules representing the functional areas of the application:
tshark.py – Wrapper API for tshark functionality
indexer.py – Indexes packets in Elasticsearch or prints the packets
espcap.py – Main program that accepts program arguments and initiates packet capture
Tshark API Wrapper
The Tshark API wrapper class invokes tshark in a separate process, then pipes its output to the caller. The path to the tshark executable is set in the espcap.yml file. Jumping ahead a bit, the Espcap project is maintained in a GitHub repo that includes a config directory, which contains espcap.yml.
Class Initialization
The Tshark class includes the _config_paths list, which contains all the possible paths to espcap.yml, and the _command list, the first member of which is set to the tshark path in the __init__() method.
import signal
import subprocess
import sys
import os
import yaml
import json


class Tshark(object):
    def __init__(self):
        # Candidate locations for espcap.yml, searched in order.
        self._config_paths = ['espcap.yml', '../config/espcap.yml', '/etc/espcap/espcap.yml']
        # Base command; the tshark path from the config becomes its first member.
        self._command = list()
        for config_path in self._config_paths:
            if os.path.isfile(config_path):
                # The with statement closes the file, so no explicit close() is needed.
                with open(config_path, 'r') as ymlconfig:
                    config = yaml.load(ymlconfig, Loader=yaml.FullLoader)
                self._command.append(config['tshark_path'])
                return
        print("Could not find configuration file")
        sys.exit(1)
If espcap.yml cannot be found, the application exits. When in doubt, just put espcap.yml in the same location as the Espcap program files. More on that later in the Installation section.
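The lookup order can be tried in isolation. The following is a minimal sketch rather than Espcap's own code: find_config() is a hypothetical helper that returns the first candidate path that exists as a file, and the temporary directory and tshark path are placeholders standing in for a real installation.

```python
import os
import tempfile


def find_config(candidate_paths):
    """Return the first candidate that is an existing file, or None."""
    for path in candidate_paths:
        if os.path.isfile(path):
            return path
    return None


# Demonstration with a throwaway directory instead of a real install.
with tempfile.TemporaryDirectory() as workdir:
    missing = os.path.join(workdir, 'missing.yml')
    present = os.path.join(workdir, 'espcap.yml')
    with open(present, 'w') as handle:
        # Placeholder value; the real path depends on your system.
        handle.write('tshark_path: /usr/local/bin/tshark\n')

    found = find_config([missing, present])
    not_found = find_config([missing])
    found_name = os.path.basename(found)

print(found_name, not_found)
```

Returning None instead of exiting leaves the decision about what to do with a missing file to the caller, which can make the lookup easier to test.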
Command Construction
The commands supported by Espcap include live packet capture, capture from PCAP files, and listing all possible network interfaces. The following tshark command lines handle each of these use cases, respectively:
tshark -T ek -i <interface> [-c <count>] [packet filter]
tshark -T ek -r <PCAP file>
tshark -D
When count is 0, the -c <count> part is omitted from the command and tshark runs indefinitely. Also, if the bpf argument is absent, the [packet filter] part of the command is left out and tshark captures all packets.
tshark commands are constructed with the make_command() method:
def make_command(self, nic, count, bpf, pcap_file, interfaces):
    # Copy the base command so repeated calls do not accumulate arguments.
    command = list(self._command)
    if interfaces is True:
        command.append('-D')
        return command
    command.append('-T')
    command.append('ek')
    if nic is not None:
        command.append('-i')
        command.append(nic)
    if count != 0:
        command.append('-c')
        command.append(str(count))
    if bpf is not None:
        # Each word of the filter expression becomes its own argument.
        command.extend(bpf.split())
    if pcap_file is not None:
        command.append('-r')
        command.append(pcap_file)
    return command
If the tshark command line option -D is requested, it is appended to the command and make_command() returns the command list immediately. Otherwise, all other arguments are appended to the command, then it is returned. make_command() does not check whether the mutually exclusive nic and pcap_file arguments are both set, since that situation is prevented by the application earlier in the program flow.
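To see the three command forms side by side, here is a hedged, standalone sketch. build_tshark_command() is a hypothetical function written for this illustration, and the tshark path is an assumed placeholder. It builds a fresh list on every call, so calling it repeatedly (for example, once per PCAP file) never carries arguments over from a previous call.

```python
def build_tshark_command(tshark_path, nic=None, count=0, bpf=None,
                         pcap_file=None, interfaces=False):
    """Assemble a tshark argument list without mutating shared state."""
    command = [tshark_path]
    if interfaces:
        return command + ['-D']
    command += ['-T', 'ek']
    if nic is not None:
        command += ['-i', nic]
    if count != 0:
        command += ['-c', str(count)]
    if bpf is not None:
        # Split the filter expression into separate arguments.
        command += bpf.split()
    if pcap_file is not None:
        command += ['-r', pcap_file]
    return command


TSHARK = '/usr/bin/tshark'  # assumed path, adjust for your system

live = build_tshark_command(TSHARK, nic='en0', count=10, bpf='tcp port 443')
from_file = build_tshark_command(TSHARK, pcap_file='test_dns.pcap')
listing = build_tshark_command(TSHARK, interfaces=True)

print(live)
print(from_file)
print(listing)
```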
Packet Capture Generators
The capture() method is written as a Python generator, which captures one packet at a time then yields each JSON formatted packet back to the caller. The command argument contains the full tshark command which must be constructed by the caller with make_command().
def capture(self, command):
    global closing
    # Run tshark in a child process and read its stdout one line at a time.
    # bufsize=1 (line buffering) only applies in text mode, so it is omitted.
    with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
        for packet in proc.stdout:
            packet = self._drop_index_line(packet)
            if packet is None:
                continue
            else:
                yield json.loads(packet)
            if closing is True:
                print('Capture interrupted')
                sys.exit()
The tshark command is invoked in a separate process with a call to subprocess.Popen(), which specifies that the stdout of the process is piped back to the capture() method. Each packet is then received by iterating over proc.stdout.
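The same streaming pattern can be tried without tshark installed. This is a minimal sketch under one assumption: a child Python process that prints two JSON lines stands in for tshark. stream_json_lines() is a hypothetical name used only here.

```python
import json
import subprocess
import sys


def stream_json_lines(command):
    """Yield one parsed JSON object per line written by a child process."""
    with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
        for raw_line in proc.stdout:
            line = raw_line.decode().strip()
            if line:
                yield json.loads(line)


# Stand-in for tshark: a child Python process that prints two JSON lines.
child_code = (
    'import json\n'
    'for n in (1, 2):\n'
    '    print(json.dumps({"packet": n}))\n'
)
packets = list(stream_json_lines([sys.executable, '-c', child_code]))
print(packets)
```

Because the function is a generator, nothing is read from the child process until the caller asks for the next item, which is what lets a live capture run indefinitely without buffering every packet in memory.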
Output from tshark with the -T ek option contains two lines for each packet: one that represents an Elasticsearch index command and another containing the packet JSON. Here is an example of the packet structure with the packet fields omitted for the sake of brevity:
{"index":{"_index":"packets-2019-05-01","_type":"pcap_file"}}
{"timestamp":"1556728888362","layers":{ <packet fields> }}
The first line of each pair can be dropped because the application uses the Elasticsearch Python Client API to create the bulk index commands. capture() calls _drop_index_line() on every line to remove the index command lines. It returns None if an index command was dropped or the packet contents if not. Each packet JSON string is then converted to a Python dictionary with json.loads() and yielded to the caller.
def _drop_index_line(self, line):
    decoded_line = line.decode().rstrip('\n')
    # tshark -T ek precedes every packet with a bulk "index" action line.
    if decoded_line.startswith('{"index":'):
        return None
    else:
        return decoded_line
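The filtering step can be checked against the two sample lines shown earlier. The sketch below is an illustration only: packets_only() is a hypothetical function, and the packet's layers field is left empty because the real fields were omitted from the sample.

```python
import json

# Two lines as emitted by tshark -T ek, with the packet fields left empty.
SAMPLE_OUTPUT = [
    b'{"index":{"_index":"packets-2019-05-01","_type":"pcap_file"}}\n',
    b'{"timestamp":"1556728888362","layers":{}}\n',
]


def packets_only(lines):
    """Skip bulk index action lines and yield each packet as a dict."""
    for raw_line in lines:
        text = raw_line.decode().rstrip('\n')
        if text.startswith('{"index":'):
            continue
        yield json.loads(text)


parsed = list(packets_only(SAMPLE_OUTPUT))
print(parsed)
```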
List Network Interfaces
The list_interfaces() method simply runs the tshark -D command then prints each of the interfaces it gets back.
def list_interfaces(self, command):
    with subprocess.Popen(command, stdout=subprocess.PIPE) as proc:
        for interface in proc.stdout:
            print(interface.decode().rstrip('\n'))
Handling Interrupts
If the packet capture is interrupted by a SIGINT or SIGTERM signal, the _exit_gracefully() function is called to set the global closing flag to True. This flag is checked after the current packet has been processed. If the flag is True, the application exits.
closing = False


def _exit_gracefully(signum, frame):
    global closing
    closing = True
The set_interrupt_handler() method registers this function as the handler for both signals.
def set_interrupt_handler(self):
    signal.signal(signal.SIGTERM, _exit_gracefully)
    signal.signal(signal.SIGINT, _exit_gracefully)
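The flag-based shutdown pattern can be exercised on its own. In this hedged sketch, count_until_interrupted() is a hypothetical stand-in for the capture loop, and signal.raise_signal() (available in Python 3.8 and later) simulates a Ctrl-C so the example runs unattended.

```python
import signal

closing = False


def _exit_gracefully(signum, frame):
    """Record the interrupt; the main loop decides when to stop."""
    global closing
    closing = True


def count_until_interrupted(limit):
    """Process items until the flag is set; return how many were handled."""
    handled = 0
    for item in range(limit):
        handled += 1
        if item == 2:
            # Simulate Ctrl-C arriving while the third item is in flight.
            signal.raise_signal(signal.SIGINT)
        if closing:
            break
    return handled


previous = signal.signal(signal.SIGINT, _exit_gracefully)
try:
    handled = count_until_interrupted(10)
finally:
    signal.signal(signal.SIGINT, previous)  # restore the original handler

print(handled, closing)
```

Setting a flag in the handler, rather than exiting from inside it, lets the loop finish the item it is working on before stopping.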
Packet Indexing and Display
When indexing packets in Elasticsearch, a new index is created every day. The index naming format is packets-yyyy-mm-dd. Note that Elasticsearch 6 allows only one _type per index and Elasticsearch 7 deprecates mapping types altogether. Document IDs are automatically assigned by Elasticsearch.
Elasticsearch compatible JSON packet dictionaries are handled with two functions: index_packets() to index them in Elasticsearch and dump_packets() to print packets to stdout. index_packets() is another generator function. It builds action dictionaries and yields them to the Elasticsearch Python Client API helper function that bulk indexes packets in Elasticsearch. See the Main Application section for more details.
from datetime import datetime


# A plain function, matching how espcap.py imports and calls it.
def index_packets(capture):
    for packet in capture:
        # tshark reports the packet time in milliseconds since the epoch.
        timestamp = int(packet['timestamp'])/1000
        action = {
            '_op_type': 'index',
            '_index': 'packets-' + datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d'),
            '_source': packet
        }
        yield action
The time stamp for the index name is obtained from the timestamp field of the packet dictionary, which tshark reports in milliseconds. Action objects consist of these fields:
_op_type – Elasticsearch operation to perform, index in this case.
_index – Name of the index. The naming convention is packets-yyyy-mm-dd.
_type – The index type, espcap for both live and file capture. The function shown above does not set this field, which is consistent with Elasticsearch 7, where mapping types are deprecated.
_source – The JSON body of the packet.
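As a worked example, the sketch below derives the index name for the sample timestamp 1556728888362 used earlier. index_name() and make_actions() are hypothetical names for this illustration. One deliberate difference from index_packets(): the sketch uses UTC so the result does not depend on the machine's time zone, whereas index_packets() uses local time.

```python
from datetime import datetime, timezone


def index_name(packet):
    """Derive the daily index name from the packet's millisecond timestamp."""
    seconds = int(packet['timestamp']) / 1000
    # UTC keeps the result independent of the local time zone (an assumption
    # made for this sketch; index_packets() uses local time instead).
    day = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return 'packets-' + day.strftime('%Y-%m-%d')


def make_actions(packets):
    """Yield one bulk index action per packet."""
    for packet in packets:
        yield {'_op_type': 'index', '_index': index_name(packet), '_source': packet}


sample = {'timestamp': '1556728888362', 'layers': {}}
actions = list(make_actions([sample]))
print(actions[0]['_index'])  # packets-2019-05-01
```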
dump_packets() works in a similar fashion as index_packets() by obtaining each packet from a capture object, adding the time stamp to the display, and enumerating each packet as it is printed out.
# A plain function, matching how espcap.py imports and calls it.
def dump_packets(capture):
    packet_no = 1
    for packet in capture:
        timestamp = int(packet['timestamp'])/1000
        print('Packet no.', packet_no)
        print('* packet time stamp -', datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S'))
        print('* payload - ', packet)
        packet_no += 1
Main Application
Process Command Line Options
The main application sorts through the command line arguments and sets up the tshark command modes. The imports pull in all the required modules, including the tshark and indexer modules discussed earlier and the Elasticsearch Python Client API.
import syslog
import os
import sys
import click
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from tshark import Tshark
from indexer import index_packets, dump_packets
The click module is a convenient module that handles the numerous command line options.
@click.command()
@click.option('--node', default=None, help='Elasticsearch IP and port (default=None, dump packets to stdout)')
@click.option('--nic', default=None, help='Network interface for live capture (default=None, if file or dir specified)')
@click.option('--file', default=None, help='PCAP file for file capture (default=None, if nic specified)')
@click.option('--dir', default=None, help='PCAP directory for multiple file capture (default=None, if nic specified)')
@click.option('--bpf', default=None, help='Packet filter for live capture (default=all packets)')
@click.option('--chunk', default=1000, help='Number of packets to bulk index (default=1000)')
@click.option('--count', default=0, help='Number of packets to capture during live capture (default=0, capture indefinitely)')
@click.option('--list', is_flag=True, help='Lists the network interfaces')
Now for the main() function, which accepts all the command line arguments from click:
def main(node, nic, file, dir, bpf, chunk, count, list):
    try:
        tshark = Tshark()
        tshark.set_interrupt_handler()

        es = None
        if node is not None:
            es = Elasticsearch(node)

        if list:
            command = tshark.make_command(nic=None, count=0, bpf=None, pcap_file=None, interfaces=True)
            tshark.list_interfaces(command)
            sys.exit(0)

        if nic is None and file is None and dir is None:
            print('You must specify either file or live capture')
            sys.exit(1)

        if nic is not None and (file is not None or dir is not None):
            print('You cannot specify file and live capture at the same time')
            sys.exit(1)

        syslog.syslog("espcap started")

        if nic is not None:
            init_live_capture(es=es, tshark=tshark, nic=nic, bpf=bpf, chunk=chunk, count=count)
        elif file is not None:
            pcap_files = [file]
            init_file_capture(es=es, tshark=tshark, pcap_files=pcap_files, chunk=chunk)
        elif dir is not None:
            # Process the files in the directory in sorted order.
            pcap_files = [os.path.join(dir, name) for name in sorted(os.listdir(dir))]
            init_file_capture(es=es, tshark=tshark, pcap_files=pcap_files, chunk=chunk)

    except Exception as e:
        print('[ERROR] ', e)
        # syslog.syslog() expects a string message, not an exception object.
        syslog.syslog(syslog.LOG_ERR, str(e))
        sys.exit(1)
Setup: Create the Tshark object and set the interrupt handlers. If the node argument is set to an Elasticsearch IP and port, create an Elasticsearch client object.
Interface listing: If the list of network interfaces has been requested, create the tshark command with every argument to make_command() left unset except interfaces, which is set to True. Call list_interfaces() with the command, then exit the application when done.
Argument checks: Check the input arguments to make sure that a network interface, a capture file, or a directory of files has been specified. If all of these arguments are None, print an error indicating that one of them must be set, then exit the application. If a network interface is specified together with a file or directory, print an error and exit as well.
Capture: Initiate live capture, single file capture, or multiple file capture depending on which is specified. In the case of multiple file capture, build a list containing the names of all the PCAP files in the given directory, then pass it to init_file_capture(). When single file capture is specified, the list contains just the one file.
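The argument checks can be expressed as a small pure function, which makes the rules easy to test. This is a sketch only: select_capture_mode() is a hypothetical helper that raises ValueError where main() prints a message and exits. The parameter names mirror the command line options.

```python
def select_capture_mode(nic=None, file=None, dir=None):
    """Return 'live', 'file', or 'dir', or raise ValueError for bad input."""
    if nic is None and file is None and dir is None:
        raise ValueError('You must specify either file or live capture')
    if nic is not None and (file is not None or dir is not None):
        raise ValueError('You cannot specify file and live capture at the same time')
    if nic is not None:
        return 'live'
    # Same precedence as main(): a single file wins over a directory.
    return 'file' if file is not None else 'dir'


print(select_capture_mode(nic='en0'))
print(select_capture_mode(file='test_dns.pcap'))
print(select_capture_mode(dir='../test_pcaps'))
```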
The last bit of code just calls the main() function and reports when the application is done.
if __name__ == '__main__':
    main()
    print('Done')
Initiate Packet Capture
Packet capture is initiated with two functions, init_live_capture() and init_file_capture(). The first function builds the tshark command with Tshark.make_command() given a network interface, packet filter, and packet count, then creates a live capture session with a call to Tshark.capture(). To print out the packets, we just pass the capture object to dump_packets().
def init_live_capture(es, tshark, nic, bpf, chunk, count):
    try:
        command = tshark.make_command(nic=nic, count=count, bpf=bpf, pcap_file=None, interfaces=False)
        capture = tshark.capture(command)
        if es is None:
            dump_packets(capture)
        else:
            helpers.bulk(client=es, actions=index_packets(capture), chunk_size=chunk, raise_on_error=True)

    except Exception as e:
        print('[ERROR] ', e)
        # syslog.syslog() expects a string message, not an exception object.
        syslog.syslog(syslog.LOG_ERR, str(e))
        sys.exit(1)
The helpers.bulk() Elasticsearch Python client function does all the heavy lifting to bulk index the packets in Elasticsearch. It accepts a handle to the Elasticsearch cluster we want to use for indexing, the actions produced by the index_packets() generator, the number of packets (chunk) to bulk index to Elasticsearch at a time, and whether or not exceptions will be raised for failures during bulk indexing. As we saw earlier, index_packets() accepts a capture iterable.
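To make the role of the chunk size concrete, here is a hedged illustration of how a consumer can pull actions from a generator in fixed-size batches. This is not the implementation of helpers.bulk(); chunked() and fake_actions() are hypothetical names, and no Elasticsearch cluster is involved.

```python
from itertools import islice


def chunked(actions, chunk_size):
    """Yield lists of up to chunk_size items, pulling from actions lazily."""
    iterator = iter(actions)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


def fake_actions(total, log):
    """Stand-in for index_packets(); records when each action is produced."""
    for number in range(total):
        log.append(number)
        yield {'_op_type': 'index', '_source': {'packet': number}}


produced = []
batches = chunked(fake_actions(5, produced), chunk_size=2)
first = next(batches)
produced_after_first = list(produced)  # only the first batch has been pulled
sizes = [len(first)] + [len(batch) for batch in batches]
print(produced_after_first, sizes)
```

Only as many actions are generated as the current batch needs, so a smaller chunk size means packets reach the index sooner during a slow live capture, at the cost of more requests.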
init_file_capture() works pretty much the same way, except that it processes one or more PCAP files. It handles the files in the list passed to it one at a time, creating a separate capture object for each, which is then passed to either of the indexer functions.
def init_file_capture(es, tshark, pcap_files, chunk):
    try:
        print('Loading packet capture file(s)')
        for pcap_file in pcap_files:
            command = tshark.make_command(nic=None, count=0, bpf=None, pcap_file=pcap_file, interfaces=False)
            print(pcap_file)
            capture = tshark.capture(command)
            if es is None:
                dump_packets(capture)
            else:
                helpers.bulk(client=es, actions=index_packets(capture), chunk_size=chunk, raise_on_error=True)

    except Exception as e:
        print('[ERROR] ', e)
        # syslog.syslog() expects a string message, not an exception object.
        syslog.syslog(syslog.LOG_ERR, str(e))
        sys.exit(1)
Running Espcap
Installation
You can download Espcap from GitHub at https://github.com/vichargrave/espcap. Just clone the repo and cd into the espcap/src directory, then run pip install -r requirements.txt to install the elasticsearch and click modules. You may choose to do this in a virtual environment if you don’t plan on doing anything else with these modules.
Open the config/espcap.yml file, then set the tshark_path field to the location of your tshark instance. If you move the Espcap source files to a different location, you might want to just place espcap.yml in the same location as the application files. If you want to put the file in an entirely new location, add that path to the _config_paths list.
Test File Capture
Now you are ready to capture some packets. Let’s start with file capture mode and print the output to stdout. cd into the src directory, then run espcap.py like this:
./espcap.py --file=../test_pcaps/test_dns.pcap
or like this to read all the packet capture files:
./espcap.py --dir=../test_pcaps
To test packet capture with indexing, start up a local instance of Elasticsearch or use a remote instance if you have one. Although it’s not absolutely necessary, it’s a good idea to set up a packets index template to set the date formats to be consistent with the format used in Tshark. If you are using a local Elasticsearch instance, run the packet_template.sh script for Elasticsearch 7:
../scripts/packet_template.sh localhost:9200
If you are running Elasticsearch 6.x, use the packet_template-6.x.sh script instead. Next run espcap.py as follows:
./espcap.py --node=localhost:9200 --file=../test_pcaps/test_http.pcap
You can verify that the packets were indexed by running a query to get all packets in the packets-* indexes, or by running the packet_query.sh script:
../scripts/packet_query.sh localhost:9200
Test Live Capture
Last but certainly not least, you’ll want to try live capture. For example, if you want to capture 300 inbound and outbound TCP packets for a server using port 443 (HTTPS) from the en0 network interface, run espcap.py like this:
./espcap.py --node=localhost:9200 --nic=en0 --count=300 --bpf="tcp port 443"
To run this packet capture indefinitely, omit the --count argument or set it to 0, the default. The rules and formatting of the packet filter expression --bpf are determined by tshark, since this argument string is passed directly to the command.
Summing Up
Espcap illustrates how to use some interesting Python features, most notably generator functions, and the Elasticsearch Python client API. The API makes good use of iterators in the bulk indexing helper functions. Generators made it possible to delegate packet capture to a generator method for live and file capture that continually passes packets back to the packet indexing function, which is itself a generator that conveys the packets to the Elasticsearch client bulk indexing function. Each packet is pulled through this chain of generators on demand, which provides a steady stream of packets to Elasticsearch.
Another approach to capturing packets is discussed in the article Analyzing Network Packets with Wireshark, Elasticsearch, and Kibana. The system described in that article gets packets from PCAP files using Filebeat and Logstash, which feed them to Elasticsearch. That design is more scalable if packet captures are sent from a large number of agent systems. It would be worthwhile experimenting with the use of Logstash to handle the direct indexing of packets in Elasticsearch. Espcap could run on each agent, then forward the captures to Logstash running on the Elasticsearch cluster.