This document describes the design of a C++ pipeline framework for building modular data-processing chains. Each processing step is a node, and values flow along directed connections between nodes. The document first defines the graph concepts the framework relies on and then describes each of its components.

Key Concepts

  • Graph: A set of nodes (vertices) together with edges, each of which links a pair of nodes.
  • Directed Graph: A graph where edges have directions, analogous to a one-way street map.
  • Acyclic Graph: A graph with no cycles: no path that starts at a node ever leads back to that node.
  • Connected Graph: A graph with no unreachable vertices: there is a path between every pair of nodes.
  • Weakly Connected Graph: A directed graph where a path can be formed between any two vertices, ignoring edge directions.
  • Source Node: A node without any incoming edges, acting as the starting point for data flow.
  • Sink Node: A node without any outgoing edges, representing the final destination of data.
  • Source & Destination: In a directed edge, the 'source' is the node the edge starts from and the 'destination' is the node it points to. These terms are distinct from 'source node' and 'sink node': the source of an edge need not be a source node.
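To make the source-node, sink-node and weak-connectivity definitions concrete, here is a small sketch over a plain edge list. The `edge` alias and the functions `source_nodes`, `sink_nodes` and `weakly_connected` are hypothetical helpers, not part of the framework, and vertices are assumed to be numbered 0 to n-1.

```cpp
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical representation: each edge is a (source, destination) pair.
using edge = std::pair<std::size_t, std::size_t>;

// Source nodes: vertices with no incoming edges.
auto source_nodes(std::size_t n, const std::vector<edge>& edges) -> std::vector<std::size_t> {
    std::vector<bool> has_incoming(n, false);
    for (const auto& [src, dst] : edges) has_incoming[dst] = true;
    std::vector<std::size_t> result;
    for (std::size_t v = 0; v < n; ++v)
        if (!has_incoming[v]) result.push_back(v);
    return result;
}

// Sink nodes: vertices with no outgoing edges.
auto sink_nodes(std::size_t n, const std::vector<edge>& edges) -> std::vector<std::size_t> {
    std::vector<bool> has_outgoing(n, false);
    for (const auto& [src, dst] : edges) has_outgoing[src] = true;
    std::vector<std::size_t> result;
    for (std::size_t v = 0; v < n; ++v)
        if (!has_outgoing[v]) result.push_back(v);
    return result;
}

// Weak connectivity: breadth-first search with edge directions ignored.
auto weakly_connected(std::size_t n, const std::vector<edge>& edges) -> bool {
    if (n == 0) return true;
    std::vector<std::vector<std::size_t>> neighbours(n);
    for (const auto& [src, dst] : edges) {
        neighbours[src].push_back(dst);
        neighbours[dst].push_back(src);  // treat the edge as undirected
    }
    std::vector<bool> seen(n, false);
    std::queue<std::size_t> frontier;
    frontier.push(0);
    seen[0] = true;
    std::size_t visited = 1;
    while (!frontier.empty()) {
        auto v = frontier.front();
        frontier.pop();
        for (auto w : neighbours[v]) {
            if (!seen[w]) {
                seen[w] = true;
                ++visited;
                frontier.push(w);
            }
        }
    }
    return visited == n;
}
```

For the edges 0→1, 1→2 and 3→2, vertices 0 and 3 are source nodes, vertex 2 is the only sink node, and the graph is weakly connected even though vertex 3 cannot be reached from vertex 0 by following edge directions.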

Functionality and Implementation

The pipeline framework involves several key components:

1. pipeline_error Exception

This exception type represents the errors that can occur while a pipeline is being constructed. The possible error kinds are listed in the pipeline_error_kind enumeration class:

#include <exception>

// Errors that may occur in a pipeline.
enum class pipeline_error_kind {
  // An expired node ID was provided.
  invalid_node_id,
  // Attempting to bind a non-existent slot.
  no_such_slot,
  // Attempting to bind to a slot that is already filled.
  slot_already_used,
  // The output type and input types for a connection don't match.
  connection_type_mismatch,
};

struct pipeline_error : std::exception {
  explicit pipeline_error(pipeline_error_kind kind);
  auto kind() const noexcept -> pipeline_error_kind;
  // Must be const and noexcept to override std::exception::what().
  auto what() const noexcept -> const char * override;
};
  • explicit pipeline_error(pipeline_error_kind kind);: Constructs an error of the given kind.
  • auto kind() const noexcept -> pipeline_error_kind;: Returns the kind of error this exception was constructed with.
  • auto what() const noexcept -> const char *;: Returns a descriptive string based on kind(), e.g. 'invalid node ID' or 'no such slot'.
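A minimal sketch of how pipeline_error might be implemented. The helper `message_for` is hypothetical, and only the 'invalid node ID' and 'no such slot' strings come from the description above; the other two messages are assumptions.

```cpp
#include <exception>
#include <string_view>

enum class pipeline_error_kind {
    invalid_node_id,
    no_such_slot,
    slot_already_used,
    connection_type_mismatch,
};

// Hypothetical helper: one fixed message per error kind.
constexpr auto message_for(pipeline_error_kind kind) noexcept -> std::string_view {
    switch (kind) {
    case pipeline_error_kind::invalid_node_id: return "invalid node ID";
    case pipeline_error_kind::no_such_slot: return "no such slot";
    case pipeline_error_kind::slot_already_used: return "slot already used";
    case pipeline_error_kind::connection_type_mismatch: return "connection type mismatch";
    }
    return "unknown pipeline error";
}

class pipeline_error : public std::exception {
public:
    explicit pipeline_error(pipeline_error_kind kind) noexcept : kind_{kind} {}

    auto kind() const noexcept -> pipeline_error_kind { return kind_; }

    // const and noexcept are required to override std::exception::what().
    // Every view returned by message_for wraps a string literal, so data()
    // is a null-terminated C string.
    auto what() const noexcept -> const char* override { return message_for(kind_).data(); }

private:
    pipeline_error_kind kind_;
};
```

Because what() is a genuine override, a handler that catches `const std::exception&` still gets the pipeline-specific message.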

2. node Class

A node represents a type-erased computation and is the base class of every component<I, O>; it exposes the functionality common to all nodes. Some member functions are deliberately private so that only a pipeline (declared as a friend) can change a node's state.

#include <string>

// The result of a poll_next() operation.
enum class poll {
  // A value is available.
  ready,
  // No value is available this time, but there might be one later.
  empty,
  // No value is available, and there never will be again:
  // every future poll for this node will return `poll::closed` again.
  closed,
};

class node {
public:
  // Virtual so that nodes can be destroyed through a node pointer.
  virtual ~node() = default;
  virtual auto name() const -> std::string = 0;
private:
  virtual auto poll_next() -> poll = 0;
  virtual void connect(const node *source, int slot) = 0;
  // Implementations may add further virtual functions here.
  friend class pipeline;
};
  • auto name() const -> std::string;: Returns a human-readable name for the node. (Pure virtual; must be overridden.)
  • auto poll_next() -> poll;: Processes a single tick, preparing the next value. (Pure virtual; must be overridden.)
  • void connect(const node *source, int slot);: Connects source as the input to the given slot. (Pure virtual; must be overridden.)
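The sketch below illustrates the poll contract and the friend relationship under simplifying assumptions. The hypothetical `countdown` node reports `poll::ready` a fixed number of times and then stays closed, and a stand-in `pipeline` class with an invented `drain` helper (not the real pipeline) is the only code allowed to call the private poll_next().

```cpp
#include <string>

enum class poll { ready, empty, closed };

// Simplified node: the pure virtual interface described above.
class node {
public:
    virtual ~node() = default;
    virtual auto name() const -> std::string = 0;

private:
    virtual auto poll_next() -> poll = 0;
    virtual void connect(const node* source, int slot) = 0;
    friend class pipeline;
};

// Hypothetical node: reports `ready` n times, then `closed` on every later poll.
class countdown final : public node {
public:
    explicit countdown(int n) : remaining_{n} {}
    auto name() const -> std::string override { return "countdown"; }

private:
    auto poll_next() -> poll override {
        if (remaining_ == 0) return poll::closed;  // once closed, always closed
        --remaining_;
        return poll::ready;
    }
    void connect(const node*, int) override {}  // takes no inputs
    int remaining_;
};

// Hypothetical stand-in for the real pipeline. As a friend of node it may call
// the private poll_next(); it counts `ready` results until the node closes.
class pipeline {
public:
    static auto drain(node& n) -> int {
        int ready_count = 0;
        for (poll p = n.poll_next(); p != poll::closed; p = n.poll_next()) {
            if (p == poll::ready) ++ready_count;
        }
        return ready_count;
    }
};
```

Overriding a private virtual function is legal in C++, so concrete nodes can implement poll_next() and connect() while still keeping them out of reach of ordinary callers.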

3. producer Class

This class lets a component be inspected as a producer of a specific type so that its value can be retrieved. Sink nodes produce nothing, so a specialization for Output = void is required; it omits value().

template <typename Output>
struct producer : node {
  using output_type = Output;
  virtual auto value() const -> const output_type& = 0;
};

// Specialization for sinks: there is no value to return.
template <>
struct producer<void> : node {
  using output_type = void;
};
  • auto value() const -> const output_type&;: Returns an immutable reference to the node's current value. (Pure virtual; must be overridden. Absent when Output is void.)

4. component Class

A component represents a single computation in the pipeline. It is parameterized by its Input type, a std::tuple listing the type expected on each input slot, and by its Output type.

template <typename Input, typename Output>
struct component : producer<Output> {
  using input_type = Input;
};
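Because Input is a std::tuple, the number of input slots and the type expected on each slot can be computed at compile time. `slot_count`, `slot_type` and `repeat` are hypothetical names, and node, producer and component are simplified stand-ins without their virtual functions.

```cpp
#include <cstddef>
#include <string>
#include <tuple>
#include <type_traits>

// Simplified stand-ins for node, producer and component.
class node {
public:
    virtual ~node() = default;
};

template <typename Output>
struct producer : node {
    using output_type = Output;
};

template <typename Input, typename Output>
struct component : producer<Output> {
    using input_type = Input;
};

// Number of input slots: the size of the input_type tuple.
template <typename C>
inline constexpr std::size_t slot_count = std::tuple_size_v<typename C::input_type>;

// Type expected on input slot `Slot`.
template <typename C, std::size_t Slot>
using slot_type = std::tuple_element_t<Slot, typename C::input_type>;

// Hypothetical component: consumes an int and a std::string, produces a std::string.
using repeat = component<std::tuple<int, std::string>, std::string>;

static_assert(slot_count<repeat> == 2);
static_assert(std::is_same_v<slot_type<repeat, 0>, int>);
static_assert(std::is_same_v<slot_type<repeat, 1>, std::string>);
static_assert(std::is_same_v<repeat::output_type, std::string>);
// A component with an empty input tuple has no slots at all.
static_assert(slot_count<component<std::tuple<>, int>> == 0);
```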

5. sink and source Classes

These classes simplify the implementation of common component types. A sink consumes values and produces no output (the end of a pipeline); a source produces values and consumes no input (the start of a pipeline).

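#include <tuple>

template <typename Input>
struct sink : component<std::tuple<Input>, void> {};

template <typename Output>
struct source : component<std::tuple<>, Output> {
private:
  // A source has no input slots, so it overrides connect() once for all sources.
  void connect(const node *source, int slot) override;
};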
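To show how a source and a sink cooperate through type erasure, the sketch below wires a hypothetical `counter` source to a hypothetical `summer` sink. The sink's connect() receives a plain `const node *` and casts it back to `const producer<int> *`; that cast is only safe because a real pipeline would have checked the types when connecting. The `pipeline` class here is a minimal stand-in with invented `wire` and `drive` helpers, not the pipeline described in section 6, and giving source::connect an empty body is an assumption.

```cpp
#include <string>
#include <tuple>

enum class poll { ready, empty, closed };

// Simplified versions of node, producer, component, sink and source.
class node {
public:
    virtual ~node() = default;
    virtual auto name() const -> std::string = 0;

private:
    virtual auto poll_next() -> poll = 0;
    virtual void connect(const node* source, int slot) = 0;
    friend class pipeline;
};

template <typename Output>
struct producer : node {
    using output_type = Output;
    virtual auto value() const -> const output_type& = 0;
};

template <>
struct producer<void> : node {
    using output_type = void;
};

template <typename Input, typename Output>
struct component : producer<Output> {
    using input_type = Input;
};

template <typename Input>
struct sink : component<std::tuple<Input>, void> {};

template <typename Output>
struct source : component<std::tuple<>, Output> {
private:
    void connect(const node*, int) override {}  // no input slots to fill
};

// Hypothetical source: produces 1, 2, ..., limit, then closes.
class counter final : public source<int> {
public:
    explicit counter(int limit) : limit_{limit} {}
    auto name() const -> std::string override { return "counter"; }
    auto value() const -> const int& override { return current_; }

private:
    auto poll_next() -> poll override {
        if (current_ >= limit_) return poll::closed;
        ++current_;
        return poll::ready;
    }
    int limit_;
    int current_ = 0;
};

// Hypothetical sink: adds up every value read from its single input slot.
class summer final : public sink<int> {
public:
    auto name() const -> std::string override { return "summer"; }
    int total = 0;

private:
    void connect(const node* src, int /*slot*/) override {
        // Assumes the caller already verified that src produces an int.
        input_ = static_cast<const producer<int>*>(src);
    }
    auto poll_next() -> poll override {
        total += input_->value();
        return poll::ready;
    }
    const producer<int>* input_ = nullptr;
};

// Hypothetical stand-in for the pipeline (a friend of node).
class pipeline {
public:
    static void wire(const node& src, node& dst, int slot) { dst.connect(&src, slot); }
    // Lets dst consume each value src produces; stops at the first non-ready
    // result. poll::empty means a value may still arrive later, so a real
    // pipeline would not stop there.
    static void drive(node& src, node& dst) {
        while (src.poll_next() == poll::ready) dst.poll_next();
    }
};
```

Wiring `counter{4}` into a `summer` and driving them leaves `total` at 1 + 2 + 3 + 4 = 10.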

6. pipeline Class

This is the central class. It creates and stores nodes, manages the connections between them, and executes the pipeline. A pipeline can be reconfigured dynamically, even during or after execution.

// The requirements that a type `N` must satisfy
// to be used as a component in a pipeline (defined in 3.6.0).
template <typename N>
concept concrete_node = /* see 3.6.0 */;

class pipeline {
public:
  // 3.6.1
  using node_id = /* unspecified */; 
  // 3.6.2
  pipeline();
  // Copying is disabled; moving is supported (see 3.6.2).
  pipeline(const pipeline &) = delete;
  pipeline(pipeline&&);
  auto operator=(const pipeline &) -> pipeline& = delete;
  auto operator=(pipeline &&) -> pipeline&;
  ~pipeline();
  // 3.6.3
  template <typename N, typename... Args>
  requires concrete_node<N> and std::constructible_from<N, Args...>
  auto create_node(Args&& ...args) -> node_id;
  void erase_node(node_id n_id);
  auto get_node(node_id n_id) -> node*;
  // 3.6.4
  void connect(node_id src, node_id dst, int slot);
  void disconnect(node_id src, node_id dst);
  auto get_dependencies(node_id src) const -> std::vector<std::pair<node_id, int>>;
  // 3.6.5
  auto is_valid() -> bool;
  auto step() -> bool;
  void run();
  // 3.6.6
  friend std::ostream &operator<<(std::ostream &, const pipeline &);
};

3.6.0 concrete_node Concept

A custom concept defining the requirements for a component to be used within a pipeline. These requirements include:

  • Publishing consumed types via input_type (a std::tuple)
  • Publishing produced type via output_type
  • Deriving from node and the appropriate producer type
  • Not being an abstract class, so that it can actually be instantiated

3.6.1 Types

  • using node_id = /* unspecified */;: An opaque handle to a node that is copyable, default-constructible, and equality-comparable. A node_id is invalid if it does not refer to a node currently in the pipeline, for example after that node has been erased.

3.6.2 Special Members

  • The pipeline must be default constructible.
  • Copying should be a compile error, while moving should be supported.
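A sketch of these rules, assuming (as an implementation choice) that the pipeline owns its nodes through std::unique_ptr. Such a member cannot really be copied, but std::vector's copy constructor is not constrained, so the copy operations should still be deleted explicitly to get a clear compile error and an accurate std::is_copy_constructible result.

```cpp
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

class node {
public:
    virtual ~node() = default;
};

// Hypothetical pipeline skeleton showing only the special members.
class pipeline {
public:
    pipeline() = default;
    pipeline(const pipeline&) = delete;
    auto operator=(const pipeline&) -> pipeline& = delete;
    pipeline(pipeline&&) noexcept = default;
    auto operator=(pipeline&&) noexcept -> pipeline& = default;
    ~pipeline() = default;

private:
    std::vector<std::unique_ptr<node>> nodes_;  // nodes owned by the pipeline
};

static_assert(std::is_default_constructible_v<pipeline>);
static_assert(!std::is_copy_constructible_v<pipeline>);
static_assert(!std::is_copy_assignable_v<pipeline>);
static_assert(std::is_nothrow_move_constructible_v<pipeline>);
static_assert(std::is_nothrow_move_assignable_v<pipeline>);
```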

3.6.3 Node Management

  • auto create_node<N>(Args&& ...args) -> node_id;: Creates a new node of type N with provided arguments and returns its node_id.
  • void erase_node(node_id n_id);: Removes the specified node and all of its connections.
  • auto get_node(node_id n_id) -> node *;: Returns a pointer to the node identified by n_id, or nullptr if n_id is invalid.
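A sketch of node storage under stated assumptions: `node_store` and `named` are hypothetical names, IDs are plain integers from a counter that is never reset, and nodes live in a std::map of std::unique_ptr<node>. create_node advances the counter only after the insertion succeeds, so a throwing constructor leaves the store unchanged.

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>

class node {
public:
    virtual ~node() = default;
    virtual auto name() const -> std::string = 0;
};

// Hypothetical node type used in the usage example.
struct named final : node {
    explicit named(std::string n) : n_{std::move(n)} {}
    auto name() const -> std::string override { return n_; }
    std::string n_;
};

class node_store {
public:
    using node_id = std::uint64_t;  // hypothetical: IDs are never reused

    // N is assumed to derive from node (concrete_node in the real pipeline).
    template <typename N, typename... Args>
    auto create_node(Args&&... args) -> node_id {
        auto ptr = std::make_unique<N>(std::forward<Args>(args)...);
        auto id = next_id_;
        nodes_.emplace(id, std::move(ptr));
        ++next_id_;  // only after the node is safely stored
        return id;
    }

    void erase_node(node_id id) {
        // A full pipeline must also remove the node's connections;
        // error handling for unknown IDs is omitted in this sketch.
        nodes_.erase(id);
    }

    auto get_node(node_id id) const -> node* {
        auto it = nodes_.find(id);
        return it == nodes_.end() ? nullptr : it->second.get();
    }

private:
    node_id next_id_ = 1;
    std::map<node_id, std::unique_ptr<node>> nodes_;
};
```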

3.6.4 Connection Management

  • void connect(node_id source, node_id dest, int slot);: Connects source's output to the given input slot of dest. The types must be compatible; a mismatch is reported as a pipeline_error of kind connection_type_mismatch.
  • void disconnect(node_id source, node_id dest);: Removes all connections between two nodes.
  • auto get_dependencies(node_id source) const -> std::vector<std::pair<node_id, int>>;: Retrieves a list of nodes depending on source, including their connection slots.
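The checks that connect() has to make can be sketched without real node objects by recording, for each node, the std::type_index of its output and of each input slot. `wiring`, `node_entry` and the int IDs are hypothetical simplifications, and the order of the checks is an assumption. Every check runs before anything is modified, which gives connect() the strong exception guarantee.

```cpp
#include <cstddef>
#include <exception>
#include <map>
#include <optional>
#include <typeindex>
#include <typeinfo>
#include <utility>
#include <vector>

enum class pipeline_error_kind { invalid_node_id, no_such_slot, slot_already_used, connection_type_mismatch };

// Simplified stand-in for pipeline_error (no message).
struct pipeline_error : std::exception {
    explicit pipeline_error(pipeline_error_kind k) : kind_{k} {}
    auto kind() const -> pipeline_error_kind { return kind_; }

private:
    pipeline_error_kind kind_;
};

// Hypothetical bookkeeping for one node.
struct node_entry {
    std::type_index output;                     // type the node produces
    std::vector<std::type_index> inputs;        // type each slot accepts
    std::vector<std::optional<int>> filled_by;  // source ID per slot, if any
};

class wiring {
public:
    auto add(std::type_index output, std::vector<std::type_index> inputs) -> int {
        auto slots = inputs.size();
        nodes_.emplace(next_id_, node_entry{output, std::move(inputs),
                                            std::vector<std::optional<int>>(slots)});
        return next_id_++;
    }

    void connect(int src, int dst, int slot) {
        const auto& s = lookup(src);
        auto& d = lookup(dst);
        if (slot < 0 || static_cast<std::size_t>(slot) >= d.inputs.size())
            throw pipeline_error{pipeline_error_kind::no_such_slot};
        if (d.filled_by[slot])
            throw pipeline_error{pipeline_error_kind::slot_already_used};
        if (s.output != d.inputs[slot])
            throw pipeline_error{pipeline_error_kind::connection_type_mismatch};
        d.filled_by[slot] = src;  // all checks passed: commit the change
    }

    // Removes every connection from src into dst.
    void disconnect(int src, int dst) {
        lookup(src);  // throws invalid_node_id for an unknown src
        for (auto& filled : lookup(dst).filled_by)
            if (filled == src) filled.reset();
    }

    // (dependent node, slot) pairs that src feeds.
    auto get_dependencies(int src) const -> std::vector<std::pair<int, int>> {
        lookup(src);
        std::vector<std::pair<int, int>> deps;
        for (const auto& [id, entry] : nodes_)
            for (std::size_t slot = 0; slot < entry.filled_by.size(); ++slot)
                if (entry.filled_by[slot] == src) deps.emplace_back(id, static_cast<int>(slot));
        return deps;
    }

private:
    auto lookup(int id) const -> const node_entry& {
        auto it = nodes_.find(id);
        if (it == nodes_.end()) throw pipeline_error{pipeline_error_kind::invalid_node_id};
        return it->second;
    }
    auto lookup(int id) -> node_entry& {
        return const_cast<node_entry&>(std::as_const(*this).lookup(id));
    }

    int next_id_ = 0;
    std::map<int, node_entry> nodes_;
};
```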

3.6.5 Validation and Execution

  • auto is_valid() -> bool;: Checks that the pipeline is well-formed: every node's input slots are filled, every non-sink node has at least one dependent, and there are no cycles.
  • auto step() -> bool;: Performs one tick of the pipeline, polling the sources and passing their values through the connected nodes. Returns true if all sink nodes are closed, and false otherwise.
  • void run();: Executes the pipeline until all sink nodes are closed (equivalent to repeatedly calling step()).
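For is_valid()'s acyclicity check, and for choosing an order in which step() could poll nodes so that each node runs after its inputs, a topological sort is a natural fit; the sketch uses Kahn's algorithm on a plain edge list. Polling in topological order is an implementation assumption rather than a stated requirement, and `topological_order` and the templated `run` helper are hypothetical.

```cpp
#include <cstddef>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

using edge = std::pair<std::size_t, std::size_t>;  // (source, destination)

// Kahn's algorithm: returns the nodes so that every edge's source precedes its
// destination, or std::nullopt if the graph contains a cycle.
auto topological_order(std::size_t n, const std::vector<edge>& edges)
    -> std::optional<std::vector<std::size_t>> {
    std::vector<std::vector<std::size_t>> out(n);
    std::vector<std::size_t> in_degree(n, 0);
    for (const auto& [src, dst] : edges) {
        out[src].push_back(dst);
        ++in_degree[dst];
    }
    std::queue<std::size_t> ready;
    for (std::size_t v = 0; v < n; ++v)
        if (in_degree[v] == 0) ready.push(v);  // source nodes start the order
    std::vector<std::size_t> order;
    while (!ready.empty()) {
        auto v = ready.front();
        ready.pop();
        order.push_back(v);
        for (auto w : out[v])
            if (--in_degree[w] == 0) ready.push(w);
    }
    if (order.size() != n) return std::nullopt;  // the remaining nodes lie on a cycle
    return order;
}

// run() as described above: call step() until it reports that all sinks are closed.
template <typename Step>
void run(Step&& step) {
    while (!step()) {
    }
}
```

For the edges 0→1, 1→2 and 3→2 this yields the order 0, 3, 1, 2, while the two-node cycle 0→1→0 yields std::nullopt.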

Together, these components let data-processing chains be assembled from small, reusable nodes whose connections are type-checked when they are made and which can be reconfigured at run time. Most operations are intended to provide the strong exception guarantee: if an operation throws, the pipeline is left unchanged.

C++ Pipeline Implementation: Design and Concepts for Data Flow

Original URL: https://www.cveoy.top/t/topic/m61n. Copyright belongs to the author; do not repost or scrape.
