Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion packages/pipeline-console-reporter/src/consoleReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,20 @@ export class ConsoleReporter implements ProgressReporter {
distribution: Distribution,
importedFrom?: Distribution,
importDuration?: number,
tripleCount?: number,
): void {
const s = ora({ discardStdin: false });
if (importedFrom) {
const count =
tripleCount !== undefined
? `${compactNumber.format(tripleCount)} triples, `
: '';
const duration =
importDuration !== undefined
? ` in ${chalk.bold(prettyMilliseconds(importDuration))}`
: '';
s.start(
`Imported ${importedFrom.accessUrl.toString()} (to ${distribution.accessUrl.toString()})${duration}`,
`Imported ${importedFrom.accessUrl.toString()} (${count}to ${distribution.accessUrl.toString()})${duration}`,
);
} else {
s.start(
Expand Down
53 changes: 52 additions & 1 deletion packages/pipeline-console-reporter/test/consoleReporter.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,60 @@
import { describe, it, expect } from 'vitest';
import { describe, it, expect, vi } from 'vitest';
import { ConsoleReporter } from '../src/consoleReporter.js';
import { Dataset, Distribution } from '@lde/dataset';

/**
 * Build a minimal {@link Dataset} fixture: a fixed example IRI and an empty
 * list of distributions.
 */
function makeDataset(): Dataset {
  const iri = new URL('http://example.org/dataset');
  return new Dataset({ iri, distributions: [] });
}

describe('ConsoleReporter', () => {
  it('can be instantiated', () => {
    const reporter = new ConsoleReporter();
    expect(reporter).toBeInstanceOf(ConsoleReporter);
  });

  describe('distributionSelected', () => {
    /**
     * Call `distributionSelected` for an import of a fixed N-Triples dump
     * into a fixed local SPARQL endpoint (import duration 5000 ms) and return
     * everything that was written to `process.stderr` during the call.
     *
     * The spy is restored in `finally` so that `process.stderr.write` is never
     * left mocked for subsequent tests, even if the reporter throws.
     *
     * @param tripleCount Optional triple count forwarded to the reporter.
     * @returns The concatenated chunks passed to `process.stderr.write`.
     */
    function captureOutput(tripleCount?: number): string {
      const reporter = new ConsoleReporter();
      const spy = vi.spyOn(process.stderr, 'write').mockReturnValue(true);

      try {
        reporter.distributionSelected(
          makeDataset(),
          Distribution.sparql(new URL('http://localhost:7001/sparql')),
          new Distribution(
            new URL('http://example.org/data.nt'),
            'application/n-triples',
          ),
          5000,
          tripleCount,
        );

        return spy.mock.calls.map((c) => String(c[0])).join('');
      } finally {
        spy.mockRestore();
      }
    }

    it('includes triple count when present', () => {
      const output = captureOutput(4800000);

      expect(output).toContain('4.8M triples');
      expect(output).toContain('to http://localhost:7001/sparql');
    });

    it('omits triple count when absent', () => {
      const output = captureOutput();

      expect(output).not.toContain('triples');
      expect(output).toContain('to http://localhost:7001/sparql');
    });
  });
});
8 changes: 4 additions & 4 deletions packages/pipeline-console-reporter/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ export default mergeConfig(
coverage: {
thresholds: {
autoUpdate: true,
functions: 0,
lines: 8.62,
branches: 0,
statements: 8.62,
functions: 7.14,
lines: 18.64,
branches: 12.5,
statements: 18.64,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions packages/pipeline/src/distribution/importResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export class ImportResolver implements DistributionResolver {
probeResults,
importResult.distribution,
Date.now() - importStart,
importResult.tripleCount,
);
}

Expand Down
1 change: 1 addition & 0 deletions packages/pipeline/src/distribution/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export class ResolvedDistribution {
readonly probeResults: ProbeResultType[],
readonly importedFrom?: Distribution,
readonly importDuration?: number,
readonly tripleCount?: number,
) {}
}

Expand Down
1 change: 1 addition & 0 deletions packages/pipeline/src/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export class Pipeline {
resolved.distribution,
resolved.importedFrom,
resolved.importDuration,
resolved.tripleCount,
);

try {
Expand Down
1 change: 1 addition & 0 deletions packages/pipeline/src/progressReporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export interface ProgressReporter {
distribution: Distribution,
importedFrom?: Distribution,
importDuration?: number,
tripleCount?: number,
): void;
stageStart?(stage: string): void;
stageProgress?(update: {
Expand Down
2 changes: 2 additions & 0 deletions packages/pipeline/test/distribution/importResolver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ describe('ImportResolver', () => {
new ImportSuccessful(
Distribution.sparql(new URL('http://localhost:7878/sparql')),
'test-graph',
42000,
),
),
};
Expand All @@ -104,6 +105,7 @@ describe('ImportResolver', () => {
);
expect(resolved.probeResults).toHaveLength(1);
expect(resolved.probeResults[0]).toBeInstanceOf(DataDumpProbeResult);
expect(resolved.tripleCount).toBe(42000);
});

it('sets importedFrom on ResolvedDistribution when import succeeds', async () => {
Expand Down
5 changes: 4 additions & 1 deletion packages/pipeline/test/pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ describe('Pipeline', () => {
sparqlDistribution,
[],
importedFromDistribution,
1000,
42000,
);

const pipeline = new Pipeline({
Expand All @@ -652,7 +654,8 @@ describe('Pipeline', () => {
dataset,
sparqlDistribution,
importedFromDistribution,
undefined,
1000,
42000,
);
});

Expand Down
4 changes: 2 additions & 2 deletions packages/pipeline/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ export default mergeConfig(
thresholds: {
autoUpdate: true,
functions: 90.99,
lines: 93.97,
lines: 93.98,
branches: 89.15,
statements: 93.27,
statements: 93.28,
},
},
},
Expand Down
7 changes: 4 additions & 3 deletions packages/sparql-importer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface Importer {
* Import a {@link Dataset} to a SPARQL server.
*/
import(
dataset: Dataset
dataset: Dataset,
): Promise<NotSupported | ImportFailed | ImportSuccessful>;
}

Expand All @@ -25,14 +25,15 @@ export interface Importer {
export class ImportSuccessful {
constructor(
public readonly distribution: Distribution,
public readonly identifier?: string
public readonly identifier?: string,
public readonly tripleCount?: number,
) {}
}

export class ImportFailed {
constructor(
public readonly distribution: Distribution,
public readonly error: string
public readonly error: string,
) {}
}

Expand Down
20 changes: 15 additions & 5 deletions packages/sparql-qlever/src/importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ export class Importer implements ImporterInterface {
distribution: Distribution & { mimeType: string },
): Promise<ImportSuccessful | ImportFailed> {
const localFile = await this.downloader.download(distribution);
await this.index(
const logs = await this.index(
localFile,
this.fileFormatFromMimeType(distribution.mimeType),
);
const tripleCount = this.parseTripleCount(logs);

return new ImportSuccessful(distribution);
return new ImportSuccessful(distribution, undefined, tripleCount);
}

private fileFormatFromMimeType(mimeType: string): fileFormat {
Expand All @@ -108,7 +109,15 @@ export class Importer implements ImporterInterface {
return format;
}

private async index(file: string, format: fileFormat): Promise<void> {
/**
 * Extract the `num-triples.normal` value from the index task logs.
 *
 * The index command cats the metadata JSON to stdout, so the logs contain a
 * `"num-triples": { ..., "normal": N }` object. A regex is used rather than
 * JSON.parse because Docker log multiplexing prepends binary frame headers
 * to each chunk. Whitespace around the `:` separators is tolerated so the
 * match does not depend on the metadata being serialized compactly.
 *
 * @param logs Output collected from the index task.
 * @returns The parsed count, or `undefined` when the logs contain no match.
 */
private parseTripleCount(logs: string): number | undefined {
  const match = /"num-triples"\s*:\s*\{[^}]*"normal"\s*:\s*(\d+)/.exec(logs);
  return match ? Number(match[1]) : undefined;
}

private async index(file: string, format: fileFormat): Promise<string> {
const workingDir = dirname(file);
const settingsFile = 'index.settings.json';
// Turtle is not line-delimited, so QLever's parallel parser can't split
Expand All @@ -121,13 +130,14 @@ export class Importer implements ImporterInterface {

// TODO: write index to named volume instead of bind mount for better performance.

const metadataFile = `${this.indexName}.meta-data.json`;
const indexTask = await this.taskRunner.run(
`(zcat '${basename(file)}' 2>/dev/null || cat '${basename(
file,
)}') | qlever-index -i ${
this.indexName
} -s ${settingsFile} -F ${format} -f -`,
} -s ${settingsFile} -F ${format} -f - && cat ${metadataFile}`,
);
await this.taskRunner.wait(indexTask);
return await this.taskRunner.wait(indexTask);
}
}
1 change: 1 addition & 0 deletions packages/sparql-qlever/test/importer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe('Importer', () => {

const result = await importer.import(dataset);
expect(result).toBeInstanceOf(ImportSuccessful);
expect((result as ImportSuccessful).tripleCount).toBe(1);
}, 30_000);
});
});
Loading