/* * Copyright (C) 2018, 2019 StApps * This program is free software: you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the Free * Software Foundation, version 3. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for * more details. * * You should have received a copy of the GNU General Public License along with * this program. If not, see . */ import {asyncPool} from '@krlwlfrt/async-pool/lib/async-pool'; import {SCSearchRequest, SCThingType} from '@openstapps/core'; import {Bar} from 'cli-progress'; import {Client} from './client'; import {ConnectorClient} from './connector-client'; import {OutOfRangeError} from './errors'; import {HttpClientInterface} from './http-client-interface'; /** * Options to set up copying data from one backend to another */ export interface CopyOptions { /** * Batch size to copy at once */ batchSize: number; /** * URL of the backend to copy from */ from: string; /** * Source identifier */ source: string; /** * URL of the backend to copy to */ to: string; /** * StAppsCore type to copy */ type: SCThingType; /** * StApps version identifier to copy data for */ version: string; } /** * Copy data for a StAppsCore type from one backend to another * * @param client HTTP client * @param options Map of options */ export async function copy(client: HttpClientInterface, options: CopyOptions): Promise { const apiIn = new Client(client, options.from, options.version); const apiOut = new ConnectorClient(client, options.to); // open a bulk const bulk = await apiOut.bulk(options.type, options.source); let searchRequest: SCSearchRequest = { filter: { arguments: { field: 'type', value: options.type, }, type: 'value', }, size: 0, }; let searchResponse = await apiIn.search(searchRequest); searchRequest.size = options.batchSize; const progressBar = new Bar({}); progressBar.start(searchResponse.pagination.total, 0); let outOfRange = false; do { try { ({searchRequest, searchResponse} = await apiIn.searchNext(searchRequest, searchResponse)); await asyncPool(ConnectorClient.ITEM_CONCURRENT_LIMIT, searchResponse.data, async (item) => { progressBar.increment(1); return bulk.add(item); }); } catch (e) { if (e instanceof OutOfRangeError) { outOfRange = true; } else { progressBar.stop(); throw e; } } } while (!outOfRange); await bulk.done(); progressBar.stop(); }