diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 9c1e83663..e302d793b 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] anyhow = "1.0.56" csv = "1.1.6" -milli = { path = "../milli" } +milli = { path = "../milli", default-features = false } mimalloc = { version = "0.1.29", default-features = false } serde_json = { version = "1.0.79", features = ["preserve_order"] } diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs deleted file mode 100644 index e067623e2..000000000 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::collections::HashSet; -use std::convert::TryInto; -use std::fs::File; -use std::{io, mem, str}; - -use charabia::{SeparatorKind, Token, TokenKind, TokenizerBuilder}; -use roaring::RoaringBitmap; -use serde_json::Value; - -use super::helpers::{ - concat_u32s_array, create_sorter, sorter_into_reader, GrenadParameters, MAX_WORD_LENGTH, -}; -use crate::error::{InternalError, SerializationError}; -use crate::{absolute_from_relative_position, FieldId, Result, MAX_POSITION_PER_ATTRIBUTE}; - -/// Extracts the word and positions where this word appear and -/// prefixes it by the document id. -/// -/// Returns the generated internal documents ids and a grenad reader -/// with the list of extracted words from the given chunk of documents. -#[logging_timer::time] -pub fn extract_docid_word_positions( - obkv_documents: grenad::Reader, - indexer: GrenadParameters, - searchable_fields: &Option>, - stop_words: Option<&fst::Set<&[u8]>>, - max_positions_per_attributes: Option, -) -> Result<(RoaringBitmap, grenad::Reader)> { - let max_positions_per_attributes = max_positions_per_attributes - .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); - let max_memory = indexer.max_memory_by_thread(); - - let mut documents_ids = RoaringBitmap::new(); - let mut docid_word_positions_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - concat_u32s_array, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ); - - let mut key_buffer = Vec::new(); - let mut field_buffer = String::new(); - let mut builder = TokenizerBuilder::new(); - if let Some(stop_words) = stop_words { - builder.stop_words(stop_words); - } - let tokenizer = builder.build(); - - let mut cursor = obkv_documents.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? 
{ - let document_id = key - .try_into() - .map(u32::from_be_bytes) - .map_err(|_| SerializationError::InvalidNumberSerialization)?; - let obkv = obkv::KvReader::::new(value); - - documents_ids.push(document_id); - key_buffer.clear(); - key_buffer.extend_from_slice(&document_id.to_be_bytes()); - - for (field_id, field_bytes) in obkv.iter() { - if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) { - let value = - serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; - field_buffer.clear(); - if let Some(field) = json_to_string(&value, &mut field_buffer) { - let tokens = process_tokens(tokenizer.tokenize(field)) - .take_while(|(p, _)| (*p as u32) < max_positions_per_attributes); - - for (index, token) in tokens { - let token = token.lemma().trim(); - if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { - key_buffer.truncate(mem::size_of::()); - key_buffer.extend_from_slice(token.as_bytes()); - - let position: u16 = index - .try_into() - .map_err(|_| SerializationError::InvalidNumberSerialization)?; - let position = absolute_from_relative_position(field_id, position); - docid_word_positions_sorter - .insert(&key_buffer, &position.to_ne_bytes())?; - } - } - } - } - } - } - - sorter_into_reader(docid_word_positions_sorter, indexer).map(|reader| (documents_ids, reader)) -} - -/// Transform a JSON value into a string that can be indexed. -fn json_to_string<'a>(value: &'a Value, buffer: &'a mut String) -> Option<&'a str> { - fn inner(value: &Value, output: &mut String) -> bool { - use std::fmt::Write; - match value { - Value::Null => false, - Value::Bool(boolean) => write!(output, "{}", boolean).is_ok(), - Value::Number(number) => write!(output, "{}", number).is_ok(), - Value::String(string) => write!(output, "{}", string).is_ok(), - Value::Array(array) => { - let mut count = 0; - for value in array { - if inner(value, output) { - output.push_str(". "); - count += 1; - } - } - // check that at least one value was written - count != 0 - } - Value::Object(object) => { - let mut buffer = String::new(); - let mut count = 0; - for (key, value) in object { - buffer.clear(); - let _ = write!(&mut buffer, "{}: ", key); - if inner(value, &mut buffer) { - buffer.push_str(". "); - // We write the "key: value. " pair only when - // we are sure that the value can be written. - output.push_str(&buffer); - count += 1; - } - } - // check that at least one value was written - count != 0 - } - } - } - - if let Value::String(string) = value { - Some(&string) - } else if inner(value, buffer) { - Some(buffer) - } else { - None - } -} - -/// take an iterator on tokens and compute their relative position depending on separator kinds -/// if it's an `Hard` separator we add an additional relative proximity of 8 between words, -/// else we keep the standart proximity of 1 between words. 
-fn process_tokens<'a>( - tokens: impl Iterator>, -) -> impl Iterator)> { - tokens - .skip_while(|token| token.is_separator()) - .scan((0, None), |(offset, prev_kind), token| { - match token.kind { - TokenKind::Word | TokenKind::StopWord | TokenKind::Unknown => { - *offset += match *prev_kind { - Some(TokenKind::Separator(SeparatorKind::Hard)) => 8, - Some(_) => 1, - None => 0, - }; - *prev_kind = Some(token.kind) - } - TokenKind::Separator(SeparatorKind::Hard) => { - *prev_kind = Some(token.kind); - } - TokenKind::Separator(SeparatorKind::Soft) - if *prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) => - { - *prev_kind = Some(token.kind); - } - _ => (), - } - Some((*offset, token)) - }) - .filter(|(_, t)| t.is_word()) -} diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs deleted file mode 100644 index 61157fa35..000000000 --- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::fs::File; -use std::io; - -use heed::{BytesDecode, BytesEncode}; - -use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, sorter_into_reader, GrenadParameters, -}; -use crate::heed_codec::facet::{FacetLevelValueF64Codec, FieldDocIdFacetF64Codec}; -use crate::Result; - -/// Extracts the facet number and the documents ids where this facet number appear. -/// -/// Returns a grenad reader with the list of extracted facet numbers and -/// documents ids from the given chunk of docid facet number positions. -#[logging_timer::time] -pub fn extract_facet_number_docids( - docid_fid_facet_number: grenad::Reader, - indexer: GrenadParameters, -) -> Result> { - let max_memory = indexer.max_memory_by_thread(); - - let mut facet_number_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ); - - let mut cursor = docid_fid_facet_number.into_cursor()?; - while let Some((key_bytes, _)) = cursor.move_on_next()? { - let (field_id, document_id, number) = - FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap(); - - let key = (field_id, 0, number, number); - let key_bytes = FacetLevelValueF64Codec::bytes_encode(&key).unwrap(); - - facet_number_docids_sorter.insert(key_bytes, document_id.to_ne_bytes())?; - } - - sorter_into_reader(facet_number_docids_sorter, indexer) -} diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs deleted file mode 100644 index f7aa3730c..000000000 --- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs +++ /dev/null @@ -1,60 +0,0 @@ -use std::fs::File; -use std::iter::FromIterator; -use std::{io, str}; - -use roaring::RoaringBitmap; - -use super::helpers::{ - create_sorter, keep_first_prefix_value_merge_roaring_bitmaps, sorter_into_reader, - try_split_array_at, GrenadParameters, -}; -use crate::heed_codec::facet::{encode_prefix_string, FacetStringLevelZeroCodec}; -use crate::{FieldId, Result}; - -/// Extracts the facet string and the documents ids where this facet string appear. -/// -/// Returns a grenad reader with the list of extracted facet strings and -/// documents ids from the given chunk of docid facet string positions. 
-#[logging_timer::time] -pub fn extract_facet_string_docids( - docid_fid_facet_string: grenad::Reader, - indexer: GrenadParameters, -) -> Result> { - let max_memory = indexer.max_memory_by_thread(); - - let mut facet_string_docids_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - keep_first_prefix_value_merge_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ); - - let mut key_buffer = Vec::new(); - let mut value_buffer = Vec::new(); - let mut cursor = docid_fid_facet_string.into_cursor()?; - while let Some((key, original_value_bytes)) = cursor.move_on_next()? { - let (field_id_bytes, bytes) = try_split_array_at(key).unwrap(); - let field_id = FieldId::from_be_bytes(field_id_bytes); - let (document_id_bytes, normalized_value_bytes) = try_split_array_at(bytes).unwrap(); - let document_id = u32::from_be_bytes(document_id_bytes); - let original_value = str::from_utf8(original_value_bytes)?; - - key_buffer.clear(); - FacetStringLevelZeroCodec::serialize_into( - field_id, - str::from_utf8(normalized_value_bytes)?, - &mut key_buffer, - ); - - value_buffer.clear(); - encode_prefix_string(original_value, &mut value_buffer)?; - let bitmap = RoaringBitmap::from_iter(Some(document_id)); - bitmap.serialize_into(&mut value_buffer)?; - - facet_string_docids_sorter.insert(&key_buffer, &value_buffer)?; - } - - sorter_into_reader(facet_string_docids_sorter, indexer) -} diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs deleted file mode 100644 index f9d1443d5..000000000 --- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::collections::{BTreeMap, HashSet}; -use std::convert::TryInto; -use std::fs::File; -use std::io; -use std::mem::size_of; - -use heed::zerocopy::AsBytes; -use heed::BytesEncode; -use roaring::RoaringBitmap; -use serde_json::Value; - -use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters}; -use crate::error::InternalError; -use crate::facet::value_encoding::f64_into_bytes; -use crate::update::index_documents::{create_writer, writer_into_reader}; -use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32}; - -/// Extracts the facet values of each faceted field of each document. -/// -/// Returns the generated grenad reader containing the docid the fid and the orginal value as key -/// and the normalized value as value extracted from the given chunk of documents. 
-#[logging_timer::time] -pub fn extract_fid_docid_facet_values( - obkv_documents: grenad::Reader, - indexer: GrenadParameters, - faceted_fields: &HashSet, -) -> Result<(grenad::Reader, grenad::Reader, grenad::Reader)> { - let max_memory = indexer.max_memory_by_thread(); - - let mut fid_docid_facet_numbers_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - keep_first, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory.map(|m| m / 2), - ); - - let mut fid_docid_facet_strings_sorter = create_sorter( - grenad::SortAlgorithm::Stable, - keep_first, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory.map(|m| m / 2), - ); - - let mut facet_exists_docids = BTreeMap::::new(); - - let mut key_buffer = Vec::new(); - let mut cursor = obkv_documents.into_cursor()?; - while let Some((docid_bytes, value)) = cursor.move_on_next()? { - let obkv = obkv::KvReader::new(value); - - for (field_id, field_bytes) in obkv.iter() { - if faceted_fields.contains(&field_id) { - key_buffer.clear(); - - // Set key to the field_id - // Note: this encoding is consistent with FieldIdCodec - key_buffer.extend_from_slice(&field_id.to_be_bytes()); - - // Here, we know already that the document must be added to the “field id exists” database - let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap(); - let document = BEU32::from(document).get(); - - facet_exists_docids.entry(field_id).or_default().insert(document); - - // For the other extraction tasks, prefix the key with the field_id and the document_id - key_buffer.extend_from_slice(&docid_bytes); - - let value = - serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; - - let (numbers, strings) = extract_facet_values(&value); - - // insert facet numbers in sorter - for number in numbers { - key_buffer.truncate(size_of::() + size_of::()); - if let Some(value_bytes) = f64_into_bytes(number) { - key_buffer.extend_from_slice(&value_bytes); - key_buffer.extend_from_slice(&number.to_be_bytes()); - - fid_docid_facet_numbers_sorter.insert(&key_buffer, ().as_bytes())?; - } - } - - // insert normalized and original facet string in sorter - for (normalized, original) in strings.into_iter().filter(|(n, _)| !n.is_empty()) { - key_buffer.truncate(size_of::() + size_of::()); - key_buffer.extend_from_slice(normalized.as_bytes()); - fid_docid_facet_strings_sorter.insert(&key_buffer, original.as_bytes())?; - } - } - } - } - - let mut facet_exists_docids_writer = create_writer( - indexer.chunk_compression_type, - indexer.chunk_compression_level, - tempfile::tempfile()?, - ); - for (fid, bitmap) in facet_exists_docids.into_iter() { - let bitmap_bytes = CboRoaringBitmapCodec::bytes_encode(&bitmap).unwrap(); - facet_exists_docids_writer.insert(fid.to_be_bytes(), &bitmap_bytes)?; - } - let facet_exists_docids_reader = writer_into_reader(facet_exists_docids_writer)?; - - Ok(( - sorter_into_reader(fid_docid_facet_numbers_sorter, indexer.clone())?, - sorter_into_reader(fid_docid_facet_strings_sorter, indexer.clone())?, - facet_exists_docids_reader, - )) -} - -fn extract_facet_values(value: &Value) -> (Vec, Vec<(String, String)>) { - fn inner_extract_facet_values( - value: &Value, - can_recurse: bool, - output_numbers: &mut Vec, - output_strings: &mut Vec<(String, String)>, - ) { - match value { - Value::Null => (), - Value::Bool(b) => output_strings.push((b.to_string(), b.to_string())), - Value::Number(number) => { - if let Some(float) = number.as_f64() { - 
output_numbers.push(float); - } - } - Value::String(original) => { - let normalized = original.trim().to_lowercase(); - output_strings.push((normalized, original.clone())); - } - Value::Array(values) => { - if can_recurse { - for value in values { - inner_extract_facet_values(value, false, output_numbers, output_strings); - } - } - } - Value::Object(_) => (), - } - } - - let mut facet_number_values = Vec::new(); - let mut facet_string_values = Vec::new(); - inner_extract_facet_values(value, true, &mut facet_number_values, &mut facet_string_values); - - (facet_number_values, facet_string_values) -} diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs deleted file mode 100644 index d425e8d14..000000000 --- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::collections::HashMap; -use std::fs::File; -use std::{cmp, io}; - -use grenad::Sorter; - -use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, - try_split_array_at, GrenadParameters, MergeFn, -}; -use crate::error::SerializationError; -use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::{relative_from_absolute_position, DocumentId, FieldId, Result}; - -/// Extracts the field id word count and the documents ids where -/// this field id with this amount of words appear. -/// -/// Returns a grenad reader with the list of extracted field id word counts -/// and documents ids from the given chunk of docid word positions. -#[logging_timer::time] -pub fn extract_fid_word_count_docids( - docid_word_positions: grenad::Reader, - indexer: GrenadParameters, -) -> Result> { - let max_memory = indexer.max_memory_by_thread(); - - let mut fid_word_count_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ); - - // This map is assumed to not consume a lot of memory. - let mut document_fid_wordcount = HashMap::new(); - let mut current_document_id = None; - - let mut cursor = docid_word_positions.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let (document_id_bytes, _word_bytes) = try_split_array_at(key) - .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; - let document_id = u32::from_be_bytes(document_id_bytes); - - let curr_document_id = *current_document_id.get_or_insert(document_id); - if curr_document_id != document_id { - drain_document_fid_wordcount_into_sorter( - &mut fid_word_count_docids_sorter, - &mut document_fid_wordcount, - curr_document_id, - )?; - current_document_id = Some(document_id); - } - - for position in read_u32_ne_bytes(value) { - let (field_id, position) = relative_from_absolute_position(position); - let word_count = position as u32 + 1; - - let value = document_fid_wordcount.entry(field_id as FieldId).or_insert(0); - *value = cmp::max(*value, word_count); - } - } - - if let Some(document_id) = current_document_id { - // We must make sure that don't lose the current document field id - // word count map if we break because we reached the end of the chunk. 
- drain_document_fid_wordcount_into_sorter( - &mut fid_word_count_docids_sorter, - &mut document_fid_wordcount, - document_id, - )?; - } - - sorter_into_reader(fid_word_count_docids_sorter, indexer) -} - -fn drain_document_fid_wordcount_into_sorter( - fid_word_count_docids_sorter: &mut Sorter, - document_fid_wordcount: &mut HashMap, - document_id: DocumentId, -) -> Result<()> { - let mut key_buffer = Vec::new(); - - for (fid, count) in document_fid_wordcount.drain() { - if count <= 10 { - key_buffer.clear(); - key_buffer.extend_from_slice(&fid.to_be_bytes()); - key_buffer.push(count as u8); - - fid_word_count_docids_sorter.insert(&key_buffer, document_id.to_ne_bytes())?; - } - } - - Ok(()) -} diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs deleted file mode 100644 index 4b965e9a8..000000000 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::collections::HashSet; -use std::fs::File; -use std::io; -use std::iter::FromIterator; - -use roaring::RoaringBitmap; - -use super::helpers::{ - create_sorter, merge_roaring_bitmaps, serialize_roaring_bitmap, sorter_into_reader, - try_split_array_at, GrenadParameters, -}; -use crate::error::SerializationError; -use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::update::index_documents::helpers::read_u32_ne_bytes; -use crate::{relative_from_absolute_position, FieldId, Result}; - -/// Extracts the word and the documents ids where this word appear. -/// -/// Returns a grenad reader with the list of extracted words and -/// documents ids from the given chunk of docid word positions. -/// -/// The first returned reader is the one for normal word_docids, and the second one is for -/// exact_word_docids -#[logging_timer::time] -pub fn extract_word_docids( - docid_word_positions: grenad::Reader, - indexer: GrenadParameters, - exact_attributes: &HashSet, -) -> Result<(grenad::Reader, grenad::Reader)> { - let max_memory = indexer.max_memory_by_thread(); - - let mut word_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory.map(|x| x / 2), - ); - - let mut exact_word_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory.map(|x| x / 2), - ); - - let mut value_buffer = Vec::new(); - let mut cursor = docid_word_positions.into_cursor()?; - while let Some((key, positions)) = cursor.move_on_next()? { - let (document_id_bytes, word_bytes) = try_split_array_at(key) - .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; - let document_id = u32::from_be_bytes(document_id_bytes); - - let bitmap = RoaringBitmap::from_iter(Some(document_id)); - serialize_roaring_bitmap(&bitmap, &mut value_buffer)?; - - // If there are no exact attributes, we do not need to iterate over positions. - if exact_attributes.is_empty() { - word_docids_sorter.insert(word_bytes, &value_buffer)?; - } else { - let mut added_to_exact = false; - let mut added_to_word_docids = false; - for position in read_u32_ne_bytes(positions) { - // as soon as we know that this word had been to both readers, we don't need to - // iterate over the positions. 
- if added_to_exact && added_to_word_docids { - break; - } - let (fid, _) = relative_from_absolute_position(position); - if exact_attributes.contains(&fid) && !added_to_exact { - exact_word_docids_sorter.insert(word_bytes, &value_buffer)?; - added_to_exact = true; - } else if !added_to_word_docids { - word_docids_sorter.insert(word_bytes, &value_buffer)?; - added_to_word_docids = true; - } - } - } - } - - Ok(( - sorter_into_reader(word_docids_sorter, indexer)?, - sorter_into_reader(exact_word_docids_sorter, indexer)?, - )) -} diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs deleted file mode 100644 index 6add9d980..000000000 --- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs +++ /dev/null @@ -1,179 +0,0 @@ -use std::cmp::Ordering; -use std::collections::{BinaryHeap, HashMap}; -use std::fs::File; -use std::{cmp, io, mem, str, vec}; - -use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, - try_split_array_at, GrenadParameters, MergeFn, -}; -use crate::error::SerializationError; -use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::proximity::{positions_proximity, MAX_DISTANCE}; -use crate::{DocumentId, Result}; - -/// Extracts the best proximity between pairs of words and the documents ids where this pair appear. -/// -/// Returns a grenad reader with the list of extracted word pairs proximities and -/// documents ids from the given chunk of docid word positions. -#[logging_timer::time] -pub fn extract_word_pair_proximity_docids( - docid_word_positions: grenad::Reader, - indexer: GrenadParameters, -) -> Result> { - let max_memory = indexer.max_memory_by_thread(); - - let mut word_pair_proximity_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory.map(|m| m / 2), - ); - - // This map is assumed to not consume a lot of memory. - let mut document_word_positions_heap = BinaryHeap::new(); - let mut current_document_id = None; - - let mut cursor = docid_word_positions.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let (document_id_bytes, word_bytes) = try_split_array_at(key) - .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; - let document_id = u32::from_be_bytes(document_id_bytes); - let word = str::from_utf8(word_bytes)?; - - let curr_document_id = *current_document_id.get_or_insert(document_id); - if curr_document_id != document_id { - let document_word_positions_heap = mem::take(&mut document_word_positions_heap); - document_word_positions_into_sorter( - curr_document_id, - document_word_positions_heap, - &mut word_pair_proximity_docids_sorter, - )?; - current_document_id = Some(document_id); - } - - let word = word.to_string(); - let mut positions: Vec<_> = read_u32_ne_bytes(value).collect(); - positions.sort_unstable(); - let mut iter = positions.into_iter(); - if let Some(position) = iter.next() { - document_word_positions_heap.push(PeekedWordPosition { word, position, iter }); - } - } - - if let Some(document_id) = current_document_id { - // We must make sure that don't lose the current document field id - // word count map if we break because we reached the end of the chunk. 
- let document_word_positions_heap = mem::take(&mut document_word_positions_heap); - document_word_positions_into_sorter( - document_id, - document_word_positions_heap, - &mut word_pair_proximity_docids_sorter, - )?; - } - - sorter_into_reader(word_pair_proximity_docids_sorter, indexer) -} - -/// Fills the list of all pairs of words with the shortest proximity between 1 and 7 inclusive. -/// -/// This list is used by the engine to calculate the documents containing words that are -/// close to each other. -fn document_word_positions_into_sorter<'b>( - document_id: DocumentId, - mut word_positions_heap: BinaryHeap>>, - word_pair_proximity_docids_sorter: &mut grenad::Sorter, -) -> Result<()> { - let mut word_pair_proximity = HashMap::new(); - let mut ordered_peeked_word_positions = Vec::new(); - while !word_positions_heap.is_empty() { - while let Some(peeked_word_position) = word_positions_heap.pop() { - ordered_peeked_word_positions.push(peeked_word_position); - if ordered_peeked_word_positions.len() == 7 { - break; - } - } - - if let Some((head, tail)) = ordered_peeked_word_positions.split_first() { - for PeekedWordPosition { word, position, .. } in tail { - let prox = positions_proximity(head.position, *position); - if prox > 0 && prox < MAX_DISTANCE { - word_pair_proximity - .entry((head.word.clone(), word.clone())) - .and_modify(|p| { - *p = cmp::min(*p, prox); - }) - .or_insert(prox); - - // We also compute the inverse proximity. - let prox = prox + 1; - if prox < MAX_DISTANCE { - word_pair_proximity - .entry((word.clone(), head.word.clone())) - .and_modify(|p| { - *p = cmp::min(*p, prox); - }) - .or_insert(prox); - } - } - } - - // Push the tail in the heap. - let tail_iter = ordered_peeked_word_positions.drain(1..); - word_positions_heap.extend(tail_iter); - - // Advance the head and push it in the heap. 
- if let Some(mut head) = ordered_peeked_word_positions.pop() { - if let Some(next_position) = head.iter.next() { - word_positions_heap.push(PeekedWordPosition { - word: head.word, - position: next_position, - iter: head.iter, - }); - } - } - } - } - - let mut key_buffer = Vec::new(); - for ((w1, w2), prox) in word_pair_proximity { - key_buffer.clear(); - key_buffer.extend_from_slice(w1.as_bytes()); - key_buffer.push(0); - key_buffer.extend_from_slice(w2.as_bytes()); - key_buffer.push(0); - key_buffer.push(prox as u8); - - word_pair_proximity_docids_sorter.insert(&key_buffer, &document_id.to_ne_bytes())?; - } - - Ok(()) -} - -struct PeekedWordPosition { - word: String, - position: u32, - iter: I, -} - -impl Ord for PeekedWordPosition { - fn cmp(&self, other: &Self) -> Ordering { - self.position.cmp(&other.position).reverse() - } -} - -impl PartialOrd for PeekedWordPosition { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Eq for PeekedWordPosition {} - -impl PartialEq for PeekedWordPosition { - fn eq(&self, other: &Self) -> bool { - self.position == other.position - } -} diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs deleted file mode 100644 index c1661072a..000000000 --- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::fs::File; -use std::io; - -use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, - try_split_array_at, GrenadParameters, -}; -use crate::error::SerializationError; -use crate::index::db_name::DOCID_WORD_POSITIONS; -use crate::{DocumentId, Result}; - -/// Extracts the word positions and the documents ids where this word appear. -/// -/// Returns a grenad reader with the list of extracted words at positions and -/// documents ids from the given chunk of docid word positions. -#[logging_timer::time] -pub fn extract_word_position_docids( - docid_word_positions: grenad::Reader, - indexer: GrenadParameters, -) -> Result> { - let max_memory = indexer.max_memory_by_thread(); - - let mut word_position_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_cbo_roaring_bitmaps, - indexer.chunk_compression_type, - indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory, - ); - - let mut key_buffer = Vec::new(); - let mut cursor = docid_word_positions.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? 
{ - let (document_id_bytes, word_bytes) = try_split_array_at(key) - .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; - let document_id = DocumentId::from_be_bytes(document_id_bytes); - - for position in read_u32_ne_bytes(value) { - key_buffer.clear(); - key_buffer.extend_from_slice(word_bytes); - key_buffer.extend_from_slice(&position.to_be_bytes()); - - word_position_docids_sorter.insert(&key_buffer, &document_id.to_ne_bytes())?; - } - } - - sorter_into_reader(word_position_docids_sorter, indexer) -} diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs deleted file mode 100644 index 157886e63..000000000 --- a/milli/src/update/index_documents/extract/mod.rs +++ /dev/null @@ -1,310 +0,0 @@ -mod extract_docid_word_positions; -mod extract_facet_number_docids; -mod extract_facet_string_docids; -mod extract_fid_docid_facet_values; -mod extract_fid_word_count_docids; -mod extract_geo_points; -mod extract_word_docids; -mod extract_word_pair_proximity_docids; -mod extract_word_position_docids; - -use std::collections::HashSet; -use std::fs::File; - -use crossbeam_channel::Sender; -use log::debug; -use rayon::prelude::*; - -use self::extract_docid_word_positions::extract_docid_word_positions; -use self::extract_facet_number_docids::extract_facet_number_docids; -use self::extract_facet_string_docids::extract_facet_string_docids; -use self::extract_fid_docid_facet_values::extract_fid_docid_facet_values; -use self::extract_fid_word_count_docids::extract_fid_word_count_docids; -use self::extract_geo_points::extract_geo_points; -use self::extract_word_docids::extract_word_docids; -use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids; -use self::extract_word_position_docids::extract_word_position_docids; -use super::helpers::{ - as_cloneable_grenad, keep_first_prefix_value_merge_roaring_bitmaps, merge_cbo_roaring_bitmaps, - merge_roaring_bitmaps, CursorClonableMmap, GrenadParameters, MergeFn, MergeableReader, -}; -use super::{helpers, TypedChunk}; -use crate::{FieldId, Result}; - -/// Extract data for each databases from obkv documents in parallel. -/// Send data in grenad file over provided Sender. 
-pub(crate) fn data_from_obkv_documents( - original_obkv_chunks: impl Iterator>> + Send, - flattened_obkv_chunks: impl Iterator>> + Send, - indexer: GrenadParameters, - lmdb_writer_sx: Sender>, - searchable_fields: Option>, - faceted_fields: HashSet, - primary_key_id: FieldId, - geo_fields_ids: Option<(FieldId, FieldId)>, - stop_words: Option>, - max_positions_per_attributes: Option, - exact_attributes: HashSet, -) -> Result<()> { - original_obkv_chunks - .par_bridge() - .map(|original_documents_chunk| { - send_original_documents_data(original_documents_chunk, lmdb_writer_sx.clone()) - }) - .collect::>()?; - - let result: Result<(Vec<_>, (Vec<_>, (Vec<_>, Vec<_>)))> = flattened_obkv_chunks - .par_bridge() - .map(|flattened_obkv_chunks| { - send_and_extract_flattened_documents_data( - flattened_obkv_chunks, - indexer, - lmdb_writer_sx.clone(), - &searchable_fields, - &faceted_fields, - primary_key_id, - geo_fields_ids, - &stop_words, - max_positions_per_attributes, - ) - }) - .collect(); - - let ( - docid_word_positions_chunks, - ( - docid_fid_facet_numbers_chunks, - (docid_fid_facet_strings_chunks, facet_exists_docids_chunks), - ), - ) = result?; - - // merge facet_exists_docids and send them as a typed chunk - { - let lmdb_writer_sx = lmdb_writer_sx.clone(); - rayon::spawn(move || { - debug!("merge {} database", "facet-id-exists-docids"); - match facet_exists_docids_chunks.merge(merge_cbo_roaring_bitmaps, &indexer) { - Ok(reader) => { - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(reader))); - } - Err(e) => { - let _ = lmdb_writer_sx.send(Err(e)); - } - } - }); - } - - spawn_extraction_task::<_, _, Vec>>( - docid_word_positions_chunks.clone(), - indexer.clone(), - lmdb_writer_sx.clone(), - extract_word_pair_proximity_docids, - merge_cbo_roaring_bitmaps, - TypedChunk::WordPairProximityDocids, - "word-pair-proximity-docids", - ); - - spawn_extraction_task::<_, _, Vec>>( - docid_word_positions_chunks.clone(), - indexer.clone(), - lmdb_writer_sx.clone(), - extract_fid_word_count_docids, - merge_cbo_roaring_bitmaps, - TypedChunk::FieldIdWordcountDocids, - "field-id-wordcount-docids", - ); - - spawn_extraction_task::<_, _, Vec<(grenad::Reader, grenad::Reader)>>( - docid_word_positions_chunks.clone(), - indexer.clone(), - lmdb_writer_sx.clone(), - move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes), - merge_roaring_bitmaps, - |(word_docids_reader, exact_word_docids_reader)| TypedChunk::WordDocids { - word_docids_reader, - exact_word_docids_reader, - }, - "word-docids", - ); - - spawn_extraction_task::<_, _, Vec>>( - docid_word_positions_chunks.clone(), - indexer.clone(), - lmdb_writer_sx.clone(), - extract_word_position_docids, - merge_cbo_roaring_bitmaps, - TypedChunk::WordPositionDocids, - "word-position-docids", - ); - - spawn_extraction_task::<_, _, Vec>>( - docid_fid_facet_strings_chunks.clone(), - indexer.clone(), - lmdb_writer_sx.clone(), - extract_facet_string_docids, - keep_first_prefix_value_merge_roaring_bitmaps, - TypedChunk::FieldIdFacetStringDocids, - "field-id-facet-string-docids", - ); - - spawn_extraction_task::<_, _, Vec>>( - docid_fid_facet_numbers_chunks.clone(), - indexer.clone(), - lmdb_writer_sx.clone(), - extract_facet_number_docids, - merge_cbo_roaring_bitmaps, - TypedChunk::FieldIdFacetNumberDocids, - "field-id-facet-number-docids", - ); - - Ok(()) -} - -/// Spawn a new task to extract data for a specific DB using extract_fn. -/// Generated grenad chunks are merged using the merge_fn. 
-/// The result of merged chunks is serialized as TypedChunk using the serialize_fn -/// and sent into lmdb_writer_sx. -fn spawn_extraction_task( - chunks: Vec>, - indexer: GrenadParameters, - lmdb_writer_sx: Sender>, - extract_fn: FE, - merge_fn: MergeFn, - serialize_fn: FS, - name: &'static str, -) where - FE: Fn(grenad::Reader, GrenadParameters) -> Result - + Sync - + Send - + 'static, - FS: Fn(M::Output) -> TypedChunk + Sync + Send + 'static, - M: MergeableReader + FromParallelIterator + Send + 'static, - M::Output: Send, -{ - rayon::spawn(move || { - let chunks: Result = - chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer.clone())).collect(); - rayon::spawn(move || match chunks { - Ok(chunks) => { - debug!("merge {} database", name); - let reader = chunks.merge(merge_fn, &indexer); - let _ = lmdb_writer_sx.send(reader.map(|r| serialize_fn(r))); - } - Err(e) => { - let _ = lmdb_writer_sx.send(Err(e)); - } - }) - }); -} - -/// Extract chunked data and send it into lmdb_writer_sx sender: -/// - documents -fn send_original_documents_data( - original_documents_chunk: Result>, - lmdb_writer_sx: Sender>, -) -> Result<()> { - let original_documents_chunk = - original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; - - // TODO: create a custom internal error - lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk))).unwrap(); - Ok(()) -} - -/// Extract chunked data and send it into lmdb_writer_sx sender: -/// - documents_ids -/// - docid_word_positions -/// - docid_fid_facet_numbers -/// - docid_fid_facet_strings -/// - docid_fid_facet_exists -fn send_and_extract_flattened_documents_data( - flattened_documents_chunk: Result>, - indexer: GrenadParameters, - lmdb_writer_sx: Sender>, - searchable_fields: &Option>, - faceted_fields: &HashSet, - primary_key_id: FieldId, - geo_fields_ids: Option<(FieldId, FieldId)>, - stop_words: &Option>, - max_positions_per_attributes: Option, -) -> Result<( - grenad::Reader, - ( - grenad::Reader, - (grenad::Reader, grenad::Reader), - ), -)> { - let flattened_documents_chunk = - flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; - - if let Some(geo_fields_ids) = geo_fields_ids { - let documents_chunk_cloned = flattened_documents_chunk.clone(); - let lmdb_writer_sx_cloned = lmdb_writer_sx.clone(); - rayon::spawn(move || { - let result = - extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_fields_ids); - let _ = match result { - Ok(geo_points) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoPoints(geo_points))), - Err(error) => lmdb_writer_sx_cloned.send(Err(error)), - }; - }); - } - - let (docid_word_positions_chunk, docid_fid_facet_values_chunks): (Result<_>, Result<_>) = - rayon::join( - || { - let (documents_ids, docid_word_positions_chunk) = extract_docid_word_positions( - flattened_documents_chunk.clone(), - indexer.clone(), - searchable_fields, - stop_words.as_ref(), - max_positions_per_attributes, - )?; - - // send documents_ids to DB writer - let _ = lmdb_writer_sx.send(Ok(TypedChunk::NewDocumentsIds(documents_ids))); - - // send docid_word_positions_chunk to DB writer - let docid_word_positions_chunk = - unsafe { as_cloneable_grenad(&docid_word_positions_chunk)? 
}; - let _ = lmdb_writer_sx - .send(Ok(TypedChunk::DocidWordPositions(docid_word_positions_chunk.clone()))); - - Ok(docid_word_positions_chunk) - }, - || { - let ( - docid_fid_facet_numbers_chunk, - docid_fid_facet_strings_chunk, - fid_facet_exists_docids_chunk, - ) = extract_fid_docid_facet_values( - flattened_documents_chunk.clone(), - indexer.clone(), - faceted_fields, - )?; - - // send docid_fid_facet_numbers_chunk to DB writer - let docid_fid_facet_numbers_chunk = - unsafe { as_cloneable_grenad(&docid_fid_facet_numbers_chunk)? }; - - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetNumbers( - docid_fid_facet_numbers_chunk.clone(), - ))); - - // send docid_fid_facet_strings_chunk to DB writer - let docid_fid_facet_strings_chunk = - unsafe { as_cloneable_grenad(&docid_fid_facet_strings_chunk)? }; - - let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdDocidFacetStrings( - docid_fid_facet_strings_chunk.clone(), - ))); - - Ok(( - docid_fid_facet_numbers_chunk, - (docid_fid_facet_strings_chunk, fid_facet_exists_docids_chunk), - )) - }, - ); - - Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?)) -} diff --git a/milli/src/update/index_documents/extract2/docid_word_positions.rs b/milli/src/update/index_documents/extract2/docid_word_positions.rs new file mode 100644 index 000000000..232cdd165 --- /dev/null +++ b/milli/src/update/index_documents/extract2/docid_word_positions.rs @@ -0,0 +1,48 @@ +use std::collections::HashMap; + +use crate::{update::index_documents::MergeFn, Result}; +use grenad::Sorter; + +pub struct DocidWordPositionsExtractor<'out> { + docid: u32, + key_buffer: Vec, + value_buffer: Vec, + pub word_positions: HashMap, Vec>, + sorter: &'out mut Sorter, +} +impl<'out> DocidWordPositionsExtractor<'out> { + pub fn new(docid: u32, sorter: &'out mut Sorter) -> Self { + Self { + docid, + key_buffer: vec![], + value_buffer: vec![], + sorter, + word_positions: HashMap::default(), + } + } + + pub fn extract_from_token_and_position(&mut self, token: &[u8], position: u32) -> Result<()> { + let positions = self.word_positions.entry(token.to_vec()).or_default(); + positions.push(position); + Ok(()) + } + pub fn finish_fid(&mut self) -> crate::Result<()> { + let Self { docid, key_buffer, value_buffer, word_positions, sorter } = self; + + key_buffer.clear(); + key_buffer.extend_from_slice(&docid.to_ne_bytes()); + + for (word, positions) in word_positions.iter() { + value_buffer.clear(); + + key_buffer.truncate(std::mem::size_of::()); + key_buffer.extend_from_slice(word.as_slice()); + for pos in positions { + value_buffer.extend_from_slice(&pos.to_ne_bytes()); + } + sorter.insert(&key_buffer, &value_buffer)?; + } + word_positions.clear(); + Ok(()) + } +} diff --git a/milli/src/update/index_documents/extract2/facet_values.rs b/milli/src/update/index_documents/extract2/facet_values.rs new file mode 100644 index 000000000..4cb91cb01 --- /dev/null +++ b/milli/src/update/index_documents/extract2/facet_values.rs @@ -0,0 +1,136 @@ +use crate::{ + facet::value_encoding::f64_into_bytes, + heed_codec::facet::{encode_prefix_string, FacetLevelValueF64Codec, FacetStringLevelZeroCodec}, + update::index_documents::MergeFn, + Result, +}; +use grenad::Sorter; +use heed::{zerocopy::AsBytes, BytesEncode}; +use roaring::RoaringBitmap; +use serde_json::Value; +use std::iter::FromIterator; +use std::mem; + +pub struct FidDocIdFacetValuesExtractor<'out> { + docid: u32, + docid_value_buffer: Vec, + fid_docid_key_buffer: Vec, + fid_docid_numbers: &'out mut Sorter, + fid_docid_strings: &'out mut 
Sorter, + fid_docid_exists: &'out mut Sorter, + string_docids: &'out mut Sorter, + numbers_docids: &'out mut Sorter, +} +impl<'out> FidDocIdFacetValuesExtractor<'out> { + pub fn new( + docid: u32, + fid_docid_numbers: &'out mut Sorter, + fid_docid_strings: &'out mut Sorter, + fid_docid_exists: &'out mut Sorter, + string_docids: &'out mut Sorter, + numbers_docids: &'out mut Sorter, + ) -> Self { + let mut docid_value_buffer = vec![]; + docid_value_buffer.extend_from_slice(&docid.to_be_bytes()); + Self { + docid, + docid_value_buffer, + fid_docid_key_buffer: vec![], + + fid_docid_numbers, + fid_docid_strings, + fid_docid_exists, + string_docids, + numbers_docids, + } + } + + pub fn extract_from_field_id(&mut self, fid: u16, value: serde_json::Value) -> Result<()> { + // self.key_buffer available + // let mut value_buffer = vec![]; + self.fid_docid_key_buffer.clear(); + + self.fid_docid_key_buffer.extend(&fid.to_be_bytes()); + + // EXISTS: + // key: fid + // value: CboRoaringBitmap of docids + self.fid_docid_exists.insert(&self.fid_docid_key_buffer, &self.docid.to_ne_bytes())?; + + self.fid_docid_key_buffer.extend(&self.docid_value_buffer); + + let (numbers, strings) = extract_facet_values(&value); + for number in numbers { + self.fid_docid_key_buffer.truncate(mem::size_of::() + mem::size_of::()); + if let Some(value_bytes) = f64_into_bytes(number) { + self.fid_docid_key_buffer.extend_from_slice(&value_bytes); + self.fid_docid_key_buffer.extend_from_slice(&number.to_be_bytes()); + + self.fid_docid_numbers.insert(&self.fid_docid_key_buffer, ().as_bytes())?; + + // FACET_NUMBERS_DOCIDS + // key: field id, level 0, number, number + // value: cboroaringbitmap of docids + // TODO: buffer this, reuse fid, 0 + // also reuse docid.to_ne_bytes buffer + let key = (fid, 0, number, number); + let key_bytes = FacetLevelValueF64Codec::bytes_encode(&key).unwrap(); + self.numbers_docids.insert(&key_bytes, &self.docid.to_ne_bytes())?; + } + } + // insert normalized and original facet string in sorter + for (normalized, original) in strings.into_iter().filter(|(n, _)| !n.is_empty()) { + self.fid_docid_key_buffer.truncate(mem::size_of::() + mem::size_of::()); + self.fid_docid_key_buffer.extend_from_slice(normalized.as_bytes()); + self.fid_docid_strings.insert(&self.fid_docid_key_buffer, original.as_bytes())?; + + // TODO: perf optimisations in this + let mut key_buffer = vec![]; + FacetStringLevelZeroCodec::serialize_into(fid, normalized.as_str(), &mut key_buffer); + let mut value_buffer = vec![]; + encode_prefix_string(original.as_str(), &mut value_buffer)?; + let bitmap = RoaringBitmap::from_iter(Some(self.docid)); + bitmap.serialize_into(&mut value_buffer)?; + self.string_docids.insert(&key_buffer, &value_buffer)?; + } + + Ok(()) + } +} + +fn extract_facet_values(value: &Value) -> (Vec, Vec<(String, String)>) { + fn inner_extract_facet_values( + value: &Value, + can_recurse: bool, + output_numbers: &mut Vec, + output_strings: &mut Vec<(String, String)>, + ) { + match value { + Value::Null => (), + Value::Bool(b) => output_strings.push((b.to_string(), b.to_string())), + Value::Number(number) => { + if let Some(float) = number.as_f64() { + output_numbers.push(float); + } + } + Value::String(original) => { + let normalized = original.trim().to_lowercase(); + output_strings.push((normalized, original.clone())); + } + Value::Array(values) => { + if can_recurse { + for value in values { + inner_extract_facet_values(value, false, output_numbers, output_strings); + } + } + } + Value::Object(_) => (), + } + } + + 
let mut facet_number_values = Vec::new(); + let mut facet_string_values = Vec::new(); + inner_extract_facet_values(value, true, &mut facet_number_values, &mut facet_string_values); + + (facet_number_values, facet_string_values) +} diff --git a/milli/src/update/index_documents/extract2/fid_word_count_docids.rs b/milli/src/update/index_documents/extract2/fid_word_count_docids.rs new file mode 100644 index 000000000..5712fcce7 --- /dev/null +++ b/milli/src/update/index_documents/extract2/fid_word_count_docids.rs @@ -0,0 +1,24 @@ +use crate::{update::index_documents::MergeFn, Result}; +use grenad::Sorter; + +pub struct FidWordCountDocids<'out> { + docid: u32, + key_buffer: Vec, + sorter: &'out mut Sorter, +} +impl<'out> FidWordCountDocids<'out> { + pub fn new(docid: u32, sorter: &'out mut Sorter) -> Self { + Self { docid, key_buffer: vec![], sorter } + } + + pub fn extract_from_fid_and_word_count(&mut self, fid: u16, word_count: u32) -> Result<()> { + if word_count <= 10 { + self.key_buffer.clear(); + self.key_buffer.extend_from_slice(&fid.to_be_bytes()); + self.key_buffer.push(word_count as u8); + + self.sorter.insert(&self.key_buffer, self.docid.to_ne_bytes())?; + } + Ok(()) + } +} diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract2/geo_points.rs similarity index 52% rename from milli/src/update/index_documents/extract/extract_geo_points.rs rename to milli/src/update/index_documents/extract2/geo_points.rs index 47085144a..12a4b0105 100644 --- a/milli/src/update/index_documents/extract/extract_geo_points.rs +++ b/milli/src/update/index_documents/extract2/geo_points.rs @@ -1,42 +1,41 @@ -use std::fs::File; -use std::io; - +use crate::{ + error::GeoError, update::index_documents::extract_finite_float_from_value, FieldId, + InternalError, Result, +}; use concat_arrays::concat_arrays; +use obkv::KvReader; use serde_json::Value; +use std::fs::File; -use super::helpers::{create_writer, writer_into_reader, GrenadParameters}; -use crate::error::GeoError; -use crate::update::index_documents::extract_finite_float_from_value; -use crate::{FieldId, InternalError, Result}; - -/// Extracts the geographical coordinates contained in each document under the `_geo` field. -/// -/// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude) -pub fn extract_geo_points( - obkv_documents: grenad::Reader, - indexer: GrenadParameters, - primary_key_id: FieldId, - (lat_fid, lng_fid): (FieldId, FieldId), -) -> Result> { - let mut writer = create_writer( - indexer.chunk_compression_type, - indexer.chunk_compression_level, - tempfile::tempfile()?, - ); - - let mut cursor = obkv_documents.into_cursor()?; - while let Some((docid_bytes, value)) = cursor.move_on_next()? 
{ - let obkv = obkv::KvReader::new(value); +pub struct GeoPointsExtractor<'out> { + docid: u32, + primary_key_fid: u16, + lat_fid: u16, + lng_fid: u16, + writer: &'out mut grenad::Writer, +} +impl<'out> GeoPointsExtractor<'out> { + pub fn new( + docid: u32, + primary_key_fid: u16, + lat_fid: u16, + lng_fid: u16, + writer: &'out mut grenad::Writer, + ) -> Self { + Self { docid, primary_key_fid, lat_fid, lng_fid, writer } + } + + pub fn extract_from_obkv(&mut self, obkv: KvReader) -> Result<()> { // since we only needs the primary key when we throw an error we create this getter to // lazily get it when needed let document_id = || -> Value { - let document_id = obkv.get(primary_key_id).unwrap(); + let document_id = obkv.get(self.primary_key_fid).unwrap(); serde_json::from_slice(document_id).unwrap() }; // first we get the two fields - let lat = obkv.get(lat_fid); - let lng = obkv.get(lng_fid); + let lat = obkv.get(self.lat_fid); + let lng = obkv.get(self.lng_fid); if let Some((lat, lng)) = lat.zip(lng) { // then we extract the values @@ -51,13 +50,21 @@ pub fn extract_geo_points( .map_err(|lng| GeoError::BadLongitude { document_id: document_id(), value: lng })?; let bytes: [u8; 16] = concat_arrays![lat.to_ne_bytes(), lng.to_ne_bytes()]; - writer.insert(docid_bytes, bytes)?; + self.writer.insert(self.docid.to_be_bytes(), bytes)?; } else if lat.is_none() && lng.is_some() { return Err(GeoError::MissingLatitude { document_id: document_id() })?; } else if lat.is_some() && lng.is_none() { return Err(GeoError::MissingLongitude { document_id: document_id() })?; } + Ok(()) } - Ok(writer_into_reader(writer)?) + fn finish_docid(&mut self) {} +} + +// To make sure we don't forget to call finish_docid? +impl<'out> Drop for GeoPointsExtractor<'out> { + fn drop(&mut self) { + self.finish_docid(); + } } diff --git a/milli/src/update/index_documents/extract2/mod.rs b/milli/src/update/index_documents/extract2/mod.rs new file mode 100644 index 000000000..31706d157 --- /dev/null +++ b/milli/src/update/index_documents/extract2/mod.rs @@ -0,0 +1,553 @@ +use self::fid_word_count_docids::FidWordCountDocids; + +use super::helpers::{ + concat_u32s_array, create_sorter, keep_first, keep_first_prefix_value_merge_roaring_bitmaps, + sorter_into_reader, GrenadParameters, MergeableReader, +}; +use super::typed_chunk::TypedChunk; +use super::{ + as_cloneable_grenad, create_writer, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, + writer_into_reader, ClonableMmap, MergeFn, +}; +use crate::error::{InternalError, SerializationError}; +use crate::{FieldId, Result}; +use charabia::TokenizerBuilder; +use grenad::{Reader, SortAlgorithm, Sorter}; +use obkv::KvReader; +use serde_json::Value; +use std::collections::HashSet; +use std::convert::TryInto; +use std::fmt::Write; +use std::fs::File; +use std::io::{Cursor, Read, Seek}; +use std::{str, thread}; + +mod docid_word_positions; +mod facet_values; +mod fid_word_count_docids; +mod geo_points; +mod tokenize; +mod word_docids; +mod word_pair_proximity; +mod word_position_docid; + +#[derive(Clone, Copy)] +pub struct Context<'a> { + pub primary_key_fid: u16, + pub geo_fields_ids: Option<(u16, u16)>, + pub searchable_fields: &'a Option>, + pub faceted_fields: &'a HashSet, + pub stop_words: Option<&'a fst::Set<&'a [u8]>>, + pub max_positions_per_attributes: u32, + pub exact_attributes: &'a HashSet, + pub grenad_params: GrenadParameters, +} + +pub struct ExtractingData { + word_position_docids: Sorter, + word_pair_proximity_docids: Sorter, + docid_word_positions: Sorter, + word_docids: 
Sorter, + exact_word_docids: Sorter, + fid_word_count_docids: Sorter, + fid_docid_facet_exists: Sorter, + fid_docid_facet_numbers: Sorter, + fid_docid_facet_strings: Sorter, + facet_string_docids: Sorter, + facet_numbers_docids: Sorter, + geo_points: grenad::Writer, +} + +struct SkippingReaderCursor +where + R: Read + Seek, +{ + cursor: grenad::ReaderCursor, + skip: usize, + first: bool, +} +impl SkippingReaderCursor +where + R: Read + Seek, +{ + fn new(cursor: grenad::ReaderCursor, skip: usize) -> Self { + Self { cursor, skip, first: true } + } + fn next(&mut self) -> Option<(&[u8], &[u8])> { + if self.first { + self.first = false; + return self.cursor.move_on_next().unwrap(); + } + for _ in 0..self.skip { + let _ = self.cursor.move_on_next().unwrap()?; + } + self.cursor.move_on_next().unwrap() + } +} + +pub fn extract_data( + max_memory: usize, + num_threads: usize, + flattened_documents: grenad::Reader>, + ctx: Context<'_>, +) -> Result> { + let mut cursor = flattened_documents.into_cursor()?; + thread::scope(|s| { + let cursors = (0..num_threads) + .map(|_| { + let skipping_cursor = SkippingReaderCursor::new(cursor.clone(), num_threads - 1); + let _ = cursor.move_on_next(); + skipping_cursor + }) + .collect::>(); + + let handles = cursors + .into_iter() + .map(|mut cursor| { + let max_memory = max_memory / num_threads; + // TODO: when an error is thrown, cancel everything + s.spawn(move || { + let mut state = ExtractingData::new(max_memory)?; + loop { + if let Some((docid, obkv)) = cursor.next() { + state.extract_document(docid, obkv, ctx)?; + } else { + break; + }; + } + let state = state.finish(ctx.grenad_params)?; + Ok(state) + }) + }) + .collect::>(); + + handles.into_iter().map(|h| h.join().unwrap()).collect() + }) +} + +impl ExtractingData { + pub fn new(max_memory: usize) -> Result { + let word_docids = create_sorter( + SortAlgorithm::Unstable, + merge_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 20), + ); + let word_position_docids = create_sorter( + SortAlgorithm::Unstable, + merge_cbo_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 20), + ); + let word_pair_proximity_docids = create_sorter( + SortAlgorithm::Unstable, + merge_cbo_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 2), + ); + let docid_word_positions = create_sorter( + grenad::SortAlgorithm::Stable, + concat_u32s_array, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 10), + ); + // I used 14/20th of the memory so far + let exact_word_docids = create_sorter( + grenad::SortAlgorithm::Unstable, + merge_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 10), + ); + // I used 16/20th of the memory so far + let fid_word_count_docids = create_sorter( + grenad::SortAlgorithm::Unstable, + merge_cbo_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 40), + ); + // lost track of memory usage from here + + let fid_docid_facet_exists = create_sorter( + grenad::SortAlgorithm::Unstable, + merge_cbo_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + // TODO: SorterPool so I don't have to specify an arbitrary amount of memory + Some(max_memory / 40), + ); + let fid_docid_facet_numbers = create_sorter( + grenad::SortAlgorithm::Stable, + keep_first, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 40), + ); + let fid_docid_facet_strings = create_sorter( + grenad::SortAlgorithm::Stable, + 
keep_first, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 20), + ); + let facet_string_docids = create_sorter( + grenad::SortAlgorithm::Stable, + keep_first_prefix_value_merge_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 20), + ); + // 19/20 + let facet_numbers_docids = create_sorter( + grenad::SortAlgorithm::Unstable, + merge_cbo_roaring_bitmaps, + grenad::CompressionType::None, + None, + None, + Some(max_memory / 20), + ); + // 20/20 (I am not sure that's correct anymore, it might be more) + let geo_points = create_writer(grenad::CompressionType::None, None, tempfile::tempfile()?); + + Ok(Self { + word_position_docids, + word_pair_proximity_docids, + docid_word_positions, + word_docids, + exact_word_docids, + fid_word_count_docids, + fid_docid_facet_exists, + fid_docid_facet_numbers, + fid_docid_facet_strings, + facet_string_docids, + facet_numbers_docids, + geo_points, + }) + } + + /// ## Arguments + /// - `docid`: the document id, a big-endian encoded u32 + /// - `obkv`: the content of the document encoded as object key-values + /// - `ctx`: context needed to extract the document (e.g. milli settings) + fn extract_document(&mut self, docid: &[u8], obkv: &[u8], ctx: Context<'_>) -> Result<()> { + let mut key_buffer = Vec::new(); // this could be self.key_buffer instead + let mut field_buffer = String::new(); // this could be self.field_buffer instead + + let Self { + exact_word_docids, + word_docids, + word_position_docids, + word_pair_proximity_docids, + docid_word_positions, + fid_word_count_docids, + fid_docid_facet_exists, + fid_docid_facet_numbers, + fid_docid_facet_strings, + facet_string_docids, + facet_numbers_docids, + geo_points, + } = self; + + let mut tokenizer = TokenizerBuilder::new(); + if let Some(stop_words) = ctx.stop_words { + tokenizer.stop_words(stop_words); + } + let tokenizer = tokenizer.build(); + + let docid = docid + .try_into() + .map(u32::from_be_bytes) + .map_err(|_| SerializationError::InvalidNumberSerialization)?; + let obkv = KvReader::::new(obkv); + let mut word_pair_proximity_extractor = + word_pair_proximity::WordPairProximityDocidsExtractor::new( + docid, + word_pair_proximity_docids, + ); + let mut word_position_extractor = + word_position_docid::WordPositionExtractor::new(docid, word_position_docids); + let mut docid_word_positions_extractor = + docid_word_positions::DocidWordPositionsExtractor::new(docid, docid_word_positions); + let mut word_docids_extractor = + word_docids::WordDocidsExtractor::new(docid, word_docids, exact_word_docids)?; + key_buffer.extend_from_slice(&docid.to_be_bytes()); + let mut fid_docid_facet_values_extractor = facet_values::FidDocIdFacetValuesExtractor::new( + docid, + fid_docid_facet_numbers, + fid_docid_facet_strings, + fid_docid_facet_exists, + facet_string_docids, + facet_numbers_docids, + ); + let mut fid_word_count_docids_extractor = + FidWordCountDocids::new(docid, fid_word_count_docids); + + if let Some((lat_fid, lng_fid)) = ctx.geo_fields_ids { + let mut extractor = geo_points::GeoPointsExtractor::new( + docid, + ctx.primary_key_fid, + lat_fid, + lng_fid, + geo_points, + ); + extractor.extract_from_obkv(obkv)?; + } + + 'field_ids: for (field_id, field_bytes) in obkv.iter() { + let is_searchable = + ctx.searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)); + let is_faceted = ctx.faceted_fields.contains(&field_id); + if !is_searchable && !is_faceted { + continue 'field_ids; + } + + let value = 
serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; + field_buffer.clear(); + + if is_searchable { + if let Some(field) = json_to_string(&value, &mut field_buffer) { + let mut word_count = 0; + tokenize::tokenize(&ctx, field_id, field, &tokenizer, |position, token| { + word_position_extractor.extract_from_token_and_position(token, position)?; + docid_word_positions_extractor + .extract_from_token_and_position(token, position)?; + word_pair_proximity_extractor + .extract_from_token_and_position(token, position)?; + word_count = position + 1; + Ok(()) + })?; + + word_docids_extractor.extract_from_docid_word_positions_extractor( + field_id, + &docid_word_positions_extractor, + ctx, + )?; + docid_word_positions_extractor.finish_fid()?; + fid_word_count_docids_extractor + .extract_from_fid_and_word_count(field_id, word_count)?; + } + } + if is_faceted { + fid_docid_facet_values_extractor.extract_from_field_id(field_id, value)?; + } + } + word_pair_proximity_extractor.finish_docid()?; + Ok(()) + } +} + +/// Transform a JSON value into a string that can be indexed. +fn json_to_string<'a>(value: &'a Value, buffer: &'a mut String) -> Option<&'a str> { + fn inner(value: &Value, output: &mut String) -> bool { + match value { + Value::Null => false, + Value::Bool(boolean) => write!(output, "{}", boolean).is_ok(), + Value::Number(number) => write!(output, "{}", number).is_ok(), + Value::String(string) => write!(output, "{}", string).is_ok(), + Value::Array(array) => { + let mut count = 0; + for value in array { + if inner(value, output) { + output.push_str(". "); + count += 1; + } + } + // check that at least one value was written + count != 0 + } + Value::Object(object) => { + let mut buffer = String::new(); + let mut count = 0; + for (key, value) in object { + buffer.clear(); + let _ = write!(&mut buffer, "{}: ", key); + if inner(value, &mut buffer) { + buffer.push_str(". "); + // We write the "key: value. " pair only when + // we are sure that the value can be written. 
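extract_document runs a single tokenization pass per searchable field and fans each (position, token) pair out to several per-document extractors through the closure passed to tokenize, instead of re-tokenizing once per database as the chunk-based pipeline did. A simplified sketch of that callback shape, with whitespace splitting standing in for the charabia tokenizer and plain Vecs standing in for the sorter-backed extractors:

fn for_each_token(
    field: &str,
    mut cb: impl FnMut(u32, &str) -> Result<(), String>,
) -> Result<(), String> {
    // Whitespace split stands in for the real tokenizer; positions are relative.
    for (position, token) in field.split_whitespace().enumerate() {
        cb(position as u32, token)?;
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let mut word_positions: Vec<(u32, String)> = Vec::new(); // stand-in for WordPositionExtractor
    let mut pairs: Vec<(String, String)> = Vec::new(); // stand-in for the proximity extractor
    let mut previous: Option<String> = None;
    let mut word_count = 0;

    for_each_token("the quick brown fox", |position, token| {
        word_positions.push((position, token.to_string()));
        if let Some(prev) = previous.replace(token.to_string()) {
            pairs.push((prev, token.to_string()));
        }
        word_count = position + 1; // same trick as the real code: last position + 1
        Ok(())
    })?;

    assert_eq!(word_count, 4);
    assert_eq!(pairs.len(), 3);
    Ok(())
}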
+ output.push_str(&buffer); + count += 1; + } + } + // check that at least one value was written + count != 0 + } + } + } + + if let Value::String(string) = value { + Some(&string) + } else if inner(value, buffer) { + Some(buffer) + } else { + None + } +} + +pub struct ExtractedData { + pub word_position_docids: Reader, + pub word_pair_proximity_docids: Reader, + pub docid_word_positions: Reader, + pub word_docids: Reader, + pub exact_word_docids: Reader, + pub fid_word_count_docids: Reader, + pub fid_docid_facet_exists: Reader, + pub fid_docid_facet_numbers: Reader, + pub fid_docid_facet_strings: Reader, + pub facet_string_docids: Reader, + pub facet_numbers_docids: Reader, + pub geo_points: Reader, +} +impl ExtractingData { + fn finish(self, indexer: GrenadParameters) -> Result { + Ok(ExtractedData { + word_position_docids: sorter_into_reader(self.word_position_docids, indexer)?, + word_pair_proximity_docids: sorter_into_reader( + self.word_pair_proximity_docids, + indexer, + )?, + docid_word_positions: sorter_into_reader(self.docid_word_positions, indexer)?, + word_docids: sorter_into_reader(self.word_docids, indexer)?, + exact_word_docids: sorter_into_reader(self.exact_word_docids, indexer)?, + fid_word_count_docids: sorter_into_reader(self.fid_word_count_docids, indexer)?, + fid_docid_facet_exists: sorter_into_reader(self.fid_docid_facet_exists, indexer)?, + fid_docid_facet_numbers: sorter_into_reader(self.fid_docid_facet_numbers, indexer)?, + fid_docid_facet_strings: sorter_into_reader(self.fid_docid_facet_strings, indexer)?, + facet_string_docids: sorter_into_reader(self.facet_string_docids, indexer)?, + facet_numbers_docids: sorter_into_reader(self.facet_numbers_docids, indexer)?, + geo_points: writer_into_reader(self.geo_points)?, + }) + } +} + +pub(crate) fn merge_extracted_data( + data: Vec, + indexer: GrenadParameters, + sender: crossbeam_channel::Sender>, +) -> Result<()> { + let mut word_position_docids = vec![]; + let mut word_pair_proximity_docids = vec![]; + let mut docid_word_positions = vec![]; + let mut word_docids = vec![]; + let mut exact_word_docids = vec![]; + let mut fid_word_count_docids = vec![]; + let mut fid_docid_facet_exists = vec![]; + let mut fid_docid_facet_numbers = vec![]; + let mut fid_docid_facet_strings = vec![]; + let mut facet_string_docids = vec![]; + let mut facet_numbers_docids = vec![]; + let mut geo_points = vec![]; + + for data in data { + word_position_docids.push(data.word_position_docids); + word_pair_proximity_docids.push(data.word_pair_proximity_docids); + docid_word_positions.push(data.docid_word_positions); + word_docids.push(data.word_docids); + exact_word_docids.push(data.exact_word_docids); + fid_word_count_docids.push(data.fid_word_count_docids); + fid_docid_facet_exists.push(data.fid_docid_facet_exists); + fid_docid_facet_numbers.push(data.fid_docid_facet_numbers); + fid_docid_facet_strings.push(data.fid_docid_facet_strings); + facet_string_docids.push(data.facet_string_docids); + facet_numbers_docids.push(data.facet_numbers_docids); + geo_points.push(data.geo_points); + } + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = word_position_docids + .merge(merge_cbo_roaring_bitmaps, &indexer) + .map(TypedChunk::WordPositionDocids); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = word_pair_proximity_docids + .merge(merge_cbo_roaring_bitmaps, &indexer) + .map(TypedChunk::WordPairProximityDocids); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + 
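For reference, this is what the flattening performed by json_to_string looks like from the outside: scalars are written verbatim, array items and object values are joined with ". ", object values are prefixed with "key: ", and nulls (or containers where nothing could be written) produce nothing. Below is a condensed restatement of those rules using only serde_json, just to make the resulting text visible; it is not the crate's function, only an illustration:

use serde_json::{json, Value};
use std::fmt::Write;

fn flatten(value: &Value, out: &mut String) -> bool {
    match value {
        Value::Null => false,
        Value::Bool(b) => write!(out, "{}", b).is_ok(),
        Value::Number(n) => write!(out, "{}", n).is_ok(),
        Value::String(s) => write!(out, "{}", s).is_ok(),
        Value::Array(array) => array.iter().fold(false, |any, v| {
            if flatten(v, out) {
                out.push_str(". ");
                true
            } else {
                any
            }
        }),
        Value::Object(object) => object.iter().fold(false, |any, (key, v)| {
            let mut pair = String::new();
            let _ = write!(&mut pair, "{}: ", key);
            if flatten(v, &mut pair) {
                pair.push_str(". ");
                out.push_str(&pair);
                true
            } else {
                any
            }
        }),
    }
}

fn main() {
    let mut out = String::new();
    flatten(&json!({ "tags": ["play", "tragedy"] }), &mut out);
    // Nested values keep their separators, so the array inside the object
    // produces "tags: play. tragedy. . ".
    assert_eq!(out, "tags: play. tragedy. . ");
}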
std::thread::spawn(move || { + let chunk = docid_word_positions + .merge(concat_u32s_array, &indexer) + .map(|r| TypedChunk::DocidWordPositions(unsafe { as_cloneable_grenad(&r) }.unwrap())); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || match word_docids.merge(merge_roaring_bitmaps, &indexer) { + Ok(word_docids) => match exact_word_docids.merge(merge_cbo_roaring_bitmaps, &indexer) { + Ok(exact_word_docids) => sx + .send(Ok(TypedChunk::WordDocids { + word_docids_reader: word_docids, + exact_word_docids_reader: exact_word_docids, + })) + .unwrap(), + Err(err) => sx.send(Err(err)).unwrap(), + }, + Err(err) => sx.send(Err(err)).unwrap(), + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = fid_word_count_docids + .merge(merge_cbo_roaring_bitmaps, &indexer) + .map(TypedChunk::FieldIdWordcountDocids); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = fid_docid_facet_exists + .merge(merge_cbo_roaring_bitmaps, &indexer) + .map(TypedChunk::FieldIdFacetExistsDocids); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = fid_docid_facet_numbers.merge(keep_first, &indexer).map(|r| { + TypedChunk::FieldIdDocidFacetNumbers(unsafe { as_cloneable_grenad(&r) }.unwrap()) + }); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = fid_docid_facet_strings.merge(keep_first, &indexer).map(|r| { + TypedChunk::FieldIdDocidFacetStrings(unsafe { as_cloneable_grenad(&r) }.unwrap()) + }); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = facet_string_docids + .merge(keep_first_prefix_value_merge_roaring_bitmaps, &indexer) + .map(TypedChunk::FieldIdFacetStringDocids); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = facet_numbers_docids + .merge(merge_cbo_roaring_bitmaps, &indexer) + .map(TypedChunk::FieldIdFacetNumberDocids); + sx.send(chunk).unwrap(); + }); + let sx = sender.clone(); + std::thread::spawn(move || { + let chunk = geo_points.merge(keep_first, &indexer).map(TypedChunk::GeoPoints); + sx.send(chunk).unwrap(); + }); + + Ok(()) +} diff --git a/milli/src/update/index_documents/extract2/tokenize.rs b/milli/src/update/index_documents/extract2/tokenize.rs new file mode 100644 index 000000000..93ae963e4 --- /dev/null +++ b/milli/src/update/index_documents/extract2/tokenize.rs @@ -0,0 +1,64 @@ +use super::Context; +use crate::{ + absolute_from_relative_position, update::index_documents::helpers::MAX_WORD_LENGTH, Result, + SerializationError, +}; +use charabia::{SeparatorKind, Token, TokenKind, Tokenizer}; +use std::convert::TryInto; + +// TODO: make it clear that it returns absolute positions +pub fn tokenize( + ctx: &Context, + field_id: u16, + field: &str, + tokenizer: &Tokenizer<&[u8]>, + mut cb: impl FnMut(u32, &[u8]) -> Result<()>, +) -> Result<()> { + let tokens = process_tokens(tokenizer.tokenize(field)) + .take_while(|(p, _)| (*p as u32) < ctx.max_positions_per_attributes); + for (index, token) in tokens { + let token = token.lemma().trim(); + if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { + let token_bytes = token.as_bytes(); + + let position: u16 = + index.try_into().map_err(|_| SerializationError::InvalidNumberSerialization)?; + let position = absolute_from_relative_position(field_id, position); + cb(position, token_bytes)?; + } + } + Ok(()) +} + +/// take an 
iterator on tokens and compute their relative position depending on separator kinds +/// if it's an `Hard` separator we add an additional relative proximity of 8 between words, +/// else we keep the standart proximity of 1 between words. +fn process_tokens<'a>( + tokens: impl Iterator>, +) -> impl Iterator)> { + tokens + .skip_while(|token| token.is_separator()) + .scan((0, None), |(offset, prev_kind), token| { + match token.kind { + TokenKind::Word | TokenKind::StopWord | TokenKind::Unknown => { + *offset += match *prev_kind { + Some(TokenKind::Separator(SeparatorKind::Hard)) => 8, + Some(_) => 1, + None => 0, + }; + *prev_kind = Some(token.kind) + } + TokenKind::Separator(SeparatorKind::Hard) => { + *prev_kind = Some(token.kind); + } + TokenKind::Separator(SeparatorKind::Soft) + if *prev_kind != Some(TokenKind::Separator(SeparatorKind::Hard)) => + { + *prev_kind = Some(token.kind); + } + _ => (), + } + Some((*offset, token)) + }) + .filter(|(_, t)| t.is_word()) +} diff --git a/milli/src/update/index_documents/extract2/word_docids.rs b/milli/src/update/index_documents/extract2/word_docids.rs new file mode 100644 index 000000000..af1c5ec6c --- /dev/null +++ b/milli/src/update/index_documents/extract2/word_docids.rs @@ -0,0 +1,50 @@ +use std::iter::FromIterator; + +use crate::{ + update::index_documents::{helpers::serialize_roaring_bitmap, MergeFn}, + Result, +}; +use grenad::Sorter; +use roaring::RoaringBitmap; + +use super::{docid_word_positions::DocidWordPositionsExtractor, Context}; + +pub struct WordDocidsExtractor<'out> { + value_buffer: Vec, + word_docids_sorter: &'out mut Sorter, + exact_word_docids_sorter: &'out mut Sorter, +} +impl<'out> WordDocidsExtractor<'out> { + pub fn new( + docid: u32, + words_sorter: &'out mut Sorter, + exact_words_sorter: &'out mut Sorter, + ) -> Result { + let mut value_buffer = vec![]; + let bitmap = RoaringBitmap::from_iter(Some(docid)); + serialize_roaring_bitmap(&bitmap, &mut value_buffer)?; + + Ok(Self { + value_buffer, + word_docids_sorter: words_sorter, + exact_word_docids_sorter: exact_words_sorter, + }) + } + pub fn extract_from_docid_word_positions_extractor( + &mut self, + fid: u16, + extractor: &DocidWordPositionsExtractor<'out>, + ctx: Context, + ) -> Result<()> { + if ctx.exact_attributes.contains(&fid) { + for word in extractor.word_positions.keys() { + self.exact_word_docids_sorter.insert(word.as_slice(), &self.value_buffer)?; + } + } else { + for word in extractor.word_positions.keys() { + self.word_docids_sorter.insert(word.as_slice(), &self.value_buffer)?; + } + } + Ok(()) + } +} diff --git a/milli/src/update/index_documents/extract2/word_pair_proximity.rs b/milli/src/update/index_documents/extract2/word_pair_proximity.rs new file mode 100644 index 000000000..6c55a3317 --- /dev/null +++ b/milli/src/update/index_documents/extract2/word_pair_proximity.rs @@ -0,0 +1,100 @@ +use std::collections::{HashMap, VecDeque}; + +use crate::{update::index_documents::MergeFn, Result}; +use grenad::Sorter; + +pub struct WordPairProximityDocidsExtractor<'out> { + docid: u32, + sorter: &'out mut Sorter, + // (word1, position) followed by (word2, position) + window: VecDeque<(Vec, u32)>, + // key is `word1 \0 word2 \0` as bytes, value is their proximity + batch: HashMap, u8, fxhash::FxBuildHasher>, +} +impl<'out> WordPairProximityDocidsExtractor<'out> { + pub fn new(docid: u32, sorter: &'out mut Sorter) -> Self { + Self { + docid, + sorter, + window: VecDeque::new(), + batch: HashMap::default(), // TODO: use better hash function + } + } + + pub fn 
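process_tokens turns the token stream into relative word positions: the first word sits at offset 0, a word that follows another word or a soft separator adds 1, and a word that follows a hard separator (for example ".") adds 8, so phrase boundaries cost extra proximity. tokenize then widens each relative position into an absolute one with absolute_from_relative_position. A small sketch of both steps without charabia; the bit layout in pack_absolute (field id in the high 16 bits, relative position in the low 16 bits) is my reading of that helper, not something stated in this patch:

#[derive(Clone, Copy, PartialEq)]
enum Kind {
    Word,
    SoftSep,
    HardSep,
}

// Relative positions with the same offsets as process_tokens: +1 after a word
// or soft separator, +8 after a hard separator, 0 for the very first word.
fn relative_positions(tokens: &[(Kind, &str)]) -> Vec<(u32, String)> {
    let mut out = Vec::new();
    let mut offset = 0u32;
    let mut prev: Option<Kind> = None;
    for &(kind, text) in tokens {
        match kind {
            Kind::Word => {
                offset += match prev {
                    Some(Kind::HardSep) => 8,
                    Some(_) => 1,
                    None => 0,
                };
                prev = Some(kind);
                out.push((offset, text.to_string()));
            }
            Kind::HardSep => prev = Some(kind),
            Kind::SoftSep => {
                if prev != Some(Kind::HardSep) {
                    prev = Some(kind);
                }
            }
        }
    }
    out
}

// Assumed layout of an absolute position: field id above, relative position below.
fn pack_absolute(field_id: u16, relative: u16) -> u32 {
    (field_id as u32) << 16 | relative as u32
}

fn main() {
    let tokens = [
        (Kind::Word, "new"),
        (Kind::SoftSep, " "),
        (Kind::Word, "york"),
        (Kind::HardSep, "."),
        (Kind::SoftSep, " "),
        (Kind::Word, "city"),
    ];
    // "city" lands 8 positions after "york" because of the hard separator.
    assert_eq!(
        relative_positions(&tokens),
        vec![
            (0, "new".to_string()),
            (1, "york".to_string()),
            (9, "city".to_string())
        ]
    );
    assert_eq!(pack_absolute(2, 9), (2 << 16) | 9);
}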
extract_from_token_and_position(&mut self, token: &[u8], position: u32) -> Result<()> { + loop { + if let Some((word1, pos1)) = self.window.front() { + if position - pos1 <= 7 { + self.window.push_back((token.to_owned(), position)); + return Ok(()); + } else { + let mut key = vec![]; + // for each word1, word2 pair, add it to the hashmap + // then dequeue the word1 + for (word2, pos2) in self.window.iter().skip(1) { + insert_in_batch(&word1, &word2, *pos1, *pos2, &mut key, &mut self.batch); + } + self.window.pop_front(); + } + } else { + // let w = std::str::from_utf8(token).unwrap(); + // println!("push {w} at pos {position}"); + self.window.push_back((token.to_owned(), position)); + return Ok(()); + } + } + } + + pub fn finish_docid(&mut self) -> Result<()> { + while let Some((word1, pos1)) = self.window.front() { + let mut key = vec![]; + // for each word1, word2 pair, add it to the hashmap + // then dequeue the word1 + for (word2, pos2) in self.window.iter().skip(1) { + insert_in_batch(&word1, &word2, *pos1, *pos2, &mut key, &mut self.batch); + } + self.window.pop_front(); + } + let mut key_buffer = vec![]; + for (key, prox) in self.batch.iter() { + key_buffer.clear(); + key_buffer.extend_from_slice(key); + key_buffer.push(*prox); + self.sorter.insert(&key_buffer, self.docid.to_ne_bytes())?; + } + Ok(()) + } +} +fn insert_in_batch( + word1: &[u8], + word2: &[u8], + pos1: u32, + pos2: u32, + key: &mut Vec, + batch: &mut HashMap, u8, fxhash::FxBuildHasher>, +) { + key.clear(); + + key.extend_from_slice(word1); + key.push(0); + key.extend_from_slice(word2); + key.push(0); + let distance = pos2 - pos1; + let prox = batch.entry(key.clone()).or_insert(u8::MAX); + *prox = std::cmp::min(*prox, distance as u8); + assert!(*prox <= 7); + key.clear(); + + if *prox == 7 { + return; + } + + key.extend_from_slice(word2); + key.push(0); + key.extend_from_slice(word1); + key.push(0); + let distance = pos2 - pos1 + 1; + let prox = batch.entry(key.clone()).or_insert(u8::MAX); + *prox = std::cmp::min(*prox, distance as u8); + assert!(*prox <= 7); +} diff --git a/milli/src/update/index_documents/extract2/word_position_docid.rs b/milli/src/update/index_documents/extract2/word_position_docid.rs new file mode 100644 index 000000000..dfa109a0a --- /dev/null +++ b/milli/src/update/index_documents/extract2/word_position_docid.rs @@ -0,0 +1,21 @@ +use crate::{update::index_documents::MergeFn, Result}; +use grenad::Sorter; + +pub struct WordPositionExtractor<'out> { + docid: u32, + key_buffer: Vec, + sorter: &'out mut Sorter, +} +impl<'out> WordPositionExtractor<'out> { + pub fn new(docid: u32, sorter: &'out mut Sorter) -> Self { + Self { docid, key_buffer: vec![], sorter } + } + + pub fn extract_from_token_and_position(&mut self, token: &[u8], position: u32) -> Result<()> { + self.key_buffer.clear(); + self.key_buffer.extend_from_slice(token); + self.key_buffer.extend_from_slice(&position.to_be_bytes()); + self.sorter.insert(&self.key_buffer, &self.docid.to_ne_bytes())?; + Ok(()) + } +} diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs index 202e689f8..e2acce45c 100644 --- a/milli/src/update/index_documents/helpers/grenad_helpers.rs +++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs @@ -157,58 +157,6 @@ impl Default for GrenadParameters { } } -impl GrenadParameters { - /// This function use the number of threads in the current threadpool to compute the value. 
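The proximity extractor above keeps a sliding window of recent (word, position) pairs: as long as the new token is within 7 positions of the oldest entry it simply joins the window; otherwise the oldest word is paired with everything after it and dequeued. Each ordered pair is kept in a per-document hash map with the minimum distance seen, and the reversed pair costs one extra. A compact sketch of that pairing rule over an already-positioned token list, using a plain HashMap rather than a sorter or fxhash:

use std::collections::HashMap;

// For every ordered pair of words at most 7 positions apart, keep the
// smallest proximity seen; the reversed pair costs one more.
fn word_pair_proximities(tokens: &[(&str, u32)]) -> HashMap<(String, String), u8> {
    let mut batch: HashMap<(String, String), u8> = HashMap::new();
    for (i, (word1, pos1)) in tokens.iter().enumerate() {
        for (word2, pos2) in &tokens[i + 1..] {
            let distance = pos2 - pos1;
            if distance > 7 {
                break; // positions increase, so later tokens are even farther away
            }
            let forward = batch
                .entry((word1.to_string(), word2.to_string()))
                .or_insert(u8::MAX);
            *forward = (*forward).min(distance as u8);
            // The reversed pair is only kept while it still fits the 0..=7 range.
            if distance < 7 {
                let backward = batch
                    .entry((word2.to_string(), word1.to_string()))
                    .or_insert(u8::MAX);
                *backward = (*backward).min(distance as u8 + 1);
            }
        }
    }
    batch
}

fn main() {
    // Relative positions as produced by the tokenizer ("city" is 8 away from "york").
    let tokens = [("new", 0u32), ("york", 1), ("city", 9)];
    let batch = word_pair_proximities(&tokens);
    assert_eq!(batch[&("new".to_string(), "york".to_string())], 1);
    assert_eq!(batch[&("york".to_string(), "new".to_string())], 2);
    // "new" and "city" are 9 positions apart, too far to be recorded.
    assert!(!batch.contains_key(&("new".to_string(), "city".to_string())));
}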
- /// This should be called inside of a rayon thread pool, - /// Otherwise, it will take the global number of threads. - pub fn max_memory_by_thread(&self) -> Option { - self.max_memory.map(|max_memory| max_memory / rayon::current_num_threads()) - } -} - -/// Returns an iterator that outputs grenad readers of obkv documents -/// with a maximum size of approximately `documents_chunks_size`. -/// -/// The grenad obkv entries are composed of an incremental document id big-endian -/// encoded as the key and an obkv object with an `u8` for the field as the key -/// and a simple UTF-8 encoded string as the value. -pub fn grenad_obkv_into_chunks( - reader: grenad::Reader, - indexer: GrenadParameters, - documents_chunk_size: usize, -) -> Result>>> { - let mut continue_reading = true; - let mut cursor = reader.into_cursor()?; - - let indexer_clone = indexer.clone(); - let mut transposer = move || { - if !continue_reading { - return Ok(None); - } - - let mut current_chunk_size = 0u64; - let mut obkv_documents = create_writer( - indexer_clone.chunk_compression_type, - indexer_clone.chunk_compression_level, - tempfile::tempfile()?, - ); - - while let Some((document_id, obkv)) = cursor.move_on_next()? { - obkv_documents.insert(document_id, obkv)?; - current_chunk_size += document_id.len() as u64 + obkv.len() as u64; - - if current_chunk_size >= documents_chunk_size as u64 { - return writer_into_reader(obkv_documents).map(Some); - } - } - - continue_reading = false; - writer_into_reader(obkv_documents).map(Some) - }; - - Ok(std::iter::from_fn(move || transposer().transpose())) -} - pub fn write_into_lmdb_database( wtxn: &mut heed::RwTxn, database: heed::PolyDatabase, diff --git a/milli/src/update/index_documents/helpers/mod.rs b/milli/src/update/index_documents/helpers/mod.rs index 6466a636b..76d03233e 100644 --- a/milli/src/update/index_documents/helpers/mod.rs +++ b/milli/src/update/index_documents/helpers/mod.rs @@ -8,9 +8,9 @@ use std::convert::{TryFrom, TryInto}; pub use clonable_mmap::{ClonableMmap, CursorClonableMmap}; use fst::{IntoStreamer, Streamer}; pub use grenad_helpers::{ - as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks, - merge_ignore_values, sorter_into_lmdb_database, sorter_into_reader, write_into_lmdb_database, - writer_into_reader, GrenadParameters, MergeableReader, + as_cloneable_grenad, create_sorter, create_writer, merge_ignore_values, + sorter_into_lmdb_database, sorter_into_reader, write_into_lmdb_database, writer_into_reader, + GrenadParameters, MergeableReader, }; pub use merge_functions::{ concat_u32s_array, keep_first, keep_first_prefix_value_merge_roaring_bitmaps, keep_latest_obkv, diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index e0eefe07b..03807d2f7 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -1,37 +1,36 @@ mod enrich; -mod extract; +mod extract2; mod helpers; mod transform; mod typed_chunk; -use std::collections::HashSet; -use std::io::{Cursor, Read, Seek}; -use std::iter::FromIterator; -use std::num::{NonZeroU32, NonZeroUsize}; -use std::result::Result as StdResult; - -use crossbeam_channel::{Receiver, Sender}; -use heed::types::Str; +use heed::types::{ByteSlice, Str}; use heed::Database; use log::debug; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use slice_group_by::GroupBy; -use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; +use std::collections::HashSet; +use std::io::{Cursor, Read, Seek}; +use 
std::iter::FromIterator; +use std::num::{NonZeroU32, NonZeroUsize}; +use std::result::Result as StdResult; use self::enrich::enrich_documents_batch; pub use self::enrich::{ extract_finite_float_from_value, validate_document_id, validate_document_id_value, validate_geo_from_json, DocumentId, }; +use self::extract2::merge_extracted_data; +use self::helpers::GrenadParameters; pub use self::helpers::{ as_cloneable_grenad, create_sorter, create_writer, fst_stream_into_hashset, fst_stream_into_vec, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, sorter_into_lmdb_database, valid_lmdb_key, write_into_lmdb_database, writer_into_reader, ClonableMmap, MergeFn, }; -use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; +use self::typed_chunk::TypedChunk; use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::UserError; pub use crate::update::index_documents::helpers::CursorClonableMmap; @@ -218,34 +217,9 @@ where // up to date field map. self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; - let backup_pool; - let pool = match self.indexer_config.thread_pool { - Some(ref pool) => pool, - #[cfg(not(test))] - None => { - // We initialize a bakcup pool with the default - // settings if none have already been set. - backup_pool = rayon::ThreadPoolBuilder::new().build()?; - &backup_pool - } - #[cfg(test)] - None => { - // We initialize a bakcup pool with the default - // settings if none have already been set. - backup_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?; - &backup_pool - } - }; - let original_documents = grenad::Reader::new(original_documents)?; let flattened_documents = grenad::Reader::new(flattened_documents)?; - // create LMDB writer channel - let (lmdb_writer_sx, lmdb_writer_rx): ( - Sender>, - Receiver>, - ) = crossbeam_channel::unbounded(); - // get the primary key field id let primary_key_id = fields_ids_map.id(&primary_key).unwrap(); @@ -275,61 +249,8 @@ where None => None, }; - let stop_words = self.index.stop_words(self.wtxn)?; - let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?; - - let pool_params = GrenadParameters { - chunk_compression_type: self.indexer_config.chunk_compression_type, - chunk_compression_level: self.indexer_config.chunk_compression_level, - max_memory: self.indexer_config.max_memory, - max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. - }; - let documents_chunk_size = - self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4); // 4MiB let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes; - // Run extraction pipeline in parallel. 
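One consequence of dropping the rayon pool is that the per-thread memory budget is no longer derived from rayon::current_num_threads(): extract_data now receives an explicit max_memory and num_threads (8 GB and 8 workers by default in the call further below) and divides the budget itself. The arithmetic, spelled out:

fn main() {
    // Defaults used by the new extract_data call when the indexer config
    // does not specify a memory limit.
    let max_memory: usize = 8_000_000_000;
    let num_threads = 8;
    let per_thread = max_memory / num_threads;
    assert_eq!(per_thread, 1_000_000_000); // about 1 GB of sorter budget per worker
    println!("each worker gets {per_thread} bytes to split across its sorters");
}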
- pool.install(|| { - // split obkv file into several chunks - let original_chunk_iter = grenad_obkv_into_chunks( - original_documents, - pool_params.clone(), - documents_chunk_size, - ); - - // split obkv file into several chunks - let flattened_chunk_iter = grenad_obkv_into_chunks( - flattened_documents, - pool_params.clone(), - documents_chunk_size, - ); - - let result = original_chunk_iter.and_then(|original_chunk| { - let flattened_chunk = flattened_chunk_iter?; - // extract all databases from the chunked obkv douments - extract::data_from_obkv_documents( - original_chunk, - flattened_chunk, - pool_params, - lmdb_writer_sx.clone(), - searchable_fields, - faceted_fields, - primary_key_id, - geo_fields_ids, - stop_words, - max_positions_per_attributes, - exact_attributes, - ) - }); - - if let Err(e) = result { - let _ = lmdb_writer_sx.send(Err(e)); - } - - // needs to be droped to avoid channel waiting lock. - drop(lmdb_writer_sx) - }); - // We delete the documents that this document addition replaces. This way we are // able to simply insert all the documents even if they already exist in the database. if !replaced_documents_ids.is_empty() { @@ -339,23 +260,62 @@ where let deleted_documents_count = deletion_builder.execute()?; debug!("{} documents actually deleted", deleted_documents_count.deleted_documents); } + // Add the original documents to the index + let mut cursor = original_documents.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { + self.index + .documents + .remap_types::() + .put(self.wtxn, key, value)?; + } + + let stop_words = self.index.stop_words(self.wtxn)?; + let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?; + let pool_params = GrenadParameters { + chunk_compression_type: self.indexer_config.chunk_compression_type, + chunk_compression_level: self.indexer_config.chunk_compression_level, + max_memory: self.indexer_config.max_memory, + max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. + }; let index_documents_ids = self.index.documents_ids(self.wtxn)?; let index_is_empty = index_documents_ids.len() == 0; - let mut final_documents_ids = RoaringBitmap::new(); + + let context = extract2::Context { + primary_key_fid: primary_key_id, + geo_fields_ids, + searchable_fields: &searchable_fields, + faceted_fields: &faceted_fields, + stop_words: stop_words.as_ref(), + max_positions_per_attributes: max_positions_per_attributes.unwrap_or(u32::MAX), + exact_attributes: &exact_attributes, + grenad_params: pool_params, + }; + let extracted_data = extract2::extract_data( + self.indexer_config.max_memory.unwrap_or(8_000_000_000), + 8, + unsafe { as_cloneable_grenad(&flattened_documents)? }, + context, + )?; + let mut word_pair_proximity_docids = None; let mut word_position_docids = None; let mut word_docids = None; let mut exact_word_docids = None; + let mut final_documents_ids = RoaringBitmap::new(); let mut databases_seen = 0; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); - for result in lmdb_writer_rx { - let typed_chunk = match result? { + let (sx, rx) = crossbeam_channel::unbounded::>(); + merge_extracted_data(extracted_data, pool_params, sx.clone())?; + drop(sx); + + for chunk in rx { + let chunk = match chunk? { TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => { let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? 
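The new driver wires everything through a single unbounded channel: merge_extracted_data spawns one merge thread per database, each thread sends a Result<TypedChunk> on its own clone of the sender, and the original sender is dropped right after the call, so the `for chunk in rx` loop ends exactly when the last merge thread finishes. The same shape with std::sync::mpsc instead of crossbeam_channel, and strings instead of typed chunks, just to show why dropping the extra sender matters:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (sx, rx) = mpsc::channel::<Result<String, String>>();

    // One producer per "database", mirroring the per-merge threads spawned
    // by merge_extracted_data.
    for name in ["word_docids", "word_position_docids", "facet_string_docids"] {
        let sx = sx.clone();
        thread::spawn(move || {
            sx.send(Ok(format!("{name} merged"))).unwrap();
        });
    }
    // Drop the original sender, otherwise the receive loop below never ends.
    drop(sx);

    let mut received = 0;
    for chunk in rx {
        let chunk = chunk.expect("a merge thread failed");
        println!("{chunk}");
        received += 1;
    }
    assert_eq!(received, 3);
}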
}; word_docids = Some(cloneable_chunk); @@ -376,9 +336,12 @@ where } otherwise => otherwise, }; - - let (docids, is_merged_database) = - write_typed_chunk_into_index(typed_chunk, &self.index, self.wtxn, index_is_empty)?; + let (docids, is_merged_database) = typed_chunk::write_typed_chunk_into_index( + chunk, + self.index, + self.wtxn, + index_is_empty, + )?; if !docids.is_empty() { final_documents_ids |= docids; let documents_seen_count = final_documents_ids.len(); @@ -413,10 +376,10 @@ where self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; self.execute_prefix_databases( - word_docids, - exact_word_docids, - word_pair_proximity_docids, - word_position_docids, + word_docids.unwrap(), + exact_word_docids.unwrap(), + word_pair_proximity_docids.unwrap(), + word_position_docids.unwrap(), )?; Ok(all_documents_ids.len()) @@ -425,10 +388,10 @@ where #[logging_timer::time("IndexDocuments::{}")] pub fn execute_prefix_databases( self, - word_docids: Option>, - exact_word_docids: Option>, - word_pair_proximity_docids: Option>, - word_position_docids: Option>, + word_docids: grenad::Reader, + exact_word_docids: grenad::Reader, + word_pair_proximity_docids: grenad::Reader, + word_position_docids: grenad::Reader, ) -> Result<()> where F: Fn(UpdateIndexingStep) + Sync, @@ -494,7 +457,7 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); - if let Some(word_docids) = word_docids { + if !word_docids.is_empty() { execute_word_prefix_docids( self.wtxn, word_docids, @@ -507,7 +470,7 @@ where )?; } - if let Some(exact_word_docids) = exact_word_docids { + if !exact_word_docids.is_empty() { execute_word_prefix_docids( self.wtxn, exact_word_docids, @@ -526,7 +489,7 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); - if let Some(word_pair_proximity_docids) = word_pair_proximity_docids { + if !word_pair_proximity_docids.is_empty() { // Run the word prefix pair proximity docids update operation. let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index); builder.chunk_compression_type = self.indexer_config.chunk_compression_type; @@ -547,7 +510,7 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); - if let Some(word_position_docids) = word_position_docids { + if !word_position_docids.is_empty() { // Run the words prefix position docids update operation. let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index); builder.chunk_compression_type = self.indexer_config.chunk_compression_type; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 5b7b00c21..d02c09705 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -234,7 +234,7 @@ pub(crate) fn write_typed_chunk_into_index( Ok((RoaringBitmap::new(), is_merged_database)) } -fn merge_word_docids_reader_into_fst( +pub fn merge_word_docids_reader_into_fst( word_docids_iter: grenad::Reader>, exact_word_docids_iter: grenad::Reader>, ) -> Result>> { @@ -251,14 +251,18 @@ fn merge_word_docids_reader_into_fst( Ok(builder.into_set()) } -fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec) -> Result<()> { +pub fn merge_roaring_bitmaps( + new_value: &[u8], + db_value: &[u8], + buffer: &mut Vec, +) -> Result<()> { let new_value = RoaringBitmap::deserialize_from(new_value)?; let db_value = RoaringBitmap::deserialize_from(db_value)?; let value = new_value | db_value; Ok(serialize_roaring_bitmap(&value, buffer)?) 
} -fn merge_cbo_roaring_bitmaps( +pub fn merge_cbo_roaring_bitmaps( new_value: &[u8], db_value: &[u8], buffer: &mut Vec, @@ -271,7 +275,7 @@ fn merge_cbo_roaring_bitmaps( /// Write provided entries in database using serialize_value function. /// merge_values function is used if an entry already exist in the database. -fn write_entries_into_database( +pub fn write_entries_into_database( data: grenad::Reader, database: &heed::Database, wtxn: &mut RwTxn, @@ -313,7 +317,7 @@ where /// merge_values function is used if an entry already exist in the database. /// All provided entries must be ordered. /// If the index is not empty, write_entries_into_database is called instead. -fn append_entries_into_database( +pub fn append_entries_into_database( data: grenad::Reader, database: &heed::Database, wtxn: &mut RwTxn,
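These helpers become pub, presumably so the reworked extraction path can reach them from outside typed_chunk.rs. Their contract is small: given the value already in the database and the new value for the same key, produce the merged bytes. For the bitmap-backed databases that is just a union, as the standalone sketch below illustrates with the roaring crate directly; it mirrors what merge_roaring_bitmaps does but is not the crate's helper itself:

use roaring::RoaringBitmap;

// Union two serialized bitmaps into `buffer` (assumed empty on entry).
fn merge(new_value: &[u8], db_value: &[u8], buffer: &mut Vec<u8>) -> std::io::Result<()> {
    let new_value = RoaringBitmap::deserialize_from(new_value)?;
    let db_value = RoaringBitmap::deserialize_from(db_value)?;
    (new_value | db_value).serialize_into(buffer)
}

fn main() -> std::io::Result<()> {
    let mut db_value = Vec::new();
    RoaringBitmap::from_iter([1u32, 2, 3]).serialize_into(&mut db_value)?;
    let mut new_value = Vec::new();
    RoaringBitmap::from_iter([3u32, 4]).serialize_into(&mut new_value)?;

    let mut merged_bytes = Vec::new();
    merge(&new_value, &db_value, &mut merged_bytes)?;
    let merged = RoaringBitmap::deserialize_from(&merged_bytes[..])?;
    assert_eq!(merged.iter().collect::<Vec<u32>>(), vec![1, 2, 3, 4]);
    Ok(())
}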