使用 Java 将 MySQL 数据批量导入 Elasticsearch

一、mybatis.xml配置

<?xml version="1.0" encoding="UTF-8" ?>
<!-- MyBatis main configuration: one environment plus the mapper file registrations. -->
<!DOCTYPE configuration
    PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
    "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
<!-- "development" is the only environment defined and is selected as the default. -->
<environments default="development">
    <environment id="development">
        <transactionManager type="JDBC"/>
        <!-- Pooled data source pointing at the qingcheng_goods schema on the local MySQL server. -->
        <dataSource type="POOLED">
            <property name="driver" value="com.mysql.jdbc.Driver"/>
            <property name="url" value="jdbc:mysql://127.0.0.1:3306/qingcheng_goods?characterEncoding=UTF8"/>
            <!-- NOTE(review): credentials are hard-coded here; presumably demo values, confirm before reuse. -->
            <property name="username" value="root"/>
            <property name="password" value="root"/>
        </dataSource>
    </environment>
</environments>

<mappers>
    <!-- Register the mapper XML files -->
    <mapper resource="com/hpu/dao/SkuMapper.xml"/>
</mappers>    
</configuration>

二、读取mybatis.xml配置工具类

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import java.io.IOException;
import java.io.InputStream;

/**
 * Helper that lazily builds a single shared {@link SqlSessionFactory} from the
 * {@code mybatis.xml} classpath resource and opens sessions from it.
 */
public class MyBatisUtil {
    /** Classpath name of the MyBatis main configuration file. */
    private static final String CONFIG_RESOURCE = "mybatis.xml";

    /** Shared factory; volatile so the unsynchronized first check never sees a half-published instance. */
    private static volatile SqlSessionFactory sqlSessionFactory;

    /**
     * Opens a new session, building the shared factory on first use.
     *
     * @return a newly opened {@link SqlSession}; the caller is responsible for closing it
     * @throws IllegalStateException if the configuration resource cannot be read
     */
    public static SqlSession getSqlSession() {
        SqlSessionFactory factory = sqlSessionFactory;
        if (factory == null) {
            synchronized (MyBatisUtil.class) {
                factory = sqlSessionFactory;
                if (factory == null) {
                    // Open the stream only inside the lock so that threads losing the race
                    // never open (and leak) a stream they will not use.
                    try (InputStream input = Resources.getResourceAsStream(CONFIG_RESOURCE)) {
                        factory = new SqlSessionFactoryBuilder().build(input);
                    } catch (IOException e) {
                        // Fail here with the real cause instead of a later NullPointerException.
                        throw new IllegalStateException(
                                "Unable to read MyBatis configuration: " + CONFIG_RESOURCE, e);
                    }
                    sqlSessionFactory = factory;
                }
            }
        }
        return factory.openSession();
    }

}

三、SkuDao 接口与 Sku 实体类

 /**
  * Read-only data access contract for {@link Sku} records.
  */
 public interface SkuDao {

     /**
      * Fetches every SKU.
      *
      * @return the list of SKUs
      */
     List<Sku> findAll();

     /**
      * Fetches a single SKU by its identifier.
      *
      * @param id the SKU id to look up
      * @return the matching SKU
      */
     Sku findById(String id);
 }

/**
 * Mutable bean describing one SKU (stock keeping unit) row.
 */
public class Sku {

    /** SKU id. */
    private String id;

    /** Barcode / serial number. */
    private String sn;

    /** SKU name. */
    private String name;

    /** Price, in cents (fen). */
    private Integer price;

    /** Quantity in stock. */
    private Integer num;

    /** Stock level that triggers a low-stock alert. */
    private Integer alertNum;

    /** Main product image. */
    private String image;

    /** Product image list. */
    private String images;

    /** Weight, in grams. */
    private Integer weight;

    /** Creation time. */
    private java.util.Date createTime;

    /** Last update time. */
    private java.util.Date updateTime;

    /** Id of the owning SPU. */
    private String spuId;

    /** Category id. */
    private Integer categoryId;

    /** Category name. */
    private String categoryName;

    /** Brand name. */
    private String brandName;

    /** Specification. */
    private String spec;

    /** Number of units sold. */
    private Integer saleNum;

    /** Number of comments. */
    private Integer commentNum;

    /** Status: 1 = normal, 2 = off the shelf, 3 = deleted. */
    private String status;

    // ---- identity ----

    public String getId() { return this.id; }

    public void setId(String id) { this.id = id; }

    public String getSn() { return this.sn; }

    public void setSn(String sn) { this.sn = sn; }

    public String getName() { return this.name; }

    public void setName(String name) { this.name = name; }

    // ---- pricing and stock ----

    public Integer getPrice() { return this.price; }

    public void setPrice(Integer price) { this.price = price; }

    public Integer getNum() { return this.num; }

    public void setNum(Integer num) { this.num = num; }

    public Integer getAlertNum() { return this.alertNum; }

    public void setAlertNum(Integer alertNum) { this.alertNum = alertNum; }

    // ---- presentation ----

    public String getImage() { return this.image; }

    public void setImage(String image) { this.image = image; }

    public String getImages() { return this.images; }

    public void setImages(String images) { this.images = images; }

    public Integer getWeight() { return this.weight; }

    public void setWeight(Integer weight) { this.weight = weight; }

    // ---- timestamps ----

    public java.util.Date getCreateTime() { return this.createTime; }

    public void setCreateTime(java.util.Date createTime) { this.createTime = createTime; }

    public java.util.Date getUpdateTime() { return this.updateTime; }

    public void setUpdateTime(java.util.Date updateTime) { this.updateTime = updateTime; }

    // ---- classification ----

    public String getSpuId() { return this.spuId; }

    public void setSpuId(String spuId) { this.spuId = spuId; }

    public Integer getCategoryId() { return this.categoryId; }

    public void setCategoryId(Integer categoryId) { this.categoryId = categoryId; }

    public String getCategoryName() { return this.categoryName; }

    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }

    public String getBrandName() { return this.brandName; }

    public void setBrandName(String brandName) { this.brandName = brandName; }

    public String getSpec() { return this.spec; }

    public void setSpec(String spec) { this.spec = spec; }

    // ---- counters and state ----

    public Integer getSaleNum() { return this.saleNum; }

    public void setSaleNum(Integer saleNum) { this.saleNum = saleNum; }

    public Integer getCommentNum() { return this.commentNum; }

    public void setCommentNum(Integer commentNum) { this.commentNum = commentNum; }

    public String getStatus() { return this.status; }

    public void setStatus(String status) { this.status = status; }

    /**
     * Renders every field as {@code Sku{name=value, ...}}, with string-typed
     * fields wrapped in single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Sku{");
        text.append("id='").append(id).append('\'');
        text.append(", sn='").append(sn).append('\'');
        text.append(", name='").append(name).append('\'');
        text.append(", price=").append(price);
        text.append(", num=").append(num);
        text.append(", alertNum=").append(alertNum);
        text.append(", image='").append(image).append('\'');
        text.append(", images='").append(images).append('\'');
        text.append(", weight=").append(weight);
        text.append(", createTime=").append(createTime);
        text.append(", updateTime=").append(updateTime);
        text.append(", spuId='").append(spuId).append('\'');
        text.append(", categoryId=").append(categoryId);
        text.append(", categoryName='").append(categoryName).append('\'');
        text.append(", brandName='").append(brandName).append('\'');
        text.append(", spec='").append(spec).append('\'');
        text.append(", saleNum=").append(saleNum);
        text.append(", commentNum=").append(commentNum);
        text.append(", status='").append(status).append('\'');
        text.append('}');
        return text.toString();
    }

}

四、导入数据类

import com.alibaba.fastjson.JSON;
import com.hpu.bean.Sku;
import com.hpu.dao.SkuDao;
import org.apache.http.HttpHost;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SkuDaoImpl implements SkuDao{
    private SqlSession sqlSession;
    public Sku findById(String id) {
        Sku sku = null;
        try {
            //读取主配置文件
            InputStream input = Resources.getResourceAsStream("mybatis.xml");
            //创建SqlSessionFactory对象
            SqlSessionFactory sessionFactory = new SqlSessionFactoryBuilder().build(input);
            //创建SqlSession对象
            sqlSession = sessionFactory.openSession();
            //新增数据操作
            sku = sqlSession.selectOne("findById",id);
            //提交SqlSession
            sqlSession.commit();

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (sqlSession != null){
                sqlSession.close();
            }
        }
        return sku;
    }

    public List<Sku> findAll() {
        List<Sku> skuList = null;

        try {
            //读取主配置文件
            InputStream input = Resources.getResourceAsStream("mybatis.xml");
            //创建SqlSessionFactory对象
            SqlSessionFactory sessionFactory = new SqlSessionFactoryBuilder().build(input);
            //创建SqlSession对象
            sqlSession = sessionFactory.openSession();
            //新增数据操作
            skuList = sqlSession.selectList("findAll");
            //提交SqlSession
            sqlSession.commit();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sqlSession != null){
                sqlSession.close();
            }
        }
        return skuList;
    }

    @Test
    public void fun(){
   //     List<Sku> skuList = findAll();
   //     System.out.println(skuList.get(0));
       importSku();

    }

    public   void importSku() {
        List<Sku> skuList = findAll();

        //1.连解rest接口

        HttpHost http = new HttpHost("127.0.0.1", 9200, "http");
        RestClientBuilder builder = RestClient.builder(http);
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);

        //2.封装请求对象
        BulkRequest bulkRequest = new BulkRequest();
        for (Sku sku : skuList) {
            IndexRequest indexRequest = new IndexRequest("test", "typedoc", sku.getId());
            Map<String, Object> skuMap = new HashMap();

            skuMap.put("name", sku.getName());
            skuMap.put("brandName", sku.getBrandName());
            skuMap.put("categoryName", sku.getCategoryName());
            skuMap.put("price", sku.getPrice());
            skuMap.put("createTime", sku.getCreateTime());
            skuMap.put("saleNum", sku.getSaleNum());
            skuMap.put("image",sku.getImage());
            skuMap.put("commentNum", sku.getCommentNum());
            Map<String, Object> spec = JSON.parseObject(sku.getSpec());
            skuMap.put("spec", spec);

            indexRequest.source(skuMap);
            bulkRequest.add(indexRequest);
        }
        //3.获取响应结果
        int status = 0;
        try {
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest);
            status = bulkResponse.status().getStatus();
            System.out.println("status:"+status);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                restHighLevelClient.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
    }

注:需在 pom.xml 中引入以下依赖,并添加资源打包配置

 <dependencies>
        <!-- Elasticsearch high-level REST client used for the bulk import. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.6</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- Pinned to a fixed version: the RELEASE meta-version resolves to whatever is newest
             at build time, which makes the build non-reproducible. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
        <!-- NOTE(review): fastjson 1.2.47 is an old release; check its security advisories
             before using it outside a demo. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <!-- Copy mapper XML files kept next to the Java sources
                 (e.g. com/hpu/dao/SkuMapper.xml) onto the classpath. -->
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
            <!-- Declaring resources replaces Maven's default resource directory,
                 so it is listed again to keep files placed there on the classpath. -->
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
    </build>

查询数据#参照

Last modification:December 6th, 2019 at 06:51 pm

Leave a Comment