milvus-io_bootcamp

Форк
0
94 строки · 3.2 Кб
1
import io.milvus.client.{MilvusClient, MilvusServiceClient}
// NOTE(review): the standalone `import io.milvus.grpc.DataType` was removed — it was an
// exact duplicate of the name imported on the next line.
import io.milvus.grpc.{DataType, FlushResponse}
import io.milvus.param.collection.{CreateCollectionParam, FieldType, FlushParam}
import io.milvus.param.{ConnectParam, R, RpcStatus}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import zilliztech.spark.milvus.MilvusOptions.{MILVUS_COLLECTION_NAME, MILVUS_HOST, MILVUS_PORT, MILVUS_TOKEN, MILVUS_URI}

import java.util

// Local Spark session that drives the insert job.
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

// Fill in user's Zilliz Cloud credentials.
val uri = "https://in01-xxxxxxxxxxxx.aws-us-west-2.vectordb.zillizcloud.com:19535"
val token = "db_admin:xxxx"
// Specify the target Zilliz Cloud vector database collection name.
val collectionName = "databricks_milvus_insert_demo"
// This file simulates a dataframe from user's vector generation job or a Delta table that contains vectors.
val filePath = "/Volumes/zilliz_test/default/sample_vectors/dim32_1k.json"
// 1. Create the Zilliz Cloud vector db collection through the SDK and define its schema.
val connectParam: ConnectParam = ConnectParam.newBuilder
  .withUri(uri)
  .withToken(token)
  .build

val client: MilvusClient = new MilvusServiceClient(connectParam)

// Field names; the dataframe built later in this script must use the same names.
val field1Name: String = "id_field"
val field2Name: String = "str_field"
val field3Name: String = "float_vector_field"

// Three fields: an Int64 primary key (not auto-generated), a VarChar payload,
// and a 32-dimensional float vector.
val idField = FieldType.newBuilder
  .withName(field1Name)
  .withDataType(DataType.Int64)
  .withPrimaryKey(true)
  .withAutoID(false)
  .build
val strField = FieldType.newBuilder
  .withName(field2Name)
  .withDataType(DataType.VarChar)
  .withMaxLength(65535)
  .build
val vectorField = FieldType.newBuilder
  .withName(field3Name)
  .withDataType(DataType.FloatVector)
  .withDimension(32)
  .build

val fieldsSchema: util.List[FieldType] = new util.ArrayList[FieldType]
fieldsSchema.add(idField)
fieldsSchema.add(strField)
fieldsSchema.add(vectorField)

// Create the collection; the response status is kept for optional logging below.
val createParam: CreateCollectionParam = CreateCollectionParam.newBuilder
  .withCollectionName(collectionName)
  .withFieldTypes(fieldsSchema)
  .build

val createR: R[RpcStatus] = client.createCollection(createParam)

// log.info(s"create collection ${collectionName} resp: ${createR.toString}")
// 2. Read data from file to build vector dataframe. The schema of the dataframe must
// logically match the schema of the vector db: id_field is DataType.Int64 in the
// collection, so it is read as LongType here (the original IntegerType was a 32-bit
// type mismatched against the Int64 primary key).
val df = spark.read
  .schema(new StructType()
    .add(field1Name, LongType)
    .add(field2Name, StringType)
    .add(field3Name, ArrayType(FloatType), false))
  .json(filePath)

// 3. Configure the output target: connection credentials plus the destination collection.
val milvusOptions = Map(
  MILVUS_URI -> uri,
  MILVUS_TOKEN -> token,
  MILVUS_COLLECTION_NAME -> collectionName
)

// 4. Append the dataframe into the Zilliz Cloud collection via the "milvus" data source.
df.write
  .format("milvus")
  .options(milvusOptions)
  .mode(SaveMode.Append)
  .save()

// Flush the collection so the rows just written are sealed and persisted server-side.
// (The write above is what inserts the data; flush only finalizes it.)
val flushR: R[FlushResponse] = client.flush(
  FlushParam.newBuilder
    .addCollectionName(collectionName)
    .build
)
println(flushR)

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.