Skip to content

Commit

Permalink
Added PostgreSQL support
Browse files Browse the repository at this point in the history
  • Loading branch information
JorgeCandeias committed Dec 8, 2024
1 parent b96e9dd commit b6b2159
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 10 deletions.
272 changes: 272 additions & 0 deletions src/AdoNet/Orleans.GrainDirectory.AdoNet/PostgreSQL-GrainDirectory.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
/*
Orleans Grain Directory.
This tables stores the location of all grains in the cluster.
The rationale for this table is as follows:
1. The table will see rows inserted individually, as new grains are added.
2. The table will see rows deleted at random as grains are deactivated, without regard for order.
3. Insert/Delete churn is expected to be very high.
4. The GrainId is too large to be indexed by SQL Server.
Given the above, the table cannot be a clustered index.
Not only is the GrainId too large to index directly, the expected insert/delete churn would cause fragmentation to the point of rendering the directory unusable.
Therefore the design choice is to use a heap table with a non-clustered index on the stable hash of the grain key.
Uniqueness is then guaranteed by careful use of locks on the hash index.
*/
CREATE TABLE OrleansGrainDirectory
(
/* Identifies the cluster instance */
ClusterId VARCHAR(150) NOT NULL,

/* Identifies the grain directory provider */
ProviderId VARCHAR(150) NOT NULL,

/* Holds the hash of the grain id */
GrainIdHash INT NOT NULL,

/* Holds the grain id in text form */
GrainId TEXT NOT NULL,

/* Holds the silo address where the grain is located */
SiloAddress VARCHAR(100) NOT NULL,

/* Holds the activation id in the silo where it is located */
ActivationId VARCHAR(100) NOT NULL,

/* Holds the time at which the grain was added to the directory */
CreatedOn TIMESTAMPTZ NOT NULL
);

/*
This index is a workaround for the GrainId being too large to index by SQL Server.
Instead we index a stable hash of the GrainId.
Collisions are possible yet handled by careful use of locks on this index.
*/
CREATE INDEX IX_OrleansGrainDirectory_Lookup
ON OrleansGrainDirectory
(
ClusterId ASC,
ProviderId ASC,
GrainIdHash ASC
)
INCLUDE
(
GrainId,
SiloAddress,
ActivationId,
CreatedOn
);

/* Registers a new grain activation */
/* Registers a new grain activation */
CREATE OR REPLACE FUNCTION RegisterGrainActivation(
_ClusterId VARCHAR(150),
_ProviderId VARCHAR(150),
_GrainIdHash INT,
_GrainId TEXT,
_SiloAddress VARCHAR(100),
_ActivationId VARCHAR(100)
)
RETURNS TABLE
(
ClusterId VARCHAR(150),
ProviderId VARCHAR(150),
GrainId TEXT,
SiloAddress VARCHAR(100),
ActivationId VARCHAR(100)
)
LANGUAGE plpgsql
AS $$
#VARIABLE_CONFLICT USE_COLUMN
DECLARE
_Now TIMESTAMPTZ := NOW();
BEGIN

-- this is required to prevent both duplication and deadlocks
LOCK TABLE OrleansGrainDirectory IN EXCLUSIVE MODE;

MERGE INTO OrleansGrainDirectory AS Target
USING (SELECT _ClusterId, _ProviderId, _GrainIdHash, _GrainId, _SiloAddress, _ActivationId, _Now) AS Source
ON
Target.ClusterId = Source._ClusterId
AND Target.ProviderId = Source._ProviderId
AND Target.GrainIdHash = Source._GrainIdHash
AND Target.GrainId = Source._GrainId
WHEN NOT MATCHED THEN
INSERT
(
ClusterId,
ProviderId,
GrainIdHash,
GrainId,
SiloAddress,
ActivationId,
CreatedOn
)
VALUES
(
Source._ClusterId,
Source._ProviderId,
Source._GrainIdHash,
Source._GrainId,
Source._SiloAddress,
Source._ActivationId,
Source._Now
);

RETURN QUERY
SELECT
ClusterId,
ProviderId,
GrainId,
SiloAddress,
ActivationId
FROM OrleansGrainDirectory
WHERE ClusterId = _ClusterId
AND ProviderId = _ProviderId
AND GrainIdHash = _GrainIdHash
AND GrainId = _GrainId;

END;
$$;

INSERT INTO OrleansQuery
(
QueryKey,
QueryText
)
SELECT
'RegisterGrainActivationKey',
'SELECT * FROM RegisterGrainActivation (@ClusterId, @ProviderId, @GrainIdHash, @GrainId, @SiloAddress, @ActivationId)'
;

/* Unregisters an existing grain activation */
CREATE OR REPLACE FUNCTION UnregisterGrainActivation(
_ClusterId VARCHAR(150),
_ProviderId VARCHAR(150),
_GrainIdHash INT,
_GrainId TEXT,
_ActivationId VARCHAR(100)
)
RETURNS INT
LANGUAGE plpgsql
AS $$
DECLARE
_RowCount INT;
BEGIN

-- this is required to prevent both duplication and deadlocks
LOCK TABLE OrleansGrainDirectory IN EXCLUSIVE MODE;

DELETE FROM OrleansGrainDirectory
WHERE ClusterId = _ClusterId
AND ProviderId = _ProviderId
AND GrainIdHash = _GrainIdHash
AND GrainId = _GrainId
AND ActivationId = _ActivationId;

GET DIAGNOSTICS _RowCount = ROW_COUNT;

RETURN _RowCount;

END;
$$;

INSERT INTO OrleansQuery
(
QueryKey,
QueryText
)
SELECT
'UnregisterGrainActivationKey',
'SELECT * FROM UnregisterGrainActivation (@ClusterId, @ProviderId, @GrainIdHash, @GrainId, @ActivationId)'
;

/* Looks up an existing grain activation */
CREATE OR REPLACE FUNCTION LookupGrainActivation(
_ClusterId VARCHAR(150),
_ProviderId VARCHAR(150),
_GrainIdHash INT,
_GrainId TEXT
)
RETURNS TABLE
(
ClusterId VARCHAR(150),
ProviderId VARCHAR(150),
GrainId TEXT,
SiloAddress VARCHAR(100),
ActivationId VARCHAR(100)
)
LANGUAGE plpgsql
AS $$
#VARIABLE_CONFLICT USE_COLUMN
BEGIN

RETURN QUERY
SELECT
ClusterId,
ProviderId,
GrainId,
SiloAddress,
ActivationId
FROM
OrleansGrainDirectory
WHERE
ClusterId = _ClusterId
AND ProviderId = _ProviderId
AND GrainIdHash = _GrainIdHash
AND GrainId = _GrainId;

END;
$$;

INSERT INTO OrleansQuery
(
QueryKey,
QueryText
)
SELECT
'LookupGrainActivationKey',
'SELECT * FROM LookupGrainActivation(@ClusterId, @ProviderId, @GrainIdHash, @GrainId)'
;

/* Unregisters all grain activations in the specified silos */
CREATE OR REPLACE FUNCTION UnregisterGrainActivations(
_ClusterId VARCHAR(150),
_ProviderId VARCHAR(150),
_SiloAddresses TEXT
)
RETURNS INT
LANGUAGE plpgsql
AS $$
DECLARE
_RowCount INT;
BEGIN

-- this is required to prevent both duplication and deadlocks
LOCK TABLE OrleansGrainDirectory IN EXCLUSIVE MODE;

DELETE FROM OrleansGrainDirectory
WHERE
ClusterId = _ClusterId
AND ProviderId = _ProviderId
AND SiloAddress = ANY (string_to_array(_SiloAddresses, '|'));

GET DIAGNOSTICS _RowCount = ROW_COUNT;

RETURN _RowCount;

END;
$$;

INSERT INTO OrleansQuery
(
QueryKey,
QueryText
)
SELECT
'UnregisterGrainActivationsKey',
'SELECT * FROM UnregisterGrainActivations (@ClusterId, @ProviderId, @SiloAddresses)'
;
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Npgsql;
using Orleans.TestingHost;
using Orleans.Tests.SqlUtils;
using Tester.Directories;
Expand All @@ -14,6 +15,17 @@ public class SqlServerAdoNetGrainDirectoryClusterTests() : AdoNetGrainDirectoryC
{
}

/// <summary>
/// Cluster tests for ADO.NET Grain Directory against PostgreSQL.
/// </summary>
public class PostgreSqlAdoNetGrainDirectoryClusterTests : AdoNetGrainDirectoryClusterTests
{
public PostgreSqlAdoNetGrainDirectoryClusterTests() : base(AdoNetInvariants.InvariantNamePostgreSql)
{
NpgsqlConnection.ClearAllPools();
}
}

/// <summary>
/// Cluster tests base class for ADO.NET Grain Directory.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Net;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using Npgsql;
using Orleans.Configuration;
using Orleans.GrainDirectory.AdoNet;
using Orleans.Tests.SqlUtils;
Expand All @@ -18,6 +19,17 @@ public class SqlServerAdoNetGrainDirectoryTests() : AdoNetGrainDirectoryTests(Ad
{
}

/// <summary>
/// Tests for <see cref="AdoNetGrainDirectory"/> against PostgreSQL.
/// </summary>
public class PostgreSQLAdoNetGrainDirectoryTests : AdoNetGrainDirectoryTests
{
public PostgreSQLAdoNetGrainDirectoryTests() : base(AdoNetInvariants.InvariantNamePostgreSql, 90)
{
NpgsqlConnection.ClearAllPools();
}
}

/// <summary>
/// Tests for <see cref="AdoNetGrainDirectory"/>.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Npgsql;
using Orleans.GrainDirectory.AdoNet;
using Orleans.GrainDirectory.AdoNet.Storage;
using UnitTests.General;
Expand All @@ -12,32 +13,32 @@ public class SqlServerRelationalOrleansQueriesTests() : RelationalOrleansQueries
{
}

/*
/// <summary>
/// Tests the relational storage layer via <see cref="RelationalOrleansQueries"/> against MySQL.
/// Tests the relational storage layer via <see cref="RelationalOrleansQueries"/> against PostgreSQL.
/// </summary>
public class MySqlRelationalOrleansQueriesTests : RelationalOrleansQueriesTests
public class PostgreSqlRelationalOrleansQueriesTests : RelationalOrleansQueriesTests
{
public MySqlRelationalOrleansQueriesTests() : base(AdoNetInvariants.InvariantNameMySql, 100)
public PostgreSqlRelationalOrleansQueriesTests() : base(AdoNetInvariants.InvariantNamePostgreSql, 90)
{
MySqlConnection.ClearAllPools();
NpgsqlConnection.ClearAllPools();
}
}
*/

/*
/// <summary>
/// Tests the relational storage layer via <see cref="RelationalOrleansQueries"/> against PostgreSQL.
/// Tests the relational storage layer via <see cref="RelationalOrleansQueries"/> against MySQL.
/// </summary>
public class PostgreSqlRelationalOrleansQueriesTests : RelationalOrleansQueriesTests
public class MySqlRelationalOrleansQueriesTests : RelationalOrleansQueriesTests
{
public PostgreSqlRelationalOrleansQueriesTests() : base(AdoNetInvariants.InvariantNamePostgreSql, 99)
public MySqlRelationalOrleansQueriesTests() : base(AdoNetInvariants.InvariantNameMySql, 100)
{
NpgsqlConnection.ClearAllPools();
MySqlConnection.ClearAllPools();
}
}
*/



/// <summary>
/// Tests the relational storage layer via <see cref="RelationalOrleansQueries"/>.
/// </summary>
Expand Down

0 comments on commit b6b2159

Please sign in to comment.