Splunk Search

Why am I getting inconsistent search results using export with the Splunk Python SDK?

aiguofer
Engager

I've written a search that creates a stats table with a medium-sized result: around 5 columns and 100k+ rows. When I run the search in Splunk Web, the results are consistent. If I create a search job, wait for it to finish, and then fetch all the results (paging through with the correct offsets), I get the same consistent results as in Splunk Web.

However, if I try to run the same search using export and stream the results, the results are inconsistent. Interestingly, I always seem to end up with more results than with a search job. When I dig deeper, I find that there are repeated entries, but even after I remove the duplicates I still end up with more results in total. Even weirder, I get different results each time I run it.
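To illustrate what I mean, here's a toy sketch (plain Python, no Splunk involved, all values made up): if the stream interleaves several partial snapshots of the same stats table taken at different times, you get exact duplicates *and* rows that survive dedup because the aggregated values changed between snapshots.

```python
# Toy illustration: each "snapshot" is a partial view of the same
# stats table taken at a different time, so the aggregated value
# for a given key can differ between snapshots.
snapshot_1 = [{"endpoint": "users", "hits": 10},
              {"endpoint": "orders", "hits": 4}]
snapshot_2 = [{"endpoint": "users", "hits": 10},   # unchanged -> exact dupe
              {"endpoint": "orders", "hits": 7}]   # updated   -> survives dedup

streamed = snapshot_1 + snapshot_2

# Deduplicate exact rows, preserving order.
seen, deduped = set(), []
for row in streamed:
    key = tuple(sorted(row.items()))
    if key not in seen:
        seen.add(key)
        deduped.append(row)

print(len(streamed))  # 4 rows streamed
print(len(deduped))   # 3 rows remain: "orders" appears twice, with different hits
```

That matches the symptom: duplicates in the raw stream, and still more rows than expected after removing them.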

Here's my working search:

import datetime
from math import ceil
from time import sleep

import pandas as pd
import splunklib.client as splunk
import splunklib.results as results
from IPython.display import clear_output, display

def wait_for_search_job(job):
    while True:
        # Block until the job handle is queryable
        while not job.is_ready():
            pass
        stats = {"isDone": job["isDone"],
                 "doneProgress": float(job["doneProgress"])*100,
                 "scanCount": int(job["scanCount"]),
                 "eventCount": int(job["eventCount"]),
                 "resultCount": int(job["resultCount"])}

        status = ("%(doneProgress)03.1f%%   %(scanCount)d scanned   "
                  "%(eventCount)d matched   %(resultCount)d results") % stats

        clear_output()
        display(status)
        if stats["isDone"] == "1":
            display("Done!")
            break
        sleep(5)
    return

def fetch_all_results(job):
    # Page through the finished job's results in 50k-row chunks
    result_count = int(job["resultCount"])
    num_results = 50000
    iterations = int(ceil(1. * result_count / num_results))

    rows = []

    for i in range(iterations):
        offset = i * num_results

        for result in results.ResultsReader(job.results(count=num_results, offset=offset)):
            if isinstance(result, dict):
                rows.append(result)
            elif isinstance(result, results.Message):
                # Diagnostic messages may be returned in the results
                print("Message: %s" % result)

    return rows

def get_splunk_hits_search():
    # Create a Service instance and log in 
    service = splunk.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD
    )

    kwargs_export = {
        "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
        "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
        "search_mode": "normal"
    }

    api_hits = r"""
     search <search_string>
     | rex field=req.url ".*branch_id=(?<branch_id>[a-zA-Z0-9._-]+)&?.*" 
     | rex field=req.originalUrl ".*api/(v1/)?(?<endpoint>\w+)/?.*" 
     | fillnull value=NULL endpoint branch_id 
     | bin _time span=7d as week 
     | eval week=strftime(week, "%Y-%m-%d") 
     | eval day=strftime(_time, "%Y-%m-%d") 
     | stats count dc(day) as days_visited by req.user.account week endpoint req.headers.user-agent branch_id 
     | rename req.headers.user-agent as user_agent, req.user.account as accountnumber, count as hits
     """

    job = service.jobs.create(api_hits, **kwargs_export)
    wait_for_search_job(job)
    res = fetch_all_results(job)  
    df = pd.DataFrame.from_dict(res)
    return job, df

Here's my kinda-working but unreliable export:

def get_splunk_hits_export():
    # Create a Service instance and log in 
    service = splunk.connect(
        host=HOST,
        port=PORT,
        username=USERNAME,
        password=PASSWORD)

    kwargs_export = {
        "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
        "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
        "search_mode": "normal"
    }

    api_hits = r"""
     search <search_string>
     | rex field=req.url ".*branch_id=(?<branch_id>[a-zA-Z0-9._-]+)&?.*" 
     | rex field=req.originalUrl ".*api/(v1/)?(?<endpoint>\w+)/?.*" 
     | fillnull value=NULL endpoint branch_id 
     | bin _time span=7d as week 
     | eval week=strftime(week, "%Y-%m-%d") 
     | eval day=strftime(_time, "%Y-%m-%d") 
     | stats count dc(day) as days_visited by req.user.account week endpoint req.headers.user-agent branch_id 
     | rename req.headers.user-agent as user_agent, req.user.account as accountnumber, count as hits
     """

    exportsearch_results = service.jobs.export(api_hits, **kwargs_export)

    rows = []

    for result in results.ResultsReader(exportsearch_results):
        if isinstance(result, dict):
            rows.append(result)
        elif isinstance(result, results.Message):
            # Diagnostic messages may be returned in the results
            print("Message: %s" % result)

    df = pd.DataFrame.from_dict(rows)
    return df
1 Solution

tthrockm
Explorer

You are likely getting dupes in the second case because previews are enabled; try guarding against them via assert rr.is_preview == False

ref: http://docs.splunk.com/Documentation/Splunk/6.2.5/Search/Exportsearchresults#Python_SDK
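For context (my reading of the docs, not authoritative): the export endpoint streams a series of <results> sets, where interim previews are marked preview='1' and the final set preview='0', and ResultsReader.is_preview just reflects that flag. A minimal stdlib sketch of filtering on it, using made-up sample XML in that shape:

```python
import xml.etree.ElementTree as ET

# Two hand-written <results> sets in the shape the export stream uses:
# interim previews carry preview='1', the final set carries preview='0'.
chunks = [
    "<results preview='1'><result><field k='hits'><value><text>3</text>"
    "</value></field></result></results>",
    "<results preview='0'><result><field k='hits'><value><text>7</text>"
    "</value></field></result></results>",
]

rows = []
for chunk in chunks:
    root = ET.fromstring(chunk)
    if root.get("preview") == "1":   # what ResultsReader exposes as is_preview
        continue                     # skip interim snapshots
    for result in root.iter("result"):
        row = {f.get("k"): f.findtext("value/text")
               for f in result.iter("field")}
        rows.append(row)

print(rows)  # only the final (preview='0') rows survive
```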

aiguofer
Engager

That did it! I set the following, and now I get consistent results that match my two-step search!

     kwargs_export = {
         "earliest_time": datetime.datetime(2015, 6, 29).isoformat(),
         "latest_time": datetime.datetime(2016, 4, 11).isoformat(),
         "search_mode": "normal",
         "preview": False
     }

rithvikmundra
Explorer

Using "preview": False in kwargs_export solved this issue for me. Thanks @aiguofer.
