Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 53 additions & 70 deletions packages/pipeline-console-reporter/src/consoleReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export class ConsoleReporter implements ProgressReporter {
private datasetStartTime = 0;
private datasetTotal = 0;
private datasetIndex = 0;
private analysisResults: DistributionAnalysisResult[] = [];

pipelineStart(_name: string): void {
this.stageSpinner = ora({
Expand Down Expand Up @@ -49,20 +48,59 @@ export class ConsoleReporter implements ProgressReporter {
console.info(`Dataset ${chalk.bold(dataset.iri.toString())}${counter}`);
}

distributionsAnalyzed(
_dataset: Dataset,
results: DistributionAnalysisResult[]
): void {
this.analysisResults = results;
distributionProbed(result: DistributionAnalysisResult): void {
const url = result.distribution.accessUrl.toString();
const typeLabel =
result.type === 'sparql'
? 'SPARQL endpoint'
: result.type === 'data-dump'
? 'Data dump'
: 'Network error';

const s = ora({ discardStdin: false });
if (result.available) {
const detail =
result.statusCode !== undefined ? ` (HTTP ${result.statusCode})` : '';
s.start(`${typeLabel} ${url}${detail}`);
s.succeed();
} else {
const detail = result.error
? ` (${result.error})`
: result.statusCode !== undefined
? ` (HTTP ${result.statusCode})`
: '';
s.start(`${typeLabel} ${url}${detail}`);
s.fail();
}
}

importFailed(_distribution: Distribution, error: string): void {
const s = ora({ discardStdin: false });
s.start(`Import failed: ${error}`);
s.fail();
}

distributionSelected(
_dataset: Dataset,
distribution: Distribution,
importedFrom?: Distribution,
importDuration?: number
importDuration?: number,
): void {
this.printAnalysisResults(distribution, importedFrom, importDuration);
const s = ora({ discardStdin: false });
if (importedFrom) {
const duration =
importDuration !== undefined
? ` in ${chalk.bold(prettyMilliseconds(importDuration))}`
: '';
s.start(
`Imported ${importedFrom.accessUrl.toString()} (to ${distribution.accessUrl.toString()})${duration}`,
);
} else {
s.start(
`${distribution.accessUrl.toString()} ${chalk.dim('(selected)')}`,
);
}
s.succeed();
}

stageStart(stage: string): void {
Expand All @@ -78,9 +116,9 @@ export class ConsoleReporter implements ProgressReporter {
if (this.stageSpinner) {
const elapsed = prettyMilliseconds(Date.now() - this.stageStartTime);
this.stageSpinner.suffixText = `${compactNumber.format(
update.itemsProcessed
update.itemsProcessed,
)} items, ${compactNumber.format(
update.quadsGenerated
update.quadsGenerated,
)} quads, ${elapsed}`;
}
}
Expand All @@ -91,11 +129,11 @@ export class ConsoleReporter implements ProgressReporter {
itemsProcessed: number;
quadsGenerated: number;
duration: number;
}
},
): void {
if (this.stageSpinner) {
this.stageSpinner.suffixText = `took ${chalk.bold(
prettyMilliseconds(result.duration)
prettyMilliseconds(result.duration),
)}`;
this.stageSpinner.succeed();
this.stageSpinner = undefined;
Expand All @@ -122,80 +160,25 @@ export class ConsoleReporter implements ProgressReporter {
const s = ora({
discardStdin: false,
text: `Completed in ${chalk.bold(
prettyMilliseconds(Date.now() - this.datasetStartTime)
prettyMilliseconds(Date.now() - this.datasetStartTime),
)}`,
}).start();
s.succeed();
}

  /**
   * Reports that a dataset was skipped, printing any buffered probe results
   * first and then a failed spinner line carrying the skip reason.
   *
   * @param _dataset The skipped dataset (unused here).
   * @param reason Why the dataset was skipped; rendered in red.
   */
  datasetSkipped(_dataset: Dataset, reason: string): void {
    // Flush the buffered analysis results before announcing the skip.
    this.printAnalysisResults();
    const s = ora({
      discardStdin: false,
      text: `Skipped: ${chalk.red(reason)}`,
    }).start();
    s.fail();
  }

  /**
   * Prints one spinner line per buffered probe result, marking the selected
   * distribution (if any), then clears the buffer.
   *
   * @param selected The distribution chosen for processing, if any.
   * @param importedFrom The data-dump distribution that was imported into
   *   `selected`, if an import took place.
   * @param importDuration Import duration in milliseconds; shown when defined.
   */
  private printAnalysisResults(
    selected?: Distribution,
    importedFrom?: Distribution,
    importDuration?: number
  ): void {
    // Match by selected distribution URL, or by importedFrom URL (when a data
    // dump was imported to a local SPARQL endpoint, the selected distribution
    // is the local endpoint which doesn't appear in probe results).
    const selectedUrl = selected?.accessUrl.toString();
    const importedFromUrl = importedFrom?.accessUrl.toString();

    for (const result of this.analysisResults) {
      const resultUrl = result.distribution.accessUrl.toString();
      const isSelected =
        selected &&
        (resultUrl === selectedUrl || resultUrl === importedFromUrl);
      // Human-readable label for the kind of distribution that was probed.
      const typeLabel =
        result.type === 'sparql'
          ? 'SPARQL endpoint'
          : result.type === 'data-dump'
          ? 'Data dump'
          : 'Network error';
      const url = resultUrl;

      const s = ora({ discardStdin: false });
      if (isSelected) {
        if (importedFrom) {
          // Imported data dump: report source, target, and (optionally) how
          // long the import took.
          const duration =
            importDuration !== undefined
              ? ` in ${chalk.bold(prettyMilliseconds(importDuration))}`
              : '';
          s.start(`Imported ${url} (to ${selectedUrl!})${duration}`);
        } else {
          s.start(`${typeLabel} ${url} ${chalk.dim('(selected)')}`);
        }
        s.succeed();
      } else if (result.available) {
        // Probe succeeded: show a check mark, with HTTP status when known.
        const detail =
          result.statusCode !== undefined ? ` (HTTP ${result.statusCode})` : '';
        s.start(`${typeLabel} ${url}${detail}`);
        s.succeed();
      } else {
        // Probe failed: the error message takes precedence over HTTP status.
        const detail = result.error
          ? ` (${result.error})`
          : result.statusCode !== undefined
          ? ` (HTTP ${result.statusCode})`
          : '';
        s.start(`${typeLabel} ${url}${detail}`);
        s.fail();
      }
    }
    // Results are one-shot: clear the buffer so the next dataset starts fresh.
    this.analysisResults = [];
  }

pipelineComplete(result: { duration: number }): void {
console.info(
`\nPipeline completed in ${chalk.bold(
prettyMilliseconds(result.duration)
)}`
prettyMilliseconds(result.duration),
)}`,
);
}
}
4 changes: 2 additions & 2 deletions packages/pipeline-console-reporter/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ export default mergeConfig(
thresholds: {
autoUpdate: true,
functions: 0,
lines: 9.23,
lines: 8.62,
branches: 0,
statements: 9.23,
statements: 8.62,
},
},
},
Expand Down
13 changes: 11 additions & 2 deletions packages/pipeline/src/distribution/importResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { ImportFailed, ImportSuccessful } from '@lde/sparql-importer';
import type { SparqlServer } from '@lde/sparql-server';
import {
type DistributionResolver,
type ResolveCallbacks,
NoDistributionAvailable,
ResolvedDistribution,
} from './resolver.js';
Expand Down Expand Up @@ -44,7 +45,7 @@ export class ImportResolver implements DistributionResolver {
async resolve(
...args: Parameters<DistributionResolver['resolve']>
): Promise<ResolvedDistribution | NoDistributionAvailable> {
const [dataset] = args;
const [dataset, callbacks] = args;
const result = await this.inner.resolve(...args);

// 'sparql' strategy (default): use SPARQL endpoint if inner found one.
Expand All @@ -56,12 +57,13 @@ export class ImportResolver implements DistributionResolver {
}

// Either 'import' strategy or inner found nothing: import a data dump.
return this.importDataset(dataset, result.probeResults);
return this.importDataset(dataset, result.probeResults, callbacks);
}

private async importDataset(
dataset: Dataset,
probeResults: NoDistributionAvailable['probeResults'],
callbacks?: ResolveCallbacks,
): Promise<ResolvedDistribution | NoDistributionAvailable> {
const importStart = Date.now();
const importResult = await this.options.importer.import(dataset);
Expand All @@ -83,6 +85,13 @@ export class ImportResolver implements DistributionResolver {
);
}

if (importResult instanceof ImportFailed) {
callbacks?.onImportFailed?.(
importResult.distribution,
importResult.error,
);
}

return new NoDistributionAvailable(
dataset,
'No SPARQL endpoint or importable data dump available',
Expand Down
1 change: 1 addition & 0 deletions packages/pipeline/src/distribution/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export {
NoDistributionAvailable,
SparqlDistributionResolver,
type DistributionResolver,
type ResolveCallbacks,
type SparqlDistributionResolverOptions,
} from './resolver.js';

Expand Down
18 changes: 15 additions & 3 deletions packages/pipeline/src/distribution/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@ export class NoDistributionAvailable {
) {}
}

/**
 * Optional callbacks fired while a distribution resolver works through a
 * dataset, letting callers (e.g. progress reporters) observe each step.
 */
export interface ResolveCallbacks {
  /** Called each time a single distribution probe completes. */
  onProbe?: (distribution: Distribution, result: ProbeResultType) => void;
  /** Called when importing a distribution fails, with the failure message. */
  onImportFailed?: (distribution: Distribution, error: string) => void;
}

/** Resolves a dataset to a concrete distribution to read data from. */
export interface DistributionResolver {
  /**
   * Resolves `dataset` to a usable distribution, invoking `callbacks` as
   * individual steps complete; yields `NoDistributionAvailable` on failure.
   */
  resolve(
    dataset: Dataset,
    callbacks?: ResolveCallbacks,
  ): Promise<ResolvedDistribution | NoDistributionAvailable>;
  /** Optional hook to release any resources the resolver holds. */
  cleanup?(): Promise<void>;
}
Expand All @@ -49,11 +58,14 @@ export class SparqlDistributionResolver implements DistributionResolver {

async resolve(
dataset: Dataset,
callbacks?: ResolveCallbacks,
): Promise<ResolvedDistribution | NoDistributionAvailable> {
const results = await Promise.all(
dataset.distributions.map((distribution) =>
probe(distribution, this.timeout),
),
dataset.distributions.map(async (distribution) => {
const result = await probe(distribution, this.timeout);
callbacks?.onProbe?.(distribution, result);
return result;
}),
);

// Find first valid SPARQL endpoint.
Expand Down
56 changes: 29 additions & 27 deletions packages/pipeline/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,16 @@ export class Pipeline {
private async processDataset(dataset: Dataset): Promise<void> {
this.reporter?.datasetStart?.(dataset);

const resolved = await this.distributionResolver.resolve(dataset);

this.reporter?.distributionsAnalyzed?.(
dataset,
mapProbeResults(dataset, resolved.probeResults),
);
const resolved = await this.distributionResolver.resolve(dataset, {
onProbe: (distribution, result) => {
this.reporter?.distributionProbed?.(
mapProbeResult(distribution, result),
);
},
onImportFailed: (distribution, error) => {
this.reporter?.importFailed?.(distribution, error);
},
});

if (resolved instanceof NoDistributionAvailable) {
this.reporter?.datasetSkipped?.(dataset, resolved.message);
Expand Down Expand Up @@ -309,27 +313,25 @@ export class Pipeline {
}
}

function mapProbeResults(
dataset: Dataset,
probeResults: ProbeResultType[],
): DistributionAnalysisResult[] {
return probeResults.map((result, index) => {
if (result instanceof NetworkError) {
return {
distribution: dataset.distributions[index],
type: 'network-error' as const,
available: false,
error: result.message,
};
}
function mapProbeResult(
distribution: Distribution,
result: ProbeResultType,
): DistributionAnalysisResult {
if (result instanceof NetworkError) {
return {
distribution: dataset.distributions[index],
type:
result instanceof SparqlProbeResult
? ('sparql' as const)
: ('data-dump' as const),
available: result.isSuccess(),
statusCode: result.statusCode,
distribution,
type: 'network-error' as const,
available: false,
error: result.message,
};
});
}
return {
distribution,
type:
result instanceof SparqlProbeResult
? ('sparql' as const)
: ('data-dump' as const),
available: result.isSuccess(),
statusCode: result.statusCode,
};
}
8 changes: 4 additions & 4 deletions packages/pipeline/src/progressReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ export interface ProgressReporter {
pipelineStart?(name: string): void;
datasetsSelected?(count: number, duration: number): void;
datasetStart?(dataset: Dataset): void;
distributionsAnalyzed?(
dataset: Dataset,
results: DistributionAnalysisResult[],
): void;
/** Called each time a single distribution probe completes. */
distributionProbed?(result: DistributionAnalysisResult): void;
/** Called when importing a distribution fails. */
importFailed?(distribution: Distribution, error: string): void;
distributionSelected?(
dataset: Dataset,
distribution: Distribution,
Expand Down
Loading