Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
merge.h 1.99 KiB
#ifndef REW_COMMON_MERGE_H
#define REW_COMMON_MERGE_H

#include <vector>
#include <cassert>
#include "output.h"

REW_NAMESPACE {
	/*!
	 * @ingroup common
	 */
	template<typename T, size_t N>
	struct MergedValue {
		T values[N] = { 0 };
	};
	/*!
	 * @ingroup common
	 */
	template<typename T, size_t N>
	class Merge : public Output<MergedValue<T, N>> {
	public:
		typedef std::vector<MergedValue<T, N>> Buffer;

		class Shared {
		public:
			Shared(Buffer& buffer, Output<MergedValue<T, N>>& output):buffer(buffer), output(output) {
			}

			void process(const T* data, const size_t length, const size_t offset) {
				if (offsets[offset] + length > buffer.size()) {
					buffer.resize(offsets[offset] + length);
				}

				T* start = reinterpret_cast<T*>(&buffer[offsets[offset]]);
				start += offset;
				auto end = start + length * N;
				auto dst = start;
				while (dst != end) {
					*dst = *data++;
					dst += N;
				}

				offsets[offset] += length;

				size_t min = -1;
				for (size_t i = 0; i < N; i++) {
					if (offsets[i] < min) min = offsets[i];
				}

				if (min > 0) {
					output.forward(buffer.data(), min);
					buffer.erase(buffer.begin(), buffer.begin() + min);
					for (size_t i = 0; i < N; i++) {
						offsets[i] -= min;
					}
				}
			}

			Buffer& buffer;
			Output<MergedValue<T, N>>& output;
			size_t offsets[N] = { 0 };
		};

		class Source : public Input<T> {
		public:
			Source(Shared& shared, const size_t i):shared(shared),i(i) {
				
			}

			void process(const T* data, const size_t length) override {
				shared.process(data, length, i);
			}
		private:
			Shared& shared;
			size_t i;
		};

		Merge():shared(buffer, dynamic_cast<Output<MergedValue<T, N>>&>(*this)) {
			// Verify that there is no padding in the structure
			assert(sizeof(MergedValue<T, N>) == sizeof(T) * N);

			for (size_t i = 0; i < N; i++) {
				sources[i] = std::make_shared<Source>(shared, i);
			}
		}
		virtual ~Merge() = default;

		Shared shared;
		std::shared_ptr<Source> sources[N];
		Buffer buffer;
	};
}

#endif