diff --git a/packages/pipeline-console-reporter/src/consoleReporter.ts b/packages/pipeline-console-reporter/src/consoleReporter.ts index 7786f3b..02f6453 100644 --- a/packages/pipeline-console-reporter/src/consoleReporter.ts +++ b/packages/pipeline-console-reporter/src/consoleReporter.ts @@ -18,7 +18,6 @@ export class ConsoleReporter implements ProgressReporter { private datasetStartTime = 0; private datasetTotal = 0; private datasetIndex = 0; - private analysisResults: DistributionAnalysisResult[] = []; pipelineStart(_name: string): void { this.stageSpinner = ora({ @@ -49,20 +48,59 @@ export class ConsoleReporter implements ProgressReporter { console.info(`Dataset ${chalk.bold(dataset.iri.toString())}${counter}`); } - distributionsAnalyzed( - _dataset: Dataset, - results: DistributionAnalysisResult[] - ): void { - this.analysisResults = results; + distributionProbed(result: DistributionAnalysisResult): void { + const url = result.distribution.accessUrl.toString(); + const typeLabel = + result.type === 'sparql' + ? 'SPARQL endpoint' + : result.type === 'data-dump' + ? 'Data dump' + : 'Network error'; + + const s = ora({ discardStdin: false }); + if (result.available) { + const detail = + result.statusCode !== undefined ? ` (HTTP ${result.statusCode})` : ''; + s.start(`${typeLabel} ${url}${detail}`); + s.succeed(); + } else { + const detail = result.error + ? ` (${result.error})` + : result.statusCode !== undefined + ? ` (HTTP ${result.statusCode})` + : ''; + s.start(`${typeLabel} ${url}${detail}`); + s.fail(); + } + } + + importFailed(_distribution: Distribution, error: string): void { + const s = ora({ discardStdin: false }); + s.start(`Import failed: ${error}`); + s.fail(); } distributionSelected( _dataset: Dataset, distribution: Distribution, importedFrom?: Distribution, - importDuration?: number + importDuration?: number, ): void { - this.printAnalysisResults(distribution, importedFrom, importDuration); + const s = ora({ discardStdin: false }); + if (importedFrom) { + const duration = + importDuration !== undefined + ? ` in ${chalk.bold(prettyMilliseconds(importDuration))}` + : ''; + s.start( + `Imported ${importedFrom.accessUrl.toString()} (to ${distribution.accessUrl.toString()})${duration}`, + ); + } else { + s.start( + `${distribution.accessUrl.toString()} ${chalk.dim('(selected)')}`, + ); + } + s.succeed(); } stageStart(stage: string): void { @@ -78,9 +116,9 @@ export class ConsoleReporter implements ProgressReporter { if (this.stageSpinner) { const elapsed = prettyMilliseconds(Date.now() - this.stageStartTime); this.stageSpinner.suffixText = `${compactNumber.format( - update.itemsProcessed + update.itemsProcessed, )} items, ${compactNumber.format( - update.quadsGenerated + update.quadsGenerated, )} quads, ${elapsed}`; } } @@ -91,11 +129,11 @@ export class ConsoleReporter implements ProgressReporter { itemsProcessed: number; quadsGenerated: number; duration: number; - } + }, ): void { if (this.stageSpinner) { this.stageSpinner.suffixText = `took ${chalk.bold( - prettyMilliseconds(result.duration) + prettyMilliseconds(result.duration), )}`; this.stageSpinner.succeed(); this.stageSpinner = undefined; @@ -122,14 +160,13 @@ export class ConsoleReporter implements ProgressReporter { const s = ora({ discardStdin: false, text: `Completed in ${chalk.bold( - prettyMilliseconds(Date.now() - this.datasetStartTime) + prettyMilliseconds(Date.now() - this.datasetStartTime), )}`, }).start(); s.succeed(); } datasetSkipped(_dataset: Dataset, reason: string): void { - this.printAnalysisResults(); const s = ora({ discardStdin: false, text: `Skipped: ${chalk.red(reason)}`, @@ -137,65 +174,11 @@ export class ConsoleReporter implements ProgressReporter { s.fail(); } - private printAnalysisResults( - selected?: Distribution, - importedFrom?: Distribution, - importDuration?: number - ): void { - // Match by selected distribution URL, or by importedFrom URL (when a data - // dump was imported to a local SPARQL endpoint, the selected distribution - // is the local endpoint which doesn't appear in probe results). - const selectedUrl = selected?.accessUrl.toString(); - const importedFromUrl = importedFrom?.accessUrl.toString(); - - for (const result of this.analysisResults) { - const resultUrl = result.distribution.accessUrl.toString(); - const isSelected = - selected && - (resultUrl === selectedUrl || resultUrl === importedFromUrl); - const typeLabel = - result.type === 'sparql' - ? 'SPARQL endpoint' - : result.type === 'data-dump' - ? 'Data dump' - : 'Network error'; - const url = resultUrl; - - const s = ora({ discardStdin: false }); - if (isSelected) { - if (importedFrom) { - const duration = - importDuration !== undefined - ? ` in ${chalk.bold(prettyMilliseconds(importDuration))}` - : ''; - s.start(`Imported ${url} (to ${selectedUrl!})${duration}`); - } else { - s.start(`${typeLabel} ${url} ${chalk.dim('(selected)')}`); - } - s.succeed(); - } else if (result.available) { - const detail = - result.statusCode !== undefined ? ` (HTTP ${result.statusCode})` : ''; - s.start(`${typeLabel} ${url}${detail}`); - s.succeed(); - } else { - const detail = result.error - ? ` (${result.error})` - : result.statusCode !== undefined - ? ` (HTTP ${result.statusCode})` - : ''; - s.start(`${typeLabel} ${url}${detail}`); - s.fail(); - } - } - this.analysisResults = []; - } - pipelineComplete(result: { duration: number }): void { console.info( `\nPipeline completed in ${chalk.bold( - prettyMilliseconds(result.duration) - )}` + prettyMilliseconds(result.duration), + )}`, ); } } diff --git a/packages/pipeline-console-reporter/vite.config.ts b/packages/pipeline-console-reporter/vite.config.ts index be264f1..4c8cc1b 100644 --- a/packages/pipeline-console-reporter/vite.config.ts +++ b/packages/pipeline-console-reporter/vite.config.ts @@ -11,9 +11,9 @@ export default mergeConfig( thresholds: { autoUpdate: true, functions: 0, - lines: 9.23, + lines: 8.62, branches: 0, - statements: 9.23, + statements: 8.62, }, }, }, diff --git a/packages/pipeline/src/distribution/importResolver.ts b/packages/pipeline/src/distribution/importResolver.ts index 662206c..361c436 100644 --- a/packages/pipeline/src/distribution/importResolver.ts +++ b/packages/pipeline/src/distribution/importResolver.ts @@ -4,6 +4,7 @@ import { ImportFailed, ImportSuccessful } from '@lde/sparql-importer'; import type { SparqlServer } from '@lde/sparql-server'; import { type DistributionResolver, + type ResolveCallbacks, NoDistributionAvailable, ResolvedDistribution, } from './resolver.js'; @@ -44,7 +45,7 @@ export class ImportResolver implements DistributionResolver { async resolve( ...args: Parameters ): Promise { - const [dataset] = args; + const [dataset, callbacks] = args; const result = await this.inner.resolve(...args); // 'sparql' strategy (default): use SPARQL endpoint if inner found one. @@ -56,12 +57,13 @@ export class ImportResolver implements DistributionResolver { } // Either 'import' strategy or inner found nothing: import a data dump. - return this.importDataset(dataset, result.probeResults); + return this.importDataset(dataset, result.probeResults, callbacks); } private async importDataset( dataset: Dataset, probeResults: NoDistributionAvailable['probeResults'], + callbacks?: ResolveCallbacks, ): Promise { const importStart = Date.now(); const importResult = await this.options.importer.import(dataset); @@ -83,6 +85,13 @@ export class ImportResolver implements DistributionResolver { ); } + if (importResult instanceof ImportFailed) { + callbacks?.onImportFailed?.( + importResult.distribution, + importResult.error, + ); + } + return new NoDistributionAvailable( dataset, 'No SPARQL endpoint or importable data dump available', diff --git a/packages/pipeline/src/distribution/index.ts b/packages/pipeline/src/distribution/index.ts index e7ea5e9..5b94268 100644 --- a/packages/pipeline/src/distribution/index.ts +++ b/packages/pipeline/src/distribution/index.ts @@ -18,6 +18,7 @@ export { NoDistributionAvailable, SparqlDistributionResolver, type DistributionResolver, + type ResolveCallbacks, type SparqlDistributionResolverOptions, } from './resolver.js'; diff --git a/packages/pipeline/src/distribution/resolver.ts b/packages/pipeline/src/distribution/resolver.ts index 859f976..468a15e 100644 --- a/packages/pipeline/src/distribution/resolver.ts +++ b/packages/pipeline/src/distribution/resolver.ts @@ -20,9 +20,18 @@ export class NoDistributionAvailable { ) {} } +/** Callbacks fired during distribution resolution. */ +export interface ResolveCallbacks { + /** Called each time a single distribution probe completes. */ + onProbe?: (distribution: Distribution, result: ProbeResultType) => void; + /** Called when importing a distribution fails. */ + onImportFailed?: (distribution: Distribution, error: string) => void; +} + export interface DistributionResolver { resolve( dataset: Dataset, + callbacks?: ResolveCallbacks, ): Promise; cleanup?(): Promise; } @@ -49,11 +58,14 @@ export class SparqlDistributionResolver implements DistributionResolver { async resolve( dataset: Dataset, + callbacks?: ResolveCallbacks, ): Promise { const results = await Promise.all( - dataset.distributions.map((distribution) => - probe(distribution, this.timeout), - ), + dataset.distributions.map(async (distribution) => { + const result = await probe(distribution, this.timeout); + callbacks?.onProbe?.(distribution, result); + return result; + }), ); // Find first valid SPARQL endpoint. diff --git a/packages/pipeline/src/pipeline.ts b/packages/pipeline/src/pipeline.ts index 1d97082..d46cc52 100644 --- a/packages/pipeline/src/pipeline.ts +++ b/packages/pipeline/src/pipeline.ts @@ -140,12 +140,16 @@ export class Pipeline { private async processDataset(dataset: Dataset): Promise { this.reporter?.datasetStart?.(dataset); - const resolved = await this.distributionResolver.resolve(dataset); - - this.reporter?.distributionsAnalyzed?.( - dataset, - mapProbeResults(dataset, resolved.probeResults), - ); + const resolved = await this.distributionResolver.resolve(dataset, { + onProbe: (distribution, result) => { + this.reporter?.distributionProbed?.( + mapProbeResult(distribution, result), + ); + }, + onImportFailed: (distribution, error) => { + this.reporter?.importFailed?.(distribution, error); + }, + }); if (resolved instanceof NoDistributionAvailable) { this.reporter?.datasetSkipped?.(dataset, resolved.message); @@ -309,27 +313,25 @@ export class Pipeline { } } -function mapProbeResults( - dataset: Dataset, - probeResults: ProbeResultType[], -): DistributionAnalysisResult[] { - return probeResults.map((result, index) => { - if (result instanceof NetworkError) { - return { - distribution: dataset.distributions[index], - type: 'network-error' as const, - available: false, - error: result.message, - }; - } +function mapProbeResult( + distribution: Distribution, + result: ProbeResultType, +): DistributionAnalysisResult { + if (result instanceof NetworkError) { return { - distribution: dataset.distributions[index], - type: - result instanceof SparqlProbeResult - ? ('sparql' as const) - : ('data-dump' as const), - available: result.isSuccess(), - statusCode: result.statusCode, + distribution, + type: 'network-error' as const, + available: false, + error: result.message, }; - }); + } + return { + distribution, + type: + result instanceof SparqlProbeResult + ? ('sparql' as const) + : ('data-dump' as const), + available: result.isSuccess(), + statusCode: result.statusCode, + }; } diff --git a/packages/pipeline/src/progressReporter.ts b/packages/pipeline/src/progressReporter.ts index 14e73f4..4dc5c79 100644 --- a/packages/pipeline/src/progressReporter.ts +++ b/packages/pipeline/src/progressReporter.ts @@ -12,10 +12,10 @@ export interface ProgressReporter { pipelineStart?(name: string): void; datasetsSelected?(count: number, duration: number): void; datasetStart?(dataset: Dataset): void; - distributionsAnalyzed?( - dataset: Dataset, - results: DistributionAnalysisResult[], - ): void; + /** Called each time a single distribution probe completes. */ + distributionProbed?(result: DistributionAnalysisResult): void; + /** Called when importing a distribution fails. */ + importFailed?(distribution: Distribution, error: string): void; distributionSelected?( dataset: Dataset, distribution: Distribution, diff --git a/packages/pipeline/test/pipeline.test.ts b/packages/pipeline/test/pipeline.test.ts index 06af461..f8f76dc 100644 --- a/packages/pipeline/test/pipeline.test.ts +++ b/packages/pipeline/test/pipeline.test.ts @@ -7,11 +7,13 @@ import { ResolvedDistribution, NoDistributionAvailable, type DistributionResolver, + type ResolveCallbacks, } from '../src/distribution/resolver.js'; import { SparqlProbeResult, DataDumpProbeResult, NetworkError, + type ProbeResultType, } from '../src/distribution/probe.js'; import type { Writer } from '../src/writer/writer.js'; import type { ProgressReporter } from '../src/progressReporter.js'; @@ -40,8 +42,17 @@ function makeDatasetSelector(...datasets: Dataset[]): DatasetSelector { function makeResolver( result: ResolvedDistribution | NoDistributionAvailable, + /** Distributions and probe results to fire via onProbe callback. */ + probes?: Array<{ distribution: Distribution; result: ProbeResultType }>, ): DistributionResolver { - return { resolve: vi.fn().mockResolvedValue(result) }; + return { + resolve: vi.fn(async (_dataset: Dataset, callbacks?: ResolveCallbacks) => { + for (const p of probes ?? []) { + callbacks?.onProbe?.(p.distribution, p.result); + } + return result; + }), + }; } function makeResolvedDistribution(): ResolvedDistribution { @@ -68,8 +79,9 @@ function makeReporter(): RequiredReporter { datasetsSelected: vi.fn>(), datasetStart: vi.fn>(), - distributionsAnalyzed: - vi.fn>(), + distributionProbed: + vi.fn>(), + importFailed: vi.fn>(), distributionSelected: vi.fn>(), stageStart: vi.fn>(), @@ -469,7 +481,6 @@ describe('Pipeline', () => { const callOrder = [ reporter.pipelineStart, reporter.datasetStart, - reporter.distributionsAnalyzed, reporter.distributionSelected, reporter.stageStart, reporter.stageComplete, @@ -537,7 +548,7 @@ describe('Pipeline', () => { await expect(pipeline.run()).resolves.toBeUndefined(); }); - it('distributionsAnalyzed reports probe results correctly', async () => { + it('distributionProbed called once per distribution with correct result', async () => { const reporter = makeReporter(); const sparqlDist = Distribution.sparql( @@ -583,35 +594,36 @@ describe('Pipeline', () => { dataDumpResult, networkError, ]), + [ + { distribution: sparqlDist, result: sparqlResult }, + { distribution: dataDumpDist, result: dataDumpResult }, + { distribution: downDist, result: networkError }, + ], ), reporter, }); await pipeline.run(); - expect(reporter.distributionsAnalyzed).toHaveBeenCalledWith( - datasetWithDists, - [ - { - distribution: sparqlDist, - type: 'sparql', - available: true, - statusCode: 200, - }, - { - distribution: dataDumpDist, - type: 'data-dump', - available: false, - statusCode: 404, - }, - { - distribution: downDist, - type: 'network-error', - available: false, - error: 'Connection refused', - }, - ], - ); + expect(reporter.distributionProbed).toHaveBeenCalledTimes(3); + expect(reporter.distributionProbed).toHaveBeenCalledWith({ + distribution: sparqlDist, + type: 'sparql', + available: true, + statusCode: 200, + }); + expect(reporter.distributionProbed).toHaveBeenCalledWith({ + distribution: dataDumpDist, + type: 'data-dump', + available: false, + statusCode: 404, + }); + expect(reporter.distributionProbed).toHaveBeenCalledWith({ + distribution: downDist, + type: 'network-error', + available: false, + error: 'Connection refused', + }); }); it('distributionSelected reports importedFrom when import was used', async () => { @@ -644,7 +656,7 @@ describe('Pipeline', () => { ); }); - it('distributionsAnalyzed called even when dataset is skipped', async () => { + it('distributionProbed called even when dataset is skipped', async () => { const reporter = makeReporter(); const downDist = new Distribution( new URL('http://example.org/down'), @@ -667,23 +679,20 @@ describe('Pipeline', () => { new NoDistributionAvailable(datasetWithDist, 'No SPARQL endpoint', [ networkError, ]), + [{ distribution: downDist, result: networkError }], ), reporter, }); await pipeline.run(); - expect(reporter.distributionsAnalyzed).toHaveBeenCalledWith( - datasetWithDist, - [ - { - distribution: downDist, - type: 'network-error', - available: false, - error: 'Connection refused', - }, - ], - ); + expect(reporter.distributionProbed).toHaveBeenCalledTimes(1); + expect(reporter.distributionProbed).toHaveBeenCalledWith({ + distribution: downDist, + type: 'network-error', + available: false, + error: 'Connection refused', + }); expect(reporter.distributionSelected).not.toHaveBeenCalled(); }); diff --git a/packages/pipeline/vite.config.ts b/packages/pipeline/vite.config.ts index 1cb82bd..867d402 100644 --- a/packages/pipeline/vite.config.ts +++ b/packages/pipeline/vite.config.ts @@ -11,10 +11,10 @@ export default mergeConfig( coverage: { thresholds: { autoUpdate: true, - functions: 91.81, - lines: 94.11, - branches: 89.47, - statements: 93.4, + functions: 90.99, + lines: 93.97, + branches: 89.15, + statements: 93.27, }, }, },