HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux ip-172-31-4-197 6.8.0-1036-aws #38~22.04.1-Ubuntu SMP Fri Aug 22 15:44:33 UTC 2025 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: /var/www/web.enelar.com.co/node_modules/lmdb/src/writer.cpp
/* write instructions

0-3 flags
4-7 dbi
8-11 key-size
12 ... key followed by at least 2 32-bit zeros
4 value-size
8 bytes: value pointer (or value itself)
8 compressor pointer?
8 bytes (optional): conditional version
8 bytes (optional): version
inline value?
*/
#include "lmdb-js.h"
#include <atomic>
#include <ctime>
#ifndef _WIN32
#include <unistd.h>
#endif
#ifdef _WIN32
#define ntohl _byteswap_ulong
#define htonl _byteswap_ulong
#else
#include <arpa/inet.h>
#endif

// flags:
const uint32_t NO_INSTRUCTION_YET = 0;
const int PUT = 15;
const int DEL = 13;
const int DEL_VALUE = 14;
const int START_CONDITION_BLOCK = 4;
//const int START_CONDITION_VALUE_BLOCK = 6;
const int START_BLOCK = 1;
const int BLOCK_END = 2;
const int POINTER_NEXT = 3;
const int USER_CALLBACK = 8;
const int USER_CALLBACK_STRICT_ORDER = 0x100000;
const int DROP_DB = 12;
const int HAS_KEY = 4;
const int HAS_VALUE = 2;
const int CONDITIONAL = 8;
const int CONDITIONAL_VERSION = 0x100;
const int CONDITIONAL_VERSION_LESS_THAN = 0x800;
const int CONDITIONAL_ALLOW_NOTFOUND = 0x1000;
const int ASSIGN_TIMESTAMP = 0x2000;
const int SET_VERSION = 0x200;
//const int HAS_INLINE_VALUE = 0x400;
const int COMPRESSIBLE = 0x100000;
const int DELETE_DATABASE = 0x400;
const int TXN_HAD_ERROR = 0x40000000;
const int TXN_DELIMITER = 0x8000000;
const int TXN_COMMITTED = 0x10000000;
//const int TXN_FLUSHED = 0x20000000;
const int WAITING_OPERATION = 0x2000000;
const int IF_NO_EXISTS = MDB_NOOVERWRITE; //0x10;
// result codes:
const int FAILED_CONDITION = 0x4000000;
const int FINISHED_OPERATION = 0x1000000;
const double ANY_VERSION = 3.542694326329068e-103; // special marker for any version

WriteWorker::~WriteWorker() {
	// TODO: Make sure this runs on the JS main thread, or we need to move it
	if (envForTxn->writeWorker == this)
		envForTxn->writeWorker = nullptr;
}

WriteWorker::WriteWorker(MDB_env* env, EnvWrap* envForTxn, uint32_t* instructions)
		: envForTxn(envForTxn),
		instructions(instructions),
		env(env) {
	//fprintf(stdout, "nextCompressibleArg %p\n", nextCompressibleArg);
		interruptionStatus = 0;
		resultCode = 0;
		txn = nullptr;
	}

void WriteWorker::SendUpdate() {
	if (WriteWorker::threadSafeCallsEnabled)
		napi_call_threadsafe_function(progress, nullptr, napi_tsfn_blocking);
}
MDB_txn* WriteWorker::AcquireTxn(int* flags) {
	bool commitSynchronously = *flags & TXN_SYNCHRONOUS_COMMIT;
	
	// TODO: if the conditionDepth is 0, we could allow the current worker's txn to be continued, committed and restarted
	pthread_mutex_lock(envForTxn->writingLock);
	retry:
	if (commitSynchronously && interruptionStatus == WORKER_WAITING) {
		interruptionStatus = INTERRUPT_BATCH;
		pthread_cond_signal(envForTxn->writingCond);
		pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
		if (interruptionStatus == RESTART_WORKER_TXN) {
			*flags |= TXN_FROM_WORKER;
			return nullptr;
		} else if (interruptionStatus == WORKER_WAITING || interruptionStatus == INTERRUPT_BATCH) {
		    interruptionStatus = WORKER_WAITING;
		    goto retry;
		} else {
			return nullptr;
		}
	} else {
		//if (interruptionStatus == RESTART_WORKER_TXN)
		//	pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
		interruptionStatus = USER_HAS_LOCK;
		*flags |= TXN_FROM_WORKER;
		//if (txn)
			//fprintf(stderr, "acquire lock from worker %p %u\n", txn, commitSynchronously);
		return txn;
	}
}

void WriteWorker::UnlockTxn() {
	interruptionStatus = 0;
	pthread_cond_signal(envForTxn->writingCond);
	pthread_mutex_unlock(envForTxn->writingLock);
}
int WriteWorker::WaitForCallbacks(MDB_txn** txn, bool allowCommit, uint32_t* target) {
	int rc;
	if (!finishedProgress)
		SendUpdate();
	pthread_cond_signal(envForTxn->writingCond);
	interruptionStatus = WORKER_WAITING;
	uint64_t start;
	unsigned int envFlags;
	mdb_env_get_flags(env, &envFlags);
#ifdef MDB_TRACK_METRICS
	if (envFlags & MDB_TRACK_METRICS)
		start = get_time64();
#endif
	if (target) {
		uint64_t delay = 1;
		do {
			cond_timedwait(envForTxn->writingCond, envForTxn->writingLock, delay);
			delay = delay << 1ll;
			if ((*target & 0xf) || (allowCommit && finishedProgress)) {
				// we are in position to continue writing or commit, so forward progress can be made without interrupting yet
#ifdef MDB_TRACK_METRICS
				if (envFlags & MDB_TRACK_METRICS)
					envForTxn->timeTxnWaiting += get_time64() - start;
#endif
				interruptionStatus = 0;
				return 0;
			}
		} while(interruptionStatus != INTERRUPT_BATCH);
	} else {
		pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
    }
#ifdef MDB_TRACK_METRICS
	if (envFlags & MDB_TRACK_METRICS)
		envForTxn->timeTxnWaiting += get_time64() - start;
#endif
	if (interruptionStatus == INTERRUPT_BATCH) { // interrupted by JS code that wants to run a synchronous transaction
		interruptionStatus = RESTART_WORKER_TXN;
		rc = mdb_txn_commit(*txn);
#ifdef MDB_EMPTY_TXN
		if (rc == MDB_EMPTY_TXN)
			rc = 0;
#endif
		if (rc == 0) {
			// wait again until the sync transaction is completed
			this->txn = *txn = nullptr;
			pthread_cond_signal(envForTxn->writingCond);
			pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
			// now restart our transaction
			rc = mdb_txn_begin(env, nullptr, 0, txn);
			this->txn = *txn;
			//fprintf(stderr, "Restarted txn after interruption\n");
			interruptionStatus = 0;
		}
		if (rc != 0) {
			fprintf(stdout, "wfc unlock due to error %u\n", rc);
			return rc;
		}
	} else
		interruptionStatus = 0;
	return 0;
}
int WriteWorker::DoWrites(MDB_txn* txn, EnvWrap* envForTxn, uint32_t* instruction, WriteWorker* worker) {
	MDB_val key, value;
	int rc = 0;
	int conditionDepth = 0;
	int validatedDepth = 0;
	double conditionalVersion, setVersion = 0;
	bool overlappedWord = !!worker;
	uint32_t* start;
    do {
next_inst:	start = instruction++;
		uint32_t flags = *start;
		MDB_dbi dbi = 0;
		//fprintf(stderr, "do %u %u\n", flags, get_time64());
		bool validated = conditionDepth == validatedDepth;
		if (flags & 0xc0c0) {
			fprintf(stderr, "Unknown flag bits %u %p\n", flags, start);
			fprintf(stderr, "flags after message %u\n", *start);
			worker->resultCode = 22;
			abort();
		}
		if (flags & HAS_KEY) {
			// a key based instruction, get the key
			dbi = (MDB_dbi) *instruction++;
			key.mv_size = *instruction++;
			key.mv_data = instruction;
			instruction = (uint32_t*) (((size_t) instruction + key.mv_size + 16) & (~7));
			if (flags & HAS_VALUE) {
				if (flags & COMPRESSIBLE) {
					int64_t status = -1;
					status = std::atomic_exchange((std::atomic<int64_t>*)(instruction + 2), (int64_t)1);
					if (status == 2) {
						//fprintf(stderr, "wait on compression %p\n", instruction);
						worker->interruptionStatus = WORKER_WAITING;
						do {
							pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
						} while (std::atomic_load((std::atomic<int64_t>*)(instruction + 2)));
						worker->interruptionStatus = 0;
					} else if (status > 2) {
						//fprintf(stderr, "doing the compression ourselves\n");
						((Compression*) (size_t) *((double*)&status))->compressInstruction(nullptr, (double*) (instruction + 2));
					} // else status is 0 and compression is done
					// compressed
					value.mv_data = (void*)(size_t) * ((size_t*)instruction);
					if ((size_t)value.mv_data > 0x1000000000000)
						fprintf(stderr, "compression not completed %p %i\n", value.mv_data, (int) status);
					value.mv_size = *(instruction - 1);
					instruction += 4; // skip compression pointers
				} else {
					value.mv_data = (void*)(size_t) * ((double*)instruction);
					value.mv_size = *(instruction - 1);
					instruction += 2;
				}
			}
			if (flags & CONDITIONAL_VERSION) {
				conditionalVersion = *((double*) instruction);
				instruction += 2;
				MDB_val conditionalValue;
				rc = mdb_get(txn, dbi, &key, &conditionalValue);
				if (rc) {
				    // not found counts as version 0, so this is acceptable for conditional less than,
				    // otherwise does not validate
					if (rc == MDB_NOTFOUND)
						validated = flags & CONDITIONAL_ALLOW_NOTFOUND;
					else
						worker->resultCode = rc;
				} else if (conditionalVersion != ANY_VERSION) {
					double version;
					memcpy(&version, conditionalValue.mv_data, 8);
					validated = validated && ((flags & CONDITIONAL_VERSION_LESS_THAN) ? version <= conditionalVersion : (version == conditionalVersion));
				}
			}
			if (flags & SET_VERSION) {
				setVersion = *((double*) instruction);
				instruction += 2;
			}
			if ((flags & IF_NO_EXISTS) && (flags & START_CONDITION_BLOCK)) {
				rc = mdb_get(txn, dbi, &key, &value);
				if (!rc)
					validated = false;
				else if (rc == MDB_NOTFOUND)
					validated = true;
				else
					worker->resultCode = rc;
			}
		} else
			instruction++;
		//fprintf(stderr, "instr flags %p %p %u\n", start, flags, conditionDepth);
		if (validated || !(flags & CONDITIONAL)) {
			switch (flags & 0xf) {
			case NO_INSTRUCTION_YET:
				instruction -= 2; // reset back to the previous flag as the current instruction
				rc = 0;
				// in windows InterlockedCompareExchange might be faster
				if (!worker->finishedProgress || conditionDepth) {
					if (std::atomic_compare_exchange_strong((std::atomic<uint32_t>*) start,
							(uint32_t*) &flags,
							(uint32_t)WAITING_OPERATION)) {
						worker->WaitForCallbacks(&txn, conditionDepth == 0, start);
					}
					goto next_inst;
				} else {
					if (std::atomic_compare_exchange_strong((std::atomic<uint32_t>*) start,
							(uint32_t*) &flags,
							(uint32_t)TXN_DELIMITER)) {
						worker->instructions = start;
						return 0;
					} else
						goto next_inst;
				}
			case BLOCK_END:
				conditionDepth--;
				if (validatedDepth > conditionDepth)
					validatedDepth--;
				if (conditionDepth < 0) {
					fprintf(stderr, "Negative condition depth");
				}
				goto next_inst;
			case PUT:
#ifdef MDB_OVERLAPPINGSYNC
				if (flags & ASSIGN_TIMESTAMP) {
					if ((*(uint64_t*)key.mv_data & 0xfffffffful) == REPLACE_WITH_TIMESTAMP) {
						ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(envForTxn->env);
						*(uint64_t*)key.mv_data = ((*(uint64_t*)key.mv_data >> 32) & 0x1) ?
							extended_env->getLastTime() : extended_env->getNextTime();
					}
					uint64_t first_word = *(uint64_t*)value.mv_data;
					// 0 assign new time
					// 1 assign last assigned time
					// 3 assign last recorded previous time
					// 4 record previous time
					if ((first_word & 0xffffff) == SPECIAL_WRITE) {
						if (first_word & REPLACE_WITH_TIMESTAMP_FLAG) {
							ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(envForTxn->env);
							uint32_t next_32 = first_word >> 32;
							if (next_32 & 4) {
								// preserve last timestamp
								MDB_val last_data;
								rc = mdb_get(txn, dbi, &key, &last_data);
								if (rc) break;
								if (flags & SET_VERSION) last_data.mv_data = (char *) last_data.mv_data + 8;
								extended_env->previousTime = *(uint64_t *) last_data.mv_data;
								//fprintf(stderr, "previous time %llx \n", previous_time);
							}
							uint64_t timestamp = (next_32 & 1) ? (next_32 & 2) ? extended_env->previousTime : extended_env->getLastTime()
															   : extended_env->getNextTime();
							if (first_word & DIRECT_WRITE) {
								// write to second word, which is used by the direct write
								*((uint64_t *) value.mv_data + 1) = timestamp ^ (next_32 >> 8);
								first_word = first_word & 0xffffffff; // clear out the offset so it is just zero (always must be at the beginning)
							} else
								*(uint64_t *) value.mv_data = timestamp ^ (next_32 >> 8);
							//fprintf(stderr, "set time %llx \n", timestamp);
						}
						if (first_word & DIRECT_WRITE) {
							// direct in-place write
							unsigned int offset = first_word >> 32;
							if (flags & SET_VERSION)
								offset += 8;
							MDB_val bytes_to_write;
							bytes_to_write.mv_data = (char*)value.mv_data + 8;
							bytes_to_write.mv_size = value.mv_size - 8;
#ifdef MDB_RPAGE_CACHE
							rc = mdb_direct_write(txn, dbi, &key, offset, &bytes_to_write);
							if (!rc) break; // success
#endif
							// if no success, this means we probably weren't able to write to a single
							// word safely, so we need to do a real put
							MDB_val last_data;
							rc = mdb_get(txn, dbi, &key, &last_data);
							if (rc) break; // failed to get
							bytes_to_write.mv_size = last_data.mv_size;
							// attempt a put, using reserve (so we can efficiently copy data in)
							rc = mdb_put(txn, dbi, &key, &bytes_to_write, (flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP)) | MDB_RESERVE);
							if (!rc) {
								// copy the existing data
								memcpy(bytes_to_write.mv_data, last_data.mv_data, last_data.mv_size);
								// copy the changes
								memcpy((char*)bytes_to_write.mv_data + offset, (char*)value.mv_data + 8, value.mv_size - 8);
							}
							break; // done
						}
					}
				}
#endif
				if (flags & SET_VERSION)
					rc = putWithVersion(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP), setVersion);
				else
					rc = mdb_put(txn, dbi, &key, &value, flags & (MDB_NOOVERWRITE | MDB_NODUPDATA | MDB_APPEND | MDB_APPENDDUP));
				if (flags & COMPRESSIBLE)
					delete value.mv_data;
				break;
			case DEL:
				rc = mdb_del(txn, dbi, &key, nullptr);
				break;
			case DEL_VALUE:
				rc = mdb_del(txn, dbi, &key, &value);
				if (flags & COMPRESSIBLE)
					delete value.mv_data;
				break;
			case START_BLOCK: case START_CONDITION_BLOCK:
				rc = validated ? 0 : MDB_NOTFOUND;
				if (validated)
					validatedDepth++;
				conditionDepth++;
				break;
			case USER_CALLBACK:
				worker->finishedProgress = false;
				worker->progressStatus = 2;
				rc = 0;
				if (flags & USER_CALLBACK_STRICT_ORDER) {
					std::atomic_fetch_or((std::atomic<uint32_t>*) start, (uint32_t) FINISHED_OPERATION); // mark it as finished so it is processed
					while (!worker->finishedProgress) {
						worker->WaitForCallbacks(&txn, conditionDepth == 0, nullptr);
					}
				}
				break;
			case DROP_DB:
				rc = mdb_drop(txn, dbi, (flags & DELETE_DATABASE) ? 1 : 0);
				break;
			case POINTER_NEXT:
				instruction = (uint32_t*)(size_t) * ((double*)instruction);
				goto next_inst;
			default:
				fprintf(stderr, "Unknown flags %u %p\n", flags, start);
				fprintf(stderr, "flags after message %u\n", *start);
				worker->resultCode = 22;
				abort();
			}
			if (rc) {
				if (!(rc == MDB_KEYEXIST || rc == MDB_NOTFOUND)) {
					if (worker) {
						worker->resultCode = rc;
					} else {
						return rc;
					}
				}
				flags = FINISHED_OPERATION | FAILED_CONDITION;
			}
			else
				flags = FINISHED_OPERATION;
		} else
			flags = FINISHED_OPERATION | FAILED_CONDITION;
		//fprintf(stderr, "finished flag %p\n", flags);
		if (overlappedWord) {
			std::atomic_fetch_or((std::atomic<uint32_t>*) start, flags);
			overlappedWord = false;
		} else
			*start |= flags;
	} while(worker); // keep iterating in async/multiple-instruction mode, just one instruction in sync mode
	return rc;
}

bool WriteWorker::threadSafeCallsEnabled = false;
void txn_callback(const void* data, int finished) {
	auto worker = (WriteWorker*) data;
	if (finished) {
		// we don't want to release our lock until *after* the txn lock is released to give other threads a better chance
		// at executing next
		if (worker->txn) {
			if (!(*worker->instructions & TXN_DELIMITER))
				fprintf(stderr, "in txn_callback not valid %p\n", *worker->instructions);
			worker->txn = nullptr;
			worker->interruptionStatus = 0;
			pthread_cond_signal(worker->envForTxn->writingCond); // in case there a sync txn waiting for us
			pthread_mutex_unlock(worker->envForTxn->writingLock);
		}
	} else // transaction is visible (to readers), but not unlocked
		worker->SendUpdate();
}


void do_write(napi_env env, void* data) {
	auto worker = (WriteWorker*) data;
	worker->Write();
	napi_release_threadsafe_function(worker->progress, napi_tsfn_abort);
}

const int READER_CHECK_INTERVAL = 600; // ten minutes
void WriteWorker::Write() {
	int rc;
	finishedProgress = true;
	unsigned int envFlags;
	mdb_env_get_flags(env, &envFlags);
	time_t now = time(0);
	if (now - envForTxn->lastReaderCheck > READER_CHECK_INTERVAL) {
		int dead;
		mdb_reader_check(env, &dead);
		envForTxn->lastReaderCheck = now;
	}
	pthread_mutex_lock(envForTxn->writingLock);
	if (!env) return;// already closed
	#ifndef _WIN32
	int retries = 0;
	retry:
	#endif
	rc = mdb_txn_begin(env, nullptr,
	#ifdef MDB_OVERLAPPINGSYNC
		(envForTxn->jsFlags & MDB_OVERLAPPINGSYNC) ? MDB_NOSYNC :
	#endif
		0, &txn);
	#if !defined(_WIN32) && defined(MDB_RPAGE_CACHE)
	if (rc == MDB_LOCK_FAILURE) {
		if (retries++ < 4) {
			sleep(1);
			goto retry;
		}
	}
	#endif
	if (rc != 0) {
		resultCode = rc;
		return;
	}
	uint32_t* start = instructions;
	rc = DoWrites(txn, envForTxn, instructions, this);
	uint32_t txnId = (uint32_t) mdb_txn_id(txn);
	if (!(*instructions & TXN_DELIMITER))
		fprintf(stderr, "after writes %p %p NOT still valid %p\n", start, instructions, *instructions);
	progressStatus = 1;
	#ifdef MDB_OVERLAPPINGSYNC
	if (envForTxn->jsFlags & MDB_OVERLAPPINGSYNC) {
		mdb_txn_set_callback(txn, txn_callback, this);
	}
	#endif
	bool had_changes = false;
	if (rc || resultCode) {
		fprintf(stderr, "do_write error %u %u\n", rc, resultCode);
		mdb_txn_abort(txn);
	} else {
		rc = mdb_txn_commit(txn);
#ifdef MDB_EMPTY_TXN
		if (rc == MDB_EMPTY_TXN)
			rc = 0;
		else {
			had_changes = true;
		}
#else
		had_changes = true;
#endif
	}
	if (!(*instructions & TXN_DELIMITER))
		fprintf(stderr, "end write %p, next start %p NOT still valid %p\n", start, instructions, *instructions);
	txn_callback(this, 1);
	if (rc || resultCode) {
		std::atomic_fetch_or((std::atomic<uint32_t>*) instructions, (uint32_t) TXN_HAD_ERROR);
		if (rc)
			resultCode = rc ? rc : resultCode;
		return;
	}
	*(instructions - 1) = txnId;
	std::atomic_fetch_or((std::atomic<uint32_t>*) instructions, (uint32_t) TXN_COMMITTED);
	if (had_changes) {
		ExtendedEnv* extended_env = (ExtendedEnv*) mdb_env_get_userctx(env);
		if (extended_env) extended_env->notifyUserCallbacks(std::string("__committed__"));
	}
}

void write_progress(napi_env env,
					napi_value js_callback,
					void* context,
					void* data) {
	if (!js_callback)
		return;
	auto worker = (WriteWorker*) context;
	napi_value result;
	napi_value undefined;
	napi_value arg;
	napi_create_int32(env, worker->progressStatus, &arg);
	napi_get_undefined(env, &undefined);
	if (worker->progressStatus == 1) {
		napi_call_function(env, undefined, js_callback, 1, &arg, &result);
		return;
	}
	if (worker->finishedProgress)
		return;
	auto envForTxn = worker->envForTxn;
	pthread_mutex_lock(envForTxn->writingLock);
	while(!worker->txn) // possible to jump in after an interrupted txn here
		pthread_cond_wait(envForTxn->writingCond, envForTxn->writingLock);
	envForTxn->writeTxn = new TxnTracked(worker->txn, 0);
	worker->finishedProgress = true;
	napi_create_int32(env, worker->progressStatus, &arg);
	napi_call_function(env, undefined, js_callback, 1, &arg, &result);
	bool is_async = false;
	napi_get_value_bool(env, result, &is_async);
	if (!is_async) {
		delete envForTxn->writeTxn;
		envForTxn->writeTxn = nullptr;
		pthread_cond_signal(envForTxn->writingCond);
		pthread_mutex_unlock(envForTxn->writingLock);
	}
}

void writes_complete(napi_env env,
					 napi_status status,
					 void* data) {
	auto worker = (WriteWorker*) data;
	worker->finishedProgress = true;
	napi_value result, arg; // we use direct napi call here because node-addon-api interface with throw a fatal error if a worker thread is terminating, and bun doesn't support escapable scopes yet
	napi_create_int32(env, worker->resultCode, &arg);
	napi_value callback;
	napi_get_reference_value(env, worker->callback, &callback);
	napi_call_function(env, callback, callback, 1, &arg, &result);
	napi_delete_reference(env, worker->callback);
	napi_delete_async_work(env, worker->work);
	delete worker;
}

Value EnvWrap::resumeWriting(const Napi::CallbackInfo& info) {
	// if we had async txns, now we resume
	delete writeTxn;
	writeTxn = nullptr;
	pthread_cond_signal(writingCond);
	pthread_mutex_unlock(writingLock);
	return info.Env().Undefined();
}

Value EnvWrap::startWriting(const Napi::CallbackInfo& info) {
	napi_env n_env = info.Env();
	if (!this->env) {
		return throwError(info.Env(), "The environment is already closed.");
	}
	hasWrites = true;
	size_t instructionAddress = info[0].As<Number>().Int64Value();
	napi_value resource;
	napi_status status;
	status = napi_create_object(n_env, &resource);
	napi_value resource_name;
	status = napi_create_string_latin1(n_env, "write", NAPI_AUTO_LENGTH, &resource_name);
	auto worker = new WriteWorker(this->env, this, (uint32_t*) instructionAddress);
	this->writeWorker = worker;
	napi_create_reference(n_env, info[1].As<Function>(), 1, &worker->callback);
	status = napi_create_async_work(n_env, resource, resource_name, do_write, writes_complete, worker, &worker->work);
	if (status != napi_ok) abort();
	napi_create_threadsafe_function(n_env, info[1].As<Function>(), resource, resource_name, 0, 1, nullptr, nullptr, worker, write_progress, &worker->progress);
	status = napi_queue_async_work(n_env, worker->work);
	if (status != napi_ok) abort();

	return info.Env().Undefined();
}

NAPI_FUNCTION(EnvWrap::write) {
	ARGS(2)
	GET_INT64_ARG(0);
	EnvWrap* ew = (EnvWrap*) i64;
	if (!ew->env) {
		napi_throw_error(env, nullptr, "The environment is already closed.");
		RETURN_UNDEFINED;
	}
	ew->hasWrites = true;
	
	napi_get_value_int64(env, args[1], &i64);
	uint32_t* instructionAddress = (uint32_t*) i64;
	int rc = 0;
	if (instructionAddress)
		rc = WriteWorker::DoWrites(ew->writeTxn->txn, ew, instructionAddress, nullptr);
	else if (ew->writeWorker) {
		pthread_cond_signal(ew->writingCond);
	}
	if (rc && !(rc == MDB_KEYEXIST || rc == MDB_NOTFOUND)) {
		throwLmdbError(env, rc);
		RETURN_UNDEFINED;
	}
	RETURN_UNDEFINED;
}