Getting started with ES
A quick introduction to Elasticsearch.
Resources
- https://www.elastic.co/guide/cn/elasticsearch/guide/current/preface.html
- https://learnku.com/docs/elasticsearch73/7.3
- es-python documentation
- python-elasticsearch basic usage
- https://www.cnblogs.com/Wu13241454771/p/13576288.html
- ES text analysis (tokenization)
Kibana
Creating an index
# Ingest pipeline
# NOTE: the node's ingest role must be enabled
PUT _ingest/pipeline/my_timestamp_pipeline
{
  "description": "Adds a field to a document with the time of ingestion",
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
# Create the index, with the pipeline as its default
PUT pa_log_clean_with_time
{
  "settings": {
    "default_pipeline": "my_timestamp_pipeline"
  }
}
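To check that the pipeline actually fires, index a document and read it back; a minimal sketch with the Python client (the localhost connection parameters are an assumption, not from these notes):
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=['localhost'], port=9200)
res = es.index(index='pa_log_clean_with_time', body={'msg': 'hello'})
doc = es.get(index='pa_log_clean_with_time', id=res['_id'])
# the default_pipeline should have stamped the document at ingest time
print(doc['_source'].get('ingest_timestamp'))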
# Raise the cluster-wide max shards per node
PUT /_cluster/settings
{
  "transient": {
    "cluster": {
      "max_shards_per_node": 5000
    }
  }
}
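The same setting can be applied from the Python client; a sketch (the localhost connection is an assumption). Keep in mind that transient settings are lost on a full cluster restart; use "persistent" to keep them:
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=['localhost'], port=9200)
es.cluster.put_settings(body={
    'transient': {'cluster.max_shards_per_node': 5000}
})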
CRUD
# Index a document
POST /test/_doc
{
  "msg": "Eating an apple a day keeps the doctor away"
}
# Search
POST /test/_search
{
  "query": {
    "match": {
      "msg": "eat"
    }
  }
}
# Inspect how a field's text gets analyzed
POST /test/_analyze
{
  "field": "msg",
  "text": "我爱北京天安门"
}
# Delete by document id
DELETE /test/_doc/xJ636HsBVVbMBtr2nSGm
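The same four operations through the Python client, as a sketch (the connection details are assumptions; the document id comes back from the index call instead of being hard-coded):
from elasticsearch import Elasticsearch

es = Elasticsearch(hosts=['localhost'], port=9200)
res = es.index(index='test', body={'msg': 'Eating an apple a day keeps the doctor away'})
print(es.search(index='test', body={'query': {'match': {'msg': 'eat'}}}))
print(es.indices.analyze(index='test', body={'field': 'msg', 'text': '我爱北京天安门'}))
es.delete(index='test', id=res['_id'])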
KQL
https://www.cnblogs.com/-beyond/p/14159002.html
Query syntax
https://blog.csdn.net/mon_star/article/details/102934620
Query clauses can be nested to multiple levels (see the sketch after these links).
https://www.cnblogs.com/xiohao/p/12970224.html
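As a quick illustration of that nesting, here is a bool query embedded inside a should clause, written as a Python body dict (the field names are made up for illustration):
body = {
    'query': {
        'bool': {
            'should': [
                {'term': {'status': 'error'}},
                {'bool': {'must': [
                    {'term': {'status': 'warn'}},
                    {'range': {'count': {'gte': 10}}},
                ]}},
            ]
        }
    }
}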
Python client
CRUD
# coding=utf-8
import elasticsearch
from elasticsearch.helpers import bulk

def init_es_client():
    # elasticsearch-py 7.x style: a list of hosts plus a shared port
    es = elasticsearch.Elasticsearch(
        hosts=["10.131.224.14", "10.131.224.15", "10.131.224.19"], port=54926
    )
    return es

es = init_es_client()
print(es.info())
# Search
def search(es_client, start, end):
    result = es_client.search(index="[YOUR-INDEX]", body={
        "query": {
            "bool": {
                "must": [
                    {"range": {"[YOUR-DATE]": {"gte": start, "lte": end}}},
                    {
                        "bool": {
                            "should": [
                                {"match_phrase": {"[YOUR-CONTENT]": "[KEYWORD-1]"}},
                                {"match_phrase": {"[YOUR-CONTENT]": "[KEYWORD-2]"}}
                            ]
                        }
                    }
                ]
            }
        }
    })
    return result
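# A possible invocation (assumes the placeholders above have been filled in
# and the date format matches the [YOUR-DATE] field mapping):
# result = search(es, '2021-09-14 00:00', '2021-09-14 23:59')
# for hit in result['hits']['hits']:
#     print(hit['_id'], hit['_source'])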
# Insert a single document
data = {"recv_addr": '测试导入地址', 'dtime': '2021-09-14 15:40'}
# es.index(index='gpdata_addr', body=data)

# Bulk import: build a fresh dict per document (appending the same dict
# object repeatedly would leave every entry holding the last value)
arr = []
for i in range(1, 1000):
    arr.append({"recv_addr": '测试导入地址%d' % i, 'dtime': '2021-09-14 15:40'})
res, _ = bulk(es, arr, index='gpdata_addr', raise_on_error=True)
print(res)
# Delete: look up the ids first, then delete them one by one
while True:
    result = es.search(index="gpdata_addr")
    hits = result['hits']['hits']
    if not hits:
        break
    for item in hits:
        print(item['_id'])
        # delete
        try:
            es.delete(index="gpdata_addr", id=item['_id'])
        except elasticsearch.NotFoundError:
            print('%s not found' % item['_id'])
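If the goal is simply to empty the index, delete_by_query does it in one call; a sketch (not what the original loop used):
es.delete_by_query(index='gpdata_addr', body={'query': {'match_all': {}}})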
Bulk import 1
# coding=utf-8
import elasticsearch
from elasticsearch.helpers import bulk
import csv

def init_es_client():
    es = elasticsearch.Elasticsearch(
        hosts=["10.131.224.14", "10.131.224.15", "10.131.224.19"], port=54926
    )
    return es

es = init_es_client()

# Bulk import from CSV, flushing every 10000 rows
with open('gpdata_addr.csv', 'r') as f:
    reader = csv.reader(f)
    num = 0
    batch = 0
    arr = []
    for row in reader:
        num = num + 1
        if num % 10000 == 0:
            batch = batch + 1
            print('batch: %s' % batch)
            res, _ = bulk(es, arr, index='gpdata_addr', raise_on_error=True)
            arr = []
            print(res)
        data = {"recv_addr": str(row[0]), 'dtime': str(row[1])}
        arr.append(data)
    # flush the remainder
    if len(arr) > 0:
        res, _ = bulk(es, arr, index='gpdata_addr', raise_on_error=True)
        print(res)
Bulk import 2
Because I wasn't sure how bulk was meant to be used, I wrote my own multi-process script to import data quickly in batches. The idea is very simple: the main process hands out work and the worker processes consume it. The inter-process synchronization is written naively and has pitfalls: the main process writes chunk files and keeps polling for a free slot, while each worker deletes its file once it has processed it and then sleeps 3 seconds so the main process has enough time to write the next file.
For the actual bulk usage, see:
https://elasticsearch-py.readthedocs.io/en/v7.14.1/helpers.html#example
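For the record, the helpers module already ships a threaded variant, parallel_bulk, which would avoid the hand-rolled file juggling below; a minimal sketch (the thread count and chunk size are assumptions, not tuned values):
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk
import csv

es = Elasticsearch(hosts=['10.1.1.1', '10.1.1.2', '10.1.1.3'], port=54926)

def gen_actions(path):
    with open(path, 'r') as f:
        for row in csv.reader(f):
            yield {'recv_addr': str(row[0]), 'dtime': str(row[1])}

# parallel_bulk fans chunks out to a thread pool and yields (ok, info) per action
for ok, info in parallel_bulk(es, gen_actions('gpdata_addr.csv'),
                              index='gpdata_addr', thread_count=8, chunk_size=10000):
    if not ok:
        print(info)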
- Main process: split the input file
# coding=utf-8
import os
import time

def get_file():
    # poll for a free slot among the 8 worker chunk files
    while True:
        for i in range(8):
            fn = 'gpdata_addr_%d.csv' % i
            if not os.path.exists(fn):
                return fn
        time.sleep(1)
        print('waiting for a free file slot')

with open('gpdata_addr.csv', 'r') as fi:
    is_end = False  # reached end of input
    while True:
        new_file = get_file()
        fo = open(new_file, 'w')
        print('create new file %s' % new_file)
        output = []
        # 100000 lines per chunk
        for i in range(1, 100000 + 1):
            line = fi.readline()
            if line:
                output.append(line)
            else:
                is_end = True
                break
        fo.write(''.join(output))
        fo.close()
        output = []
        if is_end:
            break
- Worker process: consume its file, then delete it
# coding=utf-8
import elasticsearch
from elasticsearch.helpers import bulk
import csv
import sys
import os
import time

if len(sys.argv) < 2:
    print('missing argument: id')
    sys.exit(1)
pid = sys.argv[1]
csv_file = 'gpdata_addr_%s.csv' % pid

def init_es_client():
    es = elasticsearch.Elasticsearch(
        hosts=["10.1.1.1", "10.1.1.2", "10.1.1.3"], port=54926
    )
    return es

es = init_es_client()

# Bulk import: keep polling for our chunk file, consume it, then delete it
batch = 0
while True:
    if not os.path.exists(csv_file):
        print('file [%s] not found, sleep' % csv_file)
        time.sleep(3)
        continue
    with open(csv_file, 'r') as f:
        reader = csv.reader(f)
        num = 0
        # batch deliberately not reset here, so it counts across files
        arr = []
        for row in reader:
            num = num + 1
            if num % 10000 == 0:
                batch = batch + 1
                print('batch: %s' % batch)
                res, _ = bulk(es, arr, index='gpdata_addr', raise_on_error=False, raise_on_exception=False)
                arr = []
                print(res)
            data = {"recv_addr": str(row[0]), 'dtime': str(row[1])}
            arr.append(data)
        # flush the remainder
        if len(arr) > 0:
            res, _ = bulk(es, arr, index='gpdata_addr', raise_on_error=False, raise_on_exception=False)
            print(res)
    print('remove %s' % csv_file)
    os.remove(csv_file)
    time.sleep(3)
The core call is:
res, _ = bulk(es, arr, index='gpdata_addr', raise_on_error=False, raise_on_exception=False)
raise_on_error
Whether document-level errors raise an exception; connection errors, however, cannot be suppressed this way.
raise_on_exception
Whether an exception aborts the run. If you want a long-running import that does not die partway on an exception, this needs to be set to False. (Exactly what handling each kind of exception gets remains to be confirmed...)
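To inspect the per-document failures instead of just a summary count, streaming_bulk can be used; a sketch under the same no-abort policy (the es and arr variables reuse the script above):
from elasticsearch.helpers import streaming_bulk

failed = []
for ok, item in streaming_bulk(es, arr, index='gpdata_addr',
                               raise_on_error=False, raise_on_exception=False):
    if not ok:
        failed.append(item)  # item describes the rejected action
print('%d failed actions' % len(failed))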
Backup
Some query syntax I have used
ES's query syntax is genuinely complex; writing it feels like hand-writing a syntax tree. Most clauses can be nested inside one another.
query:
  bool:
    should:
      - term:
          virtual_ip: 10.131.10.100
          # virtual_port: 11400
      - term:
          virtual_ip: 10.131.10.100
          # virtual_port: 11400
    must_not:
      - term:
          clientip: 10.131.213.103
      - term:
          clientip: 10.131.202.214
query:
  bool:
    should:
      - match:
          virtual_ip: 10.131.10.100
          virtual_port: 11400
      - match:
          virtual_ip: 10.131.10.100
          virtual_port: 11400
    must_not:
      - match:
          clientip: 10.131.213.103
      - match:
          clientip: 10.131.202.214
(Note: term and match each accept only a single field, which is why virtual_port is commented out in the first example and why the next example pushes the two conditions into a nested bool/must instead.)
query:
  bool:
    should:
      - bool:
          must:
            - match:
                virtual_ip: 10.131.10.100
            - match:
                virtual_port: 11400
      - bool:
          must:
            - match:
                virtual_ip: 10.131.10.100
            - match:
                virtual_port: 11400
    must_not:
      - match:
          clientip: 10.131.213.103
      - match:
          clientip: 10.131.202.214
    must:
      - range:
          "timestamp":
            gte: '2021-12-06T09:00:00'
            lt: '2021-12-06T09:30:00'
query:
  bool:
    must:
      - match:
          virtual_ip: 10.131.10.100
      - match:
          virtual_port: 11400
      - range:
          "timestamp":
            gte: '2021-12-06T09:00:00Z'
            lt: '2021-12-06T09:30:00Z'
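The YAML above is just a readable transcription of the JSON query DSL; the last example, for instance, maps to the following Python search body (the index name is a placeholder):
body = {
    'query': {
        'bool': {
            'must': [
                {'match': {'virtual_ip': '10.131.10.100'}},
                {'match': {'virtual_port': 11400}},
                {'range': {'timestamp': {'gte': '2021-12-06T09:00:00Z',
                                         'lt': '2021-12-06T09:30:00Z'}}},
            ]
        }
    }
}
result = es.search(index='[YOUR-INDEX]', body=body)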