gRPC with .NET : Server, Client & Bidirectional Streaming

Satyam Pushkar
12 min readJun 13, 2021

--

In the last article I have explained in detail about the boilerplate code for gRPC generated by .NET template. In this article I am going to explain you about different ways offered by gRPC through which client and server can communicate.

gRPC has 4 different types of methods:

  • Unary: Call starts when client sends a request message. Server works on request to create/populate response and return it to client. Client receives it and then the call gets completed.
  • Server streaming: This is similar to unary RPC, except that the server returns a stream of messages in response to a client’s request. ResponseStream.MoveNext() reads messages streamed from the service. The server streaming call is complete when ResponseStream.MoveNext() returns false. await foreach can be used to read messages if you are using C# 8 or later. The IAsyncStreamReader<T>.ReadAllAsync() extension method reads all messages from the response stream.
  • Client streaming: This is also similar to unary RPC, except that the client sends a stream of messages to the server instead of a single message and server responds with a single message. Client sends a stream of messages using RequestStream.WriteAsync(). When client has finished sending messages, RequestStream.CompleteAsync() should is called to notify the service.
  • Bidirectional streaming: This is a combination of both client and server side streaming. the call is initiated by the client invoking the method and the server receiving the client metadata, method name, and deadline. The client can choose to send messages with RequestStream.WriteAsync(). Messages streamed from the service are accessible with ResponseStream.MoveNext() or ResponseStream.ReadAllAsync(). The bi-directional streaming call is complete when the ResponseStream has no more messages.

Let’s implement and understand each one of the above. I am going to explain each of these method with example. You can find the complete code of this example at this GitHub Repo. You will find following three folders.

Protos folder will have ‘.proto’ file. Client and Server folder will have their respective implementation. Please have the code handy as it will help you in understanding. If you want to create the project on your own, please go through this article once. Because in current article my focus is more towards showcasing different types of RPC methods and streaming examples.

Unary RPC Method

This is a simple call where client sends a request and gets a response back from server.

To implement any RPC call there are 3 main steps involved:

  1. Protobuf declaration(.proto file)
  2. Server side implementation
  3. Client side implementation

Let’s understand these three implementations as part of Unary RPC.

  • Unary RPC: Protobuf declaration(.proto file)

For Unary RPC, stocks.proto file will have a definition similar to below image:

excerpts from stocks.proto for unary RPC

Here rpc GetStockListings (google.protobuf.Empty) returns (StockListing) line denotes that there is a GetStockListings procedure/method which takes no/empty request (google.protobuf.Empty denotes empty, it is imported into protobuf using import “google/protobuf/empty.proto”) and returns StockListing as response.

Same is true for GetStockPrice rpc method also. This takes Stock as request and returns StockPrice as response. You can see these request and response messages’ definitions in the above image.

  • Unary RPC: Server side implementation

Once the protobuf file is created, you need to add the reference of this .proto file into your .csproj file similar to shown in below image.

Here stocks.proto which resides in an external folder is included under solution under Protos folder. GrpcServices=”Server” specifies that this project is going to create server side implementation of defined .proto file. Once this is added you will have partial implementation added automatically. If not please build the project to add boilerplate implementation. To know more about boilerplate implementation, Please check this article.

To mimic the data I have created a list of stocks(_stocks) which has name and id property(defined in stocks.proto file as message- Stock). Please don’t worry about the authorization as of now. You can comment it out for now. I will explain about it later.

Now comes the most important part, the actual server side implementation. Grpc.Tool has already created the boilerplate code for you. StockDataService class which is derived from StockService.StockServiceBase (partial abstract class) has virtual functions for all the rpc method defined in our proto file. All we need is to override it and provide the actual implementation to process the request to generate the response. Check the below code:

//Unary RPC Method [server side]
public override Task<StockListing> GetStockListings(Empty request, ServerCallContext context)
{
return Task.FromResult(new StockListing { Stocks = { _stocks } });
}
public override Task<StockPrice> GetStockPrice(Stock request, ServerCallContext context)
{
var rnd = new Random(100);
return Task.FromResult(
new StockPrice
{
Stock = _stocks.FirstOrDefault(x => x.StockId == request.StockId),
DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
Price = rnd.Next(100, 500).ToString()
});
}

The code is self-explanatory. GetStockListings RPC method is taking no input and returning list of stocks(shown in above image). GetStockPrice RPC method is taking a particular stock’s name/id and returning it’s price. For demo purpose I am generating some random number as price.

  • Unary RPC: Client side implementation

Now comes the third and the final part: how to call this RPC method defined on server side. For this purpose I have created a Console application.

The very first step is to add the reference of the .proto file into client project. This step is similar to the one explained earlier for the server side. The only difference here is GrpcServices=”Client”. It specifies that this project is going to create a client side implementation of defined .proto file. Check the image shown below.

Next step is to create a channel and a client. Please check the syntax below:

var channel = GrpcChannel.ForAddress(“https://localhost:5001");
var client = new StockService.StockServiceClient(channel);

By default server address will be “https://localhost:5001" but please check and replace with your server’s endpoint.

Now the channel and client is created, all you need is to call the method and you will get the response. You will get async and non-async implementation for the rpc method(s) defined. Ignore the headers/authorization part of code as of now. Please find the code below for calling the rpc method:

//Unary RPC Method [client side]
//Calling GetStockListings

var result = await client.GetStockListingsAsync(new Empty(), headers: headers);
foreach (var stock in result.Stocks)
{
Console.WriteLine($"{stock.StockId}\t{stock.StockName}");
}
//Calling GetStockPrice
var result = await client.GetStockPriceAsync(new Stock { StockId = “FB” }, headers: headers);
Console.WriteLine($”Stock Id: {result.Stock.StockId}\n Stock Name: {result.Stock.StockName}\n “ +
$”Stock Price: {result.Price}\n TimeStamp: {result.DateTimeStamp}”);

Server Streaming RPC Method

In this method the server returns a stream of messages in response to a client’s request. Let’s understand the three different implementation part for Server Streaming RPC.

  • Server Streaming RPC: Protobuf declaration(.proto file)

The stocks.proto file will have a definition similar to below image for Server Streaming RPC:

Please notice that there is a stream appended just before the response message(StockPrice). This denotes that once client sends a request, server will respond with a stream of responses. This same stocks.proto file should be added in both the client and server project.

  • Server Streaming RPC: Server side implementation

To provide server side implementation for this method: GetStockPriceStream, you need to override it in StockDataService class. Please check the code shown below:

//Server Streaming RPC [server side]
public override async Task GetStockPriceStream(Empty request, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
{
int i = 10;
var rnd = new Random(100);
while (!context.CancellationToken.IsCancellationRequested && i > 0)
{
_stocks.ForEach(async s =>
{
var time = DateTime.UtcNow.ToTimestamp();
await responseStream.WriteAsync(new StockPrice
{
Stock = s,
DateTimeStamp = time,
Price = rnd.Next(100, 500).ToString()
});
});
await Task.Delay(500);
}
}

One important things to notice here: in return type all you can see is a Task. But check the second input parameter: IServerStreamWriter<StockPrice> responseStream. Using this parameter only, you will be able to send back stream of responses to client. In the above code await responseStream.WriteAsync is used to write responses to stream. This stream will be available to client for reading. Either of client or server can close this stream. As it’s a demo application, I am generating and sending random stock prices. I have used Task.Delay(500) to mimic that server is taking some time to compute.

  • Server Streaming RPC: Client side implementation

Now comes the third part: how to call this RPC method defined on server side and more importantly, how to read the stream of responses on client side. You can check the console application in code repository. In Programm.cs, you can find the below piece of code which is calling the GetStockPriceStream RPC method of server:

//Server Streaming RPC [client side]
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
using var streamingCall = client.GetStockPriceStream(new Empty(), cancellationToken: cts.Token, headers: headers);
try
{
Console.WriteLine(string.Format($"Stock Id\t Stock Name\t Stock Price\t TimeStamp"));
await foreach (var stockPrice in streamingCall.ResponseStream.ReadAllAsync(cancellationToken: cts.Token))
{
Console.WriteLine(string.Format($"{stockPrice.Stock.StockId}\t {stockPrice.Stock.StockName}\t {stockPrice.Price}\t {stockPrice.DateTimeStamp}"));
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}
catch (Exception ex)
{
Console.WriteLine("Error reading response: " + ex);
}

Please understand, how the response stream is getting read here: streamingCall.ResponseStream.ReadAllAsync. This is reading the response stream which is getting populated by server.

Client Streaming RPC Method

This is just the opposite of Server Streaming. In this method the client sends a stream of messages and server responds with a single response . Let’s understand the three different implementation part for Client Streaming RPC.

  • Client Streaming RPC: Protobuf declaration(.proto file)

For Client Streaming RPC, the stocks.proto file will have a definition similar to below image:

Please notice that stream is appended here just before the request message(Stock). This denotes that client sends a stream of requests and server will respond with one response. This same stocks.proto file should be added in both the client and server project.

  • Client Streaming RPC: Server side implementation

Once the rpc definition is added in protobuf file, build the project once. This will add the partial implementation for gRPC. Remember! Here client is going to write request stream and server is going to read it. Please take a look at the following code where GetStocksPrices is getting overridden in StockService.StockServiceBase class:

//Client Streaming RPC [server side]
public override async Task<StocksPrices> GetStocksPrices(IAsyncStreamReader<Stock> requestStream, ServerCallContext context)
{
var rnd = new Random(100);
var inputStocksList = new List<Stock>();
await foreach (var request in requestStream.ReadAllAsync())
{
inputStocksList.Add(request);
_logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
}
var response = new StocksPrices();
foreach (var inputStock in inputStocksList)
{
response.StockPriceList.Add(
new StockPrice
{
Stock = inputStock,
DateTimeStamp = DateTime.UtcNow.ToTimestamp(),
Price = rnd.Next(100, 500).ToString()
});
}
return response;
}

As mentioned earlier here client is streaming, hence first input parameter: IAsyncStreamReader<Stock> requestStream is used. To read the request streams from client requestStream.ReadAllAsync() is used.

  • Client Streaming RPC: Client side implementation

To send the stream from client to server, RequestStream.WriteAsync is used. Please check the Programm.cs of the client console application for the below code:

//Client Streaming RPC [client side]
using var streamingCall = client.GetStocksPrices(headers: headers);
//send requests through requst stream
foreach (var stockId in new[] { "FB", "AAPL", "AMZN", "MSFT", "GOOG" })
{
Console.WriteLine($"Requesting details for {stockId}...");
await streamingCall.RequestStream.WriteAsync(new Stock { StockId = stockId });//Mimicing delay in sending request
await Task.Delay(1500);
}
Console.WriteLine("Completing request stream");
await streamingCall.RequestStream.CompleteAsync();
Console.WriteLine("Request stream completed");
var response = await streamingCall;
foreach (var stockPrice in response.StockPriceList)
{
Console.WriteLine(string.Format($"{stockPrice.Stock.StockId}\t {stockPrice.Stock.StockName}\t {stockPrice.Price}\t {stockPrice.DateTimeStamp}"));
}

You can see in the above code that to mark the completion of write operation on request stream, RequestStream.CompleteAsync is called.

Bidirectional Streaming RPC Method

This is the combination of server and client streaming. Here client writes to request stream and server reads from it and server writes to response stream and client reads from it. Let’s understand the three different implementation part of it.

  • Bidirectional Streaming RPC: Protobuf declaration(.proto file)

For Bidirectional Streaming RPC, check the stocks.proto file for the definition similar to below image:

Here stream is appended before both the request and response. This denotes that client and server both can send as well as read stream. This same stocks.proto file should be added in both the client and server project.

  • Bidirectional Streaming RPC: Server side implementation

Once the rpc definition is added in protobuf file, build the project once. This will add the partial implementation for gRPC. To read the request streams and parallelly process the response streams I have followed the following approach in code:

  1. Read the stream using requestStream.ReadAllAsync
  2. Create a list of tasks to handle each request
  3. A local function which defines a task to handle the request, process it and write the response to channel
  4. A background task which reads response from channel and write it to response stream using responseStream.WriteAsync
  5. A channel to write and read response messages concurrently

To make it simple I have explained in the above mentioned order but you will find the implementation in the order of 5, 4, 2, 1, 3 in the code. Please find the code for the above mentioned steps below:

//Bidirectional Streaming RPC [server side]
public override async Task GetCompanyStockPriceStream(IAsyncStreamReader<Stock> requestStream, IServerStreamWriter<StockPrice> responseStream, ServerCallContext context)
{
// we'll use a channel here to handle in-process 'messages' concurrently being written to and read from the channel.
var channel = Channel.CreateUnbounded<StockPrice>();
// background task which uses async streams to write each stockPrice from the channel to the response steam.
_ = Task.Run(async () =>
{
await foreach (var stockPrice in channel.Reader.ReadAllAsync())
{
await responseStream.WriteAsync(stockPrice);
}

});
// a list of tasks handling requests concurrently
var getCompanyStockPriceStreamRequestTasks = new List<Task>();
//Step 1
try
{
// async streams used to read and process each request from the stream as they are receieved
await foreach (var request in requestStream.ReadAllAsync())
{
_logger.LogInformation($"Getting stock Price for {request.StockName}({request.StockId})");
// start and add the request handling task
getCompanyStockPriceStreamRequestTasks.Add(GetStockPriceAsync(request));
}
_logger.LogInformation("Client finished streaming");
}
catch (Exception ex)
{
_logger.LogError(ex, "An exception occurred");
}
// wait for all responses to be written to the channel
// from the concurrent tasks handling each request
await Task.WhenAll(getCompanyStockPriceStreamRequestTasks);
channel.Writer.TryComplete();// wait for all responses to be read from the channel and streamed as responses
await channel.Reader.Completion;
_logger.LogInformation("Completed response streaming");// a local function which defines a task to handle a Company Stock Price request
// it mimics 10 consecutive stock price, simulating a delay of 0.5s
// multiple instances of this will run concurrently for each recieved request
async Task GetStockPriceAsync(Stock stock)
{
var rnd = new Random(100);
for (int i = 0; i < 10; i++)
{
var time = DateTime.UtcNow.ToTimestamp();
await channel.Writer.WriteAsync(new StockPrice
{
Stock = _stocks.FirstOrDefault(x => x.StockId == stock.StockId),
Price = rnd.Next(100, 500).ToString(),
DateTimeStamp = time
});
await Task.Delay(500);
}
}
}
  • Bidirectional Streaming RPC: Client side implementation

Now comes the third and the final part of Bidirectional Streaming: How to write to request streams and parallelly read the response streams. I have achieved this using these following steps:

  1. A background task which is reading the response streams from server by using ResponseStream.ReadAllAsync.
  2. Writing to request stream using RequestStream.WriteAsync. I have used Task.Delay to mimc delay.
  3. Calling the RequestStream.CompleteAsync to close the request stream.

To understand the above mentioned steps, please have a look at the following code from programm.cs where client is calling GetCompanyStockPriceStream.

using var streamingCall = client.GetCompanyStockPriceStream(headers: headers);// background task which uses async streams to read each stockPrice from the response steam.
_ = Task.Run(async () =>
{
try
{
await foreach (var stockPrice in streamingCall.ResponseStream.ReadAllAsync())
{
Console.WriteLine(string.Format($"{stockPrice.Stock.StockId}\t {stockPrice.Stock.StockName}\t {stockPrice.Price}\t {stockPrice.DateTimeStamp}"));
}
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
{
Console.WriteLine("Stream cancelled.");
}
catch (Exception ex)
{
Console.WriteLine("Error reading response: " + ex);
}
});
//send requests through requst stream
foreach (var stockId in new[] { "FB", "AAPL", "AMZN", "MSFT", "GOOG" })
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine($"Requesting details for {stockId}...");
Console.ResetColor();
await streamingCall.RequestStream.WriteAsync(new Stock { StockId = stockId });//Mimicing delay in sending request
await Task.Delay(1500);
}
Console.WriteLine("Completing request stream");
await streamingCall.RequestStream.CompleteAsync();
Console.WriteLine("Request stream completed");

Here the rest of the code is all self explanatory.

To run the project locally please clone/download the code from this github repo. In the repo code for authentication and service health check endpoint is also added.

Open both the client and server solution in two different visual studio windows. Run the server application first by using ctrl+F5 in visual studio. Then run the client side application by following the same step. I have used a while loop on client side to give an option to call different RPC methods. You can select a particular option to check/debug/see respective method flow. Please find the snapshots of server(on left side) and client(on right side) logs below:

Unary RPC logs
Server Streaming RPC logs
Client Streaming RPC logs
Bidirectional Streaming RPC logs

Hope your understanding of gRPC and how different streaming works have improved.

Please check these other articles about gRPC by me:

--

--

Satyam Pushkar
Satyam Pushkar

Written by Satyam Pushkar

Software Engineer | Backend Specialist | System Architect | Cloud Native Apps | DOTNET, PYTHON | https://www.linkedin.com/in/satyampushkar/

No responses yet