6: Client/Server Communication Implementation(2)
2018/10/21
I have told you that I want to implement client/server communication two days ago. Unfortunately, I spend lots of hour for understanding Tokio.rs
and future
. So this is the part 2 of this topic.
Tokio.rs
is a framework based on future
, and future
makes writing asynchronous program easy.
Asynchronous IO is always the most difficult part in an application, not to mention the fact that Tokio.rs
is very new with few documents. Therefore, I need learn by programing, trying to make the code of Tokio.rs
works.
Finally, I have done the communication between client and server. So let's look at what I have written in StellarSQL.
Each commits have their own meaning. I will explain the commits in order, and those commit also represent the process of my thoughts.
Communication
Think about how server and client communicate with each other, we talked there is a protocol doing that. A protocol could be HTTP or defined by ourselves. Protocols are based on TCP or UDP, and we will use TCP for DBMS due to the trait that TCP won't lose packets.
Let's dig deeper for a communication.
A client creates a socket and send some bytes to a server. The bytes are encode in the format of a protocol, which clients and server should all follow with. The server receive sockets and get bytes, and the server will parse the bytes to meaningful messages according to the protocol.
Once the server get a message, which might be any commands, including searching, inserting, deleting, ect., the server will reply the result to client. In the same way, the server will give the answer in a response which is serialized by the protocol.
Commits
If you read this series after today for a long time, I believe most parts would be changed a lot. Though I am going to write down some code in the following part, you can check out commits if you like to read what the whole code looks like at this moment.
These commits are done in today:
- main: implement connection
- connection: mod add
request
andresponse
- response: enum Response and impl serialize
- request: enum Request and impl parse
- message: define struc and impl
Or you can run the command to see the version in today:
git checkout 569fd175824000d372370496949d487a87823a25
Code
Recall that we listen sockets and want to do process()
src/main.rs
let server = listener
.incoming()
.for_each(move |socket| {
let addr = socket.peer_addr().unwrap();
println!("New Connection: {}", addr);
// Spawn a task to process the connection
process(socket);
Ok(())
}).map_err(|err| {
println!("accept error = {:?}", err);
});
The process()
is doing the process of what I just talk about for communication above, and there are comments for explaining in the following snippet:
src/main.rs
fn process(socket: TcpStream) {
// split socket into two parts
let (reader, writer) = socket.split();
// make stream bytes to a message
let messages = message::new(BufReader::new(reader));
// note the `move` keyword on the closure here which moves ownership
// of the reference into the closure, which we'll need for spawning the
// client below.
//
// The `map` function here means that we'll run some code for all
// requests (lines) we receive from the client. The actual handling here
// is pretty simple, first we parse the request and if it's valid we
// generate a response.
let responses =
messages.map(move |message| match Request::parse(&message) {
Ok(req) => req,
Err(e) => return Response::Error { msg: e },
});
// At this point `responses` is a stream of `Response` types which we
// now want to write back out to the client. To do that we use
// `Stream::fold` to perform a loop here, serializing each response and
// then writing it out to the client.
let writes = responses.fold(writer, |writer, response| {
let response = response.serialize().into_bytes();
write_all(writer, response).map(|(w, _)| w)
});
// `spawn` this client to ensure it
// runs concurrently with all other clients, for now ignoring any errors
// that we see.
let connection = writes.then(move |_| Ok(()));
// Spawn the task. Internally, this submits the task to a thread pool.
tokio::spawn(connection);
}
We see message
, response
and request
in main.rs
. These modules are defined in connection
module.
message
is for parsing the stream into a Message
, but I don't implement a well designed protocol. The poll()
function will decode streams of each socket to messages, so we should implement our protocol here. Now it just see any "lines" sent from terminals entered by users as messages, so I delete \n\r
in the function poll()
(not showing here).
Note that impl<A> Stream for Message<A>
is needed if we want to use functions of future
crate and spawn the task to thread pool. (Remember Tokio.rs
is based on future
?)
src/connection/message.rs
pub struct Message<A> { /* ... */}
pub fn new<A>(a: A) -> Message<A> { /* ... */}
impl<A> Stream for Message<A> {
fn poll(&mut self) -> Poll<Option<String>, io::Error> {/* ... */}
}
request
and response
define the types of requests and responses. Just like HTTP, you send request in get
, post
, or put
, and get response with code
200, 300, 400. These modules are doing the same thing.
There should be many types in Request
, but I don't do it yet. request.parse()
will parse the request. In this step, it will check if the request is valid, such as authentication, and then return the response. The logic here might changed, because some logic could be better.
src/connection/request.rs
use Response;
pub enum Request {}
impl Request {
pub fn parse(input: &str) -> Result<Response, String> {
Ok(Response::OK {
msg: input.to_string(),
})
}
}
Response
defines all response types, and it can serialize()
the Response
in order to send back to the client. There will be more types, and serialize()
will follow the protocol.
src/connection/response.rs
pub enum Response {
OK { msg: String },
Error { msg: String },
}
impl Response {
pub fn serialize(&self) -> String {
match *self {
Response::OK { ref msg } => format!("{}\n", msg),
Response::Error { ref msg } => format!("Error: {}\n", msg),
}
}
}
So, the communication works. Clients can connect to StellarSQL. Now StellarSQL just echoes the request to response. When I finish the database and SQL parts, it will be a real DBMS.
You can build and run with:
cargo run
then open another terminal run the command, and put any words:
# A client connect to localhost 23333
telnet localhost 23333
StellarSQL will echoes what you enter. :)