Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

[comment]: <> (When bumping [pc:VERSION_LATEST_RELEASE] create a new entry below)
### Unreleased version
### 6.2.0
- Add readUnits, upsertedCount, and matchedRecords to ResponseMetadata
- Add List operation to ResponseMetadataInterceptor

### 6.1.0
- Add ResponseMetadataListener for dataplane operation observability

Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pineconeClientVersion = 6.1.0
pineconeClientVersion = 6.2.0
57 changes: 18 additions & 39 deletions src/integration/java/io/pinecone/clients/ConnectionsMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static io.pinecone.helpers.TestUtilities.waitUntilIndexIsReady;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ConnectionsMapTest {
Expand Down Expand Up @@ -78,59 +80,36 @@ public void testMultipleIndexesWithMultipleClients() throws InterruptedException

// Create config2 for getting index connection and set the host
PineconeConfig config2 = new PineconeConfig(System.getenv("PINECONE_API_KEY"));
config1.setHost(host2);
config2.setHost(host2);


// Establish grpc connection for index-1
Index index1_1 = pinecone1.getIndexConnection(indexName1);
// Get connections map
ConcurrentHashMap<String, PineconeConnection> connectionsMap1_1 = pinecone1.getConnectionsMap();
ConcurrentHashMap<String, PineconeConnection> connectionsMap = pinecone1.getConnectionsMap();

// Verify connectionsMap contains only one <indexName, connection> pair i.e. for index1 and its connection
assertEquals(pinecone1.getConnectionsMap().size(), 1);
// Verify the value for index1 by comparing its value with host1 in the connectionsMap
assertEquals(host1, connectionsMap1_1.get(indexName1).toString());
// Verify indexName1 is in the map with the correct host
assertEquals(host1, connectionsMap.get(indexName1).toString());

// Establish grpc connection for index-2
Index index1_2 = pinecone1.getIndexConnection(indexName2);
// Get connections map after establishing second connection
ConcurrentHashMap<String, PineconeConnection> connectionsMap1_2 = pinecone1.getConnectionsMap();

// Verify connectionsMap contains two <indexName, connection> pairs i.e. for index1 and index2
assertEquals(connectionsMap1_2.size(), 2);
// Verify the values by checking host for both indexName1 and indexName2
assertEquals(host1, connectionsMap1_2.get(indexName1).toString());
assertEquals(host2, connectionsMap1_2.get(indexName2).toString());

// Establishing connections with index1 and index2 using another pinecone client
pinecone2.getConnection(indexName1, config1);
ConcurrentHashMap<String, PineconeConnection> connectionsMap2_1 = pinecone1.getConnectionsMap();
// Verify the new connections map is pointing to the same reference
assert connectionsMap2_1 == connectionsMap1_2;
// Verify the size of connections map is still 2 since the connection for index2 was not closed
assertEquals(2, connectionsMap2_1.size());
// Verify the connection value for index1 is host1
assertEquals(host1, connectionsMap2_1.get(indexName1).toString());

pinecone2.getConnection(indexName2, config2);
ConcurrentHashMap<String, PineconeConnection> connectionsMap2_2 = pinecone1.getConnectionsMap();
// Verify the new connections map is pointing to the same reference
assert connectionsMap2_1 == connectionsMap2_2;
// Verify the size of connections map is still 2 since the connections are not closed
assertEquals(2, connectionsMap2_2.size());
// Verify the values by checking host for both indexName1 and indexName2
assertEquals(host1, connectionsMap2_2.get(indexName1).toString());
assertEquals(host2, connectionsMap2_2.get(indexName2).toString());

// Verify both indexes are now in the map with the correct hosts
assertEquals(host1, connectionsMap.get(indexName1).toString());
assertEquals(host2, connectionsMap.get(indexName2).toString());

// Connecting a second client to the same indexes should reuse the existing map entries
assertSame(connectionsMap, pinecone2.getConnectionsMap());
assertSame(connectionsMap.get(indexName1), pinecone2.getConnection(indexName1, config1));
assertSame(connectionsMap.get(indexName2), pinecone2.getConnection(indexName2, config2));

// Close the connections
index1_1.close();
index1_2.close();

// Verify the map size is now 0
assertEquals(connectionsMap1_1.size(), 0);
assertEquals(connectionsMap1_2.size(), 0);
assertEquals(connectionsMap2_1.size(), 0);
assertEquals(connectionsMap2_2.size(), 0);
// Verify the specific entries for this test's indexes were removed
assertFalse(connectionsMap.containsKey(indexName1));
assertFalse(connectionsMap.containsKey(indexName2));

// Delete the indexes
pinecone1.deleteIndex(indexName1);
Expand Down
14 changes: 7 additions & 7 deletions src/integration/java/io/pinecone/helpers/AssertRetry.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@

import io.pinecone.exceptions.PineconeException;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class AssertRetry {
private static final int maxRetry = 4;
private static final int delay = 1500;

public static void assertWithRetry(AssertionRunnable assertionRunnable) throws InterruptedException, PineconeException {
public static void assertWithRetry(AssertionRunnable assertionRunnable) throws Exception {
assertWithRetry(assertionRunnable, 2);
}

public static void assertWithRetry(AssertionRunnable assertionRunnable, int backOff) throws AssertionError, InterruptedException {
public static void assertWithRetry(AssertionRunnable assertionRunnable, int backOff) throws Exception {
int retryCount = 0;
int delayCount = delay;
boolean success = false;
Expand All @@ -23,7 +20,10 @@ public static void assertWithRetry(AssertionRunnable assertionRunnable, int back
try {
assertionRunnable.run();
success = true;
} catch (AssertionError | ExecutionException | IOException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (AssertionError | Exception e) {
errorMessage = e.getLocalizedMessage();
retryCount++;
delayCount*=backOff;
Expand All @@ -38,6 +38,6 @@ public static void assertWithRetry(AssertionRunnable assertionRunnable, int back

@FunctionalInterface
public interface AssertionRunnable {
void run() throws AssertionError, ExecutionException, InterruptedException, IOException, PineconeException;
void run() throws Exception;
}
}
76 changes: 58 additions & 18 deletions src/integration/java/io/pinecone/helpers/TestResourcesManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
*/
public class TestResourcesManager {
private static final Logger logger = LoggerFactory.getLogger(TestUtilities.class);
private static TestResourcesManager instance;
private static volatile TestResourcesManager instance;
private static final String apiKey = System.getenv("PINECONE_API_KEY");
private final int dimension = System.getenv("DIMENSION") == null
? 4
Expand All @@ -66,6 +66,10 @@ public class TestResourcesManager {
private String serverlessIndexName;
private String collectionName;
private CollectionModel collectionModel;
private Index serverlessIndexConnection;
private AsyncIndex serverlessAsyncIndexConnection;
private Index podIndexConnection;
private AsyncIndex podAsyncIndexConnection;
private final List<String> vectorIdsForDefaultNamespace = Arrays.asList("def-id1", "def-id2", "def-prefix-id3", "def-prefix-id4");
private final List<String> vectorIdsForCustomNamespace = Arrays.asList("cus-id1", "cus-id2", "cus-prefix-id3", "cus" +
"-prefix-id4");
Expand All @@ -85,7 +89,7 @@ private TestResourcesManager() {
*
* @return the {@link TestResourcesManager} instance.
*/
public static TestResourcesManager getInstance() {
public static synchronized TestResourcesManager getInstance() {
if (instance == null) {
instance = new TestResourcesManager();
}
Expand Down Expand Up @@ -185,8 +189,22 @@ public List<String> getVectorIdsFromCustomNamespace() {
*
* @return a {@link Index} connection to the serverless index.
*/
public Index getOrCreateServerlessIndexConnection() throws InterruptedException {
return getInstance().pineconeClient.getIndexConnection(getOrCreateServerlessIndex());
/**
* Gets the host for the shared serverless index. Creates the index first if it doesn't exist.
* Used by tests that need to construct their own PineconeConnection (e.g. to attach a custom listener)
* without going through the shared static connection cache.
*
* @return the host URL of the shared serverless index.
*/
public synchronized String getOrCreateServerlessIndexHost() throws InterruptedException {
return pineconeClient.describeIndex(getOrCreateServerlessIndex()).getHost();
}

public synchronized Index getOrCreateServerlessIndexConnection() throws InterruptedException {
if (serverlessIndexConnection == null) {
serverlessIndexConnection = pineconeClient.getIndexConnection(getOrCreateServerlessIndex());
}
return serverlessIndexConnection;
}

/**
Expand All @@ -195,8 +213,11 @@ public Index getOrCreateServerlessIndexConnection() throws InterruptedException
*
* @return a {@link AsyncIndex} connection to the serverless index.
*/
public AsyncIndex getOrCreateServerlessAsyncIndexConnection() throws InterruptedException {
return getInstance().pineconeClient.getAsyncIndexConnection(getOrCreateServerlessIndex());
public synchronized AsyncIndex getOrCreateServerlessAsyncIndexConnection() throws InterruptedException {
if (serverlessAsyncIndexConnection == null) {
serverlessAsyncIndexConnection = pineconeClient.getAsyncIndexConnection(getOrCreateServerlessIndex());
}
return serverlessAsyncIndexConnection;
}

/**
Expand All @@ -205,8 +226,11 @@ public AsyncIndex getOrCreateServerlessAsyncIndexConnection() throws Interrupted
*
* @return a {@link Index} connection to the pod index.
*/
public Index getOrCreatePodIndexConnection() throws InterruptedException {
return getInstance().pineconeClient.getIndexConnection(getOrCreatePodIndex());
public synchronized Index getOrCreatePodIndexConnection() throws InterruptedException {
if (podIndexConnection == null) {
podIndexConnection = pineconeClient.getIndexConnection(getOrCreatePodIndex());
}
return podIndexConnection;
}

/**
Expand All @@ -215,8 +239,11 @@ public Index getOrCreatePodIndexConnection() throws InterruptedException {
*
* @return a {@link AsyncIndex} connection to the pod index.
*/
public AsyncIndex getOrCreatePodAsyncIndexConnection() throws InterruptedException {
return getInstance().pineconeClient.getAsyncIndexConnection(getOrCreatePodIndex());
public synchronized AsyncIndex getOrCreatePodAsyncIndexConnection() throws InterruptedException {
if (podAsyncIndexConnection == null) {
podAsyncIndexConnection = pineconeClient.getAsyncIndexConnection(getOrCreatePodIndex());
}
return podAsyncIndexConnection;
}

/**
Expand All @@ -225,7 +252,7 @@ public AsyncIndex getOrCreatePodAsyncIndexConnection() throws InterruptedExcepti
*
* @return the {@link IndexModel} of the pod index.
*/
public IndexModel getOrCreatePodIndexModel() throws InterruptedException {
public synchronized IndexModel getOrCreatePodIndexModel() throws InterruptedException {
podIndexModel = pineconeClient.describeIndex(getOrCreatePodIndex());
return podIndexModel;
}
Expand All @@ -236,7 +263,7 @@ public IndexModel getOrCreatePodIndexModel() throws InterruptedException {
*
* @return the {@link IndexModel} of the serverless index.
*/
public IndexModel getOrCreateServerlessIndexModel() throws InterruptedException {
public synchronized IndexModel getOrCreateServerlessIndexModel() throws InterruptedException {
serverlessIndexModel = pineconeClient.describeIndex(getOrCreateServerlessIndex());
return serverlessIndexModel;
}
Expand All @@ -247,7 +274,7 @@ public IndexModel getOrCreateServerlessIndexModel() throws InterruptedException
*
* @return the {@link CollectionModel} of the serverless index.
*/
public CollectionModel getOrCreateCollectionModel() throws InterruptedException {
public synchronized CollectionModel getOrCreateCollectionModel() throws InterruptedException {
collectionModel = pineconeClient.describeCollection(getOrCreateCollection());
return collectionModel;
}
Expand All @@ -257,6 +284,19 @@ public CollectionModel getOrCreateCollectionModel() throws InterruptedException
* after all tests have finished running.
*/
public void cleanupResources() {
if (serverlessIndexConnection != null) {
serverlessIndexConnection.close();
}
if (serverlessAsyncIndexConnection != null) {
serverlessAsyncIndexConnection.close();
}
if (podIndexConnection != null) {
podIndexConnection.close();
}
if (podAsyncIndexConnection != null) {
podAsyncIndexConnection.close();
}

if (podIndexName != null) {
pineconeClient.deleteIndex(podIndexName);
}
Expand All @@ -280,7 +320,7 @@ public void cleanupResources() {
* @throws InterruptedException if the thread is interrupted while waiting for the index to be ready.
* @throws PineconeException if the API encounters an error during index creation or if any of the arguments are invalid.
*/
public String getOrCreatePodIndex() throws InterruptedException, PineconeException {
public synchronized String getOrCreatePodIndex() throws InterruptedException, PineconeException {
if (podIndexName != null) {
return podIndexName;
}
Expand Down Expand Up @@ -311,7 +351,7 @@ public String getOrCreatePodIndex() throws InterruptedException, PineconeExcepti
* @throws InterruptedException if the thread is interrupted while waiting for the index to be ready.
* @throws PineconeException if the API encounters an error during index creation or if any of the arguments are invalid.
*/
public String getOrCreateServerlessIndex() throws InterruptedException, PineconeException {
public synchronized String getOrCreateServerlessIndex() throws InterruptedException, PineconeException {
if (this.serverlessIndexName != null) {
return this.serverlessIndexName;
}
Expand Down Expand Up @@ -344,7 +384,7 @@ public String getOrCreateServerlessIndex() throws InterruptedException, Pinecone
* @throws InterruptedException if the thread is interrupted while waiting for the collection to be ready.
* @throws PineconeException if the API encounters an error during collection creation.
*/
public String getOrCreateCollection() throws InterruptedException, PineconeException {
public synchronized String getOrCreateCollection() throws InterruptedException, PineconeException {
if (collectionName != null) {
return collectionName;
}
Expand All @@ -359,13 +399,13 @@ public String getOrCreateCollection() throws InterruptedException, PineconeExcep
// Wait until collection is ready
int timeWaited = 0;
String collectionReady = collectionModel.getStatus().toLowerCase();
while (collectionReady != "ready" && timeWaited < 120000) {
while (!"ready".equals(collectionReady) && timeWaited < 120000) {
logger.info("Waiting for collection " + collectionName + " to be ready. Waited " + timeWaited + " " +
"milliseconds...");
Thread.sleep(5000);
timeWaited += 5000;
collectionModel = pineconeClient.describeCollection(collectionName);
collectionReady = collectionModel.getStatus();
collectionReady = collectionModel.getStatus().toLowerCase();
}

if (timeWaited > 120000) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,23 @@ public class CollectionTest {
private static String namespace;

@BeforeAll
public static void setUp() throws InterruptedException {
public static void setUp() throws Exception {
indexName = indexManager.getOrCreatePodIndex();
collectionName = indexManager.getOrCreateCollection();
collection = indexManager.getOrCreateCollectionModel();
namespace = indexManager.getDefaultNamespace();
}

@AfterAll
public static void cleanUp() throws InterruptedException {
public static void cleanUp() throws Exception {
// Clean up indexes
for (String index : indexesToCleanUp) {
pineconeClient.deleteIndex(index);
}
}

@Test
public void testIndexFromCollectionHappyPath() throws InterruptedException {
public void testIndexFromCollectionHappyPath() throws Exception {
// Verify collection is listed
List<CollectionModel> collectionList = pineconeClient.listCollections().getCollections();
boolean collectionFound = false;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testIndexFromCollectionHappyPath() throws InterruptedException {
}

@Test
public void testCreateIndexFromCollectionWithDiffMetric() throws InterruptedException {
public void testCreateIndexFromCollectionWithDiffMetric() throws Exception {
// Use a different metric than the source index
String[] metrics = {
"cosine",
Expand Down Expand Up @@ -232,7 +232,7 @@ public void testCreateIndexWithMismatchedDimension() {
}

@Test
public void testCreateCollectionFromNotReadyIndex() throws InterruptedException {
public void testCreateCollectionFromNotReadyIndex() throws Exception {
String notReadyIndexName = RandomStringBuilder.build("from-coll", 8);
String newCollectionName = RandomStringBuilder.build("coll-", 8);
try {
Expand Down
Loading
Loading