Below is the approach I used to dispatch (run) a saved search job by its name with custom earliest and latest time boundaries.
NO changes are required to the saved search code/configuration/etc.
In the example below, the time boundaries are given as epoch timestamps, so `dispatch.time_format` is set to "%s":
import time
import splunklib.client as client
import splunklib.results as results
def _run_job(job: client.Job, poll_interval: float = 10.0) -> list:
    """Block until *job* has finished, then return its result rows.

    Args:
        job: A dispatched Splunk search job.
        poll_interval: Seconds to wait between two completion checks.

    Returns:
        A list with every ``dict`` yielded by ``results.ResultsReader`` for
        the job. ``results.Message`` items are printed as diagnostics and
        are not included in the list.
    """
    # small delay to sync server and client
    time.sleep(2)

    # Poll for completion. The check happens *before* the sleep, so a job
    # that is already finished returns immediately instead of always paying
    # one full poll interval. Per the Splunk SDK, is_done() re-reads the job
    # state from the server on each call, so no separate refresh() is needed.
    while not job.is_done():
        time.sleep(poll_interval)

    output = []
    # NOTE(review): results() is called without arguments, so the server's
    # default result count applies -- confirm that large result sets are not
    # truncated (the REST endpoint accepts a ``count`` argument).
    for result in results.ResultsReader(job.results()):
        if isinstance(result, results.Message):
            # Diagnostic messages may be returned in the results
            print('Diagnostic message {0}: {1}'.format(result.type, result.message))
        elif isinstance(result, dict):
            # Normal events are returned as dicts
            output.append(result)
    return output
def get(name):
    """Connect to Splunk and return the saved search called *name*.

    A new connection is opened on every call, using the placeholder host
    and credentials below; replace them with real values before use.

    Args:
        name: Name of the saved search to look up.

    Returns:
        The entry of ``service.saved_searches`` found for *name* and the
        placeholder app namespace.
    """
    service = client.connect(
        host='your_host_ip',
        username='your username',
        password='your password',
    )
    # NOTE(review): the SDK documentation shows the second element of this
    # key as a ``client.namespace(...)`` record -- confirm what the
    # placeholder should be replaced with.
    lookup_key = (name, 'YOUR_APP_NAMESPACE')
    return service.saved_searches[lookup_key]
def run(name, **kwargs):
    """Dispatch the saved search *name* and return its collected results.

    Args:
        name: Name of the saved search to run.
        **kwargs: Dispatch arguments, forwarded unchanged to the saved
            search's ``dispatch`` call (for example the
            ``dispatch.earliest_time`` / ``dispatch.latest_time`` bounds).

    Returns:
        Whatever ``_run_job`` returns for the dispatched job.
    """
    # Look the search up and start it in one step.
    dispatched_job = get(name).dispatch(**kwargs)
    print('Dispatched Splunk Search Job <{0}> with params {1}'.format(name, kwargs))
    rows = _run_job(dispatched_job)
    return rows
def main():
    """Run the saved search 'YOUR_SEARCH_NAME' over a custom time window."""
    # NOTE(review): ``end_epoch`` and ``start_epoch`` are not defined in the
    # visible snippet; they must be supplied elsewhere before main() runs.
    kwargs = {
        # Upper and lower bounds of the search window.
        'dispatch.latest_time': end_epoch,
        'dispatch.earliest_time': start_epoch,
        # The bounds are epoch timestamps, hence the "%s" time format.
        'dispatch.time_format': '%s',
    }
    # Keys contain dots, so they can only be passed via ** expansion.
    result = run('YOUR_SEARCH_NAME', **kwargs)
... View more