Skip to content

[#1067] Database Adapter multiple threads#1070

Open
marcocapozzoli wants to merge 21 commits intomasterfrom
masc/1067-db-adapter-thread-pool
Open

[#1067] Database Adapter multiple threads#1070
marcocapozzoli wants to merge 21 commits intomasterfrom
masc/1067-db-adapter-thread-pool

Conversation

@marcocapozzoli
Copy link
Copy Markdown
Collaborator

Resolves #1067

…sumer count in ThreadPool from 4 to 8 for improved concurrency.
…agement

- Added BoundedSharedQueue class with enqueue, dequeue, empty, and size methods.
- Updated DatabaseMappingJob, AtomPersistenceJob, and AtomPersistenceJob2 to use BoundedSharedQueue.
- Refactored Pipeline and PostgresWrapper to replace SharedQueue with BoundedSharedQueue.
- Introduced BatchConsumer for managing batch processing with ThreadPool.
- Adjusted logging and batch size configurations for better performance.
… to Atom/Metta transformation

- Added DatabaseLoader.h and DatabaseLoader.cc to handle database mapping jobs.
- Introduced DatabaseMapper.h and DatabaseMapper.cc for mapping SQL rows to Metta S-Expressions and Atom objects.
- Created DatabaseTypes.h to define structures for SQL rows and column values.
- Updated PostgresWrapper and DatabaseWrapper to include new database mapping functionalities.
- Enhanced main application with a new entry point for database adapter operations.
- Updated build configurations to include new source files and dependencies.
- Modified test files to integrate new database loader and mapper functionalities.
Comment thread src/db_adapter/BoundedSharedQueue.h Outdated
private:
queue<void*> queue_;
mutex mtx_;
condition_variable not_empty_;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see where this is used

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I changed the code and forgot to remove that member

Comment thread src/db_adapter/DatabaseLoader.cc Outdated
}
this->atoms.push_back(atom);
}
if (!this->atoms.empty() && this->atoms.size() >= BATCH_SIZE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (!this->atoms.empty() && this->atoms.size() >= BATCH_SIZE) {
if (this->atoms.size() >= BATCH_SIZE) {

Comment thread src/db_adapter/DatabaseLoader.cc Outdated
if (this->input_queue->empty()) break;
auto atom = (Atom*) this->input_queue->dequeue();
this->count++;
if (atom == nullptr) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use nullptr when you make comparison with unique_ptr<type> or shared_ptr<type>.
When using raw pointers (Type*) you should use NULL instead.

Comment thread src/db_adapter/DatabaseLoader.cc Outdated
if (this->input_queue->empty()) break;
auto atom = (Atom*) this->input_queue->dequeue();
this->count++;
if (atom == nullptr) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (atom == nullptr) {
if (atom == NULL) {

Comment thread src/db_adapter/DatabaseLoader.cc Outdated
try {
this->atomdb->add_atoms(atoms, false, true);

auto elapsed =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Utils::StopWatch instead of calling time functions directly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Database adapter with support for multiple threads.

2 participants