From 2259da317a848660fc2b0a14e50e2ae5ae329889 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jovan=20Kruni=C4=87?= Date: Tue, 10 Nov 2020 15:38:42 +0100 Subject: [PATCH] fix: error thrown on consecutive connector executions Fixes #73 --- src/storage/elasticsearch/elasticsearch.ts | 7 ++-- test/common.ts | 4 ++- .../elasticsearch/elasticsearch.spec.ts | 32 ++++++++++++------- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/src/storage/elasticsearch/elasticsearch.ts b/src/storage/elasticsearch/elasticsearch.ts index 50c4ea67..d36ba4d3 100644 --- a/src/storage/elasticsearch/elasticsearch.ts +++ b/src/storage/elasticsearch/elasticsearch.ts @@ -485,14 +485,15 @@ export class Elasticsearch implements Database { const item = await this.getObject(object.uid); - // we have to check that the item will get replaced if the index is rolled over + // check that the item will get replaced if the index is rolled over (index with the same name excluding ending uid) if (typeof item !== 'undefined') { const indexOfNew = Elasticsearch.getIndex(obj.type, bulk.source, bulk); const oldIndex = item._index; // new item doesn't replace the old one - if (oldIndex.substring(0, oldIndex.length - Elasticsearch.INDEX_UID_LENGTH + 1) - !== indexOfNew.substring(0, indexOfNew.length - Elasticsearch.INDEX_UID_LENGTH + 1)) { + // tslint:disable-next-line:no-magic-numbers + if (oldIndex.substring(0, oldIndex.lastIndexOf('_')) + !== indexOfNew.substring(0, indexOfNew.lastIndexOf('_'))) { throw new Error( // tslint:disable-next-line: no-magic-numbers `Object "${obj.uid}" already exists. Object was: ${JSON.stringify(obj, null, 2)}`, diff --git a/test/common.ts b/test/common.ts index 23133030..3e353c6d 100644 --- a/test/common.ts +++ b/test/common.ts @@ -24,6 +24,8 @@ import {MailQueue} from '../src/notification/mail-queue'; import {Bulk, BulkStorage} from '../src/storage/bulk-storage'; import getPort from 'get-port'; import {Database} from '../src/storage/database'; +import {Elasticsearch} from '../src/storage/elasticsearch/elasticsearch'; +import {v4} from 'uuid'; /** * Adds routers and configures an (express) app @@ -141,4 +143,4 @@ export const getTransport = (verified: boolean) => { } } -export const index = 'stapps_footype_foosource_foobar'; +export const getIndex = (uid?: string) => `stapps_footype_foosource_${uid ? uid : Elasticsearch.getIndexUID(v4())}`; diff --git a/test/storage/elasticsearch/elasticsearch.spec.ts b/test/storage/elasticsearch/elasticsearch.spec.ts index 7907a5f8..a8b46b20 100644 --- a/test/storage/elasticsearch/elasticsearch.spec.ts +++ b/test/storage/elasticsearch/elasticsearch.spec.ts @@ -32,7 +32,7 @@ import {Elasticsearch} from '../../../src/storage/elasticsearch/elasticsearch'; import * as Monitoring from '../../../src/storage/elasticsearch/monitoring'; import * as query from '../../../src/storage/elasticsearch/query'; import * as templating from '../../../src/storage/elasticsearch/templating'; -import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, index} from '../../common'; +import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, getIndex} from '../../common'; import fs from 'fs'; use(chaiAsPromised); @@ -292,7 +292,7 @@ describe('Elasticsearch', function () { }); it('should reject (throw an error) if the index name is not valid', async function () { - sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${index}`); + sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${getIndex}`); sandbox.createStubInstance(Client, {}); await es.init(); @@ -300,6 +300,7 @@ describe('Elasticsearch', function () { }); it('should create a new index', async function () { + const index = getIndex(); sandbox.stub(Elasticsearch, 'getIndex').returns(index); const putTemplateStub = sandbox.stub(templating, 'putTemplate'); const createStub = sandbox.stub(es.client.indices, 'create'); @@ -318,7 +319,7 @@ describe('Elasticsearch', function () { sandbox.restore(); }); it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () { - sandbox.stub(Elasticsearch, 'getIndex').returns(index); + sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex()); const clientDeleteStub = sandbox.stub(es.client.indices, 'delete'); await es.bulkExpired({...bulk, state: 'in progress'}); @@ -327,7 +328,7 @@ describe('Elasticsearch', function () { }); it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () { - sandbox.stub(Elasticsearch, 'getIndex').returns(index); + sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex()); const clientDeleteStub = sandbox.stub(es.client.indices, 'delete'); await es.bulkExpired({...bulk, state: 'done'}); @@ -342,7 +343,7 @@ describe('Elasticsearch', function () { }); it('should reject if the index name is not valid', async function () { - sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${index}`); + sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${getIndex()}`); sandbox.createStubInstance(Client, {}); await es.init(); @@ -350,6 +351,7 @@ describe('Elasticsearch', function () { }); it('should create a new index', async function () { + const index = getIndex(); const expectedRefreshActions = [ { add: {index: index, alias: SCThingType.Book}, @@ -423,6 +425,7 @@ describe('Elasticsearch', function () { }); it('should not post if the object already exists in an index which will not be rolled over', async function () { + const index = getIndex(); const oldIndex = index.replace('foosource', 'barsource'); const object: ElasticsearchObject = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage}; sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}}); @@ -431,6 +434,15 @@ describe('Elasticsearch', function () { return expect(es.post(object._source, bulk)).to.rejectedWith('exist'); }); + it('should not reject if the object already exists but in an index which will be rolled over', async function () { + const object: ElasticsearchObject = {_id: '', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage}; + sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}}); + // return index name with different generated UID (see getIndex method) + sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex()); + + return expect(es.post(object._source, bulk)).to.not.rejectedWith('exist'); + }); + it('should reject if there is an object creation error on the elasticsearch side', async function () { sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: []}}}); sandbox.stub(es.client, 'create').resolves({body: {created: false}}); @@ -465,8 +477,7 @@ describe('Elasticsearch', function () { sandbox.restore(); }); it('should reject to put if the object does not already exist', async function () { - const oldIndex = index.replace('foosource', 'barsource'); - const object: ElasticsearchObject = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage}; + const object: ElasticsearchObject = {_id: '', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage}; sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: []}}}); return expect(es.put(object._source)).to.rejectedWith('exist'); @@ -474,8 +485,7 @@ describe('Elasticsearch', function () { it('should update the object if it already exists', async function () { let caughtParam: any; - const oldIndex = index.replace('foosource', 'barsource'); - const object: ElasticsearchObject = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage}; + const object: ElasticsearchObject = {_id: '', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage}; sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}}); // @ts-ignore const stubUpdate = sandbox.stub(es.client, 'update').callsFake((params) => { @@ -492,8 +502,8 @@ describe('Elasticsearch', function () { describe('search', async function () { let es: Elasticsearch; const sandbox = sinon.createSandbox(); - const objectMessage: ElasticsearchObject = {_id: '123', _index: index, _score: 0, _type: '', _source: message as SCMessage}; - const objectBook: ElasticsearchObject = {_id: '321', _index: index, _score: 0, _type: '', _source: book as SCBook}; + const objectMessage: ElasticsearchObject = {_id: '123', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage}; + const objectBook: ElasticsearchObject = {_id: '321', _index: getIndex(), _score: 0, _type: '', _source: book as SCBook}; const fakeEsAggregations = { '@all': { doc_count: 17,