Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

experimenting with suspension of requestCycle via servlet 3 async #214

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.function.Supplier;

import org.apache.wicket.Application;
import org.apache.wicket.request.cycle.RequestCycle;
import org.apache.wicket.settings.ExceptionSettings.ThreadDumpStrategy;
import org.apache.wicket.util.LazyInitializer;
import org.apache.wicket.util.lang.Threads;
Expand Down Expand Up @@ -93,8 +94,8 @@ public Duration getTimeout(int pageId)
*/
public void lockPage(int pageId) throws CouldNotLockPageException
{
final Thread thread = Thread.currentThread();
final PageLock lock = new PageLock(pageId, thread);
final RequestCycle cycle = RequestCycle.get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a good idea.
Two threads should not work on the same page instance at the same time.
This will lead to concurrency related issues.
E.g. two Ajax requests will have their own Request Cycles but might work on the same page instance this way. Without AsyncContext being needed at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not true!

As long as one RequestCycle holds the lock on a page, no other RequestCycle can get a lock. It doesn't make any difference whether we lock on the thread or the RequestCycle.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How a RequestCycle holds a lock on a page ?
Where is this lock in the code ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PageLock is the lock: as long as there is a PageLock instance, all access to the corresponding page has to wait.

Currently PageLock holds a reference to the thread, but the thread isn't used actually apart from identifying the lock. The RequestCycle can be used equally well for that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I see!
But why do you need to change this code at all ?
PageLock#getCycle() is not used anywhere in the PR.
The thread is very useful to identify problems with long running requests at the moment. The thread stack trace is telling me much more than a long (the request cycle start time).
I'd prefer to add the RequestCycle as yet another field to PageLock than removing the thread. But I don't really see the need to change PageAccessSynchronizer at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we could keep the thread reference. But when the request is suspended, a certain time no thread is keeping the lock (the page is still locked though) and from a certain point in time another thread would pick up the lock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that timeframe the page should be unlocked. On resume the new thread should acquire a new lock or wait if it is not available at the moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I don't think so: The request processing hasn't finished yet (e.g. rendering the page hasn't been started yet), so the lock on the page cannot be released.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any updates here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing from me - I didn't have any use for this yet, so I'm not sure how useful this is for components and how it should work exactly.

final PageLock lock = new PageLock(pageId, cycle);
final Time start = Time.now();

boolean locked = false;
Expand All @@ -110,12 +111,12 @@ public void lockPage(int pageId) throws CouldNotLockPageException
if (isDebugEnabled)
{
logger.debug("'{}' attempting to acquire lock to page with id '{}'",
thread.getName(), pageId);
cycle.getStartTime(), pageId);
}

previous = locks.get().putIfAbsent(pageId, lock);

if (previous == null || previous.thread == thread)
if (previous == null || previous.cycle == cycle)
{
// first thread to acquire lock or lock is already owned by this thread
locked = true;
Expand All @@ -134,7 +135,7 @@ public void lockPage(int pageId) throws CouldNotLockPageException
{
if (isDebugEnabled)
{
logger.debug("{} acquired lock to page {}", thread.getName(), pageId);
logger.debug("{} acquired lock to page {}", cycle.getStartTime(), pageId);
}
}
else
Expand All @@ -144,8 +145,8 @@ public void lockPage(int pageId) throws CouldNotLockPageException
logger.warn(
"Thread '{}' failed to acquire lock to page with id '{}', attempted for {} out of allowed {}." +
" The thread that holds the lock has name '{}'.",
thread.getName(), pageId, start.elapsedSince(), timeout,
previous.thread.getName());
cycle.getStartTime(), pageId, start.elapsedSince(), timeout,
previous.cycle.getStartTime());
if (Application.exists())
{
ThreadDumpStrategy strategy = Application.get()
Expand All @@ -157,15 +158,15 @@ public void lockPage(int pageId) throws CouldNotLockPageException
Threads.dumpAllThreads(logger);
break;
case THREAD_HOLDING_LOCK :
Threads.dumpSingleThread(logger, previous.thread);
// Threads.dumpSingleThread(logger, previous.thread);
break;
case NO_THREADS :
default :
// do nothing
}
}
}
throw new CouldNotLockPageException(pageId, thread.getName(), timeout);
throw new CouldNotLockPageException(pageId, "" + cycle.getStartTime(), timeout);
}
}

Expand All @@ -190,7 +191,7 @@ public void unlockPage(int pageId)

private void internalUnlockPages(final Integer pageId)
{
final Thread thread = Thread.currentThread();
final RequestCycle cycle = RequestCycle.get();
final Iterator<PageLock> locks = this.locks.get().values().iterator();

final boolean isDebugEnabled = logger.isDebugEnabled();
Expand All @@ -200,12 +201,12 @@ private void internalUnlockPages(final Integer pageId)
// remove all locks held by this thread if 'pageId' is not specified
// otherwise just the lock for this 'pageId'
final PageLock lock = locks.next();
if ((pageId == null || pageId == lock.pageId) && lock.thread == thread)
if ((pageId == null || pageId == lock.pageId) && lock.cycle == cycle)
{
locks.remove();
if (isDebugEnabled)
{
logger.debug("'{}' released lock to page with id '{}'", thread.getName(),
logger.debug("'{}' released lock to page with id '{}'", cycle.getStartTime(),
lock.pageId);
}
// if any locks were removed notify threads waiting for a lock
Expand Down Expand Up @@ -288,21 +289,21 @@ public static class PageLock
/** page id */
private final int pageId;

/** thread that owns the lock */
private final Thread thread;
/** cycle that owns the lock */
private final RequestCycle cycle;

private volatile boolean released = false;

/**
* Constructor
*
* @param pageId
* @param thread
* @param cycle
*/
public PageLock(int pageId, Thread thread)
public PageLock(int pageId, RequestCycle cycle)
{
this.pageId = pageId;
this.thread = thread;
this.cycle = cycle;
}

/**
Expand All @@ -316,9 +317,9 @@ public int getPageId()
/**
* @return thread that owns the lock
*/
public Thread getThread()
public RequestCycle getCycle()
{
return thread;
return cycle;
}

final synchronized void waitForRelease(long remaining, boolean isDebugEnabled)
Expand All @@ -331,15 +332,15 @@ final synchronized void waitForRelease(long remaining, boolean isDebugEnabled)
{
logger.debug(
"lock for page with id {} no longer locked by {}, falling through", pageId,
thread.getName());
cycle.getStartTime());
}
return;
}

if (isDebugEnabled)
{
logger.debug("{} waiting for lock to page {} for {}",
thread.getName(), pageId, Duration.milliseconds(remaining));
cycle.getStartTime(), pageId, Duration.milliseconds(remaining));
}
try
{
Expand All @@ -355,7 +356,7 @@ final synchronized void markReleased(boolean isDebugEnabled)
{
if (isDebugEnabled)
{
logger.debug("'{}' notifying blocked threads", thread.getName());
logger.debug("'{}' notifying blocked threads", cycle.getStartTime());
}
released = true;
notifyAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ private Url getContextRelativeUrl(String uri, String filterPrefix)
}
StringBuilder url = new StringBuilder();
uri = Strings.stripJSessionId(uri);
String contextPath = httpServletRequest.getContextPath();
String contextPath = getContextPath();

if (LOG.isDebugEnabled())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

import java.util.Optional;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;

import org.apache.wicket.Application;
import org.apache.wicket.MetaDataEntry;
import org.apache.wicket.MetaDataKey;
Expand Down Expand Up @@ -45,6 +50,7 @@
import org.apache.wicket.request.component.IRequestablePage;
import org.apache.wicket.request.handler.resource.ResourceReferenceRequestHandler;
import org.apache.wicket.request.handler.resource.ResourceRequestHandler;
import org.apache.wicket.request.http.WebResponse;
import org.apache.wicket.request.mapper.parameter.PageParameters;
import org.apache.wicket.request.resource.IResource;
import org.apache.wicket.request.resource.ResourceReference;
Expand Down Expand Up @@ -123,6 +129,8 @@ private static void set(RequestCycle requestCycle)

private Response activeResponse;

private SuspensionImpl suspension;

/**
* Construct.
*
Expand Down Expand Up @@ -222,7 +230,9 @@ public boolean processRequestAndDetach()
}
finally
{
detach();
if (suspension == null) {
detach();
}
}
return result;
}
Expand Down Expand Up @@ -283,7 +293,12 @@ private void execute(IRequestHandler handler)
IRequestHandler next = requestHandlerExecutor.execute(handler);
listeners.onRequestHandlerExecuted(this, handler);

handler = next;
if (suspension == null) {
handler = next;
} else {
handler = null;
suspension.handler = next;
}
}
catch (RuntimeException e)
{
Expand All @@ -304,6 +319,22 @@ private void execute(IRequestHandler handler)
}
}

/**
* Suspend the request cycle to be resumed on another thread.
*
* @param timeout {@literal 0} for no timeout
* @return suspension to be resumed
*/
public Suspension suspend(long timeout) {
HttpServletRequest httpServletRequest = (HttpServletRequest)request.getContainerRequest();

AsyncContext context = httpServletRequest.startAsync();
context.setTimeout(timeout);
suspension = new SuspensionImpl(context);

return suspension;
}

/**
* Execute a requestHandler for the given exception.
*
Expand Down Expand Up @@ -925,4 +956,122 @@ protected void detach(IRequestHandler handler)

}

public interface Suspension {
void resume();
}

class SuspensionImpl implements Suspension, AsyncListener {

AsyncContext asyncContext;

Application application;

ClassLoader classLoader;

IRequestHandler handler;

SuspensionImpl(AsyncContext asyncContext)
{
this.asyncContext = asyncContext;
this.asyncContext.addListener(this);

application = Application.get();
classLoader = Thread.currentThread().getContextClassLoader();
}

/**
* Resume the suspension.
*/
public synchronized void resume()
{
if (suspension == null) {
throw new WicketRuntimeException("not longer suspended");
}
suspension = null;

final ThreadContext previousThreadContext = ThreadContext.detach();
ThreadContext.setApplication(application);

final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
if (previousClassLoader != classLoader)
{
Thread.currentThread().setContextClassLoader(classLoader);
}

try
{
set(RequestCycle.this);

execute(handler);

((WebResponse)getResponse()).flush();

detach();
}
finally
{
set(null);

ThreadContext.restore(previousThreadContext);

if (classLoader != previousClassLoader)
{
Thread.currentThread().setContextClassLoader(previousClassLoader);
}

asyncContext.complete();
}
}

@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public synchronized void onTimeout(AsyncEvent event)
{
suspension = null;

final ThreadContext previousThreadContext = ThreadContext.detach();
ThreadContext.setApplication(application);

final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
if (previousClassLoader != classLoader)
{
Thread.currentThread().setContextClassLoader(classLoader);
}

try
{
set(RequestCycle.this);

detach();
}
finally
{
set(null);

ThreadContext.restore(previousThreadContext);

if (classLoader != previousClassLoader)
{
Thread.currentThread().setContextClassLoader(previousClassLoader);
}

asyncContext.complete();
}
}

@Override
public void onError(AsyncEvent event)
{
}


@Override
public void onStartAsync(AsyncEvent event)
{
}
}
}
Loading