본문 바로가기

mongoDB

[elasticsearch] mongoDB changeStream to Elasticsearch 데이터 동기화 - sync 맞추기

반응형

mongoDB 와 Elasticsearch 데이터가 연동되어 있는 경우, 데이터 생성, 수정, 삭제가 발생할 때마다 각각의 Db를 수기로 바꿔주는 것은 매우 비효율적이다!  그래서 MongoDB의 streamchange를 사용해서 변화가 생길 때 es에도 해당 변화가 업데이트되도록 만들어 주기 ! 

 

change stream을 사용하기위해 우리는 반드시 mongoDB replica set을 사용해야 한다고 함!  그 이유는 change stream이 oplog에 의해 수행되기 때문이다. 

 

 

mongoDB collection에 changeStream 열기

db.collection('person').watch() 

또는

Person.watch() // directly using the model Person 자체가 해당 모델 Import 해온 것

 

 

변경된 데이터 감지하기 .on('change', data => {} ) 

const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));

// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
  on('change', data => console.log(data));

서버가 켜져있는 동안 mongoDB 데이터가 변경될 때 아래의 데이터가 나온다. 

{
  _id: {
    _data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004'
  },
  operationType: 'insert',
  clusterTime: new Timestamp({ t: 1648397740, i: 1 }),
  fullDocument: {
    _id: new ObjectId("62408dac6f5c42ff5ee087a2"),
    name: 'Axl Rose',
    __v: 0
  },
  ns: { db: 'test', coll: 'people' },
  documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") }
}

 

이제 저 fullDocument에 나온 데이터 (수정된 것을 포함한 해당 도큐먼트의 전체) 를 Elasticsearch 에 맞게끔 타입을 가공해서 insert, update, delete를 해주면 된다. 

 


const makeDoc = data => {
  let doc = data?.fullDocument;
 	data 가공 후 필요한 요소만 Return 

  return [id, doc];
};

const exec = () => {
  const client = ElasticSearch.getClient();

  const pipeline = [
    {
      $match: {
        $or: [{ operationType: 'insert' }, { operationType: 'update' }, { operationType: 'delete' }],
      },
    },
  ];
  const watchOptions = {
    fullDocument: 'updateLookup',
  };

  const closeChangeStream = async (timeInMs = 60000, stream) => {
    return new Promise(() => {
      setTimeout(() => {
        console.log('Closing the change stream');
        stream.close();
        // resolve();
      }, timeInMs);
    });
  };

  RecommendationKeywordModel.watch(pipeline, watchOptions).on('change', data => {
    const operationType = data?.operationType;

    if (operationType === 'insert') {
      const [id, doc] = makeDoc(data);
      client.index({
        index: EXAMPLE_INDEX,
        id,
        body: doc,
      });
    }
    if (operationType === 'update') {
      const [id, doc] = makeDoc(data);
      client.update({
        index: EXAMPLE_INDEX,
        retry_on_conflict: 1000,
        id,
        doc,
      });
    }
    if (operationType === 'delete') {
      const id = data.documentKey._id;
      client.delete({
        index: EXAMPLE_INDEX,
        id,
      });
    }

    // closeChangeStream(1, changeStream);
  });

};

export default { exec };

 

반응형