import time
import uuid
import json
from base64 import b64encode
from tensor import utils
[docs]class ElasticSearch(object):
"""Twisted ElasticSearch API
"""
def __init__(self, url='http://localhost:9200', user=None, password=None,
index='tensor-%Y.%m.%d'):
self.url = url.rstrip('/')
self.index = index
self.user = user
self.password = password
def _get_index(self):
return time.strftime(self.index)
def _request(self, path, data=None, method='GET'):
headers = {}
if self.user:
authorization = b64encode('%s:%s' % (self.user, self.password)).decode()
headers['Authorization'] = ['Basic ' + authorization]
return utils.HTTPRequest().getJson(
self.url + path, method, headers=headers, data=data.encode())
def _gen_id(self):
return b64encode(uuid.uuid4().bytes).decode().rstrip('=')
def stats(self):
return self._request('/_cluster/stats')
def node_stats(self):
return self._request('/_nodes/stats')
def insertIndex(self, type, data):
return self._request('/%s/%s/%s' % (
self._get_index(), type, self._gen_id()
), json.dumps(data), 'PUT')
def bulkIndex(self, data):
serdata = ""
for row in data:
if '_id' in row:
id = row['id']
del row['id']
else:
id = self._gen_id()
d = {
"index": {
"_index": self._get_index(),
"_type": row.get('type', 'event'),
"_id": id,
}
}
serdata += json.dumps(d) + '\n'
serdata += json.dumps(row) + '\n'
return self._request('/_bulk', serdata, 'PUT')