gRPC client in C#

gRPC client in C#

Setup

Install dotnet-sdk

brew install dotnet-sdk

Create project directory

mkdir example-csharp && cd example-csharp

Create console project

dotnet new console --framework net7.0

Add gRPC packages

dotnet add package Grpc.Tools
dotnet add package Grpc.Net.Client
dotnet add package Google.Protobuf

Download machrpc.proto

curl -o machrpc.proto https://raw.githubusercontent.com/machbase/neo-grpc/main/proto/machrpc.proto

After downloading proto file, it is required to add csharp_namespace option in the file.

option csharp_namespace = "MachRpc";

Add ItemGroup in example-csharp.csproj XML file

  <ItemGroup>
    <Protobuf Include="machrpc.proto" GrpcServices="Client"/>
  </ItemGroup>

Query

Connect to server

var channel = GrpcChannel.ForAddress("http://127.0.0.1:5655");
var client = new MachRpc.Machbase.MachbaseClient(channel);

Execute query

var req = new MachRpc.QueryRequest
{
    Sql = "select * from example order by time limit ?",
    Params = { Any.Pack(new Int32Value { Value = 10 }) }
};
var rsp = client.Query(req);

Get columns info of result set

var cols = client.Columns(rsp.RowsHandle);
var headers = new List<string>{"RowNum"};
if (cols.Success) {
    foreach (var c in cols.Columns) {
        headers.Add($"{c.Name}({c.Type})");
    }
}
Console.WriteLine(String.Join("   ", headers));

This will print column labels.

NAME(string)   TIME(datetime)   VALUE(double)

Fetch results

int nrow = 0;
try
{
    while (true)
    {
        var fetch = client.RowsFetch(rsp.RowsHandle);
        if (fetch.HasNoRows)
        {
            break;
        }
        nrow++;
        var line = new List<string> { $"{nrow}   "};
        foreach (Any v in fetch.Values)
        {
            line.Add(convpb(v));
        };
        Console.WriteLine(String.Join("    ", line));
    }
}
finally
{
    if (rsp.Success && rsp.RowsHandle != null)
    {
        client.RowsClose(rsp.RowsHandle);
    }
}
⚠️
Close rows
Do not forget to close rows by calling RowsClose().

Convert Google.Protobuf.WellKnownTypes.Any to string

static string convpb(Any v)
{
    if (v.TypeUrl == "type.googleapis.com/google.protobuf.StringValue")
    {
        var sval = v.Unpack<StringValue>();
        return sval.Value;
    }
    else if (v.TypeUrl == "type.googleapis.com/google.protobuf.Timestamp")
    {
        var ts = v.Unpack<Timestamp>();
        return ts.ToDateTime().ToString("MM/dd/yyyy HH:mm:ss");
    }
    else if (v.TypeUrl == "type.googleapis.com/google.protobuf.DoubleValue")
    {
        var fv = v.Unpack<DoubleValue>();
        return fv.Value.ToString();
    }
    else
    {
        throw new Exception($"Unsupported type {v.TypeUrl}");
    }
}

Output

$ dotnet run
RowNum   NAME(string)   TIME(datetime)   VALUE(double)
1       wave.sin    2023. 02. 08 11:36:38    -0.994521
2       wave.cos    2023. 02. 08 11:36:38    -0.104538
3       wave.sin    2023. 02. 08 11:36:37    -0.866021
4       wave.cos    2023. 02. 08 11:36:37    -0.500008
5       wave.cos    2023. 02. 08 11:36:36    -0.809022
6       wave.sin    2023. 02. 08 11:36:36    -0.587778
7       wave.cos    2023. 02. 08 11:36:35    -0.978149
8       wave.sin    2023. 02. 08 11:36:35    -0.207904
9       wave.cos    2023. 02. 08 11:36:34    -0.978146
10       wave.sin    2023. 02. 08 11:36:34    0.207919

Full source code

using Grpc.Net.Client;
using Google.Protobuf.WellKnownTypes;

internal class Program
{
    private static void Main(string[] args)
    {
        var channel = GrpcChannel.ForAddress("http://127.0.0.1:5655");
        var client = new MachRpc.Machbase.MachbaseClient(channel);
        var req = new MachRpc.QueryRequest
        {
            Sql = "select * from example order by time limit ?",
            Params = { Any.Pack(new Int32Value { Value = 10 }) }
        };
        var rsp = client.Query(req);

        var cols = client.Columns(rsp.RowsHandle);
        var headers = new List<string>{"RowNum"};
        if (cols.Success)
        {
            foreach (var c in cols.Columns)
            {
                headers.Add($"{c.Name}({c.Type})");
            }
        }
        Console.WriteLine(String.Join("   ", headers));

        int nrow = 0;
        try
        {
            while (true)
            {
                var fetch = client.RowsFetch(rsp.RowsHandle);
                if (fetch.HasNoRows)
                {
                    break;
                }
                nrow++;
                var line = new List<string> { $"{nrow}   "};
                foreach (Any v in fetch.Values)
                {
                    line.Add(convpb(v));
                };
                Console.WriteLine(String.Join("    ", line));
            }
        }
        finally
        {
            if (rsp.Success && rsp.RowsHandle != null)
            {
                client.RowsClose(rsp.RowsHandle);
            }
        }
    }

    static string convpb(Any v)
    {
        if (v.TypeUrl == "type.googleapis.com/google.protobuf.StringValue")
        {
            var sval = v.Unpack<StringValue>();
            return sval.Value;
        }
        else if (v.TypeUrl == "type.googleapis.com/google.protobuf.Timestamp")
        {
            var ts = v.Unpack<Timestamp>();
            return ts.ToDateTime().ToString("yyyy/MM/dd HH:mm:ss");
        }
        else if (v.TypeUrl == "type.googleapis.com/google.protobuf.DoubleValue")
        {
            var fv = v.Unpack<DoubleValue>();
            return fv.Value.ToString();
        }
        else
        {
            throw new Exception($"Unsupported type {v.TypeUrl}");
        }
    }
}

Append

Connect to server

var channel = GrpcChannel.ForAddress("http://127.0.0.1:5655");
var client = new MachRpc.Machbase.MachbaseClient(channel);

Prepare new appender

var appender = client.Appender(new MachRpc.AppenderRequest { TableName = "example" });
var stream = client.Append();
try {
    // code that use stream & appender.Handle
}
finally {
    await stream.RequestStream.CompleteAsync();
}

Make Main() as async Task Main() to allow awiat for async operation.

private static async Task Main(string[] args) {
    /// use await
}

Write data in high speed

for (int i = 0; i < 100000; i++)
{
    var ts = new Timestamp();
    var value = 0.1234;

    long tick = TimeUtils.GetNanoseconds();
    long secs = 1_000_000_000;
    ts.Seconds = Convert.ToInt32(tick / secs);
    ts.Nanos = Convert.ToInt32(tick % secs);

    var data = new MachRpc.AppendData { Handle = appender.Handle};
    data.Params.Add(Any.Pack(new StringValue { Value = "csharp.value" }));
    data.Params.Add(Any.Pack(ts));
    data.Params.Add(Any.Pack(new DoubleValue{ Value = value }));

    await stream.RequestStream.WriteAsync(data);
}

Run and count written records

dotnet run
machbase-neo shell "select count(*) from example where name = 'csharp.value'"
 #  COUNT(*)
─────────────
 1  100000

Full source code

using Grpc.Net.Client;
using Google.Protobuf.WellKnownTypes;
using System.Diagnostics;

internal class Program
{
    private static async Task Main(string[] args)
    {
        var channel = GrpcChannel.ForAddress("http://127.0.0.1:5655");
        var client = new MachRpc.Machbase.MachbaseClient(channel);

        // Appender example
        var appender = client.Appender(new MachRpc.AppenderRequest { TableName = "example" });
        var stream = client.Append();
        
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        try
        {
            for (int i = 0; i < 100000; i++)
            {
                var ts = new Timestamp();
                var value = 0.1234;

                long tick = TimeUtils.GetNanoseconds();
                long secs = 1_000_000_000;
                ts.Seconds = Convert.ToInt32(tick / secs);
                ts.Nanos = Convert.ToInt32(tick % secs);

                var data = new MachRpc.AppendData { Handle = appender.Handle};
                data.Params.Add(Any.Pack(new StringValue { Value = "csharp.value" }));
                data.Params.Add(Any.Pack(ts));
                data.Params.Add(Any.Pack(new DoubleValue{ Value = value }));
                await stream.RequestStream.WriteAsync(data);
            }
        }
        finally
        {
            await stream.RequestStream.CompleteAsync();

            stopwatch.Stop();
            var elapsed_time = stopwatch.ElapsedMilliseconds;
            Console.WriteLine($"Elapse {elapsed_time}ms.");
        }
    }

    public static class TimeUtils
    {
        public static long GetNanoseconds()
        {
            double timestamp = Stopwatch.GetTimestamp();
            double nanoseconds = 1_000_000_000.0 * timestamp / Stopwatch.Frequency;
            return (long)nanoseconds;
        }
    }
}
Last updated on