mongoDB
[elasticsearch] mongoDB changeStream to Elasticsearch 데이터 동기화 - sync 맞추기
알로호모라
2022. 9. 5. 07:12
반응형
mongoDB 와 Elasticsearch 데이터가 연동되어 있는 경우, 데이터 생성, 수정, 삭제가 발생할 때마다 각각의 Db를 수기로 바꿔주는 것은 매우 비효율적이다! 그래서 MongoDB의 streamchange를 사용해서 변화가 생길 때 es에도 해당 변화가 업데이트되도록 만들어 주기 !
change stream을 사용하기위해 우리는 반드시 mongoDB replica set을 사용해야 한다고 함! 그 이유는 change stream이 oplog에 의해 수행되기 때문이다.
mongoDB collection에 changeStream 열기
db.collection('person').watch()
또는
Person.watch() // directly using the model Person 자체가 해당 모델 Import 해온 것
변경된 데이터 감지하기 .on('change', data => {} )
const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));
// Create a change stream. The 'change' event gets emitted when there's a
// change in the database. Print what the change stream emits.
Person.watch().
on('change', data => console.log(data));
서버가 켜져있는 동안 mongoDB 데이터가 변경될 때 아래의 데이터가 나온다.
{
_id: {
_data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1648397740, i: 1 }),
fullDocument: {
_id: new ObjectId("62408dac6f5c42ff5ee087a2"),
name: 'Axl Rose',
__v: 0
},
ns: { db: 'test', coll: 'people' },
documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") }
}
이제 저 fullDocument에 나온 데이터 (수정된 것을 포함한 해당 도큐먼트의 전체) 를 Elasticsearch 에 맞게끔 타입을 가공해서 insert, update, delete를 해주면 된다.
const makeDoc = data => {
let doc = data?.fullDocument;
data 가공 후 필요한 요소만 Return
return [id, doc];
};
const exec = () => {
const client = ElasticSearch.getClient();
const pipeline = [
{
$match: {
$or: [{ operationType: 'insert' }, { operationType: 'update' }, { operationType: 'delete' }],
},
},
];
const watchOptions = {
fullDocument: 'updateLookup',
};
const closeChangeStream = async (timeInMs = 60000, stream) => {
return new Promise(() => {
setTimeout(() => {
console.log('Closing the change stream');
stream.close();
// resolve();
}, timeInMs);
});
};
RecommendationKeywordModel.watch(pipeline, watchOptions).on('change', data => {
const operationType = data?.operationType;
if (operationType === 'insert') {
const [id, doc] = makeDoc(data);
client.index({
index: EXAMPLE_INDEX,
id,
body: doc,
});
}
if (operationType === 'update') {
const [id, doc] = makeDoc(data);
client.update({
index: EXAMPLE_INDEX,
retry_on_conflict: 1000,
id,
doc,
});
}
if (operationType === 'delete') {
const id = data.documentKey._id;
client.delete({
index: EXAMPLE_INDEX,
id,
});
}
// closeChangeStream(1, changeStream);
});
};
export default { exec };
반응형