Skip to content

Commit

Permalink
Functions for HSETs and function to unsubscribe from pub/sub (#23)
Browse files Browse the repository at this point in the history
* redis unsubscribe implementation

* added unsubscription commit

* Some functions for working with Redis hashes.

* fixed include

* fixed wrong declaration

* fixed declaration (2)

* declaration fixed (3)

* Methods for incrementing hash sets

* fixed Redis_HIncrByFloat incr type. Now it's correct float type.

* Redis_GetHInt function

Co-authored-by: Ymer <[email protected]>
  • Loading branch information
ymde and Ymer authored Mar 17, 2020
1 parent 2c5aca3 commit 147e50a
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 0 deletions.
10 changes: 10 additions & 0 deletions redis.inc
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,15 @@ native Redis_GetInt(Redis:client, const key[], &value);
native Redis_SetFloat(Redis:client, const key[], Float:value);
native Redis_GetFloat(Redis:client, const key[], &Float:value);

native Redis_SetHString(Redis:client, const key[], const field[], value[]);
native Redis_SetHInt(Redis:client, const key[], const field[], value);
native Redis_GetHString(Redis:client, const key[], const field[], value[], len = sizeof(value));
native Redis_GetHInt(Redis::client, const key[], const field[]);
native Redis_HExists(Redis:client, const key[], const field[]);
native Redis_HDel(Redis:client, const key[], const field[]);
native Redis_HIncrBy(Redis:client, const key[], const field[], incr);
native Redis_HIncrByFloat(Redis:client, const key[], const field[], Float:incr);

native Redis_Subscribe(const host[], port, const auth[], const channel[], const callback[], &PubSub:client);
native Redis_Unsubscribe(PubSub:client);
native Redis_Publish(Redis:client, const channel[], const data[]);
144 changes: 144 additions & 0 deletions src/impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,132 @@ int Impl::GetFloat(int client_id, std::string key, float& value)
return 0;
}

int Impl::SetHString(int client_id, std::string key, std::string field, std::string value)
{
cpp_redis::client* client;
int err = clientFromID(client_id, client);
if (err) {
return 1;
}

auto req = client->hset(key, field, value);
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 1;
}

return 0;
}

int Impl::GetHString(int client_id, std::string key, std::string field, std::string& value)
{
cpp_redis::client* client;
int err = clientFromID(client_id, client);
if (err) {
return 1;
}

auto req = client->hget(key, field);
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 1;
} else if (r.get_type() == cpp_redis::reply::type::null) {
return 2;
} else if (r.get_type() != cpp_redis::reply::type::bulk_string) {
return 3;
} else {
value = r.as_string();
}

return 0;
}

int Impl::HDel(int client_id, std::string key, std::string field)
{
cpp_redis::client* client;
int err = clientFromID(client_id, client);
if (err) {
return 1;
}

auto req = client->hdel(key, std::vector<std::string>{ field });
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 2;
}

return 0;
}

int Impl::HExists(int client_id, std::string key, std::string field)
{
cpp_redis::client* client;
int err = clientFromID(client_id, client);
if (err) {
return 1;
}

auto req = client->hexists(key, field);
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 0;
}

return static_cast<int>(r.as_integer());
}

int Impl::HIncrBy(int client_id, std::string key, std::string field, int incr)
{
cpp_redis::client* client;
int err = clientFromID(client_id, client);
if (err) {
return 1;
}

auto req = client->hincrby(key, field, incr);
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 1;
}

return 0;
}

int Impl::HIncrByFloat(int client_id, std::string key, std::string field, float incr)
{
cpp_redis::client* client;
int err = clientFromID(client_id, client);
if (err) {
return 1;
}

auto req = client->hincrbyfloat(key, field, incr);
client->sync_commit();
auto r = req.get();

if (r.is_error()) {
logprintf("ERROR: %s", r.error().c_str());
return 1;
}

return 0;
}

int Impl::Subscribe(std::string host, int port, std::string auth, std::string channel, std::string callback, int& id)
{
cpp_redis::subscriber* sub = new cpp_redis::subscriber();
Expand All @@ -298,6 +424,7 @@ int Impl::Subscribe(std::string host, int port, std::string auth, std::string ch

clientData cd;
cd.subscriber = sub;
cd.channel = channel;
cd.host = host;
cd.port = port;
cd.auth = auth;
Expand All @@ -309,6 +436,23 @@ int Impl::Subscribe(std::string host, int port, std::string auth, std::string ch
return 0;
}

int Impl::Unsubscribe(int client_id)
{
clientData cd;

int err = clientDataFromID(client_id, cd);
if (err) {
return 1;
}

cd.subscriber->unsubscribe(cd.channel);
cd.subscriber->commit();

clients.erase(client_id);

return 0;
}

int Impl::Publish(int client_id, std::string channel, std::string data)
{
cpp_redis::client* client;
Expand Down
9 changes: 9 additions & 0 deletions src/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct clientData {
std::string host;
int port;
std::string auth;
std::string channel;
bool isPubSub;
cpp_redis::subscriber* subscriber;
};
Expand Down Expand Up @@ -79,7 +80,15 @@ int GetInt(int client_id, std::string key, int& value);
int SetFloat(int client_id, std::string key, float value);
int GetFloat(int client_id, std::string key, float& value);

int SetHString(int client_id, std::string key, std::string field, std::string value);
int GetHString(int client_id, std::string key, std::string field, std::string& value);
int HExists(int client_id, std::string key, std::string field);
int HIncrBy(int client_id, std::string key, std::string field, int incr);
int HIncrByFloat(int client_id, std::string key, std::string field, float incr);
int HDel(int client_id, std::string key, std::string field);

int Subscribe(std::string host, int port, std::string auth, std::string channel, std::string callback, int& id);
int Unsubscribe(int client_id);
int Publish(int client_id, std::string channel, std::string message);

int clientFromID(int client_id, cpp_redis::client*& client);
Expand Down
10 changes: 10 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,17 @@ extern "C" const AMX_NATIVE_INFO native_list[] = {
{ "Redis_SetFloat", Natives::SetFloat },
{ "Redis_GetFloat", Natives::GetFloat },

{ "Redis_SetHString", Natives::SetHString },
{ "Redis_SetHInt", Natives::SetHInt },
{ "Redis_GetHString", Natives::GetHString },
{ "Redis_HExists", Natives::HExists },
{ "Redis_HDel", Natives::HDel },
{ "Redis_HIncrBy", Natives::HIncrBy },
{ "Redis_HIncrByFloat", Natives::HIncrByFloat },
{ "Redis_GetHInt", Natives::GetHInt },

{ "Redis_Subscribe", Natives::Subscribe },
{ "Redis_Unsubscribe", Natives::Unsubscribe },
{ "Redis_Publish", Natives::Publish },

{ NULL, NULL }
Expand Down
92 changes: 92 additions & 0 deletions src/natives.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,93 @@ cell Natives::GetFloat(AMX* amx, cell* params)
return ret;
}

cell Natives::SetHString(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);
string value = amx_GetCppString(amx, params[4]);

return Impl::SetHString(context_id, key, field, value);
}

cell Natives::SetHInt(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);
int value = params[4];

return Impl::SetHString(context_id, key, field, std::to_string(value));
}

cell Natives::GetHString(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);
string value;

int ret;
ret = Impl::GetHString(context_id, key, field, value);

amx_SetCppString(amx, params[4], value, params[5]);

return ret;
}

cell Natives::GetHInt(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);
string value;

int ret;
ret = Impl::GetHString(context_id, key, field, value);

return std::atoi(value.c_str());
}

cell Natives::HExists(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);

return Impl::HExists(context_id, key, field);
}

cell Natives::HDel(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);

return Impl::HDel(context_id, key, field);
}

cell Natives::HIncrBy(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);
int incr = params[4];

return Impl::HIncrBy(context_id, key, field, incr);
}

cell Natives::HIncrByFloat(AMX* amx, cell* params)
{
int context_id = params[1];
string key = amx_GetCppString(amx, params[2]);
string field = amx_GetCppString(amx, params[3]);
float incr = *(float*)&params[4];

return Impl::HIncrByFloat(context_id, key, field, incr);
}


cell Natives::Subscribe(AMX* amx, cell* params)
{
string host = amx_GetCppString(amx, params[1]);
Expand All @@ -166,6 +253,11 @@ cell Natives::Subscribe(AMX* amx, cell* params)
return Impl::Subscribe(host, port, auth, channel, callback, *addr);
}

cell Natives::Unsubscribe(AMX* amx, cell* params)
{
return Impl::Unsubscribe(params[1]);
}

cell Natives::Publish(AMX* amx, cell* params)
{
int pubsub_id = params[1];
Expand Down
10 changes: 10 additions & 0 deletions src/natives.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,17 @@ cell GetInt(AMX* amx, cell* params);
cell SetFloat(AMX* amx, cell* params);
cell GetFloat(AMX* amx, cell* params);

cell SetHString(AMX* amx, cell* params);
cell SetHInt(AMX* amx, cell* params);
cell GetHString(AMX* amx, cell* params);
cell GetHInt(AMX* amx, cell* params);
cell HExists(AMX* amx, cell* params);
cell HIncrBy(AMX* amx, cell* params);
cell HIncrByFloat(AMX* amx, cell* params);
cell HDel(AMX* amx, cell* params);

cell Subscribe(AMX* amx, cell* params);
cell Unsubscribe(AMX* amx, cell* params);
cell Publish(AMX* amx, cell* params);
};

Expand Down

0 comments on commit 147e50a

Please sign in to comment.