Pipeline & Database Intergration
the end of data flow, it will be consumed. When an entity has been collected, it eventually will be sent to pipelines. Pipeline provides way to do:
- cleaning/validating collected entity
- de-duplicates
- database storing
Inspection of Pipeline
Let's take a look at the simplified code of Pipeline
before diving deeper.
#![allow(unused)] fn main() { pub struct PipeLine<'pl, E, C> { initializer: Option<&'pl dyn for<'a> Fn(&'a mut App<E>) -> Option<C>>, disposer: Option<&'pl dyn for<'a> Fn(&'a mut App<E>)>, process_entity: Option<&'pl dyn for<'a> Fn(Vec<E>, &'a mut App<E>)>, process_yerr: Option< &'pl dyn for<'a> Fn( Vec<Result<Response, MetaResponse>>, &'a mut App<E>, )>, // other fields omitted ... } }
- the method
initializer
get called only once over the runtime, it returns a generic typeC
which defined by user, the generic type is usually a connection cursor to storage destination. - the method
disposer
get called once when the pipeline ends. - the method
process_entity
processes a vector of entity then consume them. - the method
process_yerr
processes a vector of failed response then consume them.
Diesel Sql
Diesel is the most productive way to interact with SQL databases. It is recommanded to get around the basics of diesel here . A detailed example is given at examples.
Other Database
Almost other databases are equipmented with rust-based driver, it is just as simple as following the documentation, implementing the necessary methods.
Here is an simple example for MongoDB Intergration with driver mongodb.
#![allow(unused)] fn main() { pub async fn establish_connection(_app: &mut App<_>) -> Option<&'static mongodb::Client> { static INIT: Once = Once::new(); static mut VAL: Option<mongodb::Client> = None; unsafe { let uri = "mongodb://127.0.0.1:27017"; INIT.call_once(|| { VAL = Some(mongodb::Client::with_uri_str(uri).await.unwrap()); }); VAL.as_ref() } } pub async fn store_item(ens: Vec<_>, _app: &mut App<_>) { // do stuff here like validating and dropping ... let client = establish_connection(_app).await; client.database("database_name_here") .collection("collection_name_here") .insert_one(...) .await .unwrap(); } // set up pipiline let pipeline = Pipeline::builder() .initializer(establish_connection) .entity_mut(store_item) .build("marker".into()); }
This pipeline will insert collected entity into MongoDB.