/* This file is part of Jellyfish.
Jellyfish is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Jellyfish is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Jellyfish. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __HASH_COUNTER_HPP__
#define __HASH_COUNTER_HPP__
#include
#include
#include
#include
/// Cooperative version of the hash_counter. In this implementation,
/// it is expected that the given number of threads will call the
/// `add` method regularly. When the hash table is full, it is
/// enlarged using all the threads. After the work is done, every
/// thread must promptly call the `done` method.
namespace jellyfish{ namespace cooperative {
template
class hash_counter {
public:
typedef typename large_hash::array array;
typedef typename array::key_type key_type;
typedef typename array::mapped_type mapped_type;
typedef typename array::value_type value_type;
typedef typename array::reference reference;
typedef typename array::const_reference const_reference;
typedef typename array::pointer pointer;
typedef typename array::const_pointer const_pointer;
typedef typename array::eager_iterator eager_iterator;
typedef typename array::lazy_iterator lazy_iterator;
protected:
array* ary_;
array* new_ary_;
uint16_t nb_threads_;
locks::pthread::barrier size_barrier_;
volatile uint16_t size_thid_, done_threads_;
bool do_size_doubling_;
dumper_t* dumper_;
public:
hash_counter(size_t size, // Size of hash. To be rounded up to a power of 2
uint16_t key_len, // Size of key in bits
uint16_t val_len, // Size of val in bits
uint16_t nb_threads, // Number of threads accessing this hash
uint16_t reprobe_limit = 126, // Maximum reprobe
const size_t* reprobes = jellyfish::quadratic_reprobes) :
ary_(new array(size, key_len, val_len, reprobe_limit, reprobes)),
new_ary_(0),
nb_threads_(nb_threads),
size_barrier_(nb_threads),
size_thid_(0),
done_threads_(0),
do_size_doubling_(true),
dumper_(0)
{ }
~hash_counter() {
delete ary_;
}
array* ary() { return ary_; }
const array* ary() const { return ary_; }
size_t size() const { return ary_->size(); }
uint16_t key_len() const { return ary_->key_len(); }
uint16_t val_len() const { return ary_->val_len(); }
uint16_t nb_threads() const { return nb_threads; }
uint16_t reprobe_limit() const { return ary_->max_reprobe(); }
/// Whether we attempt to double the size of the hash when full.
bool do_size_doubling() const { return do_size_doubling_; }
/// Set whether we attempt to double the size of the hash when full.
void do_size_doubling(bool v) { do_size_doubling_ = v; }
/// Set dumper responsible for cleaning out the array.
void dumper(dumper_t *d) { dumper_ = d; }
/// Add `v` to the entry `k`. It returns in `is_new` true if the
/// entry `k` did not exist in the hash. In `id` is returned the
/// final position of `k` in the hash array.
void add(const Key& k, uint64_t v, bool* is_new, size_t* id) {
unsigned int carry_shift = 0;
bool* is_new_ptr = is_new;
size_t* id_ptr = id;
bool is_new_void = false;
size_t id_void = false;
while(!ary_->add(k, v, &carry_shift, is_new_ptr, id_ptr)) {
handle_full_ary();
v &= ~(uint64_t)0 << carry_shift;
// If carry_shift == 0, failed to allocate the first field for
// key, hence status of is_new and value for id are not
// determined yet. On the other hand, if carry_shift > 0, we
// failed while adding extra field for large key, so the status
// of is_new and value of id are known. We do not update them in future
// calls.
if(carry_shift) {
is_new_ptr = &is_new_void;
id_ptr = &id_void;
}
}
}
/// Add `v` to the entry `k`. This method is multi-thread safe. If
/// the entry for `k` does not exists, it is inserted.
///
/// @param k Key to add to
/// @param v Value to add
inline void add(const Key& k, uint64_t v) {
bool is_new;
size_t id;
add(k, v, &is_new, &id);
}
/// Insert the key `k` in the hash. The value is not changed or set
/// to 0 if not already in the hash.
///
/// @param k Key to insert
inline void set(const Key& k) {
bool is_new;
size_t id;
set(k, &is_new, &id);
}
/// Insert the key `k` in the hash. The value is not changed or set
/// to 0 if not already in the hash. Set `is_new` to true if `k` did
/// not already exist in the hash. In `id` is returned the final
/// position of `k` in the hash.
void set(const Key& k, bool* is_new, size_t* id) {
while(!ary_->set(k, is_new, id))
handle_full_ary();
}
/// Update the value of key `k` by adding `v`, if `k` is already
/// present in the hash, otherwise this nothing happens. Returns
/// true if `k` is already in the hash, false otherwise.
bool update_add(const Key& k, uint64_t v) {
Key tmp_key;
return update_add(k, v, tmp_key);
}
bool update_add(const Key& k, uint64_t v, Key& tmp_key) {
unsigned int carry_shift = 0;
while(true) {
if(ary_->update_add(k, v, &carry_shift, tmp_key))
return true;
if(carry_shift == 0)
return false;
handle_full_ary();
v &= ~(uint64_t)0 << carry_shift;
}
}
/// Signify that thread is done and wait for all threads to be done.
void done() {
atomic_t::fetch_add(&done_threads_, (uint16_t)1);
while(!handle_full_ary()) ;
}
protected:
// Double the size of the hash and return false. Unless all the
// thread have reported they are done, in which case do nothing and
// return true.
bool handle_full_ary() {
bool serial_thread = size_barrier_.wait();
if(done_threads_ >= nb_threads_) // All done?
return true;
bool success = false;
if(do_size_doubling_)
success = success || double_size(serial_thread);
if(!success && dumper_) {
if(serial_thread)
dumper_->dump(ary_);
success = true;
size_barrier_.wait();
}
if(!success)
throw std::runtime_error("Hash full");
return false;
}
bool double_size(bool serial_thread) {
if(serial_thread) {// Allocate new array for size doubling
try {
new_ary_ = new array(ary_->size() * 2, ary_->key_len(), ary_->val_len(),
ary_->max_reprobe(), ary_->reprobes());
} catch(typename array::ErrorAllocation e) {
new_ary_ = 0;
}
}
size_thid_ = 0;
size_barrier_.wait();
array* my_ary = *(array* volatile*)&new_ary_;
if(!my_ary) // Allocation failed
return false;
// Copy data from old to new
uint16_t id = atomic_t::fetch_add(&size_thid_, (uint16_t)1);
// Why doesn't the following work? Seems like a bug to
// me. Equivalent call works in test_large_hash_array. Or am I
// missing something?
// eager_iterator it = ary_->iterator_slice(id, nb_threads_);
eager_iterator it = ary_->eager_slice(id, nb_threads_);
while(it.next())
my_ary->add(it.key(), it.val());
size_barrier_.wait();
if(serial_thread) { // Set new ary to be current and free old
delete ary_;
ary_ = new_ary_;
}
// Done. Last sync point
size_barrier_.wait();
return true;
}
};
} } // namespace jellyfish { namespace cooperative {
#endif /* __HASH_COUNTER_HPP__ */