-
Novak, Matus (UG - Computer Science) authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
merge.h 1.99 KiB
#ifndef REW_COMMON_MERGE_H
#define REW_COMMON_MERGE_H
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>
#include "output.h"
REW_NAMESPACE {
/*!
 * @ingroup common
 * @brief One merged frame: a fixed-size group of N values of type T.
 *
 * Merge writes the sample of input lane i into values[i]. Every element is
 * value-initialized, which means zero for arithmetic types and a
 * default-constructed object for class types.
 *
 * @tparam T element type; must be default-constructible.
 * @tparam N number of elements per frame; must be greater than zero.
 */
template<typename T, size_t N>
struct MergedValue {
    static_assert(N > 0, "MergedValue needs at least one element");

    // `{}` rather than `{ 0 }`: the latter copy-initializes the first element
    // from the int literal 0, which only works for types convertible from 0.
    T values[N] = {};
};
/*!
 * @ingroup common
 * @brief Combines N separate streams of T into one stream of MergedValue<T, N>.
 *
 * Each entry of #sources accepts samples for one lane (index 0..N-1). Samples
 * are staged in #buffer; as soon as every lane has supplied a value for the
 * leading frames, those frames are passed to Output::forward() and removed
 * from the buffer.
 *
 * A Merge is not copyable: #shared holds references to this object's own
 * #buffer and Output base, and every Source refers to #shared, so a copy
 * would keep operating on the original object.
 *
 * @tparam T sample type of each input lane.
 * @tparam N number of input lanes; must be greater than zero.
 */
template<typename T, size_t N>
class Merge : public Output<MergedValue<T, N>> {
public:
    /*! Staging storage, one element per (possibly still incomplete) frame. */
    using Buffer = std::vector<MergedValue<T, N>>;

    /*!
     * @brief Bookkeeping shared by all lanes: the staging buffer, the
     *        downstream output and the per-lane fill level.
     */
    class Shared {
    public:
        /*!
         * @param buffer staging buffer that frames are assembled in.
         * @param output receiver of completed frames.
         *
         * Only the references are stored; both objects must outlive this one.
         */
        Shared(Buffer& buffer, Output<MergedValue<T, N>>& output):buffer(buffer), output(output) {
        }

        /*!
         * @brief Stores @p length samples of lane @p offset and forwards every
         *        frame that is now complete.
         *
         * @param data   samples to append to the lane; may be null only when
         *               @p length is zero.
         * @param length number of samples in @p data.
         * @param offset lane index, must be smaller than N.
         */
        void process(const T* data, const size_t length, const size_t offset) {
            assert(offset < N);
            assert(data != nullptr || length == 0);

            // Frame index at which this lane continues writing.
            const size_t first = offsets[offset];
            if (first + length > buffer.size()) {
                buffer.resize(first + length);
            }
            // Index the frames directly instead of striding a reinterpreted
            // T* over the buffer: no element is touched when length is zero
            // and no pointer arithmetic crosses struct boundaries.
            for (size_t k = 0; k < length; ++k) {
                buffer[first + k].values[offset] = data[k];
            }
            offsets[offset] = first + length;

            // A frame is complete once the slowest lane has written to it.
            const size_t complete = *std::min_element(offsets, offsets + N);
            if (complete > 0) {
                output.forward(buffer.data(), complete);
                buffer.erase(buffer.begin(),
                             buffer.begin() + static_cast<typename Buffer::difference_type>(complete));
                // Re-base every lane onto the shortened buffer.
                for (size_t& pending : offsets) {
                    pending -= complete;
                }
            }
        }

        Buffer& buffer;
        Output<MergedValue<T, N>>& output;
        /*! Number of frames each lane has already written into #buffer. */
        size_t offsets[N] = { 0 };
    };

    /*!
     * @brief Input endpoint for a single lane; passes everything it receives
     *        to Shared::process() together with its lane index.
     */
    class Source : public Input<T> {
    public:
        /*!
         * @param shared state common to all lanes of the owning Merge.
         * @param i      lane index this source writes to.
         */
        Source(Shared& shared, const size_t i):shared(shared),i(i) {
        }

        void process(const T* data, const size_t length) override {
            shared.process(data, length, i);
        }

    private:
        Shared& shared;
        size_t i;
    };

    /*! Creates the merger and one Source per lane. */
    Merge():shared(buffer, static_cast<Output<MergedValue<T, N>>&>(*this)) {
        static_assert(N > 0, "Merge requires at least one input lane");
        // Verify at compile time that there is no padding in the structure.
        // NOTE(review): presumably consumers treat the forwarded frames as a
        // flat interleaved array of T - confirm against the Output users.
        static_assert(sizeof(MergedValue<T, N>) == sizeof(T) * N,
                      "MergedValue<T, N> must not contain padding");
        for (size_t i = 0; i < N; i++) {
            sources[i] = std::make_shared<Source>(shared, i);
        }
    }

    // Copies would share the original's buffer, output and sources through
    // the references held in Shared/Source, so copying is disabled.
    Merge(const Merge&) = delete;
    Merge& operator=(const Merge&) = delete;

    virtual ~Merge() = default;

    // Declared before `shared` so the buffer is constructed before a
    // reference to it is stored, and destroyed after its users.
    Buffer buffer;
    Shared shared;
    std::shared_ptr<Source> sources[N];
};
}
#endif