Splunk Search

How to reassemble bidirectional flows with transaction

emf1123
New Member

I need to assemble transactions where, depending on the direction of the traffic, the "source" might actually be the "destination", or vice-versa.

Here's a particular example, with only the important fields shown:

"_time",action,src,"s_port",dst,service,xlatesrc,xlatesport,proto
"2014-05-27T08:47:32.000-0400",accept,"10.9.0.32",52643,"72.21.81.253",80,"192.0.2.1",51051,tcp
"2014-05-27T08:48:01.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:49:25.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:51:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp
"2014-05-27T08:53:18.000-0400",drop,"72.21.81.253",80,"192.0.2.1",51051,,,tcp

you'll notice that "xlatesrc" in the first line becomes the "dst" on the subsequent drop events. The only real clue I have here is that the (xlatesrc,xlatesport) [or (src,s_port)] tuple equals the (dst,service) tuple, and vice-versa.

how do you reassemble these streams?

Tags (1)
0 Karma

emf1123
New Member

If it makes it more clear, the problem is that "transaction a,b" is an "AND" match, and I need an "OR" match.

0 Karma

jhupka
Path Finder

If your xlatesrc is always empty in your subsequent events, can you do something like:

... | eval tuple = if(isnull(xlatesrc), src.s_port.dst.service, dst.service.xlatesrc.xlatesport) | stats <whatever values you want> by tuple 

Essentially conditionally build your tuple of the proper IP+port+IP+port based on the xlatesrc being there or not for the fields that have your unique identifier.

0 Karma

emf1123
New Member

jhupka: I can't rely on sessions always being started with xlatesrc, and I absolutely need to match subsequent events that can happen in either direction.

I tried this, but it's still giving two "match"es:

| eval tuplesrc = if(isnull(xlatesrc), src.s_port, xlatesrc.xlatesport)
| eval tupledst = dst.service
| eval matchsrc = case(tuplesrc == xlatesrc.xlatesport, tuplesrc, tuplesrc == src.s_port, tuplesrc, tuplesrc == dst.service, tuplesrc)
| eval matchdst = case(tupledst == dst.service, tupledst, tupledst == src.s_port, tupledst)
| eval match = matchsrc.matchdst

0 Karma
Get Updates on the Splunk Community!

Introducing the Splunk Community Dashboard Challenge!

Welcome to Splunk Community Dashboard Challenge! This is your chance to showcase your skills in creating ...

Built-in Service Level Objectives Management to Bridge the Gap Between Service & ...

Wednesday, May 29, 2024  |  11AM PST / 2PM ESTRegister now and join us to learn more about how you can ...

Get Your Exclusive Splunk Certified Cybersecurity Defense Engineer Certification at ...

We’re excited to announce a new Splunk certification exam being released at .conf24! If you’re headed to Vegas ...