Introduction

dyer is designed for reliable, flexible, and fast request-response based services, including data processing, web crawling, and so on. It provides friendly, flexible, and comprehensive features without compromising speed.

dyer provides some high-level features:

  • Asynchronous, lock-free, concurrent streaming and I/O that make the best of the thread pool, network, and system resources.
  • Event-driven: once you set the initial and recursive conditions, dyer will handle the rest.
  • User-friendly and flexible: dyer offers high-level, easy-to-use wrappers and APIs that do a lot for you.

Dyer-cli

Before stepping into the topic, it is highly recommended to install dyer-cli. Dyer-cli is a handy tool for easy and fast use of dyer.

Installation

dyer-cli is a public crate; just run the following in the terminal:

cargo install dyer-cli 

Once installed, type dyer in the terminal to check. If something like the following appears, it is successfully installed.

Handy tool for dyer

USAGE:
   dyer [subcommand] [options]
   eg. dyer new myproject --debug 
	 ...

Create Project

Dyer-cli generates a template that contains many useful examples and instructions for using dyer. Create it with the following command:

dyer new myproject

It will create a project called myproject with the following file layout:

|___Cargo.toml
|___Readme.md
|___data/
|___data/tasks/
|___src/
    |___src/affix.rs
    |___src/entity.rs
    |___src/parser.rs
    |___src/actor.rs
    |___src/middleware.rs
    |___src/pipeline.rs

Project layout and its role

Main functionality of each file:

  • affix.rs serves as an actor that adjusts requests to satisfy additional requirements
  • entity.rs contains the entities/data structures to be used/collected
  • parser.rs contains functions that extract entities from responses
  • actor.rs contains the initial things to do when opening and the final things to do when closing
  • middleware.rs contains middlewares that process data at runtime
  • pipeline.rs contains entity manipulation, including data storage, displaying, and so on
  • lib.rs exports all modules inside the directory; normally nothing needs to be done here
  • Cargo.toml is the basic configuration of the project
  • README.md contains some instructions for the project
  • data/ is the place to store/load files of App for load balancing and backup

Basic Procedures

Now it is your turn. Each file contains simple example items (functions, enums, structs) that you can follow. After that, check your code:

dyer check

If you run it for the first time, dyer-cli will download the crates and then check the code. If warnings such as unused imports or dead code appear, the following command fixes many of them for you:

dyer fix

dyer fix is a wrapper of cargo fix. However, it won't help if errors occur; in that case, you have to debug the code manually.

Edit the dyer.cfg file in the root directory

The file contains some configurations of ArgApp that will be updated periodically. For more details, see [dyer.cfg Configuration].

Once the program compiles, run it:

dyer run

Actor

Actor is a trait whose methods set up the necessary conditions before the whole program starts or ends. Its methods fall into two categories:

  • Preparations Beforehand: the methods new, open_actor, and close_actor are provided to serve that purpose. First of all, define a struct according to your requirements, e.g.:

#[dyer::actor]
struct MyActor {
    start_uris: Vec<String>,
    ...
}

The struct MyActor should contain appropriate fields, which are initialized by the method new.


#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn new() -> Self {
        Self {
            start_uris: vec![
                "https://example.domain/path/to/site1".to_string(),
                "https://example.domain/path/to/site2".to_string(),
            ],
            // other fields
            ...
        }
    }
    // other method of Actor
    ...
}

Before the whole program starts, the method open_actor gets called, so preparation should be done there. But what should we do in it? Let's extend the example above a little.

Suppose all start URIs are stored line by line in a file uris.txt:


#[dyer::actor]
pub struct MyActor {
    start_uris: Vec<String>,
}

#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn new() -> Self {
        // `lines()` comes from the `BufRead` trait
        use std::io::BufRead;

        let file = std::fs::File::open("path/to/uris.txt").unwrap();
        let buf = std::io::BufReader::new(file);
        let uris = buf.lines().map(|line| line.unwrap()).collect::<Vec<String>>();
        Self { start_uris: uris }
    }

    async fn open_actor(&mut self, _app: &mut App<_>) {
        // a `Vec` must be turned into an iterator before calling `for_each`
        self.start_uris.iter().for_each(|uri| {
            Task::get(uri)
                .parser(..)
                .body(Body::empty(), "myactor_identifier".into())
                .unwrap();
        });
    }
    // other method of Actor
    ...
}

Analogously, you can do some cleanup with close_actor when the program ends.
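The file-reading step inside new can also be written without unwrap. The following is a minimal sketch using only the standard library; read_uris is a hypothetical helper name, not part of dyer, and it assumes one URI per line with blank lines ignored.

```rust
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::path::Path;

/// Reads one URI per line, trimming whitespace and skipping blank lines.
/// `read_uris` is a hypothetical helper, not part of dyer.
pub fn read_uris(path: &Path) -> io::Result<Vec<String>> {
    let reader = BufReader::new(File::open(path)?);
    let mut uris = Vec::new();
    for line in reader.lines() {
        // `lines()` yields `io::Result<String>`; propagate read errors.
        let line = line?;
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            uris.push(trimmed.to_string());
        }
    }
    Ok(uris)
}
```

Returning io::Result lets the caller decide whether a missing file should abort the program or fall back to a default list.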

  • Assignments Entry: the program cannot start without a Task; entry_task serves as a way to add tasks to the list. It expects a vector of Task as its return value:

#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn entry_task(&mut self) -> Result<Vec<Task>, Box<dyn Error>> {
        let tasks = self
            .start_uris
            .iter()
            .map(|uri| {
                Task::get(uri)
                    .parser(..)
                    .body(Body::empty(), "myactor_identifier".into())
                    .unwrap()
            })
            .collect::<Vec<_>>();
        // the return type is a `Result`, so wrap the vector in `Ok`
        Ok(tasks)
    }
    // other method of Actor
    ...
}
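The pattern of mapping start URIs to tasks and returning a Result can be sketched with the standard library alone. SketchTask and build_tasks below are hypothetical stand-ins, not dyer types; the scheme check is an assumption chosen only to give the error path something to do.

```rust
use std::error::Error;

/// Stand-in for a task; only the URI is kept. Not dyer's `Task`.
#[derive(Debug, PartialEq)]
pub struct SketchTask {
    pub uri: String,
}

/// Turns every start URI into a task, failing on the first invalid one.
pub fn build_tasks(uris: &[String]) -> Result<Vec<SketchTask>, Box<dyn Error>> {
    uris.iter()
        .map(|uri| -> Result<SketchTask, Box<dyn Error>> {
            if uri.starts_with("http://") || uri.starts_with("https://") {
                Ok(SketchTask { uri: uri.clone() })
            } else {
                Err(format!("unsupported uri: {}", uri).into())
            }
        })
        // Collecting an iterator of `Result`s stops at the first `Err`.
        .collect()
}
```

Collecting into a Result avoids calling unwrap on each item, so one malformed URI surfaces as an error instead of a panic.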

As for entry_affix, it is usually unnecessary unless the Task requires modification. But what is there to modify? Before answering that, let's take a look at the structure of Task:


pub struct Task {
    /// main information that represents a `Task`
    pub(crate) inner: InnerTask,
    /// Formdata, files or other request parameters stored here
    pub(crate) body: Body,
    ...
}
pub struct InnerTask {
    pub uri: Uri,
    /// request's method
    pub method: Method,
    /// additional headers if necessary
    pub headers: HeaderMap<HeaderValue>,
    ...
}

As you can see, a Task contains nearly all the information needed to make a request.

But when does entry_affix play its role? Here are some scenarios that you may use it.

  1. Headers modification (e.g. cookies, user-agents, tokens, etc.)
  2. JavaScript simulation
  3. FFI and others

Here we focus on the first one (the most used); an example is given at section Actor.
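To make the idea of header modification concrete, here is a minimal sketch that merges extra headers into a task-like value before a request is made. PlainTask, PlainAffix, and apply_affix are hypothetical names that only mimic the shape described above; dyer's own Task and Affix use different types (for example HeaderMap), and the sketch assumes header names are already lowercase.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the request-related part of a task.
#[derive(Debug, Default)]
pub struct PlainTask {
    pub uri: String,
    pub headers: HashMap<String, String>,
}

/// Extra headers to merge into a task before the request is made.
#[derive(Debug, Default)]
pub struct PlainAffix {
    pub headers: HashMap<String, String>,
}

/// Copies the affix headers into the task, overwriting duplicates.
/// Assumes header names are already normalized to lowercase.
pub fn apply_affix(task: &mut PlainTask, affix: &PlainAffix) {
    for (name, value) in &affix.headers {
        task.headers.insert(name.clone(), value.clone());
    }
}
```

Letting the affix overwrite existing headers is one possible policy; keeping the task's own value would be an equally valid choice.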

Middleware

Middleware hooks into all requests/responses and their derivatives in dyer, including Task, Affix, Request, Response, error, and entity. It is a flexible, low-level, and scalable way to modify the data flow of dyer.

Inspection of Middleware

Before we dive deeper into what middleware is, let's take a look at some simplified code of Middleware:


pub struct MiddleWare<'md, E> {
    handle_affix:
        Option<&'md dyn for<'a> Fn(&'a mut Vec<Affix>, &'a mut App<E>)>,
    handle_task:
        Option<&'md dyn for<'a> Fn(&'a mut Vec<Task>, &'a mut App<E>)>,
    handle_req:
        Option<&'md dyn for<'a> Fn(&'a mut Vec<Request>, &'a mut App<E>)>,
    handle_res:
        Option<&'md dyn for<'a> Fn(&'a mut Vec<Response>, &'a mut App<E>)>,
    handle_entity:
        Option<&'md dyn for<'a> Fn(&'a mut Vec<E>, &'a mut App<E>)>,
    handle_yerr: Option<
        &'md dyn for<'a> Fn(
            &'a mut Vec<Result<Response, MetaResponse>>,
            &'a mut App<E>,
        )>,
    handle_err: Option<
        &'md dyn for<'a> Fn(
            &'a mut Vec<Result<Response, MetaResponse>>,
            &'a mut App<E>,
        )>,
    // some other fields
    ...
}

As shown above, it accepts some optional async functions as handlers for requests, responses, and their derivatives. Let's log errors:


pub async fn log_err<E>(errs: &mut Vec<Result<Response, MetaResponse>>, _: &mut App<E>) {
    for r in errs.iter() {
        match r {
            Ok(data) => {
                println!("failed request to {}", data.metas.info.uri);
            }
            Err(e) => {
                println!("failed request to {}", e.info.uri);
            }
        }
    }
}

// set up `handle_err`
let middleware = MiddleWare::builder().err_mut(&log_err).build("marker".into());

That middleware will log the URI of each failed response.
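The idea of optional handlers stored as references to functions can be illustrated without dyer. The sketch below is a synchronous, simplified analogue: Hooks, on_item, on_err, and run are hypothetical names, and errors are plain strings rather than responses.

```rust
/// Holds optional hooks; a hook left as `None` is simply skipped.
/// `Hooks` is a hypothetical analogue, not dyer's `MiddleWare`.
pub struct Hooks<'h, T> {
    on_item: Option<&'h dyn Fn(&mut Vec<T>)>,
    on_err: Option<&'h dyn Fn(&mut Vec<String>)>,
}

impl<'h, T> Hooks<'h, T> {
    /// Starts with no hooks registered.
    pub fn new() -> Self {
        Hooks { on_item: None, on_err: None }
    }

    /// Registers the hook that sees collected items.
    pub fn on_item(mut self, f: &'h dyn Fn(&mut Vec<T>)) -> Self {
        self.on_item = Some(f);
        self
    }

    /// Registers the hook that sees error messages.
    pub fn on_err(mut self, f: &'h dyn Fn(&mut Vec<String>)) -> Self {
        self.on_err = Some(f);
        self
    }

    /// Runs whichever hooks are set: items first, then errors.
    pub fn run(&self, items: &mut Vec<T>, errs: &mut Vec<String>) {
        if let Some(f) = self.on_item {
            f(items);
        }
        if let Some(f) = self.on_err {
            f(errs);
        }
    }
}
```

Because every hook is an Option, a caller registers only the stages it cares about and the rest of the data passes through untouched.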

Pipeline & Database Integration

Pipelines are the end of the data flow, where data is consumed. When an entity has been collected, it is eventually sent to the pipelines. A pipeline provides a way to do:

  • cleaning/validating collected entity
  • de-duplicating
  • database storing
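As an illustration of the first two steps, the following sketch cleans a batch of entities and removes duplicates before they would be stored. Quote and clean_and_dedup are hypothetical names, and treating an entity with empty text as invalid is an assumption made for the example.

```rust
use std::collections::HashSet;

/// Example entity; the fields are assumptions for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct Quote {
    pub author: String,
    pub text: String,
}

/// Trims fields, drops entities with empty text, and removes duplicates,
/// keeping the first occurrence of each (author, text) pair.
pub fn clean_and_dedup(items: Vec<Quote>) -> Vec<Quote> {
    let mut seen = HashSet::new();
    let mut out = Vec::new();
    for item in items {
        let cleaned = Quote {
            author: item.author.trim().to_string(),
            text: item.text.trim().to_string(),
        };
        if cleaned.text.is_empty() {
            continue;
        }
        // `insert` returns false when the key was already present.
        if seen.insert((cleaned.author.clone(), cleaned.text.clone())) {
            out.push(cleaned);
        }
    }
    out
}
```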

Inspection of Pipeline

Let's take a look at the simplified code of Pipeline before diving deeper.


pub struct PipeLine<'pl, E, C> {
    initializer: Option<&'pl dyn for<'a> Fn(&'a mut App<E>) -> Option<C>>,

    disposer: Option<&'pl dyn for<'a> Fn(&'a mut App<E>)>,

    process_entity:
        Option<&'pl dyn for<'a> Fn(Vec<E>, &'a mut App<E>)>,

    process_yerr: Option<
        &'pl dyn for<'a> Fn(
            Vec<Result<Response, MetaResponse>>,
            &'a mut App<E>,
        )>,
    // other fields omitted
    ...
}

  • the method initializer gets called only once over the runtime; it returns a user-defined generic type C, which is usually a connection cursor to the storage destination.
  • the method disposer gets called once when the pipeline ends.
  • the method process_entity processes a vector of entities and then consumes them.
  • the method process_yerr processes a vector of failed responses and then consumes them.
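The "called only once" behaviour of an initializer can be sketched with std::sync::OnceLock (available since Rust 1.70). FakeClient and client are hypothetical; a real initializer would open an actual connection instead of building a plain struct.

```rust
use std::sync::OnceLock;

/// Stand-in for a database client; a real one would hold a connection.
#[derive(Debug)]
pub struct FakeClient {
    pub uri: String,
}

// Shared slot that is filled at most once for the whole program.
static CLIENT: OnceLock<FakeClient> = OnceLock::new();

/// Returns the shared client, creating it on the first call only.
pub fn client() -> &'static FakeClient {
    CLIENT.get_or_init(|| FakeClient {
        uri: "mongodb://127.0.0.1:27017".to_string(),
    })
}
```

Every later call returns a reference to the same value, so the handlers that process entities can share one connection handle.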

Diesel Sql

Diesel is the most productive way to interact with SQL databases. It is recommended to get familiar with the basics of Diesel here. A detailed example is given at examples.

Other Database

Almost all other databases come with a Rust-based driver, so integration is as simple as following the driver's documentation and implementing the necessary methods.

Here is a simple example of MongoDB integration with the driver mongodb.


// assumes tokio is available as a dependency of the project
use tokio::sync::OnceCell;

const MONGO_URI: &str = "mongodb://127.0.0.1:27017";

// shared client, created on first use; an async-aware cell is needed
// because connecting requires `.await`, which `std::sync::Once` cannot run
static CLIENT: OnceCell<mongodb::Client> = OnceCell::const_new();

pub async fn establish_connection(_app: &mut App<_>) -> Option<&'static mongodb::Client> {
    let client = CLIENT
        .get_or_init(|| async { mongodb::Client::with_uri_str(MONGO_URI).await.unwrap() })
        .await;
    Some(client)
}

pub async fn store_item(ens: Vec<_>, _app: &mut App<_>) {
    // do stuff here like validating and dropping
    ...
    let client = establish_connection(_app).await.unwrap();
    client
        .database("database_name_here")
        .collection("collection_name_here")
        .insert_one(...)
        .await
        .unwrap();
}

// set up pipeline
let pipeline = PipeLine::builder()
    .initializer(establish_connection)
    .entity_mut(store_item)
    .build("marker".into());

This pipeline will insert the collected entities into MongoDB.

Affix

Affix is the fingerprint used when making a request. In general, an affix is not necessary unless the target site requires visitors to meet some criteria. So far, Affix mainly focuses on the modification of headers.

The following example assigns a user-agent to each Task, with the file user-agents.txt containing one user-agent per line:


// src/affix.rs
#[derive(Default)]
pub struct Aff {
    uas: Vec<String>,
    // index of the next user-agent to hand out
    idx: usize,
}

#[dyer::async_trait]
impl Affixor for Aff {
    // this function only runs once
    async fn init(&mut self) {
        // `lines()` comes from the `BufRead` trait
        use std::io::BufRead;

        let file = std::fs::File::open("path/to/user-agents.txt").unwrap();
        let buf = std::io::BufReader::new(file);
        self.uas = buf.lines().map(|line| line.unwrap()).collect::<Vec<String>>();
        self.idx = 0;
    }

    // if the affix isn't obtained via network (request-response), just return `None`
    async fn invoke(&mut self) -> Option<dyer::Request> {
        None
    }

    // dyer combines the `Affix` returned by this function with each `Task` before making a request
    async fn parse(&mut self, _: Option<Result<Response, MetaResponse>>) -> Option<dyer::Affix> {
        // pick the next user-agent in order, wrapping around at the end
        let ua = self.uas.get(self.idx % self.uas.len().max(1))?.clone();
        self.idx += 1;
        // build an `Affix` that carries `ua` as the User-Agent header and return it
        ...
    }

    // other method of Affixor
    ...
}

// src/actor.rs

#[dyer::async_trait]
impl Actor<_, _> for MyActor {
    async fn entry_affix(&mut self) -> Option<Aff> {
        // the fields are filled in later by `init`
        Some(Aff::default())
    }

    // other method of Actor
    ...
}
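The in-order rotation itself is independent of dyer and can be sketched as a small round-robin helper. UaRotator and next_ua are hypothetical names; the sketch only shows how an index wraps around a list of user-agents.

```rust
/// Hands out user-agent strings in order, wrapping around at the end.
/// `UaRotator` is a hypothetical helper, not part of dyer.
pub struct UaRotator {
    uas: Vec<String>,
    next: usize,
}

impl UaRotator {
    pub fn new(uas: Vec<String>) -> Self {
        UaRotator { uas, next: 0 }
    }

    /// Returns `None` only when the list is empty.
    pub fn next_ua(&mut self) -> Option<&str> {
        if self.uas.is_empty() {
            return None;
        }
        let ua = &self.uas[self.next];
        // Advance and wrap so the index always stays in bounds.
        self.next = (self.next + 1) % self.uas.len();
        Some(ua.as_str())
    }
}
```

Storing an index instead of a cycling iterator avoids a struct that borrows from its own field, which Rust does not allow without extra machinery.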

Attribute

You may notice that some components are annotated with something like #[dyer::entity], #[dyer::actor], or others. They are attribute macros that transform a block of code into another code block. All available attributes are listed below.

  • #[dyer::affix] marks the annotated type for Affix
  • #[dyer::actor] marks the annotated type for Actor
  • #[dyer::middleware] marks the annotated type for Middleware
  • #[dyer::pipeline] marks the annotated type for Pipeline
  • #[dyer::parser] marks the annotated function as a parser; any function with this attribute can parse responses.
  • #[dyer::entity] marks the annotated type as an entity; any type with this attribute can contain data to be collected.
  • #[dyer::async_trait] marks the annotated item for async_trait; note that it is a wrapper of the crate async_trait

Problem & Feedback

Bugs and errors probably lie somewhere, and defects may appear in unexpected ways. If you find any, comments and suggestions are welcome; please open an issue on my GitHub.