Skip to content

Commit

Permalink
Expose pgapp connected workers and fix dialyzer errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rymir committed Oct 25, 2015
1 parent 4e89f39 commit f09b4ca
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ ebin/
*~
deps/
*.config
*.plt
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{deps,
[
{epgsql, ".*", {git, "git://github.com/epgsql/epgsql.git", {tag, "3.1.0"}}},
{epgsql, ".*", {git, "git://github.com/epgsql/epgsql.git", "792a93c"}},
{poolboy, ".*", {git, "git://github.com/devinus/poolboy.git", {tag, "1.4.2"}}}
]}.
21 changes: 17 additions & 4 deletions src/pgapp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,24 @@
-module(pgapp).

%% API
-export([connect/1, connect/2,
equery/2, equery/3, equery/4,
squery/1, squery/2, squery/3,
with_transaction/1, with_transaction/2, with_transaction/3]).
-export([connect/1, connect/2]).
-export([equery/2, equery/3, equery/4]).
-export([squery/1, squery/2, squery/3]).
-export([with_transaction/1, with_transaction/2, with_transaction/3]).
-export([connected_workers/1]).

%%%===================================================================
%%% API
%%%===================================================================

-spec connect(Settings :: list()) -> {ok, WorkerPid} when
WorkerPid :: pid().
connect(Settings) ->
connect(epgsql_pool, Settings).

-spec connect(PoolName :: atom(),
Settings :: list()) -> {ok, WorkerPid} when
WorkerPid :: pid().
connect(PoolName, Settings) ->
PoolSize = proplists:get_value(size, Settings, 5),
MaxOverflow = proplists:get_value(max_overflow, Settings, 5),
Expand Down Expand Up @@ -89,6 +95,13 @@ with_transaction(PoolName, Fun) when is_function(Fun, 0) ->
with_transaction(PoolName, Fun, Timeout) when is_function(Fun, 0) ->
pgapp_worker:with_transaction(PoolName, Fun, Timeout).

-spec connected_workers(PoolName :: atom()) -> {ok, ConnectedWorkerPids} when
ConnectedWorkerPids :: list(pid()).
connected_workers(PoolName) ->
WorkerPids = gen_server:call(PoolName, get_avail_workers),
{ok, [WorkerPid || WorkerPid <- WorkerPids,
pgapp_worker:is_connected(WorkerPid)]}.

%%--------------------------------------------------------------------
%% @doc
%% @spec
Expand Down
38 changes: 25 additions & 13 deletions src/pgapp_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
-behaviour(gen_server).
-behaviour(poolboy_worker).

-export([squery/1, squery/2, squery/3,
equery/2, equery/3, equery/4,
with_transaction/2, with_transaction/3]).
-export([squery/1, squery/2, squery/3]).
-export([equery/2, equery/3, equery/4]).
-export([with_transaction/2, with_transaction/3]).
-export([is_connected/1]).

-export([start_link/1]).

Expand All @@ -29,34 +30,37 @@
-define(STATE_VAR, '$pgapp_state').

squery(Sql) ->
squery(Sql, ?TIMEOUT).

squery(PoolName, Sql) when is_atom(PoolName) ->
squery(PoolName, Sql, ?TIMEOUT);
squery(Sql, Timeout) ->
case get(?STATE_VAR) of
undefined ->
squery(epgsql_pool, Sql);
squery(epgsql_pool, Sql, Timeout);
Conn ->
epgsql:squery(Conn, Sql)
end.

squery(PoolName, Sql) ->
squery(PoolName, Sql, ?TIMEOUT).

squery(PoolName, Sql, Timeout) ->
poolboy:transaction(PoolName,
fun (Worker) ->
gen_server:call(Worker, {squery, Sql}, Timeout)
end, Timeout).


equery(Sql, Params) ->
equery(Sql, Params, ?TIMEOUT).

equery(PoolName, Sql, Params) when is_atom(PoolName) ->
equery(PoolName, Sql, Params, ?TIMEOUT);
equery(Sql, Params, Timeout) ->
case get(?STATE_VAR) of
undefined ->
equery(epgsql_pool, Sql, Params);
equery(epgsql_pool, Sql, Params, Timeout);
Conn ->
epgsql:equery(Conn, Sql, Params)
end.

equery(PoolName, Sql, Params) ->
equery(PoolName, Sql, Params, ?TIMEOUT).

equery(PoolName, Sql, Params, Timeout) ->
poolboy:transaction(PoolName,
fun (Worker) ->
Expand All @@ -74,6 +78,9 @@ with_transaction(PoolName, Fun, Timeout) ->
{transaction, Fun}, Timeout)
end, Timeout).

is_connected(WorkerPid) ->
gen_server:call(WorkerPid, {is_connected}, ?TIMEOUT).

start_link(Args) ->
gen_server:start_link(?MODULE, Args, []).

Expand All @@ -93,7 +100,12 @@ handle_call({transaction, Fun}, _From,
put(?STATE_VAR, Conn),
Result = epgsql:with_transaction(Conn, fun(_) -> Fun() end),
erase(?STATE_VAR),
{reply, Result, State}.
{reply, Result, State};

handle_call({is_connected}, _From, #state{conn = undefined} = State) ->
{reply, false, State};
handle_call({is_connected}, _From, #state{conn = _Conn} = State) ->
{reply, true, State}.

handle_cast(reconnect, State) ->
{noreply, connect(State)}.
Expand Down

0 comments on commit f09b4ca

Please sign in to comment.