Updated on 3月 4, 2021
InfluxStreamSharp —— 流式读写InfluxDB
简介 / Introduction
适用于有大量数据持续性写入的业务场景,内置写入队列,按照一定的时间间隔将数据分批写入。建议写入时带入时间戳,这样读取时时间才准确。
当需要读取时,支持重现写入时的场景,举例:若在 t1 时刻写入了数据记录 a1,在 t2 = t1 + 1s 时刻写入了数据 a2,在 t2 + 5 s 时刻又写入了数据 a3,则在读取时,QueryManager会在返回 a1 记录之后1秒返回 a2,再等5秒返回 a3。就好像播放视频录像一样,进行历史再现。此功能内置懒加载功能和缓冲队列,不用担心时间段过长、数据过多造成InfluxDB查询卡死等问题。
当然若不希望使用这种查询方式,可以使用传统的查询模式,即给定时间范围,一次性将所有数据取出来。
读取和写入数据都借鉴了 EntityFramework 的思路,即采用实体对象进行映射,极大地简化了数据库读取、序列化和反序列化的步骤。在构建实体对象时,需要采用特性(Attribute)对字段中的Tag、Value和Timestamp进行标记。
Github
https://github.com/XingKongSync/InfluxStreamSharp
用法 / Usage
static void Main(string[] args)
{
//Init a InfluxDB writing buffer
WriteService.Instance.Value.Start();
//Create a database if not exist
var influx = InfluxService.Instance.Value;
influx.InitAsync(
DB_Url,
DB_UserName,
DB_Pwd,
DB_DbName,
DB_RetentionHours
).Wait();
//write data with buffering
TestStreamingWrite();
//Read all data by buffering and timing
TestStreamingRead();
Console.ReadKey();
}
static void TestStreamingWrite()
{
for (int i = 0; i < 10; i++)
{
var testModel = new DataModel.Test();
testModel.DeviceId = i.ToString();
testModel.x = i;
testModel.y = i;
testModel.LocalTime = DateTime.Now.AddMinutes(-1 * i);
//Convert custom data model to influx model
var point = ModelTransformer.Convert(testModel);
//Send the data to the writing queue, the data will be buffered and send to InfluxDB
WriteService.Instance.Value.Enqueue(point);
}
}
static void TestStreamingRead()
{
//Build a query statement
InfluxQLTemplet templet = new InfluxQLTemplet();
templet.Measurement = ModelTransformer.GetMeasurement(typeof(DataModel.Test));
//Add query reqirement
//templet.WhereEqual("DeviceId", "0");//Only query data which DeviceId equals to 0
//Construct query manager for streaming read
QueryManager manager = new QueryManager(DateTime.Now.AddMinutes(-30), DateTime.Now);//Query data within 30 miniutes
//If you want do muliti queries, please add more QueryTemplet
manager.AddInfluxQueryTemplet<DataModel.Test>(templet);
//Handle receveied data
manager.DataReceived += (object data) =>
{
if (data is DataModel.Test t)
{
Console.WriteLine($"CurrentPlayTime: {manager.CurrentPlayTime.ToString("yyyy-MM-dd HH:mm:ss")}, id: {t.DeviceId}, x: {t.x}, y: {t.y}");
}
};
//Start query data
manager.Start();
}