milvus-io_bootcamp
94 lines · 3.2 KB
import io.milvus.client.{MilvusClient, MilvusServiceClient}
import io.milvus.grpc.{DataType, FlushResponse}
import io.milvus.param.collection.{CreateCollectionParam, FieldType, FlushParam}
import io.milvus.param.{ConnectParam, R, RpcStatus}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import zilliztech.spark.milvus.MilvusOptions.{MILVUS_COLLECTION_NAME, MILVUS_HOST, MILVUS_PORT, MILVUS_TOKEN, MILVUS_URI}

import java.util
13
// Local Spark session for the demo; replace setMaster with your cluster config in production.
val sparkConf = new SparkConf().setMaster("local")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()

// Fill in user's Zilliz Cloud credentials.
// NOTE(review): never commit a real token — load it from a secret manager / Databricks secret scope.
val uri = "https://in01-xxxxxxxxxxxx.aws-us-west-2.vectordb.zillizcloud.com:19535"
val token = "db_admin:xxxx"

// Specify the target Zilliz Cloud vector database collection name.
val collectionName = "databricks_milvus_insert_demo"

// This file simulates a dataframe from user's vector generation job or a Delta table that contains vectors.
val filePath = "/Volumes/zilliz_test/default/sample_vectors/dim32_1k.json"
23
// 1. Connect to the Zilliz Cloud / Milvus instance through the Java SDK.
// The same uri/token pair is reused below for the spark-milvus connector options.
val connectParam: ConnectParam = ConnectParam.newBuilder
  .withUri(uri)
  .withToken(token)
  .build

val client: MilvusClient = new MilvusServiceClient(connectParam)
31
// Collection schema: an Int64 primary key, a VarChar scalar field, and a 32-dim float vector.
val field1Name: String = "id_field"
val field2Name: String = "str_field"
val field3Name: String = "float_vector_field"
val fieldsSchema: util.List[FieldType] = new util.ArrayList[FieldType]

// Primary key: ids are supplied by the caller (autoID disabled), so the input data must carry them.
fieldsSchema.add(FieldType.newBuilder
  .withPrimaryKey(true)
  .withAutoID(false)
  .withDataType(DataType.Int64)
  .withName(field1Name)
  .build
)
// Scalar string payload; 65535 is the maximum VarChar length Milvus accepts.
fieldsSchema.add(FieldType.newBuilder
  .withDataType(DataType.VarChar)
  .withName(field2Name)
  .withMaxLength(65535)
  .build
)
// Vector field; the dimension (32) must match the vectors in the input file (dim32_1k.json).
fieldsSchema.add(FieldType.newBuilder
  .withDataType(DataType.FloatVector)
  .withName(field3Name)
  .withDimension(32)
  .build
)
56
// Create the collection. R[RpcStatus] wraps the RPC status; surface it so failures are visible
// (the original commented-out log.info referenced an undefined `log`).
val createParam: CreateCollectionParam = CreateCollectionParam.newBuilder
  .withCollectionName(collectionName)
  .withFieldTypes(fieldsSchema)
  .build

val createR: R[RpcStatus] = client.createCollection(createParam)
println(s"create collection ${collectionName} resp: ${createR.toString}")
66
// 2. Read data from file to build vector dataframe. The schema of the dataframe must logically
// match the schema of the vector db:
//   Int64 -> LongType (IntegerType is only 32-bit and would mismatch the Int64 primary key),
//   VarChar -> StringType, FloatVector -> ArrayType(FloatType) (non-nullable).
val df = spark.read
  .schema(new StructType()
    .add(field1Name, LongType)
    .add(field2Name, StringType)
    .add(field3Name, ArrayType(FloatType), false))
  .json(filePath)
74
// 3. Configure the output target for the spark-milvus connector.
val milvusOptions = Map(
  MILVUS_URI -> uri,
  MILVUS_TOKEN -> token,
  MILVUS_COLLECTION_NAME -> collectionName
)
81
// 4. Append the dataframe into the Zilliz Cloud collection via the "milvus" data source.
// SaveMode.Append inserts rows; the collection must already exist (created above).
df.write
  .options(milvusOptions)
  .format("milvus")
  .mode(SaveMode.Append)
  .save()
88
// Flush so the inserted segments are sealed and persisted server-side
// (flush does NOT re-insert data; the insert already happened in step 4),
// then release the SDK client to avoid leaking its gRPC channel.
val flushParam: FlushParam = FlushParam.newBuilder
  .addCollectionName(collectionName)
  .build
val flushR: R[FlushResponse] = client.flush(flushParam)
println(flushR)
client.close()