Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ cc_binary(
linkstatic = 1,
deps = [
"//commons:commons_lib",
"//tests/main:database_adapter_main_lib",
"//main:database_adapter_main_lib",
"@mbedtls",
],
)
Expand Down
61 changes: 44 additions & 17 deletions src/db_adapter/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,23 @@ cc_library(
name = "db_adapter_lib",
includes = ["."],
deps = [
":bounded_shared_queue",
":context_loader",
":data_mapper",
":data_types",
":database_adapter_runner",
":database_connection",
":database_loader",
":database_mapper",
":database_types",
":database_wrapper",
":pipeline",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//db_adapter/postgres:postgres_lib",
],
)

cc_library(
name = "data_types",
hdrs = ["DataTypes.h"],
name = "database_types",
hdrs = ["DatabaseTypes.h"],
includes = ["."],
deps = [
"//commons:commons_lib",
Expand All @@ -29,12 +31,12 @@ cc_library(
)

cc_library(
name = "data_mapper",
srcs = ["DataMapper.cc"],
hdrs = ["DataMapper.h"],
name = "database_mapper",
srcs = ["DatabaseMapper.cc"],
hdrs = ["DatabaseMapper.h"],
includes = ["."],
deps = [
":data_types",
":database_types",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
],
Expand All @@ -46,9 +48,9 @@ cc_library(
hdrs = ["DatabaseWrapper.h"],
includes = ["."],
deps = [
":data_mapper",
":data_types",
":database_connection",
":database_mapper",
":database_types",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
],
Expand All @@ -60,7 +62,7 @@ cc_library(
hdrs = ["ContextLoader.h"],
includes = ["."],
deps = [
":data_types",
":database_types",
"//commons:commons_lib",
"@nlohmann_json//:json",
],
Expand All @@ -72,20 +74,21 @@ cc_library(
hdrs = ["DatabaseConnection.h"],
includes = ["."],
deps = [
":data_types",
":database_types",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
"//commons/processor:processor_lib",
],
)

cc_library(
name = "pipeline",
srcs = ["Pipeline.cc"],
hdrs = ["Pipeline.h"],
name = "database_loader",
srcs = ["DatabaseLoader.cc"],
hdrs = ["DatabaseLoader.h"],
includes = ["."],
deps = [
":data_types",
":bounded_shared_queue",
":database_types",
":database_wrapper",
"//atomdb",
"//atomdb:atomdb_singleton",
Expand All @@ -94,3 +97,27 @@ cc_library(
"//db_adapter/postgres:postgres_lib",
],
)

cc_library(
name = "bounded_shared_queue",
srcs = ["BoundedSharedQueue.cc"],
hdrs = ["BoundedSharedQueue.h"],
includes = ["."],
deps = [
"//commons:commons_lib",
],
)

cc_library(
name = "database_adapter_runner",
srcs = ["DatabaseAdapterRunner.cc"],
hdrs = ["DatabaseAdapterRunner.h"],
includes = ["."],
deps = [
":bounded_shared_queue",
":database_loader",
":database_types",
"//commons:commons_lib",
"//commons/atoms:atoms_lib",
],
)
32 changes: 32 additions & 0 deletions src/db_adapter/BoundedSharedQueue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "BoundedSharedQueue.h"

using namespace db_adapter;

BoundedSharedQueue::BoundedSharedQueue(size_t max_size) : max_size_(max_size) {}

BoundedSharedQueue::~BoundedSharedQueue() {}

void BoundedSharedQueue::enqueue(void* item) {
unique_lock<mutex> lock(mtx_);
not_full_.wait(lock, [this] { return queue_.size() < max_size_; });
queue_.push(item);
}

void* BoundedSharedQueue::dequeue() {
lock_guard<mutex> lock(mtx_);
if (queue_.empty()) return nullptr;
void* item = queue_.front();
queue_.pop();
not_full_.notify_one();
return item;
}

bool BoundedSharedQueue::empty() {
lock_guard<mutex> lock(mtx_);
return queue_.empty();
}

size_t BoundedSharedQueue::size() {
lock_guard<mutex> lock(mtx_);
return queue_.size();
}
29 changes: 29 additions & 0 deletions src/db_adapter/BoundedSharedQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#pragma once

#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>

using namespace std;

namespace db_adapter {

class BoundedSharedQueue {
public:
BoundedSharedQueue(size_t max_size);
~BoundedSharedQueue();

void enqueue(void* item);
void* dequeue();
bool empty();
size_t size();

private:
queue<void*> queue_;
mutex mtx_;
condition_variable not_empty_;
condition_variable not_full_;
size_t max_size_;
};
} // namespace db_adapter
2 changes: 1 addition & 1 deletion src/db_adapter/ContextLoader.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <string>
#include <vector>

#include "DataTypes.h"
#include "DatabaseTypes.h"

using namespace std;
using namespace db_adapter;
Expand Down
92 changes: 92 additions & 0 deletions src/db_adapter/DatabaseAdapterRunner.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include "DatabaseAdapterRunner.h"

#include <chrono>
#include <thread>

#include "AtomDBSingleton.h"
#include "BoundedSharedQueue.h"
#include "DatabaseLoader.h"
#include "DedicatedThread.h"
#include "Processor.h"
#include "Utils.h"
#include "processor/ThreadPool.h"

#define LOG_LEVEL INFO_LEVEL
#include "Logger.h"

using namespace std;
using namespace commons;
using namespace atomdb;
using namespace db_adapter;
using namespace processor;

DatabaseAdapterRunner::DatabaseAdapterRunner(const string& host,
int port,
const string& database,
const string& username,
const string& password,
const vector<TableMapping>& tables_mapping,
const vector<string>& queries_SQL,
MAPPER_TYPE mapper_type)
: host(host),
port(port),
database(database),
username(username),
password(password),
tables_mapping(tables_mapping),
queries_SQL(queries_SQL),
mapper_type(mapper_type) {}

void DatabaseAdapterRunner::run() {
auto queue = make_shared<BoundedSharedQueue>(100000);

DatabaseMappingJob db_mapping_job(host, port, database, username, password, mapper_type, queue);

auto producer = make_shared<DedicatedThread>("producer", &db_mapping_job);
if (!tables_mapping.empty()) {
for (const auto& table_mapping : tables_mapping) {
db_mapping_job.add_task_table(table_mapping);
}
}
LOG_DEBUG("Loaded " + to_string(tables_mapping.size()) + " table mappings from context file.");
if (!queries_SQL.empty()) {
for (size_t i = 0; i < queries_SQL.size(); i++) {
db_mapping_job.add_task_query("custom_query_" + to_string(i), queries_SQL[i]);
}
}
LOG_DEBUG("Loaded " + to_string(queries_SQL.size()) + " queries from query file.");

unsigned int num_threads = 8;
ThreadPool pool("consumers_pool", num_threads);
pool.setup();
pool.start();

BatchConsumer consumer(queue, pool, BATCH_SIZE);

producer->setup();
producer->start();
LOG_DEBUG("Producer thread started.");

while (true) {
consumer.dispatch();

if (db_mapping_job.is_finished() && !consumer.is_producer_finished()) {
LOG_INFO("Mapping completed. Loading data into DAS.");
producer->stop();
consumer.set_producer_finished();
}

if (consumer.is_producer_finished() && queue->empty()) {
consumer.dispatch();
break;
}

if (queue->empty()) {
this_thread::sleep_for(chrono::milliseconds(5));
}
}

pool.wait();
pool.stop();
LOG_INFO("Loading completed! Total atoms: " << consumer.get_total_count());
}
37 changes: 37 additions & 0 deletions src/db_adapter/DatabaseAdapterRunner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#pragma once

#include <memory>
#include <string>
#include <vector>

#include "DatabaseTypes.h"

using namespace std;

namespace db_adapter {

class DatabaseAdapterRunner {
public:
DatabaseAdapterRunner(const string& host,
int port,
const string& database,
const string& username,
const string& password,
const vector<TableMapping>& tables_mapping,
const vector<string>& queries_SQL,
MAPPER_TYPE mapper_type);

void run();

private:
string host;
int port;
string database;
string username;
string password;
vector<TableMapping> tables_mapping;
vector<string> queries_SQL;
MAPPER_TYPE mapper_type;
};

} // namespace db_adapter
Loading
Loading