.NetCore上使用ElasticSeach的一个helper类

.NetCore 上使用 ElasticSeach 的一个 helper 类

 

ElasticSeach 访问封装的服务类,有需要的可以直接 COPY 过去用

IElasticSearch 类的代码如下: 

 





    /// <summary>
    /// 访问 ElasticSearch 服务接口类
    /// Monkey 2019/05/23
    /// </summary>
    public interface IESSever
    {
        /// <summary>
        /// Linq 查询的官方 Client
        /// </summary>
        IElasticClient ElasticLinqClient { get; set; }
       
        /// <summary>
        /// Js 查询的官方 Client
        /// </summary>
        IElasticLowLevelClient ElasticJsonClient { get; set; }
 
        /// <summary>
        /// 封装后的 linq 的查询方式
        /// </summary>
        /// <typeparam name="T">要查询和返回的 Json</typeparam>
        /// <param name="indexName">index 的名称</param>
        /// <param name="typeName">type 的名称</param>
        /// <param name="selector">linq 内容</param>
        /// <returns></returns>
        Task<List<T>> SearchAsync<T>(string indexName, string typeName, Func<QueryContainerDescriptor<T>, QueryContainer> selector = null) where T : class;
 
        /// <summary>
        /// 封装后的 Json 的查询方式
        /// </summary>
        /// <param name="indexName">index 的名称</param>
        /// <param name="typeName">type 的名称</param>
        /// <param name="jsonString">json 字符串</param>
        /// <returns></returns>
        Task<JToken> SearchAsync(string indexName, string typeName, string jsonString);
 
        /// <summary>
        /// 封装后的创建 index
        /// </summary>
        /// <param name="indexName"></param>
        /// <param name="shards">分片数量,即数据块最小单元</param>
        /// <returns></returns>
        Task<bool> CreateIndexAsync(string indexName, int shards = 5);
 
        /// <summary>
        /// 封装后的删除 index
        /// </summary>
        /// <param name="indexName"></param>
        /// <returns></returns>
        Task<bool> DeleteIndexAsync(string indexName);
 
        /// <summary>
        /// 插入文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">文档名称</param>
        /// <param name="objectDocment">文档内容</param>
        /// <param name="_id">自定义 _id</param>
        /// <returns></returns>
        Task<bool> InsertDocumentAsync(string indexName, string typeName, object objectDocment, string _id = "");
        /// <summary>
        /// 批量插入文档
        /// </summary>
        /// <typeparam name="T">文档格式</typeparam>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName"></param>
        /// <param name="listDocment">数据集合</param>
        /// <returns></returns>
        Task<bool> InsertListDocumentAsync(string indexName, string typeName, List<object> listDocment);
 
 
        /// <summary>
        /// 删除一个文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">类别名称</param>
        /// <param name="_id">elasticsearch 的 id</param>
        /// <returns></returns>
        Task<bool> DeleteDocumentAsync(string indexName, string typeName, string _id);
 
 
        /// <summary>
        /// 更新文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">类别名称</param>
        /// <param name="_id">elasticsearch 的 id</param>
        /// <param name="objectDocment">单条数据的所有内容</param>
        /// <returns></returns>
        Task<bool> UpdateDocumentAsync(string indexName, string typeName, string _id, object objectDocment);
 
        /// <summary>
        /// 批量更新文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">类别名称</param>
        /// <param name="listDocment">数据集合,注:docment 里要有 _id</param>
        /// <returns></returns>
 
        Task<bool> UpdateListDocumentAsync(string indexName, string typeName,List<object> listDocment);
    }



 

ElasticSearch 的方法实现





    /// <summary>
    ///访问 ElasticSearch 服务类
    /// Monkey 2019-05-16
    /// </summary>
    public class ESSever : IESSever
    {
        /// <summary>
        /// Linq 查询的官方 Client
        /// </summary>
        public IElasticClient ElasticLinqClient { get; set; }
        /// <summary>
        /// Js 查询的官方 Client
        /// </summary>
        public IElasticLowLevelClient ElasticJsonClient { get; set; }
 
        public IMemoryCache memoryCache { get; set; }
 
 
        public ESSever(IConfiguration configuration, IMemoryCache memoryCache_arg)
        {
            memoryCache = memoryCache_arg;
            var uris = configuration["ElasticSearchContext:Url"].Split(new string[] { "," }, StringSplitOptions.RemoveEmptyEntries).ToList().ConvertAll(x => new Uri(x));
            var connectionPool = new StaticConnectionPool(uris);
            var settings = new ConnectionSettings(connectionPool).RequestTimeout(TimeSpan.FromSeconds(30));//.BasicAuthentication("elastic", "n@@W#RJQ$z1#")
            this.ElasticJsonClient = new ElasticLowLevelClient(settings);
            this.ElasticLinqClient = new ElasticClient(settings);
        }
 
 
 
        /// <summary>
        /// 封装后的 linq 的查询方式
        /// </summary>
        /// <typeparam name="T">要查询和返回的 Json</typeparam>
        /// <param name="indexName">index 的名称</param>
        /// <param name="typeName">type 的名称</param>
        /// <param name="selector">linq 内容</param>
        /// <returns></returns>
        public async Task<List<T>> SearchAsync<T>(string indexName, string typeName, Func<QueryContainerDescriptor<T>, QueryContainer> selector = null) where T : class
        {
            var list = await ElasticLinqClient.SearchAsync<T>(option => option.Index(indexName.ToLower()).Type(typeName).Query(selector));
            return list.Documents.ToList();
        }
 
 
        /// <summary>
        /// 封装后的 Json 的查询方式
        /// </summary>
        /// <param name="indexName">index 的名称</param>
        /// <param name="typeName">type 的名称</param>
        /// <param name="jsonString">json 字符串</param>
        /// <returns>返回 Jobject 的内容</returns>
        public async Task<JToken> SearchAsync(string indexName, string typeName, string jsonString)
        {
            var stringRespones = await ElasticJsonClient.SearchAsync<StringResponse>(indexName.ToLower(), typeName, jsonString);
            var jobject = JObject.Parse(stringRespones.Body);
            var total = Convert.ToInt32(jobject["hits"]["total"].ToString());
            if (total > 0)
            {
                string json = string.Empty;
                var sourceArg = jobject["hits"]["hits"];
                foreach (var source in sourceArg)
                {
                    string sourceJson = source["_source"].ToString().Substring(1, source["_source"].ToString().Length - 1);
                    sourceJson = "{ \"_id\":\"" + source["_id"] + "\"," + sourceJson;
                    if (json.Length <= 0)
                        json += sourceJson;
                    else
                        json += "," + sourceJson;
 
 
                }
 
                return JToken.Parse("[" + json + "]");
            }
            return null;
        }
 
 
        /// <summary>
        /// 封装后的创建 index
        /// </summary>
        /// <param name="indexName"></param>
        /// <param name="shards">分片数量,即数据块最小单元</param>
        /// <returns></returns>
        public async Task<bool> CreateIndexAsync(string indexName, int shards = 5)
        {
            var isHaveIndex = await IsIndexExsit(indexName.ToLower());
            if (!isHaveIndex)
            {
                var stringResponse = await ElasticJsonClient.IndicesCreateAsync<StringResponse>(indexName.ToLower(),
                        PostData.String($"{{\"settings\" : {{\"index\" : {{\"number_of_replicas\" : 0, \"number_of_shards\":\"{shards}\",\"refresh_interval\":\"-1\"}}}}}}"));
                var resObj = JObject.Parse(stringResponse.Body);
                if ((bool)resObj["acknowledged"])
                {
                    return true;
                }
            }
            else
            {
                return true;
            }
            return false;
        }
 
 
        /// <summary>
        /// 检测索引是否已经存在
        /// </summary>
        /// <param name="index"></param>
        /// <returns></returns>
        public async Task<bool> IsIndexExsit(string index)
        {
            bool flag = false;
            StringResponse resStr = null;
            try
            {
                resStr = await ElasticJsonClient.IndicesExistsAsync<StringResponse>(index);
                if (resStr.HttpStatusCode == 200)
                {
                    flag = true;
                }
            }
            catch (Exception ex)
            {
            }
 
            return flag;
        }
 
        /// <summary>
        /// 封装后的删除 index
        /// </summary>
        /// <param name="indexName"></param>
        /// <returns></returns>
        public async Task<bool> DeleteIndexAsync(string indexName)
        {
            var stringRespones = await ElasticJsonClient.IndicesDeleteAsync<StringResponse>(indexName.ToLower());
            var resObj = JObject.Parse(stringRespones.Body);
            if ((bool)resObj["acknowledged"])
            {
                return true;
            }
            return false;
        }
        /// <summary>
        /// 插入单个文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">文档名称</param>
        /// <param name="objectDocment">文档内容</param>
        /// <param name="_id">自定义 _id</param>
        /// <returns></returns>
        public async Task<bool> InsertDocumentAsync(string indexName, string typeName,  object objectDocment,string _id = "")
        {
            var stringRespones = new StringResponse();
            if (_id.Length > 0)
                stringRespones = await ElasticJsonClient.IndexAsync<StringResponse>(indexName.ToLower(), typeName, _id, PostData.String(JsonConvert.SerializeObject(objectDocment)));
            else
                stringRespones = await ElasticJsonClient.IndexAsync<StringResponse>(indexName.ToLower(), typeName, PostData.String(JsonConvert.SerializeObject(objectDocment)));
            var resObj = JObject.Parse(stringRespones.Body);
            if ((int)resObj["_shards"]["successful"] > 0)
            {
                return true;
            }
            return false;
        }
 
 
        /// <summary>
        /// 优化写入性能
        /// </summary>
        /// <param name="index"></param>
        /// <param name="refresh"></param>
        /// <param name="replia"></param>
        /// <returns></returns>
        public async Task<bool> SetIndexRefreshAndReplia(string index, string refresh = "30s", int replia = 1)
        {
            bool flag = false;
            StringResponse resStr = null;
            try
            {
                if (memoryCache.TryGetValue("isRefreshAndReplia", out bool isrefresh))
                {
                    if (!isrefresh)
                    {
                        resStr = await ElasticJsonClient.IndicesPutSettingsAsync<StringResponse>(index.ToLower(),
                     PostData.String($"{{\"index\" : {{\"number_of_replicas\" : {replia},\"refresh_interval\":\"{refresh}\"}}}}"));
                        var resObj = JObject.Parse(resStr.Body);
                        if ((bool)resObj["acknowledged"])
                        {
                            flag = true;
                            memoryCache.Set("isRefreshAndReplia", true);
                        }
                    }
                }
                else
                {
                    resStr = await ElasticJsonClient.IndicesPutSettingsAsync<StringResponse>(index.ToLower(),
                    PostData.String($"{{\"index\" : {{\"number_of_replicas\" : {replia},\"refresh_interval\":\"{refresh}\"}}}}"));
                    var resObj = JObject.Parse(resStr.Body);
                    if ((bool)resObj["acknowledged"])
                    {
                        flag = true;
                        memoryCache.Set("isRefreshAndReplia", true);
                    }
                }
 
            }
            catch (Exception ex)
            {
            }
            return flag;
        }
 
        /// <summary>
        /// 批量插入文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName"></param>
        /// <param name="listDocment">数据集合</param>
        /// <returns></returns>
        public async Task<bool> InsertListDocumentAsync(string indexName, string typeName, List<object> listDocment)
        {
            var isRefresh = await SetIndexRefreshAndReplia(indexName.ToLower());
            if (isRefresh)
            {
                List<string> list = new List<string>();
                foreach (var ob in listDocment)
                {
                    //{"index":{"_index":"meterdata","_type":"autoData"}}
                    var indexJsonStr = new { index = new { _index = indexName.ToLower(), _type = typeName } };
                    list.Add(JsonConvert.SerializeObject(indexJsonStr));
                    list.Add(JsonConvert.SerializeObject(ob));
                }
 
                var stringRespones = await ElasticJsonClient.BulkAsync<StringResponse>(indexName.ToLower(), typeName, PostData.MultiJson(list));
                var resObj = JObject.Parse(stringRespones.Body);
                if (!(bool)resObj["errors"])
                {
                    return true;
                }
            }
            return false;
        }
 
        /// <summary>
        /// 删除一个文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">类别名称</param>
        /// <param name="_id">elasticsearch 的 id</param>
        /// <returns></returns>
        public async Task<bool> DeleteDocumentAsync(string indexName, string typeName, string _id)
        {
            bool flag = false;
            StringResponse resStr = null;
            try
            {
                resStr = await ElasticJsonClient.DeleteAsync<StringResponse>(indexName.ToLower(), typeName, _id);
                var resObj = JObject.Parse(resStr.Body);
                if ((int)resObj["_shards"]["total"] == 0 || (int)resObj["_shards"]["successful"] > 0)
                {
                    flag = true;
                }
            }
            catch (Exception ex)
            {
            }
 
            return flag;
        }
 
        ///// <summary>
        ///// 更新文档  删除重建法
        ///// </summary>
        ///// <param name="indexName">索引名称</param>
        ///// <param name="typeName">类别名称</param>
        ///// <param name="_id">elasticsearch 的 id</param>
        ///// <param name="objectDocment">单条数据的所有内容</param>
        ///// <returns></returns>
        //public async Task<bool> UpdateDocumentAsync(string indexName, string typeName, string _id, object objectDocment)
        //{
        //    bool flag = false;
        //    try
        //    {
        //        string json = JsonConvert.SerializeObject(objectDocment);
        //        if (json.IndexOf("[") == 0)
        //        {
        //            var objectDocmentOne = JToken.Parse(json);
        //            json = JsonConvert.SerializeObject(objectDocmentOne[0]);
        //            int idInt = json.IndexOf("\"_id");
        //            string idJson = json.Substring(idInt, json.IndexOf(_id) + _id.Length + 1);
        //            json = json.Replace(idJson, "");
        //        }
        //        var isOk = await DeleteDocumentAsync(indexName,typeName,_id);
        //        if (isOk)
        //        {
        //            flag = await InsertDocumentAsync(indexName,typeName, JToken.Parse(json), _id);
        //        }
 
        //    }
        //    catch {}
        //    return flag;
        //}
 
        /// <summary>
        /// 更新文档  
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">类别名称</param>
        /// <param name="_id">elasticsearch 的 id</param>
        /// <param name="objectDocment">单条数据的所有内容</param>
        /// <returns></returns>
        public async Task<bool> UpdateDocumentAsync(string indexName, string typeName, string _id, object objectDocment)
        {
            bool flag = false;
            try
            {
                string json = JsonConvert.SerializeObject(objectDocment);
                if (json.IndexOf("[") == 0)
                {
                    var objectDocmentOne = JToken.Parse(json);
                    json = JsonConvert.SerializeObject(objectDocmentOne[0]);
                }
                int idInt = json.IndexOf("\"_id");
                if (idInt > 0)
                {
                    string idJson = json.Substring(idInt, json.IndexOf(_id) + _id.Length + 1);
                    json = json.Replace(idJson, "");
                }
                //{ "update" : { "_id" : "5cc2d9cf6d2d99ce58007201" } }
                //{ "doc" : { "Sex" : "王五 111" } }
                List<string> list = new List<string>();
                list.Add("{\"update\":{\"_id\":\""+_id+"\"}}");
                list.Add("{\"doc\":"+json+"}");
                var stringRespones = await ElasticJsonClient.BulkAsync<StringResponse>(indexName.ToLower(), typeName, PostData.MultiJson(list));
                var resObj = JObject.Parse(stringRespones.Body);
                if (!(bool)resObj["errors"])
                {
                    return true;
                }
            }
            catch { }
            return flag;
        }
 
        /// <summary>
        /// 批量更新文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="typeName">类别名称</param>
        /// <param name="listDocment">数据集合,注:docment 里要有 _id,否则更新不进去</param>
        /// <returns></returns>
 
        public async Task<bool> UpdateListDocumentAsync(string indexName, string typeName, List<object> listDocment)
        {
            bool flag = false;
            try
            {
                List<string> list = new List<string>();
                foreach (var objectDocment in listDocment)
                {
                    string json = JsonConvert.SerializeObject(objectDocment);
                    JToken docment = null;
                    var objectDocmentOne = JToken.Parse(json);
                    docment = objectDocmentOne;
                    if (json.IndexOf("[") == 0)
                    {
                        json = JsonConvert.SerializeObject(objectDocmentOne[0]);
                        docment = objectDocmentOne[0];
                    }
                    string _id = docment["_id"].ToString();
                    int idInt = json.IndexOf("\"_id");
                    if (idInt > 0)
                    {
                        string idJson = json.Substring(idInt, json.IndexOf(_id) + _id.Length + 1);
                        json = json.Replace(idJson, "");
                    }
                    //{ "update" : { "_id" : "5cc2d9cf6d2d99ce58007201" } }
                    //{ "doc" : { "Sex" : "王五 111" } }
                    list.Add("{\"update\":{\"_id\":\"" + _id + "\"}}");
                    list.Add("{\"doc\":" + json + "}");
                }
                var stringRespones = await ElasticJsonClient.BulkAsync<StringResponse>(indexName.ToLower(), typeName, PostData.MultiJson(list));
                var resObj = JObject.Parse(stringRespones.Body);
                if (!(bool)resObj["errors"])
                {
                    return true;
                }
            }
            catch { }
            return flag;
        }
 
    }




 


来源:https://blog.csdn.net/qq_38762313/article/details/90474488

 

 

© 版权声明

☆ END ☆
喜欢就点个赞吧
点赞0 分享
图片正在生成中,请稍后...