
Hello, I have a subset of results from a search. I know that if I have clientIP=x.x.x.x, the connection is proxied and I need to run another search against the outer proxy logs to retrieve the real IP address of the client; this can be achieved by doing a separate search on another sourcetype at the same exact timestamp.

What I need to do, if possible, is to rewrite the content of the clientIP value with one retrieved by a subsearch, passing along the time range and the username field. Something like this (but this one cannot work):

sourcetype=foo 
| eval clientIP=if(clientIP == "x.x.x.x", ([ sourcetype=proxy earliest=$earliest$ latest=$latest$ user=$username$ |fields realIP|rename realIP AS search]), clientIP)

The hard part (for me) is passing the field values to the subsearch.

I need to present the data as a single result, so I need to do all the transforms inside that one search.

Thanks in advance.

asked 19 Sep '11, 10:04 by johnnymc

edited 19 Sep '11, 10:30 by dwaddle ♦


3 Answers:

If I'm understanding what you're trying to do correctly, it sounds like you want to launch a sub-search for each and every event returned by your base search (sourcetype=foo). It also looks like you only want the sub-search to run conditionally, when you don't have a valid clientIP value. Does that sound correct?

First off, I don't think there is any way to conditionally launch a subsearch. Each subsearch runs only once: it is evaluated and expanded into the main search, and then the main search runs. Now, you can launch a search per-event using the map command, so you may need to do something like that; however, in that case the output from each individual "map" search becomes your final output, so you lose all the events from your base search. So you'll need to do all of this inside of a sub-search and then recombine the two sets of results using a join command, or something similar...

For the record, this is a complete guess. This would take lots of examples and probably a few hours of messing around to get something that actually works properly...

sourcetype=foo | join type=outer user [ search sourcetype=foo clientIP="x.x.x.x" | map search="search earliest=$earliest$ latest=$latest$ sourcetype=proxy user=$username$" | fields _time, clientIP, user ]

I'm not sure if you can get join to do the correct time correlation that you need. It's possible that you'll need some sort of funky transaction instead (since it's the only search command that lets you pull events together based on a range of time), but there are also a number of downsides to that approach.

Good luck ;-)

Alternate approaches: Don't forget that you can always programmatically call Splunk searches. It sounds like you have something complex enough that it may warrant that kind of effort, and that would give you FULL control. It's also possible that generating lookups periodically could be a much better way to handle this.

answered 19 Sep '11, 11:56 by Lowell ♦

Thank you both for your exhaustive and helpful answers.

After a lot of trial and error, I ended up piping search results to a custom lookup script.

The script does a search by itself, but only when needed; if the IP is NOT proxied it leaves the field as-is, spitting out the same value and doing nothing else (to preserve resources).

The script works by being passed the old SourceIP value (the proxied one; 1.2.3.4 in the script below), a timestamp (to be matched against the other log) and a discriminator (in my case, since this is a POP3 authentication, the customer username). The _time value needs to be copied into a regular field, so do an eval (see TS below).
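The core correlation idea — pick the proxy-log record whose timestamp falls within a small window around the authentication event and whose username matches — can be sketched in plain Python. This is only an illustration: the record list, field names and the ±15 second window are assumptions mirroring the script further below, where the equivalent records come from a dispatched Splunk search.

```python
# Sketch of the time-window + username correlation the lookup performs.
WINDOW = 15  # seconds either side of the event, as in the script below

def find_real_ip(proxy_events, username, ts, window=WINDOW):
    """Return the clientIP of the proxy event closest in time to ts
    for the given username, or None if nothing falls in the window."""
    candidates = [
        e for e in proxy_events
        if e["user"] == username and abs(e["_time"] - ts) <= window
    ]
    if not candidates:
        return None
    return min(candidates, key=lambda e: abs(e["_time"] - ts))["clientIP"]

# Hypothetical proxy-log records (stand-ins for sourcetype=proxy events)
proxy_events = [
    {"_time": 1000.0, "user": "alice", "clientIP": "203.0.113.7"},
    {"_time": 1300.0, "user": "alice", "clientIP": "198.51.100.9"},
    {"_time": 1002.0, "user": "bob",   "clientIP": "192.0.2.44"},
]
```

For example, `find_real_ip(proxy_events, "alice", 1005.0)` picks the 1000.0 event (5 seconds away) and ignores the 1300.0 one, which falls outside the window.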

Please note that you need to pass an empty field to be populated by the output. You can 'construct' this field in the search by using an eval.

yoursearch | eval TS=_time | lookup RealPopIP TS Account SourceIP OUTPUT RealIP

This took me a lot of time to understand; without it, the lookup doesn't work.

Transform stanza needed:

[RealPopIP]
external_cmd = RealIP.py TS Account SourceIP RealIP
fields_list = TS,Account,SourceIP,RealIP

For the sake of completeness: the script works by being passed (via STDIN, by the Splunk server) a partially filled CSV file, and spitting out a filled CSV file on STDOUT. You also need to pass the field names as arguments (see the transform above). The script is not polished enough to take as-is, but maybe it can be useful to others :)

#!/usr/bin/env python
#
# script to change specific field values
# eg. if IP is proxied
# leave the value as is if doesn't match
#
# 21.09.2011
#
import csv, sys, time
import splunk.auth, splunk.search

USER_NAME = 'admin'
PASSWORD = 'password'

def main():
    r = csv.reader(sys.stdin)

    if len(sys.argv) != 5:
        for l in r:
            csv.writer(sys.stdout).writerow(l)
        sys.exit(0)

    # read cmd line arguments
    timestampf = sys.argv[1]
    userf = sys.argv[2]
    sourceipf = sys.argv[3]
    realipf = sys.argv[4]

    w = None
    header = []
    first = True

    # authenticate to splunk server
    key = splunk.auth.getSessionKey(USER_NAME, PASSWORD)

    for line in r:
        if first:
            header = line
            csv.writer(sys.stdout).writerow(header)
            w = csv.DictWriter(sys.stdout, header)
            first = False
            continue

        # Read the result
        result = {}
        i = 0
        while i < len(header):
            if i < len(line):
                result[header[i]] = line[i]
            else:
                result[header[i]] = ''
            i += 1

        if len(result[timestampf]) and len(result[userf]) and len(result[sourceipf]):
            if result[sourceipf] == "1.2.3.4":
                # proxied IP - need to rewrite to real customer IP address
                searchstring = 'search sourcetype="realipsourcetype" clientIP=* user=%s | head 1' % (result[userf])

                # Perform the lookup (splunk search)
                ts = int(result[timestampf])
                job = splunk.search.dispatch(searchstring, earliest_time=ts-15, latest_time=ts+15, required_field_list='clientIP')

                # wait for results
                while not job.isDone:
                    time.sleep(.25)

                result[realipf] = str(job.results[0].fields['clientIP'])
                w.writerow(result)

            else:
                # ip is not proxied, leave as is
                result[realipf] = result[sourceipf]
                w.writerow(result)

        else:
            # format is not correct - spit out the row as it came
            w.writerow(result)

main()
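For reference, the STDIN/STDOUT CSV contract the script implements can be reduced to a minimal, self-contained skeleton. This is a hedged sketch, not the author's script: the field names mirror the transform above, and `fill_row` below just copies a value where the real script dispatches a Splunk search.

```python
import csv
import io

def run_lookup(infile, outfile, fill_row):
    """External-lookup protocol: read a CSV with a header row from infile,
    let fill_row() populate the output field, write the CSV to outfile."""
    reader = csv.DictReader(infile)
    writer = csv.DictWriter(outfile, fieldnames=reader.fieldnames)
    writer.writeheader()
    for row in reader:
        writer.writerow(fill_row(row))

def fill_row(row):
    # Stand-in for the real lookup: copy SourceIP into RealIP when the
    # IP is not the proxy address; the real script searches the proxy
    # log here instead of writing a placeholder.
    if row["SourceIP"] == "1.2.3.4":
        row["RealIP"] = "UNKNOWN"
    else:
        row["RealIP"] = row["SourceIP"]
    return row
```

In a real external lookup this would be driven as `run_lookup(sys.stdin, sys.stdout, fill_row)`; the `io.StringIO` import is only there so the skeleton can be exercised without Splunk.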

Hope it will be useful to others.

answered 27 Sep '11, 02:38 by johnnymc

edited 27 Sep '11, 05:37

Kudos on getting back with such a detailed explanation of how you managed to solve the problem. Nice!

(27 Sep '11, 03:28) Ayn ♦
Just FYI, if you use a custom search command instead of an external "lookup", you can pass in authentication and avoid hard-coding your login credentials, which raises security concerns. (I created an app that provides an example of that: http://splunk-base.splunk.com/apps/22370/runsavedsearch-alert-action)

(27 Sep '11, 12:37) Lowell ♦

Thanks a lot, I'll try to use it this way.

(28 Sep '11, 00:10) johnnymc

Oh, you should also be aware of a bug in early 4.1.x releases: http://splunk-base.splunk.com/answers/3238/can-a-custom-search-command-launch-a-splunk-search

(28 Sep '11, 06:54) Lowell ♦

In addition to Lowell's answer, which covers most of it: from what I've gathered, what you want to achieve cannot be easily or efficiently done by issuing searches on indexed data. I've been wishing for a similar feature myself, i.e. something that can perform "lookups" on indexed data as easily as you can perform lookups using .csv files and external scripts.

With manageable volumes of data, Splunk can perform time-based lookups that automatically grab values that were valid for an event's specific time period. Typical use-case is to feed Splunk with a DHCP log with time, IP and MAC address, and then use it as a time-based lookup in order to automatically tie an IP to a MAC address for each event in your searches (as long as the IP address exists in the DHCP log of course). I'm guessing your proxy logs are pretty big so feeding them to Splunk as a lookup csv is far from an ideal solution. Nevertheless, should you want to try it there's information on time-based lookups available in the docs: http://docs.splunk.com/Documentation/Splunk/latest/Knowledge/Addfieldsfromexternaldatasources#Example_of_time-based_fields_lookup
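As a rough sketch of what such a time-based lookup configuration looks like (the stanza name and CSV filename here are made up; `time_field`, `time_format` and `max_offset_secs` are the transforms.conf settings for temporal lookups), it would be something along these lines:

```ini
# transforms.conf -- hypothetical time-based lookup against a DHCP export
[dhcp_lookup]
filename = dhcp_leases.csv          # columns: lease_time, ip, mac
time_field = lease_time             # which column holds the timestamp
time_format = %s                    # epoch seconds
max_offset_secs = 86400             # only match leases up to a day old
```

With that in place, `| lookup dhcp_lookup ip OUTPUT mac` would pick the MAC that was valid around each event's own time rather than just the latest one.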

answered 19 Sep '11, 14:30 by Ayn ♦
