Splunk Search

How to reassemble bidirectional flows with transaction

emf1123
New Member

I need to assemble transactions where, depending on the direction of the traffic, the "source" might actually be the "destination", or vice-versa.

Here's a particular example, with only the important fields shown:

"_time",action,src,"s_port",dst,service,xlatesrc,xlatesport,proto
"2014-05-27T08:47:32.000-0400",accept,"10.9.0.32",52643,"72.21.81.253",80,"192.0.2.1",51051,tcp
"2014-05-27T08:48:01.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:49:25.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:51:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:53:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp

you'll notice that "xlatesrc" in the first line becomes the "dst" on the subsequent drop events. The only real clue I have here is that the (xlatesrc,xlatesport) [or (src,s_port)] tuple equals the (dst,service) tuple, and vice-versa.

how do you reassemble these streams?

Tags (1)
0 Karma

emf1123
New Member

If it makes it more clear, the problem is that "transaction a,b" is an "AND" match, and I need an "OR" match.

0 Karma

jhupka
Path Finder

If your xlatesrc is always empty in your subsequent events, can you do something like:

... | eval tuple = if(isnull(xlatesrc), src.s_port.dst.service, dst.service.xlatesrc.xlatesport) | stats <whatever values you want> by tuple 

Essentially conditionally build your tuple of the proper IP+port+IP+port based on the xlatesrc being there or not for the fields that have your unique identifier.

0 Karma

emf1123
New Member

jhupka: I can't rely on sessions always being started with xlatesrc, and I absolutely need to match subsequent events that can happen in either direction.

I tried this, but it's still giving two "match"es:

| eval tuplesrc = if(isnull(xlatesrc), src.s_port, xlatesrc.xlatesport)
| eval tupledst = dst.service
| eval matchsrc = case(tuplesrc == xlatesrc.xlatesport, tuplesrc, tuplesrc == src.s_port, tuplesrc, tuplesrc == dst.service, tuplesrc)
| eval matchdst = case(tupledst == dst.service, tupledst, tupledst == src.s_port, tupledst)
| eval match = matchsrc.matchdst

0 Karma
Get Updates on the Splunk Community!

Announcing Scheduled Export GA for Dashboard Studio

We're excited to announce the general availability of Scheduled Export for Dashboard Studio. Starting in ...

Extending Observability Content to Splunk Cloud

Watch Now!   In this Extending Observability Content to Splunk Cloud Tech Talk, you'll see how to leverage ...

More Control Over Your Monitoring Costs with Archived Metrics GA in US-AWS!

What if there was a way you could keep all the metrics data you need while saving on storage costs?This is now ...