C++ Pipeline Implementation: Design and Concepts for Data Flow
This document outlines the design and implementation of a C++ pipeline framework, a powerful tool for creating efficient and modular data processing chains. It delves into the essential components and concepts that form the foundation of this framework.
Key Concepts
- Graph: A generalized representation of data connections, comprising nodes (vertices) and edges linking these nodes.
- Directed Graph: A graph where edges have directions, analogous to a one-way street map.
- Acyclic Graph: A graph without cycles: no path that follows the edges leads from a node back to itself.
- Connected Graph: A graph with no 'unreachable' vertices: every node can be reached from every other node.
- Weakly Connected Graph: A directed graph where a path can be formed between any two vertices, ignoring edge directions.
- Source Node: A node without any incoming edges, acting as the starting point for data flow.
- Sink Node: A node without any outgoing edges, representing the final destination of data.
- Source & Destination: In a directed edge, the 'source' refers to the originating node, while the 'destination' indicates the receiving node. It's crucial to differentiate these from 'source node' and 'sink node'.
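To make these terms concrete, the sketch below models a small directed graph as an edge list and finds its source nodes and sink nodes. The names `edge`, `source_nodes`, and `sink_nodes` are illustrative assumptions and are not part of the framework; vertices are assumed to be numbered from 0 to n-1.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// A directed edge: first is the edge's source, second is its destination.
// (Hypothetical representation, chosen only for this illustration.)
using edge = std::pair<std::size_t, std::size_t>;

// Source nodes: vertices in [0, n) with no incoming edges.
auto source_nodes(std::size_t n, const std::vector<edge>& edges) -> std::vector<std::size_t> {
    std::vector<bool> has_incoming(n, false);
    for (const auto& e : edges) {
        assert(e.first < n && e.second < n);
        has_incoming[e.second] = true;
    }
    std::vector<std::size_t> result;
    for (std::size_t v = 0; v < n; ++v) {
        if (!has_incoming[v]) result.push_back(v);
    }
    return result;
}

// Sink nodes: vertices in [0, n) with no outgoing edges.
auto sink_nodes(std::size_t n, const std::vector<edge>& edges) -> std::vector<std::size_t> {
    std::vector<bool> has_outgoing(n, false);
    for (const auto& e : edges) {
        assert(e.first < n && e.second < n);
        has_outgoing[e.first] = true;
    }
    std::vector<std::size_t> result;
    for (std::size_t v = 0; v < n; ++v) {
        if (!has_outgoing[v]) result.push_back(v);
    }
    return result;
}
```

For the edges 0->2, 1->2, and 2->3, vertices 0 and 1 are source nodes and vertex 3 is the only sink node. Vertex 2 is the destination of two edges and the source of one, yet it is neither a source node nor a sink node, which is the distinction the last definition draws.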
Functionality and Implementation
The pipeline framework involves several key components:
1. pipeline_error Exception
This exception type represents various errors encountered during pipeline construction. Error types are encoded in the pipeline_error_kind enumeration class:
// Errors that may occur in a pipeline.
enum class pipeline_error_kind {
    // An expired node ID was provided.
    invalid_node_id,
    // Attempting to bind a non-existent slot.
    no_such_slot,
    // Attempting to bind to a slot that is already filled.
    slot_already_used,
    // The output type and input types for a connection don't match.
    connection_type_mismatch,
};
struct pipeline_error : std::exception {
    explicit pipeline_error(pipeline_error_kind kind);
    auto kind() -> pipeline_error_kind;
    auto what() -> const char *;
};
- explicit pipeline_error(pipeline_error_kind kind);: Constructs an error with the given reason.
- auto kind() -> pipeline_error_kind;: Returns the kind of error supplied at construction.
- auto what() -> const char *;: Returns a descriptive string based on kind(), e.g., 'invalid node ID', 'no such slot', etc.
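One possible way to fill in this interface is sketched below. It is an assumption-laden illustration, not the reference implementation: the `const noexcept` qualifiers are added so that `what()` actually overrides `std::exception::what()`, the messages for `slot_already_used` and `connection_type_mismatch` are guesses (the text only gives the first two), and `describe` is a hypothetical helper used to show the exception being caught through its base class.

```cpp
#include <cassert>
#include <exception>
#include <string>

enum class pipeline_error_kind {
    invalid_node_id,
    no_such_slot,
    slot_already_used,
    connection_type_mismatch,
};

struct pipeline_error : std::exception {
    explicit pipeline_error(pipeline_error_kind kind) noexcept : kind_{kind} {}

    auto kind() const noexcept -> pipeline_error_kind { return kind_; }

    // std::exception::what() is `const noexcept`, so the override must be too.
    auto what() const noexcept -> const char* override {
        switch (kind_) {
        case pipeline_error_kind::invalid_node_id: return "invalid node ID";
        case pipeline_error_kind::no_such_slot: return "no such slot";
        // The two messages below are assumed wording, not taken from the text.
        case pipeline_error_kind::slot_already_used: return "slot already used";
        case pipeline_error_kind::connection_type_mismatch: return "connection type mismatch";
        }
        return "unknown pipeline error";
    }

private:
    pipeline_error_kind kind_;
};

// Hypothetical helper: throws the error and reads its message back
// through a reference to the std::exception base class.
auto describe(pipeline_error_kind kind) -> std::string {
    try {
        throw pipeline_error{kind};
    } catch (const std::exception& e) {
        return e.what();
    }
}
```

Returning string literals from `what()` keeps the function from allocating, so it cannot throw while an error is already being reported.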
2. node Class
A node represents a type-erased computation, serving as the base class for all component<I, O>s. It exposes common functionality. Some member functions are intentionally private to encourage encapsulation and allow only pipelines to modify node states.
// The result of a poll_next() operation.
enum class poll {
    // A value is available.
    ready,
    // No value is available this time, but there might be one later.
    empty,
    // No value is available, and there never will be again:
    // every future poll for this node will return `poll::closed` again.
    closed,
};
class node {
public:
    auto name() -> std::string;
private:
    auto poll_next() -> poll;
    void connect(const node* source, int slot);
    // You may add any other virtual functions you feel you may want here.
    friend class pipeline;
};
- auto name() -> std::string;: Returns a human-readable name for the node. (Pure virtual function, must be overridden.)
- auto poll_next() -> poll;: Processes a single tick, preparing the next value. (Pure virtual function, must be overridden.)
- void connect(const node *source, int slot);: Connects source as the input to the given slot. (Pure virtual function, must be overridden.)
3. producer Class
This class allows inspecting a component as a producer of a specific type and retrieving its value. A specialization is needed for the case where Output is void (sink nodes); that specialization omits the value() function.
template <typename Output>
struct producer : node {
    using output_type = Output;
    auto value() -> const output_type&; // only when `Output` is not `void`
};
auto value() -> const output_type&;: Returns an immutable reference to the node's constructed value. (Pure virtual function, must be overridden.)
4. component Class
A component represents a single computation in the pipeline. It's parameterized by its Input and Output types.
template <typename Input, typename Output>
struct component : producer<Output> {
    using input_type = Input;
};
5. sink and source Classes
These classes simplify implementation of common component types. A sink consumes values without producing any output (end of pipeline), while a source produces values without consuming any input (start of pipeline).
template <typename Input>
struct sink : component<std::tuple<Input>, void> {};
template <typename Output>
struct source : component<std::tuple<>, Output> {
private:
    void connect(const node *source, int slot);
};
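The class hierarchy described in sections 2 to 5 can be sketched end to end as follows. This is a simplified stand-in under stated assumptions: `poll_next()` and `connect()` are public here so the example can drive nodes by hand (in the framework they are private and called by the pipeline), `const` qualifiers are added, and `counter_source`, `collect_sink`, and `drain` are hypothetical names invented for the illustration.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <vector>

enum class poll { ready, empty, closed };

// Simplified base class: members are public only for this demonstration.
class node {
public:
    virtual ~node() = default;
    virtual auto name() const -> std::string = 0;
    virtual auto poll_next() -> poll = 0;
    virtual void connect(const node* src, int slot) = 0;
};

template <typename Output>
struct producer : node {
    using output_type = Output;
    virtual auto value() const -> const output_type& = 0;
};

// Sinks produce nothing, so the void specialization has no value().
template <>
struct producer<void> : node {
    using output_type = void;
};

template <typename Input, typename Output>
struct component : producer<Output> {
    using input_type = Input;
};

template <typename Input>
struct sink : component<std::tuple<Input>, void> {};

template <typename Output>
struct source : component<std::tuple<>, Output> {
    // A source has no input slots, so connecting to it does nothing.
    void connect(const node*, int) override {}
};

// Hypothetical source: yields 1, 2, ..., limit and then closes.
struct counter_source final : source<int> {
    explicit counter_source(int limit) : limit_{limit} {}
    auto name() const -> std::string override { return "counter"; }
    auto poll_next() -> poll override {
        if (current_ >= limit_) return poll::closed;
        ++current_;
        return poll::ready;
    }
    auto value() const -> const int& override { return current_; }
private:
    int limit_;
    int current_ = 0;
};

// Hypothetical sink: records the current value of its single input.
struct collect_sink final : sink<int> {
    auto name() const -> std::string override { return "collect"; }
    void connect(const node* src, int slot) override {
        if (slot == 0) input_ = dynamic_cast<const producer<int>*>(src);
    }
    auto poll_next() -> poll override {
        if (input_ == nullptr) return poll::closed;
        values.push_back(input_->value());
        return poll::ready;
    }
    std::vector<int> values;
private:
    const producer<int>* input_ = nullptr;
};

// Hand-rolled stand-in for a pipeline run: poll the source, and poll the
// sink only when the source reports a fresh value.
void drain(node& src, node& dst) {
    dst.connect(&src, 0);
    for (;;) {
        const auto state = src.poll_next();
        if (state == poll::closed) break;
        if (state == poll::ready) dst.poll_next();
    }
}
```

Draining a `counter_source{3}` into a `collect_sink` leaves the values 1, 2, 3 in the sink. The sink never polls its input itself; it only reads `value()` after the driver has seen `poll::ready`, which mirrors the division of labour between nodes and the pipeline described above.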
6. pipeline Class
This is the central class, managing the connections and execution of nodes in the pipeline. It allows dynamic reconfiguration even during or after execution.
// The requirements that a type `N` must satisfy
// to be used as a component in a pipeline.
// 3.6.0
template <typename N>
concept concrete_node = /* see 3.6.0 below */;
class pipeline {
public:
    // 3.6.1
    using node_id = /* unspecified */;
    // 3.6.2
    pipeline();
    pipeline(const pipeline &);
    pipeline(pipeline&&);
    auto operator=(const pipeline &) -> pipeline&;
    auto operator=(pipeline &&) -> pipeline&;
    ~pipeline();
    // 3.6.3
    template <typename N, typename... Args>
        requires concrete_node<N> and std::constructible_from<N, Args...>
    auto create_node(Args&& ...args) -> node_id;
    void erase_node(node_id n_id);
    auto get_node(node_id n_id) -> node*;
    // 3.6.4
    void connect(node_id src, node_id dst, int slot);
    void disconnect(node_id src, node_id dst);
    auto get_dependencies(node_id src) -> std::vector<std::pair<node_id, int>>;
    // 3.6.5
    auto is_valid() -> bool;
    auto step() -> bool;
    void run();
    // 3.6.6
    friend std::ostream &operator<<(std::ostream &, const pipeline &);
};
3.6.0 concrete_node Concept
A custom concept defining the requirements for a component to be used within a pipeline. These requirements include:
- Publishing consumed types via input_type (a std::tuple)
- Publishing produced type via output_type
- Deriving from node and the appropriate producer type
- Not being an abstract class (constructible)
3.6.1 Types
using node_id = /* unspecified */;: An opaque handle to a node. It is copyable, default-constructible, and equality-comparable. An invalid node_id refers to a non-existent node in the pipeline.
3.6.2 Special Members
- The pipeline must be default constructible.
- Copying should be a compile error, while moving should be supported.
3.6.3 Node Management
- auto create_node<N>(Args&& ...args) -> node_id;: Creates a new node of type N with the provided arguments and returns its node_id.
- void erase_node(node_id node);: Removes the specified node and its connections.
- auto get_node(node_id node) -> node *;: Returns a pointer to the node associated with the given node_id (or nullptr if invalid).
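A minimal sketch of the node-management functions is shown below, assuming `node_id` is a plain `int` and nodes are owned through `std::unique_ptr` in a `std::map`. The class name `mini_pipeline` and the node type `hello_node` are hypothetical; connection bookkeeping, the `concrete_node` constraint, and error reporting for invalid IDs are deliberately left out.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

struct node {
    virtual ~node() = default;
    virtual auto name() const -> std::string = 0;
};

// Hypothetical node type used only to exercise the sketch.
struct hello_node final : node {
    auto name() const -> std::string override { return "hello"; }
};

class mini_pipeline {
public:
    using node_id = int;  // assumption: the real type is unspecified

    template <typename N, typename... Args>
    auto create_node(Args&&... args) -> node_id {
        // Build the node first so a throwing constructor leaves the
        // pipeline untouched; only then commit the new ID.
        auto owned = std::make_unique<N>(std::forward<Args>(args)...);
        const auto id = next_id_;
        nodes_.emplace(id, std::move(owned));
        ++next_id_;
        return id;
    }

    // This sketch silently ignores unknown IDs.
    void erase_node(node_id id) { nodes_.erase(id); }

    auto get_node(node_id id) const -> node* {
        const auto it = nodes_.find(id);
        return it == nodes_.end() ? nullptr : it->second.get();
    }

private:
    std::map<node_id, std::unique_ptr<node>> nodes_;
    node_id next_id_ = 0;
};
```

Storing nodes in `std::unique_ptr` makes `mini_pipeline` movable but not copyable without any extra declarations, which matches the copy and move requirements in 3.6.2. IDs are never reused, so an erased ID stays invalid and `get_node` returns `nullptr` for it.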
3.6.4 Connection Management
- void connect(node_id source, node_id dest, int slot);: Connects source's output to dest's input at the specified slot, ensuring type compatibility.
- void disconnect(node_id source, node_id dest);: Removes all connections between two nodes.
- auto get_dependencies(node_id source) const -> std::vector<std::pair<node_id, int>>;: Retrieves a list of nodes depending on source, including their connection slots.
3.6.5 Validation and Execution
- auto is_valid() -> bool;: Validates pipeline correctness, ensuring all source slots are filled, non-sink nodes have dependents, and there are no cycles.
- auto step() -> bool;: Performs one tick of the pipeline, processing data from sources and passing it through connected nodes, returning true if all sink nodes are closed, false otherwise.
- void run();: Executes the pipeline until all sink nodes are closed (equivalent to repeatedly calling step()).
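One of the checks listed for is_valid() is the absence of cycles. A minimal sketch of that single check, using Kahn's algorithm on an edge list, is given below. It assumes nodes are numbered 0 to n-1; `edge` and `is_acyclic` are hypothetical names and are not part of the pipeline interface, and the other validity checks are not covered.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// A directed edge: first is the edge's source, second is its destination.
using edge = std::pair<std::size_t, std::size_t>;

// Returns true when the directed graph on vertices [0, n) has no cycle.
auto is_acyclic(std::size_t n, const std::vector<edge>& edges) -> bool {
    std::vector<std::vector<std::size_t>> outgoing(n);
    std::vector<std::size_t> in_degree(n, 0);
    for (const auto& e : edges) {
        assert(e.first < n && e.second < n);
        outgoing[e.first].push_back(e.second);
        ++in_degree[e.second];
    }

    // Start from every vertex that has no incoming edges.
    std::queue<std::size_t> ready;
    for (std::size_t v = 0; v < n; ++v) {
        if (in_degree[v] == 0) ready.push(v);
    }

    // Repeatedly remove a vertex with no remaining incoming edges.
    std::size_t removed = 0;
    while (!ready.empty()) {
        const auto v = ready.front();
        ready.pop();
        ++removed;
        for (const auto next : outgoing[v]) {
            if (--in_degree[next] == 0) ready.push(next);
        }
    }

    // Vertices on a cycle never reach in-degree zero, so they are never removed.
    return removed == n;
}
```

The order in which vertices leave the queue is also a topological order, so the same traversal could decide the order in which step() polls nodes; whether the framework does this is not stated in the text.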
Together, these components form a basis for building modular data processing systems in C++: computations are expressed as nodes, wired into a directed acyclic graph, and driven one tick at a time. The design emphasizes strong exception guarantees for most operations, so that a failed operation leaves the pipeline in the state it was in before the call.
Original source: https://www.cveoy.top/t/topic/m61n. Copyright belongs to the author. Do not reproduce or scrape.