es入门

es的简单入门。

资源

kibana

创建索引

# 流水线
##注意 ingets需要开启
PUT _ingest/pipeline/my_timestamp_pipeline
{
  "description": "Adds a field to a document with the time of ingestion",
  "processors": [
    {
      "set": {
        "field": "ingest_timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

# 创建索引
PUT pa_log_clean_with_time
{
  "settings": {
    "default_pipeline": "my_timestamp_pipeline"
  }
}

# 最大分片设置

PUT /_cluster/settings
{
  "transient": {
    "cluster": {
      "max_shards_per_node":5000
    }
  }
}

crud

# 添加
POST /test/_doc
{
  "msg" :"Eating an apple a day keeps doctor away"
}

# 查询
POST /test/_search
{
  "query": {
    "match": {
      "msg": "eat"
    }
  }
}

# 查看分词效果

POST /test/_analyze
{
  "field": "msg",
  "text": "我爱北京天安门"
}

# 删除
DELETE /test/_doc/xJ636HsBVVbMBtr2nSGm

KQL

查询语法

https://www.cnblogs.com/-beyond/p/14159002.html

查询语法

https://blog.csdn.net/mon_star/article/details/102934620

可以多层级嵌套使用。

https://www.cnblogs.com/xiohao/p/12970224.html

python客户端

增删改查

#coding=utf-8

import elasticsearch
from elasticsearch.helpers import bulk
import csv

def init_es_client():
    es = elasticsearch.Elasticsearch(
        hosts=["10.131.224.14",'10.131.224.15','10.131.224.19'],port=54926
    )
    return es
es = init_es_client()

print(es.info())


# 查找
def search(es_client, start, end):
    result = es_client.search(index="[YOUR-INDEX]", body={
        "query": {
            "bool": {
                "must":[
                    { "range": { "[YOUR-DATE]":{ "gte": start, "lte": end }}}, 
                    {
                        "bool": {
                            "should": [
                                { "match_phrase": { "[YOUR-CONTENT]": "[KEYWORD-1]"}},
                                { "match_phrase": { "[YOUR-CONTENT]": "[KEYWORD-2]"}}
                            ]
                        }
                    }
                ]
            }
        }
    })
    return result
    
# 插入
data ={"recv_addr":'测试导入地址','dtime':'2021-09-14 15:40'}
#es.index(index='gpdata_addr',body=data)


# 批量导入
arr = []
data ={"recv_addr":'测试导入地址2','dtime':'2021-09-14 15:40'}
for i in range(1,1000):
    data['recv_addr'] = '测试导入地址%d'%i
    arr.append(data)
res,_ = bulk(es,arr,index='gpdata_addr',raise_on_error = True)
print(res)


# 删除   先查找id、再删除
while True:
    result = es.search(index="gpdata_addr")
    #print(result)
    #print(result['hits']['hits'])
    if not result:
        break
    for item in result['hits']['hits']:
        print(item['_id'])
        # 删除
        try:
            es.delete(index="gpdata_addr",id=item['_id'])
        except:
            print('%s  not found'%item['_id'])
        

批量导入1

#coding=utf-8

import elasticsearch
from elasticsearch.helpers import bulk
import csv

def init_es_client():
    es = elasticsearch.Elasticsearch(
        hosts=["10.131.224.14",'10.131.224.15','10.131.224.19'],port=54926
    )
    return es

es = init_es_client()
# 批量导入

batch = 0
with open('gpdata_addr.csv','r') as f:
    reader = csv.reader(f)
    num = 0
    batch = 0
    arr = []
    for row in reader:
        num = num + 1
        if num % 10000 == 0:
            batch = batch + 1
            print('batch: %s'%batch)
            res,_ = bulk(es,arr,index='gpdata_addr',raise_on_error = True)
            arr = []
            print(res)
        data ={"recv_addr":str(row[0]),'dtime':str(row[1])}
        arr.append(data)
        
    if len(arr) > 0:
        res,_ = bulk(es,arr,index='gpdata_addr',raise_on_error = True)
        print(res)

批量导入2

因为不清楚bluk的用法,自己写的多进程来快速、批量导入数据的脚本。思路非常简单,主进程用来分资源,子进程用来处理资源。进程间的同步,写得简单,存在隐患,主进程写文件,并不停的查看哪个文件清空了,子进程,处理完数据后,则删除文件,并休眠3秒,好让主进程有足够的时间来,写文件。

具体用法参见:

https://elasticsearch-py.readthedocs.io/en/v7.14.1/helpers.html#example

  • 主进程,分割文件
#coding=utf-8
import os
import time

def get_file():
    while True:
        for i in range(8):
            fn = 'gpdata_addr_%d.csv'%i
            if not os.path.exists(fn):
                return fn
        time.sleep(1)
        print('等待新文件')

with open('gpdata_addr.csv','r') as fi:
    is_end = False # 读文件结束
    while True:
        new_file = get_file()
        fo = open(new_file,'w')
        print('create new file %s'%new_file)
        output =[]
        for i in range(1,100000+1):
            line = fi.readline()
            if line:
                output.append(line)
            else:
                is_end = true
                break
        fo.write(''.join(output))
        fo.close()
        output = []
        if is_end:
            break
  • 工作进程,消费文件,并删除
#coding=utf-8

import elasticsearch
from elasticsearch.helpers import bulk
import csv
import sys
import os
import time

if len(sys.argv) <2 :
    print('缺少参数 id')
    os.exit(1)
pid = sys.argv[1]
csv_file = 'gpdata_addr_%s.csv'%pid

def init_es_client():
    es = elasticsearch.Elasticsearch(
        hosts=["10.1.1.1",'10.1.1.2','10.1.1.3'],port=54926
    )
    return es

es = init_es_client()

# 批量导入

batch = 0
while True:
    if not os.path.exists(csv_file):
        print('file [%s] not found,sleep'%csv_file)
        time.sleep(3)
        continue
    with open(csv_file,'r') as f:
        reader = csv.reader(f)
        num = 0
        # batch = 0
        arr = []
        for row in reader:
            num = num + 1
            if num % 10000 == 0:
                batch = batch + 1
                print('batch: %s'%batch)
                res,_ = bulk(es,arr,index='gpdata_addr',raise_on_error = False,raise_on_exception = False)
                arr = []
                print(res)
            data ={"recv_addr":str(row[0]),'dtime':str(row[1])}
            arr.append(data)
            
        if len(arr) > 0:
            res,_ = bulk(es,arr,index='gpdata_addr',raise_on_error = False,raise_on_exception = False)
            print(res)
    print('remove %s'%csv_file)
    os.remove(csv_file)
    time.sleep(3)

核心代码如下:

res,_ = bulk(es,arr,index='gpdata_addr',raise_on_error = False,raise_on_exception = False)
  • raise_on_error

    是否报错误,但是连接错误,无法屏蔽。

  • raise_on_exception

    异常是否会退出等。如果不想中间因为异常退出,长期跑下去,这个需要设置。(具体异常做了什么处理,待确定……)

备份

一些用到的查询语法

es的查询语法,真是负责,貌似手写语法树的感觉。很多的语法都可以嵌套的使用。

query:
    bool:
        should:
            - term:
                virtual_ip: 10.131.10.100
                # virtual_port: 11400
            - term:
                virtual_ip: 10.131.10.100
                # virtual_port: 11400
        must_not:
            - term:
                clientip: 10.131.213.103
            - term:
                clientip: 10.131.202.214
                
query:
    bool:
        should:
            - match:
                virtual_ip: 10.131.10.100
                virtual_port: 11400
            - match:
                virtual_ip: 10.131.10.100
                virtual_port: 11400
        must_not:
            - match:
                clientip: 10.131.213.103
            - match:
                clientip: 10.131.202.214

query:
    bool:
        should:
            - bool:
                must:
                - match:
                    virtual_ip: 10.131.10.100
                - match:
                    virtual_port: 11400
            - bool:
                must:
                - match:
                    virtual_ip: 10.131.10.100
                - match:
                    virtual_port: 11400
        must_not:
            - match:
                clientip: 10.131.213.103
            - match:
                clientip: 10.131.202.214
        must:
            - range:
                "timestamp": 
                    gte: '2021-12-06T09:00:00'
                    lt:  '2021-12-06T09:30:00'
                    
query:
    bool:
        must:
        - match:
            virtual_ip: 10.131.10.100
        - match:
            virtual_port: 11400
        - range:
            "timestamp": 
                gte: '2021-12-06T09:00:00Z'
                lt:  '2021-12-06T09:30:00Z'