Introduction
dyer is designed for reliable, flexible and fast request-response based services, such as data processing and web crawling, providing friendly, flexible and comprehensive features without compromising speed.
dyer provides some high-level features:
- Asynchronous, lock-free, concurrent streaming and I/O that make the best use of the thread pool, network and system resources.
- Event-driven: once you set the initial and recursive conditions, dyer handles the rest.
- User-friendly and flexible: dyer offers high-level, easy-to-use wrappers and APIs that do a lot of the work for you.
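To make the "initial and recursive conditions" idea concrete, here is a minimal, std-only sketch of the general pattern: a queue seeded with initial URIs and a parser whose output feeds back into the queue until no work is left. This is not dyer's implementation; `parse`, `run` and the URIs are hypothetical and exist only for illustration.

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical parser: given a URI, return the follow-up URIs it "links" to.
/// A real parser would extract these from a response body.
fn parse(uri: &str) -> Vec<String> {
    match uri {
        "/" => vec!["/a".to_string(), "/b".to_string()],
        "/a" => vec!["/b".to_string(), "/c".to_string()],
        _ => Vec::new(), // leaf pages yield nothing, which ends the recursion
    }
}

/// Start from the initial URIs and keep processing newly discovered
/// URIs until the queue is empty; returns the URIs in processing order.
fn run(initial: &[&str]) -> Vec<String> {
    let mut queue: VecDeque<String> = initial.iter().map(|s| s.to_string()).collect();
    let mut seen: HashSet<String> = queue.iter().cloned().collect();
    let mut visited = Vec::new();
    while let Some(uri) = queue.pop_front() {
        for next in parse(&uri) {
            // only schedule URIs that have not been scheduled before
            if seen.insert(next.clone()) {
                queue.push_back(next);
            }
        }
        visited.push(uri);
    }
    visited
}

fn main() {
    println!("{:?}", run(&["/"]));
}
```

The user only supplies the seed and the parsing rule; the loop itself decides when everything is done, which is the part a framework like dyer takes off your hands.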
Dyer-cli
Before stepping into the topic, it is highly recommended to install dyer-cli, a handy tool that makes using dyer easy and fast.
Installation
dyer-cli is a public crate; just run the following in the terminal:
cargo install dyer-cli
Once installed, type dyer in the terminal to check. If output like the following appears, it is successfully installed:
Handy tool for dyer
USAGE:
dyer [subcommand] [options]
eg. dyer new myproject --debug
...
Create Project
Dyer-cli generates a template that contains many useful examples and instructions for using dyer. Run the following command:
dyer new myproject
It creates a project called myproject with the following file layout:
|___Cargo.toml
|___Readme.md
|___data/
|___data/tasks/
|___src/
|___src/affix.rs
|___src/entity.rs
|___src/parser.rs
|___src/actor.rs
|___src/middleware.rs
|___src/pipeline.rs
Project layout and its role
Main functionality of each file:
- affix.rs serves as an actor to adjust and satisfy additional requirements
- entity.rs contains the entities/data structures to be used or collected
- parser.rs contains functions that extract entities from responses
- actor.rs contains the initial things to do when opening and the final things to do when closing
- middleware.rs contains middlewares that process data at runtime
- pipeline.rs contains entity manipulation, including data storage, displaying and so on
- lib.rs exports all modules inside the directory; normally you don't need to touch it
- Cargo.toml is the basic configuration of the project
- README.md contains some instructions for the project
- data/ is the place to store/load files of App for load balancing and backup
Basic Procedures
Now it is your turn. Each file contains simple example items (function, enum, struct) that you can follow. After that, check your code:
dyer check
If you run it for the first time, dyer-cli downloads the crates and then checks the code.
If warnings such as unused import or dead code appear, the following command does a lot for you:
dyer fix
It is a wrapper of cargo fix. However, it won't help if errors occur; in that case, you have to debug the code manually.
Edit the dyer.cfg file in the root directory. It contains configurations of ArgApp that are updated periodically; for more details see [dyer.cfg Configuration].
Once the program compiles, run it:
dyer run
Actor
Actor is a trait whose methods set up the necessary conditions before the whole program starts and clean up when it ends. Its methods basically fall into two categories:
- Preparations beforehand: the methods new, open_actor and close_actor serve this purpose. First of all, define a struct according to your requirements, e.g.:
#[dyer::actor]
struct MyActor {
    start_uris: Vec<String>,
    ...
}
The struct MyActor should contain the appropriate fields, which are initialized by the method new.
#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn new() -> Self {
        Self {
            start_uris: vec![
                "https://example.domain/path/to/site1".to_string(),
                "https://example.domain/path/to/site2".to_string(),
            ],
            // other fields
            ...
        }
    }
    // other methods of Actor
    ...
}
Before the whole program starts, the method open_actor gets called, so preparation should be done there. But what exactly should we do there? Let's extend the example above a little.
Suppose all start URIs are stored line by line in a file uris.txt:
#[dyer::actor]
pub struct MyActor {
    start_uris: Vec<String>,
}

#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn new() -> Self {
        // `BufRead` provides `lines()` on the buffered reader
        use std::io::BufRead;
        let file = std::fs::File::open("path/to/uris.txt").unwrap();
        let buf = std::io::BufReader::new(file);
        let uris = buf.lines().map(|line| line.unwrap()).collect::<Vec<String>>();
        Self { start_uris: uris }
    }

    async fn open_actor(&mut self, _app: &mut App<_>) {
        self.start_uris.iter().for_each(|uri| {
            Task::get(uri)
                .parser(..)
                .body(Body::empty(), "myactor_identifier".into())
                .unwrap();
        });
    }
    // other methods of Actor
    ...
}
Analogously, you can do some work in close_actor when the program ends.
- Assignment entry: the program cannot start without a Task. entry_task serves as a way to add tasks to the task list; it is expected to return a vector of Task:
#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn entry_task(&mut self) -> Result<Vec<Task>, Box<dyn Error>> {
        let tasks = self
            .start_uris
            .iter()
            .map(|uri| {
                Task::get(uri)
                    .parser(..)
                    .body(Body::empty(), "myactor_identifier".into())
                    .unwrap()
            })
            .collect::<Vec<Task>>();
        Ok(tasks)
    }
    // other methods of Actor
    ...
}
As for entry_affix, it is usually unnecessary unless a Task requires some modification.
But what does that mean? Before answering, let's take a look at the structure of Task:
pub struct Task {
    /// main information that represents a `Task`
    pub(crate) inner: InnerTask,
    /// form data, files or other request parameters stored here
    pub(crate) body: Body,
    ...
}

pub struct InnerTask {
    pub uri: Uri,
    /// request's method
    pub method: Method,
    /// additional headers if necessary
    pub headers: HeaderMap<HeaderValue>,
    ...
}
Clearly, a Task contains almost all the information needed to make a request.
So when does entry_affix play its role? Here are some scenarios where you may use it:
- Headers modification (e.g. cookies, user agents, tokens, etc.)
- JavaScript simulation
- FFI and others
Here we focus on the first (and most used) scenario; an example is given in the Affix section.
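Headers modification can be pictured in plain Rust before touching dyer's API. The sketch below uses a hypothetical `SimpleTask` (not dyer's `Task`) holding only a URI and a header map: `entry_tasks` builds one task per start URI, much like entry_task above, and `apply_affix` merges extra headers such as a cookie or user agent into every task. The header names and values are made up for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a task: just a URI and its headers.
#[derive(Debug)]
struct SimpleTask {
    uri: String,
    headers: HashMap<String, String>,
}

/// Build one task per start URI, mirroring the role of `entry_task`.
fn entry_tasks(start_uris: &[&str]) -> Vec<SimpleTask> {
    start_uris
        .iter()
        .map(|uri| SimpleTask { uri: uri.to_string(), headers: HashMap::new() })
        .collect()
}

/// Merge extra headers into every task before it becomes a request;
/// a key that already exists on a task is overwritten.
fn apply_affix(tasks: &mut [SimpleTask], affix: &HashMap<String, String>) {
    for task in tasks.iter_mut() {
        for (k, v) in affix {
            task.headers.insert(k.clone(), v.clone());
        }
    }
}

fn main() {
    let mut tasks = entry_tasks(&["https://example.domain/path/to/site1"]);
    let mut affix = HashMap::new();
    affix.insert("user-agent".to_string(), "my-crawler/0.1".to_string());
    apply_affix(&mut tasks, &affix);
    println!("{} -> {:?}", tasks[0].uri, tasks[0].headers.get("user-agent"));
}
```

Keeping the affix separate from task creation means the same header set can be swapped or rotated without rebuilding the task list.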
Middleware
Middleware hooks all requests/responses in dyer and their derivatives, including Task, Affix, Request, Response, errors and entities. It is a flexible, low-level way to modify the data flow of dyer.
Inspection of Middleware
Before we dive deeper into what middleware is, let's take a look at some simplified code of Middleware:
pub struct MiddleWare<'md, E> {
    handle_affix: Option<&'md dyn for<'a> Fn(&'a mut Vec<Affix>, &'a mut App<E>)>,
    handle_task: Option<&'md dyn for<'a> Fn(&'a mut Vec<Task>, &'a mut App<E>)>,
    handle_req: Option<&'md dyn for<'a> Fn(&'a mut Vec<Request>, &'a mut App<E>)>,
    handle_res: Option<&'md dyn for<'a> Fn(&'a mut Vec<Response>, &'a mut App<E>)>,
    handle_entity: Option<&'md dyn for<'a> Fn(&'a mut Vec<E>, &'a mut App<E>)>,
    handle_yerr: Option<
        &'md dyn for<'a> Fn(&'a mut Vec<Result<Response, MetaResponse>>, &'a mut App<E>),
    >,
    handle_err: Option<
        &'md dyn for<'a> Fn(&'a mut Vec<Result<Response, MetaResponse>>, &'a mut App<E>),
    >,
    // some other fields
    ...
}
As shown above, it accepts optional functions as handlers for requests, responses and their derivatives. For example, let's log the errors:
pub async fn log_err<E>(errs: &mut Vec<Result<Response, MetaResponse>>, _: &mut App<E>) {
    for r in errs.iter() {
        match r {
            Ok(data) => {
                println!("failed request to {}", data.metas.info.uri);
            }
            Err(e) => {
                println!("failed request to {}", e.info.uri);
            }
        }
    }
}

// set up `handle_err`
let middleware = MiddleWare::builder().err_mut(&log_err).build("marker".into());
This middleware logs the URI of each failed response.
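The optional-handler design can be reproduced without dyer. In this minimal sketch, `MiniMiddleware`, `drop_plain_http`, and the use of `Vec<String>` for requests and `Result<u16, String>` for errors are hypothetical simplifications, not dyer's types. It shows why each handler is an `Option`: an unset hook leaves the batch untouched, while a set hook may inspect, mutate or filter the whole batch in place.

```rust
/// Hypothetical, simplified middleware with two optional hooks.
struct MiniMiddleware<'md> {
    handle_req: Option<&'md dyn Fn(&mut Vec<String>)>,
    handle_err: Option<&'md dyn Fn(&mut Vec<Result<u16, String>>)>,
}

impl<'md> MiniMiddleware<'md> {
    fn run_req(&self, reqs: &mut Vec<String>) {
        // a missing hook simply leaves the data untouched
        if let Some(hook) = self.handle_req {
            hook(reqs);
        }
    }

    fn run_err(&self, errs: &mut Vec<Result<u16, String>>) {
        if let Some(hook) = self.handle_err {
            hook(errs);
        }
    }
}

/// Example hook: drop every request whose URI is not https.
fn drop_plain_http(reqs: &mut Vec<String>) {
    reqs.retain(|uri| uri.starts_with("https://"));
}

fn main() {
    let mw = MiniMiddleware { handle_req: Some(&drop_plain_http), handle_err: None };
    let mut reqs = vec!["https://a.example".to_string(), "http://b.example".to_string()];
    mw.run_req(&mut reqs);
    println!("{:?}", reqs);

    let mut errs: Vec<Result<u16, String>> = vec![Err("timeout".to_string())];
    mw.run_err(&mut errs); // no error hook is set, so nothing changes
    println!("{}", errs.len());
}
```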
Pipeline & Database Integration
Pipeline is the end of the data flow, where data is consumed. When an entity has been collected, it is eventually sent to the pipelines. Pipeline provides a way to do:
- cleaning/validating collected entities
- de-duplication
- database storing
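The first two tasks can be sketched with the standard library alone. The `Item` entity and `clean_batch` function below are hypothetical; they trim titles (cleaning), reject empty titles (validating) and drop entities whose `id` was already seen (de-duplication). The `seen` set is passed in so duplicates are caught across batches, not only within one.

```rust
use std::collections::HashSet;

/// Hypothetical entity collected by a parser.
#[derive(Debug, PartialEq)]
struct Item {
    id: u32,
    title: String,
}

/// Clean, validate and de-duplicate one batch before it is stored.
/// `seen` persists across batches so duplicates are caught globally.
fn clean_batch(items: Vec<Item>, seen: &mut HashSet<u32>) -> Vec<Item> {
    items
        .into_iter()
        .map(|mut it| {
            it.title = it.title.trim().to_string(); // cleaning
            it
        })
        .filter(|it| !it.title.is_empty()) // validating
        .filter(|it| seen.insert(it.id)) // de-duplicating by id
        .collect()
}

fn main() {
    let mut seen = HashSet::new();
    let batch = vec![
        Item { id: 1, title: " first ".to_string() },
        Item { id: 2, title: "   ".to_string() },
        Item { id: 1, title: "dup".to_string() },
    ];
    println!("{:?}", clean_batch(batch, &mut seen));
}
```

Because the filters run lazily per item, an invalid entity never reaches the de-duplication step, so its id is not marked as seen.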
Inspection of Pipeline
Let's take a look at the simplified code of Pipeline before diving deeper:
pub struct PipeLine<'pl, E, C> {
    initializer: Option<&'pl dyn for<'a> Fn(&'a mut App<E>) -> Option<C>>,
    disposer: Option<&'pl dyn for<'a> Fn(&'a mut App<E>)>,
    process_entity: Option<&'pl dyn for<'a> Fn(Vec<E>, &'a mut App<E>)>,
    process_yerr: Option<
        &'pl dyn for<'a> Fn(Vec<Result<Response, MetaResponse>>, &'a mut App<E>),
    >,
    // other fields omitted
    ...
}
- the method initializer gets called only once over the runtime; it returns a generic type C defined by the user, usually a connection cursor to the storage destination
- the method disposer gets called once when the pipeline ends
- the method process_entity processes a vector of entities and then consumes them
- the method process_yerr processes a vector of failed responses and then consumes them
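The lifecycle above (initialize once, process every batch, dispose once) can be shown with a small synchronous model. `MiniPipeline`, `open`, `store` and `close` are hypothetical; they use plain function pointers instead of dyer's closures over `&mut App<E>`, and a `Vec<String>` log stands in for the user-defined connection type `C`.

```rust
/// Hypothetical pipeline mirroring the described lifecycle:
/// `initializer` runs once and yields a user-defined handle `C`,
/// `process_entity` consumes each batch, `disposer` runs once at the end.
struct MiniPipeline<E, C> {
    initializer: fn() -> Option<C>,
    process_entity: fn(Vec<E>, &mut C),
    disposer: fn(&mut C),
}

impl<E, C> MiniPipeline<E, C> {
    /// Drive the pipeline over all batches and hand the handle back for inspection.
    /// Returns `None` if the initializer could not produce a handle.
    fn run(&self, batches: Vec<Vec<E>>) -> Option<C> {
        let mut conn = (self.initializer)()?; // called exactly once
        for batch in batches {
            (self.process_entity)(batch, &mut conn);
        }
        (self.disposer)(&mut conn); // called once when the pipeline ends
        Some(conn)
    }
}

// A `Vec<String>` log plays the role of the storage connection.
fn open() -> Option<Vec<String>> {
    Some(vec!["open".to_string()])
}

fn store(batch: Vec<u32>, db: &mut Vec<String>) {
    for n in batch {
        db.push(format!("insert {}", n));
    }
}

fn close(db: &mut Vec<String>) {
    db.push("close".to_string());
}

fn main() {
    let pl: MiniPipeline<u32, Vec<String>> =
        MiniPipeline { initializer: open, process_entity: store, disposer: close };
    println!("{:?}", pl.run(vec![vec![1, 2], vec![3]]));
}
```

Returning `Option<C>` from the initializer lets a failed connection stop the pipeline before any entity is consumed.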
Diesel SQL
Diesel is the most productive way to interact with SQL databases. It is recommended to get familiar with the basics of Diesel here. A detailed example is given in examples.
Other Database
Almost all other databases are equipped with a Rust driver; integrating one is as simple as following the driver's documentation and implementing the necessary methods.
Here is a simple example of MongoDB integration with the driver mongodb.
use std::sync::OnceLock;

static CLIENT: OnceLock<mongodb::Client> = OnceLock::new();

pub async fn establish_connection(_app: &mut App<_>) -> Option<&'static mongodb::Client> {
    if let Some(client) = CLIENT.get() {
        return Some(client);
    }
    // `with_uri_str` is async, so the client is created outside `get_or_init`
    let client = mongodb::Client::with_uri_str("mongodb://127.0.0.1:27017").await.ok()?;
    Some(CLIENT.get_or_init(|| client))
}

pub async fn store_item(ens: Vec<_>, _app: &mut App<_>) {
    // do stuff here like validating and dropping
    ...
    let client = establish_connection(_app).await.unwrap();
    client
        .database("database_name_here")
        .collection("collection_name_here")
        .insert_one(...)
        .await
        .unwrap();
}

// set up pipeline
let pipeline = Pipeline::builder()
    .initializer(establish_connection)
    .entity_mut(store_item)
    .build("marker".into());
This pipeline inserts collected entities into MongoDB.
Affix
Affix is the fingerprint attached when making a request. In general, an affix is not necessary unless the target site requires visitors to meet some criteria. So far, Affix mainly focuses on the modification of headers.
For example, assign a user agent to each Task, using a file user-agents.txt that contains one user agent per line:
// src/affix.rs
pub struct Aff {
    uas: Vec<String>,
    // index of the next user-agent to hand out
    idx: usize,
}

#[dyer::async_trait]
impl Affixor for Aff {
    // this function only runs once
    async fn init(&mut self) {
        use std::io::BufRead;
        let file = std::fs::File::open("path/to/user-agents.txt").unwrap();
        let buf = std::io::BufReader::new(file);
        self.uas = buf.lines().map(|line| line.unwrap()).collect::<Vec<String>>();
    }

    // if the affix isn't obtained via network (request-response), just return `None`
    async fn invoke(&mut self) -> Option<dyer::Request> {
        None
    }

    // dyer combines the `Affix` returned by this function with each `Task` before making a request
    async fn parse(&mut self, _: Option<Result<Response, MetaResponse>>) -> Option<dyer::Affix> {
        if self.uas.is_empty() {
            return None;
        }
        // pick the user-agents in order, wrapping around at the end
        let ua = self.uas[self.idx % self.uas.len()].clone();
        self.idx += 1;
        // wrap `ua` into a `dyer::Affix` (e.g. as the User-Agent header)
        ...
    }
    // other methods of Affixor
    ...
}

// src/actor.rs
#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn entry_affix(&mut self) -> Option<Aff> {
        Some(Aff { uas: Vec::new(), idx: 0 })
    }
    // other methods of Actor
    ...
}
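The rotation logic itself is independent of dyer and can be tested on its own. In this sketch, `UaRotator` is a hypothetical helper, not part of dyer's Affixor API. It reads one user agent per line from any `BufRead` (so an in-memory `Cursor` can stand in for user-agents.txt), skips blank lines, and hands the entries out round-robin with an index and modulo, which avoids storing an iterator that borrows the struct's own vector.

```rust
use std::io::BufRead;

/// Hypothetical rotator handing out user agents in order, wrapping around.
struct UaRotator {
    uas: Vec<String>,
    idx: usize,
}

impl UaRotator {
    /// Load one user agent per line, skipping blank lines.
    fn from_reader<R: BufRead>(reader: R) -> std::io::Result<Self> {
        let mut uas = Vec::new();
        for line in reader.lines() {
            let line = line?;
            let trimmed = line.trim();
            if !trimmed.is_empty() {
                uas.push(trimmed.to_string());
            }
        }
        Ok(UaRotator { uas, idx: 0 })
    }

    /// Next user agent in round-robin order; `None` if the list is empty.
    fn next_ua(&mut self) -> Option<&str> {
        if self.uas.is_empty() {
            return None;
        }
        let i = self.idx % self.uas.len();
        self.idx += 1;
        Some(self.uas[i].as_str())
    }
}

fn main() -> std::io::Result<()> {
    // an in-memory reader stands in for `user-agents.txt`
    let data = "agent-a\n\nagent-b\n";
    let mut rot = UaRotator::from_reader(std::io::Cursor::new(data))?;
    for _ in 0..3 {
        println!("{:?}", rot.next_ua());
    }
    Ok(())
}
```

With a real file, `std::io::BufReader::new(std::fs::File::open(path)?)` can be passed to `from_reader` instead of the `Cursor`.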
Attribute
You may notice that some components are annotated with something like #[dyer::entity], #[dyer::actor] or others; these are attribute macros, which transform a block of code into another code block.
All available attributes are as follows:
- #[dyer::affix] marks the annotated type as Affix
- #[dyer::actor] marks the annotated type as Actor
- #[dyer::middleware] marks the annotated type as Middleware
- #[dyer::pipeline] marks the annotated type as Pipeline
- #[dyer::parser] marks the annotated function as a parser; any function with this attribute can parse responses
- #[dyer::entity] marks the annotated type as an entity; any type with this attribute can contain data to be collected
- #[dyer::async_trait] marks the annotated item as async_trait; note that it is a wrapper of the crate async_trait
Problem & Feedback
It is, of course, likely that bugs and errors lie somewhere, and defects may appear in unexpected ways. If you find any, comments and suggestions are welcome; please open an issue on my GitHub.