
digitalmars.D - [vibe.d] WebSocket mess with "cast(shared) ..."

Hi,
the idea is to perform some heavy calculation in a worker thread and send 
the results directly through the WS connection back to the client.

Note that connection close is handled, so the WS connection stays open 
while the worker thread is executing.

Again, maybe I am doing something wrong here? I guess there is some 
mess with the references to the WebSocket object.

On the first run (clients connect to the server), only one or two 
clients do not receive all results. On the second attempt to connect 
to the server, clients __NEVER__ receive any results from the worker 
threads (which are created anew on each connection).


```d
/*
  * Case 01
  * -------
  * Worker task is executed always in same worker thread.
  *
  * How-To:
  * - open 4 parallel WS connection to service.
  * - in each connection send channel id, eg. "ch01", "ch02", ...
  * - observe logs
  * - worker thread name is always same
  * - worker cannot write to WS?
  */

import vibe.d;
import vibe.vibe;
import vibe.core.core;
import vibe.http.server;
import vibe.http.websockets;
import vibe.http.router;
import vibe.inet.url;

import core.time;
import core.thread : Thread;
import std.conv;

static void workerFuncPingPongWS(Task caller, string channel_id,
		shared WebSocket s) nothrow {
	WebSocket ws = cast(WebSocket) s;
	int counter = 5;
	try {
		logInfo("WORKER :: thread-id=%s caller=%s channel-id=%s THREAD=%s",
			thisTid, caller, channel_id, Thread.getThis().name);
		while (receiveOnly!string() == "ping" && --counter) {
			logInfo("%s :: %s :: pong=%s", Thread.getThis().name,
				channel_id, counter);
			try {
				ws.send("pong-" ~ channel_id ~ "-" ~ Thread.getThis().name);
			} catch (Exception o) {
				logError(">>> exception=%s", o);
			}
			caller.send("pong");
			sleep(2.seconds);
		}
		caller.send("goodbye");
	} catch (Exception e) assert(false, e.msg);
}

class WebsocketService {
	@path("/ws") void getWebsocket1(scope WebSocket ws) {
		logInfo("X> connected=%s, ws=%s code=%s THREAD=%s",
			ws.connected, &ws, ws.closeCode, Thread.getThis().name);
		
		auto channel_id = ws.receiveText;
		logInfo("Receive channel '%s'.", channel_id);

		auto callee = runWorkerTaskH(&workerFuncPingPongWS,
			Task.getThis, channel_id, cast(shared) ws);
		do {
			logInfo("ping");
			callee.send("ping");
		} while (receiveOnly!string() == "pong");
		
		while (true) {
			auto txt = ws.receiveText;
			logInfo("Receive '%s'. thisTid=%s", txt, thisTid);
			
			if (txt == "stop") {
				break;
			}
			ws.send(txt ~ " pong");
		}
		logInfo("Client disconnected - worker is done. THREAD=%s",
			Thread.getThis().name);
	}
}

void helloWorld(HTTPServerRequest req, HTTPServerResponse res)
{	
     res.writeBody("Hello");
}

void main()
{
	logInfo("APP::CASE::01");
	auto router = new URLRouter;
	router.registerWebInterface(new WebsocketService());
	router.get("/hello", &helloWorld);

	auto settings = new HTTPServerSettings;
	settings.port = 8080;
	settings.bindAddresses = ["::1", "127.0.0.1"];

	auto listener = listenHTTP(settings, router);
	scope (exit)
	{
		listener.stopListening();
	}

	runApplication();
}
```

Server console output:
```bash
[main(----) INF] APP::CASE::01
[main(----) INF] Listening for requests on http://[::1]:8080/
[main(----) INF] Listening for requests on http://127.0.0.1:8080/
[main(E7UW) INF] X> connected=true, ws=7F34D6BA1DD8 code=0 THREAD=main
[main(0Bjv) INF] X> connected=true, ws=7F34D8BA3DD8 code=0 THREAD=main
[main(0MxP) INF] X> connected=true, ws=7F34D7BA2DD8 code=0 THREAD=main
[main(bsKy) INF] X> connected=true, ws=7F34D5BA0DD8 code=0 THREAD=main

[main(E7UW) INF] Receive channel '2-fibonacci'.
[main(0Bjv) INF] Receive channel '0-fibonacci'.
[main(0MxP) INF] Receive channel '3-fibonacci'.
[main(bsKy) INF] Receive channel '1-fibonacci'.

[vibe-15(biLB) INF] WORKER :: thread-id=Tid(7f34db0876e0) caller=7F34DB074C00:1 channel-id=2-fibonacci THREAD=vibe-15
[vibe-4(tmZQ) INF] WORKER :: thread-id=Tid(7f34db087840) caller=7F34DB074A00:1 channel-id=3-fibonacci THREAD=vibe-4
[vibe-15(AXdw) INF] WORKER :: thread-id=Tid(7f34db087790) caller=7F34DB074800:3 channel-id=0-fibonacci THREAD=vibe-15
[vibe-15(DCEi) INF] WORKER :: thread-id=Tid(7f34db0878f0) caller=7F34DB074E00:1 channel-id=1-fibonacci THREAD=vibe-15

exception=object.Exception@../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/net.d(819): Error writing data to socket.
----------------
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/net.d:819 @safe ulong vibe.core.net.TCPConnection.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0d143fca]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/internal/interfaceproxy.d-mixin-302:302 @safe ulong vibe.internal.interfaceproxy.InterfaceProxy!(vibe.core.stream.Stream).InterfaceProxy.ProxyImpl!(vibe.core.net.TCPConnection).ProxyImpl.__mixin8.__mixin3.__mixin3.__mixin3.__mixin3.__mixin3.__mixin3.__mixin2.write(scope void[], scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0cf30395]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/internal/interfaceproxy.d-mixin-196:196 @safe ulong vibe.internal.interfaceproxy.InterfaceProxy!(vibe.core.stream.OutputStream).InterfaceProxy.__mixin22.__mixin3.__mixin2.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0cfcf442]
../../.dub/packages/vibe-d/0.9.6/vibe-d/stream/vibe/stream/wrapper.d:199 @safe ulong vibe.stream.wrapper.ConnectionProxyStream.write(scope const(ubyte)[], eventcore.driver.IOMode) [0x55db0d12ce3b]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/stream.d:305 @safe void vibe.core.stream.OutputStream.write(scope const(ubyte)[]) [0x55db0d14d69e]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:959 @safe void vibe.http.websockets.OutgoingWebSocketMessage.sendFrame(bool) [0x55db0d03a211]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:942 @safe void vibe.http.websockets.OutgoingWebSocketMessage.finalize() [0x55db0d03a044]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:688 @safe void vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode).__lambda3() [0x55db0d03b44c]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/sync.d:189 @safe void vibe.core.sync.performLocked!(vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode).__lambda3(), vibe.core.sync.InterruptibleTaskMutex).performLocked(vibe.core.sync.InterruptibleTaskMutex) [0x55db0d03b361]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:685 @safe void vibe.http.websockets.WebSocket.send(scope void delegate(scope vibe.http.websockets.OutgoingWebSocketMessage) @safe, vibe.http.websockets.FrameOpcode) [0x55db0d038fc3]
../../.dub/packages/vibe-d/0.9.6/vibe-d/http/vibe/http/websockets.d:657 @safe void vibe.http.websockets.WebSocket.send(scope const(char)[]) [0x55db0d038ef3]
source/app_forum_01_task_thread.d:33 nothrow void app_forum_01_task_thread.workerFuncPingPongWS(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) [0x55db0cf1b209]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/taskpool.d:211 nothrow void vibe.core.taskpool.TaskPool.doRunTaskH!(void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)).doRunTaskH(vibe.core.task.TaskSettings, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.task.Task, ref immutable(char)[], ref shared(vibe.http.websockets.WebSocket)).taskFun(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) [0x55db0cee96d3]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:737 nothrow void vibe.core.task.TaskFuncInfo.set!(void function(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)).set(ref void function(vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.channel.Channel!(vibe.core.task.Task, 100uL).Channel, ref void function(vibe.core.task.Task, immutable(char)[], shared(vibe.http.websockets.WebSocket)) nothrow*, ref vibe.core.task.Task, ref immutable(char)[], ref shared(vibe.http.websockets.WebSocket)).callDelegate(ref vibe.core.task.TaskFuncInfo) [0x55db0ceec2f6]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:758 void vibe.core.task.TaskFuncInfo.call() [0x55db0d15edc5]
../../.dub/packages/vibe-core/2.2.0/vibe-core/source/vibe/core/task.d:457 nothrow void vibe.core.task.TaskFiber.run() [0x55db0d15dfbe]
??:? void core.thread.context.Callable.opCall() [0x55db0d1e3de8]
??:? fiber_entryPoint [0x55db0d23fe1b]
...
```

Client console output. As you can see, channel `0-fibonacci` did not receive any results from the worker thread!

```bash
...
3-fibonacci results: ['pong-3-fibonacci-vibe-4', 'pong-3-fibonacci-vibe-4', 'pong-3-fibonacci-vibe-4', 'pong-3-fibonacci-vibe-4', 'fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
2-fibonacci results: ['pong-2-fibonacci-vibe-15', 'pong-2-fibonacci-vibe-15', 'pong-2-fibonacci-vibe-15', 'pong-2-fibonacci-vibe-15', 'fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
0-fibonacci results: ['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
1-fibonacci results: ['pong-1-fibonacci-vibe-15', 'pong-1-fibonacci-vibe-15', 'pong-1-fibonacci-vibe-15', 'pong-1-fibonacci-vibe-15', 'fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
```

Client console output on the 2nd attempt (the server worker threads only get errors when sending data from the thread). Here none of the clients received any results from the worker threads.

```bash
1-fibonacci results: ['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
3-fibonacci results: ['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
2-fibonacci results: ['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
0-fibonacci results: ['fibonacci pong', '0 pong', '1 pong', '2 pong', '3 pong', '4 pong']
```

BTW, see also my other question related to `runWorkerTaskH`.

Many Thanks!
Aug 09 2023