/* * Copyright (C) 2018 StApps * This program is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free * Software Foundation, version 3. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for * more details. * * You should have received a copy of the GNU General Public License along with * this program. If not, see . */ import {Bulk} from '@openstapps/api/lib/bulk'; import {ConnectorClient as Client} from '@openstapps/api/lib/connectorClient'; import {HttpClient} from '@openstapps/api/lib/httpClient'; import {SCBulkAddResponse, SCMessage} from '@openstapps/core'; import {Logger} from '@openstapps/logger'; import * as promiseLimit from 'promise-limit'; import {MinimalConnector} from '.'; const api = new Client(new HttpClient(), 'http://localhost:3000'); const logger = new Logger(); async function runConnector() { const connector = new MinimalConnector(); const items = await connector.getItems(); if (items.length === 0) { throw new Error('No items fetched.'); } let bulk: Bulk; try { bulk = await api.bulk('message', 'minimal-connector'); } catch (err) { logger.error('Couldn\'t open bulk.'); throw err; } // create a concurrency limit const limit = promiseLimit(5); try { // index all items with our concurrency limit await Promise.all(items.map((item) => { return limit(() => bulk.add(item)); })); } catch (err) { logger.error('Error while indexing items.'); throw err; } try { await bulk.done(); } catch (err) { logger.error('Error while closing bulk'); throw err; } } runConnector().then(() => { logger.log('Done.'); }, (err) => { throw err; });