본문 바로가기

Elasticsearch

[Elasticsearch] node.js로 bulk 입력/업데이트/삭제 Bulk helpers - index, insert, update, delete

반응형

 

Elastic Bulk Operation을 할때 필요한 helpers 알아보기

 

Elasticsearch Bulk helpers 

async function index() {
    const datasetPath = path.join(__dirname, '..', 'fixtures', 'test-bulk.ndjson');
    const datasource = fs.createReadStream(datasetPath).pipe(split(JSON.parse));
    console.log('datasetPath=', datasetPath);
    console.log('datasource=', datasource);

    const result = await client.helpers.bulk<ITest>({
        datasource,      // 필수값인 인자값, An array, async generator or a readable stream to index/create/update/delete
        retries: 10,     // onDrop가기 전, retry 횟수 (default: client max retries)
        wait: 10000,     // retry가기 전, 얼마나 기다릴지 (default: 5000)
        concurrency: 10, // 몇개의 요청을 동시 실행가능하 (default: 5)
        flushBytes: 5000000, // bulk body bytes size (default: 5BM)
        flushInterval: 1000, // 마지막 document로부터 body를 flush하기 전 기다리는 시간
        onDocument (doc) { // 필수값인 인자값 / datasource의 각 document에 대입됨
            console.log('typeof doc=', typeof doc); // string -> JSON.parse를 넘기지 않는 경우
            console.log('typeof doc=', typeof doc); // object -> JSON.parse를 넘기는 경우

            return {
                index: { _index: 'test', _id: doc.id }
            }
        },
        onDrop (doc) { // document가 indexed 되지 못할 때 그리고 retries 최대수를 넘겼을 때
            console.log(`can't index document ${doc.document.id}`, doc.error);
            process.exit(1);
        },
        refreshOnCompletion: 'test'  // true일 경우 bulk operation 끝나고 모든 인덱스들을 refresh한다. (default: false)
        // refreshOnCompletion: 'test' // 인덱스이름을 적으면 해당 인덱스만 적용
    })
    console.log('result=', result);
}

위에 나열된 것들이 모두 helpers이다. 여기에서 datasource와 onDocument가 필수이고 나머지는 옵셔널하다. 

datasource 에 넣는 데이터가 만약 stream일 경우에 split2 패키지를 사용을 권장한다. 

 

refreshOnCompletion: true | false | 'index-name'  => 요거는 해당 operation이 끝나면 true일 경우 전체 인덱스들을 / 'index-name' 특정 인덱스 이름을 적으면 해당 인덱스만 다시 refresh를 시킨다. 이 후에 이어지는 함수가 조회라면 true를 해주어야 위의 operation에서 수정/추가/삭제 사항이 적용된 데이터르르 조회할 수 있다. 만약 false라면(defualt=false) 해당 operation 적용사항 이전의 내용이 조회된다. 

 

 

 

Supported Operations

Index 

client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  }
})

create

client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    return {
      create: { _index: 'my-index', _id: doc.id }
    }
  }
})

update

client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    // Note that the update operation requires you to return
    // an array, where the first element is the action, while
    // the second are the document option
    return [
      { update: { _index: 'my-index', _id: doc.id } },
      { doc_as_upsert: true }
    ]
  }
})

delete

client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    return {
      delete: { _index: 'my-index', _id: doc.id }
    }
  }
})

 

 

 

 

 

 

Reference : https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/master/client-helpers.html

 

Client helpers | Elasticsearch JavaScript Client [master] | Elastic

The stop method stops the execution of the msearch processor, but if you are using a concurrency higher than one, the operations that are already running will not be stopped.

www.elastic.co

 

반응형