Splunk Search

How to reassemble bidirectional flows with transaction

emf1123
New Member

I need to assemble transactions where, depending on the direction of the traffic, the "source" might actually be the "destination", or vice-versa.

Here's a particular example, with only the important fields shown:

"_time",action,src,"s_port",dst,service,xlatesrc,xlatesport,proto
"2014-05-27T08:47:32.000-0400",accept,"10.9.0.32",52643,"72.21.81.253",80,"192.0.2.1",51051,tcp
"2014-05-27T08:48:01.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:49:25.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:51:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:53:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp

you'll notice that "xlatesrc" in the first line becomes the "dst" on the subsequent drop events. The only real clue I have here is that the (xlatesrc,xlatesport) [or (src,s_port)] tuple equals the (dst,service) tuple, and vice-versa.

how do you reassemble these streams?

Tags (1)
0 Karma

emf1123
New Member

If it makes it more clear, the problem is that "transaction a,b" is an "AND" match, and I need an "OR" match.

0 Karma

jhupka
Path Finder

If your xlatesrc is always empty in your subsequent events, can you do something like:

... | eval tuple = if(isnull(xlatesrc), src.s_port.dst.service, dst.service.xlatesrc.xlatesport) | stats <whatever values you want> by tuple 

Essentially conditionally build your tuple of the proper IP+port+IP+port based on the xlatesrc being there or not for the fields that have your unique identifier.

0 Karma

emf1123
New Member

jhupka: I can't rely on sessions always being started with xlatesrc, and I absolutely need to match subsequent events that can happen in either direction.

I tried this, but it's still giving two "match"es:

| eval tuplesrc = if(isnull(xlatesrc), src.s_port, xlatesrc.xlatesport)
| eval tupledst = dst.service
| eval matchsrc = case(tuplesrc == xlatesrc.xlatesport, tuplesrc, tuplesrc == src.s_port, tuplesrc, tuplesrc == dst.service, tuplesrc)
| eval matchdst = case(tupledst == dst.service, tupledst, tupledst == src.s_port, tupledst)
| eval match = matchsrc.matchdst

0 Karma
Get Updates on the Splunk Community!

Archived Metrics Now Available for APAC and EMEA realms

We’re excited to announce the launch of Archived Metrics in Splunk Infrastructure Monitoring for our customers ...

Detecting Remote Code Executions With the Splunk Threat Research Team

WATCH NOWRemote code execution (RCE) vulnerabilities pose a significant risk to organizations. If exploited, ...

Enter the Dashboard Challenge and Watch the .conf24 Global Broadcast!

The Splunk Community Dashboard Challenge is still happening, and it's not too late to enter for the week of ...